Commit e02c3b79cc (parent 4830cafd5a), 2025-11-13 11:11:21 +01:00
7 changed files with 1129 additions and 26 deletions


@@ -0,0 +1,372 @@
"""
External logging module (MySQL / PostgreSQL) based on SQLAlchemy
=================================================================
Purpose
- Persistent logging to an external database (MySQL or PostgreSQL).
- Optional fallback strategy to the internal SQLite logging on errors.
Features
- Connection pooling via the SQLAlchemy engine.
- Schema creation (table logs) via SQLAlchemy Core.
- Batch writing of log entries.
- Health check (SELECT 1).
- logging.Handler for writing directly from Python logging.
- Optional fallback (writes to the internal SQLite logging) when enabled.
Configuration (example; see 01_Modulerweiterungen/Planung/Architektur.md)
logging_external:
  enabled: false
  type: "postgresql"  # mysql | postgresql
  host: "localhost"
  port: 5432
  user: "logger"
  password: null
  database: "logs"
  sslmode: "prefer"
  pool_size: 5
  connect_timeout: 10
  write_buffer_size: 100
  fallback_to_internal_on_error: true
"""
from __future__ import annotations
import json
import logging
import time
import os
import threading
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
try:
from sqlalchemy import (
create_engine,
Table,
Column,
Integer,
String,
Text,
MetaData,
DateTime,
text as sa_text,
insert as sa_insert,
)
from sqlalchemy.engine import Engine, Connection
except Exception as e: # pragma: no cover
raise RuntimeError(
"SQLAlchemy is required for logging_external. Please install the dependencies."
) from e
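# A minimal sketch of the config dict that init(config=...) below expects
# (assumption: values mirror the YAML example in the module docstring; the
# constant name _EXAMPLE_CONFIG is illustrative and not read by this module).
_EXAMPLE_CONFIG: Dict[str, Any] = {
    "type": "postgresql",
    "host": "localhost",
    "port": 5432,
    "user": "logger",
    "password": None,  # or set LOG_EXT_PASSWORD in the environment
    "database": "logs",
    "sslmode": "prefer",
    "pool_size": 5,
    "connect_timeout": 10,
    "fallback_to_internal_on_error": True,
}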
def _env_override(key: str, default: Optional[str] = None) -> Optional[str]:
"""Reads optional secrets from environment variables, if set."""
v = os.environ.get(key)
return v if v is not None else default
def _safe_quote(v: str) -> str:
# Only simple, naive escaping of URL components (urllib.parse.quote would be more robust)
return v.replace("@", "%40").replace(":", "%3A").replace("/", "%2F")
def _now_utc_iso() -> str:
"""Current UTC time as ISO 8601 with millisecond precision and a 'Z' suffix."""
return (
datetime.now(timezone.utc)
.isoformat(timespec="milliseconds")
.replace("+00:00", "Z")
)
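# For illustration, _now_utc_iso() yields timestamps of the form
# "2025-11-13T10:11:21.123Z" (UTC, millisecond precision, trailing "Z").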
@dataclass
class ExternalConfig:
db_type: str = "postgresql" # "mysql" | "postgresql"
host: str = "localhost"
port: int = 5432
user: Optional[str] = None
password: Optional[str] = None
database: str = "logs"
sslmode: Optional[str] = None  # only relevant for PostgreSQL
pool_size: int = 5
connect_timeout: int = 10
fallback_to_internal_on_error: bool = True
@classmethod
def from_dict(cls, cfg: Dict[str, Any]) -> "ExternalConfig":
# Override secrets from environment variables when present
user = cfg.get("user")
password = cfg.get("password")
user = _env_override("LOG_EXT_USER", user)
password = _env_override("LOG_EXT_PASSWORD", password)
return cls(
db_type=str(cfg.get("type", "postgresql")),
host=str(cfg.get("host", "localhost")),
port=int(cfg.get("port", 5432)),
user=(None if user is None else str(user)),
password=(None if password is None else str(password)),
database=str(cfg.get("database", "logs")),
sslmode=(cfg.get("sslmode") if cfg.get("sslmode") is not None else None),
pool_size=int(cfg.get("pool_size", 5)),
connect_timeout=int(cfg.get("connect_timeout", 10)),
fallback_to_internal_on_error=bool(cfg.get("fallback_to_internal_on_error", True)),
)
def to_url(self) -> str:
"""
Builds the SQLAlchemy URL string from the configuration.
- PostgreSQL: postgresql+psycopg2://user:password@host:port/database?sslmode=...
- MySQL: mysql+pymysql://user:password@host:port/database
"""
user_part = ""
if self.user:
if self.password:
user_part = f"{_safe_quote(self.user)}:{_safe_quote(self.password)}@"
else:
user_part = f"{_safe_quote(self.user)}@"
if self.db_type.lower() in ("postgres", "postgresql", "pg"):
base = f"postgresql+psycopg2://{user_part}{self.host}:{self.port}/{self.database}"
if self.sslmode:
base = f"{base}?sslmode={self.sslmode}"
return base
if self.db_type.lower() in ("mysql", "mariadb"):
return f"mysql+pymysql://{user_part}{self.host}:{self.port}/{self.database}"
raise ValueError(f"Unsupported db_type: {self.db_type}")
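# A short sketch of the URLs to_url() produces (assumption: credentials as shown,
# remaining fields left at their defaults):
#   ExternalConfig(db_type="postgresql", user="logger", password="s3cret",
#                  database="logs", sslmode="prefer").to_url()
#   -> "postgresql+psycopg2://logger:s3cret@localhost:5432/logs?sslmode=prefer"
#   ExternalConfig(db_type="mysql", user="logger", database="logs", port=3306).to_url()
#   -> "mysql+pymysql://logger@localhost:3306/logs"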
class ExternalDBLogger:
"""
Manages the engine and the schema for external logging.
"""
def __init__(self, engine: Engine) -> None:
self.engine = engine
self.meta = MetaData()
# Note: ts could be a DateTime column; for maximum compatibility we store ISO 8601 timestamps as String(30).
self.logs = Table(
"logs",
self.meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("ts", String(30), nullable=False),
Column("level", String(16), nullable=True),
Column("logger", String(128), nullable=True),
Column("message", Text, nullable=True),
Column("meta", Text, nullable=True), # JSON-String
)
# ---------- API ----------
def ensure_schema(self) -> None:
self.meta.create_all(self.engine, checkfirst=True)
def health_check(self, timeout_s: int = 5) -> bool:
try:
with self.engine.connect() as conn:
conn.execute(sa_text("SELECT 1"))
return True
except Exception:
return False
def write_batch(self, entries: Iterable[Dict[str, Any]]) -> None:
rows: List[Dict[str, Any]] = []
for e in entries:
ts = e.get("ts") or _now_utc_iso()
level = e.get("level")
loggername = e.get("logger")
message = e.get("message")
meta = e.get("meta")
try:
meta_json = json.dumps(meta, ensure_ascii=False, separators=(",", ":")) if meta is not None else None
except Exception:
meta_json = json.dumps({"_repr": repr(meta)}, ensure_ascii=False)
rows.append(
{
"ts": str(ts),
"level": (None if level is None else str(level)),
"logger": (None if loggername is None else str(loggername)),
"message": (None if message is None else str(message)),
"meta": meta_json,
}
)
if not rows:
return
with self.engine.begin() as conn:
conn.execute(sa_insert(self.logs), rows)
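# Usage sketch for direct batch writes (assumption: init(...) further below has been
# called; the helper name and the field values are illustrative, not public API).
def _example_write_batch(db: ExternalDBLogger) -> None:
    """Write a couple of hand-built entries in a single INSERT."""
    db.write_batch([
        {"level": "INFO", "logger": "jobs.sync", "message": "sync finished",
         "meta": {"duration_ms": 1234, "items": 42}},
        {"level": "ERROR", "logger": "jobs.sync", "message": "retry scheduled"},
    ])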
class ExternalDBHandler(logging.Handler):
"""
Logging handler that writes LogRecords to the external database.
Uses a simple retry/backoff and an optional fallback to the internal SQLite logging.
"""
def __init__(self, writer: ExternalDBLogger, *, fallback_to_internal: bool = True) -> None:
super().__init__()
self._writer = writer
self._fallback_to_internal = fallback_to_internal
def emit(self, record: logging.LogRecord) -> None:
payload = {
"ts": _now_utc_iso(),
"level": record.levelname,
"logger": record.name,
"message": self.format(record) if self.formatter else record.getMessage(),
"meta": _extract_meta(record),
}
# Small retry with backoff: 3 attempts, sleeping 50 ms / 150 ms / 500 ms after each failure
delay_ms = [0.05, 0.15, 0.5]
last_err: Optional[BaseException] = None
for d in delay_ms:
try:
self._writer.write_batch([payload])
return
except Exception as e:
last_err = e
time.sleep(d)
# Fallback
if self._fallback_to_internal:
try:
import importlib
mod = importlib.import_module("logging_internal")
mod.instance().write(payload)  # uses the existing internal instance
logging.getLogger(__name__).warning(
"External logging failed; wrote to internal SQLite fallback."
)
return
except Exception:
pass
# Last resort: never let handler errors escalate to the caller
if last_err:
self.handleError(record)
def _extract_meta(record: logging.LogRecord) -> Dict[str, Any]:
meta: Dict[str, Any] = {
"pathname": record.pathname,
"lineno": record.lineno,
"funcName": record.funcName,
"process": record.process,
"threadName": record.threadName,
}
standard = {
"name",
"msg",
"args",
"levelname",
"levelno",
"pathname",
"filename",
"module",
"exc_info",
"exc_text",
"stack_info",
"lineno",
"funcName",
"created",
"msecs",
"relativeCreated",
"thread",
"threadName",
"processName",
"process",
"message",
}
for k, v in record.__dict__.items():
if k not in standard:
try:
json.dumps(v)
meta[k] = v
except Exception:
meta[k] = repr(v)
if record.exc_info:
try:
meta["exc_info"] = logging.Formatter().formatException(record.exc_info)
except Exception:
pass
return meta
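# Sketch of what _extract_meta() captures (the field names in `extra` are illustrative):
#   logging.getLogger("payments").info("charge ok", extra={"order_id": 4711, "amount": 19.9})
#   -> meta includes {"order_id": 4711, "amount": 19.9} plus pathname, lineno, funcName,
#      process and threadName taken from the LogRecord.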
# ---------------------------------------------------------
# Module singleton and helpers
# ---------------------------------------------------------
_EXTERNAL_INSTANCE: Optional[ExternalDBLogger] = None
_INSTANCE_LOCK = threading.Lock()
def init(
*,
connection_url: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
) -> ExternalDBLogger:
"""
Initializes the global ExternalDBLogger, either via connection_url or via config.
Subsequent calls return the same instance.
"""
global _EXTERNAL_INSTANCE
with _INSTANCE_LOCK:
if _EXTERNAL_INSTANCE is not None:
return _EXTERNAL_INSTANCE
if connection_url is None:
if not isinstance(config, dict):
raise ValueError("Provide either connection_url or a config dict.")
cfg = ExternalConfig.from_dict(config)
connection_url = cfg.to_url()
pool_size = cfg.pool_size
connect_timeout = cfg.connect_timeout
else:
# Fallback parameters when the URL is passed directly
pool_size = int((config or {}).get("pool_size", 5))
connect_timeout = int((config or {}).get("connect_timeout", 10))
cfg = ExternalConfig.from_dict(config or {})
engine = create_engine(
connection_url,
pool_pre_ping=True,
pool_size=pool_size,
# connect_timeout is accepted by both psycopg2 and PyMySQL; sslmode for PostgreSQL is already encoded in the URL
connect_args={"connect_timeout": connect_timeout},
echo=False,
)
logger = ExternalDBLogger(engine)
logger.ensure_schema()
_EXTERNAL_INSTANCE = logger
# Store the fallback flag on the logger object so that handlers can read it when needed
setattr(_EXTERNAL_INSTANCE, "_fallback_to_internal_on_error", cfg.fallback_to_internal_on_error)
return _EXTERNAL_INSTANCE
def instance() -> ExternalDBLogger:
if _EXTERNAL_INSTANCE is None:
raise RuntimeError("ExternalDBLogger is not initialized. Call init(...) first.")
return _EXTERNAL_INSTANCE
def get_handler(level: Union[int, str] = logging.INFO) -> logging.Handler:
if _EXTERNAL_INSTANCE is None:
raise RuntimeError("ExternalDBLogger is not initialized. Call init(...) first.")
h = ExternalDBHandler(
_EXTERNAL_INSTANCE,
fallback_to_internal=bool(getattr(_EXTERNAL_INSTANCE, "_fallback_to_internal_on_error", True)),
)
if isinstance(level, str):
level = getattr(logging, level.upper(), logging.INFO)
h.setLevel(int(level))
return h
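# End-to-end wiring sketch (assumptions: a reachable database matching the config shape
# from the module docstring; the helper name _example_setup is illustrative, not public API).
def _example_setup(external_cfg: Dict[str, Any]) -> None:
    """Initialize the module once and attach its handler to the root logger."""
    init(config=external_cfg)
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(get_handler("INFO"))
    root.info("external DB logging enabled")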
__all__ = [
"ExternalConfig",
"ExternalDBLogger",
"init",
"instance",
"get_handler",
]


@@ -0,0 +1,476 @@
"""
SQLite-based internal logging module
====================================
Purpose
- Persistent, lightweight logging into an internal SQLite database.
- Suitable for internal data such as hash values, status events and metadata.
Capabilities
- Schema management (creates tables and indexes if missing).
- Optional cleaning of the database at startup (clean_database).
- Retention by age in days (retention_days).
- Cap on the total number of entries (max_entries).
- Provides a logging.Handler that writes LogRecords directly to SQLite.
- Query API with filters and paging.
Configuration (example; see Planung/Architektur.md)
logging_internal:
  enabled: true
  db_path: "data/internal_logs.sqlite"
  clean_database: false
  retention_days: 30
  max_entries: 100000
  vacuum_on_start: true
  batch_write: 100
"""
from __future__ import annotations
import json
import logging
import sqlite3
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
# ---------------------------------------------------------
# Helper functions
# ---------------------------------------------------------
def _utc_now_iso() -> str:
"""Current UTC time as ISO 8601 with millisecond precision and a 'Z' suffix."""
return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
def _ensure_dir_for(path: Path) -> None:
"""Creates the target directory for a file if necessary."""
path.parent.mkdir(parents=True, exist_ok=True)
def _to_iso(ts: Union[str, datetime, None]) -> Optional[str]:
if ts is None:
return None
if isinstance(ts, str):
return ts
if isinstance(ts, datetime):
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
ts = ts.astimezone(timezone.utc)
return ts.isoformat(timespec="milliseconds").replace("+00:00", "Z")
return None
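# Example (illustrative): _to_iso(datetime(2025, 11, 13, 9, 5, tzinfo=timezone.utc))
# returns "2025-11-13T09:05:00.000Z"; naive datetimes are treated as UTC.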
# ---------------------------------------------------------
# Core: SQLiteLogger
# ---------------------------------------------------------
@dataclass
class RetentionPolicy:
retention_days: int = 30
max_entries: int = 100_000
class SQLiteLogger:
"""
Manages the SQLite database for internal logs.
Thread-safe via a lock; one connection per process (check_same_thread=False).
"""
def __init__(
self,
db_path: Union[str, Path],
vacuum_on_start: bool = True,
clean_database: bool = False,
retention: Optional[RetentionPolicy] = None,
) -> None:
self.db_path = Path(db_path)
self.vacuum_on_start = bool(vacuum_on_start)
self.clean_database = bool(clean_database)
self.retention = retention or RetentionPolicy()
self._lock = threading.RLock()
self._conn: Optional[sqlite3.Connection] = None
self._initialize_db()
# ---------- Public API ----------
def write(self, entry: Dict[str, Any]) -> None:
"""
Writes a single log entry (see the usage sketch after this class).
Expected keys:
- ts: ISO 8601; set automatically if missing
- level: TEXT
- logger: TEXT
- message: TEXT
- meta: dict | JSON-serializable | None
"""
data = self._normalize_entry(entry)
with self._lock, self._connection() as con:
con.execute(
"""
INSERT INTO logs (ts, level, logger, message, meta)
VALUES (?, ?, ?, ?, ?)
""",
(data["ts"], data.get("level"), data.get("logger"), data.get("message"), data.get("meta")),
)
con.commit()
def write_many(self, entries: Iterable[Dict[str, Any]]) -> None:
"""Batch insert for multiple entries."""
rows: List[Tuple[str, Optional[str], Optional[str], Optional[str], Optional[str]]] = []
for e in entries:
d = self._normalize_entry(e)
rows.append((d["ts"], d.get("level"), d.get("logger"), d.get("message"), d.get("meta")))
if not rows:
return
with self._lock, self._connection() as con:
con.executemany(
"INSERT INTO logs (ts, level, logger, message, meta) VALUES (?, ?, ?, ?, ?)",
rows,
)
con.commit()
def query(
self,
*,
logger: Optional[str] = None,
level: Optional[str] = None,
from_ts: Optional[Union[str, datetime]] = None,
to_ts: Optional[Union[str, datetime]] = None,
text: Optional[str] = None,
limit: int = 100,
offset: int = 0,
order_desc: bool = True,
) -> List[Dict[str, Any]]:
"""
Returns log entries, filtered and paginated.
- logger: exact match
- level: exact match
- from_ts / to_ts: bounds (inclusive), ISO 8601 or datetime
- text: full-text-like search via LIKE on message
"""
clauses: List[str] = []
params: List[Any] = []
if logger:
clauses.append("logger = ?")
params.append(logger)
if level:
clauses.append("level = ?")
params.append(level)
if from_ts is not None:
v = _to_iso(from_ts) or _utc_now_iso()
clauses.append("ts >= ?")
params.append(v)
if to_ts is not None:
v = _to_iso(to_ts) or _utc_now_iso()
clauses.append("ts <= ?")
params.append(v)
if text:
clauses.append("message LIKE ?")
params.append(f"%{text}%")
where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
order = "DESC" if order_desc else "ASC"
sql = f"""
SELECT id, ts, level, logger, message, meta
FROM logs
{where}
ORDER BY ts {order}, id {order}
LIMIT ? OFFSET ?
"""
params.extend([int(max(limit, 0)), int(max(offset, 0))])
with self._lock, self._connection() as con:
cur = con.execute(sql, params)
rows = cur.fetchall() or []
out: List[Dict[str, Any]] = []
for r in rows:
meta_val: Optional[str] = r[5]
meta_obj: Optional[Any] = None
if meta_val:
try:
meta_obj = json.loads(meta_val)
except Exception:
meta_obj = meta_val  # fallback: raw value
out.append(
{
"id": r[0],
"ts": r[1],
"level": r[2],
"logger": r[3],
"message": r[4],
"meta": meta_obj,
}
)
return out
def cleanup(self) -> None:
"""
Applies the cleanup rules:
- Delete entries older than retention_days (if > 0).
- Trim down to max_entries (if > 0), deleting the oldest entries first.
"""
with self._lock, self._connection() as con:
# 1) Time-based retention
if self.retention.retention_days and self.retention.retention_days > 0:
cutoff = datetime.now(timezone.utc) - timedelta(days=int(self.retention.retention_days))
cutoff_iso = _to_iso(cutoff) or _utc_now_iso()
con.execute("DELETE FROM logs WHERE ts < ?", (cutoff_iso,))
con.commit()
# 2) Cap the total number of entries
if self.retention.max_entries and self.retention.max_entries > 0:
cur = con.execute("SELECT COUNT(*) FROM logs")
total = int(cur.fetchone()[0])
overflow = total - int(self.retention.max_entries)
if overflow > 0:
# Delete the oldest N entries
con.execute(
"""
DELETE FROM logs
WHERE id IN (
SELECT id FROM logs
ORDER BY ts ASC, id ASC
LIMIT ?
)
""",
(overflow,),
)
con.commit()
def get_handler(self, level: Union[int, str] = logging.INFO) -> logging.Handler:
"""
Creates a logging.Handler that writes LogRecords to the SQLite database.
The returned handler is thread-safe and can be attached to the root logger.
"""
h = SQLiteLogHandler(self)
if isinstance(level, str):
level = getattr(logging, level.upper(), logging.INFO)
h.setLevel(int(level))
return h
# ---------- Internals ----------
def _initialize_db(self) -> None:
if self.clean_database and self.db_path.exists():
try:
self.db_path.unlink()
except FileNotFoundError:
pass
_ensure_dir_for(self.db_path)
with self._lock, self._connection() as con:
# Pragmas for stability/performance
con.execute("PRAGMA journal_mode=WAL;")
con.execute("PRAGMA synchronous=NORMAL;")
con.execute("PRAGMA foreign_keys=ON;")
con.execute("PRAGMA temp_store=MEMORY;")
# Schema
con.execute(
"""
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
level TEXT,
logger TEXT,
message TEXT,
meta TEXT
)
"""
)
con.execute("CREATE INDEX IF NOT EXISTS idx_logs_ts ON logs (ts);")
con.execute("CREATE INDEX IF NOT EXISTS idx_logs_logger ON logs (logger);")
con.commit()
if self.vacuum_on_start:
# VACUUM must not run inside an active transaction
prev_iso = con.isolation_level
try:
con.isolation_level = None
con.execute("VACUUM;")
finally:
con.isolation_level = prev_iso
# Apply the cleanup rules right after creating the schema
self.cleanup()
def _normalize_entry(self, entry: Dict[str, Any]) -> Dict[str, Any]:
ts = _to_iso(entry.get("ts")) or _utc_now_iso()
meta_val = entry.get("meta")
if meta_val is None:
meta_json = None
else:
try:
meta_json = json.dumps(meta_val, ensure_ascii=False, separators=(",", ":"))
except Exception:
meta_json = json.dumps({"_repr": repr(meta_val)}, ensure_ascii=False)
return {
"ts": ts,
"level": str(entry.get("level")) if entry.get("level") is not None else None,
"logger": str(entry.get("logger")) if entry.get("logger") is not None else None,
"message": str(entry.get("message")) if entry.get("message") is not None else None,
"meta": meta_json,
}
def _connection(self) -> sqlite3.Connection:
if self._conn is None:
# check_same_thread=False allows use across multiple threads; access is protected by the lock
self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
return self._conn
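# Usage sketch for the SQLiteLogger API above (assumption: init(...) further below was
# called first; the helper name and the field values are illustrative, not public API).
def _example_direct_usage(db: SQLiteLogger) -> None:
    """Write, query and clean up a few entries directly, without the logging framework."""
    db.write({"level": "INFO", "logger": "hasher", "message": "file hashed",
              "meta": {"algorithm": "sha256", "size_bytes": 1024}})
    db.write_many([
        {"level": "DEBUG", "logger": "hasher", "message": "chunk read", "meta": {"bytes": 65536}},
        {"level": "WARNING", "logger": "hasher", "message": "slow disk", "meta": {"ms": 930}},
    ])
    for row in db.query(level="WARNING", text="disk", limit=10):
        print(row["ts"], row["logger"], row["message"], row["meta"])
    db.cleanup()  # apply retention_days / max_entries immediately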
# ---------------------------------------------------------
# logging.Handler integration
# ---------------------------------------------------------
class SQLiteLogHandler(logging.Handler):
"""
Logging handler that writes LogRecords into the SQLite database.
"""
def __init__(self, writer: SQLiteLogger) -> None:
super().__init__()
self._writer = writer
def emit(self, record: logging.LogRecord) -> None:
try:
# Standard fields
payload: Dict[str, Any] = {
"ts": _utc_now_iso(),
"level": record.levelname,
"logger": record.name,
"message": self.format(record) if self.formatter else record.getMessage(),
}
# Meta: selected record fields + extras
meta: Dict[str, Any] = {
"pathname": record.pathname,
"lineno": record.lineno,
"funcName": record.funcName,
"process": record.process,
"threadName": record.threadName,
}
# Detect extras: all keys that are not standard LogRecord attributes
standard = {
"name",
"msg",
"args",
"levelname",
"levelno",
"pathname",
"filename",
"module",
"exc_info",
"exc_text",
"stack_info",
"lineno",
"funcName",
"created",
"msecs",
"relativeCreated",
"thread",
"threadName",
"processName",
"process",
"message",
}
for k, v in record.__dict__.items():
if k not in standard:
# Try to keep the value JSON-serializable
try:
json.dumps(v)
meta[k] = v
except Exception:
meta[k] = repr(v)
if record.exc_info:
# Add exception information (as text)
meta["exc_info"] = logging.Formatter().formatException(record.exc_info)
payload["meta"] = meta
self._writer.write(payload)
except Exception:
# A handler must never crash the process
self.handleError(record)
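# Sketch: with this handler attached (see get_engineered_handler below), extra fields on a
# log call end up in the JSON `meta` column (the field names here are illustrative).
def _example_log_with_extras() -> None:
    logging.getLogger("scanner").info("hash computed", extra={"sha256": "ab12cd34", "size": 1024})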
# ---------------------------------------------------------
# Module-wide singletons / helper functions
# ---------------------------------------------------------
_logger_instance: Optional[SQLiteLogger] = None
_INSTANCE_LOCK = threading.Lock()
def init(
*,
db_path: Union[str, Path],
vacuum_on_start: bool = True,
clean_database: bool = False,
retention_days: int = 30,
max_entries: int = 100_000,
) -> SQLiteLogger:
"""
Initializes the global SQLiteLogger (one singleton per process).
Subsequent calls return the existing instance.
"""
global _logger_instance
with _INSTANCE_LOCK:
if _logger_instance is not None:
return _logger_instance
p = Path(db_path)
# Relative paths are resolved against the working directory of the process;
# integrations should resolve them against the app/ path as needed.
retention = RetentionPolicy(retention_days=int(retention_days), max_entries=int(max_entries))
_logger_instance = SQLiteLogger(
db_path=p,
vacuum_on_start=vacuum_on_start,
clean_database=clean_database,
retention=retention,
)
return _logger_instance
def instance() -> SQLiteLogger:
"""Returns the initialized instance or raises an error."""
if _logger_instance is None:
raise RuntimeError("SQLiteLogger is not initialized. Call init(...) first.")
return _logger_instance
def get_engineered_handler(level: Union[int, str] = logging.INFO) -> logging.Handler:
"""
Returns a configured handler based on the global instance.
Example:
    import logging
    from logging_internal import init, get_engineered_handler
    init(db_path='data/internal_logs.sqlite', retention_days=30, max_entries=100_000)
    root = logging.getLogger()
    root.addHandler(get_engineered_handler(logging.INFO))
"""
return instance().get_handler(level)
__all__ = [
"SQLiteLogger",
"RetentionPolicy",
"init",
"instance",
"get_engineered_handler",
]
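# A minimal smoke-test sketch (assumptions: running this module directly is acceptable
# in the target setup, and the db_path below is just an example location).
if __name__ == "__main__":
    init(db_path="data/internal_logs.sqlite", retention_days=7, max_entries=10_000)
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(get_engineered_handler("INFO"))
    root.info("internal logging smoke test", extra={"component": "selftest"})
    for row in instance().query(limit=3):
        print(row["ts"], row["level"], row["message"])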