B6 routers/verbale.py:
- _build_context arricchito con custom_checks_merged (schema+values da RemissionCustomCheckValue)
- previous_tranches: elenco tranche APPROVED precedenti con cumulative progressivo
- max_tranches_snapshot letto dallo schema_snapshot.gate_rules.max_tranches
- filename include _t{sequence_number}.pdf
B6 templates_jinja/verbale_istruttoria.html:
- Header: 'Tranche N/M' + period_label dopo numero pratica
- Meta-grid: riga 'Tranche / fase' quando max_tranches > 1
- Nuova sezione 'Controlli aggiuntivi' (dopo verifica documenti):
tabella label, obbligatorio, dichiarato SI/NO, doc allegato SI/NO, validazione, note
- Sezione 'Storico tranches precedenti' (solo se sequence > 1):
tabella con cumulativo progressivo
- Box totali riscritto con **5 VOCI UFFICIALI CECILIA**:
(1) Importo massimo ammissibile (cap globale) + gia approvato tranche precedenti
(2) Richiesto pre-controllo = pre_check_admissible
(3) Ammesso post-controllo = remission_due
(4) Importo finanziamento erogato + tranches count/max
(5) Residuo da restituire = erogato - approvato_prec - ammesso
- Box 'REMISSIONE APPROVATA PER QUESTA TRANCHE' evidenziato quando APPROVED
Test E2E: verbale T1 APPROVED 29.3KB con tutte sezioni presenti.
Verbale T2 simulata con storico T1 e cap tranche 2 correttamente calcolato.
294 lines
10 KiB
Python
294 lines
10 KiB
Python
"""
|
|
Endpoint generazione verbale di istruttoria in HTML/PDF via Jinja2 + weasyprint.
|
|
Solo ruoli istruttore/superadmin possono scaricare.
|
|
"""
|
|
from datetime import datetime, date
|
|
from uuid import UUID
|
|
from pathlib import Path
|
|
from collections import OrderedDict
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from fastapi.responses import Response, HTMLResponse
|
|
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import text
|
|
|
|
from ..db import get_db
|
|
from ..auth import AuthUser, get_current_user
|
|
from ..models import RemissionPractice, RemissionCustomCheckValue
|
|
from .practices import _compute_gate_check
|
|
|
|
router = APIRouter(prefix="/api/remission-practices/instructor", tags=["verbale"])
|
|
|
|
|
|
TEMPLATES_DIR = Path(__file__).resolve().parent.parent / "templates_jinja"
|
|
|
|
|
|
# ---------- Jinja env & filters ----------
|
|
def _euro(v):
|
|
if v is None:
|
|
return "—"
|
|
try:
|
|
n = float(v)
|
|
except (TypeError, ValueError):
|
|
return "—"
|
|
s = f"{n:,.2f}"
|
|
# IT locale: 1,234,567.89 → 1.234.567,89
|
|
s = s.replace(",", "X").replace(".", ",").replace("X", ".")
|
|
return f"€ {s}"
|
|
|
|
|
|
def _datefmt(v):
|
|
if v is None:
|
|
return "—"
|
|
if isinstance(v, str):
|
|
try:
|
|
v = datetime.fromisoformat(v).date()
|
|
except ValueError:
|
|
return v
|
|
if hasattr(v, "strftime"):
|
|
return v.strftime("%d/%m/%Y")
|
|
return str(v)
|
|
|
|
|
|
def _datetimefmt(v):
|
|
if v is None:
|
|
return "—"
|
|
if isinstance(v, str):
|
|
try:
|
|
v = datetime.fromisoformat(v)
|
|
except ValueError:
|
|
return v
|
|
if hasattr(v, "strftime"):
|
|
return v.strftime("%d/%m/%Y %H:%M")
|
|
return str(v)
|
|
|
|
|
|
_AMEND_STATUS = {
|
|
"AWAITING": "In attesa risposta",
|
|
"RESPONSE_RECEIVED": "Risposta ricevuta",
|
|
"CLOSED": "Chiuso",
|
|
"EXPIRED": "Scaduto",
|
|
"REJECTED": "Rifiutato",
|
|
}
|
|
|
|
|
|
def _amendstatus(s):
|
|
return _AMEND_STATUS.get(s, s or "—")
|
|
|
|
|
|
_env = Environment(
|
|
loader=FileSystemLoader(str(TEMPLATES_DIR)),
|
|
autoescape=select_autoescape(["html", "xml"]),
|
|
trim_blocks=True, lstrip_blocks=True,
|
|
)
|
|
_env.filters["euro"] = _euro
|
|
_env.filters["datefmt"] = _datefmt
|
|
_env.filters["datetimefmt"] = _datetimefmt
|
|
_env.filters["amendstatus"] = _amendstatus
|
|
|
|
|
|
_CONTRACT_LABELS = {
|
|
"T_IND": "Tempo indeterminato",
|
|
"T_DET": "Tempo determinato",
|
|
"APPR": "Apprendistato",
|
|
"STAGE": "Tirocinio / Stage",
|
|
"COLL": "Collaborazione coordinata",
|
|
"ALTRO": "Altro",
|
|
}
|
|
|
|
|
|
def _is_instructor(user: AuthUser) -> bool:
|
|
return user.role in ("ROLE_PRE_INSTRUCTOR", "ROLE_INSTRUCTOR_MANAGER", "ROLE_SUPER_ADMIN")
|
|
|
|
|
|
def _build_context(db: Session, practice: RemissionPractice, user: AuthUser) -> dict:
|
|
"""Prepara tutto il contesto per il template."""
|
|
# Gate check + totali
|
|
gate_obj = _compute_gate_check(db, practice); gate = gate_obj.model_dump() if hasattr(gate_obj, "model_dump") else dict(gate_obj)
|
|
totals = gate.get("totals") or {}
|
|
|
|
# Schema sections
|
|
sections = practice.schema_snapshot.get("sections") or []
|
|
cat_section = next((s for s in sections if s.get("type") == "category_grid"), {}) or {}
|
|
categories = cat_section.get("categories") or []
|
|
categories_map = {c.get("code"): c.get("label") for c in categories}
|
|
cat_order = {c.get("code"): i for i, c in enumerate(categories)}
|
|
|
|
ula_section = next((s for s in sections if s.get("type") == "ula_block"), {}) or {}
|
|
ula_enabled = bool(ula_section.get("enabled"))
|
|
ula_threshold = float(ula_section.get("threshold") or 1)
|
|
|
|
docs_section = next((s for s in sections if s.get("type") == "document_checklist"), {}) or {}
|
|
docs_required_raw = docs_section.get("required_types") or []
|
|
docs_required = [
|
|
(r if isinstance(r, dict) else {"code": r, "label": r})
|
|
for r in docs_required_raw
|
|
]
|
|
docs_by_code = {d.doc_code: {
|
|
"filename": d.filename, "verification_status": d.verification_status,
|
|
"verification_notes": d.verification_notes,
|
|
} for d in practice.documents}
|
|
|
|
# Raggruppo fatture per categoria in ordine schema
|
|
sorted_invoices = sorted(
|
|
practice.invoices,
|
|
key=lambda i: (cat_order.get(i.category_code, 999), i.invoice_number or "")
|
|
)
|
|
invoices_by_cat = OrderedDict()
|
|
for c in categories:
|
|
invoices_by_cat[c.get("code")] = []
|
|
for inv in sorted_invoices:
|
|
invoices_by_cat.setdefault(inv.category_code, []).append(inv)
|
|
|
|
per_cat_declared = totals.get("per_category_declared") or {}
|
|
per_cat_verified = totals.get("per_category_verified") or {}
|
|
|
|
# ULA aggregati
|
|
ula_fte_decl = sum(float(e.fte_pct or 0) for e in practice.ula_employees)
|
|
ula_fte_verif = sum(
|
|
float((e.fte_pct_verified if e.fte_pct_verified is not None else e.fte_pct) or 0)
|
|
for e in practice.ula_employees
|
|
if e.verification_status in ("AMMESSA", "PARZIALE")
|
|
)
|
|
ula_ok = ula_fte_verif >= ula_threshold
|
|
|
|
# Anagrafica beneficiario (da gepafin_schema.company)
|
|
company_row = db.execute(text("""
|
|
SELECT company_name, vat_number
|
|
FROM gepafin_schema.company
|
|
WHERE id = :cid
|
|
"""), {"cid": practice.company_id}).mappings().first()
|
|
company = dict(company_row) if company_row else {}
|
|
|
|
# Istruttore
|
|
instructor_name = None
|
|
if practice.reviewed_by:
|
|
row = db.execute(text("""
|
|
SELECT first_name || ' ' || last_name AS name
|
|
FROM gepafin_schema.gepafin_user WHERE id = :uid
|
|
"""), {"uid": practice.reviewed_by}).scalar()
|
|
instructor_name = row
|
|
elif user.user_id:
|
|
row = db.execute(text("""
|
|
SELECT first_name || ' ' || last_name AS name
|
|
FROM gepafin_schema.gepafin_user WHERE id = :uid
|
|
"""), {"uid": user.user_id}).scalar()
|
|
instructor_name = row
|
|
|
|
# v2: custom_checks merged (schema_snapshot.custom_checks[] + RemissionCustomCheckValue)
|
|
check_defs = practice.schema_snapshot.get("custom_checks") or []
|
|
values_by_code = {v.check_code: v for v in practice.custom_checks}
|
|
custom_checks_merged = []
|
|
for d in check_defs:
|
|
code = d.get("code")
|
|
val = values_by_code.get(code)
|
|
custom_checks_merged.append({
|
|
"code": code,
|
|
"label": d.get("label"),
|
|
"description": d.get("description"),
|
|
"requires_document": bool(d.get("requires_document")),
|
|
"required": bool(d.get("required")),
|
|
"beneficiary_declared": bool(val.beneficiary_declared) if val else False,
|
|
"declared_at": val.declared_at if val else None,
|
|
"has_document": bool(val and val.storage_path),
|
|
"verification_status": (val.verification_status if val else "PENDING"),
|
|
"verification_notes": (val.verification_notes if val else None),
|
|
})
|
|
|
|
# v2: storico tranche precedenti APPROVED (se sequence > 1)
|
|
previous_tranches = []
|
|
cumulative_approved = 0.0
|
|
if practice.sequence_number > 1:
|
|
prevs = db.query(RemissionPractice).filter(
|
|
RemissionPractice.application_id == practice.application_id,
|
|
RemissionPractice.sequence_number < practice.sequence_number,
|
|
RemissionPractice.status == "APPROVED",
|
|
).order_by(RemissionPractice.sequence_number).all()
|
|
for pv in prevs:
|
|
amt = float(pv.approved_remission or 0)
|
|
cumulative_approved += amt
|
|
previous_tranches.append({
|
|
"sequence_number": pv.sequence_number,
|
|
"period_label": pv.period_label,
|
|
"reviewed_at": pv.reviewed_at,
|
|
"approved_remission": amt,
|
|
"cumulative": cumulative_approved,
|
|
})
|
|
|
|
# v2 max_tranches dallo schema_snapshot (o dal bando corrente, fallback 1)
|
|
snap_rules = practice.schema_snapshot.get("gate_rules") or {}
|
|
max_tranches_snapshot = int(snap_rules.get("max_tranches") or totals.get("tranches_max") or 1)
|
|
|
|
return {
|
|
"practice": practice,
|
|
"totals": totals,
|
|
"categories_map": categories_map,
|
|
"invoices_by_cat": invoices_by_cat,
|
|
"per_cat_declared": per_cat_declared,
|
|
"per_cat_verified": per_cat_verified,
|
|
"ula_section_enabled": ula_enabled,
|
|
"ula_threshold": ula_threshold,
|
|
"ula_fte_decl": ula_fte_decl,
|
|
"ula_fte_verif": ula_fte_verif,
|
|
"ula_ok": ula_ok,
|
|
"docs_required": docs_required,
|
|
"docs_by_code": docs_by_code,
|
|
"amendments": practice.amendment_requests or [],
|
|
"contract_labels": _CONTRACT_LABELS,
|
|
"company": company,
|
|
"instructor_name": instructor_name,
|
|
"generated_at": datetime.now().strftime("%d/%m/%Y"),
|
|
# v2
|
|
"custom_checks_merged": custom_checks_merged,
|
|
"previous_tranches": previous_tranches,
|
|
"max_tranches_snapshot": max_tranches_snapshot,
|
|
}
|
|
|
|
|
|
def _render_html(db: Session, practice_id: UUID, user: AuthUser) -> tuple[RemissionPractice, str]:
|
|
practice = db.query(RemissionPractice).filter(RemissionPractice.id == practice_id).first()
|
|
if not practice:
|
|
raise HTTPException(status_code=404, detail=f"Pratica {practice_id} non trovata")
|
|
if not _is_instructor(user):
|
|
raise HTTPException(status_code=403, detail="Ruolo istruttore richiesto")
|
|
ctx = _build_context(db, practice, user)
|
|
tpl = _env.get_template("verbale_istruttoria.html")
|
|
html = tpl.render(**ctx)
|
|
return practice, html
|
|
|
|
|
|
@router.get("/{practice_id}/verbale.html", response_class=HTMLResponse)
|
|
def verbale_html(
|
|
practice_id: UUID,
|
|
db: Session = Depends(get_db),
|
|
user: AuthUser = Depends(get_current_user),
|
|
):
|
|
"""Rendering HTML del verbale — utile per debug e preview rapida."""
|
|
_, html = _render_html(db, practice_id, user)
|
|
return HTMLResponse(content=html)
|
|
|
|
|
|
@router.get("/{practice_id}/verbale.pdf")
|
|
def verbale_pdf(
|
|
practice_id: UUID,
|
|
db: Session = Depends(get_db),
|
|
user: AuthUser = Depends(get_current_user),
|
|
):
|
|
"""Generazione PDF via weasyprint."""
|
|
# Import locale per non rallentare il cold start se weasyprint manca
|
|
from weasyprint import HTML as WeasyHTML
|
|
|
|
practice, html = _render_html(db, practice_id, user)
|
|
pdf_bytes = WeasyHTML(string=html).write_pdf()
|
|
filename = f"verbale_istruttoria_pratica_{practice.application_id}_t{practice.sequence_number}.pdf"
|
|
return Response(
|
|
content=pdf_bytes,
|
|
media_type="application/pdf",
|
|
headers={
|
|
"Content-Disposition": f'attachment; filename="{filename}"',
|
|
"Cache-Control": "no-store",
|
|
},
|
|
)
|