add logout logic and wip recap

This commit is contained in:
Julien Aldon
2026-02-18 18:08:30 +01:00
parent aca24ca560
commit acbaadff67
29 changed files with 363 additions and 100 deletions

View File

@@ -2,7 +2,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from src.database import get_session
from sqlmodel import Session
from src.contracts.generate_contract import generate_html_contract
from src.contracts.generate_contract import generate_html_contract, generate_recap
from src.auth.auth import get_current_user
import src.models as models
import src.messages as messages
@@ -79,7 +79,8 @@ async def create_contract(
occasionals = create_occasional_dict(occasional_contract_products)
recurrents = list(map(lambda x: {"product": x.product, "quantity": x.quantity}, filter(lambda contract_product: contract_product.product.type == models.ProductType.RECCURENT, new_contract.products)))
recurrent_price = compute_recurrent_prices(recurrents, len(new_contract.form.shipments))
total_price = '{:10.2f}'.format(recurrent_price + compute_occasional_prices(occasionals))
price = recurrent_price + compute_occasional_prices(occasionals)
total_price = '{:10.2f}'.format(price)
cheques = list(map(lambda x: {"name": x.name, "value": x.value}, new_contract.cheques))
# TODO: send contract to referer
@@ -94,7 +95,7 @@ async def create_contract(
)
pdf_file = io.BytesIO(pdf_bytes)
contract_id = f'{new_contract.firstname}_{new_contract.lastname}_{new_contract.form.productor.type}_{new_contract.form.season}'
service.add_contract_file(session, new_contract.id, pdf_bytes)
service.add_contract_file(session, new_contract.id, pdf_bytes, price)
except Exception as e:
print(e)
raise HTTPException(status_code=400, detail=messages.pdferror)
@@ -112,7 +113,7 @@ def get_contracts(
session: Session = Depends(get_session),
user: models.User = Depends(get_current_user)
):
return service.get_all(session, forms)
return service.get_all(session, user, forms)
@router.get('/{id}/file')
def get_contract_file(
@@ -120,6 +121,8 @@ def get_contract_file(
session: Session = Depends(get_session),
user: models.User = Depends(get_current_user)
):
if not service.is_allowed(session, user, id):
raise HTTPException(status_code=403, detail=messages.notallowed)
contract = service.get_one(session, id)
if contract is None:
raise HTTPException(status_code=404, detail=messages.notfound)
@@ -138,8 +141,10 @@ def get_contract_files(
session: Session = Depends(get_session),
user: models.User = Depends(get_current_user)
):
if not form_service.is_allowed(session, user, form_id):
raise HTTPException(status_code=403, detail=messages.notallowed)
form = form_service.get_one(session, form_id=form_id)
contracts = service.get_all(session, [form.name])
contracts = service.get_all(session, user, forms=[form.name])
zipped_contracts = io.BytesIO()
with zipfile.ZipFile(zipped_contracts, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
for contract in contracts:
@@ -155,9 +160,29 @@ def get_contract_files(
}
)
@router.get('/{form_id}/recap')
def get_contract_recap(
form_id: int,
session: Session = Depends(get_session),
user: models.User = Depends(get_current_user)
):
if not form_service.is_allowed(session, user, form_id):
raise HTTPException(status_code=403, detail=messages.notallowed)
form = form_service.get_one(session, form_id=form_id)
contracts = service.get_all(session, user, forms=[form.name])
return StreamingResponse(
io.BytesIO(generate_recap(contracts, form)),
media_type='application/zip',
headers={
'Content-Disposition': f'attachment; filename=filename.ods'
}
)
@router.get('/{id}', response_model=models.ContractPublic)
def get_contract(id: int, session: Session = Depends(get_session), user: models.User = Depends(get_current_user)):
if not service.is_allowed(session, user, id):
raise HTTPException(status_code=403, detail=messages.notallowed)
result = service.get_one(session, id)
if result is None:
raise HTTPException(status_code=404, detail=messages.notfound)
@@ -165,6 +190,8 @@ def get_contract(id: int, session: Session = Depends(get_session), user: models.
@router.delete('/{id}', response_model=models.ContractPublic)
def delete_contract(id: int, session: Session = Depends(get_session), user: models.User = Depends(get_current_user)):
if not service.is_allowed(session, user, id):
raise HTTPException(status_code=403, detail=messages.notallowed)
result = service.delete_one(session, id)
if result is None:
raise HTTPException(status_code=404, detail=messages.notfound)

View File

@@ -3,6 +3,7 @@ import jinja2
import src.models as models
import html
from weasyprint import HTML
import io
def generate_html_contract(
contract: models.Contract,
@@ -57,4 +58,25 @@ def generate_html_contract(
return HTML(
string=output_text,
base_url=template_dir
).write_pdf()
).write_pdf()
from odfdo import Document, Table, Row, Cell
def generate_recap(
contracts: list[models.Contract],
form: models.Form,
):
data = [
["nom", "email"],
]
doc = Document("spreadsheet")
sheet = Table(name="Recap")
sheet.set_values(data)
doc.body.append(sheet)
buffer = io.BytesIO()
doc.save(buffer)
return buffer.getvalue()

View File

@@ -3,14 +3,19 @@ import src.models as models
def get_all(
session: Session,
user: models.User,
forms: list[str] = [],
form_id: int | None = None,
form_id: int | None = None,
) -> list[models.ContractPublic]:
statement = select(models.Contract)
if form_id:
statement = statement.join(models.Form).where(models.Form.id == form_id)
statement = select(models.Contract)\
.join(models.Form, models.Contract.form_id == models.Form.id)\
.join(models.Productor, models.Form.productor_id == models.Productor.id)\
.where(models.Productor.type.in_([r.name for r in user.roles]))\
.distinct()
if len(forms) > 0:
statement = statement.join(models.Form).where(models.Form.name.in_(forms))
statement = statement.where(models.Form.name.in_(forms))
if form_id:
statement = statement.where(models.Form.id == form_id)
return session.exec(statement.order_by(models.Contract.id)).all()
def get_one(session: Session, contract_id: int) -> models.ContractPublic:
@@ -42,11 +47,11 @@ def create_one(session: Session, contract: models.ContractCreate) -> models.Cont
session.refresh(new_contract)
return new_contract
def add_contract_file(session: Session, id: int, file: bytes):
def add_contract_file(session: Session, id: int, file: bytes, price: float):
statement = select(models.Contract).where(models.Contract.id == id)
result = session.exec(statement)
contract = result.first()
contract.total_price = price
contract.file = file
session.add(contract)
session.commit()
@@ -77,3 +82,12 @@ def delete_one(session: Session, id: int) -> models.ContractPublic:
session.delete(contract)
session.commit()
return result
def is_allowed(session: Session, user: models.User, id: int) -> bool:
statement = select(models.Contract)\
.join(models.Form, models.Contract.form_id == models.Form.id)\
.join(models.Productor, models.Form.productor_id == models.Productor.id)\
.where(models.Contract.id == id)\
.where(models.Productor.type.in_([r.name for r in user.roles]))\
.distinct()
return len(session.exec(statement).all()) > 0