add login / logout logic for user
This commit is contained in:
@@ -1,11 +1,12 @@
|
||||
from fastapi import APIRouter, Security, HTTPException, Depends, Request
|
||||
from typing import Annotated
|
||||
from fastapi import APIRouter, Security, HTTPException, Depends, Request, Cookie
|
||||
from fastapi.responses import RedirectResponse, Response
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from sqlmodel import Session, select
|
||||
import jwt
|
||||
from jwt import PyJWKClient
|
||||
|
||||
from src.settings import AUTH_URL, TOKEN_URL, JWKS_URL, ISSUER, settings
|
||||
from src.settings import AUTH_URL, TOKEN_URL, JWKS_URL, ISSUER, LOGOUT_URL, settings
|
||||
import src.users.service as service
|
||||
from src.database import get_session
|
||||
from src.models import UserCreate, User, UserPublic
|
||||
@@ -20,11 +21,45 @@ router = APIRouter(prefix='/auth')
|
||||
jwk_client = PyJWKClient(JWKS_URL)
|
||||
security = HTTPBearer()
|
||||
|
||||
@router.post('/logout')
|
||||
def logout(response: Response):
|
||||
response.delete_cookie('access_token')
|
||||
response.delete_cookie('refresh_token')
|
||||
return {'detail': messages.userloggedout}
|
||||
@router.get('/logout')
|
||||
def logout(
|
||||
id_token: Annotated[str | None, Cookie()] = None,
|
||||
refresh_token: Annotated[str | None, Cookie()] = None,
|
||||
):
|
||||
if refresh_token:
|
||||
print("invalidate tokens")
|
||||
requests.post(LOGOUT_URL, data={
|
||||
"client_id": settings.keycloak_client_id,
|
||||
"client_secret": settings.keycloak_client_secret,
|
||||
"refresh_token": refresh_token
|
||||
})
|
||||
|
||||
if id_token:
|
||||
print("redirect keycloak")
|
||||
response = RedirectResponse(f'{LOGOUT_URL}?post_logout_redirect_uri={settings.origins}&id_token_hint={id_token}')
|
||||
else:
|
||||
response = RedirectResponse(settings.origins)
|
||||
|
||||
print("clear cookies")
|
||||
response.delete_cookie(
|
||||
key='access_token',
|
||||
path='/',
|
||||
secure=not settings.debug,
|
||||
samesite='lax',
|
||||
)
|
||||
response.delete_cookie(
|
||||
key='refresh_token',
|
||||
path='/',
|
||||
secure=not settings.debug,
|
||||
samesite='lax',
|
||||
)
|
||||
response.delete_cookie(
|
||||
key='id_token',
|
||||
path='/',
|
||||
secure=not settings.debug,
|
||||
samesite='lax',
|
||||
)
|
||||
return response
|
||||
|
||||
|
||||
@router.get('/login')
|
||||
@@ -64,7 +99,27 @@ def callback(code: str, session: Session = Depends(get_session)):
|
||||
id_token = token_data['id_token']
|
||||
decoded_token = jwt.decode(id_token, options={'verify_signature': False})
|
||||
decoded_access_token = jwt.decode(token_data['access_token'], options={'verify_signature': False})
|
||||
roles = decoded_access_token['resource_access'][settings.keycloak_client_id]
|
||||
resource_access = decoded_access_token.get('resource_access')
|
||||
if not resource_access:
|
||||
data = {
|
||||
'client_id': settings.keycloak_client_id,
|
||||
'client_secret': settings.keycloak_client_secret,
|
||||
'refresh_token': token_data['refresh_token'],
|
||||
}
|
||||
res = requests.post(LOGOUT_URL, data=data)
|
||||
resp = RedirectResponse(settings.origins)
|
||||
return resp
|
||||
resource_access.get(settings.keycloak_client_id)
|
||||
if not roles:
|
||||
data = {
|
||||
'client_id': settings.keycloak_client_id,
|
||||
'client_secret': settings.keycloak_client_secret,
|
||||
'refresh_token': token_data['refresh_token'],
|
||||
}
|
||||
res = requests.post(LOGOUT_URL, data=data)
|
||||
resp = RedirectResponse(settings.origins)
|
||||
return resp
|
||||
|
||||
user_create = UserCreate(
|
||||
email=decoded_token.get('email'),
|
||||
name=decoded_token.get('preferred_username'),
|
||||
@@ -76,18 +131,27 @@ def callback(code: str, session: Session = Depends(get_session)):
|
||||
key='access_token',
|
||||
value=token_data['access_token'],
|
||||
httponly=True,
|
||||
secure=True if settings.debug == False else True,
|
||||
samesite='strict',
|
||||
secure=not settings.debug,
|
||||
samesite='lax',
|
||||
max_age=settings.max_age
|
||||
)
|
||||
response.set_cookie(
|
||||
key='refresh_token',
|
||||
value=token_data['refresh_token'] or '',
|
||||
httponly=True,
|
||||
secure=True if settings.debug == False else True,
|
||||
samesite='strict',
|
||||
secure=not settings.debug,
|
||||
samesite='lax',
|
||||
max_age=30 * 24 * settings.max_age
|
||||
)
|
||||
response.set_cookie(
|
||||
key='id_token',
|
||||
value=token_data['id_token'],
|
||||
httponly=True,
|
||||
secure=not settings.debug,
|
||||
samesite='lax',
|
||||
max_age=settings.max_age
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
def verify_token(token: str):
|
||||
@@ -109,12 +173,12 @@ def verify_token(token: str):
|
||||
|
||||
|
||||
def get_current_user(request: Request, session: Session = Depends(get_session)):
|
||||
access_token = request.cookies.get("access_token")
|
||||
access_token = request.cookies.get('access_token')
|
||||
if not access_token:
|
||||
raise HTTPException(status_code=401, detail=messages.notauthenticated)
|
||||
payload = verify_token(access_token)
|
||||
if not payload:
|
||||
raise HTTPException(status_code=401, detail="aze")
|
||||
raise HTTPException(status_code=401, detail='aze')
|
||||
email = payload.get('email')
|
||||
|
||||
if not email:
|
||||
@@ -125,16 +189,55 @@ def get_current_user(request: Request, session: Session = Depends(get_session)):
|
||||
raise HTTPException(status_code=401, detail=messages.usernotfound)
|
||||
return user
|
||||
|
||||
@router.post('/refresh')
|
||||
def refresh_token(refresh_token: Annotated[str | None, Cookie()] = None):
|
||||
refresh = refresh_token
|
||||
data = {
|
||||
'grant_type': 'refresh_token',
|
||||
'client_id': settings.keycloak_client_id,
|
||||
'client_secret': settings.keycloak_client_secret,
|
||||
'refresh_token': refresh,
|
||||
}
|
||||
headers = {
|
||||
'Content-Type': 'application/x-www-form-urlencoded'
|
||||
}
|
||||
result = requests.post(TOKEN_URL, data=data, headers=headers)
|
||||
if result.status_code != 200:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=messages.failtogettoken
|
||||
)
|
||||
|
||||
token_data = result.json()
|
||||
response = Response()
|
||||
response.set_cookie(
|
||||
key='access_token',
|
||||
value=token_data['access_token'],
|
||||
httponly=True,
|
||||
secure=True if settings.debug == False else True,
|
||||
samesite='lax',
|
||||
max_age=settings.max_age
|
||||
)
|
||||
response.set_cookie(
|
||||
key='refresh_token',
|
||||
value=token_data['refresh_token'] or '',
|
||||
httponly=True,
|
||||
secure=True if settings.debug == False else True,
|
||||
samesite='lax',
|
||||
max_age=4
|
||||
)
|
||||
return response
|
||||
|
||||
@router.get('/user/me')
|
||||
def me(user: UserPublic = Depends(get_current_user)):
|
||||
if not user:
|
||||
return {"logged": False}
|
||||
return {'logged': False}
|
||||
return {
|
||||
"logged": True,
|
||||
"user": {
|
||||
"name": user.name,
|
||||
"email": user.email,
|
||||
"id": user.id,
|
||||
"roles": [role.name for role in user.roles]
|
||||
'logged': True,
|
||||
'user': {
|
||||
'name': user.name,
|
||||
'email': user.email,
|
||||
'id': user.id,
|
||||
'roles': [role.name for role in user.roles]
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user