"""Auth — dual mode: cookie (legacy) + localStorage token via URL param. Flow: /auth/login → Google → /auth/callback → /admin?token=JWT Il JS salva in localStorage, le API leggono da Authorization header O cookie. """ import logging from datetime import datetime, timedelta, timezone from fastapi import APIRouter, Request, HTTPException, Header from fastapi.responses import RedirectResponse, HTMLResponse from jose import jwt import httpx from typing import Optional from app.config import get_settings logger = logging.getLogger(__name__) router = APIRouter(prefix="/auth", tags=["auth"]) GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" GOOGLE_USERINFO_URL = "https://www.googleapis.com/oauth2/v2/userinfo" COOKIE_NAME = "booking_token" def create_jwt(email: str, name: str, picture: str = "") -> str: settings = get_settings() return jwt.encode({ "sub": email, "name": name, "picture": picture, "exp": datetime.now(timezone.utc) + timedelta(hours=settings.jwt_expire_hours), "iat": datetime.now(timezone.utc), }, settings.jwt_secret, algorithm=settings.jwt_algorithm) def verify_jwt(token: str) -> dict | None: settings = get_settings() try: return jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_algorithm]) except Exception: return None def get_current_user(request: Request) -> dict | None: # 1. Check Authorization header (localStorage flow) auth = request.headers.get("authorization", "") if auth.startswith("Bearer "): payload = verify_jwt(auth[7:]) if payload: return payload # 2. Fallback: check cookie token = request.cookies.get(COOKIE_NAME) if token: return verify_jwt(token) return None @router.get("/login") def login(): settings = get_settings() params = { "client_id": settings.google_client_id, "redirect_uri": settings.google_redirect_uri, "response_type": "code", "scope": "openid email profile", "access_type": "offline", "prompt": "select_account", } url = GOOGLE_AUTH_URL + "?" + "&".join(f"{k}={v}" for k, v in params.items()) return RedirectResponse(url) @router.get("/dev") def dev_login(key: str = ""): """Dev bypass.""" settings = get_settings() if key != settings.api_key: raise HTTPException(403, "Chiave non valida") token = create_jwt("mancosu@kitzanos.com", "Carlo Mancosu") logger.info("Dev login: mancosu@kitzanos.com") return RedirectResponse(url=f"/admin?token={token}", status_code=302) @router.get("/callback") async def callback(code: str): """Callback da Google → redirect a /admin?token=JWT""" settings = get_settings() async with httpx.AsyncClient() as client: token_resp = await client.post(GOOGLE_TOKEN_URL, data={ "code": code, "client_id": settings.google_client_id, "client_secret": settings.google_client_secret, "redirect_uri": settings.google_redirect_uri, "grant_type": "authorization_code", }) if token_resp.status_code != 200: logger.error(f"Google token error: {token_resp.text}") raise HTTPException(401, "Autenticazione Google fallita") access_token = token_resp.json().get("access_token") async with httpx.AsyncClient() as client: user_resp = await client.get(GOOGLE_USERINFO_URL, headers={"Authorization": f"Bearer {access_token}"}) if user_resp.status_code != 200: raise HTTPException(401, "Impossibile ottenere info utente") user_info = user_resp.json() email = user_info.get("email", "") name = user_info.get("name", "") picture = user_info.get("picture", "") domain = email.split("@")[-1].lower() if "@" in email else "" if domain not in settings.allowed_domains_list: logger.warning(f"Accesso negato: {email}") return RedirectResponse(url="/admin?error=domain", status_code=302) token = create_jwt(email, name, picture) logger.info(f"Login riuscito: {email}") return RedirectResponse(url=f"/admin?token={token}", status_code=302) @router.get("/me") def me(request: Request): user = get_current_user(request) if not user: raise HTTPException(401, "Non autenticato") return {"email": user["sub"], "name": user.get("name", ""), "picture": user.get("picture", "")} @router.get("/logout") def logout(): resp = RedirectResponse(url="/admin?logout=1", status_code=302) resp.delete_cookie(COOKIE_NAME, path="/") return resp