* DB: users + user_profiles/filters/notifications/preferences; applications gets user_id + forensics_json + profile_snapshot_json; new errors table with 14d retention; schema versioning via MIGRATIONS list * auth: password hashes in DB (argon2); env vars seed first admin; per-user sessions; CSRF bound to user id * apply: personal info/WBS moved out of env into the request body; providers take an ApplyContext with Profile + submit_forms; full Playwright recorder (step log, console, page errors, network, screenshots, final HTML) * web: five top-level tabs (Wohnungen/Bewerbungen/Logs/Fehler/Einstellungen); settings sub-tabs profil/filter/benachrichtigungen/account/benutzer; per-user matching, auto-apply and notifications (UI/Telegram/SMTP); red auto-apply switch on Wohnungen tab; forensics detail view for bewerbungen and fehler; retention background thread Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
180 lines
5.4 KiB
Python
180 lines
5.4 KiB
Python
"""
|
|
Authentication and session handling.
|
|
|
|
- Users live in the DB (users table).
|
|
- On boot we seed an admin from env if no user exists yet.
|
|
- Sessions are signed cookies (itsdangerous); CSRF is a separate signed token.
|
|
- Login is rate-limited per IP.
|
|
"""
|
|
import hmac
|
|
import logging
|
|
import sqlite3
|
|
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import InvalidHash, VerifyMismatchError
|
|
from fastapi import HTTPException, Request, Response, status
|
|
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
|
|
|
|
import db
|
|
from settings import (
|
|
AUTH_PASSWORD_HASH,
|
|
AUTH_USERNAME,
|
|
COOKIE_SECURE,
|
|
LOGIN_RATE_LIMIT,
|
|
LOGIN_RATE_WINDOW_SECONDS,
|
|
SESSION_COOKIE_NAME,
|
|
SESSION_MAX_AGE_SECONDS,
|
|
SESSION_SECRET,
|
|
)
|
|
|
|
logger = logging.getLogger("web.auth")
|
|
|
|
_hasher = PasswordHasher()
|
|
_serializer = URLSafeTimedSerializer(SESSION_SECRET, salt="session")
|
|
_csrf_serializer = URLSafeTimedSerializer(SESSION_SECRET, salt="csrf")
|
|
|
|
|
|
# ---------- Password hashing --------------------------------------------------
|
|
|
|
def hash_password(password: str) -> str:
|
|
return _hasher.hash(password)
|
|
|
|
|
|
def verify_hash(password_hash: str, password: str) -> bool:
|
|
try:
|
|
_hasher.verify(password_hash, password)
|
|
return True
|
|
except (VerifyMismatchError, InvalidHash):
|
|
return False
|
|
|
|
|
|
# ---------- Admin bootstrap ---------------------------------------------------
|
|
|
|
def bootstrap_admin() -> None:
|
|
"""If no users exist, create one from env vars as admin."""
|
|
existing = db.list_users()
|
|
if existing:
|
|
logger.info("users already present (%d), skipping admin bootstrap", len(existing))
|
|
return
|
|
try:
|
|
uid = db.create_user(AUTH_USERNAME, AUTH_PASSWORD_HASH, is_admin=True)
|
|
logger.info("bootstrapped admin user '%s' (id=%s) from env", AUTH_USERNAME, uid)
|
|
except sqlite3.IntegrityError as e:
|
|
logger.warning("bootstrap admin failed: %s", e)
|
|
|
|
|
|
# ---------- Login -------------------------------------------------------------
|
|
|
|
def verify_login(username: str, password: str) -> Optional[sqlite3.Row]:
|
|
"""Return user row on success, None otherwise. Timing-safe path runs hash check either way."""
|
|
user = db.get_user_by_username(username or "")
|
|
if user is None:
|
|
# Run the hasher anyway so timing doesn't leak existence
|
|
try:
|
|
_hasher.verify(AUTH_PASSWORD_HASH, password or "")
|
|
except Exception:
|
|
pass
|
|
return None
|
|
if not verify_hash(user["password_hash"], password or ""):
|
|
return None
|
|
return user
|
|
|
|
|
|
# ---------- Session cookies ---------------------------------------------------
|
|
|
|
def issue_session_cookie(response: Response, user_id: int) -> None:
|
|
token = _serializer.dumps({"uid": user_id, "iat": int(time.time())})
|
|
response.set_cookie(
|
|
key=SESSION_COOKIE_NAME,
|
|
value=token,
|
|
max_age=SESSION_MAX_AGE_SECONDS,
|
|
httponly=True,
|
|
secure=COOKIE_SECURE,
|
|
samesite="strict",
|
|
path="/",
|
|
)
|
|
|
|
|
|
def clear_session_cookie(response: Response) -> None:
|
|
response.delete_cookie(
|
|
SESSION_COOKIE_NAME, path="/",
|
|
secure=COOKIE_SECURE, httponly=True, samesite="strict",
|
|
)
|
|
|
|
|
|
def current_user(request: Request) -> Optional[sqlite3.Row]:
|
|
token = request.cookies.get(SESSION_COOKIE_NAME)
|
|
if not token:
|
|
return None
|
|
try:
|
|
data = _serializer.loads(token, max_age=SESSION_MAX_AGE_SECONDS)
|
|
except (BadSignature, SignatureExpired):
|
|
return None
|
|
uid = data.get("uid")
|
|
if not uid:
|
|
return None
|
|
u = db.get_user(uid)
|
|
if u is None or u["disabled"]:
|
|
return None
|
|
return u
|
|
|
|
|
|
def require_user(request: Request) -> sqlite3.Row:
|
|
u = current_user(request)
|
|
if u is None:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="login required")
|
|
return u
|
|
|
|
|
|
def require_admin(request: Request) -> sqlite3.Row:
|
|
u = require_user(request)
|
|
if not u["is_admin"]:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="admin only")
|
|
return u
|
|
|
|
|
|
# ---------- CSRF --------------------------------------------------------------
|
|
|
|
def issue_csrf_token(user_id: int) -> str:
|
|
return _csrf_serializer.dumps({"uid": user_id})
|
|
|
|
|
|
def verify_csrf(user_id: int, submitted: str) -> bool:
|
|
if not submitted:
|
|
return False
|
|
try:
|
|
data = _csrf_serializer.loads(submitted, max_age=SESSION_MAX_AGE_SECONDS)
|
|
except (BadSignature, SignatureExpired):
|
|
return False
|
|
return int(data.get("uid", -1)) == int(user_id)
|
|
|
|
|
|
def require_csrf(user_id: int, token: str) -> None:
|
|
if not verify_csrf(user_id, token):
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="bad csrf")
|
|
|
|
|
|
# ---------- Login rate limiting ----------------------------------------------
|
|
|
|
_rate_lock = threading.Lock()
|
|
_rate_log: dict[str, list[float]] = {}
|
|
|
|
|
|
def rate_limit_login(ip: str) -> bool:
|
|
now = time.time()
|
|
cutoff = now - LOGIN_RATE_WINDOW_SECONDS
|
|
with _rate_lock:
|
|
attempts = [t for t in _rate_log.get(ip, []) if t > cutoff]
|
|
if len(attempts) >= LOGIN_RATE_LIMIT:
|
|
_rate_log[ip] = attempts
|
|
return False
|
|
attempts.append(now)
|
|
_rate_log[ip] = attempts
|
|
if len(_rate_log) > 1024:
|
|
for k in list(_rate_log.keys()):
|
|
if not _rate_log[k] or _rate_log[k][-1] < cutoff:
|
|
_rate_log.pop(k, None)
|
|
return True
|