Compare commits
2 commits
77098c82df
...
cb617dd38a
| Author | SHA1 | Date | |
|---|---|---|---|
| cb617dd38a | |||
| eb73b5e415 |
8 changed files with 113 additions and 45 deletions
|
|
@ -1,6 +1,11 @@
|
||||||
import hashlib
|
import hashlib
|
||||||
import pickle
|
import json
|
||||||
|
|
||||||
def hash_any_object(var):
|
|
||||||
byte_stream = pickle.dumps(var)
|
def hash_any_object(var) -> str:
|
||||||
return hashlib.sha256(byte_stream).hexdigest()
|
"""Deterministic sha256 over a JSON-encodable value. Replaces the older
|
||||||
|
pickle-based hash: pickle is nondeterministic across Python versions /
|
||||||
|
dict-ordering tweaks and pulls more attack surface than we need for a
|
||||||
|
plain change-detection fingerprint."""
|
||||||
|
encoded = json.dumps(var, sort_keys=True, ensure_ascii=False, default=str)
|
||||||
|
return hashlib.sha256(encoded.encode("utf-8")).hexdigest()
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,9 @@
|
||||||
import tomllib
|
import tomllib
|
||||||
|
|
||||||
from settings import *
|
|
||||||
from paths import *
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from settings import LANGUAGE
|
||||||
|
from paths import TRANSLATIONS_FILE
|
||||||
|
|
||||||
logger = logging.getLogger("flat-apply")
|
logger = logging.getLogger("flat-apply")
|
||||||
|
|
||||||
with open(TRANSLATIONS_FILE, "rb") as f:
|
with open(TRANSLATIONS_FILE, "rb") as f:
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
import hmac
|
||||||
import logging
|
import logging
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from urllib.parse import urljoin, urlparse
|
from urllib.parse import urljoin, urlparse
|
||||||
|
|
@ -71,7 +72,7 @@ class ApplyResponse(BaseModel):
|
||||||
def require_api_key(x_internal_api_key: str | None = Header(default=None)) -> None:
|
def require_api_key(x_internal_api_key: str | None = Header(default=None)) -> None:
|
||||||
if not INTERNAL_API_KEY:
|
if not INTERNAL_API_KEY:
|
||||||
raise HTTPException(status_code=503, detail="INTERNAL_API_KEY not configured")
|
raise HTTPException(status_code=503, detail="INTERNAL_API_KEY not configured")
|
||||||
if x_internal_api_key != INTERNAL_API_KEY:
|
if not x_internal_api_key or not hmac.compare_digest(x_internal_api_key, INTERNAL_API_KEY):
|
||||||
raise HTTPException(status_code=401, detail="invalid api key")
|
raise HTTPException(status_code=401, detail="invalid api key")
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
28
web/app.py
28
web/app.py
|
|
@ -69,6 +69,17 @@ logger = logging.getLogger("web")
|
||||||
|
|
||||||
apply_client = ApplyClient()
|
apply_client = ApplyClient()
|
||||||
|
|
||||||
|
# Strong refs for fire-and-forget tasks. asyncio.create_task only weakly
|
||||||
|
# references tasks from the event loop — the GC could drop them mid-flight.
|
||||||
|
_bg_tasks: set[asyncio.Task] = set()
|
||||||
|
|
||||||
|
|
||||||
|
def _spawn(coro) -> asyncio.Task:
|
||||||
|
t = asyncio.create_task(coro)
|
||||||
|
_bg_tasks.add(t)
|
||||||
|
t.add_done_callback(_bg_tasks.discard)
|
||||||
|
return t
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# App + Jinja
|
# App + Jinja
|
||||||
|
|
@ -326,7 +337,7 @@ def _kick_apply(user_id: int, flat_id: str, url: str, triggered_by: str) -> None
|
||||||
profile_snapshot=profile,
|
profile_snapshot=profile,
|
||||||
)
|
)
|
||||||
|
|
||||||
asyncio.create_task(asyncio.to_thread(
|
_spawn(asyncio.to_thread(
|
||||||
_finish_apply_background, app_id, user_id, flat_id, url, profile, submit_forms,
|
_finish_apply_background, app_id, user_id, flat_id, url, profile, submit_forms,
|
||||||
))
|
))
|
||||||
|
|
||||||
|
|
@ -784,12 +795,11 @@ def _parse_date_range(from_str: str | None, to_str: str | None) -> tuple[str | N
|
||||||
return start, end
|
return start, end
|
||||||
|
|
||||||
|
|
||||||
def _collect_events(start_iso: str | None, end_iso: str | None) -> list[dict]:
|
def _collect_events(start_iso: str | None, end_iso: str | None,
|
||||||
|
limit: int = 500) -> list[dict]:
|
||||||
users = {row["id"]: row["username"] for row in db.list_users()}
|
users = {row["id"]: row["username"] for row in db.list_users()}
|
||||||
events: list[dict] = []
|
events: list[dict] = []
|
||||||
for a in db.recent_audit(None, limit=5000):
|
for a in db.audit_in_range(start_iso, end_iso, limit=limit):
|
||||||
if start_iso and a["timestamp"] < start_iso: continue
|
|
||||||
if end_iso and a["timestamp"] >= end_iso: continue
|
|
||||||
events.append({
|
events.append({
|
||||||
"kind": "audit", "ts": a["timestamp"], "source": "web",
|
"kind": "audit", "ts": a["timestamp"], "source": "web",
|
||||||
"actor": a["actor"], "action": a["action"],
|
"actor": a["actor"], "action": a["action"],
|
||||||
|
|
@ -797,9 +807,7 @@ def _collect_events(start_iso: str | None, end_iso: str | None) -> list[dict]:
|
||||||
"user": users.get(a["user_id"], ""),
|
"user": users.get(a["user_id"], ""),
|
||||||
"ip": a["ip"] or "",
|
"ip": a["ip"] or "",
|
||||||
})
|
})
|
||||||
for e in db.recent_errors(None, limit=5000):
|
for e in db.errors_in_range(start_iso, end_iso, limit=limit):
|
||||||
if start_iso and e["timestamp"] < start_iso: continue
|
|
||||||
if end_iso and e["timestamp"] >= end_iso: continue
|
|
||||||
events.append({
|
events.append({
|
||||||
"kind": "error", "ts": e["timestamp"], "source": e["source"],
|
"kind": "error", "ts": e["timestamp"], "source": e["source"],
|
||||||
"actor": e["source"], "action": e["kind"],
|
"actor": e["source"], "action": e["kind"],
|
||||||
|
|
@ -852,7 +860,7 @@ def tab_admin(request: Request, section: str):
|
||||||
to_str = q.get("to") or ""
|
to_str = q.get("to") or ""
|
||||||
start_iso, end_iso = _parse_date_range(from_str or None, to_str or None)
|
start_iso, end_iso = _parse_date_range(from_str or None, to_str or None)
|
||||||
ctx.update({
|
ctx.update({
|
||||||
"events": _collect_events(start_iso, end_iso)[:500],
|
"events": _collect_events(start_iso, end_iso, limit=500),
|
||||||
"from_str": from_str, "to_str": to_str,
|
"from_str": from_str, "to_str": to_str,
|
||||||
})
|
})
|
||||||
elif section == "benutzer":
|
elif section == "benutzer":
|
||||||
|
|
@ -875,7 +883,7 @@ def tab_logs_export(request: Request):
|
||||||
import csv as _csv
|
import csv as _csv
|
||||||
q = request.query_params
|
q = request.query_params
|
||||||
start_iso, end_iso = _parse_date_range(q.get("from") or None, q.get("to") or None)
|
start_iso, end_iso = _parse_date_range(q.get("from") or None, q.get("to") or None)
|
||||||
events = _collect_events(start_iso, end_iso)
|
events = _collect_events(start_iso, end_iso, limit=5000)
|
||||||
|
|
||||||
buf = io.StringIO()
|
buf = io.StringIO()
|
||||||
w = _csv.writer(buf, delimiter=",", quoting=_csv.QUOTE_MINIMAL)
|
w = _csv.writer(buf, delimiter=",", quoting=_csv.QUOTE_MINIMAL)
|
||||||
|
|
|
||||||
|
|
@ -7,25 +7,14 @@ logger = logging.getLogger("web.apply_client")
|
||||||
|
|
||||||
|
|
||||||
def _row_to_profile(profile_row) -> dict:
|
def _row_to_profile(profile_row) -> dict:
|
||||||
"""Convert a user_profiles row to the apply service Profile dict."""
|
"""Convert a user_profiles row into the payload dict for /apply.
|
||||||
|
|
||||||
|
Apply-side ProfileModel (Pydantic) validates + coerces types; we just
|
||||||
|
hand over whatever the row has. `updated_at` and any other extra keys
|
||||||
|
are ignored by the model."""
|
||||||
if profile_row is None:
|
if profile_row is None:
|
||||||
return {}
|
return {}
|
||||||
keys = [
|
return dict(profile_row)
|
||||||
"salutation", "firstname", "lastname", "email", "telephone",
|
|
||||||
"street", "house_number", "postcode", "city",
|
|
||||||
"is_possessing_wbs", "wbs_type", "wbs_valid_till",
|
|
||||||
"wbs_rooms", "wbs_adults", "wbs_children", "is_prio_wbs",
|
|
||||||
"immomio_email", "immomio_password",
|
|
||||||
]
|
|
||||||
d = {}
|
|
||||||
for k in keys:
|
|
||||||
try:
|
|
||||||
d[k] = profile_row[k]
|
|
||||||
except (KeyError, IndexError):
|
|
||||||
pass
|
|
||||||
for k in ("is_possessing_wbs", "is_prio_wbs"):
|
|
||||||
d[k] = bool(d.get(k) or 0)
|
|
||||||
return d
|
|
||||||
|
|
||||||
|
|
||||||
class ApplyClient:
|
class ApplyClient:
|
||||||
|
|
|
||||||
65
web/db.py
65
web/db.py
|
|
@ -9,6 +9,7 @@ import json
|
||||||
import logging
|
import logging
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import threading
|
import threading
|
||||||
|
from contextlib import contextmanager
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from typing import Any, Iterable, Optional
|
from typing import Any, Iterable, Optional
|
||||||
|
|
||||||
|
|
@ -42,6 +43,23 @@ def _get_conn() -> sqlite3.Connection:
|
||||||
return c
|
return c
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def _tx():
|
||||||
|
"""Atomic BEGIN IMMEDIATE … COMMIT/ROLLBACK. Required for multi-statement
|
||||||
|
writes because our connections run in autocommit mode (isolation_level=None).
|
||||||
|
Combine with _lock to serialize writers and avoid BUSY.
|
||||||
|
"""
|
||||||
|
c = _get_conn()
|
||||||
|
c.execute("BEGIN IMMEDIATE")
|
||||||
|
try:
|
||||||
|
yield c
|
||||||
|
except Exception:
|
||||||
|
c.execute("ROLLBACK")
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
c.execute("COMMIT")
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Schema
|
# Schema
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
@ -242,6 +260,11 @@ MIGRATIONS: list[str] = [
|
||||||
CREATE INDEX IF NOT EXISTS idx_partnerships_from ON partnerships(from_user_id);
|
CREATE INDEX IF NOT EXISTS idx_partnerships_from ON partnerships(from_user_id);
|
||||||
CREATE INDEX IF NOT EXISTS idx_partnerships_to ON partnerships(to_user_id);
|
CREATE INDEX IF NOT EXISTS idx_partnerships_to ON partnerships(to_user_id);
|
||||||
""",
|
""",
|
||||||
|
# 0009: composite index for latest_applications_by_flat + last_application_for_flat
|
||||||
|
"""
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_applications_user_flat_started
|
||||||
|
ON applications(user_id, flat_id, started_at DESC);
|
||||||
|
""",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -762,6 +785,36 @@ def recent_audit(user_id: Optional[int], limit: int = 100) -> list[sqlite3.Row]:
|
||||||
).fetchall())
|
).fetchall())
|
||||||
|
|
||||||
|
|
||||||
|
def _range_filter_rows(table: str, ts_col: str, start_iso: Optional[str],
|
||||||
|
end_iso: Optional[str], limit: int) -> list[sqlite3.Row]:
|
||||||
|
"""Date-range filtered fetch from an append-only table. Pushes the
|
||||||
|
timestamp filter into SQL so we don't drag 5000 rows into Python just
|
||||||
|
to discard most of them."""
|
||||||
|
clauses, params = [], []
|
||||||
|
if start_iso:
|
||||||
|
clauses.append(f"{ts_col} >= ?")
|
||||||
|
params.append(start_iso)
|
||||||
|
if end_iso:
|
||||||
|
clauses.append(f"{ts_col} < ?")
|
||||||
|
params.append(end_iso)
|
||||||
|
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
|
||||||
|
params.append(limit)
|
||||||
|
return list(_get_conn().execute(
|
||||||
|
f"SELECT * FROM {table} {where} ORDER BY {ts_col} DESC LIMIT ?",
|
||||||
|
params,
|
||||||
|
).fetchall())
|
||||||
|
|
||||||
|
|
||||||
|
def audit_in_range(start_iso: Optional[str], end_iso: Optional[str],
|
||||||
|
limit: int = 500) -> list[sqlite3.Row]:
|
||||||
|
return _range_filter_rows("audit_log", "timestamp", start_iso, end_iso, limit)
|
||||||
|
|
||||||
|
|
||||||
|
def errors_in_range(start_iso: Optional[str], end_iso: Optional[str],
|
||||||
|
limit: int = 500) -> list[sqlite3.Row]:
|
||||||
|
return _range_filter_rows("errors", "timestamp", start_iso, end_iso, limit)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Retention cleanup
|
# Retention cleanup
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
@ -845,13 +898,13 @@ def partnership_accept(request_id: int, user_id: int) -> bool:
|
||||||
if get_accepted_partnership(row["from_user_id"]) or get_accepted_partnership(user_id):
|
if get_accepted_partnership(row["from_user_id"]) or get_accepted_partnership(user_id):
|
||||||
return False
|
return False
|
||||||
partner_id = row["from_user_id"]
|
partner_id = row["from_user_id"]
|
||||||
with _lock:
|
with _lock, _tx() as c:
|
||||||
_get_conn().execute(
|
c.execute(
|
||||||
"UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?",
|
"UPDATE partnerships SET status = 'accepted', accepted_at = ? WHERE id = ?",
|
||||||
(now_iso(), request_id),
|
(now_iso(), request_id),
|
||||||
)
|
)
|
||||||
# clean up any stale pending requests touching either user
|
# clean up any stale pending requests touching either user
|
||||||
_get_conn().execute(
|
c.execute(
|
||||||
"""DELETE FROM partnerships
|
"""DELETE FROM partnerships
|
||||||
WHERE status = 'pending'
|
WHERE status = 'pending'
|
||||||
AND (from_user_id IN (?, ?) OR to_user_id IN (?, ?))""",
|
AND (from_user_id IN (?, ?) OR to_user_id IN (?, ?))""",
|
||||||
|
|
@ -899,12 +952,12 @@ def partner_flat_actions(partner_id: int) -> dict:
|
||||||
def cleanup_retention() -> dict:
|
def cleanup_retention() -> dict:
|
||||||
cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat(timespec="seconds")
|
cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat(timespec="seconds")
|
||||||
stats = {}
|
stats = {}
|
||||||
with _lock:
|
with _lock, _tx() as c:
|
||||||
for table in ("errors", "audit_log"):
|
for table in ("errors", "audit_log"):
|
||||||
cur = _get_conn().execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,))
|
cur = c.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,))
|
||||||
stats[table] = cur.rowcount
|
stats[table] = cur.rowcount
|
||||||
# Drop forensics from old applications (but keep the row itself for history)
|
# Drop forensics from old applications (but keep the row itself for history)
|
||||||
cur = _get_conn().execute(
|
cur = c.execute(
|
||||||
"UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL",
|
"UPDATE applications SET forensics_json = NULL WHERE started_at < ? AND forensics_json IS NOT NULL",
|
||||||
(cutoff,),
|
(cutoff,),
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -218,8 +218,20 @@ def _record_failure(flat_id: str, step: str, reason: str) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Holding strong references to every spawned task so asyncio doesn't GC them
|
||||||
|
# mid-flight. create_task only weakly references tasks from the event loop.
|
||||||
|
_bg_tasks: set[asyncio.Task] = set()
|
||||||
|
|
||||||
|
|
||||||
|
def _spawn(coro) -> asyncio.Task:
|
||||||
|
t = asyncio.create_task(coro)
|
||||||
|
_bg_tasks.add(t)
|
||||||
|
t.add_done_callback(_bg_tasks.discard)
|
||||||
|
return t
|
||||||
|
|
||||||
|
|
||||||
def kick(flat_id: str) -> None:
|
def kick(flat_id: str) -> None:
|
||||||
asyncio.create_task(asyncio.to_thread(enrich_flat_sync, flat_id))
|
_spawn(asyncio.to_thread(enrich_flat_sync, flat_id))
|
||||||
|
|
||||||
|
|
||||||
async def _backfill_runner() -> None:
|
async def _backfill_runner() -> None:
|
||||||
|
|
@ -234,5 +246,5 @@ async def _backfill_runner() -> None:
|
||||||
|
|
||||||
def kick_backfill() -> int:
|
def kick_backfill() -> int:
|
||||||
pending = db.flats_needing_enrichment(limit=200)
|
pending = db.flats_needing_enrichment(limit=200)
|
||||||
asyncio.create_task(_backfill_runner())
|
_spawn(_backfill_runner())
|
||||||
return len(pending)
|
return len(pending)
|
||||||
|
|
|
||||||
|
|
@ -84,8 +84,8 @@ def on_apply_ok(user_id: int, flat: dict, message: str) -> None:
|
||||||
|
|
||||||
def on_apply_fail(user_id: int, flat: dict, message: str) -> None:
|
def on_apply_fail(user_id: int, flat: dict, message: str) -> None:
|
||||||
addr = flat.get("address") or flat.get("link")
|
addr = flat.get("address") or flat.get("link")
|
||||||
body = f"Bewerbung fehlgeschlagen: {addr}\n{message}\n{PUBLIC_URL}/fehler"
|
body = f"Bewerbung fehlgeschlagen: {addr}\n{message}\n{PUBLIC_URL}/bewerbungen"
|
||||||
md = (f"*Bewerbung fehlgeschlagen*\n{addr}\n{message}\n"
|
md = (f"*Bewerbung fehlgeschlagen*\n{addr}\n{message}\n"
|
||||||
f"[Fehler ansehen]({PUBLIC_URL}/fehler)")
|
f"[Bewerbungen ansehen]({PUBLIC_URL}/bewerbungen)")
|
||||||
notify_user(user_id, "apply_fail", subject="[wohnungsdidi] Bewerbung fehlgeschlagen",
|
notify_user(user_id, "apply_fail", subject="[wohnungsdidi] Bewerbung fehlgeschlagen",
|
||||||
body_plain=body, body_markdown=md)
|
body_plain=body, body_markdown=md)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue