Compare commits

2 Commits

Author SHA1 Message Date
Julien Aldon
3cfa60507e [WIP] add styles 2026-03-03 17:58:33 +01:00
8c6b25ded8 WIP contract recap 2026-02-19 16:19:40 +01:00
10 changed files with 228 additions and 170 deletions

View File

@@ -1,21 +1,20 @@
import secrets
from typing import Annotated
from urllib.parse import urlencode
import jwt
import requests
import src.messages as messages
import src.users.service as service
from fastapi import (APIRouter, Cookie, Depends, HTTPException, Request,
Security)
from fastapi import APIRouter, Security, HTTPException, Depends, Request, Cookie
from fastapi.responses import RedirectResponse, Response
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jwt import PyJWKClient
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlmodel import Session, select
import jwt
from jwt import PyJWKClient
from src.settings import AUTH_URL, TOKEN_URL, JWKS_URL, ISSUER, LOGOUT_URL, settings
import src.users.service as service
from src.database import get_session
from src.models import User, UserCreate, UserPublic
from src.settings import (AUTH_URL, ISSUER, JWKS_URL, LOGOUT_URL, TOKEN_URL,
settings)
from src.models import UserCreate, User, UserPublic
import secrets
import requests
from urllib.parse import urlencode
import src.messages as messages
router = APIRouter(prefix='/auth')
@@ -99,7 +98,7 @@ def callback(code: str, session: Session = Depends(get_session)):
'client_secret': settings.keycloak_client_secret,
'refresh_token': token_data['refresh_token'],
}
requests.post(LOGOUT_URL, data=data)
res = requests.post(LOGOUT_URL, data=data)
resp = RedirectResponse(f'{settings.origins}?userNotAllowed=true')
return resp
roles = resource_access.get(settings.keycloak_client_id)
@@ -109,7 +108,7 @@ def callback(code: str, session: Session = Depends(get_session)):
'client_secret': settings.keycloak_client_secret,
'refresh_token': token_data['refresh_token'],
}
requests.post(LOGOUT_URL, data=data)
res = requests.post(LOGOUT_URL, data=data)
resp = RedirectResponse(f'{settings.origins}?userNotAllowed=true')
return resp
@@ -161,15 +160,12 @@ def verify_token(token: str):
)
return decoded
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=401,
detail=messages.Messages.tokenexipired
)
raise HTTPException(status_code=401,
detail=messages.Messages.tokenexipired)
except jwt.InvalidTokenError:
raise HTTPException(
status_code=401,
detail=messages.Messages.invalidtoken
)
detail=messages.Messages.invalidtoken)
def get_current_user(
@@ -177,30 +173,21 @@ def get_current_user(
session: Session = Depends(get_session)):
access_token = request.cookies.get('access_token')
if not access_token:
raise HTTPException(
status_code=401,
detail=messages.Messages.notauthenticated
)
raise HTTPException(status_code=401,
detail=messages.Messages.notauthenticated)
payload = verify_token(access_token)
if not payload:
raise HTTPException(
status_code=401,
detail='aze'
)
raise HTTPException(status_code=401, detail='aze')
email = payload.get('email')
if not email:
raise HTTPException(
status_code=401,
detail=messages.Messages.notauthenticated
)
raise HTTPException(status_code=401,
detail=messages.Messages.notauthenticated)
user = session.exec(select(User).where(User.email == email)).first()
if not user:
raise HTTPException(
status_code=401,
detail=messages.Messages.not_found('user')
)
raise HTTPException(status_code=401,
detail=messages.Messages.not_found('user'))
return user
@@ -262,6 +249,6 @@ def me(user: UserPublic = Depends(get_current_user)):
'name': user.name,
'email': user.email,
'id': user.id,
'roles': user.roles
'roles': [role.name for role in user.roles]
}
}

View File

@@ -4,7 +4,9 @@ import io
import pathlib
import jinja2
from odfdo import Cell, Document, Row, Table
import odfdo
# from odfdo import Cell, Document, Row, Style, Table
from odfdo.element import Element
from src import models
from weasyprint import HTML
@@ -62,20 +64,181 @@ def generate_html_contract(
).write_pdf()
def flatten(xss):
return [x for xs in xss for x in xs]
def create_column_style_width(size: str) -> odfdo.Style:
"""Create a table columm style for a given width.
Paramenters:
size(str): size of the style (format <number><unit>) unit can be in, cm... see odfdo documentation.
Returns:
odfdo.Style with the correct column-width attribute.
"""
return odfdo.Element.from_tag(
'<style:style style:name="product-table.A" style:family="table-column">'
f'<style:table-column-properties style:column-width="{size}"/>'
'</style:style>'
)
def create_row_style_height(size: str) -> odfdo.Style:
"""Create a table height style for a given height.
Paramenters:
size(str): size of the style (format <number><unit>) unit can be in, cm... see odfdo documentation.
Returns:
odfdo.Style with the correct column-height attribute.
"""
return odfdo.Element.from_tag(
'<style:style style:name="product-table.A" style:family="table-row">'
f'<style:table-row-properties style:row-height="{size}"/>'
'</style:style>'
)
def create_center_cell_style(name: str = "centered-cell") -> odfdo.Style:
return odfdo.Element.from_tag(
f'<style:style style:name="{name}" style:family="table-cell">'
'<style:table-cell-properties style:vertical-align="middle" fo:wrap-option="wrap"/>'
'<style:paragraph-properties fo:text-align="center"/>'
'</style:style>'
)
def create_cell_style_with_font(name: str = "font", font_size="14pt", bold: bool = False) -> odfdo.Style:
return odfdo.Element.from_tag(
f'<style:style style:name="{name}" style:family="table-cell" '
f'xmlns:fo="urn:oasis:names:tc:opendocument:xmlns:xsl-fo-compatible:1.0">'
'<style:table-cell-properties style:vertical-align="middle" fo:wrap-option="wrap"/>'
f'<style:paragraph-properties fo:text-align="center" fo:font-size="{font_size}" '
f'{"fo:font-weight=\"bold\"" if bold else ""}/>'
'</style:style>'
)
def apply_center_cell_style(document: odfdo.Document, row: odfdo.Row):
style = document.insert_style(
create_center_cell_style()
)
for cell in row.get_cells():
cell.style = style
def apply_column_height_style(document: odfdo.Document, row: odfdo.Row, height: str):
style = document.insert_style(
style=create_row_style_height(height), name=height, automatic=True
)
row.style = style
def apply_font_style(document: odfdo.Document, table: odfdo.Table, size: str = "14pt"):
style_header = document.insert_style(
style=create_cell_style_with_font(
'header_font', font_size=size, bold=True
)
)
style_body = document.insert_style(
style=create_cell_style_with_font(
'body_font', font_size=size, bold=False
)
)
for position in range(table.height):
row = table.get_row(position)
for cell in row.get_cells():
cell.style = style_header if position == 0 or position == 1 else style_body
for paragraph in cell.get_paragraphs():
paragraph.style = cell.style
def apply_column_width_style(document: odfdo.Document, table: odfdo.Table, widths: list[str]):
"""Apply column width style to a table.
Parameters:
document(odfdo.Document): Document where the table is located.
table(odfdo.Table): Table to apply columns widths.
widths(list[str]): list of width in format <number><unit> unit ca be in, cm... see odfdo documentation.
"""
styles = []
for w in widths:
styles.append(document.insert_style(
style=create_column_style_width(w), name=w, automatic=True))
for position in range(table.width):
col = table.get_column(position)
col.style = styles[position]
table.set_column(position, col)
def generate_recap(
contracts: list[models.Contract],
form: models.Form,
):
recurrents = [pr.name for pr in form.productor.products if pr.type ==
models.ProductType.RECCURENT]
recurrents.sort()
occasionnals = [pr.name for pr in form.productor.products if pr.type ==
models.ProductType.OCCASIONAL]
occasionnals.sort()
shipments = form.shipments
occasionnals_header = [
occ for shipment in shipments for occ in occasionnals]
shipment_header = flatten(
[[f'{shipment.name} - {shipment.date.strftime('%Y-%m-%d')}'] + ["" * len(occasionnals)] for shipment in shipments])
product_unit_map = {
"1": "g",
"2": "kg",
"3": "p"
}
header = (
["Nom", "Email"] +
["Tarif panier", "Total Paniers", "Total à payer"] +
["Cheque 1", "Cheque 2", "Cheque 3"] +
[f"Total {len(shipments)} livraisons + produits occasionnels"] +
recurrents +
occasionnals_header +
["Remarques", "Nom"]
)
data = [
["nom", "email"],
[""] * (9 + len(recurrents)) + shipment_header,
header,
*[
[
f'{contract.firstname} {contract.lastname}',
f'{contract.email}',
*[f'{pr.quantity} {product_unit_map[pr.product.unit]}' for pr in sorted(
contract.products, key=lambda x: x.product.name) if pr.product.type == models.ProductType.RECCURENT],
*[f'{pr.quantity} {product_unit_map[pr.product.unit]}' for pr in sorted(
contract.products, key=lambda x: x.product.name) if pr.product.type == models.ProductType.OCCASIONAL],
"",
f'{contract.firstname} {contract.lastname}',
] for contract in contracts
]
doc = Document("spreadsheet")
sheet = Table(name="Recap")
]
doc = odfdo.Document("spreadsheet")
sheet = doc.body.get_sheet(0)
sheet.name = 'Recap'
sheet.set_values(data)
apply_column_width_style(doc, doc.body.get_table(0), ["4cm"] * len(header))
apply_column_height_style(
doc,
doc.body.get_table(0).get_rows((1, 1))[0],
"1.20cm"
)
apply_center_cell_style(doc, doc.body.get_table(0).get_rows((1, 1))[0])
apply_font_style(doc, doc.body.get_table(0))
index = 9 + len(recurrents)
for _ in enumerate(shipments):
startcol = index
endcol = index+len(occasionnals) - 1
sheet.set_span((startcol, 0, endcol, 0), merge=True)
index += len(occasionnals)
doc.body.append(sheet)
buffer = io.BytesIO()
doc.save(buffer)
doc.save('test.ods')
return buffer.getvalue()

View File

@@ -32,10 +32,7 @@ async def get_forms_filtered(
@router.get('/{_id}', response_model=models.FormPublic)
async def get_form(
_id: int,
session: Session = Depends(get_session)
):
async def get_form(_id: int, session: Session = Depends(get_session)):
result = service.get_one(session, _id)
if result is None:
raise HTTPException(
@@ -51,11 +48,6 @@ async def create_form(
user: models.User = Depends(get_current_user),
session: Session = Depends(get_session)
):
if not service.is_allowed(session, user, form=form):
raise HTTPException(
status_code=403,
detail=messages.Messages.not_allowed('forms', 'update')
)
try:
form = service.create_one(session, form)
except exceptions.ProductorNotFoundError as error:
@@ -69,16 +61,10 @@ async def create_form(
@router.put('/{_id}', response_model=models.FormPublic)
async def update_form(
_id: int,
form: models.FormUpdate,
_id: int, form: models.FormUpdate,
user: models.User = Depends(get_current_user),
session: Session = Depends(get_session)
):
if not service.is_allowed(session, user, _id=_id):
raise HTTPException(
status_code=403,
detail=messages.Messages.not_allowed('forms', 'update')
)
try:
result = service.update_one(session, _id, form)
except exceptions.FormNotFoundError as error:
@@ -96,11 +82,6 @@ async def delete_form(
user: models.User = Depends(get_current_user),
session: Session = Depends(get_session)
):
if not service.is_allowed(session, user, _id=_id):
raise HTTPException(
status_code=403,
detail=messages.Messages.not_allowed('forms', 'delete')
)
try:
result = service.delete_one(session, _id)
except exceptions.FormNotFoundError as error:

View File

@@ -108,25 +108,12 @@ def delete_one(session: Session, _id: int) -> models.FormPublic:
return result
def is_allowed(
session: Session,
user: models.User,
_id: int = None,
form: models.FormCreate = None
) -> bool:
if not _id:
statement = (
select(models.Productor)
.where(models.Productor.id == form.productor_id)
)
productor = session.exec(statement).first()
return productor.type in [r.name for r in user.roles]
def is_allowed(session: Session, user: models.User, _id: int) -> bool:
statement = (
select(models.Form)
.join(
models.Productor,
models.Form.productor_id == models.Productor.id
)
models.Form.productor_id == models.Productor.id)
.where(models.Form.id == _id)
.where(
models.Productor.type.in_(

View File

@@ -92,19 +92,3 @@ def delete_one(session: Session, id: int) -> models.ProductorPublic:
session.delete(productor)
session.commit()
return result
def is_allowed(
session: Session,
user: models.User,
_id: int,
productor: models.ProductorCreate
) -> bool:
if not _id:
return productor.type in [r.name for r in user.roles]
statement = (
select(models.Productor)
.where(models.Productor.id == _id)
.where(models.Productor.type.in_([r.name for r in user.roles]))
.distinct()
)
return len(session.exec(statement).all()) > 0

View File

@@ -85,32 +85,3 @@ def delete_one(session: Session, id: int) -> models.ProductPublic:
session.delete(product)
session.commit()
return result
def is_allowed(
session: Session,
user: models.User,
_id: int,
product: models.ProductCreate
) -> bool:
if not _id:
statement = (
select(models.Product)
.join(
models.Productor,
models.Product.productor_id == models.Productor.id
)
.where(models.Product.id == product.productor_id)
)
productor = session.exec(statement).first()
return productor.type in [r.name for r in user.roles]
statement = (
select(models.Product)
.join(
models.Productor,
models.Product.productor_id == models.Productor.id
)
.where(models.Product.id == _id)
.where(models.Productor.type.in_([r.name for r in user.roles]))
.distinct()
)
return len(session.exec(statement).all()) > 0

View File

@@ -56,9 +56,7 @@ def get_or_create_user(session: Session, user_create: models.UserCreate):
def get_roles(session: Session):
statement = (
select(models.ContractType)
)
statement = select(models.ContractType)
return session.exec(statement.order_by(models.ContractType.name)).all()
@@ -66,9 +64,7 @@ def create_one(session: Session, user: models.UserCreate) -> models.UserPublic:
if user is None:
raise exceptions.UserCreateError(
messages.Messages.invalid_input(
'user', 'input cannot be None'
)
)
'user', 'input cannot be None'))
new_user = models.User(
name=user.name,
email=user.email
@@ -85,19 +81,17 @@ def create_one(session: Session, user: models.UserCreate) -> models.UserPublic:
def update_one(
session: Session,
_id: int,
id: int,
user: models.UserCreate) -> models.UserPublic:
if user is None:
raise exceptions.UserCreateError(
messages.Messages.invalid_input(
'user', 'input cannot be None'
)
)
statement = select(models.User).where(models.User.id == _id)
messages.s.invalid_input(
'user', 'input cannot be None'))
statement = select(models.User).where(models.User.id == id)
result = session.exec(statement)
new_user = result.first()
if not new_user:
raise exceptions.UserNotFoundError(f'User {_id} not found')
raise exceptions.UserNotFoundError(f'User {id} not found')
new_user.email = user.email
new_user.name = user.name
@@ -109,12 +103,12 @@ def update_one(
return new_user
def delete_one(session: Session, _id: int) -> models.UserPublic:
statement = select(models.User).where(models.User.id == _id)
def delete_one(session: Session, id: int) -> models.UserPublic:
statement = select(models.User).where(models.User.id == id)
result = session.exec(statement)
user = result.first()
if not user:
raise exceptions.UserNotFoundError(f'User {_id} not found')
raise exceptions.UserNotFoundError(f'User {id} not found')
result = models.UserPublic.model_validate(user)
session.delete(user)
session.commit()

View File

@@ -32,18 +32,16 @@ def get_roles(
return service.get_roles(session)
@router.get('/{_id}', response_model=models.UserPublic)
def get_user(
_id: int,
@router.get('/{id}', response_model=models.UserPublic)
def get_users(
id: int,
user: models.User = Depends(get_current_user),
session: Session = Depends(get_session)
):
result = service.get_one(session, _id)
result = service.get_one(session, id)
if result is None:
raise HTTPException(
status_code=404,
detail=messages.Messages.not_found('user')
)
raise HTTPException(status_code=404,
detail=messages.Messages.not_found('user'))
return result
@@ -56,27 +54,22 @@ def create_user(
try:
user = service.create_one(session, user)
except exceptions.UserCreateError as error:
raise HTTPException(
status_code=400,
detail=str(error)
) from error
raise HTTPException(status_code=400, detail=str(error))
return user
@router.put('/{_id}', response_model=models.UserPublic)
@router.put('/{id}', response_model=models.UserPublic)
def update_user(
_id: int,
id: int,
user: models.UserUpdate,
logged_user: models.User = Depends(get_current_user),
session: Session = Depends(get_session)
):
try:
result = service.update_one(session, _id, user)
result = service.update_one(session, id, user)
except exceptions.UserNotFoundError as error:
raise HTTPException(
status_code=404,
detail=messages.Messages.not_found('user')
) from error
raise HTTPException(status_code=404,
detail=messages.Messages.not_found('user'))
return result
@@ -89,8 +82,6 @@ def delete_user(
try:
result = service.delete_one(session, id)
except exceptions.UserNotFoundError as error:
raise HTTPException(
status_code=404,
detail=messages.Messages.not_found('user')
) from error
raise HTTPException(status_code=404,
detail=messages.Messages.not_found('user'))
return result

View File

@@ -19,7 +19,7 @@ import {
type ProductorInputs,
} from "@/services/resources/productors";
import { useMemo } from "react";
import { useAuth } from "@/services/auth/AuthProvider";
import { useGetRoles } from "@/services/api";
export type ProductorModalProps = ModalBaseProps & {
currentProductor?: Productor;
@@ -32,7 +32,7 @@ export function ProductorModal({
currentProductor,
handleSubmit,
}: ProductorModalProps) {
const { loggedUser } = useAuth();
const { data: allRoles } = useGetRoles();
const form = useForm<ProductorInputs>({
initialValues: {
@@ -58,8 +58,8 @@ export function ProductorModal({
});
const roleSelect = useMemo(() => {
return loggedUser?.user?.roles?.map((role) => ({ value: String(role.name), label: role.name }));
}, [loggedUser?.user?.roles]);
return allRoles?.map((role) => ({ value: String(role.name), label: role.name }));
}, [allRoles]);
return (
<Modal opened={opened} onClose={onClose} title={t("create productor", { capfirst: true })}>

View File

@@ -89,7 +89,7 @@ export default function Contracts() {
label={t("download recap", { capfirst: true })}
>
<ActionIcon
disabled={true}
disabled={false}
onClick={(e) => {
e.stopPropagation();
navigate(