add project files

This commit is contained in:
Julien Aldon
2026-01-12 11:17:01 +01:00
parent 38293b1352
commit 4d5b4698c6
35 changed files with 23425 additions and 0 deletions

11
back/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
FROM python:3.10
WORKDIR /code
COPY back/requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
COPY back/ /code/
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--proxy-headers"]

17
back/Pipfile Normal file
View File

@@ -0,0 +1,17 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
fastapi = {extras = ["all"], version = "*"}
python-jose = "*"
passlib = {extras = ["bcrypt"], version = "*"}
uvicorn = {extras = ["standard"], version = "*"}
python-pam = "*"
pymysql = "*"
[dev-packages]
[requires]
python_version = "3.9"

1596
back/Pipfile.lock generated Normal file

File diff suppressed because it is too large Load Diff

0
back/__init__.py Normal file
View File

188
back/crud.py Normal file
View File

@@ -0,0 +1,188 @@
from database import connection
def get_books(page, limit=50, order="Auteur", search=""):
"""
:param limit: Item limit
:type limit: int
:param page: current page
:type page: int
:param order: Book fields are : Auteur, Titre, Editeur, Type
:type order: str
"""
cursor = connection.cursor()
connection.ping(reconnect=True)
if search == "":
t = f"""
SELECT * from Books order by %s asc LIMIT %s OFFSET %s;
"""
nb = f"""
SELECT COUNT(*) as count from Books order by %s asc;
"""
try:
cursor.execute(t, (order, limit, limit * page))
r = cursor.fetchall()
cursor.execute(nb, (order))
n = cursor.fetchall()
except Exception as e:
print(e)
return False
else:
search = search.replace(' ', ', ')
t = f"""
SELECT * FROM `Books`
WHERE `Titre` LIKE %s
OR `Auteur` LIKE %s
OR `Editeur` LIKE %s
OR `Type` LIKE %s
ORDER BY %s asc LIMIT %s OFFSET %s;
"""
nb = f"""
SELECT COUNT(*) as count from `Books`
WHERE `Titre` LIKE %s
OR `Auteur` LIKE %s
OR `Editeur` LIKE %s
OR `Type` LIKE %s
ORDER BY %s asc;
"""
try:
cursor.execute(t, (f'%{search}%', f'%{search}%', f'%{search}%', f'%{search}%', order, limit, limit * page))
r = cursor.fetchall()
cursor.execute(nb, (f'%{search}%', f'%{search}%', f'%{search}%', f'%{search}%', order))
n = cursor.fetchall()
except Exception as e:
print(e)
return False
return r, n
def remove_book(id):
cursor = connection.cursor()
connection.ping(reconnect=True)
t = f"""
DELETE FROM Books where biblio_Index=%s;
"""
try:
cursor.execute(t, (id))
except:
return False
connection.commit()
return True
def edit_book(id, newContent):
"""
:param id: item to update
:type id: str
:param newContent: New values to update
:type newContent: dict
"""
cursor = connection.cursor()
connection.ping(reconnect=True)
t = f"""
UPDATE Books
SET Auteur=%s, Titre=%s, Editeur=%s, Type=%s
WHERE biblio_Index=%s;
"""
try:
cursor.execute(t, (newContent['author'], newContent['title'], newContent['editor'], newContent['type'], id))
except:
return False
connection.commit()
return True
def add_book(newContent):
"""
:param newContent: New values to add
:type newContent: dict
"""
connection.ping(reconnect=True)
cursor = connection.cursor()
t = f"""
INSERT INTO Books (Auteur, Titre, Editeur, Type)
VALUES (%s, %s, %s, %s);
"""
try:
cursor.execute(t, (newContent['author'], newContent['title'], newContent['editor'], newContent['type']))
except:
return False
connection.commit()
return True
def add_film(newContent):
connection.ping(reconnect=True)
cursor = connection.cursor()
t = f"""
INSERT INTO Films (Title, Director, Producer, Actors, Length, Type)
VALUES (%s, %s, %s, %s, %s, %s);
"""
try:
cursor.execute(t, (newContent['title'], newContent['director'], newContent['producer'], newContent['actors'], newContent['length'], newContent['type']))
except:
return False
connection.commit()
return True
def remove_film(id):
cursor = connection.cursor()
t = f"""
DELETE FROM Films where Number=%s;
"""
try:
cursor.execute(t, (id))
except:
return False
connection.commit()
return True
def edit_film(id, newContent):
connection.ping(reconnect=True)
cursor = connection.cursor()
t = f"""
UPDATE Films
SET Title=%s, Director=%s, Producer=%s, Actors=%s, Length=%s, Type=%s
WHERE Number=%s;
"""
try:
cursor.execute(t, (newContent['title'], newContent['director'], newContent['producer'], newContent['actors'], newContent['length'], newContent['type'], id))
except:
return False
connection.commit()
return True
def get_films(page, limit=50, order="Director"):
connection.ping(reconnect=True)
cursor = connection.cursor()
t = f"""
SELECT * from Films order by %s asc LIMIT %s OFFSET %s;
"""
try:
cursor.execute(t, (order, limit, limit * page))
except:
return False
return cursor.fetchall()
def get_field_values(table, field):
connection.ping(reconnect=True)
cursor = connection.cursor()
t = f"""
SELECT DISTINCT {field} FROM {table} ORDER BY {field} asc;
"""
try:
cursor.execute(t)
except Exception as e:
print(e)
return False
res = cursor.fetchall()
return [a[field] for a in res]
def get_user(username):
connection.ping(reconnect=True)
cursor = connection.cursor()
t = f"""
SELECT * from Users WHERE username=%s;
"""
try:
cursor.execute(t, (username))
except Exception as e:
print(e)
return False
res = cursor.fetchall()
return res

17
back/database.py Normal file
View File

@@ -0,0 +1,17 @@
from secret import host, user, password, database
import pymysql.cursors
connection = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
cursor = connection.cursor()
# cursor.execute(sql, ())
# cursor.fetchall()
# connection.commit()

226
back/main.py Normal file
View File

@@ -0,0 +1,226 @@
from fastapi import Depends, FastAPI, HTTPException, Response
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
import json
from typing import Optional
from crud import get_books, remove_book, edit_book, add_book, get_films, remove_film, edit_film, add_film, get_field_values, get_user
from pydantic import BaseModel
from passlib.context import CryptContext
from secret import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, origins
from datetime import datetime, timedelta
from jose import JWTError, jwt
class TokenData(BaseModel):
username: Optional[str] = None
class Token(BaseModel):
access_token: str
token_type: str
class Book(BaseModel):
title: str
author: str
type: str
editor: str
class Film(BaseModel):
title: str
actors: str
director: str
type: str
producer: str
length: int
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=['x-nbpage']
)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def authenticate_user(name, password):
user = get_user(name)
if len(user) == 0:
return False
if verify_password(password, user[0]['password']):
return True
return False
async def get_current_user(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token_data = TokenData(username=username)
except JWTError:
raise HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return {'username': username}
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/api/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
username = form_data.username
if not user:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
)
return JSONResponse(content={"access_token": access_token, "token_type": "bearer"})
@app.post('/api/books')
async def createBook(book: Book, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = add_book({
'author': book.author,
'title': book.title,
'type': book.type,
'editor': book.editor
})
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.get('/api/books')
async def readBook(page: int = 0, limit: int = 50, sort: str = "", search: str = "", current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res, nb = get_books(page, limit=limit, search=search)
if res is False or not nb:
raise HTTPException(status_code=400, detail="Badly formated")
header = {'x-nbpage': str(int(nb[0]['count'] / limit))}
content = {'result': [dict(r) for r in res]}
return JSONResponse(content=content, headers=header)
@app.put('/api/book/{id}')
async def updateBook(id: str, book: Book, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = edit_book(id, {
'author': book.author,
'title': book.title,
'type': book.type,
'editor': book.editor
})
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.delete('/api/book/{id}')
async def deleteBook(id: str, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = remove_book(id)
_, nb = get_books(0)
header = {'x-nbpage': str(int(nb[0]['count'] / 50))}
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return JSONResponse(content=res, headers=header)
@app.post('/api/films')
async def createFilm(current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = add_film({
'title': film.title,
'director': film.director,
'producer': film.producer,
'actors': film.actors,
'length': film.length,
'type': film.type
})
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.get('/api/films')
async def readFilm(page: int = 0, limit: int = 50, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = get_films(page, limit=limit)
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.put('/api/film/{id}')
async def updateFilm(id: str, film: Film, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = edit_film(id, {
'title': film.title,
'director': film.director,
'producer': film.producer,
'actors': film.actors,
'length': film.length,
'type': film.type
})
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.delete('/api/film/{id}')
async def deleteFilm(id: str, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
res = remove_film(id)
if res == None:
raise HTTPException(status_code=400, detail="Badly formated body")
return res
@app.get('/api/books/{field}')
async def getBookFields(field: str, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
if field == "":
raise HTTPException(status_code=400, detail="Badly formated body")
res = get_field_values('Books', field)
return res
@app.get('/api/films/{field}')
async def getFilmFields(field: str, current_user: dict = Depends(get_current_user)):
if not current_user:
raise HTTPException(status_code=401, detail="User not allowed")
if field == "":
raise HTTPException(status_code=400, detail="Badly formated body")
res = get_field_values('Films', field)
return res

37
back/requirements.txt Normal file
View File

@@ -0,0 +1,37 @@
anyio==3.6.1
bcrypt==4.0.0
certifi==2022.9.24
charset-normalizer==2.1.1
click==8.1.3
dnspython==2.2.1
ecdsa==0.18.0
email-validator==1.3.0
fastapi==0.85.0
h11==0.14.0
httptools==0.5.0
idna==3.4
itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.1
orjson==3.8.0
passlib==1.7.4
pyasn1==0.4.8
pydantic==1.10.2
PyMySQL==1.0.2
python-dotenv==0.21.0
python-jose==3.3.0
python-multipart==0.0.5
python-pam==2.0.2
PyYAML==6.0
requests==2.28.1
rsa==4.9
six==1.16.0
sniffio==1.3.0
starlette==0.20.4
typing_extensions==4.3.0
ujson==5.5.0
urllib3==1.26.12
uvicorn==0.18.3
uvloop==0.17.0
watchfiles==0.17.0
websockets==10.3

15
back/secret.py Normal file
View File

@@ -0,0 +1,15 @@
import os
origins = [
os.environ['ORIGIN']
]
host = os.environ['DB_HOST']
user = os.environ['DB_USER']
password = os.environ['DB_PASS']
database = os.environ['DB_NAME']
# openssl rand -hex 32
SECRET_KEY = os.environ['SECRET_KEY']
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30