2023-06-07 14:35:48 -07:00
|
|
|
import datetime
|
|
|
|
from typing import List
|
|
|
|
from typing import Optional
|
|
|
|
|
2023-06-07 20:02:49 -07:00
|
|
|
from sqlalchemy import event
|
2023-06-07 14:35:48 -07:00
|
|
|
from sqlalchemy import LargeBinary
|
|
|
|
from sqlmodel import Field
|
|
|
|
from sqlmodel import Relationship
|
|
|
|
from sqlmodel import SQLModel
|
|
|
|
|
|
|
|
|
|
|
|
class DomainCertificateLink(SQLModel, table=True):
    """Association table joining ``Domain`` rows to ``Certificate`` rows.

    ``fqdn`` and ``fingerprint`` are both flagged as primary-key columns, so
    together they form the composite key of the link row.
    """

    # References ``domain.fqdn``.
    fqdn: Optional[str] = Field(
        primary_key=True,
        foreign_key="domain.fqdn",
        default=None,
    )
    # References ``certificate.fingerprint``.
    fingerprint: Optional[str] = Field(
        primary_key=True,
        foreign_key="certificate.fingerprint",
        default=None,
    )
    # Filled from ``datetime.datetime.utcnow`` when no value is supplied.
    first_linked: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )
|
|
|
|
|
|
|
|
|
|
|
|
class CertificateAnomalyFlagsLink(SQLModel, table=True):
    """Association table joining ``Certificate`` rows to ``AnomalyFlags`` rows.

    Both foreign-key columns are flagged as primary keys, giving the link row
    a composite key.
    """

    # References ``certificate.fingerprint``.
    certificate_fingerprint: str = Field(
        primary_key=True,
        foreign_key="certificate.fingerprint",
    )
    # References ``anomalyflags.id``.
    anomaly_flag_id: int = Field(
        primary_key=True,
        foreign_key="anomalyflags.id",
    )
    # Filled from ``datetime.datetime.utcnow`` when no value is supplied.
    first_linked: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )
|
2023-06-07 14:35:48 -07:00
|
|
|
|
|
|
|
|
|
|
|
class Domain(SQLModel, table=True):
    """A domain row, keyed by its fully qualified name (``fqdn``).

    Related to ``Certificate`` rows many-to-many via ``DomainCertificateLink``.
    """

    fqdn: str = Field(primary_key=True)
    # NOTE(review): presumably the registrable root and the top-level domain
    # parts of ``fqdn`` -- confirm against the code that populates them.
    root: str
    tld: str
    # Both timestamps are filled from ``datetime.datetime.utcnow`` when no
    # value is supplied.
    first_seen: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )
    last_seen: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )
    # Mirror of ``Certificate.domains``.
    certificates: Optional[List["Certificate"]] = Relationship(
        link_model=DomainCertificateLink,
        back_populates="domains",
    )
|
|
|
|
|
|
|
|
|
|
|
|
class Certificate(SQLModel, table=True):
    """A certificate row, keyed by its fingerprint.

    Related many-to-many to ``Domain`` through ``DomainCertificateLink`` and
    to ``AnomalyFlags`` through ``CertificateAnomalyFlagsLink``.
    """

    fingerprint: str = Field(index=True, primary_key=True)
    version: int
    serial_number: str
    # Required binary column. Deliberately no ``default_factory``: a factory
    # produces the default *value*, so ``default_factory=LargeBinary`` made the
    # default a SQLAlchemy type object rather than bytes whenever the field
    # was omitted. SQLModel already maps a ``bytes`` annotation to a binary
    # column type, so no explicit column type is needed.
    signature: bytes
    issuer: str
    not_valid_before: datetime.datetime
    not_valid_after: datetime.datetime
    subject: str
    # The ``certificate_written`` / ``certificate_loaded`` listeners in this
    # module remove the PEM BEGIN/END markers before a write and add them
    # back after a load.
    subject_public_key_info: str
    # Required binary column; see the note on ``signature``.
    raw_der_certificate: bytes
    # Both timestamps are filled from ``datetime.datetime.utcnow`` when no
    # value is supplied.
    first_seen: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )
    last_seen: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )
    seen_count: int = Field(default=1)
    # Mirror of ``Domain.certificates``.
    domains: Optional[List[Domain]] = Relationship(
        back_populates="certificates",
        link_model=DomainCertificateLink,
    )
    # Mirror of ``AnomalyFlags.certificates``.
    anomalies: Optional[List["AnomalyFlags"]] = Relationship(
        back_populates="certificates",
        link_model=CertificateAnomalyFlagsLink,
    )
|
|
|
|
|
|
|
|
|
|
|
|
@event.listens_for(Certificate, "before_insert")
@event.listens_for(Certificate, "before_update")
def certificate_written(mapper, connection, target):
    """Remove the PEM BEGIN/END markers from the public key before a write.

    Registered for the ``before_insert`` and ``before_update`` mapper events
    of ``Certificate``. Only the text between the markers is kept on
    ``target.subject_public_key_info``; a value that carries no markers is
    left as is (apart from surrounding whitespace), so the hook is safe to
    run repeatedly on the same instance.

    Args:
        mapper: Mapper passed by the event system (unused).
        connection: Connection passed by the event system (unused).
        target: The ``Certificate`` instance being written.
    """
    header = "-----BEGIN PUBLIC KEY-----"
    footer = "-----END PUBLIC KEY-----"

    # Tolerate surrounding whitespace, e.g. the newline that commonly follows
    # the END line in PEM text.
    key_text = target.subject_public_key_info.strip()

    # ``str.lstrip`` / ``str.rstrip`` treat their argument as a *set of
    # characters*, which can also eat leading or trailing base64 characters
    # of the key itself (for instance a final "B"). Remove the exact marker
    # strings instead.
    if key_text.startswith(header):
        key_text = key_text[len(header):]
    if key_text.endswith(footer):
        key_text = key_text[: -len(footer)]

    # Drop the newlines that separated the markers from the key body.
    target.subject_public_key_info = key_text.strip()
|
|
|
|
|
|
|
|
|
|
|
|
@event.listens_for(Certificate, "load")
def certificate_loaded(target, context):
    """Wrap the loaded public key text in PEM BEGIN/END marker lines.

    Registered for the ``load`` event of ``Certificate``. The current value of
    ``target.subject_public_key_info`` is placed between a
    ``-----BEGIN PUBLIC KEY-----`` line and an ``-----END PUBLIC KEY-----``
    line and written back to the same attribute.

    Args:
        target: The ``Certificate`` instance that was loaded.
        context: Context object passed by the event system (unused).
    """
    stored_key = target.subject_public_key_info
    pem_text = f"-----BEGIN PUBLIC KEY-----\n{stored_key}\n-----END PUBLIC KEY-----"
    target.subject_public_key_info = pem_text
|
|
|
|
|
|
|
|
|
|
|
|
class AnomalyTypes(SQLModel, table=True):
    """Lookup table of anomaly types referenced by ``AnomalyFlags``."""

    # Primary key; ``AnomalyFlags.anomaly_type_id`` points at this column.
    id: int = Field(primary_key=True)
    # NOTE(review): presumably the human-readable name of the anomaly type --
    # confirm against the code that fills this table.
    anomalyString: str
|
|
|
|
|
|
|
|
|
|
|
|
class AnomalyFlags(SQLModel, table=True):
    """A flagged anomaly, typed via ``AnomalyTypes`` and linked to certificates.

    Related many-to-many to ``Certificate`` through
    ``CertificateAnomalyFlagsLink``.
    """

    id: int = Field(primary_key=True)
    details: str
    # Foreign key given as a "table.column" string, like every other foreign
    # key in this module. This also removes the need for ``AnomalyTypes`` to
    # be defined before this class. "anomalytypes" is the default table name
    # derived from the ``AnomalyTypes`` class name.
    anomaly_type_id: int = Field(foreign_key="anomalytypes.id")
    # Filled from ``datetime.datetime.utcnow`` when no value is supplied.
    date_flagged: datetime.datetime = Field(
        default_factory=datetime.datetime.utcnow,
    )
    # Mirror of ``Certificate.anomalies``.
    certificates: Optional[List[Certificate]] = Relationship(
        back_populates="anomalies",
        link_model=CertificateAnomalyFlagsLink,
    )
|