This commit is contained in:
@@ -11,7 +11,15 @@ jobs:
|
|||||||
steps:
|
steps:
|
||||||
- name: Checkout repository
|
- name: Checkout repository
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
|
- name: Test backend
|
||||||
|
uses: actions/setup-python@v4
|
||||||
|
with:
|
||||||
|
python-version: "3.11"
|
||||||
|
run: |
|
||||||
|
- python -m pip install --upgrade pip
|
||||||
|
- pip install -r back/requirements.txt
|
||||||
|
- pytest -sv
|
||||||
|
|
||||||
- name: Build & deploy
|
- name: Build & deploy
|
||||||
run: |
|
run: |
|
||||||
git pull
|
|
||||||
docker compose up -d --build
|
docker compose up -d --build
|
||||||
72
back/crud.py
72
back/crud.py
@@ -1,6 +1,4 @@
|
|||||||
from database import connection
|
def get_books(db, page, limit=50, order="Auteur", search=""):
|
||||||
|
|
||||||
def get_books(page, limit=50, order="Auteur", search=""):
|
|
||||||
"""
|
"""
|
||||||
:param limit: Item limit
|
:param limit: Item limit
|
||||||
:type limit: int
|
:type limit: int
|
||||||
@@ -9,8 +7,8 @@ def get_books(page, limit=50, order="Auteur", search=""):
|
|||||||
:param order: Book fields are : Auteur, Titre, Editeur, Type
|
:param order: Book fields are : Auteur, Titre, Editeur, Type
|
||||||
:type order: str
|
:type order: str
|
||||||
"""
|
"""
|
||||||
cursor = connection.cursor()
|
cursor = db.cursor()
|
||||||
connection.ping(reconnect=True)
|
db.ping(reconnect=True)
|
||||||
if search == "":
|
if search == "":
|
||||||
t = f"""
|
t = f"""
|
||||||
SELECT * from Books order by %s asc LIMIT %s OFFSET %s;
|
SELECT * from Books order by %s asc LIMIT %s OFFSET %s;
|
||||||
@@ -54,9 +52,9 @@ def get_books(page, limit=50, order="Auteur", search=""):
|
|||||||
return False
|
return False
|
||||||
return r, n
|
return r, n
|
||||||
|
|
||||||
def remove_book(id):
|
def remove_book(db, id):
|
||||||
cursor = connection.cursor()
|
cursor = db.cursor()
|
||||||
connection.ping(reconnect=True)
|
db.ping(reconnect=True)
|
||||||
t = f"""
|
t = f"""
|
||||||
DELETE FROM Books where biblio_Index=%s;
|
DELETE FROM Books where biblio_Index=%s;
|
||||||
"""
|
"""
|
||||||
@@ -64,18 +62,18 @@ def remove_book(id):
|
|||||||
cursor.execute(t, (id))
|
cursor.execute(t, (id))
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
connection.commit()
|
db.commit()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def edit_book(id, newContent):
|
def edit_book(db, id, newContent):
|
||||||
"""
|
"""
|
||||||
:param id: item to update
|
:param id: item to update
|
||||||
:type id: str
|
:type id: str
|
||||||
:param newContent: New values to update
|
:param newContent: New values to update
|
||||||
:type newContent: dict
|
:type newContent: dict
|
||||||
"""
|
"""
|
||||||
cursor = connection.cursor()
|
cursor = db.cursor()
|
||||||
connection.ping(reconnect=True)
|
db.ping(reconnect=True)
|
||||||
t = f"""
|
t = f"""
|
||||||
UPDATE Books
|
UPDATE Books
|
||||||
SET Auteur=%s, Titre=%s, Editeur=%s, Type=%s
|
SET Auteur=%s, Titre=%s, Editeur=%s, Type=%s
|
||||||
@@ -85,16 +83,16 @@ def edit_book(id, newContent):
|
|||||||
cursor.execute(t, (newContent['author'], newContent['title'], newContent['editor'], newContent['type'], id))
|
cursor.execute(t, (newContent['author'], newContent['title'], newContent['editor'], newContent['type'], id))
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
connection.commit()
|
db.commit()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def add_book(newContent):
|
def add_book(db, newContent):
|
||||||
"""
|
"""
|
||||||
:param newContent: New values to add
|
:param newContent: New values to add
|
||||||
:type newContent: dict
|
:type newContent: dict
|
||||||
"""
|
"""
|
||||||
connection.ping(reconnect=True)
|
db.ping(reconnect=True)
|
||||||
cursor = connection.cursor()
|
cursor = db.cursor()
|
||||||
t = f"""
|
t = f"""
|
||||||
INSERT INTO Books (Auteur, Titre, Editeur, Type)
|
INSERT INTO Books (Auteur, Titre, Editeur, Type)
|
||||||
VALUES (%s, %s, %s, %s);
|
VALUES (%s, %s, %s, %s);
|
||||||
@@ -103,12 +101,12 @@ def add_book(newContent):
|
|||||||
cursor.execute(t, (newContent['author'], newContent['title'], newContent['editor'], newContent['type']))
|
cursor.execute(t, (newContent['author'], newContent['title'], newContent['editor'], newContent['type']))
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
connection.commit()
|
db.commit()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def add_film(newContent):
|
def add_film(db, newContent):
|
||||||
connection.ping(reconnect=True)
|
db.ping(reconnect=True)
|
||||||
cursor = connection.cursor()
|
cursor = db.cursor()
|
||||||
t = f"""
|
t = f"""
|
||||||
INSERT INTO Films (Title, Director, Producer, Actors, Length, Type)
|
INSERT INTO Films (Title, Director, Producer, Actors, Length, Type)
|
||||||
VALUES (%s, %s, %s, %s, %s, %s);
|
VALUES (%s, %s, %s, %s, %s, %s);
|
||||||
@@ -117,11 +115,11 @@ def add_film(newContent):
|
|||||||
cursor.execute(t, (newContent['title'], newContent['director'], newContent['producer'], newContent['actors'], newContent['length'], newContent['type']))
|
cursor.execute(t, (newContent['title'], newContent['director'], newContent['producer'], newContent['actors'], newContent['length'], newContent['type']))
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
connection.commit()
|
db.commit()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def remove_film(id):
|
def remove_film(db, id):
|
||||||
cursor = connection.cursor()
|
cursor = db.cursor()
|
||||||
t = f"""
|
t = f"""
|
||||||
DELETE FROM Films where Number=%s;
|
DELETE FROM Films where Number=%s;
|
||||||
"""
|
"""
|
||||||
@@ -129,12 +127,12 @@ def remove_film(id):
|
|||||||
cursor.execute(t, (id))
|
cursor.execute(t, (id))
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
connection.commit()
|
db.commit()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def edit_film(id, newContent):
|
def edit_film(db, id, newContent):
|
||||||
connection.ping(reconnect=True)
|
db.ping(reconnect=True)
|
||||||
cursor = connection.cursor()
|
cursor = db.cursor()
|
||||||
t = f"""
|
t = f"""
|
||||||
UPDATE Films
|
UPDATE Films
|
||||||
SET Title=%s, Director=%s, Producer=%s, Actors=%s, Length=%s, Type=%s
|
SET Title=%s, Director=%s, Producer=%s, Actors=%s, Length=%s, Type=%s
|
||||||
@@ -144,12 +142,12 @@ def edit_film(id, newContent):
|
|||||||
cursor.execute(t, (newContent['title'], newContent['director'], newContent['producer'], newContent['actors'], newContent['length'], newContent['type'], id))
|
cursor.execute(t, (newContent['title'], newContent['director'], newContent['producer'], newContent['actors'], newContent['length'], newContent['type'], id))
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
connection.commit()
|
db.commit()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def get_films(page, limit=50, order="Director"):
|
def get_films(db, page, limit=50, order="Director"):
|
||||||
connection.ping(reconnect=True)
|
db.ping(reconnect=True)
|
||||||
cursor = connection.cursor()
|
cursor = db.cursor()
|
||||||
t = f"""
|
t = f"""
|
||||||
SELECT * from Films order by %s asc LIMIT %s OFFSET %s;
|
SELECT * from Films order by %s asc LIMIT %s OFFSET %s;
|
||||||
"""
|
"""
|
||||||
@@ -159,9 +157,9 @@ def get_films(page, limit=50, order="Director"):
|
|||||||
return False
|
return False
|
||||||
return cursor.fetchall()
|
return cursor.fetchall()
|
||||||
|
|
||||||
def get_field_values(table, field):
|
def get_field_values(db, table, field):
|
||||||
connection.ping(reconnect=True)
|
db.ping(reconnect=True)
|
||||||
cursor = connection.cursor()
|
cursor = db.cursor()
|
||||||
t = f"""
|
t = f"""
|
||||||
SELECT DISTINCT {field} FROM {table} ORDER BY {field} asc;
|
SELECT DISTINCT {field} FROM {table} ORDER BY {field} asc;
|
||||||
"""
|
"""
|
||||||
@@ -173,9 +171,9 @@ def get_field_values(table, field):
|
|||||||
res = cursor.fetchall()
|
res = cursor.fetchall()
|
||||||
return [a[field] for a in res]
|
return [a[field] for a in res]
|
||||||
|
|
||||||
def get_user(username):
|
def get_user(db, username):
|
||||||
connection.ping(reconnect=True)
|
db.ping(reconnect=True)
|
||||||
cursor = connection.cursor()
|
cursor = db.cursor()
|
||||||
t = f"""
|
t = f"""
|
||||||
SELECT * from Users WHERE username=%s;
|
SELECT * from Users WHERE username=%s;
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -1,17 +1,22 @@
|
|||||||
from secret import host, user, password, database
|
from .secret import host, user, password, database
|
||||||
|
|
||||||
import pymysql.cursors
|
import pymysql.cursors
|
||||||
|
|
||||||
connection = pymysql.connect(
|
def get_db():
|
||||||
|
connection = pymysql.connect(
|
||||||
host=host,
|
host=host,
|
||||||
user=user,
|
user=user,
|
||||||
password=password,
|
password=password,
|
||||||
database=database,
|
database=database,
|
||||||
charset='utf8mb4',
|
charset='utf8mb4',
|
||||||
cursorclass=pymysql.cursors.DictCursor
|
cursorclass=pymysql.cursors.DictCursor
|
||||||
)
|
)
|
||||||
|
try:
|
||||||
|
yield connection
|
||||||
|
finally:
|
||||||
|
connection.close()
|
||||||
|
|
||||||
cursor = connection.cursor()
|
# cursor = connection.cursor()
|
||||||
# cursor.execute(sql, ())
|
# cursor.execute(sql, ())
|
||||||
# cursor.fetchall()
|
# cursor.fetchall()
|
||||||
# connection.commit()
|
# connection.commit()
|
||||||
119
back/main.py
119
back/main.py
@@ -5,10 +5,11 @@ from fastapi.middleware.cors import CORSMiddleware
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from crud import get_books, remove_book, edit_book, add_book, get_films, remove_film, edit_film, add_film, get_field_values, get_user
|
from .database import get_db
|
||||||
|
from .crud import get_books, remove_book, edit_book, add_book, get_films, remove_film, edit_film, add_film, get_field_values, get_user
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from passlib.context import CryptContext
|
from passlib.context import CryptContext
|
||||||
from secret import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, origins
|
from .secret import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, origins
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from jose import JWTError, jwt
|
from jose import JWTError, jwt
|
||||||
|
|
||||||
@@ -55,7 +56,7 @@ def get_password_hash(password):
|
|||||||
return pwd_context.hash(password)
|
return pwd_context.hash(password)
|
||||||
|
|
||||||
def authenticate_user(name, password):
|
def authenticate_user(name, password):
|
||||||
user = get_user(name)
|
user = get_user(db, name)
|
||||||
if len(user) == 0:
|
if len(user) == 0:
|
||||||
return False
|
return False
|
||||||
if verify_password(password, user[0]['password']):
|
if verify_password(password, user[0]['password']):
|
||||||
@@ -92,7 +93,10 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
|
|||||||
return encoded_jwt
|
return encoded_jwt
|
||||||
|
|
||||||
@app.post("/api/token", response_model=Token)
|
@app.post("/api/token", response_model=Token)
|
||||||
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
|
async def login_for_access_token(
|
||||||
|
form_data: OAuth2PasswordRequestForm = Depends(),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
user = authenticate_user(form_data.username, form_data.password)
|
user = authenticate_user(form_data.username, form_data.password)
|
||||||
username = form_data.username
|
username = form_data.username
|
||||||
if not user:
|
if not user:
|
||||||
@@ -108,60 +112,83 @@ async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(
|
|||||||
return JSONResponse(content={"access_token": access_token, "token_type": "bearer"})
|
return JSONResponse(content={"access_token": access_token, "token_type": "bearer"})
|
||||||
|
|
||||||
@app.post('/api/books')
|
@app.post('/api/books')
|
||||||
async def createBook(book: Book, current_user: dict = Depends(get_current_user)):
|
async def createBook(
|
||||||
|
book: Book,
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
if not current_user:
|
if not current_user:
|
||||||
raise HTTPException(status_code=401, detail="User not allowed")
|
raise HTTPException(status_code=401, detail="User not allowed")
|
||||||
res = add_book({
|
res = add_book(db, {
|
||||||
'author': book.author,
|
'author': book.author,
|
||||||
'title': book.title,
|
'title': book.title,
|
||||||
'type': book.type,
|
'type': book.type,
|
||||||
'editor': book.editor
|
'editor': book.editor
|
||||||
})
|
})
|
||||||
if res == None:
|
if res == None:
|
||||||
raise HTTPException(status_code=400, detail="Badly formated body")
|
raise HTTPException(status_code=400, detail="Error with request, database may be offline")
|
||||||
return res
|
return res
|
||||||
|
|
||||||
@app.get('/api/books')
|
@app.get('/api/books')
|
||||||
async def readBook(page: int = 0, limit: int = 50, sort: str = "", search: str = "", current_user: dict = Depends(get_current_user)):
|
async def readBook(
|
||||||
|
page: int = 0,
|
||||||
|
limit: int = 50,
|
||||||
|
sort: str = "",
|
||||||
|
search: str = "",
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
if not current_user:
|
if not current_user:
|
||||||
raise HTTPException(status_code=401, detail="User not allowed")
|
raise HTTPException(status_code=401, detail="User not allowed")
|
||||||
res, nb = get_books(page, limit=limit, search=search)
|
res, nb = get_books(db, page, limit=limit, search=search)
|
||||||
if res is False or not nb:
|
if res is False or not nb:
|
||||||
raise HTTPException(status_code=400, detail="Badly formated")
|
raise HTTPException(status_code=400, detail="Error with request, database may be offline")
|
||||||
header = {'x-nbpage': str(int(nb[0]['count'] / limit))}
|
header = {'x-nbpage': str(int(nb[0]['count'] / limit))}
|
||||||
content = {'result': [dict(r) for r in res]}
|
content = {'result': [dict(r) for r in res]}
|
||||||
return JSONResponse(content=content, headers=header)
|
return JSONResponse(content=content, headers=header)
|
||||||
|
|
||||||
@app.put('/api/book/{id}')
|
@app.put('/api/book/{id}')
|
||||||
async def updateBook(id: str, book: Book, current_user: dict = Depends(get_current_user)):
|
async def updateBook(
|
||||||
|
id: str,
|
||||||
|
book: Book,
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
if not current_user:
|
if not current_user:
|
||||||
raise HTTPException(status_code=401, detail="User not allowed")
|
raise HTTPException(status_code=401, detail="User not allowed")
|
||||||
res = edit_book(id, {
|
res = edit_book(db, id, {
|
||||||
'author': book.author,
|
'author': book.author,
|
||||||
'title': book.title,
|
'title': book.title,
|
||||||
'type': book.type,
|
'type': book.type,
|
||||||
'editor': book.editor
|
'editor': book.editor
|
||||||
})
|
})
|
||||||
if res == None:
|
if res == None:
|
||||||
raise HTTPException(status_code=400, detail="Badly formated body")
|
raise HTTPException(status_code=400, detail="Error with request, database may be offline")
|
||||||
return res
|
return res
|
||||||
|
|
||||||
@app.delete('/api/book/{id}')
|
@app.delete('/api/book/{id}')
|
||||||
async def deleteBook(id: str, current_user: dict = Depends(get_current_user)):
|
async def deleteBook(id: str,
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
if not current_user:
|
if not current_user:
|
||||||
raise HTTPException(status_code=401, detail="User not allowed")
|
raise HTTPException(status_code=401, detail="User not allowed")
|
||||||
res = remove_book(id)
|
res = remove_book(db, id)
|
||||||
_, nb = get_books(0)
|
_, nb = get_books(db, 0)
|
||||||
header = {'x-nbpage': str(int(nb[0]['count'] / 50))}
|
header = {'x-nbpage': str(int(nb[0]['count'] / 50))}
|
||||||
if res == None:
|
if res == None:
|
||||||
raise HTTPException(status_code=400, detail="Badly formated body")
|
raise HTTPException(status_code=400, detail="Error with request, database may be offline")
|
||||||
return JSONResponse(content=res, headers=header)
|
return JSONResponse(content=res, headers=header)
|
||||||
|
|
||||||
@app.post('/api/films')
|
@app.post('/api/films')
|
||||||
async def createFilm(current_user: dict = Depends(get_current_user)):
|
async def createFilm(
|
||||||
|
film: Film,
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
if not current_user:
|
if not current_user:
|
||||||
raise HTTPException(status_code=401, detail="User not allowed")
|
raise HTTPException(status_code=401, detail="User not allowed")
|
||||||
res = add_film({
|
res = add_film(db, {
|
||||||
'title': film.title,
|
'title': film.title,
|
||||||
'director': film.director,
|
'director': film.director,
|
||||||
'producer': film.producer,
|
'producer': film.producer,
|
||||||
@@ -170,23 +197,33 @@ async def createFilm(current_user: dict = Depends(get_current_user)):
|
|||||||
'type': film.type
|
'type': film.type
|
||||||
})
|
})
|
||||||
if res == None:
|
if res == None:
|
||||||
raise HTTPException(status_code=400, detail="Badly formated body")
|
raise HTTPException(status_code=400, detail="Error with request, database may be offline")
|
||||||
return res
|
return res
|
||||||
|
|
||||||
@app.get('/api/films')
|
@app.get('/api/films')
|
||||||
async def readFilm(page: int = 0, limit: int = 50, current_user: dict = Depends(get_current_user)):
|
async def readFilm(
|
||||||
|
page: int = 0,
|
||||||
|
limit: int = 50,
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
if not current_user:
|
if not current_user:
|
||||||
raise HTTPException(status_code=401, detail="User not allowed")
|
raise HTTPException(status_code=401, detail="User not allowed")
|
||||||
res = get_films(page, limit=limit)
|
res = get_films(db, page, limit=limit)
|
||||||
if res == None:
|
if res == None:
|
||||||
raise HTTPException(status_code=400, detail="Badly formated body")
|
raise HTTPException(status_code=400, detail="Error with request, database may be offline")
|
||||||
return res
|
return res
|
||||||
|
|
||||||
@app.put('/api/film/{id}')
|
@app.put('/api/film/{id}')
|
||||||
async def updateFilm(id: str, film: Film, current_user: dict = Depends(get_current_user)):
|
async def updateFilm(
|
||||||
|
id: str,
|
||||||
|
film: Film,
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
if not current_user:
|
if not current_user:
|
||||||
raise HTTPException(status_code=401, detail="User not allowed")
|
raise HTTPException(status_code=401, detail="User not allowed")
|
||||||
res = edit_film(id, {
|
res = edit_film(db, id, {
|
||||||
'title': film.title,
|
'title': film.title,
|
||||||
'director': film.director,
|
'director': film.director,
|
||||||
'producer': film.producer,
|
'producer': film.producer,
|
||||||
@@ -195,32 +232,44 @@ async def updateFilm(id: str, film: Film, current_user: dict = Depends(get_curre
|
|||||||
'type': film.type
|
'type': film.type
|
||||||
})
|
})
|
||||||
if res == None:
|
if res == None:
|
||||||
raise HTTPException(status_code=400, detail="Badly formated body")
|
raise HTTPException(status_code=400, detail="Error with request, database may be offline")
|
||||||
return res
|
return res
|
||||||
|
|
||||||
@app.delete('/api/film/{id}')
|
@app.delete('/api/film/{id}')
|
||||||
async def deleteFilm(id: str, current_user: dict = Depends(get_current_user)):
|
async def deleteFilm(
|
||||||
|
id: str,
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
if not current_user:
|
if not current_user:
|
||||||
raise HTTPException(status_code=401, detail="User not allowed")
|
raise HTTPException(status_code=401, detail="User not allowed")
|
||||||
res = remove_film(id)
|
res = remove_film(db, id)
|
||||||
if res == None:
|
if res == None:
|
||||||
raise HTTPException(status_code=400, detail="Badly formated body")
|
raise HTTPException(status_code=400, detail="Error with request, database may be offline")
|
||||||
return res
|
return res
|
||||||
|
|
||||||
@app.get('/api/books/{field}')
|
@app.get('/api/books/{field}')
|
||||||
async def getBookFields(field: str, current_user: dict = Depends(get_current_user)):
|
async def getBookFields(
|
||||||
|
field: str,
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
if not current_user:
|
if not current_user:
|
||||||
raise HTTPException(status_code=401, detail="User not allowed")
|
raise HTTPException(status_code=401, detail="User not allowed")
|
||||||
if field == "":
|
if field == "":
|
||||||
raise HTTPException(status_code=400, detail="Badly formated body")
|
raise HTTPException(status_code=400, detail="Error with request, database may be offline")
|
||||||
res = get_field_values('Books', field)
|
res = get_field_values(db, 'Books', field)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
@app.get('/api/films/{field}')
|
@app.get('/api/films/{field}')
|
||||||
async def getFilmFields(field: str, current_user: dict = Depends(get_current_user)):
|
async def getFilmFields(
|
||||||
|
field: str,
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db=Depends(get_db)
|
||||||
|
):
|
||||||
if not current_user:
|
if not current_user:
|
||||||
raise HTTPException(status_code=401, detail="User not allowed")
|
raise HTTPException(status_code=401, detail="User not allowed")
|
||||||
if field == "":
|
if field == "":
|
||||||
raise HTTPException(status_code=400, detail="Badly formated body")
|
raise HTTPException(status_code=400, detail="Error with request, database may be offline")
|
||||||
res = get_field_values('Films', field)
|
res = get_field_values(db, 'Films', field)
|
||||||
return res
|
return res
|
||||||
@@ -1,37 +1,61 @@
|
|||||||
anyio==3.6.1
|
annotated-doc==0.0.4
|
||||||
bcrypt==4.0.0
|
annotated-types==0.7.0
|
||||||
certifi==2022.9.24
|
anyio==4.12.1
|
||||||
charset-normalizer==2.1.1
|
bcrypt==5.0.0
|
||||||
click==8.1.3
|
certifi==2026.1.4
|
||||||
dnspython==2.2.1
|
click==8.1.8
|
||||||
ecdsa==0.18.0
|
dnspython==2.7.0
|
||||||
email-validator==1.3.0
|
ecdsa==0.19.1
|
||||||
fastapi==0.85.0
|
email-validator==2.3.0
|
||||||
h11==0.14.0
|
exceptiongroup==1.3.1
|
||||||
httptools==0.5.0
|
fastapi==0.128.0
|
||||||
idna==3.4
|
fastapi-cli==0.0.20
|
||||||
itsdangerous==2.1.2
|
fastapi-cloud-cli==0.8.0
|
||||||
Jinja2==3.1.2
|
fastar==0.8.0
|
||||||
MarkupSafe==2.1.1
|
h11==0.16.0
|
||||||
orjson==3.8.0
|
httpcore==1.0.9
|
||||||
|
httptools==0.7.1
|
||||||
|
httpx==0.28.1
|
||||||
|
idna==3.11
|
||||||
|
iniconfig==2.1.0
|
||||||
|
itsdangerous==2.2.0
|
||||||
|
Jinja2==3.1.6
|
||||||
|
markdown-it-py==3.0.0
|
||||||
|
MarkupSafe==3.0.3
|
||||||
|
mdurl==0.1.2
|
||||||
|
orjson==3.11.5
|
||||||
|
packaging==26.0
|
||||||
passlib==1.7.4
|
passlib==1.7.4
|
||||||
pyasn1==0.4.8
|
pluggy==1.6.0
|
||||||
pydantic==1.10.2
|
pyasn1==0.6.1
|
||||||
PyMySQL==1.0.2
|
pydantic==2.12.5
|
||||||
python-dotenv==0.21.0
|
pydantic-extra-types==2.11.0
|
||||||
python-jose==3.3.0
|
pydantic-settings==2.11.0
|
||||||
python-multipart==0.0.5
|
pydantic_core==2.41.5
|
||||||
|
Pygments==2.19.2
|
||||||
|
PyMySQL==1.1.2
|
||||||
|
pytest==8.4.2
|
||||||
|
pytest-dotenv==0.5.2
|
||||||
|
python-dotenv==1.2.1
|
||||||
|
python-jose==3.5.0
|
||||||
|
python-multipart==0.0.20
|
||||||
python-pam==2.0.2
|
python-pam==2.0.2
|
||||||
PyYAML==6.0
|
PyYAML==6.0.3
|
||||||
requests==2.28.1
|
rich==14.2.0
|
||||||
rsa==4.9
|
rich-toolkit==0.17.1
|
||||||
six==1.16.0
|
rignore==0.7.6
|
||||||
sniffio==1.3.0
|
rsa==4.9.1
|
||||||
starlette==0.20.4
|
sentry-sdk==2.48.0
|
||||||
typing_extensions==4.3.0
|
shellingham==1.5.4
|
||||||
ujson==5.5.0
|
six==1.17.0
|
||||||
urllib3==1.26.12
|
starlette==0.49.3
|
||||||
uvicorn==0.18.3
|
tomli==2.3.0
|
||||||
uvloop==0.17.0
|
typer==0.21.1
|
||||||
watchfiles==0.17.0
|
typing-inspection==0.4.2
|
||||||
websockets==10.3
|
typing_extensions==4.15.0
|
||||||
|
ujson==5.11.0
|
||||||
|
urllib3==2.6.2
|
||||||
|
uvicorn==0.39.0
|
||||||
|
uvloop==0.22.1
|
||||||
|
watchfiles==1.1.1
|
||||||
|
websockets==15.0.1
|
||||||
0
back/tests/__init__.py
Normal file
0
back/tests/__init__.py
Normal file
396
back/tests/test_main.py
Normal file
396
back/tests/test_main.py
Normal file
@@ -0,0 +1,396 @@
|
|||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
from unittest.mock import MagicMock, patch, ANY
|
||||||
|
from ..main import app, get_current_user
|
||||||
|
from ..database import get_db
|
||||||
|
|
||||||
|
def override_get_db():
|
||||||
|
mock_db = MagicMock()
|
||||||
|
mock_cursor = MagicMock()
|
||||||
|
|
||||||
|
mock_cursor.fetchone.return_value = {}
|
||||||
|
|
||||||
|
mock_db.cursor.return_value.__enter__.return_value = mock_cursor
|
||||||
|
yield mock_db
|
||||||
|
|
||||||
|
def override_get_user(user):
|
||||||
|
return user
|
||||||
|
|
||||||
|
app.dependency_overrides[get_db] = override_get_db
|
||||||
|
|
||||||
|
class TestBooks:
|
||||||
|
test_success_payload = {
|
||||||
|
'title': '1984',
|
||||||
|
'author': 'George Orwell',
|
||||||
|
'type': 'novel',
|
||||||
|
'editor': 'Secker & Warburg'
|
||||||
|
}
|
||||||
|
|
||||||
|
test_error_payload = {
|
||||||
|
'title': 1984,
|
||||||
|
'author': 'George Orwell',
|
||||||
|
'type': 'novel',
|
||||||
|
'editor': 'Secker & Warburg'
|
||||||
|
}
|
||||||
|
|
||||||
|
mock_result = {
|
||||||
|
'id': 1,
|
||||||
|
**test_success_payload
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_create_book_success(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.add_book', MagicMock(return_value=self.mock_result)) as mock_add_book:
|
||||||
|
response = client.post('/api/books', json=self.test_success_payload)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == self.mock_result
|
||||||
|
mock_add_book.assert_called_once_with(ANY, self.test_success_payload)
|
||||||
|
|
||||||
|
def test_create_book_unauthorized(self):
|
||||||
|
def override_get_user():
|
||||||
|
return None
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
response = client.post('/api/books', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.json()['detail'] == 'User not allowed'
|
||||||
|
|
||||||
|
def test_create_book_error_request(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.add_book', MagicMock(return_value=None)) as mock_add_book:
|
||||||
|
response = client.post('/api/books', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert response.json()['detail'] == 'Error with request, database may be offline'
|
||||||
|
|
||||||
|
def test_create_book_wrong_body(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.add_book', MagicMock(return_value=self.mock_result)) as mock_add_book:
|
||||||
|
response = client.post('/api/books', json=self.test_error_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 422
|
||||||
|
assert response.json()['detail'] == [{'input': 1984, 'loc': ['body', 'title'], 'msg': 'Input should be a valid string', 'type': 'string_type'}]
|
||||||
|
|
||||||
|
def test_get_book_success(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.get_books', MagicMock(return_value=([self.mock_result], [{'count': 1}]))) as mock_get_book:
|
||||||
|
response = client.get('/api/books')
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == {'result': [self.mock_result]}
|
||||||
|
mock_get_book.assert_called_once_with(ANY, 0, limit=50, search='')
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_book_unauthorized(self):
|
||||||
|
def override_get_user():
|
||||||
|
return None
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
response = client.get('/api/books')
|
||||||
|
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.json()['detail'] == 'User not allowed'
|
||||||
|
|
||||||
|
def test_get_book_error_request(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.get_books', MagicMock(return_value=(False, None))) as mock_get_book:
|
||||||
|
response = client.get('/api/books')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert response.json()['detail'] == 'Error with request, database may be offline'
|
||||||
|
|
||||||
|
def test_update_book_success(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.edit_book', MagicMock(return_value=True)) as mock_edit_book:
|
||||||
|
response = client.put('api/book/1', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == True
|
||||||
|
mock_edit_book.assert_called_once_with(ANY, '1', self.test_success_payload)
|
||||||
|
|
||||||
|
def test_update_book_unothorized(self):
|
||||||
|
def override_get_user():
|
||||||
|
return None
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
response = client.put('api/book/1', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.json()['detail'] == 'User not allowed'
|
||||||
|
|
||||||
|
def test_update_book_error_request(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.edit_book', MagicMock(return_value=None)) as mock_edit_book:
|
||||||
|
response = client.put('api/book/1', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert response.json()['detail'] == 'Error with request, database may be offline'
|
||||||
|
mock_edit_book.assert_called_once_with(ANY, '1', self.test_success_payload)
|
||||||
|
|
||||||
|
def test_update_book_error_body(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.edit_book', MagicMock(return_value=False)) as mock_edit_book:
|
||||||
|
response = client.put('api/book/1', json=self.test_error_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 422
|
||||||
|
|
||||||
|
def test_remove_book_success(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.remove_book', MagicMock(return_value=True)) as mock_remove_book:
|
||||||
|
response = client.delete('api/book/1')
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == True
|
||||||
|
mock_remove_book.assert_called_once_with(ANY, '1')
|
||||||
|
|
||||||
|
def test_remove_book_unauthorized(self):
|
||||||
|
def override_get_user():
|
||||||
|
return None
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
response = client.delete('api/book/1')
|
||||||
|
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.json()['detail'] == 'User not allowed'
|
||||||
|
|
||||||
|
def test_remove_book_error_request(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.remove_book', MagicMock(return_value=None)) as mock_remove_book:
|
||||||
|
response = client.delete('api/book/1')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert response.json()['detail'] == 'Error with request, database may be offline'
|
||||||
|
mock_remove_book.assert_called_once_with(ANY, '1')
|
||||||
|
|
||||||
|
class TestFilms:
|
||||||
|
test_success_payload = {
|
||||||
|
'title': 'Harry Potter and the Half-Blood Prince',
|
||||||
|
'director': 'David Yates',
|
||||||
|
'producer': 'David Heyman',
|
||||||
|
'actors': 'Daniel Radcliffe, Rupert Grint, Emma Watson',
|
||||||
|
'length': 153,
|
||||||
|
'type': 'action'
|
||||||
|
}
|
||||||
|
|
||||||
|
test_error_payload = {
|
||||||
|
'title': 1234,
|
||||||
|
'director': 'David Yates',
|
||||||
|
'producer': 'David Heyman',
|
||||||
|
'actors': 'Daniel Radcliffe, Rupert Grint, Emma Watson',
|
||||||
|
'length': '153',
|
||||||
|
'type': 'action'
|
||||||
|
}
|
||||||
|
|
||||||
|
mock_result = {
|
||||||
|
'id': 1,
|
||||||
|
**test_success_payload
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_create_film_success(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.add_film', MagicMock(return_value=True)) as mock_add_film:
|
||||||
|
response = client.post('api/films', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == True
|
||||||
|
mock_add_film.assert_called_once_with(ANY, self.test_success_payload)
|
||||||
|
|
||||||
|
def test_create_film_unauthorized(self):
|
||||||
|
def override_get_user():
|
||||||
|
return None
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
response = client.post('api/films', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.json()['detail'] == 'User not allowed'
|
||||||
|
|
||||||
|
def test_create_film_error_request(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.add_film', MagicMock(return_value=None)) as mock_add_film:
|
||||||
|
response = client.post('api/films', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
mock_add_film.assert_called_once_with(ANY, self.test_success_payload)
|
||||||
|
|
||||||
|
def test_create_film_error_body(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
response = client.post('api/films', json=self.test_error_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 422
|
||||||
|
|
||||||
|
def test_read_film_success(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.get_films', MagicMock(return_value=True)) as mock_read_book:
|
||||||
|
response = client.get('api/films')
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == True
|
||||||
|
mock_read_book.assert_called_once_with(ANY, 0, limit=50)
|
||||||
|
|
||||||
|
def test_read_film_unauthorized(self):
|
||||||
|
def override_get_user():
|
||||||
|
return None
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
response = client.get('api/films')
|
||||||
|
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.json()['detail'] == 'User not allowed'
|
||||||
|
|
||||||
|
def test_read_film_error_request(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.get_films', MagicMock(return_value=None)) as mock_read_book:
|
||||||
|
response = client.get('api/films')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
mock_read_book.assert_called_once_with(ANY, 0, limit=50)
|
||||||
|
|
||||||
|
def test_update_film_success(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.edit_film', MagicMock(return_value=True)) as mock_edit_film:
|
||||||
|
response = client.put('api/film/1', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == True
|
||||||
|
mock_edit_film.assert_called_once_with(ANY, '1', self.test_success_payload)
|
||||||
|
|
||||||
|
def test_update_film_unauthorized(self):
|
||||||
|
def override_get_user():
|
||||||
|
return None
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
response = client.put('api/film/1', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.json()['detail'] == 'User not allowed'
|
||||||
|
|
||||||
|
def test_update_film_error_request(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.edit_film', MagicMock(return_value=None)) as mock_edit_film:
|
||||||
|
response = client.put('api/film/1', json=self.test_success_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
mock_edit_film.assert_called_once_with(ANY, '1', self.test_success_payload)
|
||||||
|
|
||||||
|
def test_update_film_error_body(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
response = client.put('api/film/1', json=self.test_error_payload)
|
||||||
|
|
||||||
|
assert response.status_code == 422
|
||||||
|
|
||||||
|
def test_delete_film_success(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.remove_film', MagicMock(return_value=True)) as mock_remove_film:
|
||||||
|
response = client.delete('api/film/1')
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == True
|
||||||
|
mock_remove_film.assert_called_once_with(ANY, '1')
|
||||||
|
|
||||||
|
def test_delete_film_unauthorized(self):
|
||||||
|
def override_get_user():
|
||||||
|
return None
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
response = client.delete('api/film/1')
|
||||||
|
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.json()['detail'] == 'User not allowed'
|
||||||
|
|
||||||
|
def test_delete_film_error_request(self):
|
||||||
|
def override_get_user():
|
||||||
|
return {'username': 'test'}
|
||||||
|
app.dependency_overrides[get_current_user] = override_get_user
|
||||||
|
client = TestClient(app)
|
||||||
|
|
||||||
|
with patch('back.main.remove_film', MagicMock(return_value=None)) as mock_remove_film:
|
||||||
|
response = client.delete('api/film/1')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
mock_remove_film.assert_called_once_with(ANY, '1')
|
||||||
Reference in New Issue
Block a user