""" JWT validation compatibile con GEPAFIN-BE. Il BE Spring emette token HS512 con payload: sub: "email:userId:hubId" userId: int auth: "ROLE_SUPER_ADMIN" | "ROLE_BENEFICIARY" | "ROLE_CONFIDI" | ... exp: unix timestamp loginAttemptId: int """ from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import jwt, JWTError from .config import get_settings settings = get_settings() bearer_scheme = HTTPBearer(auto_error=False) class AuthUser: def __init__(self, user_id: int, email: str, role: str, hub_id: int): self.user_id = user_id self.email = email self.role = role self.hub_id = hub_id def is_superadmin(self) -> bool: return self.role == "ROLE_SUPER_ADMIN" def is_beneficiary(self) -> bool: return self.role == "ROLE_BENEFICIARY" def is_confidi(self) -> bool: return self.role == "ROLE_CONFIDI" def is_owner_role(self) -> bool: """Ruoli che possono essere proprietari di una pratica (user_id match): BENEFICIARY (azienda diretta) o CONFIDI (delegato per conto azienda). Pattern allineato al BE Gepafin (DashboardDao, CompanyDocumentDao).""" return self.role in ("ROLE_BENEFICIARY", "ROLE_CONFIDI") def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme), ) -> AuthUser: if credentials is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authorization Bearer mancante", ) try: payload = jwt.decode( credentials.credentials, settings.jwt_secret, algorithms=[settings.jwt_algorithm], ) except JWTError as e: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f"JWT non valido: {e}", ) sub = payload.get("sub", "") parts = sub.split(":") email = parts[0] if len(parts) > 0 else "" hub_id_str = parts[2] if len(parts) > 2 else "0" return AuthUser( user_id=int(payload.get("userId", 0)), email=email, role=payload.get("auth", ""), hub_id=int(hub_id_str) if hub_id_str.isdigit() else 0, ) def require_superadmin(user: AuthUser = Depends(get_current_user)) -> AuthUser: if not user.is_superadmin(): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Richiesto ruolo ROLE_SUPER_ADMIN", ) return user