gepafin-rendicontazione-api/app/migrations.py
BFLOWS 34c4a47a1c feat(amendment): ROUND 2 - scheduler + document upload
Second part of the soccorso istruttorio (amendment request) port, mirroring the Gepafin BE.
Completed: cron scheduler (expire + reminder), document upload for instructor
and beneficiary, fix for duplicated config rows.

==SCHEDULER (app/scheduler.py NEW)==
APScheduler BackgroundScheduler integrated into the FastAPI lifespan.
Two active cron jobs (timezone Europe/Rome):

  expire_amendments() - cron, 01:05 every night
    Mirrors ApplicationAmendmentScheduler.processAmendmentExpirationScheduler.
    Finds AWAITING amendments with deadline < today and moves them to EXPIRED.
    Puts the practice back to UNDER_REVIEW if it has no other open amendments.
    Returns a stats dict for logging/tests.

  queue_reminders() - cron, 09:00 every morning
    Mirrors ExpirationScheduler.processAmendmentExpiration (data-driven).
    Reads remission_expiration_config (type='AMENDMENT', interval_days=N);
    for each row, finds amendments whose deadline is exactly today+N and sets
    pec_retry_after (the marker the BE sees via /internal pending-reminder).
    Multiple rows = multiple reminders (seed: 7 days + 2 days).

The microservice only updates DB state. The actual sending of reminder
emails is done by the Gepafin BE via polling, tenant-aware.
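
The selection rules of the two jobs can be sketched in plain Python. This is
only an illustration, not the code in app/scheduler.py: the Amendment
dataclass, the in-memory lists and the reminder_queued flag (standing in for
pec_retry_after) are hypothetical, and it assumes that "open" means AWAITING
and that reminders only target AWAITING amendments.

```python
from dataclasses import dataclass
from datetime import date, timedelta


@dataclass
class Amendment:
    """Hypothetical in-memory stand-in for a remission_amendment_request row."""
    id: int
    practice_id: int
    status: str
    deadline: date
    reminder_queued: bool = False  # stands in for pec_retry_after


def expire_amendments(amendments, practice_status, today):
    """Move overdue AWAITING amendments to EXPIRED and return a stats dict."""
    expired = [a for a in amendments if a.status == "AWAITING" and a.deadline < today]
    for a in expired:
        a.status = "EXPIRED"

    reopened = 0
    for pid in {a.practice_id for a in expired}:
        # Assumption: an amendment counts as "open" while it is AWAITING.
        still_open = any(
            a.practice_id == pid and a.status == "AWAITING" for a in amendments
        )
        if not still_open:
            practice_status[pid] = "UNDER_REVIEW"
            reopened += 1
    return {"expired": len(expired), "practices_reopened": reopened}


def queue_reminders(amendments, interval_days, today):
    """Flag amendments whose deadline is exactly today + N for each configured N."""
    queued = 0
    for n in interval_days:
        target = today + timedelta(days=n)
        for a in amendments:
            if a.status == "AWAITING" and a.deadline == target:
                a.reminder_queued = True
                queued += 1
    return {"queued": queued}
```

With the seed intervals [7, 2], an amendment is flagged at most twice over
its lifetime: once 7 days before the deadline and once 2 days before it.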

==DOCUMENT UPLOAD==
3 new endpoints in the istruttoria router:

  POST   /instructor/{pid}/amendment/{aid}/upload-document
    - Instructor attaches a PDF to the amendment (rationale, technical sheet).
    - Allowed in DRAFT or AWAITING. Replaces the previous file if one exists.
    - Populates amendment_document_path + amendment_document_type.

  DELETE /instructor/{pid}/amendment/{aid}/upload-document
    - Removes the attachment (only in DRAFT).

  POST   /instructor/{pid}/amendment/{aid}/upload-response-document
    - Beneficiary attaches a PDF in support of the response.
    - Allowed in AWAITING/RESPONSE_RECEIVED, owner only.
    - Populates response_document_path + response_document_type.

Reuses the existing save_upload() with dedicated entity_type values.
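
The status gates listed above can be summarised as a small lookup. This is a
sketch only: ALLOWED_STATUSES, check_upload_allowed and the user_id/owner_id
parameters are hypothetical names, not the router's actual code.

```python
# Hypothetical mapping: operation name -> amendment statuses in which it is allowed.
ALLOWED_STATUSES = {
    "upload-document": {"DRAFT", "AWAITING"},
    "delete-document": {"DRAFT"},
    "upload-response-document": {"AWAITING", "RESPONSE_RECEIVED"},
}


def check_upload_allowed(operation, status, user_id=None, owner_id=None):
    """Return True if the operation is permitted for this status and caller."""
    if status not in ALLOWED_STATUSES[operation]:
        return False
    if operation == "upload-response-document":
        # Only the owner of the practice may upload a response document.
        return user_id is not None and user_id == owner_id
    return True
```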

==FIX storage.py==
entity_type whitelist extended with 'amendment-instructor-doc' +
'amendment-response-doc' (previously it accepted only invoice/ula/document,
so the upload was blocked with a StorageError).
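
A whitelist check of this kind might look like the following. The names
ALLOWED_ENTITY_TYPES and validate_entity_type are assumptions for
illustration, and StorageError is redefined locally; the real storage.py may
be organised differently.

```python
class StorageError(Exception):
    """Local stand-in for the storage adapter's error type."""


# Original three entity types plus the two added for amendment uploads.
ALLOWED_ENTITY_TYPES = frozenset({
    "invoice",
    "ula",
    "document",
    "amendment-instructor-doc",
    "amendment-response-doc",
})


def validate_entity_type(entity_type: str) -> str:
    """Return entity_type unchanged, or raise StorageError if it is not whitelisted."""
    if entity_type not in ALLOWED_ENTITY_TYPES:
        raise StorageError(f"unsupported entity_type: {entity_type!r}")
    return entity_type
```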

==FIX migration dedup==
Found during testing: migration 8 ran INSERT ON CONFLICT DO NOTHING on
remission_expiration_config, but the table had no UNIQUE constraint, so no
conflict was ever raised. Every restart inserted duplicates (16 rows in the
DB instead of 2). Fix in migration 9: DELETE the duplicates + ADD UNIQUE
(type, interval_days) + clean re-seed.
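
The failure mode can be reproduced in isolation. The sketch below uses the
stdlib sqlite3 module as a stand-in for PostgreSQL, with a simplified table
named config; both are assumptions made so the example is self-contained.
The dedup DELETE uses a MIN(id) subquery because SQLite has no DELETE ...
USING, so it differs in form from the statement in migration 9.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE config ("
    " id INTEGER PRIMARY KEY,"
    " type TEXT NOT NULL,"
    " interval_days INTEGER NOT NULL)"
)

# Seed statement run at every "restart". Without a UNIQUE constraint on
# (type, interval_days) there is nothing to conflict with.
SEED = (
    "INSERT INTO config (type, interval_days) "
    "VALUES ('AMENDMENT', 7), ('AMENDMENT', 2) "
    "ON CONFLICT DO NOTHING"
)


def count_rows() -> int:
    return conn.execute("SELECT COUNT(*) FROM config").fetchone()[0]


# Three simulated restarts before the fix: duplicates pile up.
for _ in range(3):
    conn.execute(SEED)
rows_before = count_rows()  # 6

# The fix: keep the lowest id per (type, interval_days), then add the constraint.
conn.execute(
    "DELETE FROM config WHERE id NOT IN ("
    " SELECT MIN(id) FROM config GROUP BY type, interval_days)"
)
conn.execute("CREATE UNIQUE INDEX uq_config_type_days ON config (type, interval_days)")

# Three more restarts after the fix: the seed is now a no-op.
for _ in range(3):
    conn.execute(SEED)
rows_after = count_rows()  # 2
```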

==REQUIREMENTS==
APScheduler==3.10.4

==E2E TESTS==
/tmp/test_amendment_r2_fixed.py passes on everything:
  [A] instructor amendment_document upload + beneficiary response_document upload + respond
  [B] artificially expired amendment -> expire_amendments marks it EXPIRED,
      practice returns to UNDER_REVIEW
  [C] amendments at +2 days and +7 days -> queue_reminders queues 2 reminders,
      /internal pending-reminder exposes both
2026-04-20 22:35:01 +02:00


"""
Idempotent migrations for the sandbox.
Base.metadata.create_all() only creates missing tables, not additional columns.
Here we handle ALTER TABLE ADD COLUMN IF NOT EXISTS for schema evolution.
Each migration is a SQL string that can be executed multiple times without errors.
"""
from sqlalchemy import text
import logging
log = logging.getLogger("rendicontazione-api.migrations")
MIGRATIONS = [
# 2026-04-18: file upload columns on remission_invoice
"""
ALTER TABLE gepafin_rendic.remission_invoice
ADD COLUMN IF NOT EXISTS storage_path varchar(1024),
ADD COLUMN IF NOT EXISTS mime varchar(128),
ADD COLUMN IF NOT EXISTS size_bytes bigint,
ADD COLUMN IF NOT EXISTS sha256 varchar(64),
ADD COLUMN IF NOT EXISTS uploaded_by integer,
ADD COLUMN IF NOT EXISTS uploaded_at timestamptz;
""",
# 2026-04-18: file upload columns on remission_ula_employee
"""
ALTER TABLE gepafin_rendic.remission_ula_employee
ADD COLUMN IF NOT EXISTS storage_path varchar(1024),
ADD COLUMN IF NOT EXISTS mime varchar(128),
ADD COLUMN IF NOT EXISTS size_bytes bigint,
ADD COLUMN IF NOT EXISTS sha256 varchar(64),
ADD COLUMN IF NOT EXISTS uploaded_by integer,
ADD COLUMN IF NOT EXISTS uploaded_at timestamptz;
""",
# 2026-04-18: file upload columns on remission_document
"""
ALTER TABLE gepafin_rendic.remission_document
ADD COLUMN IF NOT EXISTS storage_path varchar(1024),
ADD COLUMN IF NOT EXISTS mime varchar(128),
ADD COLUMN IF NOT EXISTS size_bytes bigint,
ADD COLUMN IF NOT EXISTS sha256 varchar(64),
ADD COLUMN IF NOT EXISTS uploaded_by integer;
""",
# 2026-04-18 v2: multi-tranche on remission_practice
# DROP UNIQUE on application_id (allows multiple tranches for the same application)
# adds sequence_number, period_label, suggested_instructor_id
# new UNIQUE (application_id, sequence_number)
# partial index on assigned_instructor_id IS NULL for the "to be assigned" queue
"""
ALTER TABLE gepafin_rendic.remission_practice
DROP CONSTRAINT IF EXISTS uq_remission_practice_application;
ALTER TABLE gepafin_rendic.remission_practice
ADD COLUMN IF NOT EXISTS sequence_number integer NOT NULL DEFAULT 1,
ADD COLUMN IF NOT EXISTS period_label varchar(100),
ADD COLUMN IF NOT EXISTS suggested_instructor_id integer;
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conname = 'uq_remission_practice_app_seq'
AND conrelid = 'gepafin_rendic.remission_practice'::regclass
) THEN
ALTER TABLE gepafin_rendic.remission_practice
ADD CONSTRAINT uq_remission_practice_app_seq UNIQUE (application_id, sequence_number);
END IF;
END$$;
CREATE INDEX IF NOT EXISTS idx_remission_practice_unassigned
ON gepafin_rendic.remission_practice(assigned_instructor_id)
WHERE assigned_instructor_id IS NULL;
""",
# 2026-04-20: link a document to the Gepafin BE company_document (reuse from the repository)
# If source_company_document_id is set, the document was selected from the company
# repository (gepafin_schema.company_document). The status/expiry of the source drives
# the UI traffic light and the submit gate (EXPIRED documents block submission).
"""
ALTER TABLE gepafin_rendic.remission_document
ADD COLUMN IF NOT EXISTS source_company_document_id integer;
CREATE INDEX IF NOT EXISTS idx_remission_document_source
ON gepafin_rendic.remission_document(source_company_document_id)
WHERE source_company_document_id IS NOT NULL;
""",
# 2026-04-18 v2: custom checks table
# aligned with the existing storage adapter (storage_path + mime + size + sha256)
# does NOT follow the RAG p1 specs, which used document_filename (obsolete v1)
"""
CREATE TABLE IF NOT EXISTS gepafin_rendic.remission_custom_check_value (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
practice_id uuid NOT NULL REFERENCES gepafin_rendic.remission_practice(id) ON DELETE CASCADE,
check_code varchar(64) NOT NULL,
beneficiary_declared boolean NOT NULL DEFAULT false,
declared_at timestamptz,
storage_path varchar(1024),
mime varchar(128),
size_bytes bigint,
sha256 varchar(64),
document_uploaded_at timestamptz,
uploaded_by integer,
verification_status varchar(20) NOT NULL DEFAULT 'PENDING',
verification_notes text,
verified_by integer,
verified_at timestamptz,
created_at timestamptz NOT NULL DEFAULT NOW(),
updated_at timestamptz NOT NULL DEFAULT NOW(),
CONSTRAINT uq_custom_check_practice_code UNIQUE (practice_id, check_code)
);
CREATE INDEX IF NOT EXISTS idx_custom_check_practice
ON gepafin_rendic.remission_custom_check_value(practice_id);
""",
# 2026-04-20 v3: soccorso istruttorio (amendment request) mirroring the Gepafin BE
# - DRAFT status (instructor is preparing it, not yet sent)
# - response_days + extended_days + extension_date (deadline extensions)
# - internal_note (visible to the instructor only, separate from request_text)
# - amendment_document_* (instructor's attachment to the amendment, signed and unsigned)
# - response_document_* (beneficiary's response upload)
# - protocol_id + email_log_id + user_action_id (populated by the BE via mark-pec-sent)
# - pec_sent_at + pec_failed_reason + pec_retry_after (asynchronous PEC tracking)
# On the microservice side we handle NEITHER PEC nor protocol: the multi-tenant BE
# (gepafin_schema.hub id=1 PEC_SERVICE, id=2 MAILGUN_SERVICE) polls the
# /internal/remission-amendments endpoint and notifies via mark-pec-sent/failed.
"""
ALTER TABLE gepafin_rendic.remission_amendment_request
ADD COLUMN IF NOT EXISTS response_days integer,
ADD COLUMN IF NOT EXISTS extended_days integer,
ADD COLUMN IF NOT EXISTS extension_date timestamptz,
ADD COLUMN IF NOT EXISTS internal_note text,
ADD COLUMN IF NOT EXISTS amendment_document_path varchar(1024),
ADD COLUMN IF NOT EXISTS amendment_document_type varchar(128),
ADD COLUMN IF NOT EXISTS amendment_initial_document_path varchar(1024),
ADD COLUMN IF NOT EXISTS response_document_path varchar(1024),
ADD COLUMN IF NOT EXISTS response_document_type varchar(128),
ADD COLUMN IF NOT EXISTS protocol_id varchar(128),
ADD COLUMN IF NOT EXISTS email_log_id integer,
ADD COLUMN IF NOT EXISTS user_action_id integer,
ADD COLUMN IF NOT EXISTS pec_sent_at timestamptz,
ADD COLUMN IF NOT EXISTS pec_failed_reason text,
ADD COLUMN IF NOT EXISTS pec_retry_after timestamptz;
CREATE INDEX IF NOT EXISTS idx_amendment_status_pec
ON gepafin_rendic.remission_amendment_request(status)
WHERE status IN ('DRAFT','AWAITING');
CREATE INDEX IF NOT EXISTS idx_amendment_deadline
ON gepafin_rendic.remission_amendment_request(deadline)
WHERE status = 'AWAITING';
""",
# 2026-04-20 v4: data-driven reminder config table, mirroring the BE
# (expiration_config type='AMENDMENT' interval_days=N). Allows multiple rows
# to trigger reminders at different numbers of days before the deadline (e.g. 7 days + 2 days).
"""
CREATE TABLE IF NOT EXISTS gepafin_rendic.remission_expiration_config (
id serial PRIMARY KEY,
type varchar(50) NOT NULL,
interval_days integer NOT NULL CHECK (interval_days > 0),
is_deleted boolean NOT NULL DEFAULT false,
created_at timestamptz NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_expiration_config_type
ON gepafin_rendic.remission_expiration_config(type)
WHERE is_deleted = false;
""",
# 2026-04-20 v5: remove duplicates (ON CONFLICT DO NOTHING had no effect without a UNIQUE constraint)
# + add a UNIQUE constraint to prevent future duplicates
"""
DELETE FROM gepafin_rendic.remission_expiration_config ec
USING gepafin_rendic.remission_expiration_config ec2
WHERE ec.id > ec2.id
AND ec.type = ec2.type
AND ec.interval_days = ec2.interval_days;
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conname = 'uq_expiration_config_type_days'
AND conrelid = 'gepafin_rendic.remission_expiration_config'::regclass
) THEN
ALTER TABLE gepafin_rendic.remission_expiration_config
ADD CONSTRAINT uq_expiration_config_type_days UNIQUE (type, interval_days);
END IF;
END$$;
INSERT INTO gepafin_rendic.remission_expiration_config (type, interval_days)
VALUES ('AMENDMENT', 7), ('AMENDMENT', 2)
ON CONFLICT (type, interval_days) DO NOTHING;
""",
]


def run_migrations(engine) -> None:
    """Run all migrations in a single transaction, logging each one."""
    # engine.begin() commits on success and rolls back if any migration raises.
    with engine.begin() as conn:
        for i, sql in enumerate(MIGRATIONS, 1):
            try:
                conn.execute(text(sql))
                log.info(f"migration {i}/{len(MIGRATIONS)} OK")
            except Exception as e:
                log.error(f"migration {i} FAILED: {e}")
                raise