Files
amap/backend/src/forms/service.py
2026-02-18 18:08:30 +01:00

77 lines
2.8 KiB
Python

from sqlmodel import Session, select
import src.models as models
from sqlalchemy import func
def get_all(
session: Session,
seasons: list[str],
productors: list[str],
current_season: bool,
) -> list[models.FormPublic]:
statement = select(models.Form)
if len(seasons) > 0:
statement = statement.where(models.Form.season.in_(seasons))
if len(productors) > 0:
statement = statement.join(models.Productor).where(models.Productor.name.in_(productors))
if current_season:
subquery = (
select(
models.Productor.type,
func.max(models.Form.start).label("max_start")
)
.join(models.Form)\
.group_by(models.Productor.type)\
.subquery()
)
statement = select(models.Form)\
.join(models.Productor)\
.join(subquery,
(models.Productor.type == subquery.c.type) &
(models.Form.start == subquery.c.max_start)
)
return session.exec(statement.order_by(models.Form.name)).all()
return session.exec(statement.order_by(models.Form.name)).all()
def get_one(session: Session, form_id: int) -> models.FormPublic:
return session.get(models.Form, form_id)
def create_one(session: Session, form: models.FormCreate) -> models.FormPublic:
form_create = form.model_dump(exclude_unset=True)
new_form = models.Form(**form_create)
session.add(new_form)
session.commit()
session.refresh(new_form)
return new_form
def update_one(session: Session, id: int, form: models.FormUpdate) -> models.FormPublic:
statement = select(models.Form).where(models.Form.id == id)
result = session.exec(statement)
new_form = result.first()
if not new_form:
return None
form_updates = form.model_dump(exclude_unset=True)
for key, value in form_updates.items():
setattr(new_form, key, value)
session.add(new_form)
session.commit()
session.refresh(new_form)
return new_form
def delete_one(session: Session, id: int) -> models.FormPublic:
statement = select(models.Form).where(models.Form.id == id)
result = session.exec(statement)
form = result.first()
if not form:
return None
result = models.FormPublic.model_validate(form)
session.delete(form)
session.commit()
return result
def is_allowed(session: Session, user: models.User, id: int) -> bool:
statement = select(models.Form)\
.join(models.Productor, models.Form.productor_id == models.Productor.id)\
.where(models.Form.id == id)\
.where(models.Productor.type.in_([r.name for r in user.roles]))\
.distinct()
return len(session.exec(statement).all()) > 0