A4 /mine + /start + /copy-ula-options (practices.py):
- GET /mine raggruppa per application_id, ogni app ha:
tranches[], max_tranches, can_start_new, start_blocked_reason,
already_approved_sum, max_remission_global, max_remission_next_tranche
- POST /start valida: count<max, last terminale, residuo>0 -> 400 con detail parlante
- Bulk copy ULA da tranche N-1 se copy_ula_from_previous=true (reset verification_*)
- Legge suggested_instructor da gepafin_schema.assigned_applications (solo tranche 1)
- upgrade_schema_to_v2 al snapshot per allineare a v2 schemi vecchi
- GET /{id}/copy-ula-options: preview ULA tranche N-1 per pre-fill FE
A5 custom_checks.py (nuovo router):
- GET /{id}/custom-checks merge definition+values con defaults
- PUT /.../declare (beneficiary form-data + optional file upload 15MB, PDF/JPG/PNG)
storage dedicato /var/uploads/custom_checks/{practice_id}/{code}/<sha12>-file
- DELETE /.../document (beneficiary) reset metadata + cleanup FS
- PUT /.../verify (istruttore) VALIDO/NON_VALIDO/PENDING + notes
- GET /.../document?inline=0|1 stream con Content-Disposition
- Matrix autorizzazioni: declare solo benef su DRAFT/AWAITING, verify solo istruttore
A6 assignment.py (nuovo router, manager view):
- GET /instructor-manager/assignments: pratiche attive con suggested+assigned+is_unassigned
- GET /instructor-manager/instructors: elenco PRE_INSTRUCTOR+MANAGER per dropdown riassegna
- POST /instructor/{id}/reassign: cambio assigned_instructor_id + audit log in
instructor_checklist.reassignment_log [{at,by_user_id,from,to,reason}]
- Solo ROLE_INSTRUCTOR_MANAGER + ROLE_SUPER_ADMIN
Test curl E2E tutti passati: tranche 2 creata con copy 2 ULA, check dichiarati+verificati,
download PDF polizza OK, reassign con audit log scritto.
320 lines
13 KiB
Python
320 lines
13 KiB
Python
"""
Custom checks v2 endpoints: beneficiary declaration + optional document + instructor verification.

Each check is the merge of its definition (from schema_snapshot.custom_checks[]) and its
stored value (RemissionCustomCheckValue).

Storage path for custom checks: /var/uploads/custom_checks/{practice_id}/{code}/<sha12>-file.pdf
(kept outside the invoice/ula/document layout so these files cannot be confused with
invoice/LUL attachments).
"""
|
|
import io
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import List, Optional, Literal
|
|
from uuid import UUID
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
|
|
from sqlalchemy.orm import Session
|
|
|
|
from ..db import get_db
|
|
from ..auth import AuthUser, get_current_user
|
|
from ..models import RemissionPractice, RemissionCustomCheckValue
|
|
from ..schemas import ApiResponse, CustomCheckOut, CustomCheckVerifyBody
|
|
from ..storage import (
|
|
save_upload, delete_file, open_file,
|
|
FileTooLargeError, MimeNotAllowedError, StorageError, BASE_PATH,
|
|
)
|
|
|
|
router = APIRouter(prefix="/api/remission-practices", tags=["custom-checks"])
|
|
|
|
|
|
def _is_instructor(user: AuthUser) -> bool:
    """Tell whether *user* holds one of the reviewing-staff roles.

    Pre-instructors, instructor managers and super admins all qualify.
    """
    staff_roles = ("ROLE_PRE_INSTRUCTOR", "ROLE_INSTRUCTOR_MANAGER", "ROLE_SUPER_ADMIN")
    return user.role in staff_roles
|
|
|
|
|
|
def _get_practice(db: Session, practice_id: UUID, user: AuthUser) -> RemissionPractice:
    """Load a practice by id and enforce the base access rule.

    Access is granted to the beneficiary who owns the practice and to any
    user recognised by ``_is_instructor``.

    Raises:
        HTTPException: 404 when no practice has that id; 403 when a
            beneficiary is not the owner, or when a non-beneficiary does not
            hold an instructor role.
    """
    practice = (
        db.query(RemissionPractice)
        .filter(RemissionPractice.id == practice_id)
        .first()
    )
    if not practice:
        raise HTTPException(status_code=404, detail=f"Pratica {practice_id} non trovata")

    if user.is_beneficiary():
        # Beneficiaries may only touch their own practices.
        if practice.user_id != user.user_id:
            raise HTTPException(status_code=403, detail="Accesso negato")
    elif not _is_instructor(user):
        # Neither a beneficiary nor reviewing staff.
        raise HTTPException(status_code=403, detail="Ruolo non autorizzato")

    return practice
|
|
|
|
|
|
def _can_declare(user: AuthUser, practice: RemissionPractice) -> bool:
    """Return True only for the owning beneficiary of an editable practice.

    A practice is editable while its status is DRAFT or AWAITING_AMENDMENT.
    """
    editable_statuses = ("DRAFT", "AWAITING_AMENDMENT")
    # Ownership is only looked at once the caller is known to be a beneficiary.
    if not (user.is_beneficiary() and practice.user_id == user.user_id):
        return False
    return practice.status in editable_statuses
|
|
|
|
|
|
def _can_verify(user: AuthUser, practice: RemissionPractice) -> bool:
    """Return True when an instructor-role user may verify checks on *practice*.

    Verification is allowed only while the practice status is UNDER_REVIEW or
    AWAITING_AMENDMENT.
    """
    reviewable_statuses = ("UNDER_REVIEW", "AWAITING_AMENDMENT")
    return _is_instructor(user) and practice.status in reviewable_statuses
|
|
|
|
|
|
def _schema_check_defs(practice: RemissionPractice) -> List[dict]:
    """Return the custom-check definitions stored in the practice's schema snapshot.

    Returns:
        The ``custom_checks`` list of ``practice.schema_snapshot``, or an empty
        list when the snapshot itself is missing or the key is absent/empty.
    """
    # A practice without a snapshot simply has no custom checks; calling
    # .get() on None would otherwise raise AttributeError.
    snapshot = practice.schema_snapshot or {}
    return snapshot.get("custom_checks") or []
|
|
|
|
|
|
def _merge_check(definition: dict, value: Optional[RemissionCustomCheckValue]) -> dict:
    """Combine a check definition with its stored value into one flat dict.

    Args:
        definition: One entry of the schema's ``custom_checks`` list.
        value: The persisted value row for that check, or None when nothing
            has been stored yet.

    Returns:
        A dict carrying the definition fields plus the value fields; when
        *value* is None the value fields keep their defaults (not declared,
        no document, verification PENDING).
    """
    merged = {
        "code": definition.get("code"),
        "label": definition.get("label"),
        "description": definition.get("description"),
        "requires_document": bool(definition.get("requires_document", False)),
        "required": bool(definition.get("required", False)),
    }
    # Defaults used when the beneficiary has not touched the check yet.
    merged.update(
        beneficiary_declared=False,
        declared_at=None,
        filename_original=None,
        storage_path=None,
        size_bytes=None,
        document_uploaded_at=None,
        verification_status="PENDING",
        verification_notes=None,
        verified_by=None,
        verified_at=None,
    )
    if value is None:
        return merged

    copied_fields = (
        "beneficiary_declared",
        "declared_at",
        "storage_path",
        "size_bytes",
        "document_uploaded_at",
        "verification_status",
        "verification_notes",
        "verified_by",
        "verified_at",
    )
    for field_name in copied_fields:
        merged[field_name] = getattr(value, field_name)

    stored_path = value.storage_path
    if stored_path:
        # Stored names look like "<sha12>-<original>": the original file name
        # is whatever follows the first dash.
        stored_name = Path(stored_path).name
        _prefix, dash, original = stored_name.partition("-")
        merged["filename_original"] = original if dash else stored_name
    return merged
|
|
|
|
|
|
def _get_or_create_value(db: Session, practice_id: UUID, code: str) -> RemissionCustomCheckValue:
    """Fetch the value row for (practice_id, code), creating it when absent.

    A newly created row is added to the session and flushed, but not
    committed; committing is left to the caller.
    """
    existing = (
        db.query(RemissionCustomCheckValue)
        .filter(
            RemissionCustomCheckValue.practice_id == practice_id,
            RemissionCustomCheckValue.check_code == code,
        )
        .first()
    )
    if existing:
        return existing

    created = RemissionCustomCheckValue(practice_id=practice_id, check_code=code)
    db.add(created)
    db.flush()
    return created
|
|
|
|
|
|
# ---------- endpoints ----------
|
|
|
|
@router.get("/{practice_id}/custom-checks", response_model=ApiResponse)
def list_custom_checks(practice_id: UUID, db: Session = Depends(get_db),
                       user: AuthUser = Depends(get_current_user)):
    """List the practice's custom checks: schema definition merged with current values.

    Checks are returned in schema order; definitions without a stored value
    come back with default values.
    """
    practice = _get_practice(db, practice_id, user)
    definitions = _schema_check_defs(practice)

    # Index the stored rows by check code for O(1) lookup while merging.
    stored_by_code = {}
    for row in practice.custom_checks:
        stored_by_code[row.check_code] = row

    merged = []
    for definition in definitions:
        stored = stored_by_code.get(definition.get("code"))
        merged.append(_merge_check(definition, stored))
    return ApiResponse(data={"custom_checks": merged})
|
|
|
|
|
|
# Upload limits for custom-check attachments.
_CUSTOM_CHECK_MAX_BYTES = 15 * 1024 * 1024
_CUSTOM_CHECK_MIMES = ("application/pdf", "image/jpeg", "image/png")
_CUSTOM_CHECK_NAME_MAX = 120


def _safe_upload_name(filename: str) -> str:
    """Return a filesystem-safe version of an uploaded file name.

    Characters other than alphanumerics and ``-_.()`` become underscores,
    spaces become underscores, and the result is capped at
    ``_CUSTOM_CHECK_NAME_MAX`` characters while keeping the extension. Falls
    back to ``"file"`` when nothing usable is left.
    """
    cleaned = "".join(
        c if (c.isalnum() or c in "-_.() ") else "_" for c in filename
    ).strip().replace(" ", "_")
    if len(cleaned) > _CUSTOM_CHECK_NAME_MAX:
        root, ext = os.path.splitext(cleaned)
        # max(0, ...) guards against a pathological extension longer than the cap.
        keep = max(0, _CUSTOM_CHECK_NAME_MAX - len(ext))
        cleaned = (root[:keep] + ext)[:_CUSTOM_CHECK_NAME_MAX]
    return cleaned or "file"


@router.put("/{practice_id}/custom-checks/{code}/declare", response_model=ApiResponse)
async def declare_custom_check(
    practice_id: UUID,
    code: str,
    beneficiary_declared: bool = Form(...),
    file: Optional[UploadFile] = File(None),
    db: Session = Depends(get_db),
    user: AuthUser = Depends(get_current_user),
):
    """Beneficiary declares a check (boolean) and optionally attaches a document.

    When the schema sets ``requires_document=true`` the upload is recommended
    but not enforced here (required-ness is described as a gate on /submit).
    Any change resets a previous instructor verification to PENDING.

    Args:
        practice_id: Practice the check belongs to.
        code: Check code; must be defined in the practice schema snapshot.
        beneficiary_declared: The declaration flag sent as form data.
        file: Optional attachment (PDF/JPEG/PNG, at most 15 MB). A new file
            replaces the one stored previously.

    Raises:
        HTTPException: 403 when the caller may not declare; 404 when the
            check is not in the schema; 415 for a disallowed MIME type; 413
            when the file exceeds the size limit; 500 on storage errors.
    """
    p = _get_practice(db, practice_id, user)
    if not _can_declare(user, p):
        raise HTTPException(
            status_code=403,
            detail="Solo beneficiario owner su pratica DRAFT o AWAITING_AMENDMENT puo dichiarare"
        )

    defs = {d["code"]: d for d in _schema_check_defs(p)}
    if code not in defs:
        raise HTTPException(status_code=404, detail=f"Custom check '{code}' non definito nello schema")

    # Reject a disallowed MIME type before reading the body or touching the DB.
    has_upload = file is not None and bool(file.filename)
    mime = ""
    if has_upload:
        mime = (file.content_type or "").lower().split(";")[0].strip()
        if mime not in _CUSTOM_CHECK_MIMES:
            raise HTTPException(
                status_code=415,
                detail=f"MIME non consentito: {mime}. Accettati: pdf, jpeg, png"
            )

    v = _get_or_create_value(db, p.id, code)
    v.beneficiary_declared = bool(beneficiary_declared)
    v.declared_at = datetime.now(timezone.utc) if beneficiary_declared else None

    # Previously stored file to remove once the new state is committed.
    stale_path = None

    if has_upload:
        try:
            from hashlib import sha256

            hasher = sha256()
            chunks = []  # joined once at the end: avoids quadratic bytes +=
            size = 0
            while True:
                chunk = await file.read(65536)
                if not chunk:
                    break
                size += len(chunk)
                if size > _CUSTOM_CHECK_MAX_BYTES:
                    raise HTTPException(status_code=413, detail="File troppo grande (max 15 MB)")
                hasher.update(chunk)
                chunks.append(chunk)

            digest = hasher.hexdigest()
            safe = _safe_upload_name(file.filename)

            # Dedicated directory, written directly so it cannot collide with
            # the regular document storage layout.
            target_dir = BASE_PATH / "custom_checks" / str(p.id) / code
            target_dir.mkdir(parents=True, exist_ok=True)
            final_path = target_dir / f"{digest[:12]}-{safe}"
            final_path.write_bytes(b"".join(chunks))

            if v.storage_path and Path(BASE_PATH / v.storage_path) != final_path:
                stale_path = v.storage_path

            v.storage_path = str(final_path.relative_to(BASE_PATH))
            v.mime = mime
            v.size_bytes = size
            v.sha256 = digest
            v.document_uploaded_at = datetime.now(timezone.utc)
            v.uploaded_by = user.user_id
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Errore upload: {e}") from e

    # The beneficiary changed something: drop any previous verification.
    v.verification_status = "PENDING"
    v.verification_notes = None
    v.verified_by = None
    v.verified_at = None

    db.commit()
    db.refresh(v)

    # Remove the replaced file only after the commit succeeded, so a failed
    # commit never leaves the row pointing at a file that is already gone.
    if stale_path:
        try:
            delete_file(stale_path)
        except Exception:
            # Best-effort cleanup: an orphaned file must not fail the request.
            import logging
            logging.getLogger(__name__).warning(
                "Could not delete replaced custom-check file %s", stale_path, exc_info=True
            )

    return ApiResponse(message="Check aggiornato", data=_merge_check(defs[code], v))
|
|
|
|
|
|
@router.delete("/{practice_id}/custom-checks/{code}/document", response_model=ApiResponse)
def delete_custom_check_document(
    practice_id: UUID, code: str,
    db: Session = Depends(get_db),
    user: AuthUser = Depends(get_current_user),
):
    """Beneficiary removes the attached document; the declaration itself is kept.

    All document metadata is cleared and the verification is reset to PENDING
    with the same set of fields that the declare endpoint resets.

    Raises:
        HTTPException: 403 when the caller is not allowed to declare on this
            practice.
    """
    p = _get_practice(db, practice_id, user)
    if not _can_declare(user, p):
        raise HTTPException(status_code=403, detail="Non autorizzato")

    v = db.query(RemissionCustomCheckValue).filter(
        RemissionCustomCheckValue.practice_id == practice_id,
        RemissionCustomCheckValue.check_code == code,
    ).first()
    if not v or not v.storage_path:
        return ApiResponse(message="Nessun documento da rimuovere")

    stale_path = v.storage_path

    # Clear document metadata.
    v.storage_path = None
    v.mime = None
    v.size_bytes = None
    v.sha256 = None
    v.document_uploaded_at = None
    v.uploaded_by = None

    # Reset the whole verification, not only status/notes, so no stale
    # verifier or timestamp survives on a PENDING check.
    v.verification_status = "PENDING"
    v.verification_notes = None
    v.verified_by = None
    v.verified_at = None
    db.commit()

    # Delete from storage only after the commit succeeded, so a failed commit
    # never leaves the row pointing at a file that is already gone.
    try:
        delete_file(stale_path)
    except Exception:
        # Best-effort cleanup: an orphaned file must not fail the request.
        import logging
        logging.getLogger(__name__).warning(
            "Could not delete custom-check file %s", stale_path, exc_info=True
        )

    return ApiResponse(message="Documento rimosso")
|
|
|
|
|
|
@router.put("/{practice_id}/custom-checks/{code}/verify", response_model=ApiResponse)
def verify_custom_check(
    practice_id: UUID, code: str,
    body: CustomCheckVerifyBody,
    db: Session = Depends(get_db),
    user: AuthUser = Depends(get_current_user),
):
    """Instructor records the verification outcome (VALIDO | NON_VALIDO | PENDING).

    The value row is created on the fly when the check has none yet.

    Raises:
        HTTPException: 403 when the caller may not verify this practice; 422
            for an unknown verification status; 404 when the check is not in
            the schema.
    """
    practice = _get_practice(db, practice_id, user)
    if not _can_verify(user, practice):
        raise HTTPException(status_code=403, detail="Solo istruttore su pratica in lavorazione")

    outcome = body.verification_status
    if outcome not in ("PENDING", "VALIDO", "NON_VALIDO"):
        raise HTTPException(status_code=422, detail="verification_status non valido")

    definitions = {d["code"]: d for d in _schema_check_defs(practice)}
    if code not in definitions:
        raise HTTPException(status_code=404, detail=f"Check '{code}' non nello schema")

    record = _get_or_create_value(db, practice.id, code)
    record.verification_status = outcome
    record.verification_notes = body.verification_notes
    record.verified_by = user.user_id
    # Only a final outcome gets a timestamp; PENDING clears it.
    if outcome != "PENDING":
        record.verified_at = datetime.now(timezone.utc)
    else:
        record.verified_at = None

    db.commit()
    db.refresh(record)
    return ApiResponse(message="Check verificato", data=_merge_check(definitions[code], record))
|
|
|
|
|
|
@router.get("/{practice_id}/custom-checks/{code}/document")
def download_custom_check_document(
    practice_id: UUID, code: str,
    inline: int = 0,
    db: Session = Depends(get_db),
    user: AuthUser = Depends(get_current_user),
):
    """Serve the document attached to a check.

    Args:
        practice_id: Practice the check belongs to.
        code: Check code.
        inline: Non-zero asks for ``Content-Disposition: inline`` (preview);
            zero (default) forces a download as attachment.

    Raises:
        HTTPException: 404 when no document is attached; 410 when the file
            is recorded but missing from storage. Access errors come from
            ``_get_practice``.
    """
    from fastapi.responses import FileResponse

    # Called for its access checks only (404/403); the practice is not needed.
    _get_practice(db, practice_id, user)

    v = db.query(RemissionCustomCheckValue).filter(
        RemissionCustomCheckValue.practice_id == practice_id,
        RemissionCustomCheckValue.check_code == code,
    ).first()
    if not v or not v.storage_path:
        raise HTTPException(status_code=404, detail="Nessun documento allegato")

    try:
        abs_path = open_file(v.storage_path)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=410, detail="File non piu disponibile su storage") from exc

    # Stored names look like "<sha12>-<original>": recover the original name.
    basename = Path(v.storage_path).name
    parts = basename.split("-", 1)
    filename = parts[1] if len(parts) == 2 else basename

    # Let FileResponse build Content-Disposition: it percent-encodes names
    # with non-ASCII characters (filename*=utf-8''...), whereas a hand-built
    # header fails on names such as accented letters, which the upload
    # sanitizer allows through.
    return FileResponse(
        path=str(abs_path),
        media_type=v.mime or "application/octet-stream",
        filename=filename,
        content_disposition_type="inline" if inline else "attachment",
    )
|