#!/usr/bin/env python3
import asyncio, aiohttp, aiohttp.client_exceptions as aiox
import os, time, random, hashlib, json, re, pathlib
from urllib.parse import urljoin, urldefrag, urlparse
from bs4 import BeautifulSoup
from dateutil.parser import parse as dtparse
import yaml, tldextract, ssl

# Optional: use uvloop's faster event loop when it is installed.
try:
    import uvloop
    uvloop.install()
except Exception:
    pass

# ---- Load config ----
BASE = os.environ.get("RAG_CRAWLER_BASE", os.getcwd())
CONF_PATH = os.path.join(BASE, "crawler", "sources.yml")
with open(CONF_PATH, "r") as f:
    CFG = yaml.safe_load(f)

POLICY = CFG.get("policy", {})
STORAGE = CFG.get("storage", {})
MEMORY = CFG.get("memory", {})
SEEDS = CFG.get("seeds", [])

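# Illustrative sources.yml sketch. The key names come from the .get() defaults used
# in this script; the seed URLs and values below are made-up examples, not shipped config.
#
# policy:
#   user_agent: "polite-crawler/1.0"
#   obey_robots_txt: true
#   delay_min_seconds: 5
#   delay_max_seconds: 60
#   concurrency_total: 4
# storage:
#   root: /srv/ai/corpus
#   text_subdir: text
#   pdf_subdir: pdf
# memory: {}          # read into MEMORY above, not used in this file
# seeds:
#   - url: "https://example.com/docs/"
#     include: ["https://example.com/docs/**"]
#     exclude: ["https://example.com/docs/private/**"]
#     revisit: "30d"
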
ROOT = pathlib.Path(STORAGE.get("root", "/srv/ai/corpus")).resolve()
TEXT_DIR = ROOT / STORAGE.get("text_subdir", "text")
PDF_DIR = ROOT / STORAGE.get("pdf_subdir", "pdf")
TEXT_DIR.mkdir(parents=True, exist_ok=True)
PDF_DIR.mkdir(parents=True, exist_ok=True)
STATE_PATH = ROOT / ".crawler_state.json"

STATE = {"visited": {}} # url -> {etag, last_modified, ts}
|
||
if STATE_PATH.exists():
|
||
try:
|
||
STATE = json.loads(STATE_PATH.read_text())
|
||
except Exception:
|
||
pass
|
||
|
||
def save_state():
|
||
try:
|
||
STATE_PATH.write_text(json.dumps(STATE))
|
||
except Exception:
|
||
pass
|
||
|
||
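# Illustrative .crawler_state.json content (URL and values are made up):
# {"visited": {"https://example.com/docs/": {"etag": "\"abc123\"",
#              "last_modified": "Tue, 01 Oct 2024 10:00:00 GMT", "ts": 1730000000.0}}}
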
# ---- Robots & quotas ----
ROBOTS_CACHE = {}          # registered domain -> parsed robots.txt rules
DOMAIN_NEXT_ALLOWED = {}   # registered domain -> earliest time of the next allowed fetch

def domain_key(url):
    # Group URLs by registered domain (e.g. sub.example.co.uk -> example.co.uk).
    ext = tldextract.extract(url)
    return f"{ext.domain}.{ext.suffix}"

async def fetch_robots(session, base_url):
    dom = domain_key(base_url)
    if dom in ROBOTS_CACHE:
        return ROBOTS_CACHE[dom]
    robots_url = urljoin(f"{urlparse(base_url).scheme}://{urlparse(base_url).netloc}", "/robots.txt")
    from robotexclusionrulesparser import RobotExclusionRulesParser as Robots
    rp = Robots()
    try:
        async with session.get(robots_url, timeout=10) as r:
            if r.status == 200:
                rp.parse(await r.text())
            else:
                rp.parse("")  # no robots.txt: treat everything as allowed
    except Exception:
        rp.parse("")
    ROBOTS_CACHE[dom] = rp
    return rp

def polite_delay_for(url):
    dmin = int(POLICY.get("delay_min_seconds", 5))
    dmax = int(POLICY.get("delay_max_seconds", 60))
    d = domain_key(url)
    now = time.time()
    start = max(now, DOMAIN_NEXT_ALLOWED.get(d, 0))
    # Reserve the next allowed time for this domain (random delay); the actual sleep
    # happens in fetch(). Reserving on every call keeps concurrent tasks from hitting
    # the same domain in the same window.
    DOMAIN_NEXT_ALLOWED[d] = start + random.uniform(dmin, dmax)
    return max(0.0, start - now)

def norm_url(base, link):
    # Resolve relative links and drop URL fragments.
    href = urljoin(base, link)
    href, _ = urldefrag(href)
    return href

def fnmatch(text, pat):
    # Glob-style URL matching: "**" matches across "/", "*" stays within one path
    # segment. Escape the pattern first, then expand "**" via a placeholder so the
    # "*" inside its ".*" expansion is not rewritten again by the "*" rule.
    pat = re.escape(pat).replace(r"\*\*", "\0").replace(r"\*", "[^/]*").replace("\0", ".*")
    return re.fullmatch(pat, text) is not None

def allowed_by_patterns(url, inc, exc):
    # No include patterns means "include everything"; exclude patterns always win.
    ok_inc = True if not inc else any(fnmatch(url, pat) for pat in inc)
    ok_exc = any(fnmatch(url, pat) for pat in exc) if exc else False
    return ok_inc and not ok_exc

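# Illustrative behaviour of the matchers above (example.com is a stand-in):
#   fnmatch("https://example.com/docs/a/b", "https://example.com/docs/**")  -> True
#   fnmatch("https://example.com/docs/a/b", "https://example.com/docs/*")   -> False ("*" stops at "/")
#   allowed_by_patterns("https://example.com/blog/x", ["https://example.com/docs/**"], [])  -> False
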
def should_revisit(url, revisit_str):
    # Unknown URLs are fetched immediately; known ones only after the configured
    # interval (e.g. "30d") has elapsed.
    info = STATE["visited"].get(url, {})
    if not info:
        return True
    try:
        days = int(revisit_str.rstrip("d"))
    except Exception:
        days = 30
    last_ts = info.get("ts", 0)
    return (time.time() - last_ts) > days * 86400

async def fetch(session, url, etag=None, lastmod=None):
    # Conditional GET with a per-domain politeness delay. Returns (body, meta);
    # body is None on 304 and on errors, where meta["status"] is "error".
    headers = {"User-Agent": POLICY.get("user_agent", "polite-crawler/1.0")}
    if etag:
        headers["If-None-Match"] = etag
    if lastmod:
        headers["If-Modified-Since"] = lastmod
    ssl_ctx = ssl.create_default_context()
    try:
        delay = polite_delay_for(url)
        if delay > 0:
            await asyncio.sleep(delay)
        async with session.get(url, headers=headers, ssl=ssl_ctx, timeout=30) as r:
            if r.status == 304:
                return None, {"status": 304, "headers": {}}
            body = await r.read()
            return body, {"status": r.status, "headers": dict(r.headers)}
    except Exception as e:
        return None, {"status": "error", "error": str(e)}

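# Illustrative fetch() results (header and error values are made up):
#   (b"<html>...", {"status": 200, "headers": {"Content-Type": "text/html", "ETag": "\"abc\""}})
#   (None, {"status": 304, "headers": {}})
#   (None, {"status": "error", "error": "Cannot connect to host ..."})
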
def save_binary(path: pathlib.Path, content: bytes):
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(content)

def save_text(path: pathlib.Path, text: str):
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text)

def is_pdf(headers):
    ct = headers.get("Content-Type", "").lower()
    return "application/pdf" in ct or ct.endswith("/pdf")

def extract_text_html(body: bytes) -> str:
    # Drop boilerplate tags and return the visible text, one non-empty line per line.
    soup = BeautifulSoup(body, "lxml")
    for tag in soup(["script", "style", "noscript", "nav", "footer", "header", "aside"]):
        tag.decompose()
    text = soup.get_text("\n")
    return "\n".join(line.strip() for line in text.splitlines() if line.strip())

def path_for(url, typ="text"):
    # Stable output path derived from the URL (first 16 hex chars of its SHA-256).
    h = hashlib.sha256(url.encode()).hexdigest()[:16]
    if typ == "text":
        return TEXT_DIR / f"{h}.txt"
    return PDF_DIR / f"{h}.pdf"

async def crawl_seed(session, seed, budget=0):
    base = seed["url"]
    include = seed.get("include", [])
    exclude = seed.get("exclude", [])
    revisit = seed.get("revisit", "30d")

    # robots
    if POLICY.get("obey_robots_txt", True):
        rp = await fetch_robots(session, base)
        if not rp.is_allowed("*", base):
            return

    queue = [base]
    seen = set()
    processed = 0

    while queue:
        url = queue.pop(0)
        if url in seen:
            continue
        seen.add(url)

        if POLICY.get("obey_robots_txt", True):
            rp = await fetch_robots(session, url)
            if not rp.is_allowed("*", url):
                continue

        if not allowed_by_patterns(url, include, exclude):
            continue

        info = STATE["visited"].get(url, {})
        etag = info.get("etag")
        lastmod = info.get("last_modified")
        if not should_revisit(url, revisit):
            continue

        body, meta = await fetch(session, url, etag, lastmod)
        status = meta.get("status")
        headers = meta.get("headers", {})

        if status == 304:
            STATE["visited"][url] = {"etag": etag, "last_modified": lastmod, "ts": time.time()}
            save_state()
            continue
        if status != 200 or body is None:
            continue

        if is_pdf(headers):
            out_pdf = path_for(url, "pdf")
            save_binary(out_pdf, body)
            # Rough text extraction (best-effort)
            try:
                from pdfminer.high_level import extract_text as pdf_extract
                txt = pdf_extract(str(out_pdf))
                save_text(path_for(url, "text"), txt)
            except Exception:
                pass
        else:
            txt = extract_text_html(body)
            save_text(path_for(url, "text"), txt)
            # Collect links (meant to stay roughly within the seed's scope; the
            # include/exclude patterns and robots checks above filter them when dequeued)
            soup = BeautifulSoup(body, "lxml")
            for a in soup.find_all("a", href=True):
                href = norm_url(url, a["href"])
                if href.startswith("http"):
                    # Depth is limited implicitly via revisit/budget
                    queue.append(href)

        STATE["visited"][url] = {
            "etag": headers.get("ETag"),
            "last_modified": headers.get("Last-Modified"),
            "ts": time.time(),
        }
        save_state()

        processed += 1
        if budget and processed >= budget:
            break

async def main(mode="update", budget=0):
    con_total = int(POLICY.get("concurrency_total", 4))
    timeout = aiohttp.ClientTimeout(total=120)
    # Note: fetch() passes its own ssl context per request, which takes precedence
    # over the connector-level ssl=False.
    connector = aiohttp.TCPConnector(limit=con_total, ssl=False)
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        tasks = []
        if mode == "drip":
            budget = budget or 1
        else:
            budget = 0  # unlimited in update mode
        for seed in SEEDS:
            tasks.append(crawl_seed(session, seed, budget=budget))
        await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["update", "drip"], default="update",
                        help="update=full crawl, drip=very slow with a per-seed budget")
    parser.add_argument("--budget", type=int, default=1, help="URLs per seed (drip mode only)")
    args = parser.parse_args()
    asyncio.run(main(args.mode, args.budget))
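
# Illustrative invocations (the file path is an assumption; adjust to where this
# script actually lives, e.g. the crawler/ directory next to sources.yml):
#   python3 crawler/crawl.py --mode update
#   python3 crawler/crawl.py --mode drip --budget 2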