"""Image enrichment pipeline.
For each new flat we:
1. Ask the apply service to fetch the listing via Playwright (bypasses bot guards)
2. Optionally let Haiku narrow the candidate image URL list down to actual
   flat photos
3. Download the images into /data/flats/<slug>/NN.<ext>, deduplicating by
   exact bytes (SHA256) and visual similarity (perceptual hash)
Structured fields (rooms / size / rent / WBS) come from the inberlinwohnen.de
scraper and are not re-derived by the LLM. Failures land in
enrichment_status='failed' with the reason in enrichment_json._error.
"""
from __future__ import annotations
import asyncio
import hashlib
import io
import logging
import mimetypes
import os
from pathlib import Path
from urllib.parse import urlparse
import requests
import db
import llm
from settings import DATA_DIR, INTERNAL_API_KEY
# Module-level logger for the enrichment pipeline.
logger = logging.getLogger("web.enrichment")

# Listing-fetch endpoint of the apply service. The base URL is read from the
# APPLY_URL environment variable and defaults to http://apply:8000.
APPLY_FETCH_URL = os.environ.get("APPLY_URL", "http://apply:8000") + "/internal/fetch-listing"

# Root directory for downloaded flat images; created at import time.
IMAGES_DIR = DATA_DIR / "flats"
IMAGES_DIR.mkdir(parents=True, exist_ok=True)

# Maximum number of images saved per flat.
MAX_IMAGES = 12
# Upper bound on the number of bytes read for a single image download.
MAX_IMAGE_BYTES = 5_000_000
MIN_IMAGE_BYTES = 15_000 # Below this, it's an icon / tracking pixel
MIN_IMAGE_DIMENSION = 400 # Shortest side in pixels — filters thumbs
# Timeout passed to requests.get() for each image download.
IMAGE_TIMEOUT = 15
PHASH_DUPLICATE_THRESHOLD = 5 # Hamming distance ≤ N → same picture
class EnrichmentError(Exception):
    """Failure of one pipeline stage.

    ``step`` names the stage that failed and ``reason`` carries the
    human-readable explanation; the exception message is
    ``"<step>: <reason>"``.
    """

    def __init__(self, step: str, reason: str):
        message = f"{step}: {reason}"
        super().__init__(message)
        self.step, self.reason = step, reason
def flat_slug(flat_id: str) -> str:
    """Return the first 16 hex characters of the SHA-1 of *flat_id* (UTF-8)."""
    digest = hashlib.sha1(flat_id.encode("utf-8"))
    return digest.hexdigest()[:16]
def flat_image_dir(flat_id: str) -> Path:
    """Return the image directory for *flat_id*, creating it when missing.

    The directory lives under ``IMAGES_DIR`` and is named after
    ``flat_slug(flat_id)``.
    """
    target = IMAGES_DIR.joinpath(flat_slug(flat_id))
    target.mkdir(parents=True, exist_ok=True)
    return target
def _fetch_listing(url: str) -> dict:
    """Ask the apply service to fetch the listing at *url*.

    Posts ``{"url": url}`` to ``APPLY_FETCH_URL``, authenticated with the
    ``X-Internal-Api-Key`` header, and returns the decoded JSON object.

    Raises:
        EnrichmentError: with step ``"fetch"`` when the API key is not
            configured, the service is unreachable, it answers with an HTTP
            status >= 400, or the body is not a JSON object.
    """
    if not INTERNAL_API_KEY:
        raise EnrichmentError("fetch", "INTERNAL_API_KEY not configured in web env")
    try:
        r = requests.post(
            APPLY_FETCH_URL,
            headers={"X-Internal-Api-Key": INTERNAL_API_KEY},
            json={"url": url},
            timeout=90,
        )
    except requests.RequestException as e:
        # Chain the cause so the original network error stays in tracebacks.
        raise EnrichmentError("fetch", f"apply unreachable: {e}") from e
    if r.status_code >= 400:
        # Keep the stored reason short and on a single line.
        snippet = (r.text or "")[:200].replace("\n", " ")
        raise EnrichmentError("fetch", f"apply returned HTTP {r.status_code}: {snippet}")
    try:
        payload = r.json()
    except ValueError as e:
        raise EnrichmentError("fetch", "apply returned non-JSON response") from e
    if not isinstance(payload, dict):
        # Callers use .get() on the result; report a non-object body as a
        # fetch failure rather than letting it surface later as a crash.
        raise EnrichmentError(
            "fetch",
            f"apply returned unexpected JSON type: {type(payload).__name__}",
        )
    return payload
# Spellings of the JPEG suffix that are collapsed to ".jpg".
_JPEG_SUFFIX_ALIASES = frozenset({".jpe", ".jpeg", ".jfif"})
# Suffixes accepted from a URL path when the Content-Type gives no answer.
_URL_IMAGE_SUFFIXES = frozenset({
    ".jpg", ".png", ".gif", ".webp", ".avif",
    ".bmp", ".tif", ".tiff", ".svg", ".heic",
})


def _ext_for(content_type: str, url: str) -> str:
    """Pick a file extension (with leading dot) for a downloaded image.

    The Content-Type header wins when ``mimetypes`` knows it. Otherwise the
    suffix of the URL path is used, but only when it is a known image suffix;
    anything else (no suffix, ``.php``, ...) falls back to ``".jpg"``.
    JPEG spellings (``.jpe``, ``.jpeg``, ``.jfif``) are normalised to
    ``".jpg"`` by exact match, so ``".jpeg"`` can no longer be mangled into
    ``".jpgg"`` by a substring replace.
    """
    ct = (content_type or "").split(";")[0].strip().lower()
    suffix = (mimetypes.guess_extension(ct) or "") if ct else ""
    from_header = bool(suffix)
    if not from_header:
        suffix = os.path.splitext(urlparse(url).path)[1].lower()
    if suffix in _JPEG_SUFFIX_ALIASES:
        suffix = ".jpg"
    if not from_header and suffix not in _URL_IMAGE_SUFFIXES:
        # The URL path is untrusted; never let it dictate a non-image suffix.
        return ".jpg"
    return suffix
def _image_info(data: bytes):
    """Decode *data* and return ``(phash, (width, height))``.

    Returns ``(None, None)`` when Pillow or imagehash cannot be imported, or
    when the bytes cannot be opened and fully loaded as an image.
    """
    unavailable = (None, None)
    try:
        from PIL import Image
        import imagehash
    except ImportError:
        return unavailable

    try:
        with Image.open(io.BytesIO(data)) as picture:
            # Force a full decode so truncated or corrupt files fail here.
            picture.load()
            fingerprint = imagehash.phash(picture)
            dimensions = picture.size
    except Exception:
        return unavailable
    return fingerprint, dimensions
def _fetch_image_bytes(raw_url: str, referer: str) -> tuple[bytes, str] | None:
    """Download one candidate; return ``(body, content_type)`` or None to skip it.

    A candidate is skipped when the request fails, the status is >= 400, the
    Content-Type is not ``image/*``, or the body exceeds ``MAX_IMAGE_BYTES``.
    """
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(
            raw_url,
            headers={"Referer": referer,
                     "User-Agent": "Mozilla/5.0 (lazyflat enricher)"},
            timeout=IMAGE_TIMEOUT,
            stream=True,
        ) as r:
            if r.status_code >= 400:
                return None
            ct = r.headers.get("content-type", "").split(";")[0].strip().lower()
            if not ct.startswith("image/"):
                return None
            data = bytearray()
            for chunk in r.iter_content(chunk_size=65_536):
                if not chunk:
                    continue
                data.extend(chunk)
                if len(data) > MAX_IMAGE_BYTES:
                    # Stop reading and drop the file: a truncated body would
                    # otherwise be stored as a broken image.
                    logger.info("image too large, skipped %s", raw_url)
                    return None
    except requests.RequestException as e:
        logger.info("image download failed %s: %s", raw_url, e)
        return None
    return bytes(data), ct


def _download_images(flat_id: str, urls: list[str], referer: str) -> int:
    """Download up to MAX_IMAGES distinct flat photos and return how many were saved.

    Existing files in the flat's image directory are removed first. Files are
    written as ``NN<ext>`` (01, 02, ...) in the order they are accepted.

    Filter / dedup tiers:
    1. Skip URLs that fail, aren't images, or exceed MAX_IMAGE_BYTES
    2. Skip files smaller than MIN_IMAGE_BYTES (icons / tracking pixels)
    3. Skip exact byte-equal duplicates (different URL pointing to same file)
    4. Skip images whose shortest side is below MIN_IMAGE_DIMENSION
    5. Skip visually-equivalent images (same picture re-encoded at a
       different size — common with srcset/CDN variants)

    Tiers 4 and 5 only apply when ``_image_info`` can decode the image; when
    it returns ``(None, None)`` the file is kept.
    """
    d = flat_image_dir(flat_id)
    for old in d.iterdir():
        try:
            old.unlink()
        except OSError as e:
            # Best effort: a leftover entry must not abort the download.
            logger.debug("could not remove stale image %s: %s", old, e)

    seen_sha: set[str] = set()
    seen_phash: list = []  # list of imagehash objects
    saved = 0
    for raw_url in urls:
        if saved >= MAX_IMAGES:
            break
        fetched = _fetch_image_bytes(raw_url, referer)
        if fetched is None:
            continue
        data_bytes, ct = fetched

        # Size filter: files under ~15 KB are icons/tracking pixels
        # (this also covers an empty body).
        if len(data_bytes) < MIN_IMAGE_BYTES:
            continue

        sha = hashlib.sha256(data_bytes).hexdigest()
        if sha in seen_sha:
            continue
        seen_sha.add(sha)

        ph, dims = _image_info(data_bytes)
        # Pixel filter: anything smaller than 400 px on the short side is a
        # thumbnail/avatar, not a real flat photo.
        if dims is not None and min(dims) < MIN_IMAGE_DIMENSION:
            continue
        if ph is not None:
            # Subtracting two hashes yields their Hamming distance.
            if any((ph - prev) <= PHASH_DUPLICATE_THRESHOLD for prev in seen_phash):
                continue
            seen_phash.append(ph)

        ext = _ext_for(ct, raw_url)
        path = d / f"{saved + 1:02d}{ext}"
        path.write_bytes(data_bytes)
        saved += 1
    return saved
def enrich_flat_sync(flat_id: str) -> None:
    """Run the enrichment pipeline for one flat, blocking until it finishes.

    Does nothing when ``db.get_flat`` returns no record. Otherwise the listing
    is fetched, the candidate image URLs are passed through
    ``llm.select_flat_image_urls`` (only when there are any), and the images
    are downloaded. On success the flat is marked ``"ok"`` with the image
    count; an ``EnrichmentError`` or any other exception is recorded via
    ``_record_failure`` instead of being raised.
    """
    record = db.get_flat(flat_id)
    if not record:
        return

    listing_url = record["link"]
    logger.info("enrich start flat=%s", flat_id)
    try:
        listing = _fetch_listing(listing_url)
        photo_urls = listing.get("image_urls") or []
        if photo_urls:
            page_url = listing.get("final_url") or listing_url
            photo_urls = llm.select_flat_image_urls(photo_urls, page_url)
        image_count = _download_images(flat_id, photo_urls, referer=listing_url)
    except EnrichmentError as err:
        _record_failure(flat_id, err.step, err.reason)
    except Exception as err:
        logger.exception("enrich crashed flat=%s", flat_id)
        _record_failure(flat_id, "crash", f"{type(err).__name__}: {err}")
    else:
        db.set_flat_enrichment(flat_id, "ok", enrichment=None, image_count=image_count)
        logger.info("enrich done flat=%s images=%d", flat_id, image_count)
def _record_failure(flat_id: str, step: str, reason: str) -> None:
    """Mark *flat_id* as failed and report the failure.

    Logs a warning, stores status ``"failed"`` with
    ``{"_error": reason, "_step": step}`` as the enrichment payload, then
    writes an error-log entry via ``db.log_error``. The error-log write is
    best effort: if it raises, the exception is logged and not propagated.
    """
    logger.warning("enrich failed flat=%s step=%s: %s", flat_id, step, reason)
    db.set_flat_enrichment(
        flat_id, "failed",
        enrichment={"_error": reason, "_step": step},
    )
    try:
        db.log_error(
            source="enrichment", kind=f"enrich_{step}",
            summary=f"flat={flat_id}: {reason}",
        )
    except Exception:
        # The failure status is already stored above; keep this non-fatal but
        # leave a trace instead of swallowing the problem silently.
        logger.exception("could not write error-log entry for flat=%s", flat_id)
# Strong references to every in-flight background task. The event loop only
# keeps weak references to tasks created with create_task, so without this
# set a task could be garbage-collected before it finishes.
_bg_tasks: set[asyncio.Task] = set()


def _spawn(coro) -> asyncio.Task:
    """Schedule *coro* as a task and keep it referenced until it is done."""
    task = asyncio.create_task(coro)
    # Drop the reference again as soon as the task completes.
    task.add_done_callback(_bg_tasks.discard)
    _bg_tasks.add(task)
    return task
def kick(flat_id: str) -> None:
    """Start enrichment of *flat_id* in the background without waiting for it.

    ``enrich_flat_sync`` is wrapped with ``asyncio.to_thread`` and handed to
    ``_spawn``, which schedules it with ``asyncio.create_task``.
    """
    job = asyncio.to_thread(enrich_flat_sync, flat_id)
    _spawn(job)