"""Authentication routes: OIDC login, callback, logout and token refresh.

Tokens obtained from the identity provider are kept in HTTP-only cookies
(`access_token`, `refresh_token`, `id_token`).
"""
import secrets
from typing import Annotated
from urllib.parse import urlencode

import jwt
import requests
import src.users.service as service
from fastapi import APIRouter, Cookie, Depends, HTTPException, Request
from fastapi.responses import RedirectResponse, Response
from fastapi.security import HTTPBearer
from jwt import PyJWKClient
from sqlmodel import Session, select
from src import messages
from src.database import get_session
from src.models import User, UserCreate, UserPublic
from src.settings import (AUTH_URL, ISSUER, JWKS_URL, LOGOUT_URL, TOKEN_URL,
                          settings)

router = APIRouter(prefix='/auth')
jwk_client = PyJWKClient(JWKS_URL)
security = HTTPBearer()

# Shared settings for the back-channel calls to the identity provider.
_FORM_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}
_REQUEST_TIMEOUT = 10

# Names of the cookies written on login/refresh and removed on logout.
_AUTH_COOKIES = ('access_token', 'refresh_token', 'id_token')


def _token_error() -> HTTPException:
    """Build the 404 'token not found' error used for every token failure."""
    return HTTPException(
        status_code=404,
        detail=messages.Messages.not_found('token'),
    )


def _post_form(url: str, data: dict, headers: dict | None = None):
    """POST form data to the identity provider.

    Any transport failure (timeout, connection error, ...) is reported as
    the same 404 the routes already use for token problems, instead of
    leaking out as an unhandled 500.
    """
    try:
        return requests.post(
            url, data=data, headers=headers, timeout=_REQUEST_TIMEOUT
        )
    except requests.exceptions.RequestException as error:
        raise _token_error() from error


def _set_auth_cookies(response: Response, token_data: dict) -> None:
    """Store the three tokens from a token-endpoint payload as cookies.

    The `secure` flag follows `settings.debug` for every cookie so that the
    login and refresh paths behave the same way.
    """
    secure = not settings.debug
    response.set_cookie(
        key='access_token',
        value=token_data['access_token'],
        httponly=True,
        secure=secure,
        samesite='strict',
        max_age=settings.max_age,
    )
    response.set_cookie(
        key='refresh_token',
        value=token_data['refresh_token'] or '',
        httponly=True,
        secure=secure,
        samesite='strict',
        max_age=30 * 24 * settings.max_age,
    )
    response.set_cookie(
        key='id_token',
        value=token_data['id_token'],
        httponly=True,
        secure=secure,
        samesite='strict',
        max_age=settings.max_age,
    )


def _clear_auth_cookies(response: Response) -> None:
    """Delete every authentication cookie from the client."""
    for cookie_name in _AUTH_COOKIES:
        response.delete_cookie(
            key=cookie_name,
            path='/',
            secure=not settings.debug,
            samesite='strict',
        )


def _reject_user(refresh_token) -> RedirectResponse:
    """End the provider session and send the user back flagged as not allowed."""
    _post_form(
        LOGOUT_URL,
        {
            'client_id': settings.keycloak_client_id,
            'client_secret': settings.keycloak_client_secret,
            'refresh_token': refresh_token,
        },
    )
    return RedirectResponse(f'{settings.origins}?userNotAllowed=true')


@router.get('/logout')
def logout():
    """Redirect to the provider's logout URL and drop the auth cookies."""
    params = {
        'client_id': settings.keycloak_client_id,
        'post_logout_redirect_uri': settings.origins,
    }
    response = RedirectResponse(f'{LOGOUT_URL}?{urlencode(params)}')
    _clear_auth_cookies(response)
    return response


@router.get('/login')
def login():
    """Redirect the browser to the provider's authorization endpoint."""
    # NOTE(review): `state` is sent to the provider but is never persisted
    # or compared in `callback`, so it currently gives no CSRF protection.
    state = secrets.token_urlsafe(16)
    params = {
        'client_id': settings.keycloak_client_id,
        'response_type': 'code',
        'scope': 'openid',
        'redirect_uri': settings.keycloak_redirect_uri,
        'state': state,
    }
    request_url = requests.Request(
        'GET', AUTH_URL, params=params).prepare().url
    return RedirectResponse(request_url)


@router.get('/callback')
def callback(code: str, session: Session = Depends(get_session)):
    """Exchange the authorization code for tokens and log the user in.

    Users whose access token carries no roles for this client are logged
    out at the provider and redirected with `?userNotAllowed=true`.

    Raises:
        HTTPException: 404 when the token endpoint cannot be reached or
            does not answer with status 200.
    """
    token_response = _post_form(
        TOKEN_URL,
        {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': settings.keycloak_redirect_uri,
            'client_id': settings.keycloak_client_id,
            'client_secret': settings.keycloak_client_secret,
        },
        headers=_FORM_HEADERS,
    )
    if token_response.status_code != 200:
        raise _token_error()

    token_data = token_response.json()

    # Signatures are not verified here: both tokens were just received in
    # the direct response from TOKEN_URL.
    # NOTE(review): confirm this is acceptable, or route through
    # `verify_token`.
    id_claims = jwt.decode(
        token_data['id_token'], options={'verify_signature': False})
    access_claims = jwt.decode(
        token_data['access_token'], options={'verify_signature': False})

    resource_access = access_claims.get('resource_access') or {}
    client_access = resource_access.get(settings.keycloak_client_id)
    if not client_access:
        return _reject_user(token_data['refresh_token'])

    # A client entry without a `roles` key is treated as "not allowed"
    # rather than crashing with a KeyError.
    role_names = client_access.get('roles')
    if role_names is None:
        return _reject_user(token_data['refresh_token'])

    user_create = UserCreate(
        email=id_claims.get('email'),
        name=id_claims.get('name'),
        role_names=role_names,
    )
    service.get_or_create_user(session, user_create)

    response = RedirectResponse(settings.origins)
    _set_auth_cookies(response, token_data)
    return response


def verify_token(token: str) -> dict:
    """Validate a JWT against the provider's signing keys and return its claims.

    The token must be RS256-signed, issued by `ISSUER` and carry this client
    as audience; 60 seconds of clock leeway are allowed.

    Raises:
        HTTPException: 401 when the token is expired or otherwise invalid.
    """
    try:
        signing_key = jwk_client.get_signing_key_from_jwt(token)
        return jwt.decode(
            token,
            signing_key.key,
            algorithms=['RS256'],
            audience=settings.keycloak_client_id,
            issuer=ISSUER,
            leeway=60,
        )
    except jwt.ExpiredSignatureError as error:
        raise HTTPException(
            status_code=401,
            detail=messages.Messages.tokenexipired,
        ) from error
    except jwt.InvalidTokenError as error:
        raise HTTPException(
            status_code=401,
            detail=messages.Messages.invalidtoken,
        ) from error


def get_current_user(
        request: Request,
        session: Session = Depends(get_session)):
    """Resolve the `User` row matching the request's `access_token` cookie.

    Raises:
        HTTPException: 401 when the cookie is missing, the token is invalid
            or has no `email` claim, or no user with that email exists.
    """
    access_token = request.cookies.get('access_token')
    if not access_token:
        raise HTTPException(
            status_code=401,
            detail=messages.Messages.notauthenticated,
        )

    payload = verify_token(access_token)
    if not payload:
        # Previously returned a placeholder detail string to the client.
        raise HTTPException(
            status_code=401,
            detail=messages.Messages.invalidtoken,
        )

    email = payload.get('email')
    if not email:
        raise HTTPException(
            status_code=401,
            detail=messages.Messages.notauthenticated,
        )

    user = session.exec(select(User).where(User.email == email)).first()
    if not user:
        raise HTTPException(
            status_code=401,
            detail=messages.Messages.not_found('user'),
        )
    return user


@router.post('/refresh')
def refresh_user_token(refresh_token: Annotated[str | None, Cookie()] = None):
    """Trade the refresh-token cookie for a fresh set of token cookies.

    Raises:
        HTTPException: 404 when no refresh token is supplied, the token
            endpoint cannot be reached, or it does not answer with 200.
    """
    if not refresh_token:
        # Nothing to exchange; skip the round trip to the provider.
        raise _token_error()

    result = _post_form(
        TOKEN_URL,
        {
            'grant_type': 'refresh_token',
            'client_id': settings.keycloak_client_id,
            'client_secret': settings.keycloak_client_secret,
            'refresh_token': refresh_token,
        },
        headers=_FORM_HEADERS,
    )
    if result.status_code != 200:
        raise _token_error()

    response = Response()
    _set_auth_cookies(response, result.json())
    return response


@router.get('/user/me')
def me(user: UserPublic = Depends(get_current_user)):
    """Return the authenticated user's public profile."""
    if not user:
        return {'logged': False}
    return {
        'logged': True,
        'user': {
            'name': user.name,
            'email': user.email,
            'id': user.id,
            'roles': user.roles,
        },
    }