Risoluzione 403 segnalato da Rinaldo Bonazzo su upload fattura con utente ROLE_CONFIDI (confidi4@test.test). Pattern allineato al BE Gepafin che in DashboardDao, CompanyDocumentDao e FaqDao raggruppa BENEFICIARY+CONFIDI con stessi diritti operativi sulla pratica. ==RAZIONALE== Sui bandi con call.confidi=true il confidi sottomette la application per conto dell'azienda e diventa user_id della application. Lato microservizio rendicontazione la pratica viene ereditata con stesso user_id, quindi il confidi e proprietario della pratica e deve poter fare upload/download/delete come il beneficiario. ==MODIFICHE== app/auth.py: - Aggiunto AuthUser.is_confidi() — controlla ROLE_CONFIDI - Aggiunto AuthUser.is_owner_role() — True per BENEFICIARY o CONFIDI - Aggiornato docstring header con ROLE_CONFIDI - Manteno is_beneficiary() per backward compat (non rimosso, non chiamato) Sostituzione is_beneficiary() -> is_owner_role() in 11 punti dove la semantica era 'proprietario pratica': - app/routers/files.py: 3 (_can_upload, _can_download, _can_delete) - app/routers/instructor.py: 2 (respond-beneficiary, ack-amendment) - app/routers/practices.py: 3 (visibilita, create, schema gating) - app/routers/custom_checks.py: 3 (declared, gate) ==COMPORTAMENTO== Per ROLE_CONFIDI vale ora la stessa regola di BENEFICIARY: - upload/download/delete: solo se practice.user_id == user.user_id AND practice.status IN ('DRAFT','AWAITING_AMENDMENT') - respond-beneficiary: solo se proprietario pratica - visualizzazione: solo proprie pratiche - creazione: solo se schema PUBLISHED Confidi su pratica di altri o su pratica non editabile -> 403 come prima. ==TEST E2E (4 step verdi)== /tmp/test_confidi_upload.py: 1. CONFIDI proprietario DRAFT upload Invoice_zapier2024.pdf -> 200 (era 403) 2. CONFIDI NON proprietario -> 403 (scoping) 3. CONFIDI proprietario ma SUBMITTED -> 403 (stato) 4. BENEFICIARY proprietario DRAFT (regressione) -> 200
82 lines
2.5 KiB
Python
82 lines
2.5 KiB
Python
"""
|
|
JWT validation compatibile con GEPAFIN-BE.
|
|
Il BE Spring emette token HS512 con payload:
|
|
sub: "email:userId:hubId"
|
|
userId: int
|
|
auth: "ROLE_SUPER_ADMIN" | "ROLE_BENEFICIARY" | "ROLE_CONFIDI" | ...
|
|
exp: unix timestamp
|
|
loginAttemptId: int
|
|
"""
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from jose import jwt, JWTError
|
|
from .config import get_settings
|
|
|
|
settings = get_settings()
|
|
bearer_scheme = HTTPBearer(auto_error=False)
|
|
|
|
|
|
class AuthUser:
|
|
def __init__(self, user_id: int, email: str, role: str, hub_id: int):
|
|
self.user_id = user_id
|
|
self.email = email
|
|
self.role = role
|
|
self.hub_id = hub_id
|
|
|
|
def is_superadmin(self) -> bool:
|
|
return self.role == "ROLE_SUPER_ADMIN"
|
|
|
|
def is_beneficiary(self) -> bool:
|
|
return self.role == "ROLE_BENEFICIARY"
|
|
|
|
def is_confidi(self) -> bool:
|
|
return self.role == "ROLE_CONFIDI"
|
|
|
|
def is_owner_role(self) -> bool:
|
|
"""Ruoli che possono essere proprietari di una pratica (user_id match):
|
|
BENEFICIARY (azienda diretta) o CONFIDI (delegato per conto azienda).
|
|
Pattern allineato al BE Gepafin (DashboardDao, CompanyDocumentDao)."""
|
|
return self.role in ("ROLE_BENEFICIARY", "ROLE_CONFIDI")
|
|
|
|
|
|
def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
|
|
) -> AuthUser:
|
|
if credentials is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Authorization Bearer mancante",
|
|
)
|
|
try:
|
|
payload = jwt.decode(
|
|
credentials.credentials,
|
|
settings.jwt_secret,
|
|
algorithms=[settings.jwt_algorithm],
|
|
)
|
|
except JWTError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail=f"JWT non valido: {e}",
|
|
)
|
|
|
|
sub = payload.get("sub", "")
|
|
parts = sub.split(":")
|
|
email = parts[0] if len(parts) > 0 else ""
|
|
hub_id_str = parts[2] if len(parts) > 2 else "0"
|
|
|
|
return AuthUser(
|
|
user_id=int(payload.get("userId", 0)),
|
|
email=email,
|
|
role=payload.get("auth", ""),
|
|
hub_id=int(hub_id_str) if hub_id_str.isdigit() else 0,
|
|
)
|
|
|
|
|
|
def require_superadmin(user: AuthUser = Depends(get_current_user)) -> AuthUser:
|
|
if not user.is_superadmin():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Richiesto ruolo ROLE_SUPER_ADMIN",
|
|
)
|
|
return user
|