Two issues surfaced on HOWOGE and similar sites: 1. Tiny icons/1x1 tracking pixels leaked through (e.g. image #5, 1.8 KB). Added MIN_IMAGE_BYTES = 15_000 and MIN_IMAGE_DIMENSION = 400 px on the short side; files below either threshold are dropped before saving. Pillow already gives us the dims as part of the phash pass, so the check is free. 2. Listings whose image URLs are opaque CDN hashes (.../fileadmin/_processed_/2/3/xcsm_<hash>.webp.pagespeed.ic.<hash>.webp) caused the LLM URL picker to reject every candidate, yielding 0 images for legit flats. Fixes: (a) prompt now explicitly instructs Haiku to keep same-host /fileadmin/_processed_/ style URLs even when the filename is illegible, (b) if the model still returns an empty set we fall back to the unfiltered Playwright candidates, trusting the pre-filter instead of erasing the gallery. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
238 lines
7.5 KiB
Python
238 lines
7.5 KiB
Python
"""Image enrichment pipeline.
|
|
|
|
For each new flat we:
|
|
1. Ask the apply service to fetch the listing via Playwright (bypasses bot guards)
|
|
2. Optionally let Haiku narrow the <img> URL list down to actual flat photos
|
|
3. Download the images into /data/flats/<slug>/NN.<ext>, deduplicating by
|
|
exact bytes (SHA256) and visual similarity (perceptual hash)
|
|
|
|
Structured fields (rooms / size / rent / WBS) come from the inberlinwohnen.de
|
|
scraper and are not re-derived by the LLM. Failures land in
|
|
enrichment_status='failed' with the reason in enrichment_json._error.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import io
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
|
|
import requests
|
|
|
|
import db
|
|
import llm
|
|
from settings import DATA_DIR, INTERNAL_API_KEY
|
|
|
|
# Module logger; all enrichment log lines go through the "web.enrichment" channel.
logger = logging.getLogger("web.enrichment")

# Endpoint on the apply service that fetches a listing page on our behalf.
# The base URL comes from the APPLY_URL environment variable.
APPLY_FETCH_URL = os.environ.get("APPLY_URL", "http://apply:8000") + "/internal/fetch-listing"
# Root directory holding one sub-directory of downloaded images per flat.
IMAGES_DIR = DATA_DIR / "flats"
# Created at import time so later per-flat mkdir/write calls have a parent.
IMAGES_DIR.mkdir(parents=True, exist_ok=True)

# Upper bound on the number of images saved for a single flat.
MAX_IMAGES = 12
# Reading of a single image body stops once more than this many bytes arrived.
MAX_IMAGE_BYTES = 5_000_000
MIN_IMAGE_BYTES = 15_000  # Below this, it's an icon / tracking pixel
MIN_IMAGE_DIMENSION = 400  # Shortest side in pixels — filters thumbs
# Passed as the `timeout` argument of each image download request.
IMAGE_TIMEOUT = 15
PHASH_DUPLICATE_THRESHOLD = 5  # Hamming distance ≤ N → same picture
|
|
|
|
|
|
class EnrichmentError(Exception):
    """Failure of one named step of the enrichment pipeline.

    Attributes:
        step: Short identifier of the pipeline step that failed.
        reason: Human-readable explanation of the failure.

    The exception message is rendered as ``"<step>: <reason>"``.
    """

    def __init__(self, step: str, reason: str):
        message = f"{step}: {reason}"
        super().__init__(message)
        self.step, self.reason = step, reason
|
|
|
|
|
|
def flat_slug(flat_id: str) -> str:
    """Return a stable 16-character hex slug for *flat_id*.

    The slug is the first 16 characters of the SHA-1 hex digest of the
    UTF-8 encoded id, so the same id always yields the same slug.
    """
    digest = hashlib.sha1(flat_id.encode("utf-8"))
    hex_digest = digest.hexdigest()
    return hex_digest[:16]
|
|
|
|
|
|
def flat_image_dir(flat_id: str) -> Path:
    """Return the image directory for *flat_id*, creating it if necessary.

    The directory lives under IMAGES_DIR and is named after the flat's slug.
    """
    slug = flat_slug(flat_id)
    target = IMAGES_DIR / slug
    # exist_ok: repeated enrichment runs reuse the same directory.
    target.mkdir(parents=True, exist_ok=True)
    return target
|
|
|
|
|
|
def _fetch_listing(url: str) -> dict:
    """Ask the apply service to fetch a listing page and return its JSON payload.

    Posts ``{"url": url}`` to APPLY_FETCH_URL, authenticated through the
    ``X-Internal-Api-Key`` header.

    Args:
        url: Address of the listing page to fetch.

    Returns:
        The decoded JSON object sent back by the apply service.

    Raises:
        EnrichmentError: With step ``"fetch"`` when the API key is not
            configured, the service cannot be reached, it answers with an
            HTTP status >= 400, or the body is not a JSON object.
    """
    if not INTERNAL_API_KEY:
        raise EnrichmentError("fetch", "INTERNAL_API_KEY not configured in web env")
    try:
        r = requests.post(
            APPLY_FETCH_URL,
            headers={"X-Internal-Api-Key": INTERNAL_API_KEY},
            json={"url": url},
            timeout=90,
        )
    except requests.RequestException as e:
        # Chain the cause so the original network error stays in tracebacks.
        raise EnrichmentError("fetch", f"apply unreachable: {e}") from e
    if r.status_code >= 400:
        # Keep the stored reason short and on a single line.
        snippet = (r.text or "")[:200].replace("\n", " ")
        raise EnrichmentError("fetch", f"apply returned HTTP {r.status_code}: {snippet}")
    try:
        payload = r.json()
    except ValueError as e:
        raise EnrichmentError("fetch", "apply returned non-JSON response") from e
    # Callers use dict methods on the result; reject lists/scalars here with a
    # clear "fetch" failure instead of letting them crash further down.
    if not isinstance(payload, dict):
        raise EnrichmentError("fetch", "apply returned JSON that is not an object")
    return payload
|
|
|
|
|
|
def _ext_for(content_type: str, url: str) -> str:
    """Choose a file extension (with leading dot) for a downloaded image.

    The Content-Type header is consulted first; if it is empty or unknown to
    ``mimetypes``, the extension of the URL path is used; if that is empty
    too, ``".jpg"`` is returned.

    Args:
        content_type: Raw Content-Type header value; parameters after ``;``
            are ignored. May be empty.
        url: URL the image was downloaded from.

    Returns:
        A lower-case extension such as ``".png"`` or ``".jpg"``.
    """
    ct = (content_type or "").split(";")[0].strip().lower()
    if ct:
        ext = mimetypes.guess_extension(ct) or ""
        if ext:
            # Normalise JPEG aliases by exact match. A substring replace of
            # ".jpe" would turn ".jpeg" into ".jpgg".
            return ".jpg" if ext in (".jpe", ".jpeg") else ext
    _, ext = os.path.splitext(urlparse(url).path)
    return ext.lower() or ".jpg"
|
|
|
|
|
|
def _image_info(data: bytes):
    """Decode *data* as an image and return ``(phash, (width, height))``.

    Returns ``(None, None)`` when Pillow or imagehash cannot be imported, or
    when the bytes cannot be opened, loaded or hashed for any reason.
    """
    unavailable = (None, None)

    # Both libraries are imported lazily; without them the caller simply
    # gets no fingerprint and no dimensions.
    try:
        from PIL import Image
        import imagehash
    except ImportError:
        return unavailable

    try:
        with Image.open(io.BytesIO(data)) as picture:
            # Force a full decode so broken files fail here.
            picture.load()
            fingerprint = imagehash.phash(picture)
            dimensions = picture.size
    except Exception:
        return unavailable
    return fingerprint, dimensions
|
|
|
|
|
|
def _fetch_image_bytes(raw_url: str, referer: str) -> "tuple[bytes, str] | None":
    """Download one candidate URL; return ``(body, content_type)`` or None to skip it."""
    try:
        # The context manager closes the streamed response on every exit
        # path, so the connection is released even when we bail out early.
        with requests.get(
            raw_url,
            headers={"Referer": referer,
                     "User-Agent": "Mozilla/5.0 (lazyflat enricher)"},
            timeout=IMAGE_TIMEOUT,
            stream=True,
        ) as r:
            if r.status_code >= 400:
                return None
            ct = r.headers.get("content-type", "").split(";")[0].strip().lower()
            if not ct.startswith("image/"):
                return None
            data = bytearray()
            for chunk in r.iter_content(chunk_size=65_536):
                if not chunk:
                    continue
                data.extend(chunk)
                if len(data) > MAX_IMAGE_BYTES:
                    # Oversized body: drop it instead of keeping a truncated,
                    # undecodable file.
                    logger.info("image too large, skipped %s", raw_url)
                    return None
    except requests.RequestException as e:
        logger.info("image download failed %s: %s", raw_url, e)
        return None
    if not data:
        return None
    return bytes(data), ct


def _download_images(flat_id: str, urls: list[str], referer: str) -> int:
    """Download up to MAX_IMAGES distinct flat photos into the flat's image dir.

    Files already present in the directory are removed first (best effort).
    Candidates are processed in order and skipped when:
      1. the download fails, is not an image, is empty, or exceeds
         MAX_IMAGE_BYTES
      2. the body is smaller than MIN_IMAGE_BYTES (icons / tracking pixels)
      3. the bytes are identical to an already accepted image (SHA256)
      4. the shortest side is below MIN_IMAGE_DIMENSION pixels
      5. the picture is visually equivalent to an accepted one (perceptual
         hash within PHASH_DUPLICATE_THRESHOLD — common with srcset/CDN
         variants of the same photo)

    Args:
        flat_id: Identifier of the flat whose image directory is filled.
        urls: Candidate image URLs, tried in the given order.
        referer: Value sent in the Referer header of every download.

    Returns:
        Number of images written, named ``01.<ext>``, ``02.<ext>``, ...
    """
    d = flat_image_dir(flat_id)
    for old in d.iterdir():
        try:
            old.unlink()
        except OSError:
            # Best effort: a stale file that cannot be removed must not
            # abort the whole enrichment run.
            pass

    seen_sha: set[str] = set()
    seen_phash: list = []  # list of imagehash objects

    saved = 0
    for raw_url in urls:
        if saved >= MAX_IMAGES:
            break
        fetched = _fetch_image_bytes(raw_url, referer)
        if fetched is None:
            continue
        data_bytes, ct = fetched

        # Size filter: files under ~15 KB are icons/tracking pixels.
        if len(data_bytes) < MIN_IMAGE_BYTES:
            continue

        sha = hashlib.sha256(data_bytes).hexdigest()
        if sha in seen_sha:
            continue
        seen_sha.add(sha)

        # When the image cannot be analysed, both values are None and the
        # two checks below are skipped; the file is still kept.
        ph, dims = _image_info(data_bytes)
        # Pixel filter: anything smaller than 400 px on the short side is a
        # thumbnail/avatar, not a real flat photo.
        if dims is not None and min(dims) < MIN_IMAGE_DIMENSION:
            continue
        if ph is not None:
            if any((ph - prev) <= PHASH_DUPLICATE_THRESHOLD for prev in seen_phash):
                continue
            seen_phash.append(ph)

        ext = _ext_for(ct, raw_url)
        path = d / f"{saved + 1:02d}{ext}"
        path.write_bytes(data_bytes)
        saved += 1

    return saved
|
|
|
|
|
|
def enrich_flat_sync(flat_id: str) -> None:
    """Run the image enrichment pipeline for one flat (blocking).

    Steps: fetch the listing through the apply service, let the LLM narrow
    the candidate image URLs, download the images, then store the result.
    Does nothing when the flat is unknown. Any failure is recorded through
    ``_record_failure`` instead of being raised.

    Args:
        flat_id: Identifier of the flat to enrich.
    """
    flat = db.get_flat(flat_id)
    if not flat:
        return
    url = flat["link"]
    logger.info("enrich start flat=%s", flat_id)

    try:
        listing = _fetch_listing(url)
        candidates = listing.get("image_urls") or []
        if candidates:
            selected = llm.select_flat_image_urls(candidates, listing.get("final_url") or url)
            if selected:
                candidates = selected
            else:
                # The picker rejected everything (seen with opaque CDN hash
                # filenames). Keep the unfiltered candidates rather than
                # erasing the gallery; the size/dimension filters in the
                # download step still weed out icons and thumbnails.
                logger.info(
                    "enrich flat=%s: URL picker returned nothing, keeping %d unfiltered candidates",
                    flat_id, len(candidates),
                )
        image_count = _download_images(flat_id, candidates, referer=url)
    except EnrichmentError as e:
        _record_failure(flat_id, e.step, e.reason)
        return
    except Exception as e:
        # Top-level boundary: anything unexpected is logged with traceback
        # and stored as a "crash" failure so the caller never sees it.
        logger.exception("enrich crashed flat=%s", flat_id)
        _record_failure(flat_id, "crash", f"{type(e).__name__}: {e}")
        return

    db.set_flat_enrichment(flat_id, "ok", enrichment=None, image_count=image_count)
    logger.info("enrich done flat=%s images=%d", flat_id, image_count)
|
|
|
|
|
|
def _record_failure(flat_id: str, step: str, reason: str) -> None:
    """Persist and log a failed enrichment attempt.

    Marks the flat as ``"failed"`` with the reason and step stored under the
    ``_error`` / ``_step`` keys, then tries to add an entry to the error log.

    Args:
        flat_id: Identifier of the flat whose enrichment failed.
        step: Pipeline step that failed; also used in the error kind.
        reason: Human-readable failure description.
    """
    logger.warning("enrich failed flat=%s step=%s: %s", flat_id, step, reason)
    db.set_flat_enrichment(
        flat_id, "failed",
        enrichment={"_error": reason, "_step": step},
    )
    try:
        db.log_error(
            source="enrichment", kind=f"enrich_{step}",
            summary=f"flat={flat_id}: {reason}",
        )
    except Exception:
        # Error logging stays best-effort: the failure is already stored on
        # the flat, so never raise from here, but leave a trace.
        logger.exception("could not write enrichment error log flat=%s", flat_id)
|
|
|
|
|
|
# Strong references to in-flight enrichment tasks. The event loop only keeps
# weak references, so an unreferenced task may be garbage-collected before
# it finishes.
_KICK_TASKS: set = set()


def kick(flat_id: str) -> None:
    """Start enrichment of *flat_id* in a worker thread without waiting for it.

    Must be called while an event loop is running, since it schedules a task
    on that loop.

    Args:
        flat_id: Identifier of the flat to enrich.
    """
    task = asyncio.create_task(asyncio.to_thread(enrich_flat_sync, flat_id))
    _KICK_TASKS.add(task)
    # Drop the reference once the task is done so the set does not grow.
    task.add_done_callback(_KICK_TASKS.discard)
|
|
|
|
|
|
async def _backfill_runner() -> None:
    """Enrich flats that still need enrichment, one after another.

    Loads up to 200 pending flats and runs the blocking enrichment for each
    in a worker thread. An exception from one flat is logged and does not
    stop the remaining ones.
    """
    queue = db.flats_needing_enrichment(limit=200)
    logger.info("enrich backfill: %d flats queued", len(queue))
    for entry in queue:
        try:
            # Awaited one at a time, so flats are processed strictly in order.
            await asyncio.to_thread(enrich_flat_sync, entry["id"])
        except Exception:
            logger.exception("backfill step failed flat=%s", entry["id"])
|
|
|
|
|
|
# Strong references to running backfill tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_BACKFILL_TASKS: set = set()


def kick_backfill() -> int:
    """Start a background backfill run and report how many flats are pending.

    Must be called while an event loop is running, since it schedules a task
    on that loop.

    Returns:
        Number of flats needing enrichment at the time of the call (at most
        200). The background run queries the list again on its own.
    """
    pending = db.flats_needing_enrichment(limit=200)
    task = asyncio.create_task(_backfill_runner())
    _BACKFILL_TASKS.add(task)
    # Drop the reference once the run is over so the set does not grow.
    task.add_done_callback(_BACKFILL_TASKS.discard)
    return len(pending)
|