Files
booking-service/app/routers/auth.py

136 lines
4.6 KiB
Python

"""Auth — dual mode: cookie (legacy) + localStorage token via URL param.
Flow: /auth/login → Google → /auth/callback → /admin?token=JWT
The front-end JS stores the token in localStorage; the APIs read it from the Authorization header OR the cookie.
"""
import logging
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import urlencode

import httpx
from fastapi import APIRouter, Request, HTTPException, Header
from fastapi.responses import RedirectResponse, HTMLResponse
from jose import jwt

from app.config import get_settings
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Every route declared in this module is served under the /auth prefix.
router = APIRouter(prefix="/auth", tags=["auth"])
# Google endpoints: login redirects the browser to GOOGLE_AUTH_URL; callback
# posts the authorization code to GOOGLE_TOKEN_URL and then queries
# GOOGLE_USERINFO_URL with the resulting access token.
GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token"
GOOGLE_USERINFO_URL = "https://www.googleapis.com/oauth2/v2/userinfo"
# Cookie read by get_current_user as a fallback and cleared by logout.
COOKIE_NAME = "booking_token"
def create_jwt(email: str, name: str, picture: str = "") -> str:
    """Build a signed JWT identifying the given user.

    Args:
        email: User e-mail; stored as the ``sub`` claim.
        name: Display name; stored as the ``name`` claim.
        picture: Profile picture value; stored as the ``picture`` claim.

    Returns:
        The encoded token, signed with ``settings.jwt_secret`` using
        ``settings.jwt_algorithm``. Its ``exp`` claim lies
        ``settings.jwt_expire_hours`` hours after its ``iat`` claim.
    """
    settings = get_settings()
    # Read the clock once so that exp - iat is exactly the configured lifetime.
    issued_at = datetime.now(timezone.utc)
    claims = {
        "sub": email,
        "name": name,
        "picture": picture,
        "exp": issued_at + timedelta(hours=settings.jwt_expire_hours),
        "iat": issued_at,
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm=settings.jwt_algorithm)
def verify_jwt(token: str) -> dict | None:
    """Decode *token* and return its claims, or ``None`` if decoding fails.

    The token is decoded with ``settings.jwt_secret`` and only the configured
    ``settings.jwt_algorithm`` is accepted. Any exception raised while
    decoding is swallowed and reported as ``None``.
    """
    cfg = get_settings()
    try:
        claims = jwt.decode(token, cfg.jwt_secret, algorithms=[cfg.jwt_algorithm])
    except Exception:
        # Best-effort by design: callers treat every failure as "no user".
        return None
    return claims
def get_current_user(request: Request) -> dict | None:
    """Return the JWT claims of the caller, or ``None`` when unauthenticated.

    Two credential sources are tried in order:

    1. The ``Authorization: Bearer <token>`` header (localStorage flow).
    2. The ``COOKIE_NAME`` cookie (legacy flow), used when the header is
       missing or does not carry a valid token.

    Args:
        request: Incoming request whose headers and cookies are inspected.

    Returns:
        The decoded claims from ``verify_jwt``, or ``None``.
    """
    # 1. Authorization header. The scheme name is matched case-insensitively
    #    (HTTP auth schemes are case-insensitive) and surrounding whitespace
    #    around the credentials is ignored.
    header = request.headers.get("authorization", "")
    scheme, _, credentials = header.partition(" ")
    credentials = credentials.strip()
    if scheme.lower() == "bearer" and credentials:
        payload = verify_jwt(credentials)
        if payload:
            return payload
    # 2. Fallback: legacy cookie.
    cookie_token = request.cookies.get(COOKIE_NAME)
    if cookie_token:
        return verify_jwt(cookie_token)
    return None
@router.get("/login")
def login():
    """Redirect the browser to Google's OAuth 2.0 authorization endpoint.

    The query string carries the client id and redirect URI from settings and
    requests the ``openid email profile`` scopes with the authorization-code
    response type.

    Returns:
        A ``RedirectResponse`` pointing at ``GOOGLE_AUTH_URL``.
    """
    settings = get_settings()
    params = {
        "client_id": settings.google_client_id,
        "redirect_uri": settings.google_redirect_uri,
        "response_type": "code",
        "scope": "openid email profile",
        "access_type": "offline",
        "prompt": "select_account",
    }
    # urlencode escapes every value; the scope contains spaces and the
    # redirect URI contains reserved characters, so they cannot be joined raw.
    # NOTE(review): no `state` parameter is sent, so the callback cannot tie
    # the response to this request (CSRF protection) — consider adding one.
    return RedirectResponse(f"{GOOGLE_AUTH_URL}?{urlencode(params)}")
@router.get("/dev")
def dev_login(key: str = ""):
    """Dev bypass: issue a token for a fixed account when *key* is the API key.

    Args:
        key: Query parameter that must equal ``settings.api_key``.

    Returns:
        A 302 redirect to ``/admin?token=<JWT>``.

    Raises:
        HTTPException: 403 when *key* does not match, or when no API key is
            configured at all.
    """
    settings = get_settings()
    expected = settings.api_key
    # An unset/empty API key must never match the default empty `key`,
    # otherwise anyone could obtain a token without credentials.
    # compare_digest keeps the comparison constant-time; bytes are used
    # because it rejects non-ASCII str arguments.
    if not expected or not secrets.compare_digest(
        key.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(403, "Chiave non valida")
    token = create_jwt("mancosu@kitzanos.com", "Carlo Mancosu")
    logger.info("Dev login: mancosu@kitzanos.com")
    return RedirectResponse(url=f"/admin?token={token}", status_code=302)
@router.get("/callback")
async def callback(code: str):
    """Google callback → redirect to /admin?token=JWT.

    Exchanges the authorization *code* for an access token, fetches the user
    profile, checks the e-mail domain against
    ``settings.allowed_domains_list`` and, on success, issues a JWT.

    Args:
        code: Authorization code passed by Google as a query parameter.

    Returns:
        A 302 redirect to ``/admin?token=<JWT>`` on success, or to
        ``/admin?error=domain`` when the e-mail domain is not allowed.

    Raises:
        HTTPException: 401 when the token exchange fails, when the response
            carries no access token, or when the profile lookup fails.
    """
    settings = get_settings()
    # Both Google calls share one client instead of opening two.
    async with httpx.AsyncClient() as client:
        token_resp = await client.post(GOOGLE_TOKEN_URL, data={
            "code": code,
            "client_id": settings.google_client_id,
            "client_secret": settings.google_client_secret,
            "redirect_uri": settings.google_redirect_uri,
            "grant_type": "authorization_code",
        })
        if token_resp.status_code != 200:
            logger.error("Google token error: %s", token_resp.text)
            raise HTTPException(401, "Autenticazione Google fallita")
        access_token = token_resp.json().get("access_token")
        if not access_token:
            # Without this guard the next request would send "Bearer None".
            logger.error("Google token response without access_token")
            raise HTTPException(401, "Autenticazione Google fallita")
        user_resp = await client.get(
            GOOGLE_USERINFO_URL,
            headers={"Authorization": f"Bearer {access_token}"},
        )
    if user_resp.status_code != 200:
        raise HTTPException(401, "Impossibile ottenere info utente")
    user_info = user_resp.json()
    # `or ""` also covers keys that are present but null.
    email = user_info.get("email") or ""
    name = user_info.get("name") or ""
    picture = user_info.get("picture") or ""
    # NOTE(review): the userinfo payload may also expose a verified-email
    # flag — TODO confirm whether unverified addresses should be rejected.
    domain = email.split("@")[-1].lower() if "@" in email else ""
    if domain not in settings.allowed_domains_list:
        logger.warning("Accesso negato: %s", email)
        return RedirectResponse(url="/admin?error=domain", status_code=302)
    token = create_jwt(email, name, picture)
    logger.info("Login riuscito: %s", email)
    return RedirectResponse(url=f"/admin?token={token}", status_code=302)
@router.get("/me")
def me(request: Request):
    """Return the e-mail, name and picture of the authenticated caller.

    Raises:
        HTTPException: 401 when ``get_current_user`` finds no valid
            credentials on the request.
    """
    claims = get_current_user(request)
    if claims:
        return {
            "email": claims["sub"],
            "name": claims.get("name", ""),
            "picture": claims.get("picture", ""),
        }
    raise HTTPException(401, "Non autenticato")
@router.get("/logout")
def logout():
    """Clear the legacy auth cookie and redirect to ``/admin?logout=1``."""
    redirect = RedirectResponse(status_code=302, url="/admin?logout=1")
    # Only the cookie is cleared here; nothing in this handler touches a
    # token held by the client in localStorage.
    redirect.delete_cookie(COOKIE_NAME, path="/")
    return redirect