- notifications: round sqm_price to whole € in Telegram match messages
(was emitting raw float like "12.345614 €/m²").
- wohnungen: remove the admin-only "Bilder nachladen (N)" button. It
flickered into view whenever a freshly-scraped flat was still in
pending state, which was effectively random from the user's point of
view, and the manual backfill it triggered isn't needed anymore — new
flats are auto-enriched at scrape time. Also drops the dead helpers
it was the sole caller of: enrichment.kick_backfill,
enrichment._backfill_runner, db.flats_needing_enrichment,
db.enrichment_counts.
- lightbox: the modal didn't appear because Tailwind's Play CDN injects
its own .hidden { display: none } rule at runtime, which kept fighting
our class toggle. Switch the show/hide to inline style.display so no
external stylesheet can mask it. The .lightbox class now only owns
the layout; the initial hidden state lives on the element itself via
style="display:none".
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
234 lines
7.4 KiB
Python
234 lines
7.4 KiB
Python
"""Image enrichment pipeline.
|
|
|
|
For each new flat we:
|
|
1. Ask the apply service to fetch the listing via Playwright (bypasses bot guards)
|
|
2. Optionally let Haiku narrow the <img> URL list down to actual flat photos
|
|
3. Download the images into /data/flats/<slug>/NN.<ext>, deduplicating by
|
|
exact bytes (SHA256) and visual similarity (perceptual hash)
|
|
|
|
Structured fields (rooms / size / rent / WBS) come from the inberlinwohnen.de
|
|
scraper and are not re-derived by the LLM. Failures land in
|
|
enrichment_status='failed' with the reason in enrichment_json._error.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import io
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
|
|
import requests
|
|
|
|
import db
|
|
import llm
|
|
from settings import DATA_DIR, INTERNAL_API_KEY
|
|
|
|
logger = logging.getLogger("web.enrichment")

# Internal endpoint on the apply service that fetches a listing via Playwright.
# A trailing slash on APPLY_URL is stripped so "http://apply:8000/" does not
# produce a "//internal/fetch-listing" path.
APPLY_FETCH_URL = (
    os.environ.get("APPLY_URL", "http://apply:8000").rstrip("/")
    + "/internal/fetch-listing"
)

# Root for downloaded flat photos; one sub-directory per flat (see flat_slug).
IMAGES_DIR = DATA_DIR / "flats"
IMAGES_DIR.mkdir(parents=True, exist_ok=True)

MAX_IMAGES = 12  # Upper bound on photos saved per flat
MAX_IMAGE_BYTES = 5_000_000  # Per-image download cap
MIN_IMAGE_BYTES = 15_000  # Below this, it's an icon / tracking pixel
MIN_IMAGE_DIMENSION = 400  # Shortest side in pixels — filters thumbs
IMAGE_TIMEOUT = 15  # Seconds; passed as the requests timeout per image
PHASH_DUPLICATE_THRESHOLD = 5  # Hamming distance ≤ N → same picture
|
|
|
|
|
|
class EnrichmentError(Exception):
    """A failed enrichment step.

    ``step`` names the pipeline stage that failed (e.g. "fetch") and
    ``reason`` explains why. ``str(exc)`` renders as ``"<step>: <reason>"``.
    """

    step: str
    reason: str

    def __init__(self, step: str, reason: str):
        message = "{}: {}".format(step, reason)
        super().__init__(message)
        self.step, self.reason = step, reason
|
|
|
|
|
|
def flat_slug(flat_id: str) -> str:
    """Return a stable 16-hex-character directory name for *flat_id*.

    The name is the first 8 bytes (16 hex digits) of the SHA-1 digest of the
    UTF-8 encoded id, so arbitrary ids map to filesystem-safe names.
    """
    hasher = hashlib.sha1()
    hasher.update(flat_id.encode("utf-8"))
    return hasher.digest()[:8].hex()
|
|
|
|
|
|
def flat_image_dir(flat_id: str) -> Path:
    """Return the image directory for *flat_id*, creating it if missing."""
    target = IMAGES_DIR.joinpath(flat_slug(flat_id))
    target.mkdir(exist_ok=True, parents=True)
    return target
|
|
|
|
|
|
def _fetch_listing(url: str) -> dict:
    """Ask the apply service to fetch *url* and return its JSON payload.

    The internal endpoint is authenticated with INTERNAL_API_KEY. Callers read
    ``image_urls`` and ``final_url`` from the returned dict.

    Raises:
        EnrichmentError: with step "fetch" if the API key is not configured,
            the service is unreachable, it answers with HTTP >= 400, or the
            body is not a JSON object.
    """
    if not INTERNAL_API_KEY:
        raise EnrichmentError("fetch", "INTERNAL_API_KEY not configured in web env")
    try:
        r = requests.post(
            APPLY_FETCH_URL,
            headers={"X-Internal-Api-Key": INTERNAL_API_KEY},
            json={"url": url},
            # Generous timeout: the remote side renders the page with Playwright.
            timeout=90,
        )
    except requests.RequestException as e:
        raise EnrichmentError("fetch", f"apply unreachable: {e}") from e
    if r.status_code >= 400:
        snippet = (r.text or "")[:200].replace("\n", " ")
        raise EnrichmentError("fetch", f"apply returned HTTP {r.status_code}: {snippet}")
    try:
        payload = r.json()
    except ValueError as e:
        raise EnrichmentError("fetch", "apply returned non-JSON response") from e
    # Callers use dict access (.get); reject other JSON shapes here so the
    # failure is recorded as a fetch error rather than a generic crash.
    if not isinstance(payload, dict):
        raise EnrichmentError(
            "fetch", f"apply returned unexpected JSON type {type(payload).__name__}"
        )
    return payload
|
|
|
|
|
|
def _ext_for(content_type: str, url: str) -> str:
    """Pick a file extension (with leading dot) for a downloaded image.

    The MIME type in *content_type* wins when :mod:`mimetypes` knows it. Any
    parameters such as ``; charset=...`` are ignored, and JPEG aliases are
    normalised to ".jpg". Otherwise the lower-cased extension of the URL path
    is used, defaulting to ".jpg".
    """
    ct = content_type.split(";")[0].strip().lower()
    if ct:
        ext = mimetypes.guess_extension(ct) or ""
        if ext:
            # Exact-match aliasing; a substring replace of ".jpe" would turn
            # ".jpeg" into ".jpgg".
            return {".jpe": ".jpg", ".jpeg": ".jpg"}.get(ext, ext)
    _, ext = os.path.splitext(urlparse(url).path)
    return ext.lower() or ".jpg"
|
|
|
|
|
|
def _image_info(data: bytes):
    """Return ``(phash, (width, height))`` for the image bytes *data*.

    Pillow and imagehash are imported lazily. If either is missing, or the
    bytes cannot be decoded, ``(None, None)`` is returned instead.
    """
    failure = (None, None)
    try:
        from PIL import Image
        import imagehash
    except ImportError:
        return failure

    try:
        buffer = io.BytesIO(data)
        with Image.open(buffer) as picture:
            picture.load()
            fingerprint = imagehash.phash(picture)
            size = picture.size
    except Exception:
        return failure
    return fingerprint, size
|
|
|
|
|
|
def _clear_image_dir(d: Path) -> None:
    """Best-effort removal of previously downloaded files in *d*."""
    for old in d.iterdir():
        try:
            old.unlink()
        except OSError:
            # e.g. a sub-directory or a permission problem; not fatal here.
            pass


def _fetch_image_bytes(url: str, referer: str):
    """Download one candidate image; return ``(data, content_type)`` or None.

    None is returned when the request fails, the server answers with
    HTTP >= 400, the response is not ``image/*``, the body is empty, or the
    body exceeds MAX_IMAGE_BYTES. The response is always closed, which
    matters because it is streamed.
    """
    try:
        with requests.get(
            url,
            headers={"Referer": referer,
                     "User-Agent": "Mozilla/5.0 (lazyflat enricher)"},
            timeout=IMAGE_TIMEOUT,
            stream=True,
        ) as r:
            if r.status_code >= 400:
                return None
            ct = r.headers.get("content-type", "").split(";")[0].strip().lower()
            if not ct.startswith("image/"):
                return None
            # Cheap early reject when the server announces an oversized body.
            try:
                announced = int(r.headers.get("content-length") or 0)
            except ValueError:
                announced = 0
            if announced > MAX_IMAGE_BYTES:
                return None
            data = bytearray()
            for chunk in r.iter_content(chunk_size=65_536):
                data.extend(chunk)
                if len(data) > MAX_IMAGE_BYTES:
                    # Skip oversized images instead of keeping a truncated,
                    # corrupt file.
                    return None
    except requests.RequestException as e:
        logger.info("image download failed %s: %s", url, e)
        return None
    if not data:
        return None
    return bytes(data), ct


def _download_images(flat_id: str, urls: list[str], referer: str) -> int:
    """Download up to MAX_IMAGES distinct flat photos into the flat's dir.

    Files from a previous run are removed first. Candidates are tried in
    order and skipped when they:

    1. fail to download, aren't ``image/*``, or exceed MAX_IMAGE_BYTES;
    2. are smaller than MIN_IMAGE_BYTES (icons / tracking pixels);
    3. are byte-identical to an earlier image (SHA-256);
    4. have a short side below MIN_IMAGE_DIMENSION pixels; or
    5. are visually equivalent to an already-saved image (perceptual-hash
       distance ≤ PHASH_DUPLICATE_THRESHOLD). This is common with srcset/CDN
       variants of the same picture.

    Checks 4 and 5 apply only when _image_info could decode the image.

    Returns the number of files written (named 01.<ext>, 02.<ext>, ...).
    """
    d = flat_image_dir(flat_id)
    _clear_image_dir(d)

    seen_sha: set[str] = set()
    seen_phash: list = []  # imagehash objects of already-saved images
    saved = 0
    for raw_url in urls:
        if saved >= MAX_IMAGES:
            break
        fetched = _fetch_image_bytes(raw_url, referer)
        if fetched is None:
            continue
        data, ct = fetched

        if len(data) < MIN_IMAGE_BYTES:
            continue

        sha = hashlib.sha256(data).hexdigest()
        if sha in seen_sha:
            continue
        seen_sha.add(sha)

        ph, dims = _image_info(data)
        if dims is not None and min(dims) < MIN_IMAGE_DIMENSION:
            continue
        if ph is not None:
            if any((ph - prev) <= PHASH_DUPLICATE_THRESHOLD for prev in seen_phash):
                continue
            seen_phash.append(ph)

        path = d / f"{saved + 1:02d}{_ext_for(ct, raw_url)}"
        path.write_bytes(data)
        saved += 1

    return saved
|
|
|
|
|
|
def enrich_flat_sync(flat_id: str) -> None:
    """Run the enrichment pipeline for one flat (blocking).

    The pipeline fetches the listing through the apply service, lets the LLM
    narrow the candidate image URLs, downloads the images, and stores an "ok"
    status with the image count.

    Errors raised by the pipeline steps are recorded via ``_record_failure``
    rather than propagated. Flat ids with no database row are ignored.
    """
    row = db.get_flat(flat_id)
    if not row:
        return
    listing_url = row["link"]
    logger.info("enrich start flat=%s", flat_id)

    try:
        listing = _fetch_listing(listing_url)
        image_urls = listing.get("image_urls") or []
        if image_urls:
            page_url = listing.get("final_url") or listing_url
            image_urls = llm.select_flat_image_urls(image_urls, page_url)
        n_images = _download_images(flat_id, image_urls, referer=listing_url)
    except EnrichmentError as err:
        _record_failure(flat_id, err.step, err.reason)
    except Exception as err:
        logger.exception("enrich crashed flat=%s", flat_id)
        _record_failure(flat_id, "crash", f"{type(err).__name__}: {err}")
    else:
        db.set_flat_enrichment(flat_id, "ok", enrichment=None, image_count=n_images)
        logger.info("enrich done flat=%s images=%d", flat_id, n_images)
|
|
|
|
|
|
def _record_failure(flat_id: str, step: str, reason: str) -> None:
    """Mark *flat_id* as failed and report the failure to the error log.

    The flat is stored with status "failed" and ``{"_error": reason,
    "_step": step}`` as its enrichment payload. Writing the error-log entry
    (``db.log_error``) is best-effort. A failure there is logged but never
    raised, so it cannot mask the original enrichment error.
    """
    logger.warning("enrich failed flat=%s step=%s: %s", flat_id, step, reason)
    db.set_flat_enrichment(
        flat_id, "failed",
        enrichment={"_error": reason, "_step": step},
    )
    try:
        db.log_error(
            source="enrichment", kind=f"enrich_{step}",
            summary=f"flat={flat_id}: {reason}",
        )
    except Exception:
        # Deliberately best-effort, but don't hide the problem entirely.
        logger.warning(
            "could not write error log entry for flat=%s", flat_id, exc_info=True
        )
|
|
|
|
|
|
# Hold strong references to every spawned task so asyncio doesn't GC them
# mid-flight: the event loop only keeps weak references to tasks.
_bg_tasks: set[asyncio.Task] = set()


def _on_task_done(task: asyncio.Task) -> None:
    """Drop *task*'s strong reference and log any exception it raised."""
    _bg_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        # Without this, the error would only surface as asyncio's
        # "Task exception was never retrieved" message at GC time.
        logger.error("background task failed", exc_info=exc)


def _spawn(coro) -> asyncio.Task:
    """Schedule *coro* as a task and keep it referenced until it finishes.

    Must be called while an event loop is running, because
    ``asyncio.create_task`` requires one.
    """
    t = asyncio.create_task(coro)
    _bg_tasks.add(t)
    t.add_done_callback(_on_task_done)
    return t
|
|
|
|
|
|
def kick(flat_id: str) -> None:
    """Start enriching *flat_id* in the background without waiting for it.

    The blocking ``enrich_flat_sync`` runs in a worker thread via
    ``asyncio.to_thread``, and ``_spawn`` tracks the wrapping task. This must
    be called while an event loop is running.
    """
    worker = asyncio.to_thread(enrich_flat_sync, flat_id)
    _spawn(worker)
|