""" Authentication and session handling. - Users live in the DB (users table). - On boot we seed an admin from env if no user exists yet. - Sessions are signed cookies (itsdangerous); CSRF is a separate signed token. - Login is rate-limited per IP. """ import hmac import logging import sqlite3 import threading import time from typing import Optional from argon2 import PasswordHasher from argon2.exceptions import InvalidHash, VerifyMismatchError from fastapi import HTTPException, Request, Response, status from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer import db from settings import ( AUTH_PASSWORD_HASH, AUTH_USERNAME, COOKIE_SECURE, LOGIN_RATE_LIMIT, LOGIN_RATE_WINDOW_SECONDS, SESSION_COOKIE_NAME, SESSION_MAX_AGE_SECONDS, SESSION_SECRET, ) logger = logging.getLogger("web.auth") _hasher = PasswordHasher() _serializer = URLSafeTimedSerializer(SESSION_SECRET, salt="session") _csrf_serializer = URLSafeTimedSerializer(SESSION_SECRET, salt="csrf") # ---------- Password hashing -------------------------------------------------- def hash_password(password: str) -> str: return _hasher.hash(password) def verify_hash(password_hash: str, password: str) -> bool: try: _hasher.verify(password_hash, password) return True except (VerifyMismatchError, InvalidHash): return False # ---------- Admin bootstrap --------------------------------------------------- def bootstrap_admin() -> None: """If no users exist, create one from env vars as admin.""" existing = db.list_users() if existing: logger.info("users already present (%d), skipping admin bootstrap", len(existing)) return try: uid = db.create_user(AUTH_USERNAME, AUTH_PASSWORD_HASH, is_admin=True) logger.info("bootstrapped admin user '%s' (id=%s) from env", AUTH_USERNAME, uid) except sqlite3.IntegrityError as e: logger.warning("bootstrap admin failed: %s", e) # ---------- Login ------------------------------------------------------------- def verify_login(username: str, password: str) -> Optional[sqlite3.Row]: """Return user row on success, None otherwise. Timing-safe path runs hash check either way.""" user = db.get_user_by_username(username or "") if user is None: # Run the hasher anyway so timing doesn't leak existence try: _hasher.verify(AUTH_PASSWORD_HASH, password or "") except Exception: pass return None if not verify_hash(user["password_hash"], password or ""): return None return user # ---------- Session cookies --------------------------------------------------- def issue_session_cookie(response: Response, user_id: int) -> None: token = _serializer.dumps({"uid": user_id, "iat": int(time.time())}) response.set_cookie( key=SESSION_COOKIE_NAME, value=token, max_age=SESSION_MAX_AGE_SECONDS, httponly=True, secure=COOKIE_SECURE, samesite="strict", path="/", ) def clear_session_cookie(response: Response) -> None: response.delete_cookie( SESSION_COOKIE_NAME, path="/", secure=COOKIE_SECURE, httponly=True, samesite="strict", ) def current_user(request: Request) -> Optional[sqlite3.Row]: token = request.cookies.get(SESSION_COOKIE_NAME) if not token: return None try: data = _serializer.loads(token, max_age=SESSION_MAX_AGE_SECONDS) except (BadSignature, SignatureExpired): return None uid = data.get("uid") if not uid: return None u = db.get_user(uid) if u is None or u["disabled"]: return None return u def require_user(request: Request) -> sqlite3.Row: u = current_user(request) if u is None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="login required") return u def require_admin(request: Request) -> sqlite3.Row: u = require_user(request) if not u["is_admin"]: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="admin only") return u # ---------- CSRF -------------------------------------------------------------- def issue_csrf_token(user_id: int) -> str: return _csrf_serializer.dumps({"uid": user_id}) def verify_csrf(user_id: int, submitted: str) -> bool: if not submitted: return False try: data = _csrf_serializer.loads(submitted, max_age=SESSION_MAX_AGE_SECONDS) except (BadSignature, SignatureExpired): return False return int(data.get("uid", -1)) == int(user_id) def require_csrf(user_id: int, token: str) -> None: if not verify_csrf(user_id, token): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="bad csrf") # ---------- Login rate limiting ---------------------------------------------- _rate_lock = threading.Lock() _rate_log: dict[str, list[float]] = {} def rate_limit_login(ip: str) -> bool: now = time.time() cutoff = now - LOGIN_RATE_WINDOW_SECONDS with _rate_lock: attempts = [t for t in _rate_log.get(ip, []) if t > cutoff] if len(attempts) >= LOGIN_RATE_LIMIT: _rate_log[ip] = attempts return False attempts.append(now) _rate_log[ip] = attempts if len(_rate_log) > 1024: for k in list(_rate_log.keys()): if not _rate_log[k] or _rate_log[k][-1] < cutoff: _rate_log.pop(k, None) return True