feat: endpoint pratiche rendicontazione (lato beneficiario)

- 4 nuove tabelle: remission_practice, remission_invoice, remission_ula_employee, remission_document
  con cascade delete e FK
- 13 endpoint /api/remission-practices/*:
  GET /mine (lista pratiche user + applications CONTRACT_SIGNED ready_to_start)
  POST /start (avvia pratica da application_id, richiede schema PUBLISHED)
  GET /{id}, PUT /{id} (regime IVA + note)
  POST/DELETE /{id}/invoices
  POST/DELETE /{id}/ula-employees
  PUT/DELETE /{id}/documents/{doc_code}
  GET /{id}/gate-check (valida gate rules contro pratica, ritorna totali + checks)
  POST /{id}/submit (gate-check obbligatorio, status DRAFT -> SUBMITTED)
- 1 endpoint debug /api/debug/impersonate (sandbox-only, genera JWT per utente
  - necessario perche' /v1/user/login del BE Spring esclude ROLE_BENEFICIARY)
- Gate check calcola: totali per categoria, grand_total, max_remission = min(cap_pct*erogato, cap_abs),
  remission_due = min(grand_total, max_remission), applica iva_ordinario_imponibile_only
This commit is contained in:
BFLOWS
2026-04-18 09:51:06 +02:00
parent 63fd2f66e6
commit e217f15e5a
5 changed files with 728 additions and 5 deletions

View File

@@ -14,7 +14,7 @@ from sqlalchemy import text
from .config import get_settings
from .db import engine, Base
from .routers import health, schemas
from .routers import health, schemas, practices, debug
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
log = logging.getLogger("rendicontazione-api")
@@ -55,6 +55,8 @@ app.add_middleware(
app.include_router(health.router, tags=["health"])
app.include_router(schemas.router)
app.include_router(practices.router)
app.include_router(debug.router)
@app.get("/", tags=["root"])

View File

@@ -4,9 +4,10 @@ Schema: gepafin_rendic (stesso DB del BE Gepafin sandbox).
"""
import uuid
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, UniqueConstraint
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, UniqueConstraint, Numeric, Boolean, Date
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from .db import Base
@@ -32,3 +33,111 @@ class CallRemissionSchema(Base):
updated_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now())
published_at = Column(DateTime(timezone=True), nullable=True)
published_by = Column(Integer, nullable=True)
class RemissionPractice(Base):
"""
Pratica di rendicontazione di un beneficiario per una specifica application in CONTRACT_SIGNED.
Uno schema_snapshot congelato alla creazione: se il superadmin modifica lo schema
del bando dopo, la pratica continua a usare la versione snapshot.
"""
__tablename__ = "remission_practice"
__table_args__ = (
UniqueConstraint("application_id", name="uq_remission_practice_application"),
{"schema": "gepafin_rendic"},
)
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
call_id = Column(Integer, nullable=False)
application_id = Column(Integer, nullable=False, unique=True)
company_id = Column(Integer, nullable=False)
user_id = Column(Integer, nullable=False) # beneficiario che compila
status = Column(String(32), nullable=False, default="DRAFT")
# DRAFT -> SUBMITTED -> UNDER_REVIEW -> APPROVED | REJECTED | AWAITING_AMENDMENT
schema_snapshot = Column(JSONB, nullable=False) # copia schema al momento start
iva_regime = Column(String(32), nullable=True) # ORDINARIO | FORFETTARIO | ESENTE
amount_erogato = Column(Numeric(14, 2), nullable=False) # copiato da application.amount_accepted
notes_beneficiario = Column(Text, nullable=True)
created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
updated_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now())
submitted_at = Column(DateTime(timezone=True), nullable=True)
# relazioni
invoices = relationship("RemissionInvoice", back_populates="practice", cascade="all, delete-orphan")
ula_employees = relationship("RemissionUlaEmployee", back_populates="practice", cascade="all, delete-orphan")
documents = relationship("RemissionDocument", back_populates="practice", cascade="all, delete-orphan")
class RemissionInvoice(Base):
"""Fattura rendicontata dentro una pratica, assegnata a una categoria."""
__tablename__ = "remission_invoice"
__table_args__ = ({"schema": "gepafin_rendic"},)
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
practice_id = Column(UUID(as_uuid=True),
ForeignKey("gepafin_rendic.remission_practice.id", ondelete="CASCADE"),
nullable=False)
category_code = Column(String(16), nullable=False) # B1 / B2 / B3 / custom
invoice_number = Column(String(128), nullable=False)
invoice_date = Column(Date, nullable=False)
payment_date = Column(Date, nullable=False)
supplier_name = Column(String(255), nullable=False)
supplier_vat = Column(String(32), nullable=False)
description = Column(Text, nullable=False)
taxable = Column(Numeric(14, 2), nullable=False) # imponibile
vat = Column(Numeric(14, 2), nullable=False, default=0)
total = Column(Numeric(14, 2), nullable=False)
pdf_filename = Column(String(512), nullable=True) # per ora solo nome, upload vero dopo
created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
practice = relationship("RemissionPractice", back_populates="invoices")
class RemissionUlaEmployee(Base):
"""Dipendente conteggiato nel calcolo ULA."""
__tablename__ = "remission_ula_employee"
__table_args__ = ({"schema": "gepafin_rendic"},)
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
practice_id = Column(UUID(as_uuid=True),
ForeignKey("gepafin_rendic.remission_practice.id", ondelete="CASCADE"),
nullable=False)
codice_fiscale = Column(String(16), nullable=False)
full_name = Column(String(255), nullable=False)
contract_type = Column(String(64), nullable=False) # T_IND / T_DET / APPR / ...
role_description = Column(String(255), nullable=True)
fte_pct = Column(Numeric(5, 4), nullable=False, default=1) # 0..1
period_start_date = Column(Date, nullable=False)
period_end_date = Column(Date, nullable=False)
supporting_doc_type = Column(String(64), nullable=True)
supporting_doc_filename = Column(String(512), nullable=True)
created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
practice = relationship("RemissionPractice", back_populates="ula_employees")
class RemissionDocument(Base):
"""Documento associato alla pratica (DURC, visura, ecc.)."""
__tablename__ = "remission_document"
__table_args__ = ({"schema": "gepafin_rendic"},)
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
practice_id = Column(UUID(as_uuid=True),
ForeignKey("gepafin_rendic.remission_practice.id", ondelete="CASCADE"),
nullable=False)
doc_code = Column(String(64), nullable=False) # DURC / VISURA_CAMERALE / ...
filename = Column(String(512), nullable=True)
uploaded_at = Column(DateTime(timezone=True), nullable=True)
expires_at = Column(Date, nullable=True)
notes = Column(Text, nullable=True)
practice = relationship("RemissionPractice", back_populates="documents")

67
app/routers/debug.py Normal file
View File

@@ -0,0 +1,67 @@
"""
Endpoint di debug SOLO per sandbox. Permette a un superadmin di impersonare
un utente (tipicamente un beneficiario) per testare i flussi senza SPID.
Non deve essere mai abilitato in produzione.
"""
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import text
from jose import jwt
from ..db import get_db
from ..config import get_settings
from ..auth import AuthUser, require_superadmin
from ..schemas import ApiResponse
router = APIRouter(prefix="/api/debug", tags=["debug"])
settings = get_settings()
class ImpersonateRequest(BaseModel):
email: str
@router.post("/impersonate", response_model=ApiResponse)
def impersonate(body: ImpersonateRequest,
db: Session = Depends(get_db),
admin: AuthUser = Depends(require_superadmin)):
"""
Genera un token JWT valido per l'email indicata. Solo sandbox dev,
richiede ruolo superadmin per chiamarlo.
"""
row = db.execute(text("""
SELECT u.id, u.email, u.first_name, u.last_name, u.hub_id,
r.role_type
FROM gepafin_schema.gepafin_user u
JOIN gepafin_schema.role r ON r.id = u.role_id
WHERE u.email = :email AND u.is_deleted = false
"""), {"email": body.email}).mappings().first()
if not row:
raise HTTPException(status_code=404, detail=f"Utente {body.email} non trovato")
# Genero token con lo stesso formato del BE Spring
exp = datetime.now(timezone.utc) + timedelta(hours=8)
payload = {
"sub": f"{row['email']}:{row['id']}:{row['hub_id']}",
"userId": row["id"],
"auth": row["role_type"],
"exp": int(exp.timestamp()),
}
token = jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)
return ApiResponse(
message=f"Token generato per {body.email}",
data={
"token": token,
"user": {
"id": row["id"],
"email": row["email"],
"firstName": row["first_name"],
"lastName": row["last_name"],
"role": {"roleType": row["role_type"]}
}
}
)

412
app/routers/practices.py Normal file
View File

@@ -0,0 +1,412 @@
"""
Endpoint pratiche di rendicontazione (lato beneficiario).
"""
import copy
from datetime import datetime, timezone
from decimal import Decimal
from typing import List
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import text
from ..db import get_db
from ..auth import AuthUser, get_current_user
from ..models import (
CallRemissionSchema, RemissionPractice,
RemissionInvoice, RemissionUlaEmployee, RemissionDocument
)
from ..schemas import (
PracticeStartRequest, PracticeUpdate, PracticeOut, PracticeListItem,
InvoiceCreate, InvoiceOut,
UlaEmployeeCreate, UlaEmployeeOut,
DocumentUpsert, DocumentOut,
GateCheckResult,
ApiResponse
)
router = APIRouter(prefix="/api/remission-practices", tags=["remission-practices"])
# ---------- helpers ----------
def _get_practice_or_404(db: Session, practice_id: UUID, user: AuthUser) -> RemissionPractice:
"""Recupera la pratica validando ownership beneficiario (o admin)."""
practice = db.query(RemissionPractice).filter(RemissionPractice.id == practice_id).first()
if not practice:
raise HTTPException(status_code=404, detail=f"Pratica {practice_id} non trovata")
# Solo il beneficiario owner o un superadmin può accedere
if user.is_beneficiary() and practice.user_id != user.user_id:
raise HTTPException(status_code=403, detail="Accesso negato a questa pratica")
return practice
def _ensure_editable(practice: RemissionPractice):
if practice.status != "DRAFT":
raise HTTPException(
status_code=409,
detail=f"Pratica in stato {practice.status}: non modificabile"
)
def _compute_gate_check(practice: RemissionPractice) -> GateCheckResult:
"""Valuta le gate_rules dello schema snapshot contro il contenuto della pratica."""
rules = practice.schema_snapshot.get("gate_rules", {}) or {}
sections = practice.schema_snapshot.get("sections", []) or []
# totali per categoria
per_category = {}
grand_total = Decimal("0")
iva_ordinario_only_taxable = rules.get("iva_ordinario_imponibile_only", True)
use_taxable_only = (practice.iva_regime == "ORDINARIO" and iva_ordinario_only_taxable)
for inv in practice.invoices:
amt = inv.taxable if use_taxable_only else inv.total
per_category[inv.category_code] = per_category.get(inv.category_code, Decimal("0")) + amt
grand_total += amt
# cap remissione
amt_erogato = practice.amount_erogato
cap_pct = Decimal(str(rules.get("cap_pct_erogato", 0.5)))
cap_abs = Decimal(str(rules.get("cap_absolute", 12500)))
max_remission = min(cap_pct * amt_erogato, cap_abs)
checks = []
# Check 1: regime IVA scelto
if not practice.iva_regime:
checks.append({"id": "iva_selected", "label": "Regime IVA dichiarato",
"passed": False, "detail": "Seleziona il regime IVA"})
else:
checks.append({"id": "iva_selected", "label": "Regime IVA dichiarato",
"passed": True, "detail": practice.iva_regime})
# Check 2: importo totale > 0
checks.append({
"id": "has_invoices",
"label": "Almeno una fattura caricata",
"passed": len(practice.invoices) > 0,
"detail": f"{len(practice.invoices)} fatture per un totale di {grand_total} EUR"
})
# Check 3: una fattura per ogni categoria (se richiesto)
if rules.get("require_at_least_one_invoice_per_nonzero_category", False):
expenses = next((s for s in sections if s.get("type") == "category_grid"), {})
cats = expenses.get("categories", []) or []
missing = [c["code"] for c in cats if c.get("code") and c["code"] not in per_category]
checks.append({
"id": "invoice_per_category",
"label": "Fattura per ogni categoria",
"passed": len(missing) == 0,
"detail": f"Categorie senza fatture: {', '.join(missing)}" if missing else "Tutte le categorie coperte"
})
# Check 4: ULA (se richiesto)
ula_section = next((s for s in sections if s.get("type") == "ula_block"), {})
if ula_section.get("enabled") and rules.get("require_ula_above_threshold", False):
threshold = Decimal(str(ula_section.get("threshold", 1.0)))
total_fte = sum((e.fte_pct for e in practice.ula_employees), Decimal("0"))
checks.append({
"id": "ula_threshold",
"label": f"Incremento ULA >= {threshold}",
"passed": total_fte >= threshold,
"detail": f"Totale FTE: {total_fte} (soglia: {threshold})"
})
# Check 5: documenti richiesti (se richiesto)
if rules.get("require_all_documents_resolved", False):
docs_section = next((s for s in sections if s.get("type") == "document_checklist"), {})
required = docs_section.get("required_types", []) or []
uploaded_codes = {d.doc_code for d in practice.documents if d.filename}
required_codes = [r["code"] if isinstance(r, dict) else r for r in required]
missing_docs = [c for c in required_codes if c not in uploaded_codes]
checks.append({
"id": "docs_resolved",
"label": "Tutti i documenti richiesti caricati",
"passed": len(missing_docs) == 0,
"detail": f"Mancanti: {', '.join(missing_docs)}" if missing_docs else "Tutti presenti"
})
# Check 6: importo range (cap erogato)
amt_range = rules.get("amount_range", {})
min_e = Decimal(str(amt_range.get("min", 0)))
max_e = Decimal(str(amt_range.get("max", 999999999)))
checks.append({
"id": "erogato_in_range",
"label": f"Importo erogato entro range ({min_e}-{max_e})",
"passed": min_e <= amt_erogato <= max_e,
"detail": f"Erogato: {amt_erogato} EUR"
})
remission_due = min(grand_total, max_remission)
return GateCheckResult(
passed=all(c["passed"] for c in checks),
checks=checks,
totals={
"per_category": {k: float(v) for k, v in per_category.items()},
"grand_total": float(grand_total),
"max_remission": float(max_remission),
"remission_due": float(remission_due),
"amount_erogato": float(amt_erogato)
}
)
def _enrich_list_item(db: Session, p: RemissionPractice) -> PracticeListItem:
# Nome call e company dal DB Gepafin
q = db.execute(text("""
SELECT c.name as call_name, comp.company_name as company_name
FROM gepafin_schema.call c
JOIN gepafin_schema.company comp ON comp.id = :cid
WHERE c.id = :call_id
"""), {"call_id": p.call_id, "cid": p.company_id}).first()
item = PracticeListItem.model_validate(p)
if q:
item.call_name = q[0]
item.company_name = q[1]
item.invoice_count = len(p.invoices)
item.ula_count = len(p.ula_employees)
item.document_count = len([d for d in p.documents if d.filename])
return item
# ---------- endpoints ----------
@router.get("/mine", response_model=ApiResponse)
def list_my_practices(db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
"""Lista pratiche del beneficiario corrente + applications CONTRACT_SIGNED pronte per start."""
# pratiche esistenti
practices = db.query(RemissionPractice).filter(RemissionPractice.user_id == user.user_id).all()
existing_app_ids = {p.application_id for p in practices}
# applications CONTRACT_SIGNED del beneficiario che non hanno ancora una pratica
rows = db.execute(text("""
SELECT a.id as application_id, a.call_id, a.company_id, a.amount_accepted,
a.status, c.name as call_name, comp.company_name as company_name
FROM gepafin_schema.application a
JOIN gepafin_schema.call c ON c.id = a.call_id
LEFT JOIN gepafin_schema.company comp ON comp.id = a.company_id
WHERE a.user_id = :uid AND a.status = 'CONTRACT_SIGNED' AND a.is_deleted = false
"""), {"uid": user.user_id}).mappings().all()
pending = []
for r in rows:
if r["application_id"] not in existing_app_ids:
pending.append({
"application_id": r["application_id"],
"call_id": r["call_id"],
"company_id": r["company_id"],
"amount_erogato": float(r["amount_accepted"] or 0),
"call_name": r["call_name"],
"company_name": r["company_name"],
"status": "NOT_STARTED"
})
return ApiResponse(data={
"practices": [_enrich_list_item(db, p).model_dump(mode="json") for p in practices],
"ready_to_start": pending
})
@router.post("/start", response_model=ApiResponse)
def start_practice(body: PracticeStartRequest, db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user)):
"""Avvia una pratica di rendicontazione per una application CONTRACT_SIGNED."""
# Verifica application
app_row = db.execute(text("""
SELECT id, call_id, company_id, user_id, status, amount_accepted
FROM gepafin_schema.application
WHERE id = :aid AND is_deleted = false
"""), {"aid": body.application_id}).mappings().first()
if not app_row:
raise HTTPException(status_code=404, detail=f"Application {body.application_id} non trovata")
if app_row["status"] != "CONTRACT_SIGNED":
raise HTTPException(status_code=409,
detail=f"Application in stato {app_row['status']}, richiesto CONTRACT_SIGNED")
if user.is_beneficiary() and app_row["user_id"] != user.user_id:
raise HTTPException(status_code=403, detail="Application non di tua proprietà")
# Schema del bando: richiede PUBLISHED (o DRAFT se superadmin per test)
schema = db.query(CallRemissionSchema).filter(CallRemissionSchema.call_id == app_row["call_id"]).first()
if not schema:
raise HTTPException(status_code=409,
detail="Nessuno schema di rendicontazione configurato per questo bando. "
"Contatta l'ente gestore.")
if schema.status != "PUBLISHED" and user.is_beneficiary():
raise HTTPException(status_code=409,
detail="Lo schema di rendicontazione non è ancora stato pubblicato.")
# Pratica esistente?
exists = db.query(RemissionPractice).filter(RemissionPractice.application_id == body.application_id).first()
if exists:
raise HTTPException(status_code=409, detail="Pratica già esistente")
practice = RemissionPractice(
call_id=app_row["call_id"],
application_id=body.application_id,
company_id=app_row["company_id"],
user_id=app_row["user_id"],
status="DRAFT",
schema_snapshot=copy.deepcopy(schema.schema_json),
amount_erogato=app_row["amount_accepted"] or Decimal("0"),
)
db.add(practice)
db.commit()
db.refresh(practice)
return ApiResponse(message="Pratica avviata",
data=PracticeOut.model_validate(practice).model_dump(mode="json"))
@router.get("/{practice_id}", response_model=ApiResponse)
def get_practice(practice_id: UUID, db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
return ApiResponse(data=PracticeOut.model_validate(p).model_dump(mode="json"))
@router.put("/{practice_id}", response_model=ApiResponse)
def update_practice(practice_id: UUID, body: PracticeUpdate,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
if body.iva_regime is not None:
p.iva_regime = body.iva_regime
if body.notes_beneficiario is not None:
p.notes_beneficiario = body.notes_beneficiario
db.commit()
db.refresh(p)
return ApiResponse(message="Pratica aggiornata",
data=PracticeOut.model_validate(p).model_dump(mode="json"))
# ---------- Invoices ----------
@router.post("/{practice_id}/invoices", response_model=ApiResponse)
def add_invoice(practice_id: UUID, body: InvoiceCreate,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
inv = RemissionInvoice(practice_id=p.id, **body.model_dump())
db.add(inv)
db.commit()
db.refresh(inv)
return ApiResponse(message="Fattura aggiunta",
data=InvoiceOut.model_validate(inv).model_dump(mode="json"))
@router.delete("/{practice_id}/invoices/{invoice_id}", response_model=ApiResponse)
def delete_invoice(practice_id: UUID, invoice_id: UUID,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
inv = db.query(RemissionInvoice).filter(
RemissionInvoice.id == invoice_id, RemissionInvoice.practice_id == practice_id
).first()
if not inv:
raise HTTPException(status_code=404, detail="Fattura non trovata")
db.delete(inv)
db.commit()
return ApiResponse(message="Fattura rimossa")
# ---------- ULA Employees ----------
@router.post("/{practice_id}/ula-employees", response_model=ApiResponse)
def add_ula_employee(practice_id: UUID, body: UlaEmployeeCreate,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
e = RemissionUlaEmployee(practice_id=p.id, **body.model_dump())
db.add(e)
db.commit()
db.refresh(e)
return ApiResponse(message="Dipendente aggiunto",
data=UlaEmployeeOut.model_validate(e).model_dump(mode="json"))
@router.delete("/{practice_id}/ula-employees/{employee_id}", response_model=ApiResponse)
def delete_ula_employee(practice_id: UUID, employee_id: UUID,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
e = db.query(RemissionUlaEmployee).filter(
RemissionUlaEmployee.id == employee_id, RemissionUlaEmployee.practice_id == practice_id
).first()
if not e:
raise HTTPException(status_code=404, detail="Dipendente non trovato")
db.delete(e)
db.commit()
return ApiResponse(message="Dipendente rimosso")
# ---------- Documents ----------
@router.put("/{practice_id}/documents/{doc_code}", response_model=ApiResponse)
def upsert_document(practice_id: UUID, doc_code: str, body: DocumentUpsert,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
d = db.query(RemissionDocument).filter(
RemissionDocument.practice_id == practice_id,
RemissionDocument.doc_code == doc_code
).first()
if not d:
d = RemissionDocument(practice_id=p.id, doc_code=doc_code)
db.add(d)
d.filename = body.filename
d.uploaded_at = body.uploaded_at or (datetime.now(timezone.utc) if body.filename else None)
d.expires_at = body.expires_at
d.notes = body.notes
db.commit()
db.refresh(d)
return ApiResponse(message="Documento aggiornato",
data=DocumentOut.model_validate(d).model_dump(mode="json"))
@router.delete("/{practice_id}/documents/{doc_code}", response_model=ApiResponse)
def clear_document(practice_id: UUID, doc_code: str,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
d = db.query(RemissionDocument).filter(
RemissionDocument.practice_id == practice_id,
RemissionDocument.doc_code == doc_code
).first()
if d:
db.delete(d)
db.commit()
return ApiResponse(message="Documento rimosso")
# ---------- Gate check + submit ----------
@router.get("/{practice_id}/gate-check", response_model=ApiResponse)
def gate_check(practice_id: UUID, db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
result = _compute_gate_check(p)
return ApiResponse(data=result.model_dump(mode="json"))
@router.post("/{practice_id}/submit", response_model=ApiResponse)
def submit_practice(practice_id: UUID, db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
check = _compute_gate_check(p)
if not check.passed:
raise HTTPException(status_code=422, detail={
"message": "Gate rules non soddisfatte",
"checks": check.checks
})
p.status = "SUBMITTED"
p.submitted_at = datetime.now(timezone.utc)
db.commit()
db.refresh(p)
return ApiResponse(message="Pratica inviata con successo",
data=PracticeOut.model_validate(p).model_dump(mode="json"))

View File

@@ -1,14 +1,17 @@
"""
Pydantic schemas per API.
"""
from typing import Optional, Any
from datetime import datetime
from typing import Optional, Any, List
from datetime import datetime, date
from decimal import Decimal
from uuid import UUID
from pydantic import BaseModel, Field
# ====================== Schema di rendicontazione (bando) ======================
class RemissionSchemaBase(BaseModel):
schema_json: dict = Field(..., description="JSON dello schema di rendicontazione")
schema_json: dict
class RemissionSchemaCreate(RemissionSchemaBase):
@@ -34,6 +37,136 @@ class RemissionSchemaOut(BaseModel):
model_config = {"from_attributes": True}
# ====================== Pratica di rendicontazione (beneficiario) ======================
class PracticeStartRequest(BaseModel):
"""Input minimo per avviare una pratica: solo application_id. Il resto viene dal DB."""
application_id: int
class PracticeUpdate(BaseModel):
iva_regime: Optional[str] = None
notes_beneficiario: Optional[str] = None
# Fattura
class InvoiceCreate(BaseModel):
category_code: str
invoice_number: str
invoice_date: date
payment_date: date
supplier_name: str
supplier_vat: str
description: str
taxable: Decimal
vat: Decimal = Decimal("0")
total: Decimal
pdf_filename: Optional[str] = None
class InvoiceOut(InvoiceCreate):
id: UUID
practice_id: UUID
created_at: datetime
model_config = {"from_attributes": True}
# ULA Employee
class UlaEmployeeCreate(BaseModel):
codice_fiscale: str
full_name: str
contract_type: str
role_description: Optional[str] = None
fte_pct: Decimal = Decimal("1")
period_start_date: date
period_end_date: date
supporting_doc_type: Optional[str] = None
supporting_doc_filename: Optional[str] = None
class UlaEmployeeOut(UlaEmployeeCreate):
id: UUID
practice_id: UUID
created_at: datetime
model_config = {"from_attributes": True}
# Document
class DocumentUpsert(BaseModel):
doc_code: str
filename: Optional[str] = None
uploaded_at: Optional[datetime] = None
expires_at: Optional[date] = None
notes: Optional[str] = None
class DocumentOut(BaseModel):
id: UUID
practice_id: UUID
doc_code: str
filename: Optional[str] = None
uploaded_at: Optional[datetime] = None
expires_at: Optional[date] = None
notes: Optional[str] = None
model_config = {"from_attributes": True}
# Pratica dettagliata
class PracticeOut(BaseModel):
id: UUID
call_id: int
application_id: int
company_id: int
user_id: int
status: str
schema_snapshot: dict
iva_regime: Optional[str] = None
amount_erogato: Decimal
notes_beneficiario: Optional[str] = None
created_at: datetime
updated_at: datetime
submitted_at: Optional[datetime] = None
invoices: List[InvoiceOut] = []
ula_employees: List[UlaEmployeeOut] = []
documents: List[DocumentOut] = []
model_config = {"from_attributes": True}
class PracticeListItem(BaseModel):
"""Riga leggera per liste."""
id: UUID
call_id: int
application_id: int
company_id: int
status: str
amount_erogato: Decimal
created_at: datetime
submitted_at: Optional[datetime] = None
# campi denormalizzati aggiunti a runtime
call_name: Optional[str] = None
company_name: Optional[str] = None
invoice_count: int = 0
ula_count: int = 0
document_count: int = 0
model_config = {"from_attributes": True}
# Gate check
class GateCheckResult(BaseModel):
passed: bool
checks: List[dict] # [{id, label, passed, detail}]
totals: dict # {per_category: {B1: 1234.56, ...}, grand_total, max_remission_due, ...}
# ====================== Wrapper ======================
class ApiResponse(BaseModel):
status: str = "SUCCESS"
message: Optional[str] = None