lazyflat/web/auth.py
Moritz c630b500ef multi-user: users, per-user profiles/filters/notifications, tab UI, apply forensics
* DB: users + user_profiles/filters/notifications/preferences; applications gets
  user_id + forensics_json + profile_snapshot_json; new errors table
  with 14d retention; schema versioning via MIGRATIONS list
* auth: password hashes in DB (argon2); env vars seed first admin; per-user
  sessions; CSRF bound to user id
* apply: personal info/WBS moved out of env into the request body; providers
  take an ApplyContext with Profile + submit_forms; full Playwright recorder
  (step log, console, page errors, network, screenshots, final HTML)
* web: five top-level tabs (Wohnungen/Bewerbungen/Logs/Fehler/Einstellungen);
  settings sub-tabs profil/filter/benachrichtigungen/account/benutzer;
  per-user matching, auto-apply and notifications (UI/Telegram/SMTP); red
  auto-apply switch on Wohnungen tab; forensics detail view for bewerbungen
  and fehler; retention background thread

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 10:52:41 +02:00

180 lines
5.4 KiB
Python

"""
Authentication and session handling.
- Users live in the DB (users table).
- On boot we seed an admin from env if no user exists yet.
- Sessions are signed cookies (itsdangerous); CSRF is a separate signed token.
- Login is rate-limited per IP.
"""
import hmac
import logging
import sqlite3
import threading
import time
from typing import Optional
from argon2 import PasswordHasher
from argon2.exceptions import InvalidHash, VerifyMismatchError
from fastapi import HTTPException, Request, Response, status
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
import db
from settings import (
AUTH_PASSWORD_HASH,
AUTH_USERNAME,
COOKIE_SECURE,
LOGIN_RATE_LIMIT,
LOGIN_RATE_WINDOW_SECONDS,
SESSION_COOKIE_NAME,
SESSION_MAX_AGE_SECONDS,
SESSION_SECRET,
)
# Module logger; the name is fixed to "web.auth" rather than derived from __name__.
logger = logging.getLogger("web.auth")
# Shared Argon2 hasher, constructed with the library's default parameters.
_hasher = PasswordHasher()
# Both serializers sign with SESSION_SECRET but use different salts, so a
# session token is not accepted where a CSRF token is expected (and vice versa).
_serializer = URLSafeTimedSerializer(SESSION_SECRET, salt="session")
_csrf_serializer = URLSafeTimedSerializer(SESSION_SECRET, salt="csrf")
# ---------- Password hashing --------------------------------------------------
def hash_password(password: str) -> str:
    """Hash ``password`` with the module-level Argon2 hasher and return the encoded hash."""
    encoded = _hasher.hash(password)
    return encoded
def verify_hash(password_hash: str, password: str) -> bool:
    """Check ``password`` against a stored Argon2 ``password_hash``.

    Returns True when the hasher accepts the pair. A mismatch or a hash the
    library rejects as invalid yields False instead of raising; any other
    exception from the hasher propagates.
    """
    try:
        _hasher.verify(password_hash, password)
    except (VerifyMismatchError, InvalidHash):
        return False
    else:
        return True
# ---------- Admin bootstrap ---------------------------------------------------
def bootstrap_admin() -> None:
    """Seed the first admin account from AUTH_USERNAME / AUTH_PASSWORD_HASH.

    Does nothing (beyond logging) when the users table already has at least
    one row. An IntegrityError from the insert is logged as a warning and
    swallowed.
    """
    users = db.list_users()
    if users:
        logger.info("users already present (%d), skipping admin bootstrap", len(users))
        return

    try:
        new_id = db.create_user(AUTH_USERNAME, AUTH_PASSWORD_HASH, is_admin=True)
    except sqlite3.IntegrityError as exc:
        logger.warning("bootstrap admin failed: %s", exc)
    else:
        logger.info("bootstrapped admin user '%s' (id=%s) from env", AUTH_USERNAME, new_id)
# ---------- Login -------------------------------------------------------------
# Lazily generated Argon2 hash, used only to burn time for unknown usernames
# when the env-provided hash cannot be verified against.
_timing_dummy_hash: Optional[str] = None


def _burn_hash_time(password: str) -> None:
    """Spend roughly one Argon2 verification on ``password`` and discard the result."""
    global _timing_dummy_hash
    try:
        _hasher.verify(AUTH_PASSWORD_HASH, password)
    except VerifyMismatchError:
        # Expected outcome: the full hash was computed and simply did not match.
        return
    except Exception:
        # An empty or malformed env hash is rejected before any hashing work
        # happens, which would make "unknown user" measurably faster than
        # "wrong password". Fall through to a generated hash instead.
        logger.debug("env password hash unusable for dummy verification; using generated hash")
    else:
        return

    try:
        if _timing_dummy_hash is None:
            # Benign race: two threads may both generate a hash; either one works.
            _timing_dummy_hash = _hasher.hash("lazyflat-timing-dummy")
        _hasher.verify(_timing_dummy_hash, password)
    except VerifyMismatchError:
        pass  # expected; only the elapsed time matters
    except Exception:
        # Best effort only: a failure here must never break the login path.
        logger.debug("dummy password verification failed")


def verify_login(username: str, password: str) -> Optional[sqlite3.Row]:
    """Return the user row for valid credentials, otherwise None.

    ``username`` and ``password`` are coerced to "" when falsy. For an unknown
    username a throwaway Argon2 verification is still performed so that
    response time does not reveal whether the account exists.

    NOTE(review): this function does not look at the ``disabled`` column;
    ``current_user`` rejects disabled users on later requests. Confirm the
    login route does not need the check here as well.
    """
    password = password or ""
    user = db.get_user_by_username(username or "")
    if user is None:
        _burn_hash_time(password)
        return None
    if not verify_hash(user["password_hash"], password):
        return None
    return user
# ---------- Session cookies ---------------------------------------------------
def issue_session_cookie(response: Response, user_id: int) -> None:
    """Attach a signed session cookie for ``user_id`` to ``response``.

    The signed payload carries the user id ("uid") and the issue time in
    whole seconds ("iat"). The cookie is HttpOnly, SameSite=strict, scoped to
    "/", and lives for SESSION_MAX_AGE_SECONDS.
    """
    payload = {"uid": user_id, "iat": int(time.time())}
    cookie_options = dict(
        key=SESSION_COOKIE_NAME,
        value=_serializer.dumps(payload),
        max_age=SESSION_MAX_AGE_SECONDS,
        httponly=True,
        secure=COOKIE_SECURE,
        samesite="strict",
        path="/",
    )
    response.set_cookie(**cookie_options)
def clear_session_cookie(response: Response) -> None:
    """Ask the client to drop the session cookie.

    The path/secure/httponly/samesite attributes mirror the ones used when
    the cookie is issued.
    """
    cookie_attrs = {
        "path": "/",
        "secure": COOKIE_SECURE,
        "httponly": True,
        "samesite": "strict",
    }
    response.delete_cookie(SESSION_COOKIE_NAME, **cookie_attrs)
def current_user(request: Request) -> Optional[sqlite3.Row]:
    """Resolve the logged-in user from the request's session cookie.

    Returns None when the cookie is absent, its signature is bad or expired,
    the payload has no truthy "uid", the user no longer exists, or the user
    row is flagged as disabled.
    """
    raw_token = request.cookies.get(SESSION_COOKIE_NAME)
    if not raw_token:
        return None

    try:
        payload = _serializer.loads(raw_token, max_age=SESSION_MAX_AGE_SECONDS)
    except (BadSignature, SignatureExpired):
        return None

    user_id = payload.get("uid")
    if not user_id:
        return None

    # The DB is consulted on every call, so deleted or disabled accounts lose
    # access even while their cookie is still within its max age.
    user = db.get_user(user_id)
    if user is not None and not user["disabled"]:
        return user
    return None
def require_user(request: Request) -> sqlite3.Row:
    """Return the logged-in user or raise HTTP 401 ("login required")."""
    user = current_user(request)
    if user is not None:
        return user
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="login required")
def require_admin(request: Request) -> sqlite3.Row:
    """Return the logged-in admin user.

    Raises HTTP 401 (via ``require_user``) when nobody is logged in and
    HTTP 403 ("admin only") when the user lacks the ``is_admin`` flag.
    """
    user = require_user(request)
    if user["is_admin"]:
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="admin only")
# ---------- CSRF --------------------------------------------------------------
def issue_csrf_token(user_id: int) -> str:
    """Return a signed, timestamped CSRF token bound to ``user_id``."""
    payload = {"uid": user_id}
    return _csrf_serializer.dumps(payload)
def verify_csrf(user_id: int, submitted: str) -> bool:
    """Tell whether ``submitted`` is a valid CSRF token issued for ``user_id``.

    An empty token, a bad signature, or a token older than
    SESSION_MAX_AGE_SECONDS all yield False. Otherwise the "uid" stored in
    the token (default -1 when missing) is compared to ``user_id`` after
    converting both to int.
    """
    if not submitted:
        return False

    try:
        payload = _csrf_serializer.loads(submitted, max_age=SESSION_MAX_AGE_SECONDS)
    except (BadSignature, SignatureExpired):
        return False

    token_uid = int(payload.get("uid", -1))
    return token_uid == int(user_id)
def require_csrf(user_id: int, token: str) -> None:
    """Raise HTTP 403 ("bad csrf") unless ``token`` is valid for ``user_id``."""
    if verify_csrf(user_id, token):
        return
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="bad csrf")
# ---------- Login rate limiting ----------------------------------------------
# Guards every read and write of _rate_log.
_rate_lock = threading.Lock()
# ip -> ascending time.monotonic() stamps of counted login attempts.
# NOTE: these are monotonic readings, not wall-clock timestamps.
_rate_log: dict[str, list[float]] = {}


def rate_limit_login(ip: str) -> bool:
    """Record a login attempt for ``ip`` and report whether it is allowed.

    Sliding-window limiter: at most LOGIN_RATE_LIMIT attempts are counted per
    ``ip`` within the last LOGIN_RATE_WINDOW_SECONDS. Rejected attempts are
    not added to the window.

    Returns:
        True when the attempt is within the limit (and has been recorded),
        False when the caller should reject it.
    """
    # Monotonic clock: a wall-clock step (NTP correction, manual change) must
    # neither extend an existing lockout nor wipe the window.
    now = time.monotonic()
    cutoff = now - LOGIN_RATE_WINDOW_SECONDS

    with _rate_lock:
        recent = [t for t in _rate_log.get(ip, ()) if t > cutoff]
        allowed = len(recent) < LOGIN_RATE_LIMIT
        if allowed:
            recent.append(now)
        _rate_log[ip] = recent

        # Opportunistic cleanup once the table gets large. Runs on allowed and
        # rejected attempts alike so a blocked client cannot keep stale
        # entries alive. Uses the same boundary as the window filter above.
        if len(_rate_log) > 1024:
            stale = [
                key
                for key, stamps in _rate_log.items()
                if not stamps or stamps[-1] <= cutoff
            ]
            for key in stale:
                del _rate_log[key]

        return allowed