feat(files): upload/preview/delete allegati su fatture, ULA, documenti
- models: colonne file inline (storage_path, mime, size_bytes, sha256, uploaded_by, uploaded_at) su remission_invoice, remission_ula_employee, remission_document - migrations: ALTER idempotente al lifespan per evolvere schema in sandbox - storage: FS adapter /var/uploads con validazione MIME/size, dedup sha256, sanitize - routers/files: POST upload / GET download (con ?inline=1) / DELETE matrix autorizzazioni: beneficiary su DRAFT|AWAITING_AMENDMENT, istruttore read-only, superadmin full - main: include router files, version bump 0.2.0 Testato E2E con admin JWT: upload 549B PDF -> DB coerente, storage 1/invoice/<uuid>/<sha12>-file.pdf, download con magic bytes PDF corretti, delete chirurgico con cleanup FS e metadata.
This commit is contained in:
159
app/storage.py
Normal file
159
app/storage.py
Normal file
@@ -0,0 +1,159 @@
|
||||
"""
|
||||
Storage adapter per file upload.
|
||||
|
||||
Implementazione attuale: filesystem locale con bind mount /var/uploads.
|
||||
Struttura: /var/uploads/{application_id}/{entity_type}/{entity_id}/{sha256}-{filename}
|
||||
|
||||
Migrazione futura a S3/MinIO: cambiare solo questa classe.
|
||||
"""
|
||||
import hashlib
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import BinaryIO, Optional, Tuple
|
||||
from uuid import UUID
|
||||
|
||||
BASE_PATH = Path(os.environ.get("RENDIC_UPLOAD_BASE", "/var/uploads"))
|
||||
MAX_SIZE_BYTES = 15 * 1024 * 1024 # 15 MB
|
||||
|
||||
ALLOWED_MIMES = {
|
||||
"application/pdf": ".pdf",
|
||||
"image/jpeg": ".jpg",
|
||||
"image/png": ".png",
|
||||
}
|
||||
|
||||
|
||||
class StorageError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FileTooLargeError(StorageError):
|
||||
pass
|
||||
|
||||
|
||||
class MimeNotAllowedError(StorageError):
|
||||
pass
|
||||
|
||||
|
||||
def _safe_filename(name: str, max_len: int = 120) -> str:
|
||||
"""Rimuove caratteri pericolosi, tronca."""
|
||||
keep = "-_.() "
|
||||
clean = "".join(c if (c.isalnum() or c in keep) else "_" for c in name)
|
||||
clean = clean.strip().replace(" ", "_")
|
||||
if len(clean) > max_len:
|
||||
root, ext = os.path.splitext(clean)
|
||||
clean = root[: max_len - len(ext)] + ext
|
||||
return clean or "file"
|
||||
|
||||
|
||||
def save_upload(
|
||||
application_id: int,
|
||||
entity_type: str, # invoice | ula | document
|
||||
entity_id: UUID,
|
||||
file_obj: BinaryIO,
|
||||
original_filename: str,
|
||||
content_type: Optional[str],
|
||||
) -> Tuple[str, int, str, str, str]:
|
||||
"""
|
||||
Salva il file su FS e ritorna (storage_path, size_bytes, sha256, mime, safe_filename).
|
||||
storage_path è RELATIVO a BASE_PATH (es: "1/invoice/xxx/yyy-fattura.pdf").
|
||||
|
||||
Valida:
|
||||
- mime in ALLOWED_MIMES (usa content_type del client, fallback su estensione)
|
||||
- dimensione <= MAX_SIZE_BYTES
|
||||
"""
|
||||
if entity_type not in ("invoice", "ula", "document"):
|
||||
raise StorageError(f"entity_type non valido: {entity_type}")
|
||||
|
||||
safe_name = _safe_filename(original_filename)
|
||||
ext = os.path.splitext(safe_name)[1].lower()
|
||||
|
||||
# Risolvi mime: prima content_type client, poi da estensione
|
||||
mime = (content_type or "").lower().split(";")[0].strip()
|
||||
if mime not in ALLOWED_MIMES:
|
||||
# Fallback da estensione
|
||||
ext_to_mime = {v: k for k, v in ALLOWED_MIMES.items()}
|
||||
mime = ext_to_mime.get(ext, "")
|
||||
if mime not in ALLOWED_MIMES:
|
||||
raise MimeNotAllowedError(
|
||||
f"MIME non consentito: '{content_type}' / estensione '{ext}'. "
|
||||
f"Accettati: {list(ALLOWED_MIMES.keys())}"
|
||||
)
|
||||
|
||||
# Calcola sha256 e size streaming per non tenere tutto in RAM
|
||||
hasher = hashlib.sha256()
|
||||
size = 0
|
||||
# Salva in tmp poi rename atomico
|
||||
target_dir = BASE_PATH / str(application_id) / entity_type / str(entity_id)
|
||||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
tmp_path = target_dir / f".tmp-{entity_id}"
|
||||
|
||||
try:
|
||||
with open(tmp_path, "wb") as out:
|
||||
while True:
|
||||
chunk = file_obj.read(65536)
|
||||
if not chunk:
|
||||
break
|
||||
size += len(chunk)
|
||||
if size > MAX_SIZE_BYTES:
|
||||
raise FileTooLargeError(
|
||||
f"File {size} byte oltre limite {MAX_SIZE_BYTES}"
|
||||
)
|
||||
hasher.update(chunk)
|
||||
out.write(chunk)
|
||||
|
||||
digest = hasher.hexdigest()
|
||||
final_name = f"{digest[:12]}-{safe_name}"
|
||||
final_path = target_dir / final_name
|
||||
|
||||
# Se già esiste (dedup per sha+nome) rimuovi tmp e usa esistente
|
||||
if final_path.exists():
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
else:
|
||||
os.replace(tmp_path, final_path)
|
||||
|
||||
rel_path = str(final_path.relative_to(BASE_PATH))
|
||||
return rel_path, size, digest, mime, safe_name
|
||||
except Exception:
|
||||
# cleanup tmp in caso di errore
|
||||
try:
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
except Exception:
|
||||
pass
|
||||
raise
|
||||
|
||||
|
||||
def delete_file(storage_path: str) -> bool:
|
||||
"""Elimina fisicamente il file. Ritorna True se rimosso."""
|
||||
if not storage_path:
|
||||
return False
|
||||
abs_path = BASE_PATH / storage_path
|
||||
# hardening: resta sotto BASE_PATH
|
||||
try:
|
||||
abs_path.resolve().relative_to(BASE_PATH.resolve())
|
||||
except ValueError:
|
||||
raise StorageError(f"Path fuori da BASE_PATH: {storage_path}")
|
||||
if abs_path.exists():
|
||||
abs_path.unlink()
|
||||
# prova a rimuovere directory vuote (entity_id/entity_type)
|
||||
try:
|
||||
abs_path.parent.rmdir()
|
||||
abs_path.parent.parent.rmdir()
|
||||
except OSError:
|
||||
pass
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def open_file(storage_path: str) -> Path:
|
||||
"""Ritorna Path assoluto del file. Solleva FileNotFoundError se mancante."""
|
||||
if not storage_path:
|
||||
raise FileNotFoundError("storage_path vuoto")
|
||||
abs_path = BASE_PATH / storage_path
|
||||
try:
|
||||
abs_path.resolve().relative_to(BASE_PATH.resolve())
|
||||
except ValueError:
|
||||
raise StorageError(f"Path fuori da BASE_PATH: {storage_path}")
|
||||
if not abs_path.is_file():
|
||||
raise FileNotFoundError(f"File non trovato: {storage_path}")
|
||||
return abs_path
|
||||
Reference in New Issue
Block a user