Files
amap/backend/src/forms/service.py

139 lines
4.7 KiB
Python

import src.forms.exceptions as exceptions
import src.messages as messages
from sqlalchemy import func
from sqlmodel import Session, select
from src import models
def get_all(
    session: Session,
    seasons: list[str],
    productors: list[str],
    current_season: bool,
    user: models.User = None
) -> list[models.FormPublic]:
    """List forms, ordered by name, optionally filtered.

    Args:
        session: Database session used to run the query.
        seasons: When non-empty, keep only forms whose ``season`` is listed.
        productors: When non-empty, keep only forms whose productor ``name``
            is listed.
        current_season: When true, return only the forms whose ``start``
            equals the greatest ``start`` found for their productor type.
        user: When given, keep only forms whose productor ``type`` matches
            the name of one of the user's roles. When omitted, only forms
            flagged ``visible`` are returned.

    Returns:
        The matching forms, ordered by ``name``.
    """
    if current_season:
        # Greatest form start date per productor type.
        latest_start = (
            select(
                models.Productor.type,
                func.max(models.Form.start).label("max_start")
            )
            .join(models.Form)
            .group_by(models.Productor.type)
            .subquery()
        )
        # NOTE(review): this branch starts from a fresh select, so the
        # `seasons`, `productors` and role filters are NOT applied here;
        # only the anonymous `visible` filter is. Confirm this is intended.
        statement = (
            select(models.Form)
            .join(models.Productor)
            .join(
                latest_start,
                (models.Productor.type == latest_start.c.type) &
                (models.Form.start == latest_start.c.max_start)
            )
        )
        if not user:
            statement = statement.where(models.Form.visible)
        return session.exec(statement.order_by(models.Form.name)).all()

    statement = select(models.Form)
    # Productor must be joined at most once: joining the same table twice
    # without an alias produces an invalid/ambiguous query.
    productor_joined = False
    if user:
        role_names = [role.name for role in user.roles]
        statement = (
            statement
            .join(
                models.Productor,
                models.Form.productor_id == models.Productor.id
            )
            .where(models.Productor.type.in_(role_names))
            .distinct()
        )
        productor_joined = True
    if seasons:
        statement = statement.where(models.Form.season.in_(seasons))
    if productors:
        if not productor_joined:
            statement = statement.join(models.Productor)
        statement = statement.where(models.Productor.name.in_(productors))
    if not user:
        # Anonymous callers only see forms flagged as visible.
        statement = statement.where(models.Form.visible)
    return session.exec(statement.order_by(models.Form.name)).all()
def get_one(session: Session, form_id: int) -> models.FormPublic:
    """Look up a single form by primary key.

    Args:
        session: Database session used for the lookup.
        form_id: Primary key of the form to fetch.

    Returns:
        The result of ``session.get`` for ``models.Form`` and ``form_id``
        (``None`` when no such row exists, per SQLAlchemy's ``Session.get``).
    """
    found = session.get(models.Form, form_id)
    return found
def create_one(
    session: Session,
    form: models.FormCreate
) -> models.FormPublic:
    """Persist a new form after checking the rows it references.

    Args:
        session: Database session used for lookups and the insert.
        form: Payload describing the form to create.

    Returns:
        The newly stored form, refreshed from the database.

    Raises:
        exceptions.FormCreateError: If ``form`` is falsy.
        exceptions.ProductorNotFoundError: If ``form.productor_id`` does not
            resolve to a productor.
        exceptions.UserNotFoundError: If ``form.referer_id`` does not resolve
            to a user.
    """
    if not form:
        reason = messages.Messages.invalid_input(
            'form', 'input cannot be None')
        raise exceptions.FormCreateError(reason)

    # The productor is checked before the referer.
    productor = session.get(models.Productor, form.productor_id)
    if not productor:
        raise exceptions.ProductorNotFoundError(
            messages.Messages.not_found('productor'))

    referer = session.get(models.User, form.referer_id)
    if not referer:
        raise exceptions.UserNotFoundError(
            messages.Messages.not_found('user'))

    # Only fields explicitly set on the payload are forwarded to the model.
    created = models.Form(**form.model_dump(exclude_unset=True))
    session.add(created)
    session.commit()
    session.refresh(created)
    return created
def update_one(
    session: Session,
    _id: int,
    form: models.FormUpdate
) -> models.FormPublic:
    """Apply a partial update to an existing form.

    Args:
        session: Database session used for lookups and the update.
        _id: Identifier of the form to modify.
        form: Payload holding the fields to change.

    Returns:
        The updated form, refreshed from the database.

    Raises:
        exceptions.FormNotFoundError: If no form has the given id.
        exceptions.ProductorNotFoundError: If a truthy ``form.productor_id``
            does not resolve to a productor.
        exceptions.UserNotFoundError: If a truthy ``form.referer_id`` does
            not resolve to a user.
    """
    query = select(models.Form).where(models.Form.id == _id)
    existing = session.exec(query).first()
    if not existing:
        raise exceptions.FormNotFoundError(
            messages.Messages.not_found('form'))

    # References are only validated when the payload carries a truthy id.
    if form.productor_id:
        if not session.get(models.Productor, form.productor_id):
            raise exceptions.ProductorNotFoundError(
                messages.Messages.not_found('productor'))
    if form.referer_id:
        if not session.get(models.User, form.referer_id):
            raise exceptions.UserNotFoundError(
                messages.Messages.not_found('user'))

    # Copy over only the fields explicitly set on the payload.
    changes = form.model_dump(exclude_unset=True)
    for field_name, field_value in changes.items():
        setattr(existing, field_name, field_value)

    session.add(existing)
    session.commit()
    session.refresh(existing)
    return existing
def delete_one(session: Session, _id: int) -> models.FormPublic:
    """Delete a form and return its public representation.

    Args:
        session: Database session used for the lookup and the delete.
        _id: Identifier of the form to remove.

    Returns:
        A ``models.FormPublic`` built from the form before it was deleted.

    Raises:
        exceptions.FormNotFoundError: If no form has the given id.
    """
    query = select(models.Form).where(models.Form.id == _id)
    target = session.exec(query).first()
    if not target:
        raise exceptions.FormNotFoundError(
            messages.Messages.not_found('form'))

    # Build the public copy before the row is deleted and committed.
    snapshot = models.FormPublic.model_validate(target)
    session.delete(target)
    session.commit()
    return snapshot
def is_allowed(
    session: Session,
    user: models.User,
    _id: int = None,
    form: models.FormCreate = None
) -> bool:
    """Tell whether ``user`` may act on a form.

    Access is granted when the ``type`` of the form's productor equals the
    name of one of the user's roles.

    Args:
        session: Database session used for the lookups.
        user: User whose role names are compared with the productor type.
        _id: Identifier of an existing form. When falsy, ``form`` is used
            instead.
        form: Creation payload; its ``productor_id`` designates the productor
            to check when no ``_id`` is given.

    Returns:
        ``True`` when the user has a role matching the productor type.
        ``False`` otherwise, including when neither ``_id`` nor ``form`` is
        supplied, or when the referenced productor or form does not exist.
    """
    if not _id:
        if form is None:
            # Nothing to check against: deny rather than crash.
            return False
        productor = session.exec(
            select(models.Productor)
            .where(models.Productor.id == form.productor_id)
        ).first()
        if productor is None:
            # Unknown productor: deny instead of raising AttributeError.
            return False
        return productor.type in [role.name for role in user.roles]

    statement = (
        select(models.Form)
        .join(
            models.Productor,
            models.Form.productor_id == models.Productor.id
        )
        .where(models.Form.id == _id)
        .where(
            models.Productor.type.in_(
                [role.name for role in user.roles]
            )
        )
        .distinct()
    )
    # Existence is enough; there is no need to load every matching row.
    return session.exec(statement).first() is not None