Add authentification

This commit is contained in:
2026-02-17 00:54:36 +01:00
parent ab98ba81c8
commit a8c8c489da
31 changed files with 1118 additions and 451 deletions

View File

@@ -1,91 +1,140 @@
from fastapi import APIRouter, Security, HTTPException, Depends
from fastapi.responses import RedirectResponse
from fastapi import APIRouter, Security, HTTPException, Depends, Request
from fastapi.responses import RedirectResponse, Response
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlmodel import Session
from sqlmodel import Session, select
import jwt
from jwt import PyJWKClient
from src.settings import AUTH_URL, TOKEN_URL, JWKS_URL, ISSUER, settings
import src.users.service as service
from src.database import get_session
from src.models import UserCreate
from src.models import UserCreate, User, UserPublic
import secrets
import jwt
from jwt import PyJWKClient
import requests
from src.messages import tokenExpired, invalidToken
import src.messages as messages
router = APIRouter(prefix="/auth")
router = APIRouter(prefix='/auth')
jwk_client = PyJWKClient(JWKS_URL)
security = HTTPBearer()
@router.post('/logout')
def logout(response: Response):
response.delete_cookie('access_token')
response.delete_cookie('refresh_token')
return {'detail': messages.userloggedout}
@router.get('/login')
def login():
state = secrets.token_urlsafe(16)
params = {
"client_id": settings.keycloak_client_id,
"response_type": "code",
"scope": "openid",
"redirect_uri": settings.keycloak_redirect_uri,
"state": state,
'client_id': settings.keycloak_client_id,
'response_type': 'code',
'scope': 'openid',
'redirect_uri': settings.keycloak_redirect_uri,
'state': state,
}
request_url = requests.Request('GET', AUTH_URL, params=params).prepare().url
return RedirectResponse(request_url)
@router.get("/callback")
@router.get('/callback')
def callback(code: str, session: Session = Depends(get_session)):
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": settings.keycloak_redirect_uri,
"client_id": settings.keycloak_client_id,
"client_secret": settings.keycloak_client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': settings.keycloak_redirect_uri,
'client_id': settings.keycloak_client_id,
'client_secret': settings.keycloak_client_secret,
}
headers = {
"Content-Type": "application/x-www-form-urlencoded"
'Content-Type': 'application/x-www-form-urlencoded'
}
response = requests.post(TOKEN_URL, data=data, headers=headers)
if response.status_code != 200:
return JSONResponse(
{"error": "Failed to get token"},
status_code=400
raise HTTPException(
status_code=400,
detail=messages.failtogettoken
)
token_data = response.json()
id_token = token_data["id_token"]
decoded_token = jwt.decode(id_token, options={"verify_signature": False})
id_token = token_data['id_token']
decoded_token = jwt.decode(id_token, options={'verify_signature': False})
decoded_access_token = jwt.decode(token_data['access_token'], options={'verify_signature': False})
roles = decoded_access_token['resource_access'][settings.keycloak_client_id]
user_create = UserCreate(
email=decoded_token.get("email"),
name=decoded_token.get("preferred_username")
email=decoded_token.get('email'),
name=decoded_token.get('preferred_username'),
role_names=roles['roles']
)
user = service.get_or_create_user(session, user_create)
return {
"access_token": token_data["access_token"],
"id_token": token_data["id_token"],
"refresh_token": token_data["refresh_token"],
}
service.get_or_create_user(session, user_create)
response = RedirectResponse(settings.origins)
response.set_cookie(
key='access_token',
value=token_data['access_token'],
httponly=True,
secure=True if settings.debug == False else True,
samesite='strict',
max_age=settings.max_age
)
response.set_cookie(
key='refresh_token',
value=token_data['refresh_token'] or '',
httponly=True,
secure=True if settings.debug == False else True,
samesite='strict',
max_age=30 * 24 * settings.max_age
)
return response
def verify_token(token: str):
try:
signing_key = jwk_client.get_signing_key_from_jwt(token)
decoded = jwt.decode(token, options={"verify_signature": False})
decoded = jwt.decode(token, options={'verify_signature': False})
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
algorithms=['RS256'],
audience=settings.keycloak_client_id,
issuer=ISSUER,
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail=tokenExpired)
raise HTTPException(status_code=401, detail=messages.tokenexipired)
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail=invalidToken)
raise HTTPException(status_code=401, detail=messages.invalidtoken)
def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security)
):
return verify_token(credentials.credentials)
def get_current_user(request: Request, session: Session = Depends(get_session)):
access_token = request.cookies.get("access_token")
if not access_token:
raise HTTPException(status_code=401, detail=messages.notauthenticated)
payload = verify_token(access_token)
if not payload:
raise HTTPException(status_code=401, detail="aze")
email = payload.get('email')
if not email:
raise HTTPException(status_code=401, detail=messages.notauthenticated)
user = session.exec(select(User).where(User.email == email)).first()
if not user:
raise HTTPException(status_code=401, detail=messages.usernotfound)
return user
@router.get('/user/me')
def me(user: UserPublic = Depends(get_current_user)):
if not user:
return {"logged": False}
return {
"logged": True,
"user": {
"name": user.name,
"email": user.email,
"id": user.id,
"roles": [role.name for role in user.roles]
}
}