CrowdTLS-server/crowdtls/models.py

83 lines
3.2 KiB
Python

import datetime
from typing import List
from typing import Optional
from sqlalchemy import event
from sqlalchemy import LargeBinary
from sqlmodel import Field
from sqlmodel import Relationship
from sqlmodel import SQLModel
class DomainCertificateLink(SQLModel, table=True):
    """Association table joining ``Domain`` rows to ``Certificate`` rows.

    Serves as the ``link_model`` of the many-to-many relationship between
    ``Domain.certificates`` and ``Certificate.domains``. The two foreign keys
    together form the composite primary key.
    """

    # References domain.fqdn; one half of the composite primary key.
    fqdn: Optional[str] = Field(
        primary_key=True,
        foreign_key="domain.fqdn",
        default=None,
    )
    # References certificate.fingerprint; the other half of the primary key.
    fingerprint: Optional[str] = Field(
        primary_key=True,
        foreign_key="certificate.fingerprint",
        default=None,
    )
    # Filled with datetime.utcnow() (naive UTC) when no value is supplied.
    first_linked: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )
class CertificateAnomalyFlagsLink(SQLModel, table=True):
    """Association table joining ``Certificate`` rows to ``AnomalyFlags`` rows.

    Serves as the ``link_model`` of the many-to-many relationship between
    ``Certificate.anomalies`` and ``AnomalyFlags.certificates``. The two
    foreign keys together form the composite primary key.
    """

    # References certificate.fingerprint; one half of the composite primary key.
    certificate_fingerprint: str = Field(
        primary_key=True,
        foreign_key="certificate.fingerprint",
    )
    # References anomalyflags.id; the other half of the primary key.
    anomaly_flag_id: int = Field(
        primary_key=True,
        foreign_key="anomalyflags.id",
    )
    # Filled with datetime.utcnow() (naive UTC) when no value is supplied.
    first_linked: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )
class Domain(SQLModel, table=True):
    """A domain name, keyed by its fully qualified domain name.

    Linked many-to-many to ``Certificate`` through ``DomainCertificateLink``.
    """

    # Primary key.
    fqdn: str = Field(primary_key=True)
    # NOTE(review): presumably the registrable domain and the top-level
    # domain derived from ``fqdn`` -- confirm against the code that fills them.
    root: str
    tld: str
    # Both timestamps default to datetime.utcnow() (naive UTC) at creation.
    first_seen: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )
    last_seen: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )

    # Mirror of ``Certificate.domains``.
    certificates: Optional[List["Certificate"]] = Relationship(
        link_model=DomainCertificateLink,
        back_populates="domains",
    )
class Certificate(SQLModel, table=True):
    """An X.509 certificate, keyed by its fingerprint.

    Linked many-to-many to ``Domain`` through ``DomainCertificateLink`` and to
    ``AnomalyFlags`` through ``CertificateAnomalyFlagsLink``.
    """

    # Primary key (additionally flagged ``index=True``).
    fingerprint: str = Field(index=True, primary_key=True)
    version: int
    serial_number: str
    # Plain ``bytes`` annotation: SQLModel maps ``bytes`` to a LargeBinary
    # column by itself. The previous ``Field(default_factory=LargeBinary)``
    # did not set the column type; it made a SQLAlchemy ``LargeBinary()`` type
    # object the *default value* whenever the field was omitted.
    signature: bytes
    issuer: str
    not_valid_before: datetime.datetime
    not_valid_after: datetime.datetime
    subject: str
    # The module's ``certificate_written`` / ``certificate_loaded`` listeners
    # remove the PEM BEGIN/END lines before a write and re-add them on load.
    subject_public_key_info: str
    # Same reasoning as ``signature``: binary column, no bogus default.
    raw_der_certificate: bytes
    # Both timestamps default to datetime.utcnow() (naive UTC) at creation.
    first_seen: datetime.datetime = Field(default_factory=datetime.datetime.utcnow)
    last_seen: datetime.datetime = Field(default_factory=datetime.datetime.utcnow)
    # Starts at 1 for a newly created row.
    seen_count: int = Field(default=1)

    # Mirror of ``Domain.certificates``.
    domains: Optional[List[Domain]] = Relationship(
        back_populates="certificates", link_model=DomainCertificateLink
    )
    # Mirror of ``AnomalyFlags.certificates``.
    anomalies: Optional[List["AnomalyFlags"]] = Relationship(
        back_populates="certificates", link_model=CertificateAnomalyFlagsLink
    )
@event.listens_for(Certificate, "before_insert")
@event.listens_for(Certificate, "before_update")
def certificate_written(mapper, connection, target):
    """Remove the PEM armor from the public key before it is written.

    Registered for the ``before_insert`` and ``before_update`` mapper events
    of ``Certificate``. Replaces ``target.subject_public_key_info`` with the
    text between the ``BEGIN PUBLIC KEY`` / ``END PUBLIC KEY`` lines. A value
    that carries no armor is left as is, apart from surrounding whitespace.

    Args:
        mapper: Mapper that is the target of the event (unused).
        connection: Connection used for the flush (unused).
        target: The ``Certificate`` instance about to be persisted.
    """
    header = "-----BEGIN PUBLIC KEY-----"
    footer = "-----END PUBLIC KEY-----"

    # str.lstrip/str.rstrip treat their argument as a *set of characters*,
    # not as a prefix/suffix, so they also eat key material made of those
    # characters (e.g. the final "B" of a key ending in "AQAB"). Match the
    # exact armor lines instead.
    key = target.subject_public_key_info.strip()
    if key.startswith(header):
        key = key[len(header):]
    if key.endswith(footer):
        key = key[: -len(footer)]

    # Drop the newlines that separated the armor lines from the body.
    target.subject_public_key_info = key.strip()
@event.listens_for(Certificate, "load")
def certificate_loaded(target, context):
    """Wrap the loaded public key in PEM ``PUBLIC KEY`` armor.

    Registered for the ``load`` instance event of ``Certificate``. Rewrites
    ``target.subject_public_key_info`` in place, unconditionally adding a
    BEGIN line before and an END line after the current value.

    Args:
        target: The ``Certificate`` instance that was just loaded.
        context: Context object passed by the event (unused).
    """
    bare_key = target.subject_public_key_info
    armored_key = f"-----BEGIN PUBLIC KEY-----\n{bare_key}\n-----END PUBLIC KEY-----"
    target.subject_public_key_info = armored_key
class AnomalyTypes(SQLModel, table=True):
    """Table of anomaly types referenced by ``AnomalyFlags.anomaly_type_id``."""

    # Primary key.
    id: int = Field(primary_key=True)
    # NOTE(review): presumably a response code reported for this anomaly
    # type -- confirm where it is consumed.
    response_code: int
    # The camelCase name doubles as the column name, so it is kept unchanged.
    anomalyString: str
class AnomalyFlags(SQLModel, table=True):
    """A single anomaly flag, attachable to one or more certificates.

    Linked many-to-many to ``Certificate`` through
    ``CertificateAnomalyFlagsLink``; each flag points at one ``AnomalyTypes``
    row.
    """

    # Primary key.
    id: int = Field(primary_key=True)
    details: str
    # Foreign key given as a "table.column" string, like every other model in
    # this module, instead of the mapped attribute ``AnomalyTypes.id``.
    # "anomalytypes" is SQLModel's default table name for ``AnomalyTypes``
    # (the lower-cased class name).
    anomaly_type_id: int = Field(foreign_key="anomalytypes.id")
    # Filled with datetime.utcnow() (naive UTC) when no value is supplied.
    date_flagged: datetime.datetime = Field(default_factory=datetime.datetime.utcnow)

    # Mirror of ``Certificate.anomalies``.
    certificates: Optional[List[Certificate]] = Relationship(
        back_populates="anomalies", link_model=CertificateAnomalyFlagsLink
    )