feat(amendment): ROUND 2 — scheduler + upload documenti

Seconda parte della replica soccorso istruttorio speculare al BE Gepafin.
Completata: scheduler cron (expire + reminder), upload documenti istruttore
e benef, fix duplicati config.

==SCHEDULER (app/scheduler.py NUOVO)==
APScheduler BackgroundScheduler integrato nel lifespan FastAPI.
Due cron attivi (timezone Europe/Rome):

  expire_amendments() - cron 01:05 ogni notte
    Speculare a ApplicationAmendmentScheduler.processAmendmentExpirationScheduler.
    Trova amendment AWAITING con deadline < today, passa a EXPIRED.
    Rimette pratica a UNDER_REVIEW se non ha altri amendment aperti.
    Ritorna dict stats per logging/test.

  queue_reminders() - cron 09:00 ogni mattina
    Speculare a ExpirationScheduler.processAmendmentExpiration (data-driven).
    Legge remission_expiration_config (type='AMENDMENT', interval_days=N),
    per ogni riga trova amendment con deadline esattamente today+N e setta
    pec_retry_after (marker che il BE vede via /internal pending-reminder).
    Multipli row = multipli reminder (seed: 7gg + 2gg).

Il microservizio aggiorna solo stato DB. L invio effettivo di email
reminder lo fa il BE Gepafin tramite polling, tenant-aware.

==UPLOAD DOCUMENTI==
3 nuovi endpoint nel router istruttoria:

  POST   /instructor/{pid}/amendment/{aid}/upload-document
    - Istruttore allega PDF al soccorso (motivazione, scheda tecnica).
    - Consentito in DRAFT o AWAITING. Sostituisce precedente se esiste.
    - Popola amendment_document_path + amendment_document_type.

  DELETE /instructor/{pid}/amendment/{aid}/upload-document
    - Rimuove allegato (solo in DRAFT).

  POST   /instructor/{pid}/amendment/{aid}/upload-response-document
    - Benef allega PDF come supporto alla risposta.
    - Consentito in AWAITING/RESPONSE_RECEIVED, solo proprietario.
    - Popola response_document_path + response_document_type.

Riusa save_upload() esistente con entity_type dedicati.

==FIX storage.py==
Whitelist entity_type estesa con 'amendment-instructor-doc' +
'amendment-response-doc' (prima accettava solo invoice/ula/document,
bloccando l'upload con StorageError).

==FIX migration dedup==
Scoperto in test: migration 8 faceva INSERT ON CONFLICT DO NOTHING su
remission_expiration_config ma senza UNIQUE constraint. Ogni restart
inseriva duplicati (16 righe in DB invece di 2). Fix in migration 9:
DELETE duplicati + ADD UNIQUE (type, interval_days) + re-seed pulito.

==REQUIREMENTS==
APScheduler==3.10.4

==TEST E2E==
/tmp/test_amendment_r2_fixed.py passa su tutto:
  [A] upload amendment_document istruttore + response_document benef + respond
  [B] amendment scaduto artificiale -> expire_amendments lo marca EXPIRED,
      pratica torna UNDER_REVIEW
  [C] amendment a +2gg e +7gg -> queue_reminders accoda 2 reminder,
      /internal pending-reminder li espone entrambi
This commit is contained in:
BFLOWS
2026-04-20 22:35:01 +02:00
parent da13ca7478
commit 34c4a47a1c
6 changed files with 281 additions and 4 deletions

View File

@@ -15,6 +15,7 @@ from sqlalchemy import text
from .config import get_settings
from .db import engine, Base
from .migrations import run_migrations
from .scheduler import start_scheduler, stop_scheduler
from .routers import health, schemas, practices, debug, instructor, files, verbale, custom_checks, assignment, internal
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
@@ -32,10 +33,12 @@ async def lifespan(app: FastAPI):
Base.metadata.create_all(bind=engine)
run_migrations(engine)
log.info(f"Schema '{settings.db_schema}' + tabelle + migrations OK")
start_scheduler()
except Exception as e:
log.error(f"Errore bootstrap DB: {e}")
raise
yield
stop_scheduler()
log.info("Shutdown rendicontazione-api")

View File

@@ -155,9 +155,29 @@ MIGRATIONS = [
CREATE INDEX IF NOT EXISTS idx_expiration_config_type
ON gepafin_rendic.remission_expiration_config(type)
WHERE is_deleted = false;
""",
# 2026-04-20 v5: dedup duplicati (ON CONFLICT DO NOTHING non funzionava senza UNIQUE)
# + aggiungo UNIQUE constraint per prevenire futuri duplicati
"""
DELETE FROM gepafin_rendic.remission_expiration_config ec
USING gepafin_rendic.remission_expiration_config ec2
WHERE ec.id > ec2.id
AND ec.type = ec2.type
AND ec.interval_days = ec2.interval_days;
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conname = 'uq_expiration_config_type_days'
AND conrelid = 'gepafin_rendic.remission_expiration_config'::regclass
) THEN
ALTER TABLE gepafin_rendic.remission_expiration_config
ADD CONSTRAINT uq_expiration_config_type_days UNIQUE (type, interval_days);
END IF;
END$$;
INSERT INTO gepafin_rendic.remission_expiration_config (type, interval_days)
VALUES ('AMENDMENT', 7), ('AMENDMENT', 2)
ON CONFLICT DO NOTHING;
ON CONFLICT (type, interval_days) DO NOTHING;
""",
]

View File

@@ -6,12 +6,13 @@ from decimal import Decimal
from uuid import UUID
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from sqlalchemy.orm import Session
from sqlalchemy import text, or_, and_
from ..db import get_db
from ..auth import AuthUser, get_current_user
from ..storage import save_upload, FileTooLargeError, MimeNotAllowedError, StorageError
from ..models import RemissionPractice, RemissionAmendmentRequest
from ..schemas import (
AmendmentRequestCreate, AmendmentRequestUpdate, AmendmentExtend, AmendmentRequestOut, AmendmentResponseSubmit, AmendmentStatus,
@@ -384,6 +385,59 @@ def close_amendment(practice_id: UUID, amendment_id: UUID,
data=AmendmentRequestOut.model_validate(ar).model_dump(mode="json"))
@router.post("/{practice_id}/amendment/{amendment_id}/upload-document", response_model=ApiResponse)
async def upload_amendment_document(practice_id: UUID, amendment_id: UUID,
file: UploadFile = File(...),
db: Session = Depends(get_db),
user: AuthUser = Depends(_require_instructor)):
"""Allega documento dell'istruttore al soccorso (motivazione, scheda tecnica, ecc.).
Consentito in DRAFT o AWAITING. Sostituisce il precedente se esiste."""
ar = _amendment_or_404(db, practice_id, amendment_id)
if ar.status not in (AmendmentStatus.DRAFT.value, AmendmentStatus.AWAITING.value):
raise HTTPException(status_code=409,
detail=f"Upload consentito in DRAFT/AWAITING (attuale: {ar.status})")
p = _get_practice_or_404(db, practice_id)
try:
rel_path, size, digest, mime, safe_name = save_upload(
application_id=p.application_id,
entity_type="amendment-instructor-doc",
entity_id=ar.id,
file_obj=file.file,
original_filename=file.filename or "amendment.pdf",
content_type=file.content_type,
)
except FileTooLargeError as e:
raise HTTPException(status_code=413, detail=str(e))
except MimeNotAllowedError as e:
raise HTTPException(status_code=415, detail=str(e))
except StorageError as e:
raise HTTPException(status_code=500, detail=f"Errore storage: {e}")
ar.amendment_document_path = rel_path
ar.amendment_document_type = mime
db.commit()
db.refresh(ar)
return ApiResponse(message="Documento allegato al soccorso",
data={"amendment_id": str(ar.id), "path": rel_path,
"filename": safe_name, "size_bytes": size, "mime": mime})
@router.delete("/{practice_id}/amendment/{amendment_id}/upload-document", response_model=ApiResponse)
def delete_amendment_document(practice_id: UUID, amendment_id: UUID,
db: Session = Depends(get_db),
user: AuthUser = Depends(_require_instructor)):
"""Rimuove il documento istruttore allegato al soccorso (consentito solo in DRAFT)."""
ar = _amendment_or_404(db, practice_id, amendment_id)
if ar.status != AmendmentStatus.DRAFT.value:
raise HTTPException(status_code=409,
detail=f"Rimozione allegato consentita solo in DRAFT (attuale: {ar.status})")
ar.amendment_document_path = None
ar.amendment_document_type = None
db.commit()
return ApiResponse(message="Documento allegato rimosso",
data={"amendment_id": str(ar.id)})
# Endpoint beneficiario: visualizza amendments sulla sua pratica + risponde
@router.post("/{practice_id}/amendment/{amendment_id}/respond-beneficiary", response_model=ApiResponse)
def respond_amendment_beneficiary(practice_id: UUID, amendment_id: UUID,
@@ -556,3 +610,44 @@ def set_instructor_final_notes(practice_id: UUID, body: InstructorFinalNotesBody
db.refresh(p)
return ApiResponse(message="Verbale aggiornato",
data=PracticeOut.model_validate(p).model_dump(mode="json"))
@router.post("/{practice_id}/amendment/{amendment_id}/upload-response-document", response_model=ApiResponse)
async def upload_response_document(practice_id: UUID, amendment_id: UUID,
file: UploadFile = File(...),
db: Session = Depends(get_db),
user: AuthUser = Depends(get_current_user)):
"""Beneficiario allega un documento come supporto alla sua risposta al soccorso.
Consentito su amendment in stato AWAITING, solo dal proprietario pratica."""
p = _get_practice_or_404(db, practice_id)
if user.is_beneficiary() and p.user_id != user.user_id:
raise HTTPException(status_code=403, detail="Non sei il proprietario della pratica")
ar = _amendment_or_404(db, practice_id, amendment_id)
if ar.status not in (AmendmentStatus.AWAITING.value, AmendmentStatus.RESPONSE_RECEIVED.value):
raise HTTPException(status_code=409,
detail=f"Upload risposta consentito solo in AWAITING/RESPONSE_RECEIVED (attuale: {ar.status})")
try:
rel_path, size, digest, mime, safe_name = save_upload(
application_id=p.application_id,
entity_type="amendment-response-doc",
entity_id=ar.id,
file_obj=file.file,
original_filename=file.filename or "response.pdf",
content_type=file.content_type,
)
except FileTooLargeError as e:
raise HTTPException(status_code=413, detail=str(e))
except MimeNotAllowedError as e:
raise HTTPException(status_code=415, detail=str(e))
except StorageError as e:
raise HTTPException(status_code=500, detail=f"Errore storage: {e}")
ar.response_document_path = rel_path
ar.response_document_type = mime
db.commit()
db.refresh(ar)
return ApiResponse(message="Documento risposta allegato",
data={"amendment_id": str(ar.id), "path": rel_path,
"filename": safe_name, "size_bytes": size, "mime": mime})

158
app/scheduler.py Normal file
View File

@@ -0,0 +1,158 @@
"""Scheduler per lifecycle amendment.
Due cron job attivi:
- expire_amendments(): ogni giorno 01:05 — trova amendment AWAITING con
deadline < today, le passa a EXPIRED, rimette pratica a UNDER_REVIEW
se non ci sono altri amendment aperti. Equivalente a
ApplicationAmendmentScheduler.processAmendmentExpirationScheduler del BE.
- queue_reminders(): ogni giorno 09:00 — legge remission_expiration_config
(type='AMENDMENT', interval_days=N), trova amendment AWAITING con
deadline == today + N giorni, setta flag reminder_queued_at sul
record. Equivalente a ExpirationScheduler.processAmendmentExpiration
del BE (data-driven da expiration_config).
Le notifiche effettive (PEC reminder benef + email interna istruttore) le
invia il BE Gepafin via polling sui nostri endpoint /internal. Il
microservizio resta sender-agnostico.
"""
import logging
from datetime import datetime, timezone, date, timedelta
from typing import Optional
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy.orm import Session
from sqlalchemy import text
from .db import SessionLocal
from .models import RemissionAmendmentRequest, RemissionPractice, RemissionExpirationConfig
log = logging.getLogger("rendicontazione-api.scheduler")
def expire_amendments() -> dict:
"""Espira amendment AWAITING con deadline passata.
Ritorna dict {expired_count, practices_reopened} per logging/test."""
db: Session = SessionLocal()
today = date.today()
stats = {"expired_count": 0, "practices_reopened": 0}
try:
expired_ars = db.query(RemissionAmendmentRequest).filter(
RemissionAmendmentRequest.status == "AWAITING",
RemissionAmendmentRequest.deadline < today,
).all()
practice_ids_to_check = set()
for ar in expired_ars:
ar.status = "EXPIRED"
practice_ids_to_check.add(ar.practice_id)
stats["expired_count"] += 1
log.info(f"Amendment {ar.id} EXPIRED (deadline era {ar.deadline})")
db.flush()
for pid in practice_ids_to_check:
p = db.query(RemissionPractice).filter(RemissionPractice.id == pid).first()
if not p:
continue
others_open = [
a for a in p.amendment_requests
if a.status in ("DRAFT", "AWAITING", "RESPONSE_RECEIVED")
]
if not others_open and p.status == "AWAITING_AMENDMENT":
p.status = "UNDER_REVIEW"
stats["practices_reopened"] += 1
log.info(f"Pratica {pid} ritornata a UNDER_REVIEW (amendment scaduto, nessun altro aperto)")
db.commit()
log.info(f"expire_amendments: {stats}")
except Exception as e:
db.rollback()
log.error(f"expire_amendments FAILED: {e}", exc_info=True)
raise
finally:
db.close()
return stats
def queue_reminders() -> dict:
"""Legge config data-driven, per ogni interval_days trova amendment
con deadline == today + N, scrive flag reminder_queued_at sull'amendment.
Il BE vedra questi amendment come pending-reminder via /internal.
Usiamo campo pec_retry_after come marker unificato (gia presente):
- NULL: nessun reminder accodato
- timestamp: reminder accodato in questo momento, BE invia al prossimo poll
Ritorna dict {reminders_queued_by_interval}."""
db: Session = SessionLocal()
today = date.today()
stats = {"reminders_queued": 0, "by_interval": {}}
try:
configs = db.query(RemissionExpirationConfig).filter(
RemissionExpirationConfig.type == "AMENDMENT",
RemissionExpirationConfig.is_deleted.is_(False),
).all()
for cfg in configs:
target_deadline = today + timedelta(days=cfg.interval_days)
ars = db.query(RemissionAmendmentRequest).filter(
RemissionAmendmentRequest.status == "AWAITING",
RemissionAmendmentRequest.deadline == target_deadline,
RemissionAmendmentRequest.pec_sent_at.isnot(None), # solo se gia inviata PEC iniziale
# evito di ri-accodare se gia accodato oggi
RemissionAmendmentRequest.pec_retry_after.is_(None),
).all()
for ar in ars:
ar.pec_retry_after = datetime.now(timezone.utc)
stats["reminders_queued"] += 1
log.info(f"Amendment {ar.id} reminder accodato ({cfg.interval_days}gg alla scadenza)")
stats["by_interval"][cfg.interval_days] = len(ars)
db.commit()
log.info(f"queue_reminders: {stats}")
except Exception as e:
db.rollback()
log.error(f"queue_reminders FAILED: {e}", exc_info=True)
raise
finally:
db.close()
return stats
_scheduler: Optional[BackgroundScheduler] = None
def start_scheduler():
"""Avvia BackgroundScheduler con i 2 cron. Chiamato in lifespan FastAPI."""
global _scheduler
if _scheduler is not None:
log.warning("start_scheduler chiamato due volte, skip")
return _scheduler
_scheduler = BackgroundScheduler(timezone="Europe/Rome")
# expire ogni notte 01:05 (dopo midnight, sicuro che today e cambiato)
_scheduler.add_job(
expire_amendments, CronTrigger(hour=1, minute=5),
id="expire_amendments", replace_existing=True, misfire_grace_time=3600,
)
# reminder ogni mattina 09:00
_scheduler.add_job(
queue_reminders, CronTrigger(hour=9, minute=0),
id="queue_reminders", replace_existing=True, misfire_grace_time=3600,
)
_scheduler.start()
log.info("Scheduler avviato: expire_amendments 01:05 + queue_reminders 09:00 (Europe/Rome)")
return _scheduler
def stop_scheduler():
global _scheduler
if _scheduler is not None:
_scheduler.shutdown(wait=False)
_scheduler = None
log.info("Scheduler fermato")

View File

@@ -48,7 +48,7 @@ def _safe_filename(name: str, max_len: int = 120) -> str:
def save_upload(
application_id: int,
entity_type: str, # invoice | ula | document
entity_type: str, # invoice | ula | document | amendment-instructor-doc | amendment-response-doc
entity_id: UUID,
file_obj: BinaryIO,
original_filename: str,
@@ -62,7 +62,7 @@ def save_upload(
- mime in ALLOWED_MIMES (usa content_type del client, fallback su estensione)
- dimensione <= MAX_SIZE_BYTES
"""
if entity_type not in ("invoice", "ula", "document"):
if entity_type not in ("invoice", "ula", "document", "amendment-instructor-doc", "amendment-response-doc"):
raise StorageError(f"entity_type non valido: {entity_type}")
safe_name = _safe_filename(original_filename)

View File

@@ -9,3 +9,4 @@ python-multipart==0.0.9
weasyprint==61.2
pydyf==0.10.0
jinja2==3.1.3
APScheduler==3.10.4