"""Flat-enrichment pipeline. For each new flat we: 1. Ask the apply service to fetch the listing via Playwright (bypasses bot guards) 2. Feed the HTML to Haiku via `llm.extract_flat_details` → structured dict 3. Download each image URL directly into /data/flats//NN. 4. Persist result on the flat row (enrichment_json + image_count + status) Kicked as a detached asyncio task from /internal/flats so scraping stays fast. A small queue cap + per-call lock would be next steps if we ever need them. """ from __future__ import annotations import asyncio import hashlib import logging import mimetypes import os from pathlib import Path from typing import Optional from urllib.parse import urlparse import requests import db import llm from settings import DATA_DIR, INTERNAL_API_KEY logger = logging.getLogger("web.enrichment") APPLY_FETCH_URL = os.environ.get("APPLY_URL", "http://apply:8000") + "/internal/fetch-listing" IMAGES_DIR = DATA_DIR / "flats" IMAGES_DIR.mkdir(parents=True, exist_ok=True) MAX_IMAGES = 12 MAX_IMAGE_BYTES = 3_000_000 # 3 MB per image IMAGE_TIMEOUT = 15 def flat_slug(flat_id: str) -> str: """Filesystem-safe short identifier for a flat (IDs are URLs).""" return hashlib.sha1(flat_id.encode("utf-8")).hexdigest()[:16] def flat_image_dir(flat_id: str) -> Path: d = IMAGES_DIR / flat_slug(flat_id) d.mkdir(parents=True, exist_ok=True) return d def _fetch_listing(url: str) -> Optional[dict]: try: r = requests.post( APPLY_FETCH_URL, headers={"X-Internal-Api-Key": INTERNAL_API_KEY}, json={"url": url}, timeout=90, ) except requests.RequestException as e: logger.warning("fetch-listing request failed for %s: %s", url, e) return None if r.status_code >= 400: logger.warning("fetch-listing %s: %s", r.status_code, r.text[:300]) return None return r.json() def _ext_from_response(resp: requests.Response, url: str) -> str: ct = resp.headers.get("content-type", "").split(";")[0].strip().lower() if ct: ext = mimetypes.guess_extension(ct) or "" if ext: return ext.replace(".jpe", ".jpg") path = urlparse(url).path _, ext = os.path.splitext(path) return ext.lower() or ".jpg" def _download_images(flat_id: str, urls: list[str], referer: str) -> int: d = flat_image_dir(flat_id) # Clear any previous attempts so re-enrichment doesn't pile up dupes. for old in d.iterdir(): try: old.unlink() except OSError: pass saved = 0 for raw_url in urls[:MAX_IMAGES]: try: r = requests.get( raw_url, headers={"Referer": referer, "User-Agent": "Mozilla/5.0 (lazyflat enricher)"}, timeout=IMAGE_TIMEOUT, stream=True, ) if r.status_code >= 400: continue ct = r.headers.get("content-type", "").split(";")[0].strip().lower() if not ct.startswith("image/"): continue ext = _ext_from_response(r, raw_url) path = d / f"{saved + 1:02d}{ext}" total = 0 with open(path, "wb") as f: for chunk in r.iter_content(chunk_size=65_536): if not chunk: continue total += len(chunk) if total > MAX_IMAGE_BYTES: break f.write(chunk) if total == 0: path.unlink(missing_ok=True) continue saved += 1 except requests.RequestException as e: logger.info("image download failed %s: %s", raw_url, e) continue return saved def enrich_flat_sync(flat_id: str) -> None: """Run the full enrichment pipeline for one flat. Blocking.""" flat = db.get_flat(flat_id) if not flat: return url = flat["link"] logger.info("enrich start flat=%s url=%s", flat_id, url) listing = _fetch_listing(url) if not listing: db.set_flat_enrichment(flat_id, "failed") return details = llm.extract_flat_details(listing.get("html") or "", listing.get("final_url") or url) if details is None: db.set_flat_enrichment(flat_id, "failed") return image_urls = listing.get("image_urls") or [] image_count = _download_images(flat_id, image_urls, referer=url) db.set_flat_enrichment(flat_id, "ok", enrichment=details, image_count=image_count) logger.info("enrich done flat=%s images=%d", flat_id, image_count) def kick(flat_id: str) -> None: """Fire-and-forget enrichment in a background thread.""" asyncio.create_task(asyncio.to_thread(enrich_flat_sync, flat_id)) async def _backfill_runner() -> None: rows = db.flats_needing_enrichment(limit=200) logger.info("enrich backfill: %d flats queued", len(rows)) for row in rows: try: await asyncio.to_thread(enrich_flat_sync, row["id"]) except Exception: logger.exception("backfill step failed flat=%s", row["id"]) def kick_backfill() -> int: """Queue enrichment for every flat still pending/failed. Returns how many flats are queued; the actual work happens in a detached task so the admin UI doesn't block for minutes.""" pending = db.flats_needing_enrichment(limit=200) asyncio.create_task(_backfill_runner()) return len(pending)