add all project files

This commit is contained in:
Julien Aldon
2026-01-08 10:57:31 +01:00
parent 2873f6fd1b
commit 9766e18fd5
40 changed files with 22638 additions and 0 deletions

226
back/main.py Normal file
View File

@@ -0,0 +1,226 @@
from fastapi import Depends, FastAPI, HTTPException, Response
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
import json
from typing import Optional
from crud import get_books, remove_book, edit_book, add_book, get_films, remove_film, edit_film, add_film, get_field_values, get_user
from pydantic import BaseModel
from passlib.context import CryptContext
from secret import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, origins
from datetime import datetime, timedelta
from jose import JWTError, jwt
class TokenData(BaseModel):
username: Optional[str] = None
class Token(BaseModel):
access_token: str
token_type: str
class Book(BaseModel):
title: str
author: str
type: str
editor: str
class Film(BaseModel):
title: str
actors: str
director: str
type: str
producer: str
length: int
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=['x-nbpage']
)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def authenticate_user(name, password):
user = get_user(name)
if len(user) == 0:
return False
if verify_password(password, user[0]['password']):
return True
return False
async def get_current_user(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token_data = TokenData(username=username)
except JWTError:
raise HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return {'username': username}
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/api/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
username = form_data.username
if not user:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
)
return JSONResponse(content={"access_token": access_token, "token_type": "bearer"})
@app.post('/api/books')
async def createBook(book: Book, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = add_book({
'author': book.author,
'title': book.title,
'type': book.type,
'editor': book.editor
})
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.get('/api/books')
async def readBook(page: int = 0, limit: int = 50, sort: str = "", search: str = "", current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res, nb = get_books(page, limit=limit, search=search)
if res is False or not nb:
raise HTTPException(status_code=400, detail="Badly formated")
header = {'x-nbpage': str(int(nb[0]['count'] / limit))}
content = {'result': [dict(r) for r in res]}
return JSONResponse(content=content, headers=header)
@app.put('/api/book/{id}')
async def updateBook(id: str, book: Book, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = edit_book(id, {
'author': book.author,
'title': book.title,
'type': book.type,
'editor': book.editor
})
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.delete('/api/book/{id}')
async def deleteBook(id: str, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = remove_book(id)
_, nb = get_books(0)
header = {'x-nbpage': str(int(nb[0]['count'] / 50))}
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return JSONResponse(content=res, headers=header)
@app.post('/api/films')
async def createFilm(current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = add_film({
'title': film.title,
'director': film.director,
'producer': film.producer,
'actors': film.actors,
'length': film.length,
'type': film.type
})
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.get('/api/films')
async def readFilm(page: int = 0, limit: int = 50, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = get_films(page, limit=limit)
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.put('/api/film/{id}')
async def updateFilm(id: str, film: Film, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = edit_film(id, {
'title': film.title,
'director': film.director,
'producer': film.producer,
'actors': film.actors,
'length': film.length,
'type': film.type
})
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.delete('/api/film/{id}')
async def deleteFilm(id: str, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = remove_film(id)
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.get('/api/books/{field}')
async def getBookFields(field: str, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
if field == "":
raise HTTPException(status_code=400, detail="Badly formated body")
res = get_field_values('Books', field)
return res
@app.get('/api/films/{field}')
async def getFilmFields(field: str, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
if field == "":
raise HTTPException(status_code=400, detail="Badly formated body")
res = get_field_values('Films', field)
return res