diff --git a/.dockerignore b/.dockerignore index fca47d6..7378c18 100644 --- a/.dockerignore +++ b/.dockerignore @@ -17,3 +17,4 @@ build/ **/*.log *.egg-info/ *.png +certs/ diff --git a/.gitignore b/.gitignore index 4d2ff84..556a5bb 100644 --- a/.gitignore +++ b/.gitignore @@ -160,3 +160,4 @@ cython_debug/ #.idea/ **/.DS_Store +certs/ diff --git a/README.md b/README.md index e7fda4b..d68fbc9 100644 --- a/README.md +++ b/README.md @@ -33,11 +33,11 @@ Below is an enumeration of analytics that are run on the resulting data set to t | Analytic Name | Description | Completeness | | --- | --- | --- | -| Multiple Active Certificates | Flag an unusually high number of active certificates for a single FQDN, especially if they're from multiple CAs. | ✅ | | Certificate Lifespan Analysis | Flag certificates with unusually short or long lifespans. | ✅ | +| Uncommon SAN Usage | Flag certificates with an unusually high number of SAN entries. | ✅ | +| Multiple Active Certificates | Flag an unusually high number of active certificates for a single FQDN, especially if they're from multiple CAs. | ⌛ | | Changes in Certificate Details | Track historical data of certificates for each FQDN and flag abrupt changes. | ❌ | | Certificates from Untrusted CAs | Flag certificates issued by untrusted or less common CAs. | ❌ | -| Uncommon SAN Usage | Flag certificates with an unusually high number of SAN entries. | ✅ | | Use of Deprecated or Weak Encryption | Flag certificates that use deprecated or weak cryptographic algorithms. | ❌ | | New Certificate Detection | Alert users when a certificate for a known domain changes unexpectedly. | ❌ | | Mismatched Issuer and Subject | Flag certificates where the issuer and subject fields match (self-signed) and are not trusted roots. | ❌ | diff --git a/crowdtls/analytics/main.py b/crowdtls/analytics/main.py index 8588158..b40c4ee 100644 --- a/crowdtls/analytics/main.py +++ b/crowdtls/analytics/main.py @@ -1,16 +1,13 @@ import asyncio import datetime -from functools import cache from typing import Callable +from asyncstdlib.functools import lru_cache from cryptography import x509 -from fastapi import Depends -from sqlalchemy.ext.asyncio import AsyncSession -from sqlmodel import and_ +from sqlalchemy.orm import selectinload from sqlmodel import func from sqlmodel import select -from crowdtls.db import get_session from crowdtls.db import session_maker from crowdtls.logs import logger from crowdtls.models import AnomalyFlags @@ -34,30 +31,39 @@ def anomaly_scanner(priority: int): return decorator +@lru_cache(maxsize=len(ANOMALY_HTTP_CODE)) +async def get_anomaly_type(response_code: int): + async with session_maker() as session: + query = select(AnomalyTypes.id).where(AnomalyTypes.response_code == response_code).limit(1) + return (await session.execute(query)).scalars().one() + + +async def anomaly_exists(name: str, anomalies: list): + anomaly_id = await get_anomaly_type(ANOMALY_HTTP_CODE[name]) + """Check if a given anomaly type exists in a list of anomalies.""" + return any((a.anomaly_type_id == anomaly_id for a in anomalies)) + + @anomaly_scanner(priority=1) async def check_certs_for_fqdn(): """Check certificates for a given FQDN for anomalies.""" - now = datetime.datetime.utcnow() # Query for all certificates and domains which have at least 10 DomainCertificateLink entries for unexpired certificates. query = ( select(DomainCertificateLink.fqdn, Certificate) + .options(selectinload(Certificate.anomalies)) .join(Certificate, Certificate.fingerprint == DomainCertificateLink.fingerprint) - .where( - and_( - Certificate.not_valid_after > now, - Certificate.not_valid_before < now, - ) - ) .group_by(DomainCertificateLink.fqdn, Certificate.fingerprint) .having(func.count(DomainCertificateLink.fqdn) > 10) ) logger.info(query) async with session_maker() as session: - results = await session.execute(query) + results = (await session.execute(query)).scalars().all() if results: - for result in results.scalars().all(): + for result in results: + if any(await anomaly_exists("multiple_cas", certificate.anomalies) for certificate in result.certificates): + continue yield { "anomaly": "multiple_cas", "details": f"Unusually high number of certificates for FQDN {result.fqdn}.", @@ -71,90 +77,83 @@ async def check_cert_lifespan(): # Query for fingerprints of all unexpired certificates which have a lifespan of less than 3 weeks. query = ( - select(Certificate.fingerprint) - .where( - and_( - Certificate.not_valid_after - Certificate.not_valid_before < datetime.timedelta(weeks=3), - Certificate.not_valid_after > datetime.datetime.utcnow(), - Certificate.not_valid_before < datetime.datetime.utcnow(), - ) - ) - .distinct() + select(Certificate) + .options(selectinload(Certificate.anomalies)) + .where(Certificate.not_valid_after - Certificate.not_valid_before < datetime.timedelta(weeks=3)) ) async with session_maker() as session: - results = await session.execute(query) + certificates = (await session.execute(query)).scalars().all() + + if certificates: + for certificate in certificates: + if await anomaly_exists("short_lifespan", certificate.anomalies): + continue - if results: - for result in results.scalars().all(): yield { "anomaly": "short_lifespan", - "details": f"Unusually short lifespan for certificate {result.fingerprint}.", - "certificates": [result.fingerprint], + "details": f"Unusually short lifespan observed. Check certificate for {certificate.subject}", + "certificates": [certificate.fingerprint], } @anomaly_scanner(priority=4) async def check_cert_sans(): - """Check the number of Subject Alternative Names (SANs) in a certificate.""" + """Check for a high number of Subject Alternative Names (SANs) in a certificate.""" # Query for raw certificate data. - query = ( - select(Certificate.fingerprint, Certificate.raw_der_certificate) - .where( - and_( - Certificate.not_valid_after > datetime.datetime.utcnow(), - Certificate.not_valid_before < datetime.datetime.utcnow(), - ) - ) - .distinct() - ) + query = select(Certificate).options(selectinload(Certificate.anomalies)) # Execute the query and iterate over the results. Load each certificate with # x509 and yield for each result which has more than 8 SAN entries. async with session_maker() as session: - results = await session.execute(query) - if results: - for result in results.scalars().all(): - cert = x509.load_der_x509_certificate(result.raw_der_certificate) - if (num_sans := len(cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value)) > 80: + certificates = (await session.execute(query)).scalars().all() + + if certificates: + for certificate in certificates: + if await anomaly_exists("many_sans", certificate.anomalies): + continue + + try: + certificate_x509 = x509.load_der_x509_certificate(certificate.raw_der_certificate) + except Exception as e: + logger.error(f"Error loading certificate {certificate.fingerprint}: {e}") + continue + + try: + num_sans = len(certificate_x509.extensions.get_extension_for_class(x509.SubjectAlternativeName).value) + except x509.extensions.ExtensionNotFound: + num_sans = 0 + + if num_sans > 80: + logger.info(f"Certificate has {num_sans} SANs.") yield { "anomaly": "many_sans", - "details": f"Unusually high number of SANs ({num_sans}) for certificate {result.fingerprint}.", - "certificates": [result.fingerprint], + "details": f"Unusually high number of SANs ({num_sans}) observed.", + "certificates": [certificate.fingerprint], } -@cache -async def get_anomaly_type(response_code: int): - async with session_maker() as session: - anomaly_type = await session.fetch_val( - query=select([AnomalyTypes]).where(AnomalyTypes.response_code == response_code) - ) - return anomaly_type - - -async def create_anomaly_flag(anomaly, session: AsyncSession = Depends(get_session)): +async def create_anomaly_flag(anomaly): """Create a new AnomalyFlag in the database.""" - anomaly_type = await get_anomaly_type(ANOMALY_HTTP_CODE[anomaly["anomaly"]]) + logger.info("Creating anomaly flag.") + anomaly_id = await get_anomaly_type(ANOMALY_HTTP_CODE[anomaly["anomaly"]]) + logger.info(f"Creating anomaly flag for {anomaly_id}.") async with session_maker() as session: for fingerprint in anomaly["certificates"]: - await session.execute( - query=CertificateAnomalyFlagsLink.insert(), - values={ - "certificate_fingerprint": fingerprint, - "anomaly_flag_id": anomaly_type.id, - "first_linked": datetime.utcnow(), - }, + logger.info(f"Creating anomaly flag for certificate {fingerprint}.") + new_anomaly = AnomalyFlags( + details=anomaly["details"], + anomaly_type_id=anomaly_id, ) - await session.execute( - query=AnomalyFlags.insert(), - values={ - "details": anomaly["details"], - "anomaly_type_id": anomaly_type.id, - "date_flagged": datetime.utcnow(), - }, + session.add(new_anomaly) + await session.flush() + session.add( + CertificateAnomalyFlagsLink( + certificate_fingerprint=fingerprint, + anomaly_flag_id=new_anomaly.id, + ) ) await session.commit() @@ -163,8 +162,11 @@ async def create_anomaly_flag(anomaly, session: AsyncSession = Depends(get_sessi async def find_anomalies(): """Run all registered anomaly scanners in priority order""" await asyncio.sleep(3) - for scanner, priority in sorted(ANOMALY_SCANNERS.items(), key=lambda x: x[1]): - logger.info(f"Running {scanner.__name__}") - async for anomaly in scanner(): - logger.info(f"{scanner.__name__} found {anomaly['anomaly']}") - await create_anomaly_flag(anomaly) + for analytic, priority in sorted(ANOMALY_SCANNERS.items(), key=lambda x: x[1]): + logger.info(f"Running {analytic.__name__}") + try: + async for anomaly in analytic(): + logger.warning(f"{analytic.__name__} found {anomaly['anomaly']}") + await create_anomaly_flag(anomaly) + except Exception as e: + logger.error(f"Error running {analytic.__name__}: {e}") diff --git a/crowdtls/apis/v1.py b/crowdtls/apis/v1.py index afb3673..72e336a 100644 --- a/crowdtls/apis/v1.py +++ b/crowdtls/apis/v1.py @@ -6,6 +6,7 @@ from fastapi import Depends from fastapi import Request from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload +from sqlmodel import and_ from sqlmodel import select from crowdtls.db import get_session @@ -13,9 +14,10 @@ from crowdtls.helpers import decode_der from crowdtls.helpers import parse_hostname from crowdtls.helpers import raise_HTTPException from crowdtls.logs import logger +from crowdtls.models import AnomalyFlags from crowdtls.models import Certificate +from crowdtls.models import CertificateAnomalyFlagsLink from crowdtls.models import Domain -from crowdtls.models import DomainCertificateLink app = APIRouter() @@ -38,48 +40,8 @@ async def insert_certificate(hostname: str, certificate: Certificate, session: A logger.error(f"Failed to insert certificate into database for domain {domain.fqdn}: {certificate.fingerprint}") -# @app.post("/check") -# async def check_fingerprints( -# fingerprints: Dict[str, Union[str, List[str]]], -# request: Request = None, -# session: AsyncSession = Depends(get_session), -# ): -# logger.info("Received request to check fingerprints from client {request.client.host}") -# hostname = parse_hostname(fingerprints.get("host")) -# fps = fingerprints.get("fps") -# logger.info(f"Received {len(fps)} fingerprints to check from client {request.client.host}") - -# subquery = select(DomainCertificateLink.fqdn).join(Certificate).where(Certificate.fingerprint.in_(fps)).subquery() - -# stmt = ( -# select(Certificate) -# .join(DomainCertificateLink) -# .join(subquery, DomainCertificateLink.fqdn == subquery.c.fqdn) -# .options(selectinload(Certificate.domains)) -# .where(DomainCertificateLink.fqdn == hostname.fqdn if hostname else True) -# ) -# try: -# result = await session.execute(stmt) -# except Exception: -# logger.error(f"Failed to execute stmt: {stmt} (req body {request.body}) and IP address: {request.client.host}") -# raise_HTTPException() - -# certificates = result.scalars().all() -# logger.info( -# f"Found {len(certificates)} certificates (of {len(fps)} requested) in the database for client {request.client.host}" -# ) - -# if len(certificates) == len(fps): -# return {"send": False} - -# for certificate in certificates: -# if hostname and hostname.fqdn not in [domain.fqdn for domain in certificate.domains]: -# certificate.domains.append(hostname) -# session.add(certificate) - -# await session.commit() -# logger.info(f"Added mappings between {hostname.fqdn} up to {len(fps)} certificates in the database.") -# return {"send": True} +async def get_domain_by_fqdn(fqdn: str, session: AsyncSession = Depends(get_session)): + return await session.get(Domain, fqdn) @app.post("/check") @@ -88,49 +50,82 @@ async def check_fingerprints( request: Request = None, session: AsyncSession = Depends(get_session), ): - logger.info(f"Received request to check fingerprints from client {request.client.host}") - response_dict = {} for hostname, fps in fingerprints.items(): parsed_hostname = parse_hostname(hostname) - logger.info(f"Received {len(fps)} fingerprints to check from client {request.client.host} for host {hostname}") + logger.info(f"{request.client.host} requested {hostname}: {len(fps)}") - subquery = ( - select(DomainCertificateLink.fqdn).join(Certificate).where(Certificate.fingerprint.in_(fps)).subquery() - ) - - stmt = ( - select(Certificate) - .join(DomainCertificateLink) - .join(subquery, DomainCertificateLink.fqdn == subquery.c.fqdn) - .options(selectinload(Certificate.domains)) - .where(DomainCertificateLink.fqdn == parsed_hostname.fqdn if parsed_hostname else True) - ) + # Query for all certificates and associated domains (from links) with the given fingerprints + stmt = select(Certificate).options(selectinload(Certificate.domains)).where(Certificate.fingerprint.in_(fps)) try: - result = await session.execute(stmt) + results = await session.execute(stmt) except Exception: logger.error( f"Failed to execute stmt: {stmt} (req body {request.body}) and IP address: {request.client.host}" ) raise_HTTPException() - certificates = result.scalars().all() + certificates = results.scalars().all() logger.info( f"Found {len(certificates)} certificates (of {len(fps)} requested) in the database for client {request.client.host}" ) - if len(certificates) != len(fps): - for certificate in certificates: - if parsed_hostname and parsed_hostname.fqdn not in [domain.fqdn for domain in certificate.domains]: + count = 0 + for certificate in certificates: + if parsed_hostname and parsed_hostname.fqdn not in [domain.fqdn for domain in certificate.domains]: + count += 1 + logger.info(f"Adding {parsed_hostname.fqdn} to {certificate.fingerprint} in the database.") + if existing_domain := await get_domain_by_fqdn(hostname): + existing_domain.certificates.append(certificate) + session.add(existing_domain) + else: certificate.domains.append(parsed_hostname) session.add(certificate) + if count: await session.commit() - logger.info(f"Added mappings between {parsed_hostname.fqdn} up to {len(fps)} certificates in the database.") + logger.info(f"Added mappings between {parsed_hostname.fqdn} and {count} certificates in the database.") + if any(fp for fp in fps if fp not in [cert.fingerprint for cert in certificates]): + logger.info(f"Requesting new certs for {hostname}.") response_dict[hostname] = True + # Query for relevant anomalies and the associated certificate fingerprints which are used as keys + # in the response dict and are used by the client browser extensions to alert the user + stmt = ( + select(AnomalyFlags, Certificate.fingerprint) + .options(selectinload(AnomalyFlags.certificates)) + .where( + and_( + Certificate.fingerprint.in_(fps), + Certificate.fingerprint == CertificateAnomalyFlagsLink.certificate_fingerprint, + ) + ) + ) + + try: + results = await session.execute(stmt) + except Exception: + logger.error( + f"Failed to execute stmt: {stmt} (req body {request.body}) and IP address: {request.client.host}" + ) + raise_HTTPException() + + anomalies = results.scalars().all() + + logger.info( + f"Found {len(anomalies)} anomalies (of {len(fps)} requested) in the database for client {request.client.host}" + ) + + if anomalies: + response_dict["anomalies"] = {} + + for anomaly in anomalies: + for certificate in anomaly.certificates: + if certificate.fingerprint not in response_dict["anomalies"]: + response_dict["anomalies"][certificate.fingerprint] = anomaly.details + return response_dict @@ -153,12 +148,16 @@ async def new_fingerprints( ) raise_HTTPException() - logger.info(f"Received {len(certs)} fingerprints to add from client {request.client.host} for host {hostname}") existing_fingerprints = {certificate.fingerprint for certificate in result.scalars().all()} + logger.info(f"Received {len(certs)} fingerprints to add from client {request.client.host} for host {hostname}") + logger.info(f"Found {len(existing_fingerprints)} existing fingerprints in the database.") + + logger.info(f"{existing_fingerprints=}") certificates_to_add = [] for fp, rawDER in certs.items(): if fp not in existing_fingerprints: + logger.info(f"Adding {fp}") decoded = decode_der(fp, rawDER) certificate = Certificate.from_orm(decoded) certificate.domains.append(parsed_hostname) @@ -173,41 +172,3 @@ async def new_fingerprints( ) raise_HTTPException() return {"status": "OK"} - - -# @app.post("/new") -# async def new_fingerprints( -# fingerprints: Dict[str, Union[str, Dict[str, List[int]]]], -# request: Request = None, -# session: AsyncSession = Depends(get_session), -# ): -# try: -# hostname = parse_hostname(fingerprints.get("host")) -# certs = fingerprints.get("certs") -# fps = certs.keys() -# stmt = select(Certificate).where(Certificate.fingerprint.in_(fps)) -# result = await session.execute(stmt) -# except Exception: -# logger.error(f"Failed to execute stmt: {stmt} (req body {request.body}) and IP address: {request.client.host}") -# raise_HTTPException() - -# logger.info(f"Received {len(fingerprints)} fingerprints to add from client {request.client.host}") -# existing_fingerprints = {certificate.fingerprint for certificate in result.scalars().all()} - -# certificates_to_add = [] -# for fp, rawDER in certs.items(): -# if fp not in existing_fingerprints: -# decoded = decode_der(fp, rawDER) -# certificate = Certificate.from_orm(decoded) -# certificate.domains.append(hostname) -# certificates_to_add.append(certificate) - -# try: -# session.add_all(certificates_to_add) -# await session.commit() -# except Exception: -# logger.error( -# f"Failed to add certificates to db: {certificates_to_add} after stmt: {stmt} (req body {request.body}) and IP address: {request.client.host}" -# ) -# raise_HTTPException() -# return {"status": "OK"} diff --git a/crowdtls/db.py b/crowdtls/db.py index 27d68a6..44faac2 100644 --- a/crowdtls/db.py +++ b/crowdtls/db.py @@ -28,6 +28,7 @@ async def get_session() -> AsyncSession: async def create_db_and_tables() -> None: async with engine.begin() as conn: + # pass await conn.run_sync(SQLModel.metadata.drop_all) await conn.run_sync(SQLModel.metadata.create_all) diff --git a/pyproject.toml b/pyproject.toml index b4e0c06..34020f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "rocketry>=2.5.1", "uvloop>=0.17.0", "python-dotenv>=1.0.0", + "asyncstdlib>=3.10.8", ] [project.scripts]