Files
bookshelf/back/main.py
2026-01-12 11:17:01 +01:00

226 lines
7.7 KiB
Python

from fastapi import Depends, FastAPI, HTTPException, Response
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
import json
from typing import Optional
from crud import get_books, remove_book, edit_book, add_book, get_films, remove_film, edit_film, add_film, get_field_values, get_user
from pydantic import BaseModel
from passlib.context import CryptContext
from secret import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, origins
from datetime import datetime, timedelta
from jose import JWTError, jwt
class TokenData(BaseModel):
    """Validated view of the identity carried by an access token."""

    # Filled from the token's "sub" claim by get_current_user; defaults to None.
    username: Optional[str] = None
class Token(BaseModel):
    """Schema declared as the response model of the token endpoint."""

    # The encoded JWT string.
    access_token: str
    # Token scheme label; the login endpoint always sends "bearer".
    token_type: str
class Book(BaseModel):
    """JSON request body accepted when creating or updating a book."""

    title: str
    author: str
    type: str
    editor: str
class Film(BaseModel):
    """Film attributes accepted as a JSON request body."""

    title: str
    actors: str
    director: str
    type: str
    producer: str
    # NOTE(review): the unit of ``length`` is not stated in this file - confirm.
    length: int
# Application instance plus its cross-origin policy.
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    # The list of allowed origins is imported from the local ``secret`` module.
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    # The book endpoints send this custom header; exposing it lets
    # cross-origin browser clients read it.
    expose_headers=["x-nbpage"],
)

# Bearer-token extraction used by get_current_user.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Password hashing/verification context (bcrypt).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
    """Return the result of checking ``plain_password`` against ``hashed_password``.

    The check is delegated to the module-level ``pwd_context``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Return the hash of ``password`` produced by the module-level ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def authenticate_user(name, password):
    """Check a username/password pair against the stored user record.

    ``get_user(name)`` is expected to return a sized, indexable collection
    whose first element exposes the stored hash under the ``'password'`` key.

    Returns:
        ``True`` when a record exists and the password verifies against its
        hash, ``False`` otherwise.
    """
    records = get_user(name)
    if len(records) == 0:
        # No record was returned for this name.
        return False
    stored_hash = records[0]['password']
    return True if verify_password(password, stored_hash) else False
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token of the request into the current user.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.

    Returns:
        A dict of the form ``{'username': <"sub" claim>}``.

    Raises:
        HTTPException: 401 when the token cannot be decoded or carries no
            ``"sub"`` claim.
    """
    # Both failure modes answer with the same 401, so build it once.
    credentials_error = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_error

    subject = claims.get("sub")
    if subject is None:
        raise credentials_error

    # Run the claim through the pydantic model for validation; the instance
    # itself is not needed afterwards.
    TokenData(username=subject)
    return {'username': subject}
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode ``data`` as a signed JWT with an expiry claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Lifetime of the token. When omitted (or falsy, e.g. a
            zero ``timedelta``) the token expires after 15 minutes.

    Returns:
        The encoded token as produced by ``jwt.encode``.
    """
    # Function-scope import: the file only imports ``datetime`` and
    # ``timedelta`` from the datetime module.
    from datetime import timezone

    lifetime = expires_delta if expires_delta else timedelta(minutes=15)

    claims = data.copy()
    # Use a timezone-aware UTC timestamp: ``datetime.utcnow()`` is deprecated
    # and returns a naive value that is easy to misinterpret as local time.
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
@app.post("/api/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange a username/password form for a bearer access token.

    Raises:
        HTTPException: 401 when the credentials do not authenticate.
    """
    username = form_data.username
    if not authenticate_user(username, form_data.password):
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The username becomes the "sub" claim that get_current_user reads back.
    token = create_access_token(
        data={"sub": username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    body = {"access_token": token, "token_type": "bearer"}
    return JSONResponse(content=body)
@app.post('/api/books')
async def createBook(book: Book, current_user: dict = Depends(get_current_user)):
    """Create a book from the JSON request body.

    Returns:
        Whatever ``add_book`` returns for the new record.

    Raises:
        HTTPException: 401 when no current user is resolved, 400 when
            ``add_book`` returns ``None``.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="User not allowed")

    fields = {
        'author': book.author,
        'title': book.title,
        'type': book.type,
        'editor': book.editor,
    }
    created = add_book(fields)
    # Identity check: ``None`` is the failure signal from the crud layer.
    if created is None:
        raise HTTPException(status_code=400, detail="Badly formated body")
    return created
@app.get('/api/books')
async def readBook(
    page: int = 0,
    limit: int = 50,
    sort: str = "",
    search: str = "",
    current_user: dict = Depends(get_current_user),
):
    """Return one page of books, with the page count in the ``x-nbpage`` header.

    Args:
        page: Page index forwarded to ``get_books``.
        limit: Page size forwarded to ``get_books``; must not be 0.
        sort: Accepted for compatibility but currently not forwarded to
            ``get_books``.
        search: Search string forwarded to ``get_books``.

    Returns:
        A JSON response ``{'result': [...]}`` whose ``x-nbpage`` header holds
        ``int(count / limit)``.

    Raises:
        HTTPException: 401 when no current user is resolved, 400 when
            ``limit`` is 0 or ``get_books`` reports a failure.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="User not allowed")
    if limit == 0:
        # The page count below divides by ``limit``; reject the request
        # instead of failing with ZeroDivisionError (HTTP 500).
        raise HTTPException(status_code=400, detail="Badly formated")

    rows, count_rows = get_books(page, limit=limit, search=search)
    if rows is False or not count_rows:
        raise HTTPException(status_code=400, detail="Badly formated")

    total = count_rows[0]['count']
    headers = {'x-nbpage': str(int(total / limit))}
    content = {'result': [dict(row) for row in rows]}
    return JSONResponse(content=content, headers=headers)
@app.put('/api/book/{id}')
async def updateBook(id: str, book: Book, current_user: dict = Depends(get_current_user)):
    """Replace the fields of the book identified by ``id``.

    Returns:
        Whatever ``edit_book`` returns for the updated record.

    Raises:
        HTTPException: 401 when no current user is resolved, 400 when
            ``edit_book`` returns ``None``.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="User not allowed")

    fields = {
        'author': book.author,
        'title': book.title,
        'type': book.type,
        'editor': book.editor,
    }
    updated = edit_book(id, fields)
    # Identity check: ``None`` is the failure signal from the crud layer.
    if updated is None:
        raise HTTPException(status_code=400, detail="Badly formated body")
    return updated
@app.delete('/api/book/{id}')
async def deleteBook(id: str, current_user: dict = Depends(get_current_user)):
    """Delete the book identified by ``id`` and report the new page count.

    Returns:
        A JSON response carrying the result of ``remove_book`` and an
        ``x-nbpage`` header recomputed after the deletion.

    Raises:
        HTTPException: 401 when no current user is resolved, 400 when
            ``remove_book`` returns ``None``.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="User not allowed")

    removed = remove_book(id)
    # Fail fast: skip the recount query when the deletion itself failed, so
    # the caller gets the intended 400 rather than an error from the recount.
    if removed is None:
        raise HTTPException(status_code=400, detail="Badly formated body")

    _, count_rows = get_books(0)
    # NOTE(review): 50 is hard-coded here; it presumably mirrors the default
    # ``limit`` of GET /api/books - confirm.
    headers = {'x-nbpage': str(int(count_rows[0]['count'] / 50))}
    return JSONResponse(content=removed, headers=headers)
@app.post('/api/films')
async def createFilm(film: Film, current_user: dict = Depends(get_current_user)):
    """Create a film from the JSON request body.

    Returns:
        Whatever ``add_film`` returns for the new record.

    Raises:
        HTTPException: 401 when no current user is resolved, 400 when
            ``add_film`` returns ``None``.
    """
    # ``film`` must be declared as a parameter so the request body is parsed;
    # without it the references below fail with NameError.
    if not current_user:
        raise HTTPException(status_code=401, detail="User not allowed")

    fields = {
        'title': film.title,
        'director': film.director,
        'producer': film.producer,
        'actors': film.actors,
        'length': film.length,
        'type': film.type,
    }
    created = add_film(fields)
    if created is None:
        raise HTTPException(status_code=400, detail="Badly formated body")
    return created
@app.get('/api/films')
async def readFilm(page: int = 0, limit: int = 50, current_user: dict = Depends(get_current_user)):
    """Return one page of films.

    Args:
        page: Page index forwarded to ``get_films``.
        limit: Page size forwarded to ``get_films``.

    Returns:
        Whatever ``get_films`` returns.

    Raises:
        HTTPException: 401 when no current user is resolved, 400 when
            ``get_films`` returns ``None``.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="User not allowed")

    films = get_films(page, limit=limit)
    # Identity check: ``None`` is the failure signal from the crud layer.
    if films is None:
        raise HTTPException(status_code=400, detail="Badly formated body")
    return films
@app.put('/api/film/{id}')
async def updateFilm(id: str, film: Film, current_user: dict = Depends(get_current_user)):
    """Replace the fields of the film identified by ``id``.

    Returns:
        Whatever ``edit_film`` returns for the updated record.

    Raises:
        HTTPException: 401 when no current user is resolved, 400 when
            ``edit_film`` returns ``None``.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="User not allowed")

    fields = {
        'title': film.title,
        'director': film.director,
        'producer': film.producer,
        'actors': film.actors,
        'length': film.length,
        'type': film.type,
    }
    updated = edit_film(id, fields)
    # Identity check: ``None`` is the failure signal from the crud layer.
    if updated is None:
        raise HTTPException(status_code=400, detail="Badly formated body")
    return updated
@app.delete('/api/film/{id}')
async def deleteFilm(id: str, current_user: dict = Depends(get_current_user)):
    """Delete the film identified by ``id``.

    Returns:
        Whatever ``remove_film`` returns.

    Raises:
        HTTPException: 401 when no current user is resolved, 400 when
            ``remove_film`` returns ``None``.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="User not allowed")

    removed = remove_film(id)
    # Identity check: ``None`` is the failure signal from the crud layer.
    if removed is None:
        raise HTTPException(status_code=400, detail="Badly formated body")
    return removed
@app.get('/api/books/{field}')
async def getBookFields(field: str, current_user: dict = Depends(get_current_user)):
    """Return the values ``get_field_values`` reports for one field of ``'Books'``.

    Raises:
        HTTPException: 401 when no current user is resolved, 400 when
            ``field`` is empty.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="User not allowed")
    if not field:
        raise HTTPException(status_code=400, detail="Badly formated body")
    # NOTE(review): ``field`` comes straight from the URL and is passed on
    # unvalidated - confirm that get_field_values restricts it to known names.
    return get_field_values('Books', field)
@app.get('/api/films/{field}')
async def getFilmFields(field: str, current_user: dict = Depends(get_current_user)):
    """Return the values ``get_field_values`` reports for one field of ``'Films'``.

    Raises:
        HTTPException: 401 when no current user is resolved, 400 when
            ``field`` is empty.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="User not allowed")
    if not field:
        raise HTTPException(status_code=400, detail="Badly formated body")
    # NOTE(review): ``field`` comes straight from the URL and is passed on
    # unvalidated - confirm that get_field_values restricts it to known names.
    return get_field_values('Films', field)