lazyflat/web/routes/admin.py
EiSiMo f1e26b38d0 refactor: split web/app.py into routers
app.py was ~1300 lines with every route, helper, and middleware mixed
together. Split into:

- app.py (~100 lines): FastAPI bootstrap, lifespan, /health, security
  headers, Jinja filter registration, include_router calls
- common.py: shared helpers (templates, apply_client, base_context,
  _is_htmx, client_ip, require_internal, time helpers, filter helpers,
  apply-gate helpers, _kick_apply / _finish_apply_background,
  _bg_tasks, _spawn, _mask_secret, _has_running_application, BERLIN_TZ)
- routes/auth.py: /login (GET+POST), /logout
- routes/wohnungen.py: /, /partials/wohnungen, /partials/wohnung/{id},
  /flat-images/{slug}/{idx}, /actions/apply|reject|unreject|auto-apply|
  submit-forms|reset-circuit|filters|enrich-all|enrich-flat; owns
  _wohnungen_context + _wohnungen_partial_or_redirect
- routes/bewerbungen.py: /bewerbungen, /bewerbungen/{id}/report.zip
- routes/einstellungen.py: /einstellungen, /einstellungen/{section},
  /actions/profile|notifications|account/password|partner/*; owns
  VALID_SECTIONS
- routes/admin.py: /logs redirect, /admin, /admin/{section},
  /logs/export.csv, /actions/users/*|secrets; owns ADMIN_SECTIONS,
  _parse_date_range, _collect_events
- routes/internal.py: /internal/flats|heartbeat|error|secrets

Route-diff before/after is empty — all 41 routes + /static mount
preserved. No behavior changes, pure mechanical split.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 19:27:12 +02:00

224 lines
7.8 KiB
Python

"""Admin-only routes: protocol log viewer, user management, secrets,
CSV export, and the /logs legacy redirect."""
import io
import sqlite3
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, Form, HTTPException, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
import db
from auth import current_user, hash_password, require_admin, require_csrf
from common import (
BERLIN_TZ,
_de_dt,
_mask_secret,
base_context,
client_ip,
templates,
)
router = APIRouter()
ADMIN_SECTIONS = ("protokoll", "benutzer", "geheimnisse")
def _parse_date_range(from_str: str | None, to_str: str | None) -> tuple[str | None, str | None]:
"""Parse 'YYYY-MM-DD' local-Berlin date inputs into UTC ISO bounds.
Bounds are inclusive start-of-day and start-of-next-day."""
def _to_utc_iso(s: str, end_of_day: bool) -> str | None:
try:
d = datetime.strptime(s, "%Y-%m-%d").replace(tzinfo=BERLIN_TZ)
except ValueError:
return None
if end_of_day:
d = d + timedelta(days=1)
return d.astimezone(timezone.utc).isoformat(timespec="seconds")
start = _to_utc_iso(from_str, False) if from_str else None
end = _to_utc_iso(to_str, True) if to_str else None
return start, end
def _collect_events(start_iso: str | None, end_iso: str | None,
limit: int = 500) -> list[dict]:
users = {row["id"]: row["username"] for row in db.list_users()}
events: list[dict] = []
for a in db.audit_in_range(start_iso, end_iso, limit=limit):
events.append({
"kind": "audit", "ts": a["timestamp"], "source": "web",
"actor": a["actor"], "action": a["action"],
"details": a["details"] or "",
"user": users.get(a["user_id"], ""),
"ip": a["ip"] or "",
})
for e in db.errors_in_range(start_iso, end_iso, limit=limit):
events.append({
"kind": "error", "ts": e["timestamp"], "source": e["source"],
"actor": e["source"], "action": e["kind"],
"details": e["summary"] or "",
"user": users.get(e["user_id"], "") if e["user_id"] else "",
"ip": "",
})
events.sort(key=lambda x: x["ts"], reverse=True)
return events
@router.get("/logs")
def tab_logs_legacy():
# Old top-level Protokoll tab was merged into /admin/protokoll.
return RedirectResponse("/admin/protokoll", status_code=301)
@router.get("/admin", response_class=HTMLResponse)
def tab_admin_root(request: Request):
return RedirectResponse("/admin/protokoll", status_code=303)
@router.get("/admin/{section}", response_class=HTMLResponse)
def tab_admin(request: Request, section: str):
u = current_user(request)
if not u:
return RedirectResponse("/login", status_code=303)
if not u["is_admin"]:
raise HTTPException(403, "admin only")
if section not in ADMIN_SECTIONS:
raise HTTPException(404)
ctx = base_context(request, u, "admin")
ctx["section"] = section
if section == "protokoll":
q = request.query_params
from_str = q.get("from") or ""
to_str = q.get("to") or ""
start_iso, end_iso = _parse_date_range(from_str or None, to_str or None)
ctx.update({
"events": _collect_events(start_iso, end_iso, limit=500),
"from_str": from_str, "to_str": to_str,
})
elif section == "benutzer":
ctx["users"] = db.list_users()
elif section == "geheimnisse":
secrets = db.all_secrets()
ctx["secrets_masked"] = {k: _mask_secret(secrets.get(k, "")) for k in db.SECRET_KEYS}
ctx["secret_flash"] = request.query_params.get("ok")
return templates.TemplateResponse("admin.html", ctx)
@router.get("/logs/export.csv")
def tab_logs_export(request: Request):
u = current_user(request)
if not u:
raise HTTPException(401)
if not u["is_admin"]:
raise HTTPException(403)
import csv as _csv
q = request.query_params
start_iso, end_iso = _parse_date_range(q.get("from") or None, q.get("to") or None)
events = _collect_events(start_iso, end_iso, limit=5000)
buf = io.StringIO()
w = _csv.writer(buf, delimiter=",", quoting=_csv.QUOTE_MINIMAL)
w.writerow(["timestamp_utc", "timestamp_berlin", "kind", "source", "actor", "action", "user", "details", "ip"])
for e in events:
w.writerow([
e["ts"],
_de_dt(e["ts"]),
e["kind"],
e["source"],
e["actor"],
e["action"],
e["user"],
e["details"],
e["ip"],
])
body = buf.getvalue().encode("utf-8")
filename = "wohnungsdidi-protokoll"
if q.get("from"): filename += f"-{q['from']}"
if q.get("to"): filename += f"-bis-{q['to']}"
filename += ".csv"
return Response(
content=body, media_type="text/csv; charset=utf-8",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
@router.post("/actions/users/create")
async def action_users_create(
request: Request,
username: str = Form(...),
password: str = Form(...),
is_admin: str = Form(""),
csrf: str = Form(...),
admin=Depends(require_admin),
):
require_csrf(admin["id"], csrf)
username = (username or "").strip()
if not username or len(password) < 10:
raise HTTPException(400, "username required, password >= 10 chars")
try:
uid = db.create_user(username, hash_password(password),
is_admin=(is_admin.lower() in ("on", "true", "yes", "1")))
except sqlite3.IntegrityError:
return RedirectResponse("/admin/benutzer?err=exists", status_code=303)
db.log_audit(admin["username"], "user.created", f"new_user={username} id={uid}",
user_id=admin["id"], ip=client_ip(request))
return RedirectResponse("/admin/benutzer?ok=1", status_code=303)
@router.post("/actions/users/disable")
async def action_users_disable(
request: Request,
target_id: int = Form(...),
value: str = Form(...),
csrf: str = Form(...),
admin=Depends(require_admin),
):
require_csrf(admin["id"], csrf)
if target_id == admin["id"]:
raise HTTPException(400, "refusing to disable self")
db.set_user_disabled(target_id, value == "on")
db.log_audit(admin["username"], "user.toggle_disable",
f"target={target_id} disabled={value=='on'}",
user_id=admin["id"], ip=client_ip(request))
return RedirectResponse("/admin/benutzer", status_code=303)
@router.post("/actions/users/delete")
async def action_users_delete(
request: Request,
target_id: int = Form(...),
csrf: str = Form(...),
admin=Depends(require_admin),
):
require_csrf(admin["id"], csrf)
if target_id == admin["id"]:
raise HTTPException(400, "refusing to delete self")
target = db.get_user(target_id)
if not target:
return RedirectResponse("/admin/benutzer", status_code=303)
db.delete_user(target_id)
db.log_audit(admin["username"], "user.deleted",
f"target={target_id} username={target['username']}",
user_id=admin["id"], ip=client_ip(request))
return RedirectResponse("/admin/benutzer?deleted=1", status_code=303)
@router.post("/actions/secrets")
async def action_secrets(request: Request, admin=Depends(require_admin)):
form = await request.form()
require_csrf(admin["id"], form.get("csrf", ""))
changed = []
for key in db.SECRET_KEYS:
raw = (form.get(key) or "").strip()
if not raw:
continue
db.set_secret(key, raw)
changed.append(key)
db.log_audit(admin["username"], "secrets.updated",
",".join(changed) or "no-op",
user_id=admin["id"], ip=client_ip(request))
return RedirectResponse("/admin/geheimnisse?ok=1", status_code=303)