diff --git a/app/main.py b/app/main.py index 710eec1..e9b24d5 100644 --- a/app/main.py +++ b/app/main.py @@ -15,6 +15,7 @@ from sqlalchemy import text from .config import get_settings from .db import engine, Base from .migrations import run_migrations +from .scheduler import start_scheduler, stop_scheduler from .routers import health, schemas, practices, debug, instructor, files, verbale, custom_checks, assignment, internal logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") @@ -32,10 +33,12 @@ async def lifespan(app: FastAPI): Base.metadata.create_all(bind=engine) run_migrations(engine) log.info(f"Schema '{settings.db_schema}' + tabelle + migrations OK") + start_scheduler() except Exception as e: log.error(f"Errore bootstrap DB: {e}") raise yield + stop_scheduler() log.info("Shutdown rendicontazione-api") diff --git a/app/migrations.py b/app/migrations.py index b9c8257..9e2915a 100644 --- a/app/migrations.py +++ b/app/migrations.py @@ -155,9 +155,29 @@ MIGRATIONS = [ CREATE INDEX IF NOT EXISTS idx_expiration_config_type ON gepafin_rendic.remission_expiration_config(type) WHERE is_deleted = false; + """, + # 2026-04-20 v5: dedup duplicati (ON CONFLICT DO NOTHING non funzionava senza UNIQUE) + # + aggiungo UNIQUE constraint per prevenire futuri duplicati + """ + DELETE FROM gepafin_rendic.remission_expiration_config ec + USING gepafin_rendic.remission_expiration_config ec2 + WHERE ec.id > ec2.id + AND ec.type = ec2.type + AND ec.interval_days = ec2.interval_days; + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conname = 'uq_expiration_config_type_days' + AND conrelid = 'gepafin_rendic.remission_expiration_config'::regclass + ) THEN + ALTER TABLE gepafin_rendic.remission_expiration_config + ADD CONSTRAINT uq_expiration_config_type_days UNIQUE (type, interval_days); + END IF; + END$$; INSERT INTO gepafin_rendic.remission_expiration_config (type, interval_days) VALUES ('AMENDMENT', 7), ('AMENDMENT', 2) - ON CONFLICT DO NOTHING; + ON CONFLICT (type, interval_days) DO NOTHING; """, ] diff --git a/app/routers/instructor.py b/app/routers/instructor.py index 703de5f..fc1b1c9 100644 --- a/app/routers/instructor.py +++ b/app/routers/instructor.py @@ -6,12 +6,13 @@ from decimal import Decimal from uuid import UUID from typing import List -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from sqlalchemy.orm import Session from sqlalchemy import text, or_, and_ from ..db import get_db from ..auth import AuthUser, get_current_user +from ..storage import save_upload, FileTooLargeError, MimeNotAllowedError, StorageError from ..models import RemissionPractice, RemissionAmendmentRequest from ..schemas import ( AmendmentRequestCreate, AmendmentRequestUpdate, AmendmentExtend, AmendmentRequestOut, AmendmentResponseSubmit, AmendmentStatus, @@ -384,6 +385,59 @@ def close_amendment(practice_id: UUID, amendment_id: UUID, data=AmendmentRequestOut.model_validate(ar).model_dump(mode="json")) +@router.post("/{practice_id}/amendment/{amendment_id}/upload-document", response_model=ApiResponse) +async def upload_amendment_document(practice_id: UUID, amendment_id: UUID, + file: UploadFile = File(...), + db: Session = Depends(get_db), + user: AuthUser = Depends(_require_instructor)): + """Allega documento dell'istruttore al soccorso (motivazione, scheda tecnica, ecc.). + Consentito in DRAFT o AWAITING. Sostituisce il precedente se esiste.""" + ar = _amendment_or_404(db, practice_id, amendment_id) + if ar.status not in (AmendmentStatus.DRAFT.value, AmendmentStatus.AWAITING.value): + raise HTTPException(status_code=409, + detail=f"Upload consentito in DRAFT/AWAITING (attuale: {ar.status})") + p = _get_practice_or_404(db, practice_id) + try: + rel_path, size, digest, mime, safe_name = save_upload( + application_id=p.application_id, + entity_type="amendment-instructor-doc", + entity_id=ar.id, + file_obj=file.file, + original_filename=file.filename or "amendment.pdf", + content_type=file.content_type, + ) + except FileTooLargeError as e: + raise HTTPException(status_code=413, detail=str(e)) + except MimeNotAllowedError as e: + raise HTTPException(status_code=415, detail=str(e)) + except StorageError as e: + raise HTTPException(status_code=500, detail=f"Errore storage: {e}") + + ar.amendment_document_path = rel_path + ar.amendment_document_type = mime + db.commit() + db.refresh(ar) + return ApiResponse(message="Documento allegato al soccorso", + data={"amendment_id": str(ar.id), "path": rel_path, + "filename": safe_name, "size_bytes": size, "mime": mime}) + + +@router.delete("/{practice_id}/amendment/{amendment_id}/upload-document", response_model=ApiResponse) +def delete_amendment_document(practice_id: UUID, amendment_id: UUID, + db: Session = Depends(get_db), + user: AuthUser = Depends(_require_instructor)): + """Rimuove il documento istruttore allegato al soccorso (consentito solo in DRAFT).""" + ar = _amendment_or_404(db, practice_id, amendment_id) + if ar.status != AmendmentStatus.DRAFT.value: + raise HTTPException(status_code=409, + detail=f"Rimozione allegato consentita solo in DRAFT (attuale: {ar.status})") + ar.amendment_document_path = None + ar.amendment_document_type = None + db.commit() + return ApiResponse(message="Documento allegato rimosso", + data={"amendment_id": str(ar.id)}) + + # Endpoint beneficiario: visualizza amendments sulla sua pratica + risponde @router.post("/{practice_id}/amendment/{amendment_id}/respond-beneficiary", response_model=ApiResponse) def respond_amendment_beneficiary(practice_id: UUID, amendment_id: UUID, @@ -556,3 +610,44 @@ def set_instructor_final_notes(practice_id: UUID, body: InstructorFinalNotesBody db.refresh(p) return ApiResponse(message="Verbale aggiornato", data=PracticeOut.model_validate(p).model_dump(mode="json")) + + +@router.post("/{practice_id}/amendment/{amendment_id}/upload-response-document", response_model=ApiResponse) +async def upload_response_document(practice_id: UUID, amendment_id: UUID, + file: UploadFile = File(...), + db: Session = Depends(get_db), + user: AuthUser = Depends(get_current_user)): + """Beneficiario allega un documento come supporto alla sua risposta al soccorso. + Consentito su amendment in stato AWAITING, solo dal proprietario pratica.""" + p = _get_practice_or_404(db, practice_id) + if user.is_beneficiary() and p.user_id != user.user_id: + raise HTTPException(status_code=403, detail="Non sei il proprietario della pratica") + + ar = _amendment_or_404(db, practice_id, amendment_id) + if ar.status not in (AmendmentStatus.AWAITING.value, AmendmentStatus.RESPONSE_RECEIVED.value): + raise HTTPException(status_code=409, + detail=f"Upload risposta consentito solo in AWAITING/RESPONSE_RECEIVED (attuale: {ar.status})") + + try: + rel_path, size, digest, mime, safe_name = save_upload( + application_id=p.application_id, + entity_type="amendment-response-doc", + entity_id=ar.id, + file_obj=file.file, + original_filename=file.filename or "response.pdf", + content_type=file.content_type, + ) + except FileTooLargeError as e: + raise HTTPException(status_code=413, detail=str(e)) + except MimeNotAllowedError as e: + raise HTTPException(status_code=415, detail=str(e)) + except StorageError as e: + raise HTTPException(status_code=500, detail=f"Errore storage: {e}") + + ar.response_document_path = rel_path + ar.response_document_type = mime + db.commit() + db.refresh(ar) + return ApiResponse(message="Documento risposta allegato", + data={"amendment_id": str(ar.id), "path": rel_path, + "filename": safe_name, "size_bytes": size, "mime": mime}) diff --git a/app/scheduler.py b/app/scheduler.py new file mode 100644 index 0000000..8d6d8cb --- /dev/null +++ b/app/scheduler.py @@ -0,0 +1,158 @@ +"""Scheduler per lifecycle amendment. + +Due cron job attivi: + - expire_amendments(): ogni giorno 01:05 — trova amendment AWAITING con + deadline < today, le passa a EXPIRED, rimette pratica a UNDER_REVIEW + se non ci sono altri amendment aperti. Equivalente a + ApplicationAmendmentScheduler.processAmendmentExpirationScheduler del BE. + + - queue_reminders(): ogni giorno 09:00 — legge remission_expiration_config + (type='AMENDMENT', interval_days=N), trova amendment AWAITING con + deadline == today + N giorni, setta flag reminder_queued_at sul + record. Equivalente a ExpirationScheduler.processAmendmentExpiration + del BE (data-driven da expiration_config). + +Le notifiche effettive (PEC reminder benef + email interna istruttore) le +invia il BE Gepafin via polling sui nostri endpoint /internal. Il +microservizio resta sender-agnostico. +""" +import logging +from datetime import datetime, timezone, date, timedelta +from typing import Optional + +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +from sqlalchemy.orm import Session +from sqlalchemy import text + +from .db import SessionLocal +from .models import RemissionAmendmentRequest, RemissionPractice, RemissionExpirationConfig + +log = logging.getLogger("rendicontazione-api.scheduler") + + +def expire_amendments() -> dict: + """Espira amendment AWAITING con deadline passata. + Ritorna dict {expired_count, practices_reopened} per logging/test.""" + db: Session = SessionLocal() + today = date.today() + stats = {"expired_count": 0, "practices_reopened": 0} + try: + expired_ars = db.query(RemissionAmendmentRequest).filter( + RemissionAmendmentRequest.status == "AWAITING", + RemissionAmendmentRequest.deadline < today, + ).all() + + practice_ids_to_check = set() + + for ar in expired_ars: + ar.status = "EXPIRED" + practice_ids_to_check.add(ar.practice_id) + stats["expired_count"] += 1 + log.info(f"Amendment {ar.id} EXPIRED (deadline era {ar.deadline})") + + db.flush() + + for pid in practice_ids_to_check: + p = db.query(RemissionPractice).filter(RemissionPractice.id == pid).first() + if not p: + continue + others_open = [ + a for a in p.amendment_requests + if a.status in ("DRAFT", "AWAITING", "RESPONSE_RECEIVED") + ] + if not others_open and p.status == "AWAITING_AMENDMENT": + p.status = "UNDER_REVIEW" + stats["practices_reopened"] += 1 + log.info(f"Pratica {pid} ritornata a UNDER_REVIEW (amendment scaduto, nessun altro aperto)") + + db.commit() + log.info(f"expire_amendments: {stats}") + except Exception as e: + db.rollback() + log.error(f"expire_amendments FAILED: {e}", exc_info=True) + raise + finally: + db.close() + return stats + + +def queue_reminders() -> dict: + """Legge config data-driven, per ogni interval_days trova amendment + con deadline == today + N, scrive flag reminder_queued_at sull'amendment. + Il BE vedra questi amendment come pending-reminder via /internal. + + Usiamo campo pec_retry_after come marker unificato (gia presente): + - NULL: nessun reminder accodato + - timestamp: reminder accodato in questo momento, BE invia al prossimo poll + + Ritorna dict {reminders_queued_by_interval}.""" + db: Session = SessionLocal() + today = date.today() + stats = {"reminders_queued": 0, "by_interval": {}} + try: + configs = db.query(RemissionExpirationConfig).filter( + RemissionExpirationConfig.type == "AMENDMENT", + RemissionExpirationConfig.is_deleted.is_(False), + ).all() + + for cfg in configs: + target_deadline = today + timedelta(days=cfg.interval_days) + + ars = db.query(RemissionAmendmentRequest).filter( + RemissionAmendmentRequest.status == "AWAITING", + RemissionAmendmentRequest.deadline == target_deadline, + RemissionAmendmentRequest.pec_sent_at.isnot(None), # solo se gia inviata PEC iniziale + # evito di ri-accodare se gia accodato oggi + RemissionAmendmentRequest.pec_retry_after.is_(None), + ).all() + + for ar in ars: + ar.pec_retry_after = datetime.now(timezone.utc) + stats["reminders_queued"] += 1 + log.info(f"Amendment {ar.id} reminder accodato ({cfg.interval_days}gg alla scadenza)") + + stats["by_interval"][cfg.interval_days] = len(ars) + + db.commit() + log.info(f"queue_reminders: {stats}") + except Exception as e: + db.rollback() + log.error(f"queue_reminders FAILED: {e}", exc_info=True) + raise + finally: + db.close() + return stats + + +_scheduler: Optional[BackgroundScheduler] = None + + +def start_scheduler(): + """Avvia BackgroundScheduler con i 2 cron. Chiamato in lifespan FastAPI.""" + global _scheduler + if _scheduler is not None: + log.warning("start_scheduler chiamato due volte, skip") + return _scheduler + _scheduler = BackgroundScheduler(timezone="Europe/Rome") + # expire ogni notte 01:05 (dopo midnight, sicuro che today e cambiato) + _scheduler.add_job( + expire_amendments, CronTrigger(hour=1, minute=5), + id="expire_amendments", replace_existing=True, misfire_grace_time=3600, + ) + # reminder ogni mattina 09:00 + _scheduler.add_job( + queue_reminders, CronTrigger(hour=9, minute=0), + id="queue_reminders", replace_existing=True, misfire_grace_time=3600, + ) + _scheduler.start() + log.info("Scheduler avviato: expire_amendments 01:05 + queue_reminders 09:00 (Europe/Rome)") + return _scheduler + + +def stop_scheduler(): + global _scheduler + if _scheduler is not None: + _scheduler.shutdown(wait=False) + _scheduler = None + log.info("Scheduler fermato") diff --git a/app/storage.py b/app/storage.py index 2e7d760..e1c8898 100644 --- a/app/storage.py +++ b/app/storage.py @@ -48,7 +48,7 @@ def _safe_filename(name: str, max_len: int = 120) -> str: def save_upload( application_id: int, - entity_type: str, # invoice | ula | document + entity_type: str, # invoice | ula | document | amendment-instructor-doc | amendment-response-doc entity_id: UUID, file_obj: BinaryIO, original_filename: str, @@ -62,7 +62,7 @@ def save_upload( - mime in ALLOWED_MIMES (usa content_type del client, fallback su estensione) - dimensione <= MAX_SIZE_BYTES """ - if entity_type not in ("invoice", "ula", "document"): + if entity_type not in ("invoice", "ula", "document", "amendment-instructor-doc", "amendment-response-doc"): raise StorageError(f"entity_type non valido: {entity_type}") safe_name = _safe_filename(original_filename) diff --git a/requirements.txt b/requirements.txt index 461352f..828f799 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ python-multipart==0.0.9 weasyprint==61.2 pydyf==0.10.0 jinja2==3.1.3 +APScheduler==3.10.4