Files
gepafin-rendicontazione-api/app/routers/practices.py
BFLOWS f9f543b008 feat(istruttoria): verifica riga-per-riga con dual declared/verified
Replica il workflow del foglio Excel originale (REMISSIONE_DEL_DEBITO_5888.xlsm).
Istruttore ora verifica ogni fattura, ogni dipendente ULA, ogni documento singolarmente
invece di accettare/respingere la pratica intera.

Modello dati - nuove colonne su 3 tabelle:
- remission_invoice: taxable_verified, vat_verified, total_verified,
  verification_status (PENDING/AMMESSA/PARZIALE/RESPINTA), verification_notes,
  date_checks (JSONB con invoice_in_period/payment_in_period), verified_by, verified_at
- remission_ula_employee: fte_pct_verified, verification_status, verification_notes,
  verified_by, verified_at
- remission_document: verification_status (PENDING/VALIDO/NON_VALIDO/SCADUTO),
  verification_notes, verified_by, verified_at
- remission_practice: instructor_final_notes, instructor_checklist (JSONB 3 gate SI/NO),
  verbale_date

Nuovi endpoint:
- PUT /instructor/{id}/invoices/{inv_id}/verify (status + rettifica importi + note)
- PUT /instructor/{id}/ula-employees/{emp_id}/verify (rettifica FTE + note)
- PUT /instructor/{id}/documents/{doc_code}/verify (VALIDO/NON_VALIDO/SCADUTO + note)
- PUT /instructor/{id}/final-notes (note sintetiche + checklist)

Ricalcolo gate_check dual track:
- grand_total_declared: sempre (importo richiesto dal beneficiario)
- grand_total_verified: somma solo fatture AMMESSA/PARZIALE (se PARZIALE usa verified)
- remission_due: usa verified se any_verified=True, altrimenti declared (backward compat)
- residuo_da_restituire: amount_erogato - remission_due
- flag any_verified e all_verified per gating decisione finale

_auto_check_dates: fattura in periodo? pagamento in periodo?
Legge period_start e period_end da schema.gate_rules (superadmin editor).

Template: aggiunto period_start/period_end_date come campi 'editable_by superadmin'
nella sezione general static_fields.

Schema editor FE (BandoRendicontazioneSchemaEdit): aggiunto Calendar period_start
accanto a period_end in section gate rules. period_start_rule dropdown per logica
(erogato_date|fixed) resta; period_start data fissa usata dal check.
2026-04-18 11:03:15 +02:00

458 lines
19 KiB
Python

"""
Endpoint pratiche di rendicontazione (lato beneficiario).
"""
import copy
from datetime import datetime, timezone
from decimal import Decimal
from typing import List
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import text
from ..db import get_db
from ..auth import AuthUser, get_current_user
from ..models import (
CallRemissionSchema, RemissionPractice,
RemissionInvoice, RemissionUlaEmployee, RemissionDocument
)
from ..schemas import (
PracticeStartRequest, PracticeUpdate, PracticeOut, PracticeListItem,
InvoiceCreate, InvoiceOut,
UlaEmployeeCreate, UlaEmployeeOut,
DocumentUpsert, DocumentOut,
GateCheckResult,
ApiResponse
)
router = APIRouter(prefix="/api/remission-practices", tags=["remission-practices"])
# ---------- helpers ----------
def _get_practice_or_404(db: Session, practice_id: UUID, user: AuthUser) -> RemissionPractice:
"""Recupera la pratica validando ownership beneficiario (o admin)."""
practice = db.query(RemissionPractice).filter(RemissionPractice.id == practice_id).first()
if not practice:
raise HTTPException(status_code=404, detail=f"Pratica {practice_id} non trovata")
# Solo il beneficiario owner o un superadmin può accedere
if user.is_beneficiary() and practice.user_id != user.user_id:
raise HTTPException(status_code=403, detail="Accesso negato a questa pratica")
return practice
def _ensure_editable(practice: RemissionPractice):
if practice.status != "DRAFT":
raise HTTPException(
status_code=409,
detail=f"Pratica in stato {practice.status}: non modificabile"
)
def _compute_gate_check(practice: RemissionPractice) -> GateCheckResult:
"""Valuta le gate_rules dello schema snapshot contro il contenuto della pratica.
Calcola:
- per_category_declared: totali dichiarati dal beneficiario (sempre)
- per_category_verified: totali verificati dall istruttore (solo AMMESSA/PARZIALE)
- grand_total: totale dichiarato (rif. beneficiario)
- grand_total_verified: totale verificato (rif. remissione finale)
"""
rules = practice.schema_snapshot.get("gate_rules", {}) or {}
sections = practice.schema_snapshot.get("sections", []) or []
iva_ordinario_only_taxable = rules.get("iva_ordinario_imponibile_only", True)
use_taxable_only = (practice.iva_regime == "ORDINARIO" and iva_ordinario_only_taxable)
per_category_declared = {}
per_category_verified = {}
grand_total = Decimal("0")
grand_total_verified = Decimal("0")
any_verified = False
all_verified = len(practice.invoices) > 0
for inv in practice.invoices:
# Dichiarato (sempre)
amt_decl = inv.taxable if use_taxable_only else inv.total
per_category_declared[inv.category_code] = per_category_declared.get(inv.category_code, Decimal("0")) + amt_decl
grand_total += amt_decl
# Verificato (solo se stato AMMESSA o PARZIALE)
if inv.verification_status in ("AMMESSA", "PARZIALE"):
any_verified = True
# se PARZIALE usa i valori verified; se AMMESSA ma verified sono null, usa dichiarato
if inv.verification_status == "PARZIALE":
tax_v = inv.taxable_verified if inv.taxable_verified is not None else inv.taxable
tot_v = inv.total_verified if inv.total_verified is not None else inv.total
else: # AMMESSA
tax_v = inv.taxable_verified if inv.taxable_verified is not None else inv.taxable
tot_v = inv.total_verified if inv.total_verified is not None else inv.total
amt_ver = tax_v if use_taxable_only else tot_v
per_category_verified[inv.category_code] = per_category_verified.get(inv.category_code, Decimal("0")) + amt_ver
grand_total_verified += amt_ver
elif inv.verification_status == "PENDING":
all_verified = False
# RESPINTA: non contribuisce ai verified
# cap remissione
amt_erogato = practice.amount_erogato
cap_pct = Decimal(str(rules.get("cap_pct_erogato", 0.5)))
cap_abs = Decimal(str(rules.get("cap_absolute", 12500)))
max_remission = min(cap_pct * amt_erogato, cap_abs)
# Se almeno 1 verifica fatta -> uso grand_total_verified per remission_due
# altrimenti uso grand_total (dichiarato) per preview pre-istruttoria
effective_total = grand_total_verified if any_verified else grand_total
remission_due = min(effective_total, max_remission)
# Per compatibilità: per_category e grand_total restano "dichiarato"
per_category = per_category_declared
checks = []
# Check 1: regime IVA scelto
if not practice.iva_regime:
checks.append({"id": "iva_selected", "label": "Regime IVA dichiarato",
"passed": False, "detail": "Seleziona il regime IVA"})
else:
checks.append({"id": "iva_selected", "label": "Regime IVA dichiarato",
"passed": True, "detail": practice.iva_regime})
# Check 2: importo totale > 0
checks.append({
"id": "has_invoices",
"label": "Almeno una fattura caricata",
"passed": len(practice.invoices) > 0,
"detail": f"{len(practice.invoices)} fatture per un totale di {grand_total} EUR"
})
# Check 3: una fattura per ogni categoria (se richiesto)
if rules.get("require_at_least_one_invoice_per_nonzero_category", False):
expenses = next((s for s in sections if s.get("type") == "category_grid"), {})
cats = expenses.get("categories", []) or []
missing = [c["code"] for c in cats if c.get("code") and c["code"] not in per_category]
checks.append({
"id": "invoice_per_category",
"label": "Fattura per ogni categoria",
"passed": len(missing) == 0,
"detail": f"Categorie senza fatture: {', '.join(missing)}" if missing else "Tutte le categorie coperte"
})
# Check 4: ULA (se richiesto)
ula_section = next((s for s in sections if s.get("type") == "ula_block"), {})
if ula_section.get("enabled") and rules.get("require_ula_above_threshold", False):
threshold = Decimal(str(ula_section.get("threshold", 1.0)))
total_fte = sum((e.fte_pct for e in practice.ula_employees), Decimal("0"))
checks.append({
"id": "ula_threshold",
"label": f"Incremento ULA >= {threshold}",
"passed": total_fte >= threshold,
"detail": f"Totale FTE: {total_fte} (soglia: {threshold})"
})
# Check 5: documenti richiesti (se richiesto)
if rules.get("require_all_documents_resolved", False):
docs_section = next((s for s in sections if s.get("type") == "document_checklist"), {})
required = docs_section.get("required_types", []) or []
uploaded_codes = {d.doc_code for d in practice.documents if d.filename}
required_codes = [r["code"] if isinstance(r, dict) else r for r in required]
missing_docs = [c for c in required_codes if c not in uploaded_codes]
checks.append({
"id": "docs_resolved",
"label": "Tutti i documenti richiesti caricati",
"passed": len(missing_docs) == 0,
"detail": f"Mancanti: {', '.join(missing_docs)}" if missing_docs else "Tutti presenti"
})
# Check 6: importo range (cap erogato)
amt_range = rules.get("amount_range", {})
min_e = Decimal(str(amt_range.get("min", 0)))
max_e = Decimal(str(amt_range.get("max", 999999999)))
checks.append({
"id": "erogato_in_range",
"label": f"Importo erogato entro range ({min_e}-{max_e})",
"passed": min_e <= amt_erogato <= max_e,
"detail": f"Erogato: {amt_erogato} EUR"
})
return GateCheckResult(
passed=all(c["passed"] for c in checks),
checks=checks,
totals={
"per_category": {k: float(v) for k, v in per_category.items()},
"per_category_declared": {k: float(v) for k, v in per_category_declared.items()},
"per_category_verified": {k: float(v) for k, v in per_category_verified.items()},
"grand_total": float(grand_total),
"grand_total_declared": float(grand_total),
"grand_total_verified": float(grand_total_verified),
"max_remission": float(max_remission),
"remission_due": float(remission_due),
"amount_erogato": float(amt_erogato),
"any_verified": any_verified,
"all_verified": all_verified,
"residuo_da_restituire": float(max(amt_erogato - Decimal(str(remission_due)), Decimal("0")))
}
)
def _enrich_list_item(db: Session, p: RemissionPractice) -> PracticeListItem:
# Nome call e company dal DB Gepafin
q = db.execute(text("""
SELECT c.name as call_name, comp.company_name as company_name
FROM gepafin_schema.call c
JOIN gepafin_schema.company comp ON comp.id = :cid
WHERE c.id = :call_id
"""), {"call_id": p.call_id, "cid": p.company_id}).first()
item = PracticeListItem.model_validate(p)
if q:
item.call_name = q[0]
item.company_name = q[1]
item.invoice_count = len(p.invoices)
item.ula_count = len(p.ula_employees)
item.document_count = len([d for d in p.documents if d.filename])
return item
# ---------- endpoints ----------
@router.get("/mine", response_model=ApiResponse)
def list_my_practices(db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
"""Lista pratiche del beneficiario corrente + applications CONTRACT_SIGNED pronte per start."""
# pratiche esistenti
practices = db.query(RemissionPractice).filter(RemissionPractice.user_id == user.user_id).all()
existing_app_ids = {p.application_id for p in practices}
# applications CONTRACT_SIGNED del beneficiario che non hanno ancora una pratica
rows = db.execute(text("""
SELECT a.id as application_id, a.call_id, a.company_id, a.amount_accepted,
a.status, c.name as call_name, comp.company_name as company_name
FROM gepafin_schema.application a
JOIN gepafin_schema.call c ON c.id = a.call_id
LEFT JOIN gepafin_schema.company comp ON comp.id = a.company_id
WHERE a.user_id = :uid AND a.status = 'CONTRACT_SIGNED' AND a.is_deleted = false
"""), {"uid": user.user_id}).mappings().all()
pending = []
for r in rows:
if r["application_id"] not in existing_app_ids:
pending.append({
"application_id": r["application_id"],
"call_id": r["call_id"],
"company_id": r["company_id"],
"amount_erogato": float(r["amount_accepted"] or 0),
"call_name": r["call_name"],
"company_name": r["company_name"],
"status": "NOT_STARTED"
})
return ApiResponse(data={
"practices": [_enrich_list_item(db, p).model_dump(mode="json") for p in practices],
"ready_to_start": pending
})
@router.post("/start", response_model=ApiResponse)
def start_practice(body: PracticeStartRequest, db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user)):
"""Avvia una pratica di rendicontazione per una application CONTRACT_SIGNED."""
# Verifica application
app_row = db.execute(text("""
SELECT id, call_id, company_id, user_id, status, amount_accepted
FROM gepafin_schema.application
WHERE id = :aid AND is_deleted = false
"""), {"aid": body.application_id}).mappings().first()
if not app_row:
raise HTTPException(status_code=404, detail=f"Application {body.application_id} non trovata")
if app_row["status"] != "CONTRACT_SIGNED":
raise HTTPException(status_code=409,
detail=f"Application in stato {app_row['status']}, richiesto CONTRACT_SIGNED")
if user.is_beneficiary() and app_row["user_id"] != user.user_id:
raise HTTPException(status_code=403, detail="Application non di tua proprietà")
# Schema del bando: richiede PUBLISHED (o DRAFT se superadmin per test)
schema = db.query(CallRemissionSchema).filter(CallRemissionSchema.call_id == app_row["call_id"]).first()
if not schema:
raise HTTPException(status_code=409,
detail="Nessuno schema di rendicontazione configurato per questo bando. "
"Contatta l'ente gestore.")
if schema.status != "PUBLISHED" and user.is_beneficiary():
raise HTTPException(status_code=409,
detail="Lo schema di rendicontazione non è ancora stato pubblicato.")
# Pratica esistente?
exists = db.query(RemissionPractice).filter(RemissionPractice.application_id == body.application_id).first()
if exists:
raise HTTPException(status_code=409, detail="Pratica già esistente")
practice = RemissionPractice(
call_id=app_row["call_id"],
application_id=body.application_id,
company_id=app_row["company_id"],
user_id=app_row["user_id"],
status="DRAFT",
schema_snapshot=copy.deepcopy(schema.schema_json),
amount_erogato=app_row["amount_accepted"] or Decimal("0"),
)
db.add(practice)
db.commit()
db.refresh(practice)
return ApiResponse(message="Pratica avviata",
data=PracticeOut.model_validate(practice).model_dump(mode="json"))
@router.get("/{practice_id}", response_model=ApiResponse)
def get_practice(practice_id: UUID, db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
from ..schemas import AmendmentRequestOut
payload = PracticeOut.model_validate(p).model_dump(mode="json")
payload["amendments"] = [AmendmentRequestOut.model_validate(a).model_dump(mode="json") for a in p.amendment_requests]
return ApiResponse(data=payload)
@router.put("/{practice_id}", response_model=ApiResponse)
def update_practice(practice_id: UUID, body: PracticeUpdate,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
if body.iva_regime is not None:
p.iva_regime = body.iva_regime
if body.notes_beneficiario is not None:
p.notes_beneficiario = body.notes_beneficiario
db.commit()
db.refresh(p)
return ApiResponse(message="Pratica aggiornata",
data=PracticeOut.model_validate(p).model_dump(mode="json"))
# ---------- Invoices ----------
@router.post("/{practice_id}/invoices", response_model=ApiResponse)
def add_invoice(practice_id: UUID, body: InvoiceCreate,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
inv = RemissionInvoice(practice_id=p.id, **body.model_dump())
db.add(inv)
db.commit()
db.refresh(inv)
return ApiResponse(message="Fattura aggiunta",
data=InvoiceOut.model_validate(inv).model_dump(mode="json"))
@router.delete("/{practice_id}/invoices/{invoice_id}", response_model=ApiResponse)
def delete_invoice(practice_id: UUID, invoice_id: UUID,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
inv = db.query(RemissionInvoice).filter(
RemissionInvoice.id == invoice_id, RemissionInvoice.practice_id == practice_id
).first()
if not inv:
raise HTTPException(status_code=404, detail="Fattura non trovata")
db.delete(inv)
db.commit()
return ApiResponse(message="Fattura rimossa")
# ---------- ULA Employees ----------
@router.post("/{practice_id}/ula-employees", response_model=ApiResponse)
def add_ula_employee(practice_id: UUID, body: UlaEmployeeCreate,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
e = RemissionUlaEmployee(practice_id=p.id, **body.model_dump())
db.add(e)
db.commit()
db.refresh(e)
return ApiResponse(message="Dipendente aggiunto",
data=UlaEmployeeOut.model_validate(e).model_dump(mode="json"))
@router.delete("/{practice_id}/ula-employees/{employee_id}", response_model=ApiResponse)
def delete_ula_employee(practice_id: UUID, employee_id: UUID,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
e = db.query(RemissionUlaEmployee).filter(
RemissionUlaEmployee.id == employee_id, RemissionUlaEmployee.practice_id == practice_id
).first()
if not e:
raise HTTPException(status_code=404, detail="Dipendente non trovato")
db.delete(e)
db.commit()
return ApiResponse(message="Dipendente rimosso")
# ---------- Documents ----------
@router.put("/{practice_id}/documents/{doc_code}", response_model=ApiResponse)
def upsert_document(practice_id: UUID, doc_code: str, body: DocumentUpsert,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
d = db.query(RemissionDocument).filter(
RemissionDocument.practice_id == practice_id,
RemissionDocument.doc_code == doc_code
).first()
if not d:
d = RemissionDocument(practice_id=p.id, doc_code=doc_code)
db.add(d)
d.filename = body.filename
d.uploaded_at = body.uploaded_at or (datetime.now(timezone.utc) if body.filename else None)
d.expires_at = body.expires_at
d.notes = body.notes
db.commit()
db.refresh(d)
return ApiResponse(message="Documento aggiornato",
data=DocumentOut.model_validate(d).model_dump(mode="json"))
@router.delete("/{practice_id}/documents/{doc_code}", response_model=ApiResponse)
def clear_document(practice_id: UUID, doc_code: str,
db: Session = Depends(get_db), user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
d = db.query(RemissionDocument).filter(
RemissionDocument.practice_id == practice_id,
RemissionDocument.doc_code == doc_code
).first()
if d:
db.delete(d)
db.commit()
return ApiResponse(message="Documento rimosso")
# ---------- Gate check + submit ----------
@router.get("/{practice_id}/gate-check", response_model=ApiResponse)
def gate_check(practice_id: UUID, db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
result = _compute_gate_check(p)
return ApiResponse(data=result.model_dump(mode="json"))
@router.post("/{practice_id}/submit", response_model=ApiResponse)
def submit_practice(practice_id: UUID, db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user)):
p = _get_practice_or_404(db, practice_id, user)
_ensure_editable(p)
check = _compute_gate_check(p)
if not check.passed:
raise HTTPException(status_code=422, detail={
"message": "Gate rules non soddisfatte",
"checks": check.checks
})
p.status = "SUBMITTED"
p.submitted_at = datetime.now(timezone.utc)
db.commit()
db.refresh(p)
return ApiResponse(message="Pratica inviata con successo",
data=PracticeOut.model_validate(p).model_dump(mode="json"))