from typing import List import tldextract from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from fastapi import HTTPException from crowdtls.logs import logger from crowdtls.models import Certificate from crowdtls.models import Domain def decode_der(fingerprint: str, raw_der_certificate: List[int]) -> Certificate: der_cert_bytes = bytes(raw_der_certificate) cert = x509.load_der_x509_certificate(der_cert_bytes, default_backend()) public_key_bytes = cert.public_key().public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) return Certificate( fingerprint=fingerprint, version=cert.version.value, serial_number=cert.serial_number, signature=cert.signature, issuer=cert.issuer.rfc4514_string(), not_valid_before=cert.not_valid_before, not_valid_after=cert.not_valid_after, subject=cert.subject.rfc4514_string(), subject_public_key_info=public_key_bytes, raw_der_certificate=der_cert_bytes, ) def parse_hostname(hostname: str) -> Domain: try: parsed_domain = tldextract.extract(hostname) return Domain(fqdn=hostname, root=parsed_domain.domain, tld=parsed_domain.suffix) except Exception: logger.error(f"Failed to parse hostname: {hostname}") def raise_HTTPException( status_code: int = 500, detail: str = "Error encountered and reported. Please try again later." ) -> None: raise HTTPException(status_code=status_code, detail=detail)