feat(v2): endpoint multi-tranche + custom_checks + assignment manager

A4 /mine + /start + /copy-ula-options (practices.py):
- GET /mine raggruppa per application_id, ogni app ha:
  tranches[], max_tranches, can_start_new, start_blocked_reason,
  already_approved_sum, max_remission_global, max_remission_next_tranche
- POST /start valida: count<max, last terminale, residuo>0 -> 400 con detail parlante
- Bulk copy ULA da tranche N-1 se copy_ula_from_previous=true (reset verification_*)
- Legge suggested_instructor da gepafin_schema.assigned_applications (solo tranche 1)
- upgrade_schema_to_v2 al snapshot per allineare a v2 schemi vecchi
- GET /{id}/copy-ula-options: preview ULA tranche N-1 per pre-fill FE

A5 custom_checks.py (nuovo router):
- GET /{id}/custom-checks merge definition+values con defaults
- PUT /.../declare (beneficiary form-data + optional file upload 15MB, PDF/JPG/PNG)
  storage dedicato /var/uploads/custom_checks/{practice_id}/{code}/<sha12>-file
- DELETE /.../document (beneficiary) reset metadata + cleanup FS
- PUT /.../verify (istruttore) VALIDO/NON_VALIDO/PENDING + notes
- GET /.../document?inline=0|1 stream con Content-Disposition
- Matrix autorizzazioni: declare solo benef su DRAFT/AWAITING, verify solo istruttore

A6 assignment.py (nuovo router, manager view):
- GET /instructor-manager/assignments: pratiche attive con suggested+assigned+is_unassigned
- GET /instructor-manager/instructors: elenco PRE_INSTRUCTOR+MANAGER per dropdown riassegna
- POST /instructor/{id}/reassign: cambio assigned_instructor_id + audit log in
  instructor_checklist.reassignment_log [{at,by_user_id,from,to,reason}]
- Solo ROLE_INSTRUCTOR_MANAGER + ROLE_SUPER_ADMIN

Test curl E2E tutti passati: tranche 2 creata con copy 2 ULA, check dichiarati+verificati,
download PDF polizza OK, reassign con audit log scritto.
This commit is contained in:
BFLOWS
2026-04-18 17:35:56 +02:00
parent 25215f388b
commit 86681678c4
2 changed files with 502 additions and 0 deletions

183
app/routers/assignment.py Normal file
View File

@@ -0,0 +1,183 @@
"""
Endpoint v2 per gestione assegnazione istruttori (capo istruttore / manager).
Solo ROLE_INSTRUCTOR_MANAGER + ROLE_SUPER_ADMIN.
"""
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import text
from ..db import get_db
from ..auth import AuthUser, get_current_user
from ..models import RemissionPractice
from ..schemas import ApiResponse, PracticeReassignBody
router = APIRouter(
prefix="/api/remission-practices",
tags=["assignment-manager"],
)
def _require_manager(user: AuthUser = Depends(get_current_user)) -> AuthUser:
if user.role not in ("ROLE_INSTRUCTOR_MANAGER", "ROLE_SUPER_ADMIN"):
raise HTTPException(
status_code=403,
detail="Richiesto ruolo manager o superadmin"
)
return user
@router.get("/instructor-manager/assignments", response_model=ApiResponse)
def assignments_overview(
db: Session = Depends(get_db),
manager: AuthUser = Depends(_require_manager),
):
"""Vista capo istruttore: pratiche con suggested + assigned + flag 'da assegnare'."""
practices = db.query(RemissionPractice).filter(
RemissionPractice.status.in_(["SUBMITTED", "UNDER_REVIEW", "AWAITING_AMENDMENT"])
).order_by(
RemissionPractice.application_id,
RemissionPractice.sequence_number
).all()
# Enrichment: nome istruttori + company
items = []
user_cache: dict = {}
def _user_name(uid: Optional[int]) -> Optional[str]:
if uid is None:
return None
if uid in user_cache:
return user_cache[uid]
row = db.execute(text("""
SELECT first_name || ' ' || last_name as name, email
FROM gepafin_schema.gepafin_user WHERE id = :uid
"""), {"uid": uid}).mappings().first()
name = (row["name"] if row else None) or (row["email"] if row else None)
user_cache[uid] = name
return name
for p in practices:
company_row = db.execute(text("""
SELECT company_name, vat_number FROM gepafin_schema.company WHERE id = :cid
"""), {"cid": p.company_id}).mappings().first()
call_row = db.execute(text("""
SELECT name FROM gepafin_schema.call WHERE id = :cid
"""), {"cid": p.call_id}).mappings().first()
items.append({
"id": str(p.id),
"application_id": p.application_id,
"sequence_number": p.sequence_number,
"period_label": p.period_label,
"call_id": p.call_id,
"call_name": call_row["name"] if call_row else None,
"company_id": p.company_id,
"company_name": company_row["company_name"] if company_row else None,
"status": p.status,
"submitted_at": p.submitted_at.isoformat() if p.submitted_at else None,
"amount_erogato": float(p.amount_erogato or 0),
"suggested_instructor_id": p.suggested_instructor_id,
"suggested_instructor_name": _user_name(p.suggested_instructor_id),
"assigned_instructor_id": p.assigned_instructor_id,
"assigned_instructor_name": _user_name(p.assigned_instructor_id),
"is_unassigned": p.assigned_instructor_id is None,
})
return ApiResponse(data={"assignments": items})
@router.get("/instructor-manager/instructors", response_model=ApiResponse)
def list_available_instructors(
db: Session = Depends(get_db),
manager: AuthUser = Depends(_require_manager),
):
"""Elenco istruttori disponibili per riassegnazione (pre_instructor + manager ACTIVE)."""
rows = db.execute(text("""
SELECT u.id, u.email, u.first_name, u.last_name, r.role_type
FROM gepafin_schema.gepafin_user u
JOIN gepafin_schema.role r ON r.id = u.role_id
WHERE u.is_deleted = false
AND r.role_type IN ('ROLE_PRE_INSTRUCTOR', 'ROLE_INSTRUCTOR_MANAGER')
ORDER BY u.last_name, u.first_name
""")).mappings().all()
return ApiResponse(data={"instructors": [
{
"user_id": r["id"],
"email": r["email"],
"first_name": r["first_name"],
"last_name": r["last_name"],
"role_type": r["role_type"],
"display_name": f"{r['first_name'] or ''} {r['last_name'] or ''}".strip() or r["email"],
} for r in rows
]})
@router.post("/instructor/{practice_id}/reassign", response_model=ApiResponse)
def reassign_instructor(
practice_id: UUID,
body: PracticeReassignBody,
db: Session = Depends(get_db),
manager: AuthUser = Depends(_require_manager),
):
"""Manager assegna/riassegna la pratica a un istruttore diverso (o unassign se new_instructor_id=None).
Scrive audit entry in instructor_checklist.reassignment_log.
"""
p = db.query(RemissionPractice).filter(RemissionPractice.id == practice_id).first()
if not p:
raise HTTPException(status_code=404, detail="Pratica non trovata")
old_instructor_id = p.assigned_instructor_id
# Verifica nuovo istruttore se specificato
if body.new_instructor_id is not None:
row = db.execute(text("""
SELECT u.id, r.role_type
FROM gepafin_schema.gepafin_user u
JOIN gepafin_schema.role r ON r.id = u.role_id
WHERE u.id = :uid AND u.is_deleted = false
"""), {"uid": body.new_instructor_id}).mappings().first()
if not row:
raise HTTPException(status_code=404,
detail=f"Istruttore {body.new_instructor_id} non trovato")
if row["role_type"] not in ("ROLE_PRE_INSTRUCTOR", "ROLE_INSTRUCTOR_MANAGER"):
raise HTTPException(status_code=422,
detail="Utente non ha ruolo istruttore")
# Audit log
checklist = dict(p.instructor_checklist or {})
log = list(checklist.get("reassignment_log") or [])
log.append({
"at": datetime.now(timezone.utc).isoformat(),
"by_user_id": manager.user_id,
"by_email": manager.email,
"from_instructor_id": old_instructor_id,
"to_instructor_id": body.new_instructor_id,
"reason": body.reassignment_reason,
})
checklist["reassignment_log"] = log
p.assigned_instructor_id = body.new_instructor_id
p.instructor_checklist = checklist
# Se passo da SUBMITTED + assegnato -> UNDER_REVIEW
# Altrimenti lascio status invariato (manager puo riassegnare anche durante review)
if p.status == "SUBMITTED" and body.new_instructor_id is not None:
p.status = "UNDER_REVIEW"
db.commit()
db.refresh(p)
action = "unassigned" if body.new_instructor_id is None else f"assigned to {body.new_instructor_id}"
return ApiResponse(
message=f"Pratica {action}",
data={
"id": str(p.id),
"status": p.status,
"assigned_instructor_id": p.assigned_instructor_id,
"suggested_instructor_id": p.suggested_instructor_id,
"reassignment_log": log,
}
)

View File

@@ -0,0 +1,319 @@
"""
Endpoint custom_checks v2: dichiarazione beneficiario + documento opzionale + verifica istruttore.
Merge definition (da schema_snapshot.custom_checks[]) + value (RemissionCustomCheckValue).
Path storage custom_checks: /var/uploads/custom_checks/{practice_id}/{code}/<sha12>-file.pdf
(fuori dal pattern invoice/ula/document per isolarli — non confondibili con allegati fattura/LUL).
"""
import io
import os
from datetime import datetime, timezone
from typing import List, Optional, Literal
from uuid import UUID
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from sqlalchemy.orm import Session
from ..db import get_db
from ..auth import AuthUser, get_current_user
from ..models import RemissionPractice, RemissionCustomCheckValue
from ..schemas import ApiResponse, CustomCheckOut, CustomCheckVerifyBody
from ..storage import (
save_upload, delete_file, open_file,
FileTooLargeError, MimeNotAllowedError, StorageError, BASE_PATH,
)
router = APIRouter(prefix="/api/remission-practices", tags=["custom-checks"])
def _is_instructor(user: AuthUser) -> bool:
return user.role in ("ROLE_PRE_INSTRUCTOR", "ROLE_INSTRUCTOR_MANAGER", "ROLE_SUPER_ADMIN")
def _get_practice(db: Session, practice_id: UUID, user: AuthUser) -> RemissionPractice:
p = db.query(RemissionPractice).filter(RemissionPractice.id == practice_id).first()
if not p:
raise HTTPException(status_code=404, detail=f"Pratica {practice_id} non trovata")
# Autorizzazione base: beneficiario owner o istruttore
if user.is_beneficiary() and p.user_id != user.user_id:
raise HTTPException(status_code=403, detail="Accesso negato")
if not user.is_beneficiary() and not _is_instructor(user):
raise HTTPException(status_code=403, detail="Ruolo non autorizzato")
return p
def _can_declare(user: AuthUser, practice: RemissionPractice) -> bool:
"""Solo beneficiario owner e solo su DRAFT | AWAITING_AMENDMENT."""
if not user.is_beneficiary():
return False
if practice.user_id != user.user_id:
return False
return practice.status in ("DRAFT", "AWAITING_AMENDMENT")
def _can_verify(user: AuthUser, practice: RemissionPractice) -> bool:
if not _is_instructor(user):
return False
return practice.status in ("UNDER_REVIEW", "AWAITING_AMENDMENT")
def _schema_check_defs(practice: RemissionPractice) -> List[dict]:
return practice.schema_snapshot.get("custom_checks") or []
def _merge_check(definition: dict, value: Optional[RemissionCustomCheckValue]) -> dict:
out = {
"code": definition.get("code"),
"label": definition.get("label"),
"description": definition.get("description"),
"requires_document": bool(definition.get("requires_document", False)),
"required": bool(definition.get("required", False)),
# valori default
"beneficiary_declared": False,
"declared_at": None,
"filename_original": None,
"storage_path": None,
"size_bytes": None,
"document_uploaded_at": None,
"verification_status": "PENDING",
"verification_notes": None,
"verified_by": None,
"verified_at": None,
}
if value is not None:
out.update({
"beneficiary_declared": value.beneficiary_declared,
"declared_at": value.declared_at,
"storage_path": value.storage_path,
"size_bytes": value.size_bytes,
"document_uploaded_at": value.document_uploaded_at,
"verification_status": value.verification_status,
"verification_notes": value.verification_notes,
"verified_by": value.verified_by,
"verified_at": value.verified_at,
})
# filename originale ricostruito dal path (dopo il sha12-)
if value.storage_path:
basename = Path(value.storage_path).name
# formato: <sha12>-<original>
parts = basename.split("-", 1)
out["filename_original"] = parts[1] if len(parts) == 2 else basename
return out
def _get_or_create_value(db: Session, practice_id: UUID, code: str) -> RemissionCustomCheckValue:
v = db.query(RemissionCustomCheckValue).filter(
RemissionCustomCheckValue.practice_id == practice_id,
RemissionCustomCheckValue.check_code == code,
).first()
if not v:
v = RemissionCustomCheckValue(practice_id=practice_id, check_code=code)
db.add(v)
db.flush()
return v
# ---------- endpoints ----------
@router.get("/{practice_id}/custom-checks", response_model=ApiResponse)
def list_custom_checks(practice_id: UUID, db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user)):
"""Ritorna i custom_checks della pratica: schema definition + valori correnti."""
p = _get_practice(db, practice_id, user)
defs = _schema_check_defs(p)
values_by_code = {v.check_code: v for v in p.custom_checks}
out = [_merge_check(d, values_by_code.get(d.get("code"))) for d in defs]
return ApiResponse(data={"custom_checks": out})
@router.put("/{practice_id}/custom-checks/{code}/declare", response_model=ApiResponse)
async def declare_custom_check(
practice_id: UUID,
code: str,
beneficiary_declared: bool = Form(...),
file: Optional[UploadFile] = File(None),
db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user),
):
"""Beneficiario dichiara il check (bool) e opzionalmente allega un documento.
Se requires_document=true nello schema, l'upload e raccomandato ma non imposto
lato server (la required-ness e un gate su /submit)."""
p = _get_practice(db, practice_id, user)
if not _can_declare(user, p):
raise HTTPException(
status_code=403,
detail="Solo beneficiario owner su pratica DRAFT o AWAITING_AMENDMENT puo dichiarare"
)
defs = {d["code"]: d for d in _schema_check_defs(p)}
if code not in defs:
raise HTTPException(status_code=404, detail=f"Custom check '{code}' non definito nello schema")
v = _get_or_create_value(db, p.id, code)
v.beneficiary_declared = bool(beneficiary_declared)
v.declared_at = datetime.now(timezone.utc) if beneficiary_declared else None
# Se arriva un file sostituisce l'eventuale esistente
if file is not None and file.filename:
try:
# path custom_checks/<practice_id>/<code>/<sha12>-<name> — sfrutto storage_adapter
# con entity_type "document" fittizio e un app_id = practice_id (sfrutto la dir)
# In alternativa faccio path custom scrivendolo direttamente qui.
# Scelgo via diretta per evitare collisione con document reale.
from hashlib import sha256
size = 0
hasher = sha256()
content = b""
while True:
chunk = await file.read(65536)
if not chunk:
break
content += chunk
size += len(chunk)
if size > 15 * 1024 * 1024:
raise HTTPException(status_code=413, detail="File troppo grande (max 15 MB)")
hasher.update(chunk)
mime = (file.content_type or "").lower().split(";")[0].strip()
if mime not in ("application/pdf", "image/jpeg", "image/png"):
raise HTTPException(
status_code=415,
detail=f"MIME non consentito: {mime}. Accettati: pdf, jpeg, png"
)
digest = hasher.hexdigest()
# sanitize filename
safe = "".join(c if (c.isalnum() or c in "-_.() ") else "_" for c in file.filename).strip().replace(" ", "_")
if len(safe) > 120:
root, ext = os.path.splitext(safe)
safe = root[:120 - len(ext)] + ext
target_dir = BASE_PATH / "custom_checks" / str(p.id) / code
target_dir.mkdir(parents=True, exist_ok=True)
final_path = target_dir / f"{digest[:12]}-{safe}"
final_path.write_bytes(content)
# Rimuovi eventuale file precedente (path diverso)
if v.storage_path and Path(BASE_PATH / v.storage_path) != final_path:
try:
delete_file(v.storage_path)
except Exception:
pass
v.storage_path = str(final_path.relative_to(BASE_PATH))
v.mime = mime
v.size_bytes = size
v.sha256 = digest
v.document_uploaded_at = datetime.now(timezone.utc)
v.uploaded_by = user.user_id
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Errore upload: {e}")
# Reset eventuale validazione precedente (beneficiario ha cambiato qualcosa)
v.verification_status = "PENDING"
v.verification_notes = None
v.verified_by = None
v.verified_at = None
db.commit()
db.refresh(v)
defs_by_code = {d["code"]: d for d in _schema_check_defs(p)}
return ApiResponse(message="Check aggiornato", data=_merge_check(defs_by_code[code], v))
@router.delete("/{practice_id}/custom-checks/{code}/document", response_model=ApiResponse)
def delete_custom_check_document(
practice_id: UUID, code: str,
db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user),
):
"""Beneficiario rimuove il documento allegato (dichiarazione resta)."""
p = _get_practice(db, practice_id, user)
if not _can_declare(user, p):
raise HTTPException(status_code=403, detail="Non autorizzato")
v = db.query(RemissionCustomCheckValue).filter(
RemissionCustomCheckValue.practice_id == practice_id,
RemissionCustomCheckValue.check_code == code,
).first()
if not v or not v.storage_path:
return ApiResponse(message="Nessun documento da rimuovere")
try:
delete_file(v.storage_path)
except Exception:
pass
v.storage_path = None
v.mime = None
v.size_bytes = None
v.sha256 = None
v.document_uploaded_at = None
v.verification_status = "PENDING" # reset verify
v.verification_notes = None
db.commit()
return ApiResponse(message="Documento rimosso")
@router.put("/{practice_id}/custom-checks/{code}/verify", response_model=ApiResponse)
def verify_custom_check(
practice_id: UUID, code: str,
body: CustomCheckVerifyBody,
db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user),
):
"""Istruttore valida il check (VALIDO | NON_VALIDO | PENDING)."""
p = _get_practice(db, practice_id, user)
if not _can_verify(user, p):
raise HTTPException(status_code=403, detail="Solo istruttore su pratica in lavorazione")
if body.verification_status not in ("PENDING", "VALIDO", "NON_VALIDO"):
raise HTTPException(status_code=422, detail="verification_status non valido")
defs = {d["code"]: d for d in _schema_check_defs(p)}
if code not in defs:
raise HTTPException(status_code=404, detail=f"Check '{code}' non nello schema")
v = _get_or_create_value(db, p.id, code)
v.verification_status = body.verification_status
v.verification_notes = body.verification_notes
v.verified_by = user.user_id
v.verified_at = datetime.now(timezone.utc) if body.verification_status != "PENDING" else None
db.commit()
db.refresh(v)
return ApiResponse(message="Check verificato", data=_merge_check(defs[code], v))
@router.get("/{practice_id}/custom-checks/{code}/document")
def download_custom_check_document(
practice_id: UUID, code: str,
inline: int = 0,
db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user),
):
"""Download del documento allegato (stream con Content-Disposition)."""
from fastapi.responses import FileResponse
p = _get_practice(db, practice_id, user)
v = db.query(RemissionCustomCheckValue).filter(
RemissionCustomCheckValue.practice_id == practice_id,
RemissionCustomCheckValue.check_code == code,
).first()
if not v or not v.storage_path:
raise HTTPException(status_code=404, detail="Nessun documento allegato")
try:
abs_path = open_file(v.storage_path)
except FileNotFoundError:
raise HTTPException(status_code=410, detail="File non piu disponibile su storage")
basename = Path(v.storage_path).name
parts = basename.split("-", 1)
filename = parts[1] if len(parts) == 2 else basename
disp = "inline" if inline else "attachment"
return FileResponse(
path=str(abs_path),
media_type=v.mime or "application/octet-stream",
headers={"Content-Disposition": f'{disp}; filename="{filename}"'},
)