CrowdTLS-server/crowdtls/cli.py

50 lines
1.6 KiB
Python

import os
from typing import Dict
from typing import List

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlmodel import SQLModel

from crowdtls.helpers import decode_der
from db import CertificateChain
# Connection string for the async engine. It is taken from the DATABASE_URL
# environment variable when that is set, so real credentials do not have to
# live in source control; the literal below remains the fallback.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://user:password@localhost:5432/database",
)
# echo=True asks SQLAlchemy to log the SQL statements it emits.
engine = create_async_engine(DATABASE_URL, echo=True)
app = FastAPI()
@app.on_event("startup")
async def startup_event():
    """Create the tables registered on ``SQLModel.metadata`` at app startup."""
    create_tables = SQLModel.metadata.create_all
    async with engine.begin() as conn:
        # create_all is handed to run_sync instead of being called directly.
        await conn.run_sync(create_tables)
@app.post("/check")
async def check_fingerprints(fingerprints: Dict[str, List[int]]):
    """Report whether the client should submit its certificates.

    Every fingerprint listed under the ``"fps"`` key of the request body is
    looked up in the ``CertificateChain`` table.

    Args:
        fingerprints: Request body; the fingerprints to check are read from
            its ``"fps"`` entry.

    Returns:
        ``{"send": True}`` as soon as one fingerprint has no matching row,
        otherwise ``{"send": False}`` (also when there is nothing to check).
    """
    # A body without an "fps" entry used to reach `for fp in None` and raise
    # TypeError (HTTP 500); treat it the same as an empty list instead.
    fps = fingerprints.get("fps") or []
    async with AsyncSession(engine) as session:
        for fp in fps:
            stmt = select(CertificateChain).where(CertificateChain.fingerprint == fp)
            result = await session.execute(stmt)
            certificate = result.scalars().first()
            if not certificate:
                # One unknown fingerprint is enough to request an upload.
                return {"send": True}
    return {"send": False}
@app.post("/new")
async def new_fingerprints(fingerprints: Dict[str, List[int]]):
    """Store certificates for fingerprints that are not yet in the database.

    Args:
        fingerprints: Request body; each key is looked up as a fingerprint in
            the ``CertificateChain`` table. The mapped values are not read by
            this handler.

    Returns:
        ``{"status": "OK"}`` once the session has been committed.
    """
    async with AsyncSession(engine) as session:
        # Only the keys are used, so iterate the mapping directly.
        for fp in fingerprints:
            stmt = select(CertificateChain).where(CertificateChain.fingerprint == fp)
            result = await session.execute(stmt)
            if result.scalars().first():
                continue  # a row for this fingerprint already exists
            # NOTE(review): decode_der receives the dict key, not the mapped
            # value; presumably the value carries the DER bytes -- confirm
            # against crowdtls.helpers.decode_der before relying on this.
            session.add(decode_der(fp))
        await session.commit()
    return {"status": "OK"}