Per review §2: - web/db.py: new _tx() context manager wraps multi-statement writers in BEGIN IMMEDIATE … COMMIT/ROLLBACK (our connections run in autocommit mode, so plain `with _lock:` doesn't give atomicity). partnership_accept (UPDATE + DELETE) and cleanup_retention (3 deletes/updates) now use it. - Fire-and-forget tasks: add module-level _bg_tasks sets in web/app.py and web/enrichment.py. A _spawn() helper holds a strong ref until the task finishes so the GC can't drop it mid-flight (CPython's event loop only weakly references pending tasks). - apply/main.py: require_api_key uses hmac.compare_digest, matching web's check. Also imports now use explicit names instead of `from settings import *`. - apply/language.py: replace `from settings import *` + `from paths import *` with explicit imports — this is the pattern that caused the LANGUAGE NameError earlier. - alert/utils.py: pickle-based hash_any_object → deterministic JSON+sha256. Cheaper, portable across Python versions, no pickle attack surface. - web/notifications.py: /fehler links repointed to /bewerbungen (the former page doesn't exist). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
91 lines
3.3 KiB
Python
91 lines
3.3 KiB
Python
"""
|
|
User-level notification dispatcher.
|
|
|
|
Channels:
|
|
- 'ui' → no-op (dashboard shows the events anyway)
|
|
- 'telegram' → per-user bot token + chat id
|
|
"""
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import requests
|
|
|
|
import db
|
|
from settings import PUBLIC_URL
|
|
|
|
# Module logger; records are emitted under the fixed name "web.notifications".
logger = logging.getLogger("web.notifications")

# Alias used to annotate the event identifier passed around in this module.
# It is a plain `str`; the values the module branches on are listed inline.
EventType = str  # 'match' | 'apply_ok' | 'apply_fail'
|
|
|
|
|
|
def _telegram_send(token: str, chat_id: str, text: str) -> bool:
    """Send *text* to a Telegram chat through the Bot API ``sendMessage`` call.

    The message is first sent with ``parse_mode="Markdown"``. If Telegram
    answers with HTTP 400 and an entity-parsing error (e.g. an unbalanced
    ``_`` or ``*`` in free-form text), it is re-sent once as plain text so the
    notification is delivered instead of being dropped.

    Args:
        token: Bot token; it is embedded in the request URL.
        chat_id: Target chat identifier.
        text: Message text.

    Returns:
        True when the final HTTP response is OK; False when Telegram returned
        a non-OK response or the request raised ``requests.RequestException``.
        Failures are logged as warnings and never raised.
    """
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {"chat_id": chat_id, "text": text, "parse_mode": "Markdown",
               "disable_web_page_preview": True}
    try:
        r = requests.post(url, json=payload, timeout=10)
        # NOTE(review): relies on Telegram's error description containing
        # "parse entities" for Markdown failures — confirm against the Bot API.
        if r.status_code == 400 and "parse entities" in r.text:
            logger.info("telegram rejected Markdown, retrying as plain text")
            plain_payload = {k: v for k, v in payload.items() if k != "parse_mode"}
            r = requests.post(url, json=plain_payload, timeout=10)
        if not r.ok:
            logger.warning("telegram send failed: %s %s", r.status_code, r.text[:200])
            return False
        return True
    except requests.RequestException as e:
        # The exception text may include the request URL, which contains the
        # bot token; strip it so the secret never reaches the logs.
        detail = str(e)
        if token:
            detail = detail.replace(token, "<redacted>")
        logger.warning("telegram unreachable: %s", detail)
        return False
|
|
|
|
|
|
def _should_notify(notif, event: EventType) -> bool:
    """Tell whether the stored preferences enable a notification for *event*.

    Args:
        notif: Mapping-like preferences record, indexed by column name.
        event: Event identifier.

    Returns:
        The truthiness of the preference flag belonging to *event*; False for
        any event that has no flag here.
    """
    # Event name -> preference column holding its on/off flag.
    flag_columns = (
        ("match", "notify_on_match"),
        ("apply_ok", "notify_on_apply_success"),
        ("apply_fail", "notify_on_apply_fail"),
    )
    for known_event, column in flag_columns:
        if event == known_event:
            return bool(notif[column])
    return False
|
|
|
|
|
|
def notify_user(user_id: int, event: EventType, *,
                subject: str, body_plain: str, body_markdown: Optional[str] = None) -> None:
    """Deliver one event notification to one user; never raises.

    Loads the user's notification preferences, and, if the event is enabled
    and the channel is ``'telegram'`` with both a bot token and a chat id
    set, sends the message via Telegram. Any other channel does nothing here.

    Args:
        user_id: User whose preferences are looked up.
        event: Event identifier checked against the preferences.
        subject: Accepted for the caller's convenience; not used in this function.
        body_plain: Plain-text message, used when no Markdown body is given.
        body_markdown: Markdown message; preferred over *body_plain* when truthy.
    """
    try:
        prefs = db.get_notifications(user_id)
        if not (prefs and _should_notify(prefs, event)):
            return

        # A missing/empty channel is treated as 'ui', which needs no push.
        if (prefs["channel"] or "ui") != "telegram":
            return

        bot_token = prefs["telegram_bot_token"]
        chat_id = prefs["telegram_chat_id"]
        if not (bot_token and chat_id):
            return

        _telegram_send(bot_token, chat_id, body_markdown or body_plain)
    except Exception:
        # Best-effort boundary: log with traceback, but keep the caller running.
        logger.exception("notify_user failed for user=%s event=%s", user_id, event)
|
|
|
|
|
|
# -- Convenience builders -----------------------------------------------------
|
|
|
|
def on_match(user_id: int, flat: dict) -> None:
    """Build and dispatch the 'match' notification for a listing.

    Args:
        user_id: Recipient user.
        flat: Listing data; the keys ``address``, ``link``, ``total_rent`` and
            ``rooms`` are read, each optional.
    """
    link = flat.get("link", "")
    # Headline label: the address when truthy, otherwise whatever "link" holds.
    label = flat.get("address") or flat.get("link")
    rent, rooms = flat.get("total_rent"), flat.get("rooms")

    plain_text = f"Neue passende Wohnung: {label}\nMiete: {rent} €\nZimmer: {rooms}\n{link}"
    markdown_text = (
        f"*Neue passende Wohnung*\n[{label}]({link})\n"
        f"Miete: {rent} € · Zimmer: {rooms}\n{PUBLIC_URL}"
    )
    notify_user(
        user_id,
        "match",
        subject="[wohnungsdidi] passende Wohnung",
        body_plain=plain_text,
        body_markdown=markdown_text,
    )
|
|
|
|
|
|
def on_apply_ok(user_id: int, flat: dict, message: str) -> None:
    """Build and dispatch the 'apply_ok' notification for a listing.

    Args:
        user_id: Recipient user.
        flat: Listing data; ``address`` is used as the label, falling back to
            ``link`` when the address is missing or falsy.
        message: Text appended below the headline in both bodies.
    """
    label = flat.get("address") or flat.get("link")
    plain_text = f"Bewerbung erfolgreich: {label}\n{message}"
    markdown_text = f"*Bewerbung erfolgreich*\n{label}\n{message}"
    notify_user(
        user_id,
        "apply_ok",
        subject="[wohnungsdidi] Bewerbung OK",
        body_plain=plain_text,
        body_markdown=markdown_text,
    )
|
|
|
|
|
|
def on_apply_fail(user_id: int, flat: dict, message: str) -> None:
    """Build and dispatch the 'apply_fail' notification for a listing.

    Both bodies end with a link to ``{PUBLIC_URL}/bewerbungen``.

    Args:
        user_id: Recipient user.
        flat: Listing data; ``address`` is used as the label, falling back to
            ``link`` when the address is missing or falsy.
        message: Text placed below the label in both bodies.
    """
    label = flat.get("address") or flat.get("link")
    plain_text = f"Bewerbung fehlgeschlagen: {label}\n{message}\n{PUBLIC_URL}/bewerbungen"
    markdown_text = (
        f"*Bewerbung fehlgeschlagen*\n{label}\n{message}\n"
        f"[Bewerbungen ansehen]({PUBLIC_URL}/bewerbungen)"
    )
    notify_user(
        user_id,
        "apply_fail",
        subject="[wohnungsdidi] Bewerbung fehlgeschlagen",
        body_plain=plain_text,
        body_markdown=markdown_text,
    )
|