Basic JWT endpoints

This commit is contained in:
Darryl Nixon 2023-05-23 17:14:49 -07:00
parent 7c5919f073
commit 1fa3d8a372
13 changed files with 180 additions and 27 deletions

View file

@ -1,9 +0,0 @@
"""
Initialization constants for ghostforge.
Constants:
_PROJECT (str): Name for the project
__version__ (str): The current version of the binhop package
"""
_PROJECT = "ghostforge"
__version__ = "0.0.1"

35
ghostforge/api.py Normal file
View file

@ -0,0 +1,35 @@
from fastapi import Body
from fastapi import Depends
from fastapi import FastAPI
from ghostforge.auth import check_user
from ghostforge.auth.bearer import JWTBearer
from ghostforge.auth.handler import signJWT
from ghostforge.models import UserLoginSchema
# define("port", default=os.environ.get("GHOSTFORGE_INTERNAL_WEB_PORT", 1337), help="Webserver port", type=int)
# define("db_host", default=os.environ.get("DATABASE_CONTAINER_NAME", "localhost"), help="Host/IP of db")
# define("db_port", default=os.environ.get("DATABASE_PORT", 5432), help="Port of db", type=int)
# define("db_database", default=os.environ.get("DATABASE_NAME", "ghostforge"), help="Name of db")
# define("db_user", default=os.environ.get("DATABASE_USER", "ghostforge"), help="User with access to db")
# define("db_password", default=os.environ.get("DATABASE_PASSWORD", "secure"), help="Password for db user")
# define("secret_key", default=os.environ.get("DATABASE_PASSWORD", "secure"), help="Password for db user")
app = FastAPI()
@app.get("/", tags=["root"])
async def read_root() -> dict:
return {"message": "Welcome!"}
@app.post("/login", tags=["user"])
async def user_login(user: UserLoginSchema = Body(...)):
if check_user(user):
return signJWT(user.name)
return {"error": "Wrong login details!"}
@app.post("/auth_check", dependencies=[Depends(JWTBearer())], tags=["auth_check"])
async def auth_check(post) -> dict:
return {"data": "post added."}

View file

@ -0,0 +1,11 @@
from ghostforge.models import UserLoginSchema
users = [{"name": "test", "password": "pw"}]
def check_user(data: UserLoginSchema):
print(data)
for user in users:
if user["name"] == data.name and user["password"] == data.password:
return True
return False

35
ghostforge/auth/bearer.py Normal file
View file

@ -0,0 +1,35 @@
from fastapi import HTTPException
from fastapi import Request
from fastapi.security import HTTPAuthorizationCredentials
from fastapi.security import HTTPBearer
from ghostforge.auth.handler import decodeJWT
# from https://github.com/testdrivenio/fastapi-jwt/
class JWTBearer(HTTPBearer):
def __init__(self, auto_error: bool = True):
super(JWTBearer, self).__init__(auto_error=auto_error)
async def __call__(self, request: Request):
credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
if not self.verify_jwt(credentials.credentials):
raise HTTPException(status_code=403, detail="Invalid token or expired token.")
return credentials.credentials
else:
raise HTTPException(status_code=403, detail="Invalid authorization code.")
def verify_jwt(self, jwtoken: str) -> bool:
isTokenValid: bool = False
try:
payload = decodeJWT(jwtoken)
except Exception:
payload = None
if payload:
isTokenValid = True
return isTokenValid

View file

@ -0,0 +1,28 @@
import os
import time
from typing import Dict
import jwt
# Based on handler from https://github.com/testdrivenio/fastapi-jwt/
JWT_SECRET = os.environ.get("GHOSTFORGE_JWT_SECRET")
def token_response(token: str):
return {"access_token": token}
def signJWT(user: str) -> Dict[str, str]:
payload = {"user_id": user, "expires": time.time() + 600}
token = jwt.encode(payload, JWT_SECRET, algorithm="HS256")
return token_response(token)
def decodeJWT(token: str) -> dict:
try:
decoded_token = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
return decoded_token if decoded_token["expires"] >= time.time() else None
except Exception:
return {}

View file

@ -0,0 +1,9 @@
import os
import uvicorn
def start_api() -> None:
uvicorn.run(
"ghostforge.api:app", host="0.0.0.0", port=os.environ.get("GHOSTFORGE_INTERNAL_WEB_PORT", 1337), reload=True
)

27
ghostforge/models.py Normal file
View file

@ -0,0 +1,27 @@
import datetime
from pydantic import BaseModel
from pydantic import Field
class UserSchema(BaseModel):
name: str = Field(...)
password: str = Field(...)
created: datetime.datetime = datetime.datetime
class Config:
schema_extra = {
"example": {
"name": "Jeremy Tootsieroll",
"password": "notarealpassword",
"created": "2021-03-05T08:21:00.000Z",
}
}
class UserLoginSchema(BaseModel):
name: str = Field(...)
password: str = Field(...)
class Config:
schema_extra = {"example": {"name": "Jeremy Tootsieroll", "password": "notarealpassword"}}