"""Flat-enrichment pipeline. For each new flat we: 1. Ask the apply service to fetch the listing via Playwright (bypasses bot guards) 2. Feed the HTML to Haiku via `llm.extract_flat_details` → structured dict 3. Download each image URL directly into /data/flats//NN. 4. Persist result on the flat row (enrichment_json + image_count + status) Kicked as a detached asyncio task from /internal/flats so scraping stays fast. Every failure is caught, stashed in enrichment_json as {"_error": "...", ...} and mirrored into the errors log so /logs/protokoll explains what went wrong. """ from __future__ import annotations import asyncio import hashlib import logging import mimetypes import os from pathlib import Path from urllib.parse import urlparse import requests import db import llm from settings import DATA_DIR, INTERNAL_API_KEY logger = logging.getLogger("web.enrichment") APPLY_FETCH_URL = os.environ.get("APPLY_URL", "http://apply:8000") + "/internal/fetch-listing" IMAGES_DIR = DATA_DIR / "flats" IMAGES_DIR.mkdir(parents=True, exist_ok=True) MAX_IMAGES = 12 MAX_IMAGE_BYTES = 3_000_000 # 3 MB per image IMAGE_TIMEOUT = 15 class EnrichmentError(Exception): """Raised by each pipeline step with a human-readable reason.""" def __init__(self, step: str, reason: str): self.step = step self.reason = reason super().__init__(f"{step}: {reason}") def flat_slug(flat_id: str) -> str: """Filesystem-safe short identifier for a flat (IDs are URLs).""" return hashlib.sha1(flat_id.encode("utf-8")).hexdigest()[:16] def flat_image_dir(flat_id: str) -> Path: d = IMAGES_DIR / flat_slug(flat_id) d.mkdir(parents=True, exist_ok=True) return d def _fetch_listing(url: str) -> dict: if not INTERNAL_API_KEY: raise EnrichmentError("fetch", "INTERNAL_API_KEY not configured in web env") try: r = requests.post( APPLY_FETCH_URL, headers={"X-Internal-Api-Key": INTERNAL_API_KEY}, json={"url": url}, timeout=90, ) except requests.RequestException as e: raise EnrichmentError("fetch", f"apply unreachable: {e}") if r.status_code >= 400: snippet = (r.text or "")[:200].replace("\n", " ") raise EnrichmentError("fetch", f"apply returned HTTP {r.status_code}: {snippet}") try: return r.json() except ValueError: raise EnrichmentError("fetch", "apply returned non-JSON response") def _ext_from_response(resp: requests.Response, url: str) -> str: ct = resp.headers.get("content-type", "").split(";")[0].strip().lower() if ct: ext = mimetypes.guess_extension(ct) or "" if ext: return ext.replace(".jpe", ".jpg") path = urlparse(url).path _, ext = os.path.splitext(path) return ext.lower() or ".jpg" def _download_images(flat_id: str, urls: list[str], referer: str) -> int: d = flat_image_dir(flat_id) for old in d.iterdir(): try: old.unlink() except OSError: pass saved = 0 for raw_url in urls[:MAX_IMAGES]: try: r = requests.get( raw_url, headers={"Referer": referer, "User-Agent": "Mozilla/5.0 (lazyflat enricher)"}, timeout=IMAGE_TIMEOUT, stream=True, ) if r.status_code >= 400: continue ct = r.headers.get("content-type", "").split(";")[0].strip().lower() if not ct.startswith("image/"): continue ext = _ext_from_response(r, raw_url) path = d / f"{saved + 1:02d}{ext}" total = 0 with open(path, "wb") as f: for chunk in r.iter_content(chunk_size=65_536): if not chunk: continue total += len(chunk) if total > MAX_IMAGE_BYTES: break f.write(chunk) if total == 0: path.unlink(missing_ok=True) continue saved += 1 except requests.RequestException as e: logger.info("image download failed %s: %s", raw_url, e) continue return saved def enrich_flat_sync(flat_id: str) -> None: """Run the full enrichment pipeline for one flat. Blocking.""" flat = db.get_flat(flat_id) if not flat: return url = flat["link"] logger.info("enrich start flat=%s url=%s", flat_id, url) try: listing = _fetch_listing(url) html = listing.get("html") or "" final_url = listing.get("final_url") or url if not html.strip(): raise EnrichmentError("fetch", "apply returned empty HTML") details = llm.extract_flat_details(html, final_url) if details is None: raise EnrichmentError("llm", "model returned no tool_use or call failed (see web logs)") image_urls = listing.get("image_urls") or [] image_count = _download_images(flat_id, image_urls, referer=url) except EnrichmentError as e: _record_failure(flat_id, e.step, e.reason) return except Exception as e: logger.exception("enrich crashed flat=%s", flat_id) _record_failure(flat_id, "crash", f"{type(e).__name__}: {e}") return db.set_flat_enrichment(flat_id, "ok", enrichment=details, image_count=image_count) logger.info("enrich done flat=%s images=%d", flat_id, image_count) def _record_failure(flat_id: str, step: str, reason: str) -> None: logger.warning("enrich failed flat=%s step=%s: %s", flat_id, step, reason) db.set_flat_enrichment( flat_id, "failed", enrichment={"_error": reason, "_step": step}, ) try: db.log_error( source="enrichment", kind=f"enrich_{step}", summary=f"flat={flat_id}: {reason}", ) except Exception: pass def kick(flat_id: str) -> None: """Fire-and-forget enrichment in a background thread.""" asyncio.create_task(asyncio.to_thread(enrich_flat_sync, flat_id)) async def _backfill_runner() -> None: rows = db.flats_needing_enrichment(limit=200) logger.info("enrich backfill: %d flats queued", len(rows)) for row in rows: try: await asyncio.to_thread(enrich_flat_sync, row["id"]) except Exception: logger.exception("backfill step failed flat=%s", row["id"]) def kick_backfill() -> int: pending = db.flats_needing_enrichment(limit=200) asyncio.create_task(_backfill_runner()) return len(pending)