Files
KI-Cluster-Roadmap/01_Modulerweiterungen/Entworfener_Code/app/src/logging_internal.py
2025-11-13 11:11:21 +01:00

476 lines
16 KiB
Python

from __future__ import annotations
"""
SQLite-basiertes internes Logging-Modul
=======================================
Zweck
- Persistentes, leichtgewichtiges Logging in eine interne SQLite-Datenbank.
- Geeignet für interne Daten wie Hash-Werte, Status-Events, Metadaten.
Fähigkeiten
- Schema-Management (Tabellen und Indizes falls nicht vorhanden).
- Optionales Säubern der Datenbank beim Start (clean_database).
- Aufbewahrung/Retention nach Tagen (retention_days).
- Begrenzung der Gesamtanzahl (max_entries).
- Bereitstellung eines logging.Handler, der LogRecords direkt in SQLite schreibt.
- Abfrage-API mit Filtern und Paging.
Konfiguration (Beispiel, siehe Planung/Architektur.md)
logging_internal:
enabled: true
db_path: "data/internal_logs.sqlite"
clean_database: false
retention_days: 30
max_entries: 100000
vacuum_on_start: true
batch_write: 100
"""
import json
import logging
import sqlite3
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
# ---------------------------------------------------------
# Hilfsfunktionen
# ---------------------------------------------------------
def _utc_now_iso() -> str:
"""Aktuelle UTC-Zeit als ISO 8601 mit Millisekunden und 'Z'-Suffix."""
return datetime.now(timezone.utc).astimezone(timezone.utc).replace(tzinfo=timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
def _ensure_dir_for(path: Path) -> None:
"""Erzeugt das Zielverzeichnis für eine Datei, falls erforderlich."""
path.parent.mkdir(parents=True, exist_ok=True)
def _to_iso(ts: Union[str, datetime, None]) -> Optional[str]:
if ts is None:
return None
if isinstance(ts, str):
return ts
if isinstance(ts, datetime):
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
ts = ts.astimezone(timezone.utc)
return ts.isoformat(timespec="milliseconds").replace("+00:00", "Z")
return None
# ---------------------------------------------------------
# Kern: SQLiteLogger
# ---------------------------------------------------------
@dataclass
class RetentionPolicy:
    """Retention rules applied by SQLiteLogger.cleanup()."""
    # Delete entries older than this many days; values <= 0 disable the age rule.
    retention_days: int = 30
    # Keep at most this many rows, dropping the oldest first; <= 0 disables the cap.
    max_entries: int = 100_000
class SQLiteLogger:
    """
    Manages the SQLite database for internal logs.

    Thread-safe via an RLock; one connection per process
    (``check_same_thread=False``), with all access serialized by the lock.
    """

    def __init__(
        self,
        db_path: Union[str, Path],
        vacuum_on_start: bool = True,
        clean_database: bool = False,
        retention: Optional[RetentionPolicy] = None,
    ) -> None:
        """
        :param db_path: location of the SQLite file (parent dirs are created)
        :param vacuum_on_start: run VACUUM once after schema setup
        :param clean_database: delete any existing database file on startup
        :param retention: retention rules; defaults to ``RetentionPolicy()``
        """
        self.db_path = Path(db_path)
        self.vacuum_on_start = bool(vacuum_on_start)
        self.clean_database = bool(clean_database)
        self.retention = retention or RetentionPolicy()
        self._lock = threading.RLock()
        self._conn: Optional[sqlite3.Connection] = None
        self._initialize_db()

    # ---------- Public API ----------
    def write(self, entry: Dict[str, Any]) -> None:
        """
        Write a single log entry.

        Expected keys:
        - ts: ISO 8601 string or datetime; set to "now" automatically when missing
        - level: TEXT
        - logger: TEXT
        - message: TEXT
        - meta: dict | JSON-serializable value | None
        """
        data = self._normalize_entry(entry)
        with self._lock, self._connection() as con:
            con.execute(
                """
                INSERT INTO logs (ts, level, logger, message, meta)
                VALUES (?, ?, ?, ?, ?)
                """,
                (data["ts"], data.get("level"), data.get("logger"), data.get("message"), data.get("meta")),
            )
            con.commit()

    def write_many(self, entries: Iterable[Dict[str, Any]]) -> None:
        """Batch insert for several entries; a no-op for an empty iterable."""
        rows: List[Tuple[str, Optional[str], Optional[str], Optional[str], Optional[str]]] = []
        for e in entries:
            d = self._normalize_entry(e)
            rows.append((d["ts"], d.get("level"), d.get("logger"), d.get("message"), d.get("meta")))
        if not rows:
            return
        with self._lock, self._connection() as con:
            con.executemany(
                "INSERT INTO logs (ts, level, logger, message, meta) VALUES (?, ?, ?, ?, ?)",
                rows,
            )
            con.commit()

    def query(
        self,
        *,
        logger: Optional[str] = None,
        level: Optional[str] = None,
        from_ts: Optional[Union[str, datetime]] = None,
        to_ts: Optional[Union[str, datetime]] = None,
        text: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
        order_desc: bool = True,
    ) -> List[Dict[str, Any]]:
        """
        Return log entries, filtered and paginated.

        - logger: exact match
        - level: exact match
        - from_ts / to_ts: inclusive bounds, ISO 8601 or datetime
        - text: substring search on message via LIKE
        Stored meta JSON is decoded back into Python objects; undecodable
        values are returned raw.
        """
        clauses: List[str] = []
        params: List[Any] = []
        if logger:
            clauses.append("logger = ?")
            params.append(logger)
        if level:
            clauses.append("level = ?")
            params.append(level)
        if from_ts is not None:
            v = _to_iso(from_ts) or _utc_now_iso()
            clauses.append("ts >= ?")
            params.append(v)
        if to_ts is not None:
            v = _to_iso(to_ts) or _utc_now_iso()
            clauses.append("ts <= ?")
            params.append(v)
        if text:
            clauses.append("message LIKE ?")
            params.append(f"%{text}%")
        where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
        # `order` derives from a bool, never from caller strings, so the
        # f-string interpolation below cannot inject SQL.
        order = "DESC" if order_desc else "ASC"
        sql = f"""
            SELECT id, ts, level, logger, message, meta
            FROM logs
            {where}
            ORDER BY ts {order}, id {order}
            LIMIT ? OFFSET ?
        """
        params.extend([int(max(limit, 0)), int(max(offset, 0))])
        with self._lock, self._connection() as con:
            cur = con.execute(sql, params)
            rows = cur.fetchall() or []
        out: List[Dict[str, Any]] = []
        for r in rows:
            meta_val: Optional[str] = r[5]
            meta_obj: Optional[Any] = None
            if meta_val:
                try:
                    meta_obj = json.loads(meta_val)
                except Exception:
                    meta_obj = meta_val  # fallback: raw value
            out.append(
                {
                    "id": r[0],
                    "ts": r[1],
                    "level": r[2],
                    "logger": r[3],
                    "message": r[4],
                    "meta": meta_obj,
                }
            )
        return out

    def cleanup(self) -> None:
        """
        Apply retention rules:
        - delete entries older than retention_days (when > 0)
        - trim to max_entries (when > 0), deleting oldest first
        """
        with self._lock, self._connection() as con:
            # 1) age-based retention
            if self.retention.retention_days and self.retention.retention_days > 0:
                cutoff = datetime.now(timezone.utc) - timedelta(days=int(self.retention.retention_days))
                cutoff_iso = _to_iso(cutoff) or _utc_now_iso()
                con.execute("DELETE FROM logs WHERE ts < ?", (cutoff_iso,))
                con.commit()
            # 2) cap the total row count
            if self.retention.max_entries and self.retention.max_entries > 0:
                cur = con.execute("SELECT COUNT(*) FROM logs")
                total = int(cur.fetchone()[0])
                overflow = total - int(self.retention.max_entries)
                if overflow > 0:
                    # delete the oldest N entries
                    con.execute(
                        """
                        DELETE FROM logs
                        WHERE id IN (
                            SELECT id FROM logs
                            ORDER BY ts ASC, id ASC
                            LIMIT ?
                        )
                        """,
                        (overflow,),
                    )
                    con.commit()

    def get_handler(self, level: Union[int, str] = logging.INFO) -> logging.Handler:
        """
        Create a logging.Handler that writes LogRecords into this database.

        The returned handler is thread-safe and may be attached to the root
        logger. Unknown string level names fall back to INFO.
        """
        h = SQLiteLogHandler(self)
        if isinstance(level, str):
            level = getattr(logging, level.upper(), logging.INFO)
        h.setLevel(int(level))
        return h

    def close(self) -> None:
        """Close the underlying connection; safe to call more than once."""
        with self._lock:
            if self._conn is not None:
                self._conn.close()
                self._conn = None

    # ---------- Internals ----------
    def _initialize_db(self) -> None:
        """Optionally wipe the db file, set pragmas, ensure schema, run cleanup."""
        if self.clean_database and self.db_path.exists():
            try:
                self.db_path.unlink()
            except FileNotFoundError:
                pass
            # WAL journal mode leaves "-wal"/"-shm" sidecar files next to the
            # database; remove them too so stale journal data cannot be
            # re-associated with the freshly created file.
            for suffix in ("-wal", "-shm"):
                sidecar = Path(str(self.db_path) + suffix)
                try:
                    sidecar.unlink()
                except FileNotFoundError:
                    pass
        _ensure_dir_for(self.db_path)
        with self._lock, self._connection() as con:
            # pragmas for stability/performance
            con.execute("PRAGMA journal_mode=WAL;")
            con.execute("PRAGMA synchronous=NORMAL;")
            con.execute("PRAGMA foreign_keys=ON;")
            con.execute("PRAGMA temp_store=MEMORY;")
            # schema
            con.execute(
                """
                CREATE TABLE IF NOT EXISTS logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ts TEXT NOT NULL,
                    level TEXT,
                    logger TEXT,
                    message TEXT,
                    meta TEXT
                )
                """
            )
            con.execute("CREATE INDEX IF NOT EXISTS idx_logs_ts ON logs (ts);")
            con.execute("CREATE INDEX IF NOT EXISTS idx_logs_logger ON logs (logger);")
            con.commit()
            if self.vacuum_on_start:
                # VACUUM must not run inside an active transaction; drop to
                # autocommit mode for the duration of the statement.
                prev_iso = con.isolation_level
                try:
                    con.isolation_level = None
                    con.execute("VACUUM;")
                finally:
                    con.isolation_level = prev_iso
        # apply retention rules right after schema creation (RLock re-entry is fine)
        self.cleanup()

    def _normalize_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
        """Coerce an entry dict into the column layout used by INSERT."""
        ts = _to_iso(entry.get("ts")) or _utc_now_iso()
        meta_val = entry.get("meta")
        if meta_val is None:
            meta_json = None
        else:
            try:
                meta_json = json.dumps(meta_val, ensure_ascii=False, separators=(",", ":"))
            except Exception:
                # non-serializable meta: store its repr instead of failing the write
                meta_json = json.dumps({"_repr": repr(meta_val)}, ensure_ascii=False)
        return {
            "ts": ts,
            "level": str(entry.get("level")) if entry.get("level") is not None else None,
            "logger": str(entry.get("logger")) if entry.get("logger") is not None else None,
            "message": str(entry.get("message")) if entry.get("message") is not None else None,
            "meta": meta_json,
        }

    def _connection(self) -> sqlite3.Connection:
        """Return the shared connection, opening it lazily on first use."""
        if self._conn is None:
            # check_same_thread=False allows use across threads; access is
            # serialized by self._lock.
            self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
        return self._conn
# ---------------------------------------------------------
# logging.Handler-Integration
# ---------------------------------------------------------
class SQLiteLogHandler(logging.Handler):
    """
    logging.Handler that persists LogRecords through a SQLiteLogger.

    ``emit`` never raises: any failure is routed to ``handleError`` so the
    handler cannot crash the host process.
    """

    # Attribute names that belong to the stdlib LogRecord itself; anything
    # else found on a record was attached via ``extra=...`` and is captured
    # as meta. Hoisted to a class constant so the set is built once, not on
    # every emitted record.
    _STANDARD_ATTRS = frozenset({
        "name", "msg", "args", "levelname", "levelno", "pathname", "filename",
        "module", "exc_info", "exc_text", "stack_info", "lineno", "funcName",
        "created", "msecs", "relativeCreated", "thread", "threadName",
        "processName", "process", "message",
    })

    def __init__(self, writer: SQLiteLogger) -> None:
        super().__init__()
        self._writer = writer

    def emit(self, record: logging.LogRecord) -> None:
        """Convert *record* to an entry dict and hand it to the writer."""
        try:
            # Standard fields. No "ts" is generated here: the writer's
            # normalization stamps the current UTC time when "ts" is absent,
            # so producing one here would duplicate that logic.
            payload: Dict[str, Any] = {
                "level": record.levelname,
                "logger": record.name,
                "message": self.format(record) if self.formatter else record.getMessage(),
            }
            # Meta: selected record fields + any extras
            meta: Dict[str, Any] = {
                "pathname": record.pathname,
                "lineno": record.lineno,
                "funcName": record.funcName,
                "process": record.process,
                "threadName": record.threadName,
            }
            for key, value in record.__dict__.items():
                if key in self._STANDARD_ATTRS:
                    continue
                try:
                    json.dumps(value)  # keep as-is when JSON-serializable
                    meta[key] = value
                except Exception:
                    meta[key] = repr(value)
            if record.exc_info:
                # attach exception information as formatted text
                meta["exc_info"] = logging.Formatter().formatException(record.exc_info)
            payload["meta"] = meta
            self._writer.write(payload)
        except Exception:
            # a handler must never crash the process
            self.handleError(record)
# ---------------------------------------------------------
# Modulweite Singletons/Helper-Funktionen
# ---------------------------------------------------------
# Process-wide singleton instance, created lazily by init().
_logger_instance: Optional[SQLiteLogger] = None
# Guards creation/read of the singleton across threads.
_INSTANCE_LOCK = threading.Lock()
def init(
    *,
    db_path: Union[str, Path],
    vacuum_on_start: bool = True,
    clean_database: bool = False,
    retention_days: int = 30,
    max_entries: int = 100_000,
) -> SQLiteLogger:
    """
    Initialize the global SQLiteLogger (one singleton per process).

    Subsequent calls return the already-created instance; their arguments
    are ignored.
    """
    global _logger_instance
    with _INSTANCE_LOCK:
        if _logger_instance is None:
            # Relative paths resolve against the process working directory;
            # integrations should resolve them against the app path as needed.
            policy = RetentionPolicy(
                retention_days=int(retention_days),
                max_entries=int(max_entries),
            )
            _logger_instance = SQLiteLogger(
                db_path=Path(db_path),
                vacuum_on_start=vacuum_on_start,
                clean_database=clean_database,
                retention=policy,
            )
        return _logger_instance
def instance() -> SQLiteLogger:
    """Return the initialized singleton, raising if init() was never called."""
    current = _logger_instance
    if current is None:
        raise RuntimeError("SQLiteLogger ist nicht initialisiert. Bitte init(...) zuerst aufrufen.")
    return current
def get_engineered_handler(level: Union[int, str] = logging.INFO) -> logging.Handler:
    """
    Build a configured handler backed by the global instance.

    Example:
        from logging import getLogger
        from logging_internal import init, get_engineered_handler
        init(db_path='data/internal_logs.sqlite', retention_days=30, max_entries=100_000)
        root = logging.getLogger()
        root.addHandler(get_engineered_handler(logging.INFO))
    """
    writer = instance()
    return writer.get_handler(level)
# Public API of this module. SQLiteLogHandler is part of the public surface
# (instances of it are returned by get_handler / get_engineered_handler), so
# it is exported alongside the rest.
__all__ = [
    "SQLiteLogger",
    "SQLiteLogHandler",
    "RetentionPolicy",
    "init",
    "instance",
    "get_engineered_handler",
]