BFLOWS 34c4a47a1c feat(amendment): ROUND 2 - scheduler + document upload
Second part of the amendment-request (soccorso istruttorio) port, mirroring the Gepafin BE.
Completed: cron scheduler (expire + reminder), instructor and beneficiary
document upload, fix for duplicate config rows.

==SCHEDULER (app/scheduler.py, NEW)==
APScheduler BackgroundScheduler integrated into the FastAPI lifespan.
Two active cron jobs (timezone Europe/Rome):

  expire_amendments() - cron 01:05 every night
    Mirrors ApplicationAmendmentScheduler.processAmendmentExpirationScheduler.
    Finds AWAITING amendments with deadline < today and moves them to EXPIRED.
    Returns the practice to UNDER_REVIEW if it has no other open amendments.
    Returns a stats dict for logging/tests.

  queue_reminders() - cron 09:00 every morning
    Mirrors ExpirationScheduler.processAmendmentExpiration (data-driven).
    Reads remission_expiration_config (type='AMENDMENT', interval_days=N);
    for each row, finds amendments with deadline exactly today+N and sets
    pec_retry_after (the marker the BE sees via /internal pending-reminder).
    Multiple rows = multiple reminders (seed: 7 days + 2 days).

The microservice only updates DB state. The actual sending of reminder
emails is done by the Gepafin BE via polling, tenant-aware.
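
The date matching behind queue_reminders() can be sketched without a database. This is a minimal illustration, not the service code: the Amendment dataclass and the pick_reminders helper are hypothetical names, the 7- and 2-day intervals come from the seed mentioned above, and only the status, deadline and marker checks are mirrored.

```python
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Optional


@dataclass
class Amendment:
    # Hypothetical in-memory stand-in for an amendment row.
    id: int
    status: str
    deadline: date
    pec_retry_after: Optional[date] = None  # None means no reminder queued


def pick_reminders(amendments, interval_days, today):
    """Return {interval: [amendment ids]} for deadlines exactly today + N."""
    picked = {}
    for n in interval_days:
        target = today + timedelta(days=n)
        picked[n] = [
            a.id
            for a in amendments
            if a.status == "AWAITING"
            and a.deadline == target
            and a.pec_retry_after is None
        ]
    return picked


today = date(2026, 4, 20)
amendments = [
    Amendment(1, "AWAITING", today + timedelta(days=7)),
    Amendment(2, "AWAITING", today + timedelta(days=2)),
    Amendment(3, "AWAITING", today + timedelta(days=5)),  # no matching interval
    Amendment(4, "EXPIRED", today + timedelta(days=2)),   # wrong status
]
result = pick_reminders(amendments, [7, 2], today)
print(result)
```

Because the match is on equality rather than on a range, a day on which the job does not run means the reminder for that interval is skipped, not sent late.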

==DOCUMENT UPLOAD==
3 new endpoints in the istruttoria router:

  POST   /instructor/{pid}/amendment/{aid}/upload-document
    - The instructor attaches a PDF to the amendment request (rationale, technical sheet).
    - Allowed in DRAFT or AWAITING. Replaces the previous attachment if one exists.
    - Populates amendment_document_path + amendment_document_type.

  DELETE /instructor/{pid}/amendment/{aid}/upload-document
    - Removes the attachment (DRAFT only).

  POST   /instructor/{pid}/amendment/{aid}/upload-response-document
    - The beneficiary attaches a PDF in support of the response.
    - Allowed in AWAITING/RESPONSE_RECEIVED, owner only.
    - Populates response_document_path + response_document_type.

Reuses the existing save_upload() with dedicated entity_type values.
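
The status rules for the three endpoints can be summarised as a small lookup. This is only a sketch of the rules listed above: the action names and the is_allowed helper are hypothetical and do not come from the router code.

```python
# Hypothetical action names; the allowed statuses are the ones listed above.
ALLOWED_STATUSES = {
    "instructor_upload": {"DRAFT", "AWAITING"},
    "instructor_delete": {"DRAFT"},
    "beneficiary_upload": {"AWAITING", "RESPONSE_RECEIVED"},
}


def is_allowed(action: str, status: str, is_owner: bool = True) -> bool:
    """Check an upload action against the current amendment status.

    Ownership is only enforced for the beneficiary upload, matching the
    "owner only" rule in the endpoint list.
    """
    if action == "beneficiary_upload" and not is_owner:
        return False
    return status in ALLOWED_STATUSES.get(action, set())


print(is_allowed("instructor_upload", "AWAITING"))
print(is_allowed("instructor_delete", "AWAITING"))
```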

==FIX storage.py==
entity_type whitelist extended with 'amendment-instructor-doc' and
'amendment-response-doc' (previously it accepted only invoice/ula/document,
so the upload was blocked with a StorageError).
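
A whitelist check of this kind might look roughly like the following. The entity_type values are the ones named above; the constant name, the check_entity_type helper and the StorageError stand-in are assumptions, since the real storage.py is not shown here.

```python
class StorageError(Exception):
    """Stand-in for the service's StorageError; the real class may differ."""


# Values taken from the commit message; the constant name is hypothetical.
ALLOWED_ENTITY_TYPES = frozenset({
    "invoice",
    "ula",
    "document",
    "amendment-instructor-doc",
    "amendment-response-doc",
})


def check_entity_type(entity_type: str) -> str:
    """Return entity_type unchanged, or raise if it is not whitelisted."""
    if entity_type not in ALLOWED_ENTITY_TYPES:
        raise StorageError(f"entity_type not allowed: {entity_type!r}")
    return entity_type
```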

==FIX migration dedup==
Found during testing: migration 8 ran INSERT ON CONFLICT DO NOTHING on
remission_expiration_config, but the table had no UNIQUE constraint, so
every restart inserted duplicates (16 rows in the DB instead of 2). Fixed in
migration 9: DELETE duplicates + ADD UNIQUE (type, interval_days) + clean re-seed.
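
The failure mode and the fix can be reproduced in a few lines. The sketch below uses SQLite from the standard library as a stand-in for the real database, so the actual DDL in migrations 8 and 9 will differ; the index name and the seed helper are hypothetical, and the 8 restarts are chosen only to match the 16 rows reported above.

```python
import sqlite3

SEED_ROWS = [("AMENDMENT", 7), ("AMENDMENT", 2)]


def seed(conn):
    # Without a UNIQUE constraint there is nothing to conflict with,
    # so ON CONFLICT DO NOTHING never skips a row.
    conn.executemany(
        "INSERT INTO remission_expiration_config (type, interval_days) "
        "VALUES (?, ?) ON CONFLICT DO NOTHING",
        SEED_ROWS,
    )


def count(conn):
    return conn.execute(
        "SELECT COUNT(*) FROM remission_expiration_config"
    ).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE remission_expiration_config ("
    "id INTEGER PRIMARY KEY, type TEXT NOT NULL, interval_days INTEGER NOT NULL)"
)

for _ in range(8):  # simulate 8 restarts, each re-running the seed
    seed(conn)
rows_before = count(conn)

# Fix: keep the oldest row of each (type, interval_days) pair ...
conn.execute(
    "DELETE FROM remission_expiration_config WHERE id NOT IN ("
    "SELECT MIN(id) FROM remission_expiration_config "
    "GROUP BY type, interval_days)"
)
# ... then add the uniqueness guarantee the seed relied on.
conn.execute(
    "CREATE UNIQUE INDEX uq_expiration_type_interval "
    "ON remission_expiration_config (type, interval_days)"
)
seed(conn)  # re-seeding is now idempotent
rows_after = count(conn)

print(rows_before, rows_after)
```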

==REQUIREMENTS==
APScheduler==3.10.4

==E2E TESTS==
/tmp/test_amendment_r2_fixed.py passes all scenarios:
  [A] instructor amendment_document upload + beneficiary response_document upload + respond
  [B] artificially expired amendment -> expire_amendments marks it EXPIRED,
      practice returns to UNDER_REVIEW
  [C] amendments at +2 days and +7 days -> queue_reminders queues 2 reminders,
      /internal pending-reminder exposes both
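
Scenario [B] can be illustrated with plain objects. This is a sketch of the rule only, not the test file itself: the Amendment and Practice dataclasses and the expire helper are hypothetical, while the status names are the ones used by the scheduler.

```python
from dataclasses import dataclass, field
from datetime import date, timedelta

OPEN_STATUSES = ("DRAFT", "AWAITING", "RESPONSE_RECEIVED")


@dataclass
class Amendment:
    status: str
    deadline: date


@dataclass
class Practice:
    status: str
    amendments: list = field(default_factory=list)


def expire(practice, today):
    """Expire overdue AWAITING amendments; reopen the practice if none stay open."""
    expired = 0
    for a in practice.amendments:
        if a.status == "AWAITING" and a.deadline < today:
            a.status = "EXPIRED"
            expired += 1
    still_open = any(a.status in OPEN_STATUSES for a in practice.amendments)
    if expired and not still_open and practice.status == "AWAITING_AMENDMENT":
        practice.status = "UNDER_REVIEW"
    return expired


today = date(2026, 4, 20)
yesterday = today - timedelta(days=1)

# Only one amendment, already overdue: the practice is reopened.
single = Practice("AWAITING_AMENDMENT", [Amendment("AWAITING", yesterday)])
expire(single, today)

# A second amendment is still in DRAFT: the practice stays blocked.
mixed = Practice(
    "AWAITING_AMENDMENT",
    [Amendment("AWAITING", yesterday), Amendment("DRAFT", today)],
)
expire(mixed, today)

print(single.status, mixed.status)
```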
2026-04-20 22:35:01 +02:00

"""Scheduler for the amendment lifecycle.

Two active cron jobs:
- expire_amendments(): every day at 01:05. Finds AWAITING amendments with
  deadline < today, moves them to EXPIRED, and returns the practice to
  UNDER_REVIEW if there are no other open amendments. Equivalent to
  ApplicationAmendmentScheduler.processAmendmentExpirationScheduler in the BE.
- queue_reminders(): every day at 09:00. Reads remission_expiration_config
  (type='AMENDMENT', interval_days=N), finds AWAITING amendments with
  deadline == today + N days, and sets the pec_retry_after marker on the
  record. Equivalent to ExpirationScheduler.processAmendmentExpiration
  in the BE (data-driven from expiration_config).

The actual notifications (reminder PEC to the beneficiary + internal email to
the instructor) are sent by the Gepafin BE, which polls our /internal
endpoints. The microservice stays sender-agnostic.
"""
import logging
from datetime import datetime, timezone, date, timedelta
from typing import Optional

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy.orm import Session
from sqlalchemy import text

from .db import SessionLocal
from .models import RemissionAmendmentRequest, RemissionPractice, RemissionExpirationConfig

log = logging.getLogger("rendicontazione-api.scheduler")


def expire_amendments() -> dict:
    """Expire AWAITING amendments whose deadline has passed.

    Returns a dict {expired_count, practices_reopened} for logging/tests.
    """
    db: Session = SessionLocal()
    # NOTE: date.today() uses the process's local timezone, while the cron
    # fires in Europe/Rome; this code assumes the two agree.
    today = date.today()
    stats = {"expired_count": 0, "practices_reopened": 0}
    try:
        expired_ars = db.query(RemissionAmendmentRequest).filter(
            RemissionAmendmentRequest.status == "AWAITING",
            RemissionAmendmentRequest.deadline < today,
        ).all()
        practice_ids_to_check = set()
        for ar in expired_ars:
            ar.status = "EXPIRED"
            practice_ids_to_check.add(ar.practice_id)
            stats["expired_count"] += 1
            log.info(f"Amendment {ar.id} EXPIRED (deadline was {ar.deadline})")
        # Flush so the status changes are visible to the checks below.
        db.flush()
        for pid in practice_ids_to_check:
            p = db.query(RemissionPractice).filter(RemissionPractice.id == pid).first()
            if not p:
                continue
            # Amendments that still keep the practice blocked.
            others_open = [
                a for a in p.amendment_requests
                if a.status in ("DRAFT", "AWAITING", "RESPONSE_RECEIVED")
            ]
            if not others_open and p.status == "AWAITING_AMENDMENT":
                p.status = "UNDER_REVIEW"
                stats["practices_reopened"] += 1
                log.info(f"Practice {pid} returned to UNDER_REVIEW (amendment expired, no other open)")
        db.commit()
        log.info(f"expire_amendments: {stats}")
    except Exception as e:
        db.rollback()
        log.error(f"expire_amendments FAILED: {e}", exc_info=True)
        raise
    finally:
        db.close()
    return stats


def queue_reminders() -> dict:
    """Queue reminders for amendments approaching their deadline.

    Reads the data-driven config and, for each interval_days, finds amendments
    with deadline == today + N and sets the reminder marker on them. The BE
    then sees these amendments as pending-reminder via /internal.

    The existing pec_retry_after field is used as a unified marker:
    - NULL: no reminder queued
    - timestamp: reminder queued at that moment, the BE sends it on its next poll

    Returns a dict {reminders_queued, by_interval}.
    """
    db: Session = SessionLocal()
    today = date.today()
    stats = {"reminders_queued": 0, "by_interval": {}}
    try:
        configs = db.query(RemissionExpirationConfig).filter(
            RemissionExpirationConfig.type == "AMENDMENT",
            RemissionExpirationConfig.is_deleted.is_(False),
        ).all()
        for cfg in configs:
            target_deadline = today + timedelta(days=cfg.interval_days)
            ars = db.query(RemissionAmendmentRequest).filter(
                RemissionAmendmentRequest.status == "AWAITING",
                RemissionAmendmentRequest.deadline == target_deadline,
                # only if the initial PEC has already been sent
                RemissionAmendmentRequest.pec_sent_at.isnot(None),
                # skip amendments whose reminder marker is already set
                RemissionAmendmentRequest.pec_retry_after.is_(None),
            ).all()
            for ar in ars:
                ar.pec_retry_after = datetime.now(timezone.utc)
                stats["reminders_queued"] += 1
                log.info(f"Amendment {ar.id} reminder queued ({cfg.interval_days} days to deadline)")
            stats["by_interval"][cfg.interval_days] = len(ars)
        db.commit()
        log.info(f"queue_reminders: {stats}")
    except Exception as e:
        db.rollback()
        log.error(f"queue_reminders FAILED: {e}", exc_info=True)
        raise
    finally:
        db.close()
    return stats


_scheduler: Optional[BackgroundScheduler] = None


def start_scheduler():
    """Start the BackgroundScheduler with the 2 cron jobs. Called in the FastAPI lifespan."""
    global _scheduler
    if _scheduler is not None:
        log.warning("start_scheduler called twice, skipping")
        return _scheduler
    _scheduler = BackgroundScheduler(timezone="Europe/Rome")
    # expire every night at 01:05 (after midnight, so today has rolled over)
    _scheduler.add_job(
        expire_amendments, CronTrigger(hour=1, minute=5),
        id="expire_amendments", replace_existing=True, misfire_grace_time=3600,
    )
    # reminders every morning at 09:00
    _scheduler.add_job(
        queue_reminders, CronTrigger(hour=9, minute=0),
        id="queue_reminders", replace_existing=True, misfire_grace_time=3600,
    )
    _scheduler.start()
    log.info("Scheduler started: expire_amendments 01:05 + queue_reminders 09:00 (Europe/Rome)")
    return _scheduler


def stop_scheduler():
    """Stop the scheduler if it is running. Safe to call more than once."""
    global _scheduler
    if _scheduler is not None:
        _scheduler.shutdown(wait=False)
        _scheduler = None
        log.info("Scheduler stopped")