add contract storage fix various bugs and translations
This commit is contained in:
@@ -1,65 +1,16 @@
|
||||
from fastapi import APIRouter, Depends
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from fastapi.responses import StreamingResponse
|
||||
from src.database import get_session
|
||||
from sqlmodel import Session
|
||||
import src.forms.service as form_service
|
||||
import src.shipments.service as shipment_service
|
||||
import src.products.service as product_service
|
||||
from src.contracts.generate_contract import generate_html_contract
|
||||
import src.models as models
|
||||
from src.messages import PDFerrorOccured
|
||||
import src.contracts.service as service
|
||||
|
||||
import io
|
||||
|
||||
router = APIRouter(prefix='/contracts')
|
||||
|
||||
def find_dict_in_list(lst, key, value):
|
||||
for i, dic in enumerate(lst):
|
||||
if dic[key].id == value:
|
||||
return i
|
||||
return -1
|
||||
|
||||
def extract_products(session: Session, contract: dict):
|
||||
planned = []
|
||||
recurrent = []
|
||||
for key in contract.keys():
|
||||
key_list = key.split("-")
|
||||
if "planned" in key:
|
||||
shipment_id = int(key_list[1])
|
||||
product_id = int(key_list[2])
|
||||
shipment = shipment_service.get_one(session, shipment_id)
|
||||
product = product_service.get_one(session, product_id)
|
||||
|
||||
existing_id = find_dict_in_list(planned, "shipment", shipment_id)
|
||||
if existing_id >= 0:
|
||||
planned[existing_id]["products"].append({
|
||||
"product": product,
|
||||
"quantity": contract[key],
|
||||
})
|
||||
planned[existing_id]['price'] += compute_product_price(product, contract[key])
|
||||
else:
|
||||
planned.append({
|
||||
"shipment": shipment,
|
||||
"price": compute_product_price(product, contract[key]),
|
||||
"products": [{
|
||||
"product": product,
|
||||
"quantity": contract[key],
|
||||
}]
|
||||
})
|
||||
if "recurrent" in key:
|
||||
product_id = int(key_list[1])
|
||||
product = product_service.get_one(session, product_id)
|
||||
recurrent.append({
|
||||
"product": product,
|
||||
"quantity": contract[key]
|
||||
})
|
||||
return planned, recurrent
|
||||
|
||||
def compute_product_price(product: models.Product, quantity: int, nb_shipment: int = 1):
|
||||
product_quantity_unit = 1 if product.unit == models.Unit.KILO else 1000
|
||||
final_quantity = quantity if product.price else quantity / product_quantity_unit
|
||||
final_price = product.price if product.price else product.price_kg
|
||||
return final_price * final_quantity * nb_shipment
|
||||
|
||||
def compute_recurrent_prices(products_quantities: list[dict], nb_shipment: int):
|
||||
result = 0
|
||||
for product_quantity in products_quantities:
|
||||
@@ -68,41 +19,106 @@ def compute_recurrent_prices(products_quantities: list[dict], nb_shipment: int):
|
||||
result += compute_product_price(product, quantity, nb_shipment)
|
||||
return result
|
||||
|
||||
def compute_planned_prices(planned: list[dict]):
|
||||
def compute_planned_prices(planneds: list[dict]):
|
||||
result = 0
|
||||
for plan in planned:
|
||||
result += plan['price']
|
||||
for planned in planneds:
|
||||
result += planned['price']
|
||||
return result
|
||||
|
||||
def compute_product_price(product: models.Product, quantity: int, nb_shipment: int = 1):
|
||||
product_quantity_unit = 1 if product.unit == models.Unit.KILO else 1000
|
||||
final_quantity = quantity if product.price else quantity / product_quantity_unit
|
||||
final_price = product.price if product.price else product.price_kg
|
||||
return final_price * final_quantity * nb_shipment
|
||||
|
||||
def find_dict_in_list(lst, key, value):
|
||||
for i, dic in enumerate(lst):
|
||||
if dic[key].id == value:
|
||||
return i
|
||||
return -1
|
||||
|
||||
def create_planned_dict(contract_products: list[models.ContractProduct]):
|
||||
result = []
|
||||
for contract_product in contract_products:
|
||||
existing_id = find_dict_in_list(
|
||||
result,
|
||||
'shipment',
|
||||
contract_product.shipment.id
|
||||
)
|
||||
if existing_id < 0:
|
||||
result.append({
|
||||
'shipment': contract_product.shipment,
|
||||
'price': compute_product_price(
|
||||
contract_product.product,
|
||||
contract_product.quantity
|
||||
),
|
||||
'products': [{
|
||||
'product': contract_product.product,
|
||||
'quantity': contract_product.quantity
|
||||
}]
|
||||
})
|
||||
else:
|
||||
result[existing_id]['products'].append({
|
||||
'product': contract_product.product,
|
||||
'quantity': contract_product.quantity
|
||||
})
|
||||
result[existing_id]['price'] += compute_product_price(
|
||||
contract_product.product,
|
||||
contract_product.quantity
|
||||
)
|
||||
return result
|
||||
|
||||
@router.post('/')
|
||||
async def create_contract(
|
||||
contract: models.ContractBase,
|
||||
contract: models.ContractCreate,
|
||||
session: Session = Depends(get_session)
|
||||
):
|
||||
form = form_service.get_one(session, contract.form_id)
|
||||
planned, recurrent = extract_products(session, contract.contract)
|
||||
recurrent_price = compute_recurrent_prices(recurrent, len(form.shipments))
|
||||
total_price = '{:10.2f}'.format(recurrent_price + compute_planned_prices(planned))
|
||||
# TODO: Store contract
|
||||
new_contract = service.create_one(session, contract)
|
||||
planned_contract_products = list(filter(lambda contract_product: contract_product.product.type == models.ProductType.PLANNED, new_contract.products))
|
||||
planneds = create_planned_dict(planned_contract_products)
|
||||
recurrents = list(map(lambda x: {"product": x.product, "quantity": x.quantity}, filter(lambda contract_product: contract_product.product.type == models.ProductType.RECCURENT, new_contract.products)))
|
||||
recurrent_price = compute_recurrent_prices(recurrents, len(new_contract.form.shipments))
|
||||
total_price = '{:10.2f}'.format(recurrent_price + compute_planned_prices(planneds))
|
||||
cheques = list(map(lambda x: {"name": x.name, "value": x.value}, new_contract.cheques))
|
||||
# TODO: send contract to referer
|
||||
# TODO: Store contract informations ?
|
||||
try:
|
||||
pdf_bytes = generate_html_contract(
|
||||
form,
|
||||
contract.contract,
|
||||
planned,
|
||||
recurrent,
|
||||
'{:10.2f}'.format(recurrent_price),
|
||||
new_contract,
|
||||
cheques,
|
||||
planneds,
|
||||
recurrents,
|
||||
recurrent_price,
|
||||
total_price
|
||||
)
|
||||
pdf_file = io.BytesIO(pdf_bytes)
|
||||
contract_id = f'{contract.contract['firstname']}_{contract.contract['lastname']}_{form.productor.type}_{form.season}'
|
||||
contract_id = f'{new_contract.firstname}_{new_contract.lastname}_{new_contract.form.productor.type}_{new_contract.form.season}'
|
||||
except:
|
||||
raise HTTPException(status_code=400, detail=PDFerrorOccured)
|
||||
return StreamingResponse(
|
||||
pdf_file,
|
||||
media_type="application/pdf",
|
||||
media_type='application/pdf',
|
||||
headers={
|
||||
"Content-Disposition": f"attachement; filename=contract_{contract_id}.pdf"
|
||||
'Content-Disposition': f'attachement; filename=contract_{contract_id}.pdf'
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
@router.get('/', response_model=list[models.ContractPublic])
|
||||
def get_contracts(
|
||||
forms: list[str] = Query([]),
|
||||
session: Session = Depends(get_session)
|
||||
):
|
||||
return service.get_all(session, forms)
|
||||
|
||||
@router.get('/{id}', response_model=models.ContractPublic)
|
||||
def get_contract(id: int, session: Session = Depends(get_session)):
|
||||
result = service.get_one(session, id)
|
||||
if result is None:
|
||||
raise HTTPException(status_code=404, detail=messages.notfound)
|
||||
return result
|
||||
|
||||
@router.delete('/{id}', response_model=models.ContractPublic)
|
||||
def delete_contract(id: int, session: Session = Depends(get_session)):
|
||||
result = service.delete_one(session, id)
|
||||
if result is None:
|
||||
raise HTTPException(status_code=404, detail=messages.notfound)
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user