461 lines
15 KiB
Python
461 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""laendleimmo.at scraper — finds listings and auto-submits the contact form."""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib.parse
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from playwright.sync_api import (
|
|
Browser,
|
|
Page,
|
|
sync_playwright,
|
|
TimeoutError as PWTimeout,
|
|
)
|
|
|
|
# Root of the site; search, login and root-relative listing URLs are built on it.
BASE_URL = "https://www.laendleimmo.at"
# JSON file next to this script holding the contact details, message text and
# optional login credentials read by the functions below.
CONFIG_FILE = Path(__file__).parent / "contact_config.json"
# JSON file next to this script holding the list of listing URLs that were
# already contacted.
CONTACTED_FILE = Path(__file__).parent / "contacted.json"
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Config / state helpers
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def load_config() -> dict:
    """Read the contact configuration from ``CONFIG_FILE``.

    Returns:
        The parsed JSON content of the configuration file.

    Exits the process (via ``sys.exit``) with a hint to run
    ``configure.py`` when the file does not exist.
    """
    if CONFIG_FILE.exists():
        raw_text = CONFIG_FILE.read_text(encoding="utf-8")
        return json.loads(raw_text)

    sys.exit(
        f"[!] Config not found at {CONFIG_FILE}\n"
        " Run: python configure.py"
    )
|
|
|
|
|
|
def load_contacted() -> set:
    """Return the listing URLs recorded in ``CONTACTED_FILE`` as a set.

    A missing history file yields an empty set.
    """
    if CONTACTED_FILE.exists():
        stored_entries = json.loads(CONTACTED_FILE.read_text(encoding="utf-8"))
        return set(stored_entries)
    return set()
|
|
|
|
|
|
def save_contacted(contacted: set) -> None:
    """Persist the contacted-listing history to ``CONTACTED_FILE``.

    The entries are written as a sorted, indented JSON list. The data is
    first written to a sibling ``.tmp`` file and then renamed over the real
    file, so an interruption in the middle of a write cannot leave a
    truncated history behind (which would cause listings to be contacted
    again on the next run).

    Args:
        contacted: Listing URLs that have been contacted so far.
    """
    tmp_file = CONTACTED_FILE.with_name(CONTACTED_FILE.name + ".tmp")
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump(sorted(contacted), f, indent=2)
    # Path.replace overwrites an existing destination in a single rename.
    tmp_file.replace(CONTACTED_FILE)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# URL construction
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def build_search_urls(args) -> list[str]:
    """Build one search URL per (property type, marketing type) combination.

    Args:
        args: Parsed CLI namespace providing ``flat``, ``house``, ``rent``,
            ``buy``, ``minrooms`` and ``maxprice``.

    Returns:
        Search URLs under ``BASE_URL``. Without ``flat``/``house`` both
        property types are searched; without ``rent``/``buy`` no marketing
        filter is added. A query string is only appended when at least one
        filter applies.
    """
    prop_types: list[str] = [
        slug
        for wanted, slug in ((args.flat, "wohnung"), (args.house, "haus"))
        if wanted
    ] or ["wohnung", "haus"]

    marketing_types: list[Optional[str]] = [
        label
        for wanted, label in ((args.rent, "Mietobjekt"), (args.buy, "Kaufobjekt"))
        if wanted
    ] or [None]  # no filter → all transaction types

    # Room and price filters are the same for every combination.
    shared_params: list[tuple[str, str]] = []
    if args.minrooms is not None:
        shared_params.append(("f[noOfRooms@f]", str(float(args.minrooms))))
    if args.maxprice is not None:
        shared_params.append(("f[price@t]", str(args.maxprice)))

    urls: list[str] = []
    for prop_type in prop_types:
        for marketing_type in marketing_types:
            params = list(shared_params)
            if marketing_type:
                params.append(("f[marketingType]", marketing_type))

            search_url = f"{BASE_URL}/{prop_type}/vorarlberg"
            if params:
                query = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
                search_url = f"{search_url}?{query}"
            urls.append(search_url)
    return urls
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Scraping
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def _resolve_href(href: str, page_url: str) -> str:
    """Turn an ``href`` attribute value into an absolute URL.

    Root-relative paths are prefixed with ``BASE_URL`` so listing URLs keep
    the same form that is stored in the contacted history. Everything else
    (absolute URLs, ``?page=2``, ``page/2``, ``//host/path``) is resolved
    against the URL of the page the link was found on.
    """
    if href.startswith("/") and not href.startswith("//"):
        return BASE_URL + href
    return urllib.parse.urljoin(page_url, href)


def scrape_listings(page: Page, url: str) -> tuple[list[dict], Optional[str]]:
    """Load one search-result page and extract its listing links.

    Args:
        page: Playwright page used for navigation.
        url: Search-result URL to open.

    Returns:
        ``(listings, next_url)``. ``listings`` holds one
        ``{"url", "snippet"}`` dict per distinct listing link found on the
        page. ``next_url`` is the absolute URL of the following result page,
        or ``None`` when no usable pagination link was found or the page
        could not be loaded.
    """
    try:
        page.goto(url, wait_until="networkidle", timeout=30_000)
    except PWTimeout:
        # The network may never go idle; settle for a parsed DOM instead.
        try:
            page.goto(url, wait_until="domcontentloaded", timeout=30_000)
        except PWTimeout:
            print(f" [!] Page load timed out: {url}")
            return [], None

    # Relative links must be resolved against the page that was actually
    # loaded (which may differ from ``url`` after a redirect).
    page_url = page.url or url

    listings: list[dict] = []
    seen: set[str] = set()

    for a in page.query_selector_all("a[href*='/immobilien/']"):
        href: Optional[str] = a.get_attribute("href")
        if not href:
            continue
        full_url = _resolve_href(href, page_url)
        # De-duplicate on the resolved URL so that "/x" and the equivalent
        # absolute link count as the same listing.
        if full_url in seen:
            continue
        seen.add(full_url)
        snippet = (a.inner_text() or "").strip().replace("\n", " ")[:120]
        listings.append({"url": full_url, "snippet": snippet})

    # Next-page link — try common patterns
    next_url: Optional[str] = None
    for sel in [
        "a[aria-label='Nächste Seite']",
        "a[aria-label='Next']",
        "a[rel='next']",
        ".pagination a:last-child",
        "a.next",
    ]:
        el = page.query_selector(sel)
        if not el:
            continue
        href = el.get_attribute("href")
        # Fragment-only and script links cannot be navigated to.
        if not href or href.startswith("#") or href.lower().startswith("javascript:"):
            continue
        candidate = _resolve_href(href, page_url)
        # A "next" link that points at the page we are on is not a next page.
        if candidate in (url, page_url):
            continue
        next_url = candidate
        break

    return listings, next_url
|
|
|
|
|
|
def collect_all_listings(
    page: Page,
    search_urls: list[str],
    max_listings: int,
    already_contacted: set,
) -> list[dict]:
    """Walk the result pages of every search and gather not-yet-contacted listings.

    Args:
        page: Playwright page used for navigation.
        search_urls: First-page URLs of the searches to run.
        max_listings: Upper bound on the number of listings returned.
        already_contacted: Listing URLs to skip.

    Returns:
        At most ``max_listings`` listing dicts, each URL appearing only once
        even when several searches or result pages contain it (otherwise the
        same provider would be contacted repeatedly within one run).
    """
    all_new: list[dict] = []
    queued_urls: set[str] = set()  # listing URLs already placed in all_new

    for search_url in search_urls:
        url: Optional[str] = search_url
        page_num = 1
        # Guards against pagination links that lead back to a visited page,
        # which would otherwise loop forever.
        visited_pages: set[str] = set()

        while url and url not in visited_pages and len(all_new) < max_listings:
            visited_pages.add(url)
            print(f" [scrape] page {page_num}: {url}")
            listings, next_url = scrape_listings(page, url)

            new: list[dict] = []
            for listing in listings:
                listing_url = listing["url"]
                if listing_url in already_contacted or listing_url in queued_urls:
                    continue
                queued_urls.add(listing_url)
                new.append(listing)

            all_new.extend(new)
            print(f" {len(listings)} found, {len(new)} new (total new: {len(all_new)})")
            if len(all_new) >= max_listings:
                break
            url = next_url
            page_num += 1
            # Pause between page loads to keep the request rate low.
            time.sleep(1.5)

    return all_new[:max_listings]
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Contact form submission
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def _try_fill(page: Page, selectors: list[str], value: str) -> bool:
    """Fill the first visible element matched by one of ``selectors``.

    Selectors are tried in order; any exception raised while querying or
    filling is ignored and the next selector is tried.

    Returns:
        True as soon as one element was filled, False if none could be.
    """
    for selector in selectors:
        try:
            field = page.query_selector(selector)
            if not field or not field.is_visible():
                continue
            field.fill(value)
        except Exception:
            # Best effort: a failing selector must not stop the others.
            continue
        return True
    return False
|
|
|
|
|
|
def _try_check(page: Page, selectors: list[str]) -> None:
    """Tick the first visible, unchecked checkbox matched by ``selectors``.

    Selectors are tried in order and the function stops after ticking one
    checkbox. Any exception raised for a selector is ignored and the next
    selector is tried.
    """
    for selector in selectors:
        try:
            candidates = page.query_selector_all(selector)
            target = next(
                (box for box in candidates if box.is_visible() and not box.is_checked()),
                None,
            )
            if target is not None:
                target.check()
                return
        except Exception:
            # Best effort: move on to the next selector.
            continue
|
|
|
|
|
|
def login(page: Page, config: dict) -> bool:
    """Attempt site login if credentials are configured.

    Args:
        page: Playwright page used for navigation.
        config: Configuration dict; ``login_email`` and ``login_password``
            are read from it.

    Returns:
        False when credentials are missing, the login page cannot be loaded,
        a credential field or the submit button cannot be found, or the
        login form is still displayed after submitting. True otherwise.
    """
    email = config.get("login_email", "")
    password = config.get("login_password", "")
    if not email or not password:
        return False

    login_url = f"{BASE_URL}/user/login"
    try:
        page.goto(login_url, wait_until="networkidle", timeout=30_000)
    except PWTimeout:
        # Login is optional, so a slow page must not abort the whole run.
        try:
            page.goto(login_url, wait_until="domcontentloaded", timeout=30_000)
        except PWTimeout:
            return False

    email_filled = _try_fill(
        page,
        ["input[type='email']", "input[name='email']", "input[name='username']"],
        email,
    )
    password_filled = _try_fill(
        page,
        ["input[type='password']", "input[name='password']"],
        password,
    )
    # Submitting a form without credentials cannot log in.
    if not (email_filled and password_filled):
        return False

    for sel in [
        "button[type='submit']",
        "input[type='submit']",
        "button:has-text('Anmelden')",
        "button:has-text('Login')",
    ]:
        btn = page.query_selector(sel)
        if btn and btn.is_visible():
            btn.click()
            try:
                page.wait_for_load_state("networkidle", timeout=10_000)
            except PWTimeout:
                pass
            # Heuristic (the site's real success indicator is not known
            # here): still being on the login URL with a visible password
            # field is treated as a rejected login.
            password_field = page.query_selector("input[type='password']")
            form_still_shown = (
                "/user/login" in page.url
                and password_field is not None
                and password_field.is_visible()
            )
            return not form_still_shown
    return False
|
|
|
|
|
|
def _visible_page_text(page: Page) -> str:
    """Return the lower-cased text rendered in ``<body>``.

    Rendered text is used instead of the raw HTML so that markup such as
    ``<meta name="robots">`` or script source does not match the result
    keywords. Falls back to the raw HTML if the text cannot be read.
    """
    try:
        return page.inner_text("body").lower()
    except Exception:
        # e.g. the page is mid-navigation; raw HTML is better than nothing.
        return page.content().lower()


def _submission_succeeded(page: Page) -> bool:
    """Classify the page shown after the contact form was submitted."""
    text = _visible_page_text(page)
    if any(
        kw in text
        for kw in ["erfolgreich", "gesendet", "danke", "thank you", "wurde verschickt"]
    ):
        return True

    # A CAPTCHA is assumed when its wording is displayed or a visible
    # CAPTCHA iframe is present.
    captcha_frame = page.query_selector("iframe[src*='captcha']")
    captcha_shown = captcha_frame is not None and captcha_frame.is_visible()
    if captcha_shown or any(kw in text for kw in ["captcha", "robot", "recaptcha"]):
        print(" [!] CAPTCHA detected — manual action required")
        return False

    # Assume success if no obvious error message is displayed
    return "error" not in text and "fehler" not in text


def submit_contact_form(page: Page, listing_url: str, config: dict) -> bool:
    """Navigate to a listing and submit its 'Anbieter kontaktieren' form.

    Args:
        page: Playwright page used for navigation.
        listing_url: URL of the listing to contact.
        config: Configuration dict; ``message``, ``name``, ``email`` and
            ``phone`` are read from it.

    Returns:
        True when the form was submitted and the resulting page looks like a
        success; False when no message is configured, the page or form could
        not be used, a CAPTCHA was detected or an error was displayed.
    """
    message = config.get("message", "")
    if not message:
        # Never send a blank inquiry.
        print(" [!] No message configured")
        return False

    try:
        page.goto(listing_url, wait_until="networkidle", timeout=30_000)
    except PWTimeout:
        try:
            page.goto(listing_url, wait_until="domcontentloaded", timeout=30_000)
        except PWTimeout:
            print(" [!] Page load timed out")
            return False

    # Scroll to bottom to trigger lazy-loaded form
    page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
    time.sleep(1.5)

    # Personal data fields — may or may not be present (absent when logged in)
    personal_fields = [
        (
            "name",
            [
                "input[name='name']",
                "input[name='fullName']",
                "input[name='contactName']",
                "input[placeholder*='Name']",
                "input[id*='name']:not([type='hidden'])",
            ],
        ),
        (
            "email",
            [
                "input[type='email']",
                "input[name='email']",
                "input[name='contactEmail']",
                "input[placeholder*='Mail']",
            ],
        ),
        (
            "phone",
            [
                "input[name='phone']",
                "input[name='telefon']",
                "input[name='tel']",
                "input[type='tel']",
                "input[placeholder*='Telefon']",
                "input[placeholder*='Phone']",
            ],
        ),
    ]
    for config_key, selectors in personal_fields:
        value = config.get(config_key, "")
        # Filling an empty value would wipe whatever the site pre-filled.
        if value:
            _try_fill(page, selectors, value)

    # Message textarea
    filled_msg = _try_fill(
        page,
        [
            "textarea[name='message']",
            "textarea[name='nachricht']",
            "textarea[name='body']",
            "textarea[id*='message']",
            "textarea[id*='nachricht']",
        ],
        message,
    )
    if not filled_msg:
        # Fall back to the first visible textarea of any name.
        for ta in page.query_selector_all("textarea"):
            if ta.is_visible():
                ta.fill(message)
                filled_msg = True
                break

    if not filled_msg:
        print(" [!] No message field found — form structure may have changed")
        return False

    # Required consent checkbox
    _try_check(
        page,
        [
            "input[type='checkbox'][name*='zustimm']",
            "input[type='checkbox'][name*='consent']",
            "input[type='checkbox'][name*='datenschutz']",
            "input[type='checkbox'][name*='agree']",
            "input[type='checkbox'][name*='accept']",
        ],
    )
    # All remaining visible required checkboxes
    for cb in page.query_selector_all("input[type='checkbox'][required]"):
        if cb.is_visible() and not cb.is_checked():
            cb.check()

    # Submit
    for sel in [
        "button:has-text('Anfrage senden')",
        "button:has-text('Anfrage absenden')",
        "button:has-text('Senden')",
        "button[type='submit']",
        "input[type='submit']",
    ]:
        btn = page.query_selector(sel)
        if btn and btn.is_visible():
            btn.click()
            try:
                page.wait_for_load_state("networkidle", timeout=15_000)
            except PWTimeout:
                pass
            return _submission_succeeded(page)

    print(" [!] Submit button not found")
    return False
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Main
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def main() -> None:
    """Command-line entry point: parse options, scrape, then contact listings.

    Steps: parse and validate the arguments, optionally clear the contacted
    history, load configuration and history, build the search URLs, launch
    Chromium, optionally log in, collect new listings and either print them
    (``--dry-run``) or submit the contact form for each one. Every
    successful submission is saved to the history immediately.
    """
    parser = argparse.ArgumentParser(
        description="Scrape laendleimmo.at and auto-contact matching listings.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s --flat --rent --minrooms 2 --maxprice 1200
%(prog)s --house --buy --minrooms 4 --maxprice 500000 --dry-run
%(prog)s --flat --rent --maxprice 900 --no-headless
""",
    )

    parser.add_argument("--minrooms", type=int, metavar="N", help="Minimum number of rooms")
    parser.add_argument("--maxprice", type=int, metavar="EUR", help="Maximum price in €")
    parser.add_argument("--house", action="store_true", help="Search for houses (Haus)")
    parser.add_argument("--flat", action="store_true", help="Search for flats/apartments (Wohnung)")
    parser.add_argument("--rent", action="store_true", help="Rentals only (Mietobjekte)")
    parser.add_argument("--buy", action="store_true", help="Purchases only (Kaufobjekte)")

    parser.add_argument(
        "--dry-run", action="store_true",
        help="List matches without sending any contact forms",
    )
    parser.add_argument(
        "--max-listings", type=int, default=50, metavar="N",
        help="Max new listings to process (default: 50)",
    )
    parser.add_argument(
        "--delay", type=float, default=3.0, metavar="SEC",
        help="Seconds between contact form submissions (default: 3)",
    )
    parser.add_argument(
        "--headless", action="store_true", default=True,
        help="Run browser headlessly (default)",
    )
    parser.add_argument(
        "--no-headless", dest="headless", action="store_false",
        help="Show browser window (useful for debugging / CAPTCHA solving)",
    )
    parser.add_argument(
        "--reset", action="store_true",
        help="Clear the contacted.json history and start fresh",
    )

    args = parser.parse_args()

    # Reject values that would otherwise fail late: time.sleep() raises
    # ValueError for a negative delay, and a non-positive limit can never
    # yield a listing.
    if args.max_listings < 1:
        parser.error("--max-listings must be at least 1")
    if args.delay < 0:
        parser.error("--delay must not be negative")

    if args.reset:
        if CONTACTED_FILE.exists():
            CONTACTED_FILE.unlink()
            print("[i] contacted.json cleared.")

    config = load_config()
    contacted = load_contacted()
    search_urls = build_search_urls(args)

    print("Search URLs:")
    for u in search_urls:
        print(f" {u}")
    print(f"Already contacted: {len(contacted)} listings")
    print()

    with sync_playwright() as pw:
        browser: Browser = pw.chromium.launch(headless=args.headless)
        # try/finally closes the browser on every exit path, including
        # exceptions raised while scraping or submitting.
        try:
            ctx = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/124.0.0.0 Safari/537.36"
                ),
            )
            page = ctx.new_page()

            # Optional login
            if config.get("login_email"):
                print("[i] Logging in...")
                ok = login(page, config)
                print(f" {'[OK] Logged in' if ok else '[!] Login failed — proceeding without login'}")
                print()

            # Collect listings
            print("[i] Scraping listings...")
            listings = collect_all_listings(page, search_urls, args.max_listings, contacted)
            print(f"\n[i] {len(listings)} new listings to process.\n")

            if not listings:
                print("Nothing new to contact.")
                return

            if args.dry_run:
                print("Dry-run mode — no forms will be submitted.\n")
                for i, listing in enumerate(listings, 1):
                    print(f" {i:3}. {listing['url']}")
                    if listing["snippet"]:
                        print(f" {listing['snippet']}")
            else:
                success_count = 0
                for i, listing in enumerate(listings, 1):
                    url = listing["url"]
                    print(f"[{i}/{len(listings)}] {url}")
                    ok = submit_contact_form(page, url, config)
                    if ok:
                        contacted.add(url)
                        # Save after every success so progress survives a crash.
                        save_contacted(contacted)
                        success_count += 1
                        print(" [OK] Contact form submitted")
                    else:
                        print(" [FAIL] Could not submit form")
                    # No need to wait after the final listing.
                    if i < len(listings):
                        time.sleep(args.delay)

                print(f"\nDone. {success_count}/{len(listings)} forms submitted.")
        finally:
            browser.close()
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|