add forms, shipments tests
Some checks failed
Deploy Amap / deploy (push) Failing after 52s

This commit is contained in:
Julien Aldon
2026-02-27 12:21:50 +01:00
parent 61cbbf0366
commit d28640711c
27 changed files with 606 additions and 138 deletions

View File

@@ -11,6 +11,14 @@ jobs:
steps: steps:
- name: Checkout repository - name: Checkout repository
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Test backend
uses: actions/setup-python@v6
with:
python-version: "3.10"
- run: |
python -m pip install --upgrade pip
pip install -r back/requirements.txt
pytest -sv
- name: Build & deploy - name: Build & deploy
run: | run: |
docker compose -f docker-compose.yaml up -d --build docker compose -f docker-compose.yaml up -d --build

View File

@@ -1,33 +1,39 @@
alembic==1.18.4
annotated-doc==0.0.4 annotated-doc==0.0.4
annotated-types==0.7.0 annotated-types==0.7.0
anyio==4.12.1 anyio==4.12.1
brotli==1.2.0 brotli==1.2.0
certifi==2026.1.4 certifi==2026.2.25
cffi==2.0.0 cffi==2.0.0
charset-normalizer==3.4.4 charset-normalizer==3.4.4
click==8.3.1 click==8.3.1
coverage==7.13.4
cryptography==46.0.5 cryptography==46.0.5
cssselect2==0.9.0 cssselect2==0.9.0
dnspython==2.8.0 dnspython==2.8.0
email-validator==2.3.0 email-validator==2.3.0
fastapi==0.129.0 fastapi==0.133.0
fastapi-cli==0.0.23 fastapi-cli==0.0.24
fastapi-cloud-cli==0.13.0 fastapi-cloud-cli==0.13.0
fastar==0.8.0 fastar==0.8.0
fonttools==4.61.1 fonttools==4.61.1
greenlet==3.3.1 greenlet==3.3.2
h11==0.16.0 h11==0.16.0
httpcore==1.0.9 httpcore==1.0.9
httptools==0.7.1 httptools==0.7.1
httpx==0.28.1 httpx==0.28.1
idna==3.11 idna==3.11
iniconfig==2.3.0
Jinja2==3.1.6 Jinja2==3.1.6
lxml==6.0.2 lxml==6.0.2
Mako==1.3.10
markdown-it-py==4.0.0 markdown-it-py==4.0.0
MarkupSafe==3.0.3 MarkupSafe==3.0.3
mdurl==0.1.2 mdurl==0.1.2
odfdo==3.20.2 odfdo==3.20.2
packaging==26.0
pillow==12.1.1 pillow==12.1.1
pluggy==1.6.0
psycopg2-binary==2.9.11 psycopg2-binary==2.9.11
pycparser==3.0 pycparser==3.0
pydantic==2.12.5 pydantic==2.12.5
@@ -38,21 +44,24 @@ pydyf==0.12.1
Pygments==2.19.2 Pygments==2.19.2
PyJWT==2.11.0 PyJWT==2.11.0
pyphen==0.17.2 pyphen==0.17.2
pytest==9.0.2
pytest-cov==7.0.0
pytest-mock==3.15.1
python-dotenv==1.2.1 python-dotenv==1.2.1
python-multipart==0.0.22 python-multipart==0.0.22
PyYAML==6.0.3 PyYAML==6.0.3
requests==2.32.5 requests==2.32.5
rich==14.3.2 rich==14.3.3
rich-toolkit==0.19.4 rich-toolkit==0.19.7
rignore==0.7.6 rignore==0.7.6
sentry-sdk==2.53.0 sentry-sdk==2.53.0
shellingham==1.5.4 shellingham==1.5.4
SQLAlchemy==2.0.46 SQLAlchemy==2.0.47
sqlmodel==0.0.34 sqlmodel==0.0.37
starlette==0.52.1 starlette==0.52.1
tinycss2==1.5.1 tinycss2==1.5.1
tinyhtml5==2.0.0 tinyhtml5==2.0.0
typer==0.24.0 typer==0.24.1
typing-inspection==0.4.2 typing-inspection==0.4.2
typing_extensions==4.15.0 typing_extensions==4.15.0
urllib3==2.6.3 urllib3==2.6.3
@@ -63,4 +72,3 @@ weasyprint==68.1
webencodings==0.5.1 webencodings==0.5.1
websockets==16.0 websockets==16.0
zopfli==0.4.1 zopfli==0.4.1
alembic==1.18.4

View File

@@ -77,8 +77,8 @@ def callback(code: str, session: Session = Depends(get_session)):
response = requests.post(TOKEN_URL, data=data, headers=headers) response = requests.post(TOKEN_URL, data=data, headers=headers)
if response.status_code != 200: if response.status_code != 200:
raise HTTPException( raise HTTPException(
status_code=400, status_code=404,
detail=messages.failtogettoken detail=messages.Messages.not_found('token')
) )
token_data = response.json() token_data = response.json()
@@ -154,26 +154,26 @@ def verify_token(token: str):
) )
return decoded return decoded
except jwt.ExpiredSignatureError: except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail=messages.tokenexipired) raise HTTPException(status_code=401, detail=messages.Messages.tokenexipired)
except jwt.InvalidTokenError: except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail=messages.invalidtoken) raise HTTPException(status_code=401, detail=messages.Messages.invalidtoken)
def get_current_user(request: Request, session: Session = Depends(get_session)): def get_current_user(request: Request, session: Session = Depends(get_session)):
access_token = request.cookies.get('access_token') access_token = request.cookies.get('access_token')
if not access_token: if not access_token:
raise HTTPException(status_code=401, detail=messages.notauthenticated) raise HTTPException(status_code=401, detail=messages.Messages.notauthenticated)
payload = verify_token(access_token) payload = verify_token(access_token)
if not payload: if not payload:
raise HTTPException(status_code=401, detail='aze') raise HTTPException(status_code=401, detail='aze')
email = payload.get('email') email = payload.get('email')
if not email: if not email:
raise HTTPException(status_code=401, detail=messages.notauthenticated) raise HTTPException(status_code=401, detail=messages.Messages.notauthenticated)
user = session.exec(select(User).where(User.email == email)).first() user = session.exec(select(User).where(User.email == email)).first()
if not user: if not user:
raise HTTPException(status_code=401, detail=messages.usernotfound) raise HTTPException(status_code=401, detail=messages.Messages.not_found('user'))
return user return user
@router.post('/refresh') @router.post('/refresh')
@@ -191,8 +191,8 @@ def refresh_token(refresh_token: Annotated[str | None, Cookie()] = None):
result = requests.post(TOKEN_URL, data=data, headers=headers) result = requests.post(TOKEN_URL, data=data, headers=headers)
if result.status_code != 200: if result.status_code != 200:
raise HTTPException( raise HTTPException(
status_code=400, status_code=404,
detail=messages.failtogettoken detail=messages.Messages.not_found('token')
) )
token_data = result.json() token_data = result.json()

View File

@@ -81,8 +81,6 @@ async def create_contract(
recurrent_price = compute_recurrent_prices(recurrents, len(new_contract.form.shipments)) recurrent_price = compute_recurrent_prices(recurrents, len(new_contract.form.shipments))
price = recurrent_price + compute_occasional_prices(occasionals) price = recurrent_price + compute_occasional_prices(occasionals)
cheques = list(map(lambda x: {"name": x.name, "value": x.value}, new_contract.cheques)) cheques = list(map(lambda x: {"name": x.name, "value": x.value}, new_contract.cheques))
# TODO: send contract to referer
try: try:
pdf_bytes = generate_html_contract( pdf_bytes = generate_html_contract(
new_contract, new_contract,
@@ -165,10 +163,10 @@ def get_contract_file(
user: models.User = Depends(get_current_user) user: models.User = Depends(get_current_user)
): ):
if not service.is_allowed(session, user, id): if not service.is_allowed(session, user, id):
raise HTTPException(status_code=403, detail=messages.notallowed) raise HTTPException(status_code=403, detail=messages.Messages.not_allowed('contract', 'get'))
contract = service.get_one(session, id) contract = service.get_one(session, id)
if contract is None: if contract is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('contract'))
filename = f'{contract.form.name.replace(' ', '_')}_{contract.form.season}_{contract.firstname}-{contract.lastname}' filename = f'{contract.form.name.replace(' ', '_')}_{contract.form.season}_{contract.firstname}-{contract.lastname}'
return StreamingResponse( return StreamingResponse(
io.BytesIO(contract.file), io.BytesIO(contract.file),
@@ -185,7 +183,7 @@ def get_contract_files(
user: models.User = Depends(get_current_user) user: models.User = Depends(get_current_user)
): ):
if not form_service.is_allowed(session, user, form_id): if not form_service.is_allowed(session, user, form_id):
raise HTTPException(status_code=403, detail=messages.notallowed) raise HTTPException(status_code=403, detail=messages.Messages.not_allowed('contracts', 'get'))
form = form_service.get_one(session, form_id=form_id) form = form_service.get_one(session, form_id=form_id)
contracts = service.get_all(session, user, forms=[form.name]) contracts = service.get_all(session, user, forms=[form.name])
zipped_contracts = io.BytesIO() zipped_contracts = io.BytesIO()
@@ -210,7 +208,7 @@ def get_contract_recap(
user: models.User = Depends(get_current_user) user: models.User = Depends(get_current_user)
): ):
if not form_service.is_allowed(session, user, form_id): if not form_service.is_allowed(session, user, form_id):
raise HTTPException(status_code=403, detail=messages.notallowed) raise HTTPException(status_code=403, detail=messages.Messages.not_allowed('contract recap', 'get'))
form = form_service.get_one(session, form_id=form_id) form = form_service.get_one(session, form_id=form_id)
contracts = service.get_all(session, user, forms=[form.name]) contracts = service.get_all(session, user, forms=[form.name])
@@ -225,17 +223,17 @@ def get_contract_recap(
@router.get('/{id}', response_model=models.ContractPublic) @router.get('/{id}', response_model=models.ContractPublic)
def get_contract(id: int, session: Session = Depends(get_session), user: models.User = Depends(get_current_user)): def get_contract(id: int, session: Session = Depends(get_session), user: models.User = Depends(get_current_user)):
if not service.is_allowed(session, user, id): if not service.is_allowed(session, user, id):
raise HTTPException(status_code=403, detail=messages.notallowed) raise HTTPException(status_code=403, detail=messages.Messages.not_allowed('contract', 'get'))
result = service.get_one(session, id) result = service.get_one(session, id)
if result is None: if result is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('contract'))
return result return result
@router.delete('/{id}', response_model=models.ContractPublic) @router.delete('/{id}', response_model=models.ContractPublic)
def delete_contract(id: int, session: Session = Depends(get_session), user: models.User = Depends(get_current_user)): def delete_contract(id: int, session: Session = Depends(get_session), user: models.User = Depends(get_current_user)):
if not service.is_allowed(session, user, id): if not service.is_allowed(session, user, id):
raise HTTPException(status_code=403, detail=messages.notallowed) raise HTTPException(status_code=403, detail=messages.Messages.not_allowed('contract', 'delete'))
result = service.delete_one(session, id) result = service.delete_one(session, id)
if result is None: if result is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('contract'))
return result return result

View File

@@ -32,7 +32,7 @@ async def get_forms_filtered(
async def get_form(id: int, session: Session = Depends(get_session)): async def get_form(id: int, session: Session = Depends(get_session)):
result = service.get_one(session, id) result = service.get_one(session, id)
if result is None: if result is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('form'))
return result return result
@router.post('', response_model=models.FormPublic) @router.post('', response_model=models.FormPublic)
@@ -43,12 +43,12 @@ async def create_form(
): ):
try: try:
form = service.create_one(session, form) form = service.create_one(session, form)
except exceptions.ProductorNotFoundError: except exceptions.ProductorNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.productornotfound) raise HTTPException(status_code=404, detail=str(error))
except exceptions.UserNotFoundError: except exceptions.UserNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.usernotfound) raise HTTPException(status_code=404, detail=str(error))
except exceptions.FormCreateError: except exceptions.FormCreateError as error:
raise HTTPException(status_code=400, detail=messages.forminputinvalid) raise HTTPException(status_code=400, detail=str(error))
return form return form
@router.put('/{id}', response_model=models.FormPublic) @router.put('/{id}', response_model=models.FormPublic)
@@ -59,12 +59,12 @@ async def update_form(
): ):
try: try:
result = service.update_one(session, id, form) result = service.update_one(session, id, form)
except exceptions.FormNotFoundError: except exceptions.FormNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=str(error))
except exceptions.ProductorNotFoundError: except exceptions.ProductorNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.productornotfound) raise HTTPException(status_code=404, detail=str(error))
except exceptions.UserNotFoundError: except exceptions.UserNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.usernotfound) raise HTTPException(status_code=404, detail=str(error))
return result return result
@router.delete('/{id}', response_model=models.FormPublic) @router.delete('/{id}', response_model=models.FormPublic)
@@ -75,6 +75,6 @@ async def delete_form(
): ):
try: try:
result = service.delete_one(session, id) result = service.delete_one(session, id)
except exceptions.FormNotFoundError: except exceptions.FormNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=str(error))
return result return result

View File

@@ -3,6 +3,7 @@ from sqlalchemy import func
import src.models as models import src.models as models
import src.forms.exceptions as exceptions import src.forms.exceptions as exceptions
import src.messages as messages
def get_all( def get_all(
session: Session, session: Session,
@@ -49,11 +50,11 @@ def get_one(session: Session, form_id: int) -> models.FormPublic:
def create_one(session: Session, form: models.FormCreate) -> models.FormPublic: def create_one(session: Session, form: models.FormCreate) -> models.FormPublic:
if not form: if not form:
raise exceptions.FormCreateError('FormCreate input cannot be None') raise exceptions.FormCreateError(messages.Messages.invalid_input('form', 'input cannot be None'))
if not session.get(models.Productor, form.productor_id): if not session.get(models.Productor, form.productor_id):
raise exceptions.ProductorNotFoundError(f'Productor {form.productor_id} not found') raise exceptions.ProductorNotFoundError(messages.Messages.not_found('productor'))
if not session.get(models.User, form.referer_id): if not session.get(models.User, form.referer_id):
raise exceptions.UserNotFoundError(f'User {form.referer_id} not found') raise exceptions.UserNotFoundError(messages.Messages.not_found('user'))
form_create = form.model_dump(exclude_unset=True) form_create = form.model_dump(exclude_unset=True)
new_form = models.Form(**form_create) new_form = models.Form(**form_create)
session.add(new_form) session.add(new_form)
@@ -66,11 +67,11 @@ def update_one(session: Session, id: int, form: models.FormUpdate) -> models.For
result = session.exec(statement) result = session.exec(statement)
new_form = result.first() new_form = result.first()
if not new_form: if not new_form:
raise exceptions.FormNotFoundError(f'Form {id} not found') raise exceptions.FormNotFoundError(messages.Messages.not_found('form'))
if form.productor_id and not session.get(models.Productor, form.productor_id): if form.productor_id and not session.get(models.Productor, form.productor_id):
raise exceptions.ProductorNotFoundError(f'Productor {form.productor_id} not found') raise exceptions.ProductorNotFoundError(messages.Messages.not_found('productor'))
if form.referer_id and not session.get(models.User, form.referer_id): if form.referer_id and not session.get(models.User, form.referer_id):
raise exceptions.UserNotFoundError(f'User {form.referer_id} not found') raise exceptions.UserNotFoundError(messages.Messages.not_found('user'))
form_updates = form.model_dump(exclude_unset=True) form_updates = form.model_dump(exclude_unset=True)
for key, value in form_updates.items(): for key, value in form_updates.items():
setattr(new_form, key, value) setattr(new_form, key, value)
@@ -84,7 +85,7 @@ def delete_one(session: Session, id: int) -> models.FormPublic:
result = session.exec(statement) result = session.exec(statement)
form = result.first() form = result.first()
if not form: if not form:
raise exceptions.FormNotFoundError(f'Form {id} not found') raise exceptions.FormNotFoundError(messages.Messages.not_found('form'))
result = models.FormPublic.model_validate(form) result = models.FormPublic.model_validate(form)
session.delete(form) session.delete(form)
session.commit() session.commit()

View File

@@ -1,21 +1,19 @@
pdferror = 'An error occured during PDF generation please contact administrator' pdferror = 'An error occured during PDF generation please contact administrator'
tokenexipired = 'Token expired' class Messages:
invalidtoken = 'Invalid token' unauthorized = 'User is Unauthorized'
notauthenticated = 'Not authenticated' notauthenticated = 'User is not authenticated'
failtogettoken = 'Failed to get token' tokenexipired = 'Token has expired'
unauthorized = 'Unauthorized' invalidtoken = 'Token is invalid'
notallowed = 'Not Allowed'
notfound = 'Resource was not found.' @staticmethod
usernotfound = 'User not found' def not_found(resource: str) -> str:
userloggedout = 'User logged out' return f'{resource.capitalize()} not found'
productorinputinvalid = 'Invalid productor input' @staticmethod
productornotfound = 'Productor not found' def invalid_input(resource: str, reason: str = "") -> str:
return f'Invalid {resource} input {':' if reason else ""} {reason}'
forminputinvalid = 'Invalid form input' @staticmethod
formnotfound = 'Form not found' def not_allowed(resource: str, action: str) -> str:
return f'User is not allowed to {action} this {resource}'
productinputinvalid = 'Invalid product input'
productnotfound = 'Product not found'

View File

@@ -303,7 +303,7 @@ class Shipment(ShipmentBase, table=True):
class ShipmentUpdate(SQLModel): class ShipmentUpdate(SQLModel):
name: str | None name: str | None
date: str | None date: datetime.date | None
product_ids: list[int] | None = [] product_ids: list[int] | None = []
class ShipmentCreate(ShipmentBase): class ShipmentCreate(ShipmentBase):

View File

@@ -26,7 +26,7 @@ def get_productor(
): ):
result = service.get_one(session, id) result = service.get_one(session, id)
if result is None: if result is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('productor'))
return result return result
@router.post('', response_model=models.ProductorPublic) @router.post('', response_model=models.ProductorPublic)
@@ -37,8 +37,8 @@ def create_productor(
): ):
try: try:
result = service.create_one(session, productor) result = service.create_one(session, productor)
except exceptions.ProductorCreateError: except exceptions.ProductorCreateError as error:
raise HTTPException(status_code=400, detail=messages.productorinputinvalid) raise HTTPException(status_code=400, detail=str(error))
return result return result
@router.put('/{id}', response_model=models.ProductorPublic) @router.put('/{id}', response_model=models.ProductorPublic)
@@ -49,8 +49,8 @@ def update_productor(
): ):
try: try:
result = service.update_one(session, id, productor) result = service.update_one(session, id, productor)
except exceptions.ProductorNotFoundError: except exceptions.ProductorNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.productornotfound) raise HTTPException(status_code=404, detail=str(error))
return result return result
@router.delete('/{id}', response_model=models.ProductorPublic) @router.delete('/{id}', response_model=models.ProductorPublic)
@@ -61,6 +61,6 @@ def delete_productor(
): ):
try: try:
result = service.delete_one(session, id) result = service.delete_one(session, id)
except exceptions.ProductorNotFoundError: except exceptions.ProductorNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.productornotfound) raise HTTPException(status_code=404, detail=str(error))
return result return result

View File

@@ -1,6 +1,7 @@
from sqlmodel import Session, select from sqlmodel import Session, select
import src.models as models import src.models as models
import src.productors.exceptions as exceptions import src.productors.exceptions as exceptions
import src.messages as messages
def get_all( def get_all(
session: Session, session: Session,
@@ -22,7 +23,7 @@ def get_one(session: Session, productor_id: int) -> models.ProductorPublic:
def create_one(session: Session, productor: models.ProductorCreate) -> models.ProductorPublic: def create_one(session: Session, productor: models.ProductorCreate) -> models.ProductorPublic:
if not productor: if not productor:
raise exceptions.ProductorCreateError('ProductorCreate input cannot be None') raise exceptions.ProductorCreateError(messages.Messages.invalid_input('productor', 'input cannot be None'))
productor_create = productor.model_dump(exclude_unset=True, exclude='payment_methods') productor_create = productor.model_dump(exclude_unset=True, exclude='payment_methods')
new_productor = models.Productor(**productor_create) new_productor = models.Productor(**productor_create)
@@ -43,7 +44,7 @@ def update_one(session: Session, id: int, productor: models.ProductorUpdate) ->
result = session.exec(statement) result = session.exec(statement)
new_productor = result.first() new_productor = result.first()
if not new_productor: if not new_productor:
raise exceptions.ProductorNotFoundError(f'Productor {id} not found') raise exceptions.ProductorNotFoundError(messages.Messages.not_found('productor'))
productor_updates = productor.model_dump(exclude_unset=True) productor_updates = productor.model_dump(exclude_unset=True)
if 'payment_methods' in productor_updates: if 'payment_methods' in productor_updates:
@@ -71,7 +72,7 @@ def delete_one(session: Session, id: int) -> models.ProductorPublic:
result = session.exec(statement) result = session.exec(statement)
productor = result.first() productor = result.first()
if not productor: if not productor:
raise exceptions.ProductorNotFoundError(f'Productor {id} not found') raise exceptions.ProductorNotFoundError(messages.Messages.not_found('productor'))
result = models.ProductorPublic.model_validate(productor) result = models.ProductorPublic.model_validate(productor)
session.delete(productor) session.delete(productor)
session.commit() session.commit()

View File

@@ -33,7 +33,7 @@ def get_product(
): ):
result = service.get_one(session, id) result = service.get_one(session, id)
if result is None: if result is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('product'))
return result return result
@router.post('', response_model=models.ProductPublic) @router.post('', response_model=models.ProductPublic)
@@ -44,10 +44,10 @@ def create_product(
): ):
try: try:
result = service.create_one(session, product) result = service.create_one(session, product)
except exceptions.ProductCreateError: except exceptions.ProductCreateError as error:
raise HTTPException(status_code=400, detail=messages.productinputinvalid) raise HTTPException(status_code=400, detail=str(error))
except exceptions.ProductorNotFoundError: except exceptions.ProductorNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.productornotfound) raise HTTPException(status_code=404, detail=str(error))
return result return result
@router.put('/{id}', response_model=models.ProductPublic) @router.put('/{id}', response_model=models.ProductPublic)
@@ -58,10 +58,10 @@ def update_product(
): ):
try: try:
result = service.update_one(session, id, product) result = service.update_one(session, id, product)
except exceptions.ProductNotFoundError: except exceptions.ProductNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.productnotfound) raise HTTPException(status_code=404, detail=str(error))
except exceptions.ProductorNotFoundError: except exceptions.ProductorNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.productornotfound) raise HTTPException(status_code=404, detail=str(error))
return result return result
@router.delete('/{id}', response_model=models.ProductPublic) @router.delete('/{id}', response_model=models.ProductPublic)
@@ -72,6 +72,6 @@ def delete_product(
): ):
try: try:
result = service.delete_one(session, id) result = service.delete_one(session, id)
except exceptions.ProductNotFoundError: except exceptions.ProductNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=str(error))
return result return result

View File

@@ -1,6 +1,7 @@
from sqlmodel import Session, select from sqlmodel import Session, select
import src.models as models import src.models as models
import src.products.exceptions as exceptions import src.products.exceptions as exceptions
import src.messages as messages
def get_all( def get_all(
session: Session, session: Session,
@@ -26,9 +27,9 @@ def get_one(session: Session, product_id: int) -> models.ProductPublic:
def create_one(session: Session, product: models.ProductCreate) -> models.ProductPublic: def create_one(session: Session, product: models.ProductCreate) -> models.ProductPublic:
if not product: if not product:
raise exceptions.ProductCreateError('ProductCreate input cannot be None') raise exceptions.ProductCreateError(messages.Messages.invalid_input('product', 'input cannot be None'))
if not session.get(models.Productor, product.productor_id): if not session.get(models.Productor, product.productor_id):
raise exceptions.ProductorNotFoundError(f'Productor {product.productor_id} not found') raise exceptions.ProductorNotFoundError(messages.Messages.not_found('productor'))
product_create = product.model_dump(exclude_unset=True) product_create = product.model_dump(exclude_unset=True)
new_product = models.Product(**product_create) new_product = models.Product(**product_create)
session.add(new_product) session.add(new_product)
@@ -41,9 +42,9 @@ def update_one(session: Session, id: int, product: models.ProductUpdate) -> mode
result = session.exec(statement) result = session.exec(statement)
new_product = result.first() new_product = result.first()
if not new_product: if not new_product:
raise exceptions.ProductNotFoundError(f'Product {id} not found') raise exceptions.ProductNotFoundError(messages.Messages.not_found('product'))
if product.productor_id and not session.get(models.Productor, product.productor_id): if product.productor_id and not session.get(models.Productor, product.productor_id):
raise exceptions.ProductorNotFoundError(f'Productor {product.productor_id} not found') raise exceptions.ProductorNotFoundError(messages.Messages.not_found('productor'))
product_updates = product.model_dump(exclude_unset=True) product_updates = product.model_dump(exclude_unset=True)
for key, value in product_updates.items(): for key, value in product_updates.items():
@@ -59,7 +60,7 @@ def delete_one(session: Session, id: int) -> models.ProductPublic:
result = session.exec(statement) result = session.exec(statement)
product = result.first() product = result.first()
if not product: if not product:
raise exceptions.ProductNotFoundError(f'Product {id} not found') raise exceptions.ProductNotFoundError(messages.Messages.not_found('product'))
result = models.ProductPublic.model_validate(product) result = models.ProductPublic.model_validate(product)
session.delete(product) session.delete(product)
session.commit() session.commit()

View File

@@ -0,0 +1,11 @@
class ShipmentServiceError(Exception):
def __init__(self, message: str):
super().__init__(message)
class ShipmentNotFoundError(ShipmentServiceError):
pass
class ShipmentCreateError(ShipmentServiceError):
def __init__(self, message: str, field: str | None = None):
super().__init__(message)
self.field = field

View File

@@ -1,12 +1,15 @@
from sqlmodel import Session, select from sqlmodel import Session, select
import src.models as models import src.models as models
import src.shipments.exceptions as exceptions
import src.messages as messages
import datetime
def get_all( def get_all(
session: Session, session: Session,
user: models.User, user: models.User,
names: list[str], names: list[str],
dates: list[str], dates: list[str],
forms: list[int] forms: list[str]
) -> list[models.ShipmentPublic]: ) -> list[models.ShipmentPublic]:
statement = select(models.Shipment)\ statement = select(models.Shipment)\
.join(models.Form, models.Shipment.form_id == models.Form.id)\ .join(models.Form, models.Shipment.form_id == models.Form.id)\
@@ -16,15 +19,17 @@ def get_all(
if len(names) > 0: if len(names) > 0:
statement = statement.where(models.Shipment.name.in_(names)) statement = statement.where(models.Shipment.name.in_(names))
if len(dates) > 0: if len(dates) > 0:
statement = statement.where(models.Shipment.date.in_(list(map(lambda x: datetime.strptime(x, '%Y-%m-%d'), dates)))) statement = statement.where(models.Shipment.date.in_(list(map(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d').date(), dates))))
if len(forms) > 0: if len(forms) > 0:
statement = statement.join(models.Form).where(models.Form.name.in_(forms)) statement = statement.where(models.Form.name.in_(forms))
return session.exec(statement.order_by(models.Shipment.name)).all() return session.exec(statement.order_by(models.Shipment.name)).all()
def get_one(session: Session, shipment_id: int) -> models.ShipmentPublic: def get_one(session: Session, shipment_id: int) -> models.ShipmentPublic:
return session.get(models.Shipment, shipment_id) return session.get(models.Shipment, shipment_id)
def create_one(session: Session, shipment: models.ShipmentCreate) -> models.ShipmentPublic: def create_one(session: Session, shipment: models.ShipmentCreate) -> models.ShipmentPublic:
if shipment is None:
raise exceptions.ShipmentCreateError(messages.Messages.invalid_input('shipment', 'input cannot be None'))
products = session.exec(select(models.Product).where(models.Product.id.in_(shipment.product_ids))).all() products = session.exec(select(models.Product).where(models.Product.id.in_(shipment.product_ids))).all()
shipment_create = shipment.model_dump(exclude_unset=True, exclude={'product_ids'}) shipment_create = shipment.model_dump(exclude_unset=True, exclude={'product_ids'})
new_shipment = models.Shipment(**shipment_create, products=products) new_shipment = models.Shipment(**shipment_create, products=products)
@@ -34,11 +39,13 @@ def create_one(session: Session, shipment: models.ShipmentCreate) -> models.Ship
return new_shipment return new_shipment
def update_one(session: Session, id: int, shipment: models.ShipmentUpdate) -> models.ShipmentPublic: def update_one(session: Session, id: int, shipment: models.ShipmentUpdate) -> models.ShipmentPublic:
if shipment is None:
raise exceptions.ShipmentCreateError(messages.Messages.invalid_input('shipment', 'input cannot be None'))
statement = select(models.Shipment).where(models.Shipment.id == id) statement = select(models.Shipment).where(models.Shipment.id == id)
result = session.exec(statement) result = session.exec(statement)
new_shipment = result.first() new_shipment = result.first()
if not new_shipment: if not new_shipment:
return None raise exceptions.ShipmentNotFoundError(messages.Messages.not_found('shipment'))
products_to_add = session.exec(select(models.Product).where(models.Product.id.in_(shipment.product_ids))).all() products_to_add = session.exec(select(models.Product).where(models.Product.id.in_(shipment.product_ids))).all()
new_shipment.products.clear() new_shipment.products.clear()
@@ -59,7 +66,8 @@ def delete_one(session: Session, id: int) -> models.ShipmentPublic:
result = session.exec(statement) result = session.exec(statement)
shipment = result.first() shipment = result.first()
if not shipment: if not shipment:
return None raise exceptions.ShipmentNotFoundError(messages.Messages.not_found('shipment'))
result = models.ShipmentPublic.model_validate(shipment) result = models.ShipmentPublic.model_validate(shipment)
session.delete(shipment) session.delete(shipment)
session.commit() session.commit()

View File

@@ -4,6 +4,7 @@ import src.models as models
from src.database import get_session from src.database import get_session
from sqlmodel import Session from sqlmodel import Session
import src.shipments.service as service import src.shipments.service as service
import src.shipments.exceptions as exceptions
from src.auth.auth import get_current_user from src.auth.auth import get_current_user
router = APIRouter(prefix='/shipments') router = APIRouter(prefix='/shipments')
@@ -32,7 +33,7 @@ def get_shipment(
): ):
result = service.get_one(session, id) result = service.get_one(session, id)
if result is None: if result is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('shipment'))
return result return result
@router.post('', response_model=models.ShipmentPublic) @router.post('', response_model=models.ShipmentPublic)
@@ -41,17 +42,23 @@ def create_shipment(
user: models.User = Depends(get_current_user), user: models.User = Depends(get_current_user),
session: Session = Depends(get_session) session: Session = Depends(get_session)
): ):
return service.create_one(session, shipment) try:
result = service.create_one(session, shipment)
except exceptions.ShipmentCreateError as error:
raise HTTPException(status_code=400, detail=str(error))
return result
@router.put('/{id}', response_model=models.ShipmentPublic) @router.put('/{id}', response_model=models.ShipmentPublic)
def update_shipment( def update_shipment(
id: int, shipment: models.ShipmentUpdate, id: int,
shipment: models.ShipmentUpdate,
user: models.User = Depends(get_current_user), user: models.User = Depends(get_current_user),
session: Session = Depends(get_session) session: Session = Depends(get_session)
): ):
result = service.update_one(session, id, shipment) try:
if result is None: result = service.update_one(session, id, shipment)
raise HTTPException(status_code=404, detail=messages.notfound) except exceptions.ShipmentNotFoundError as error:
raise HTTPException(status_code=404, detail=str(error))
return result return result
@router.delete('/{id}', response_model=models.ShipmentPublic) @router.delete('/{id}', response_model=models.ShipmentPublic)
@@ -60,7 +67,8 @@ def delete_shipment(
user: models.User = Depends(get_current_user), user: models.User = Depends(get_current_user),
session: Session = Depends(get_session) session: Session = Depends(get_session)
): ):
result = service.delete_one(session, id) try:
if result is None: result = service.delete_one(session, id)
raise HTTPException(status_code=404, detail=messages.notfound) except exceptions.ShipmentNotFoundError as error:
raise HTTPException(status_code=404, detail=str(error))
return result return result

View File

@@ -23,7 +23,7 @@ def get_template(
): ):
result = service.get_one(session, id) result = service.get_one(session, id)
if result is None: if result is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('template'))
return result return result
@router.post('', response_model=models.TemplatePublic) @router.post('', response_model=models.TemplatePublic)
@@ -42,7 +42,7 @@ def update_template(
): ):
result = service.update_one(session, id, template) result = service.update_one(session, id, template)
if result is None: if result is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('template'))
return result return result
@router.delete('/{id}', response_model=models.TemplatePublic) @router.delete('/{id}', response_model=models.TemplatePublic)
@@ -53,5 +53,5 @@ def delete_template(
): ):
result = service.delete_one(session, id) result = service.delete_one(session, id)
if result is None: if result is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('template'))
return result return result

View File

@@ -0,0 +1,11 @@
class UserServiceError(Exception):
def __init__(self, message: str):
super().__init__(message)
class UserNotFoundError(UserServiceError):
pass
class UserCreateError(UserServiceError):
def __init__(self, message: str, field: str | None = None):
super().__init__(message)
self.field = field

View File

@@ -1,5 +1,9 @@
from sqlmodel import Session, select from sqlmodel import Session, select
import src.models as models import src.models as models
import src.messages as messages
import src.users.exceptions as exceptions
def get_all( def get_all(
session: Session, session: Session,
@@ -49,6 +53,8 @@ def get_roles(session: Session):
return session.exec(statement.order_by(models.ContractType.name)).all() return session.exec(statement.order_by(models.ContractType.name)).all()
def create_one(session: Session, user: models.UserCreate) -> models.UserPublic: def create_one(session: Session, user: models.UserCreate) -> models.UserPublic:
if user is None:
raise exceptions.UserCreateError(messages.Messages.invalid_input('user', 'input cannot be None'))
new_user = models.User( new_user = models.User(
name=user.name, name=user.name,
email=user.email email=user.email
@@ -63,12 +69,13 @@ def create_one(session: Session, user: models.UserCreate) -> models.UserPublic:
return new_user return new_user
def update_one(session: Session, id: int, user: models.UserCreate) -> models.UserPublic: def update_one(session: Session, id: int, user: models.UserCreate) -> models.UserPublic:
if user is None:
raise exceptions.UserCreateError(messages.s.invalid_input('user', 'input cannot be None'))
statement = select(models.User).where(models.User.id == id) statement = select(models.User).where(models.User.id == id)
result = session.exec(statement) result = session.exec(statement)
new_user = result.first() new_user = result.first()
if not new_user: if not new_user:
return None raise exceptions.UserNotFoundError(f'User {id} not found')
new_user.email = user.email new_user.email = user.email
new_user.name = user.name new_user.name = user.name
@@ -84,7 +91,7 @@ def delete_one(session: Session, id: int) -> models.UserPublic:
result = session.exec(statement) result = session.exec(statement)
user = result.first() user = result.first()
if not user: if not user:
return None raise exceptions.UserNotFoundError(f'User {id} not found')
result = models.UserPublic.model_validate(user) result = models.UserPublic.model_validate(user)
session.delete(user) session.delete(user)
session.commit() session.commit()

View File

@@ -5,6 +5,7 @@ from src.database import get_session
from sqlmodel import Session from sqlmodel import Session
import src.users.service as service import src.users.service as service
from src.auth.auth import get_current_user from src.auth.auth import get_current_user
import src.users.exceptions as exceptions
router = APIRouter(prefix='/users') router = APIRouter(prefix='/users')
@@ -36,7 +37,7 @@ def get_users(
): ):
result = service.get_one(session, id) result = service.get_one(session, id)
if result is None: if result is None:
raise HTTPException(status_code=404, detail=messages.notfound) raise HTTPException(status_code=404, detail=messages.Messages.not_found('user'))
return result return result
@router.post('', response_model=models.UserPublic) @router.post('', response_model=models.UserPublic)
@@ -45,7 +46,11 @@ def create_user(
logged_user: models.User = Depends(get_current_user), logged_user: models.User = Depends(get_current_user),
session: Session = Depends(get_session) session: Session = Depends(get_session)
): ):
return service.create_one(session, user) try:
user = service.create_one(session, user)
except exceptions.UserCreateError as error:
raise HTTPException(status_code=400, detail=str(error))
return user
@router.put('/{id}', response_model=models.UserPublic) @router.put('/{id}', response_model=models.UserPublic)
def update_user( def update_user(
@@ -54,9 +59,10 @@ def update_user(
logged_user: models.User = Depends(get_current_user), logged_user: models.User = Depends(get_current_user),
session: Session = Depends(get_session) session: Session = Depends(get_session)
): ):
result = service.update_one(session, id, user) try:
if result is None: result = service.update_one(session, id, user)
raise HTTPException(status_code=404, detail=messages.notfound) except exceptions.UserNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.Messages.not_found('user'))
return result return result
@router.delete('/{id}', response_model=models.UserPublic) @router.delete('/{id}', response_model=models.UserPublic)
@@ -65,7 +71,8 @@ def delete_user(
user: models.User = Depends(get_current_user), user: models.User = Depends(get_current_user),
session: Session = Depends(get_session) session: Session = Depends(get_session)
): ):
result = service.delete_one(session, id) try:
if result is None: result = service.delete_one(session, id)
raise HTTPException(status_code=404, detail=messages.notfound) except exceptions.UserNotFoundError as error:
raise HTTPException(status_code=404, detail=messages.Messages.not_found('user'))
return result return result

View File

@@ -1,12 +1,15 @@
import pytest import pytest
import datetime
from sqlmodel import Session from sqlmodel import Session
import src.models as models import src.models as models
import src.forms.service as forms_service import src.forms.service as forms_service
import src.shipments.service as shipments_service
import src.productors.service as productors_service import src.productors.service as productors_service
import src.products.service as products_service import src.products.service as products_service
import src.users.service as users_service import src.users.service as users_service
import tests.factories.forms as forms_factory import tests.factories.forms as forms_factory
import tests.factories.shipments as shipments_factory
import tests.factories.productors as productors_factory import tests.factories.productors as productors_factory
import tests.factories.products as products_factory import tests.factories.products as products_factory
import tests.factories.users as users_factory import tests.factories.users as users_factory
@@ -77,6 +80,36 @@ def user(session: Session) -> models.UserPublic:
) )
return user return user
@pytest.fixture
def users(session: Session) -> list[models.UserPublic]:
users = [
users_service.create_one(
session,
users_factory.user_create_factory(
name='test user 1 (admin)',
email='test1@test.com',
role_names=['Légumineuses', 'Légumes', 'Oeufs', 'Porc-Agneau', 'Vin', 'Fruits']
)
),
users_service.create_one(
session,
users_factory.user_create_factory(
name='test user 2',
email='test2@test.com',
role_names=['Légumineuses']
)
),
users_service.create_one(
session,
users_factory.user_create_factory(
name='test user 3',
email='test3@test.com',
role_names=['Porc-Agneau']
)
)
]
return users
@pytest.fixture @pytest.fixture
def referer(session: Session) -> models.UserPublic: def referer(session: Session) -> models.UserPublic:
referer = users_service.create_one( referer = users_service.create_one(
@@ -89,6 +122,30 @@ def referer(session: Session) -> models.UserPublic:
) )
return referer return referer
@pytest.fixture
def shipments(session: Session, forms: list[models.FormPublic], products: list[models.ProductPublic]):
shipments = [
shipments_service.create_one(
session,
shipments_factory.shipment_create_factory(
name='test shipment 1',
date=datetime.date(2025, 10, 10),
form_id=forms[0].id,
product_ids=[p.id for p in products]
)
),
shipments_service.create_one(
session,
shipments_factory.shipment_create_factory(
name='test shipment 2',
date=datetime.date(2025, 11, 10),
form_id=forms[0].id,
product_ids=[p.id for p in products]
)
),
]
return shipments
@pytest.fixture @pytest.fixture
def forms( def forms(
session: Session, session: Session,

View File

@@ -122,6 +122,9 @@ class TestContracts:
app.dependency_overrides.clear() app.dependency_overrides.clear()
def test_create_one(self, client, mocker, mock_session, mock_user):
pass
def test_delete_one(self, client, mocker, mock_session, mock_user): def test_delete_one(self, client, mocker, mock_session, mock_user):
contract_result = contract_factory.contract_public_factory(name='test contract delete') contract_result = contract_factory.contract_public_factory(name='test contract delete')

View File

@@ -5,6 +5,7 @@ from src.main import app
from src.auth.auth import get_current_user from src.auth.auth import get_current_user
import tests.factories.forms as form_factory import tests.factories.forms as form_factory
from fastapi.exceptions import HTTPException from fastapi.exceptions import HTTPException
import src.messages as messages
class TestForms: class TestForms:
def test_get_all(self, client, mocker, mock_session, mock_user): def test_get_all(self, client, mocker, mock_session, mock_user):
@@ -124,6 +125,44 @@ class TestForms:
form_create form_create
) )
def test_create_one_referer_notfound(self, client, mocker, mock_session, mock_user):
form_body = form_factory.form_body_factory(name='test form create', referer_id=12312)
form_create = form_factory.form_create_factory(name='test form create', referer_id=12312)
mock = mocker.patch.object(
service,
'create_one',
side_effect=forms_exceptions.UserNotFoundError(messages.Messages.not_found('referer'))
)
response = client.post('/api/forms', json=form_body)
response_data = response.json()
assert response.status_code == 404
mock.assert_called_once_with(
mock_session,
form_create
)
def test_create_one_productor_notfound(self, client, mocker, mock_session, mock_user):
form_body = form_factory.form_body_factory(name='test form create', productor_id=1231)
form_create = form_factory.form_create_factory(name='test form create', productor_id=1231)
mock = mocker.patch.object(
service,
'create_one',
side_effect=forms_exceptions.ProductorNotFoundError(messages.Messages.not_found('productor'))
)
response = client.post('/api/forms', json=form_body)
response_data = response.json()
assert response.status_code == 404
mock.assert_called_once_with(
mock_session,
form_create
)
def test_create_one_unauthorized(self, client, mocker, mock_session, mock_user): def test_create_one_unauthorized(self, client, mocker, mock_session, mock_user):
def unauthorized(): def unauthorized():
raise HTTPException(status_code=401) raise HTTPException(status_code=401)
@@ -164,12 +203,51 @@ class TestForms:
def test_update_one_notfound(self, client, mocker, mock_session, mock_user): def test_update_one_notfound(self, client, mocker, mock_session, mock_user):
form_body = form_factory.form_body_factory(name='test form update') form_body = form_factory.form_body_factory(name='test form update')
form_update = form_factory.form_update_factory(name='test form update') form_update = form_factory.form_update_factory(name='test form update')
form_result = None
mock = mocker.patch.object( mock = mocker.patch.object(
service, service,
'update_one', 'update_one',
side_effect=forms_exceptions.FormNotFoundError('Form 1 not found') side_effect=forms_exceptions.FormNotFoundError(messages.Messages.not_found('form'))
)
response = client.put('/api/forms/2', json=form_body)
response_data = response.json()
assert response.status_code == 404
mock.assert_called_once_with(
mock_session,
2,
form_update
)
def test_update_one_referer_notfound(self, client, mocker, mock_session, mock_user):
form_body = form_factory.form_body_factory(name='test form update')
form_update = form_factory.form_update_factory(name='test form update')
mock = mocker.patch.object(
service,
'update_one',
side_effect=forms_exceptions.UserNotFoundError(messages.Messages.not_found('referer'))
)
response = client.put('/api/forms/2', json=form_body)
response_data = response.json()
assert response.status_code == 404
mock.assert_called_once_with(
mock_session,
2,
form_update
)
def test_update_one_productor_notfound(self, client, mocker, mock_session, mock_user):
form_body = form_factory.form_body_factory(name='test form update')
form_update = form_factory.form_update_factory(name='test form update')
mock = mocker.patch.object(
service,
'update_one',
side_effect=forms_exceptions.ProductorNotFoundError(messages.Messages.not_found('productor'))
) )
response = client.put('/api/forms/2', json=form_body) response = client.put('/api/forms/2', json=form_body)
@@ -222,7 +300,7 @@ class TestForms:
mock = mocker.patch.object( mock = mocker.patch.object(
service, service,
'delete_one', 'delete_one',
side_effect=forms_exceptions.FormNotFoundError('Form 2 not found') side_effect=forms_exceptions.FormNotFoundError(messages.Messages.not_found('form'))
) )
response = client.delete('/api/forms/2') response = client.delete('/api/forms/2')

View File

@@ -1,11 +1,14 @@
import src.productors.service as service from fastapi.exceptions import HTTPException
import src.models as models
from src.main import app from src.main import app
import src.models as models
import src.messages as messages
from src.auth.auth import get_current_user from src.auth.auth import get_current_user
import tests.factories.productors as productor_factory
import src.productors.service as service
import src.productors.exceptions as exceptions import src.productors.exceptions as exceptions
from fastapi.exceptions import HTTPException import tests.factories.productors as productor_factory
class TestProductors: class TestProductors:
def test_get_all(self, client, mocker, mock_session, mock_user): def test_get_all(self, client, mocker, mock_session, mock_user):
@@ -181,7 +184,7 @@ class TestProductors:
mock = mocker.patch.object( mock = mocker.patch.object(
service, service,
'update_one', 'update_one',
side_effect=exceptions.ProductorNotFoundError('Productor 1 not found') side_effect=exceptions.ProductorNotFoundError(messages.Messages.not_found('productor'))
) )
response = client.put('/api/productors/2', json=productor_body) response = client.put('/api/productors/2', json=productor_body)
@@ -234,7 +237,7 @@ class TestProductors:
mock = mocker.patch.object( mock = mocker.patch.object(
service, service,
'delete_one', 'delete_one',
side_effect=exceptions.ProductorNotFoundError('Productor 1 not found') side_effect=exceptions.ProductorNotFoundError(messages.Messages.not_found('productor'))
) )
response = client.delete('/api/productors/2') response = client.delete('/api/productors/2')

View File

@@ -1,6 +1,8 @@
import src.shipments.service as service import src.shipments.service as service
import src.models as models import src.models as models
from src.main import app from src.main import app
import src.messages as messages
import src.shipments.exceptions as exceptions
from src.auth.auth import get_current_user from src.auth.auth import get_current_user
import tests.factories.shipments as shipment_factory import tests.factories.shipments as shipment_factory
@@ -177,12 +179,11 @@ class TestShipments:
def test_update_one_notfound(self, client, mocker, mock_session, mock_user): def test_update_one_notfound(self, client, mocker, mock_session, mock_user):
shipment_body = shipment_factory.shipment_body_factory(name='test shipment update') shipment_body = shipment_factory.shipment_body_factory(name='test shipment update')
shipment_update = shipment_factory.shipment_update_factory(name='test shipment update') shipment_update = shipment_factory.shipment_update_factory(name='test shipment update')
shipment_result = None
mock = mocker.patch.object( mock = mocker.patch.object(
service, service,
'update_one', 'update_one',
return_value=shipment_result side_effect=exceptions.ShipmentNotFoundError(messages.Messages.not_found('shipment'))
) )
response = client.put('/api/shipments/2', json=shipment_body) response = client.put('/api/shipments/2', json=shipment_body)
@@ -235,7 +236,7 @@ class TestShipments:
mock = mocker.patch.object( mock = mocker.patch.object(
service, service,
'delete_one', 'delete_one',
return_value=shipment_result side_effect=exceptions.ShipmentNotFoundError(messages.Messages.not_found('shipment'))
) )
response = client.delete('/api/shipments/2') response = client.delete('/api/shipments/2')

View File

@@ -3,6 +3,7 @@ import src.models as models
from src.main import app from src.main import app
from src.auth.auth import get_current_user from src.auth.auth import get_current_user
import tests.factories.users as user_factory import tests.factories.users as user_factory
import src.users.exceptions as exceptions
from fastapi.exceptions import HTTPException from fastapi.exceptions import HTTPException
@@ -178,7 +179,7 @@ class TestUsers:
mock = mocker.patch.object( mock = mocker.patch.object(
service, service,
'update_one', 'update_one',
return_value=user_result side_effect=exceptions.UserNotFoundError('User 2 not found')
) )
response = client.put('/api/users/2', json=user_body) response = client.put('/api/users/2', json=user_body)
@@ -231,7 +232,7 @@ class TestUsers:
mock = mocker.patch.object( mock = mocker.patch.object(
service, service,
'delete_one', 'delete_one',
return_value=user_result side_effect=exceptions.UserNotFoundError('User 2 not found')
) )
response = client.delete('/api/users/2') response = client.delete('/api/users/2')

View File

@@ -0,0 +1,143 @@
import pytest
import datetime
from sqlmodel import Session
import src.models as models
import src.shipments.service as shipments_service
import src.shipments.exceptions as shipments_exceptions
import tests.factories.shipments as shipments_factory
class TestShipmentsService:
def test_get_all_shipments(
self,
session: Session,
shipments: list[models.ShipmentPublic],
user: models.UserPublic,
):
result = shipments_service.get_all(session, user, [], [], [])
assert len(result) == 2
assert result == shipments
def test_get_all_shipments_filter_names(
self,
session: Session,
shipments: list[models.ShipmentPublic],
user: models.UserPublic,
):
result = shipments_service.get_all(session, user, ['test shipment 1'], [], [])
assert len(result) == 1
assert result == [shipments[0]]
def test_get_all_shipments_filter_dates(
self,
session: Session,
shipments: list[models.ShipmentPublic],
user: models.UserPublic,
):
result = shipments_service.get_all(session, user, [], ['2025-10-10'], [])
assert len(result) == 1
def test_get_all_shipments_filter_forms(
self,
session: Session,
shipments: list[models.ShipmentPublic],
forms: list[models.FormPublic],
user: models.UserPublic,
):
result = shipments_service.get_all(session, user, [], [], [forms[0].name])
assert len(result) == 2
def test_get_all_shipments_all_filters(
self,
session: Session,
shipments: list[models.ShipmentPublic],
forms: list[models.FormPublic],
user: models.UserPublic,
):
result = shipments_service.get_all(session, user, ['test shipment 1'], ['2025-10-10'], [forms[0].name])
assert len(result) == 1
def test_get_one_shipment(self, session: Session, shipments: list[models.ShipmentPublic]):
result = shipments_service.get_one(session, shipments[0].id)
assert result == shipments[0]
def test_get_one_shipment_notfound(self, session: Session):
result = shipments_service.get_one(session, 122)
assert result == None
def test_create_shipment(
self,
session: Session,
):
shipment_create = shipments_factory.shipment_create_factory(
name='new test shipment',
date='2025-10-10',
)
result = shipments_service.create_one(session, shipment_create)
assert result.id is not None
assert result.name == "new test shipment"
def test_create_shipment_invalidinput(
self,
session: Session,
):
shipment_create = None
with pytest.raises(shipments_exceptions.ShipmentCreateError):
result = shipments_service.create_one(session, shipment_create)
def test_update_shipment(
self,
session: Session,
shipments: list[models.ShipmentPublic]
):
shipment_update = shipments_factory.shipment_update_factory(
name='updated shipment 1',
date='2025-12-10',
)
shipment_id = shipments[0].id
result = shipments_service.update_one(session, shipment_id, shipment_update)
assert result.id == shipment_id
assert result.name == 'updated shipment 1'
assert result.date == datetime.date(2025, 12, 10)
def test_update_shipment_notfound(
self,
session: Session,
):
shipment_update = shipments_factory.shipment_update_factory(
name='updated shipment 1',
date=datetime.date(2025, 10, 10),
)
shipment_id = 123
with pytest.raises(shipments_exceptions.ShipmentNotFoundError):
result = shipments_service.update_one(session, shipment_id, shipment_update)
def test_delete_shipment(
self,
session: Session,
shipments: list[models.ShipmentPublic]
):
shipment_id = shipments[0].id
result = shipments_service.delete_one(session, shipment_id)
check = shipments_service.get_one(session, shipment_id)
assert check == None
def test_delete_shipment_notfound(
self,
session: Session,
shipments: list[models.ShipmentPublic]
):
shipment_id = 123
with pytest.raises(shipments_exceptions.ShipmentNotFoundError):
result = shipments_service.delete_one(session, shipment_id)

View File

@@ -0,0 +1,115 @@
import pytest
from sqlmodel import Session
import src.models as models
import src.users.service as users_service
import src.users.exceptions as users_exceptions
import tests.factories.users as users_factory
class TestUsersService:
def test_get_all_users(self, session: Session, users: list[models.UserPublic]):
result = users_service.get_all(session, [], [])
assert len(result) == 3
assert result == users
def test_get_all_users_filter_names(self, session: Session, users: list[models.UserPublic]):
result = users_service.get_all(session, ['test user 1 (admin)'], [])
assert len(result) == 1
assert result == [users[0]]
def test_get_all_users_filter_emails(self, session: Session, users: list[models.UserPublic]):
result = users_service.get_all(session, [], ['test1@test.com'])
assert len(result) == 1
def test_get_all_users_all_filters(self, session: Session, users: list[models.UserPublic]):
result = users_service.get_all(session, ['test user 1 (admin)'], ['test1@test.com'])
assert len(result) == 1
def test_get_one_user(self, session: Session, users: list[models.UserPublic]):
result = users_service.get_one(session, users[0].id)
assert result == users[0]
def test_get_one_user_notfound(self, session: Session):
result = users_service.get_one(session, 122)
assert result == None
def test_create_user(
self,
session: Session,
):
user_create = users_factory.user_create_factory(
name="new test user",
email='test@test.fr',
role_names=['test role']
)
result = users_service.create_one(session, user_create)
assert result.id is not None
assert result.name == "new test user"
assert result.email == "test@test.fr"
assert len(result.roles) == 1
def test_create_user_invalidinput(
self,
session: Session,
):
user_create = None
with pytest.raises(users_exceptions.UserCreateError):
result = users_service.create_one(session, user_create)
def test_update_user(
self,
session: Session,
users: list[models.UserPublic]
):
user_update = users_factory.user_update_factory(
name="updated test user",
email='test@testttt.fr',
role_names=['test role']
)
user_id = users[0].id
result = users_service.update_one(session, user_id, user_update)
assert result.id == user_id
assert result.name == 'updated test user'
assert result.email == 'test@testttt.fr'
def test_update_user_notfound(
self,
session: Session,
):
user_update = users_factory.user_update_factory(
name="updated test user",
email='test@testttt.fr',
role_names=['test role']
)
user_id = 123
with pytest.raises(users_exceptions.UserNotFoundError):
result = users_service.update_one(session, user_id, user_update)
def test_delete_user(
self,
session: Session,
users: list[models.UserPublic]
):
user_id = users[0].id
result = users_service.delete_one(session, user_id)
check = users_service.get_one(session, user_id)
assert check == None
def test_delete_user_notfound(
self,
session: Session,
users: list[models.UserPublic]
):
user_id = 123
with pytest.raises(users_exceptions.UserNotFoundError):
result = users_service.delete_one(session, user_id)