add permission check for form productor and product
This commit is contained in:
@@ -1,20 +1,21 @@
|
||||
from typing import Annotated
|
||||
from fastapi import APIRouter, Security, HTTPException, Depends, Request, Cookie
|
||||
from fastapi.responses import RedirectResponse, Response
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from sqlmodel import Session, select
|
||||
import jwt
|
||||
from jwt import PyJWKClient
|
||||
|
||||
from src.settings import AUTH_URL, TOKEN_URL, JWKS_URL, ISSUER, LOGOUT_URL, settings
|
||||
import src.users.service as service
|
||||
from src.database import get_session
|
||||
from src.models import UserCreate, User, UserPublic
|
||||
|
||||
import secrets
|
||||
import requests
|
||||
from typing import Annotated
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import jwt
|
||||
import requests
|
||||
import src.messages as messages
|
||||
import src.users.service as service
|
||||
from fastapi import (APIRouter, Cookie, Depends, HTTPException, Request,
|
||||
Security)
|
||||
from fastapi.responses import RedirectResponse, Response
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
from jwt import PyJWKClient
|
||||
from sqlmodel import Session, select
|
||||
from src.database import get_session
|
||||
from src.models import User, UserCreate, UserPublic
|
||||
from src.settings import (AUTH_URL, ISSUER, JWKS_URL, LOGOUT_URL, TOKEN_URL,
|
||||
settings)
|
||||
|
||||
router = APIRouter(prefix='/auth')
|
||||
|
||||
@@ -98,7 +99,7 @@ def callback(code: str, session: Session = Depends(get_session)):
|
||||
'client_secret': settings.keycloak_client_secret,
|
||||
'refresh_token': token_data['refresh_token'],
|
||||
}
|
||||
res = requests.post(LOGOUT_URL, data=data)
|
||||
requests.post(LOGOUT_URL, data=data)
|
||||
resp = RedirectResponse(f'{settings.origins}?userNotAllowed=true')
|
||||
return resp
|
||||
roles = resource_access.get(settings.keycloak_client_id)
|
||||
@@ -108,7 +109,7 @@ def callback(code: str, session: Session = Depends(get_session)):
|
||||
'client_secret': settings.keycloak_client_secret,
|
||||
'refresh_token': token_data['refresh_token'],
|
||||
}
|
||||
res = requests.post(LOGOUT_URL, data=data)
|
||||
requests.post(LOGOUT_URL, data=data)
|
||||
resp = RedirectResponse(f'{settings.origins}?userNotAllowed=true')
|
||||
return resp
|
||||
|
||||
@@ -160,12 +161,15 @@ def verify_token(token: str):
|
||||
)
|
||||
return decoded
|
||||
except jwt.ExpiredSignatureError:
|
||||
raise HTTPException(status_code=401,
|
||||
detail=messages.Messages.tokenexipired)
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail=messages.Messages.tokenexipired
|
||||
)
|
||||
except jwt.InvalidTokenError:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail=messages.Messages.invalidtoken)
|
||||
detail=messages.Messages.invalidtoken
|
||||
)
|
||||
|
||||
|
||||
def get_current_user(
|
||||
@@ -173,21 +177,30 @@ def get_current_user(
|
||||
session: Session = Depends(get_session)):
|
||||
access_token = request.cookies.get('access_token')
|
||||
if not access_token:
|
||||
raise HTTPException(status_code=401,
|
||||
detail=messages.Messages.notauthenticated)
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail=messages.Messages.notauthenticated
|
||||
)
|
||||
payload = verify_token(access_token)
|
||||
if not payload:
|
||||
raise HTTPException(status_code=401, detail='aze')
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail='aze'
|
||||
)
|
||||
email = payload.get('email')
|
||||
|
||||
if not email:
|
||||
raise HTTPException(status_code=401,
|
||||
detail=messages.Messages.notauthenticated)
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail=messages.Messages.notauthenticated
|
||||
)
|
||||
|
||||
user = session.exec(select(User).where(User.email == email)).first()
|
||||
if not user:
|
||||
raise HTTPException(status_code=401,
|
||||
detail=messages.Messages.not_found('user'))
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail=messages.Messages.not_found('user')
|
||||
)
|
||||
return user
|
||||
|
||||
|
||||
@@ -249,6 +262,6 @@ def me(user: UserPublic = Depends(get_current_user)):
|
||||
'name': user.name,
|
||||
'email': user.email,
|
||||
'id': user.id,
|
||||
'roles': [role.name for role in user.roles]
|
||||
'roles': user.roles
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user