Files
gepafin-rendicontazione-api/app/auth.py

73 lines
2.1 KiB
Python

"""
JWT validation compatibile con GEPAFIN-BE.
Il BE Spring emette token HS512 con payload:
sub: "email:userId:hubId"
userId: int
auth: "ROLE_SUPER_ADMIN" | "ROLE_BENEFICIARY" | ...
exp: unix timestamp
loginAttemptId: int
"""
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from .config import get_settings
# Application settings, resolved once at import time; get_current_user reads
# `jwt_secret` and `jwt_algorithm` from this object when decoding tokens.
settings = get_settings()
# auto_error=False: per FastAPI's HTTPBearer, a missing or non-Bearer
# Authorization header is handed to the dependency as None rather than being
# rejected by the scheme itself, so get_current_user can raise its own 401
# with the project's error message.
bearer_scheme = HTTPBearer(auto_error=False)
class AuthUser:
    """Identity of the authenticated caller, as carried by the JWT.

    Attributes:
        user_id: Numeric identifier of the user.
        email: E-mail address of the user.
        role: Role string, e.g. "ROLE_SUPER_ADMIN" or "ROLE_BENEFICIARY".
        hub_id: Numeric identifier of the hub the user belongs to.
    """

    def __init__(self, user_id: int, email: str, role: str, hub_id: int) -> None:
        self.user_id = user_id
        self.email = email
        self.role = role
        self.hub_id = hub_id

    def __repr__(self) -> str:
        # Readable representation for logs and tracebacks; the default
        # "<AuthUser object at 0x...>" carries no useful information.
        return (
            f"{type(self).__name__}("
            f"user_id={self.user_id!r}, email={self.email!r}, "
            f"role={self.role!r}, hub_id={self.hub_id!r})"
        )

    def is_superadmin(self) -> bool:
        """Return True when the user's role is exactly ROLE_SUPER_ADMIN."""
        return self.role == "ROLE_SUPER_ADMIN"

    def is_beneficiary(self) -> bool:
        """Return True when the user's role is exactly ROLE_BENEFICIARY."""
        return self.role == "ROLE_BENEFICIARY"
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
) -> AuthUser:
    """Build the AuthUser for the request from its Bearer JWT.

    The token is verified with ``settings.jwt_secret`` and
    ``settings.jwt_algorithm``. The ``sub`` claim is expected in the form
    ``"email:userId:hubId"``; the user id is taken from the ``userId`` claim
    and the role from the ``auth`` claim.

    Args:
        credentials: Bearer credentials extracted by ``bearer_scheme``, or
            None when the request carries no usable Authorization header.

    Returns:
        The AuthUser described by the token. Missing claims fall back to
        ``user_id=0``, ``email=""``, ``role=""`` and ``hub_id=0``.

    Raises:
        HTTPException: 401 when the header is missing, when the token fails
            validation, or when the ``userId`` claim is not numeric.
    """
    # Challenge header attached to every 401 so clients know which scheme to use.
    challenge = {"WWW-Authenticate": "Bearer"}

    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authorization Bearer mancante",
            headers=challenge,
        )

    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
        )
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"JWT non valido: {e}",
            headers=challenge,
        ) from e

    # A null or non-numeric userId used to escape as TypeError/ValueError
    # (HTTP 500); a token with an unusable claim is reported as invalid instead.
    try:
        user_id = int(payload.get("userId", 0))
    except (TypeError, ValueError) as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="JWT non valido: claim userId non numerico",
            headers=challenge,
        ) from e

    # sub is "email:userId:hubId"; str.split always yields at least one part.
    parts = payload.get("sub", "").split(":")
    email = parts[0]
    hub_id_str = parts[2] if len(parts) > 2 else "0"
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() rejects with ValueError.
    hub_id = int(hub_id_str) if hub_id_str.isdecimal() else 0

    return AuthUser(
        user_id=user_id,
        email=email,
        role=payload.get("auth", ""),
        hub_id=hub_id,
    )
def require_superadmin(user: AuthUser = Depends(get_current_user)) -> AuthUser:
    """Dependency that lets only super administrators through.

    Args:
        user: The authenticated caller, resolved by ``get_current_user``.

    Returns:
        The same ``user`` when ``user.is_superadmin()`` is true.

    Raises:
        HTTPException: 403 when the caller is not a super administrator.
    """
    if user.is_superadmin():
        return user

    raise HTTPException(
        detail="Richiesto ruolo ROLE_SUPER_ADMIN",
        status_code=status.HTTP_403_FORBIDDEN,
    )