#!/usr/bin/env python3
import asyncio
import aiohttp
import aiohttp.client_exceptions as aiox
import os, time, random, hashlib, json, re, pathlib
from urllib.parse import urljoin, urldefrag, urlparse
from bs4 import BeautifulSoup
from dateutil.parser import parse as dtparse
import yaml, tldextract, ssl

try:
    import uvloop
    uvloop.install()
except Exception:
    pass

# ---- Load config ----
BASE = os.environ.get("RAG_CRAWLER_BASE", os.getcwd())
CONF_PATH = os.path.join(BASE, "crawler", "sources.yml")
with open(CONF_PATH, "r") as f:
    CFG = yaml.safe_load(f)

POLICY = CFG.get("policy", {})
STORAGE = CFG.get("storage", {})
MEMORY = CFG.get("memory", {})
SEEDS = CFG.get("seeds", [])

ROOT = pathlib.Path(STORAGE.get("root", "/srv/ai/corpus")).resolve()
TEXT_DIR = ROOT / STORAGE.get("text_subdir", "text")
PDF_DIR = ROOT / STORAGE.get("pdf_subdir", "pdf")
TEXT_DIR.mkdir(parents=True, exist_ok=True)
PDF_DIR.mkdir(parents=True, exist_ok=True)

STATE_PATH = ROOT / ".crawler_state.json"
STATE = {"visited": {}}  # url -> {etag, last_modified, ts}
if STATE_PATH.exists():
    try:
        STATE = json.loads(STATE_PATH.read_text())
    except Exception:
        pass

def save_state():
    try:
        STATE_PATH.write_text(json.dumps(STATE))
    except Exception:
        pass

# ---- Robots & per-domain quotas ----
ROBOTS_CACHE = {}
DOMAIN_NEXT_ALLOWED = {}

def domain_key(url):
    ext = tldextract.extract(url)
    return f"{ext.domain}.{ext.suffix}"

async def fetch_robots(session, base_url):
    dom = domain_key(base_url)
    if dom in ROBOTS_CACHE:
        return ROBOTS_CACHE[dom]
    robots_url = urljoin(
        f"{urlparse(base_url).scheme}://{urlparse(base_url).netloc}", "/robots.txt"
    )
    from robotexclusionrulesparser import RobotExclusionRulesParser as Robots
    rp = Robots()
    try:
        async with session.get(robots_url, timeout=10) as r:
            if r.status == 200:
                rp.parse(await r.text())
            else:
                rp.parse("")
    except Exception:
        rp.parse("")
    ROBOTS_CACHE[dom] = rp
    return rp

def polite_delay_for(url):
    dmin = int(POLICY.get("delay_min_seconds", 5))
    dmax = int(POLICY.get("delay_max_seconds", 60))
    d = domain_key(url)
    t = DOMAIN_NEXT_ALLOWED.get(d, 0)
    now = time.time()
    if now < t:
        # Also reserve the following slot, so concurrent requests to the same
        # domain queue up behind each other instead of firing together.
        DOMAIN_NEXT_ALLOWED[d] = t + random.uniform(dmin, dmax)
        return max(0, t - now)
    # Set the next allowed time (random delay); the actual sleep happens in fetch()
    DOMAIN_NEXT_ALLOWED[d] = now + random.uniform(dmin, dmax)
    return 0

def norm_url(base, link):
    href = urljoin(base, link)
    href, _ = urldefrag(href)
    return href

def fnmatch(text, pat):
    # Glob-style matching: "**" crosses path segments, "*" stays within one.
    # Escape the pattern first so literal dots etc. keep no regex meaning, then
    # substitute the escaped wildcards ("**" before "*", so it is not clobbered).
    pat = re.escape(pat)
    pat = pat.replace(r"\*\*", ".*").replace(r"\*", "[^/]*")
    return re.fullmatch(pat, text) is not None

def allowed_by_patterns(url, inc, exc):
    ok_inc = True if not inc else any(fnmatch(url, pat) for pat in inc)
    ok_exc = any(fnmatch(url, pat) for pat in exc) if exc else False
    return ok_inc and not ok_exc

def should_revisit(url, revisit_str):
    info = STATE["visited"].get(url, {})
    if not info:
        return True
    try:
        days = int(revisit_str.rstrip("d"))
    except Exception:
        days = 30
    last_ts = info.get("ts", 0)
    return (time.time() - last_ts) > days * 86400

async def fetch(session, url, etag=None, lastmod=None):
    headers = {"User-Agent": POLICY.get("user_agent", "polite-crawler/1.0")}
    if etag:
        headers["If-None-Match"] = etag
    if lastmod:
        headers["If-Modified-Since"] = lastmod
    ssl_ctx = ssl.create_default_context()
    try:
        delay = polite_delay_for(url)
        if delay > 0:
            await asyncio.sleep(delay)
        async with session.get(url, headers=headers, ssl=ssl_ctx, timeout=30) as r:
            if r.status == 304:
                return None, {"status": 304, "headers": {}}
            body = await r.read()
            return body, {"status": r.status, "headers": dict(r.headers)}
    except Exception as e:
        return None, {"status": "error", "error": str(e)}
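# For reference, a minimal sketch of the sources.yml layout implied by the keys
# read at the top of this file (policy/storage/seeds; an optional "memory"
# section is read but unused here). The concrete values and the example seed
# URL are illustrative assumptions, not the project's actual configuration.
def example_config():
    """Return a config dict mirroring the sources.yml keys this script reads."""
    return {
        "policy": {
            "user_agent": "polite-crawler/1.0",   # sent on every request
            "obey_robots_txt": True,
            "delay_min_seconds": 5,               # per-domain delay window
            "delay_max_seconds": 60,
            "concurrency_total": 4,               # TCPConnector limit
        },
        "storage": {
            "root": "/srv/ai/corpus",
            "text_subdir": "text",
            "pdf_subdir": "pdf",
        },
        "seeds": [
            {
                "url": "https://example.org/docs/",            # hypothetical seed
                "include": ["https://example.org/docs/**"],    # glob patterns (see fnmatch)
                "exclude": ["**/login*"],
                "revisit": "30d",                              # re-fetch after 30 days
            }
        ],
    }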
def save_binary(path: pathlib.Path, content: bytes):
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(content)

def save_text(path: pathlib.Path, text: str):
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text)

def is_pdf(headers):
    ct = headers.get("Content-Type", "").lower()
    return "application/pdf" in ct or ct.endswith("/pdf")

def extract_text_html(body: bytes) -> str:
    soup = BeautifulSoup(body, "lxml")
    for tag in soup(["script", "style", "noscript", "nav", "footer", "header", "aside"]):
        tag.decompose()
    text = soup.get_text("\n")
    return "\n".join(line.strip() for line in text.splitlines() if line.strip())

def path_for(url, typ="text"):
    h = hashlib.sha256(url.encode()).hexdigest()[:16]
    if typ == "text":
        return TEXT_DIR / f"{h}.txt"
    return PDF_DIR / f"{h}.pdf"

async def crawl_seed(session, seed, budget=0):
    base = seed["url"]
    include = seed.get("include", [])
    exclude = seed.get("exclude", [])
    revisit = seed.get("revisit", "30d")
    # robots
    if POLICY.get("obey_robots_txt", True):
        rp = await fetch_robots(session, base)
        if not rp.is_allowed("*", base):
            return
    queue = [base]
    seen = set()
    processed = 0
    while queue:
        url = queue.pop(0)
        if url in seen:
            continue
        seen.add(url)
        if POLICY.get("obey_robots_txt", True):
            rp = await fetch_robots(session, url)
            if not rp.is_allowed("*", url):
                continue
        if not allowed_by_patterns(url, include, exclude):
            continue
        info = STATE["visited"].get(url, {})
        etag = info.get("etag")
        lastmod = info.get("last_modified")
        if not should_revisit(url, revisit):
            continue
        body, meta = await fetch(session, url, etag, lastmod)
        status = meta.get("status")
        headers = meta.get("headers", {})
        if status == 304:
            STATE["visited"][url] = {"etag": etag, "last_modified": lastmod, "ts": time.time()}
            save_state()
            continue
        if status != 200 or body is None:
            continue
        if is_pdf(headers):
            out_pdf = path_for(url, "pdf")
            save_binary(out_pdf, body)
            # Rough text extraction (best effort)
            try:
                from pdfminer.high_level import extract_text as pdf_extract
                txt = pdf_extract(str(out_pdf))
                save_text(path_for(url, "text"), txt)
            except Exception:
                pass
        else:
            txt = extract_text_html(body)
            save_text(path_for(url, "text"), txt)
            # Collect links (stay on the seed's registered domain)
            soup = BeautifulSoup(body, "lxml")
            for a in soup.find_all("a", href=True):
                href = norm_url(url, a["href"])
                if href.startswith("http") and domain_key(href) == domain_key(base):
                    # Depth is bounded implicitly via revisit/budget
                    queue.append(href)
        STATE["visited"][url] = {
            "etag": headers.get("ETag"),
            "last_modified": headers.get("Last-Modified"),
            "ts": time.time(),
        }
        save_state()
        processed += 1
        if budget and processed >= budget:
            break

async def main(mode="update", budget=0):
    con_total = int(POLICY.get("concurrency_total", 4))
    timeout = aiohttp.ClientTimeout(total=120)
    connector = aiohttp.TCPConnector(limit=con_total, ssl=False)
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        tasks = []
        if mode == "drip":
            budget = budget or 1
        else:
            budget = 0  # unlimited in update mode
        for seed in SEEDS:
            tasks.append(crawl_seed(session, seed, budget=budget))
        await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["update", "drip"], default="update",
                        help="update=full crawl, drip=very slow with a per-seed budget")
    parser.add_argument("--budget", type=int, default=1, help="URLs per seed (drip mode only)")
    args = parser.parse_args()
    asyncio.run(main(args.mode, args.budget))
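# Example invocations (the script filename is assumed here):
#   python3 crawler.py --mode update            # full pass over every seed
#   python3 crawler.py --mode drip --budget 2   # at most 2 URLs per seed; suited to frequent, low-volume runs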