This commit is contained in:
2025-11-13 13:09:07 +01:00
parent e02c3b79cc
commit 47a22ac2a6
12 changed files with 1126 additions and 6 deletions

View File

@@ -0,0 +1,46 @@
# Runtime configuration for the agent API module.
# These values override the built-in defaults (_DEFAULT_CONFIG in config.py);
# the AGENT_API_LLM_KEY environment variable overrides llm.api_key on load.
agent_api:
  metadata:
    version: "0.1.0"
    description: "Agent Gateway für Worker + OpenAI-kompatible LLMs"
  # HTTP surface of the gateway.
  http:
    base_path: "/api/agent/v1"
    timeout_seconds: 60
    rate_limit_per_minute: 120
    enable_cors: true
  # API-key authentication; unauthenticated access is currently permitted.
  auth:
    api_key_header: "x-agent-api-key"
    allowed_keys: []
    allow_unauthenticated: true
  # Worker adapter settings; "inline" runs in-process (endpoint unused then).
  worker:
    adapter: "inline"
    endpoint: null
    timeout_seconds: 30
  # OpenAI-compatible LLM endpoint. A null api_key makes the client return
  # its deterministic local stub instead of performing HTTP calls.
  llm:
    provider: "openai"
    base_url: "https://api.openai.com/v1"
    model: "gpt-4o-mini"
    temperature: 0.2
    max_tokens: 1200
    api_key: null
    request_timeout_seconds: 45
    retry:
      max_attempts: 3
      backoff_seconds: 2
  # Task execution strategy, queue TTL and heartbeat cadence.
  execution:
    mode: "async"
    response_timeout_seconds: 30
    queue_ttl_seconds: 300
    heartbeat_interval_seconds: 10
    allow_long_polling: true
  logging:
    enabled: true
    log_payloads: false
    # Field names to redact when payload logging is enabled.
    redact_fields:
      - "user_input"
      - "llm_response"

View File

@@ -0,0 +1,6 @@
"""Initialisierungspaket für das Agenten-API-Modul."""
from __future__ import annotations
from .router import agent_router
__all__ = ["agent_router"]

View File

@@ -0,0 +1,130 @@
"""Konfigurationslader für das Agenten-API-Modul."""
from __future__ import annotations
import copy
import os
import threading
from pathlib import Path
from typing import Any, Dict, Mapping, cast
import yaml
# Environment variable that, when set, overrides the configured LLM API key.
_ENV_LLM_KEY = "AGENT_API_LLM_KEY"

# Built-in defaults. The YAML file and the ENV override are merged on top of
# this structure by _load_config_uncached().
# NOTE(review): keep in sync with config/agent_api.yaml.
_DEFAULT_CONFIG: Dict[str, Any] = {
    "agent_api": {
        "metadata": {
            "version": "0.1.0",
            "description": "Agent Gateway für Worker + OpenAI-kompatible LLMs",
        },
        "http": {
            "base_path": "/api/agent/v1",
            "timeout_seconds": 60,
            "rate_limit_per_minute": 120,
            "enable_cors": True,
        },
        "auth": {
            "api_key_header": "x-agent-api-key",
            "allowed_keys": [],
            "allow_unauthenticated": True,
        },
        "worker": {
            "adapter": "inline",
            "endpoint": None,
            "timeout_seconds": 30,
        },
        "llm": {
            "provider": "openai",
            "base_url": "https://api.openai.com/v1",
            "model": "gpt-4o-mini",
            "temperature": 0.2,
            "max_tokens": 1_200,
            # None means "no credentials" -> LLMClient falls back to its stub.
            "api_key": None,
            "request_timeout_seconds": 45,
            "retry": {
                "max_attempts": 3,
                "backoff_seconds": 2,
            },
        },
        "execution": {
            "mode": "async",
            "response_timeout_seconds": 30,
            "queue_ttl_seconds": 300,
            "heartbeat_interval_seconds": 10,
            "allow_long_polling": True,
        },
        "logging": {
            "enabled": True,
            "log_payloads": False,
            "redact_fields": ["user_input", "llm_response"],
        },
    }
}

# Process-wide cache of the merged configuration; guarded by _CACHE_LOCK.
_config_cache: Dict[str, Any] | None = None
_CACHE_LOCK = threading.Lock()
def _config_path() -> Path:
base_dir = Path(__file__).resolve().parents[2]
return base_dir / "config" / "agent_api.yaml"
def _load_file(path: Path) -> Dict[str, Any]:
    """Parse a YAML mapping from *path*.

    Args:
        path: Location of the configuration file.

    Returns:
        Dict[str, Any]: Parsed mapping, or ``{}`` when the file is missing
        or empty.

    Raises:
        ValueError: If the file's top-level node is not a mapping.
    """
    if not path.exists():
        return {}
    with path.open("r", encoding="utf-8") as handle:
        parsed: Any = yaml.safe_load(handle) or {}
        if not isinstance(parsed, dict):
            raise ValueError(f"Ungültiges Konfigurationsformat in {path}")
        return cast(Dict[str, Any], parsed)
def _deep_merge(base: Dict[str, Any], override: Mapping[str, Any]) -> Dict[str, Any]:
merged: Dict[str, Any] = copy.deepcopy(base)
for key, value in override.items():
if (
key in merged
and isinstance(merged[key], dict)
and isinstance(value, Mapping)
):
merged[key] = _deep_merge(merged[key], value) # type: ignore[arg-type]
else:
merged[key] = copy.deepcopy(value)
return merged
def _apply_env_overrides(config: Dict[str, Any]) -> None:
    """Inject the LLM API key from the environment, if one is set.

    Mutates *config* in place; an unset or empty variable leaves the
    configuration untouched.
    """
    override_key = os.getenv(_ENV_LLM_KEY)
    if not override_key:
        return
    llm_section = config.setdefault("agent_api", {}).setdefault("llm", {})
    llm_section["api_key"] = override_key
def _load_config_uncached() -> Dict[str, Any]:
    """Build the effective configuration from scratch.

    Precedence (lowest to highest): built-in defaults, YAML file,
    environment overrides.
    """
    overrides = _load_file(_config_path())
    effective = _deep_merge(_DEFAULT_CONFIG, overrides)
    _apply_env_overrides(effective)
    return effective
def get_config(force_reload: bool = False) -> Dict[str, Any]:
    """
    Return the agent API configuration as a dictionary.

    Args:
        force_reload: When True, the file is read again regardless of cache.

    Returns:
        Dict[str, Any]: Merged configuration from defaults, YAML file and
        ENV overrides. A deep copy is returned so callers cannot corrupt
        the cache.
    """
    global _config_cache
    # Double-checked locking: the cheap check outside the lock avoids
    # contention once the cache is warm.
    if force_reload or _config_cache is None:
        with _CACHE_LOCK:
            if force_reload or _config_cache is None:
                _config_cache = _load_config_uncached()
    return copy.deepcopy(_config_cache)

View File

@@ -0,0 +1,113 @@
"""HTTP-basierter LLM-Client für das Agenten-API-Modul."""
from __future__ import annotations
import json
import time
from typing import Any, Dict, Mapping, Optional
import httpx
from .config import get_config
class LLMClient:
    """
    Wraps calls to an OpenAI-compatible REST interface.

    Supports simple retries and timeout handling, and falls back to a
    deterministic local stub whenever no API key is configured.
    """

    def __init__(self, config: Optional[Mapping[str, Any]] = None) -> None:
        # All settings come from the "agent_api.llm" section; every lookup
        # has a safe default so a partial config still yields a usable client.
        settings = dict(config or get_config())
        llm_settings: Dict[str, Any] = dict(settings.get("agent_api", {}).get("llm", {}))
        self.base_url: str = str(llm_settings.get("base_url", "https://api.openai.com/v1")).rstrip("/")
        self.model: str = str(llm_settings.get("model", "gpt-4o-mini"))
        self.temperature: float = float(llm_settings.get("temperature", 0.2))
        self.max_tokens: int = int(llm_settings.get("max_tokens", 1_200))
        self.api_key: Optional[str] = llm_settings.get("api_key")
        self.request_timeout_seconds: float = float(llm_settings.get("request_timeout_seconds", 45))
        retry_settings: Dict[str, Any] = dict(llm_settings.get("retry", {}))
        self.max_attempts: int = max(1, int(retry_settings.get("max_attempts", 3)))
        self.backoff_seconds: float = float(retry_settings.get("backoff_seconds", 2))

    def generate(self, prompt: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send the prompt to the configured LLM endpoint, or return a stub reply.

        Args:
            prompt: Structured prompt as produced by the WorkerAdapter.

        Returns:
            Dict[str, Any]: Payload mirroring an OpenAI-style chat response.

        Raises:
            RuntimeError: When all retry attempts fail.
        """
        if not self.api_key:
            # No credentials configured -> deterministic offline behaviour.
            return self._local_stub(prompt)
        endpoint = f"{self.base_url}/chat/completions"
        body = {
            "model": self.model,
            "messages": self._build_messages(prompt),
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
        }
        request_headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        last_error: Optional[BaseException] = None
        for attempt in range(self.max_attempts):
            try:
                with httpx.Client(timeout=self.request_timeout_seconds) as client:
                    reply = client.post(endpoint, headers=request_headers, json=body)
                    reply.raise_for_status()
                    return reply.json()
            except (httpx.HTTPError, httpx.TimeoutException) as exc:
                last_error = exc
                # Back off before every attempt except the final one.
                if attempt + 1 < self.max_attempts:
                    time.sleep(self.backoff_seconds)
        if last_error:
            raise RuntimeError(f"LLM request failed after {self.max_attempts} attempts") from last_error
        raise RuntimeError("LLM request failed unexpectedly without additional context")

    @staticmethod
    def _build_messages(prompt: Mapping[str, Any]) -> Any:
        """Translate the structured prompt into chat-completion messages."""
        context_blob = json.dumps(
            {
                "context": prompt.get("context") or {},
                "metadata": prompt.get("metadata") or {},
            }
        )
        return [
            {"role": "system", "content": context_blob},
            {"role": "user", "content": str(prompt.get("user_input", ""))},
        ]

    @staticmethod
    def _local_stub(prompt: Mapping[str, Any]) -> Dict[str, Any]:
        """Build a deterministic offline response (no API key configured)."""
        user_input = prompt.get("user_input", "")
        # Token counts are character counts here — a stand-in, not real tokens.
        prompt_size = len(json.dumps(prompt))
        completion_size = len(str(user_input))
        return {
            "id": "local_stub_completion",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": "local-stub",
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": f"[local_stub] Echo: {user_input}",
                    },
                    "finish_reason": "stop",
                }
            ],
            "usage": {
                "prompt_tokens": prompt_size,
                "completion_tokens": completion_size,
                "total_tokens": prompt_size + completion_size,
            },
            "meta": {"note": "local_stub"},
        }

View File

@@ -0,0 +1,49 @@
"""Pydantic-Modelle für das Agenten-API-Modul."""
from __future__ import annotations
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
class TaskCreate(BaseModel):
    """
    Payload for creating a new agent task.
    """

    # Free-text request/prompt supplied by the user (required).
    user_input: str = Field(..., description="Freitext-Anfrage oder Prompt des Nutzers.")
    # Optional structured context forwarded to the worker/LLM pipeline.
    context: Optional[Dict[str, Any]] = Field(
        None, description="Optionaler Kontext mit zusätzlichen strukturierten Daten."
    )
    # Arbitrary tracking metadata; passed through untouched.
    metadata: Optional[Dict[str, Any]] = Field(
        None, description="Beliebige Metadaten zur Nachverfolgung von Tasks."
    )
    # Tri-state override: True forces sync, False forces async,
    # None defers to the configured execution mode.
    sync: Optional[bool] = Field(
        None,
        description=(
            "Optionaler Override, um den Task synchron zu verarbeiten, "
            "selbst wenn die Ausführung standardmäßig asynchron erfolgt."
        ),
    )
class TaskStatusResponse(BaseModel):
    """
    Response model describing the status of an existing task.
    """

    # UUID assigned by the service when the task was submitted.
    task_id: str = Field(..., description="Eindeutige Kennung des Tasks.")
    # One of: processing / succeeded / failed / cancelled / expired.
    status: str = Field(..., description="Aktueller Status des Tasks.")
    # Result payload; populated once the task reaches a terminal state.
    result: Optional[Dict[str, Any]] = Field(
        None,
        description="Ergebnisdaten des Tasks, sobald verfügbar.",
    )
    # UTC timestamp after which the task is considered expired.
    expires_at: Optional[datetime] = Field(
        None,
        description="Zeitpunkt, an dem der Task automatisch ausläuft.",
    )
    # Suggested polling interval in seconds (mirrors the Retry-After header).
    retry_after: Optional[int] = Field(
        None,
        description="Empfohlene Wartezeit in Sekunden für den nächsten Statusabruf.",
    )

View File

@@ -0,0 +1,88 @@
"""FastAPI-Router für das Agenten-API-Modul."""
from __future__ import annotations
from typing import Dict, Optional
from fastapi import APIRouter, HTTPException, Response, status
from fastapi.responses import JSONResponse
from .models import TaskCreate
from .service import AgentService
# Router mounted by the host application under the "/agent/v1" prefix.
# NOTE(review): the config's http.base_path is "/api/agent/v1" — confirm the
# differing prefix here is intentional.
agent_router = APIRouter(prefix="/agent/v1", tags=["agent_api"])
# Single module-level service instance shared by all request handlers.
_service = AgentService()
def _build_response(
    payload: Dict[str, Optional[str]],
    status_code: int,
    retry_after: Optional[int] = None,
) -> JSONResponse:
    """Wrap *payload* in a JSONResponse, adding Retry-After when provided."""
    extra_headers: Dict[str, str] = {}
    if retry_after is not None:
        # Retry-After must be a string per the HTTP header format.
        extra_headers["Retry-After"] = str(retry_after)
    return JSONResponse(content=payload, status_code=status_code, headers=extra_headers)
@agent_router.post(
    "/tasks",
    response_model_exclude_none=True,
    summary="Neuen Task einreichen",
)
async def submit_task(task: TaskCreate) -> JSONResponse:
    """
    Create a new task and kick off its processing.
    """
    outcome = _service.submit_task(task)
    body = outcome.response.model_dump()
    if outcome.detail:
        body["detail"] = outcome.detail
    return _build_response(body, outcome.status_code, outcome.response.retry_after)
@agent_router.get(
    "/tasks/{task_id}",
    response_model_exclude_none=True,
    summary="Status eines Tasks abrufen",
)
async def get_status(task_id: str) -> JSONResponse:
    """
    Return the current status of a task.
    """
    try:
        status_response = _service.get_status(task_id)
    except KeyError as exc:
        # Unknown task id -> 404.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
    # Map the service-level task state to an HTTP status code.
    code_by_state = {
        "processing": status.HTTP_202_ACCEPTED,
        "succeeded": status.HTTP_200_OK,
        "failed": status.HTTP_500_INTERNAL_SERVER_ERROR,
        "cancelled": status.HTTP_409_CONFLICT,
        "expired": status.HTTP_410_GONE,
    }
    http_code = code_by_state.get(status_response.status, status.HTTP_200_OK)
    body = status_response.model_dump()
    if status_response.status in {"failed", "cancelled", "expired"}:
        body.setdefault("detail", status_response.status)
    return _build_response(body, http_code, status_response.retry_after)
@agent_router.post(
    "/tasks/{task_id}/cancel",
    response_model_exclude_none=True,
    summary="Laufenden Task abbrechen",
)
async def cancel_task(task_id: str) -> JSONResponse:
    """
    Mark a running task as cancelled.
    """
    try:
        cancelled = _service.cancel_task(task_id)
    except KeyError as exc:
        # Unknown task id -> 404.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
    body = cancelled.model_dump()
    body.setdefault("detail", "task_cancelled")
    return _build_response(body, status.HTTP_200_OK, cancelled.retry_after)

View File

@@ -0,0 +1,310 @@
"""Service-Schicht für das Agenten-API-Modul."""
from __future__ import annotations
import threading
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Mapping, Optional
from .config import get_config
from .llm_client import LLMClient
from .models import TaskCreate, TaskStatusResponse
from .worker_adapter import WorkerAdapter
# Task life-cycle states used throughout the service layer and mapped to
# HTTP status codes by the router.
_PROCESSING_STATUS = "processing"  # worker/LLM pipeline still running
_SUCCEEDED_STATUS = "succeeded"    # result available
_FAILED_STATUS = "failed"          # pipeline raised an exception
_CANCELLED_STATUS = "cancelled"    # client requested cancellation
_EXPIRED_STATUS = "expired"        # TTL elapsed before completion
@dataclass
class _TaskEntry:
    """Mutable in-memory record tracking a single task's life cycle."""

    # Original client payload; consumed by the worker when processing starts.
    payload: TaskCreate
    # One of the module-level *_STATUS constants.
    status: str = _PROCESSING_STATUS
    # Result payload; set on success, failure or cancellation.
    result: Optional[Dict[str, Any]] = None
    created_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    # Default TTL of 5 minutes; submit_task overwrites this from config.
    expires_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc) + timedelta(minutes=5)
    )
    last_heartbeat: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    # Machine-readable status detail (e.g. "task_cancelled").
    detail: Optional[str] = None
    # Suggested client polling interval in seconds; None once terminal.
    retry_after: Optional[int] = None
    # repr() of the raised exception when the task failed.
    error: Optional[str] = None

    def to_response(self, task_id: str) -> TaskStatusResponse:
        """Convert this entry into the public TaskStatusResponse model."""
        return TaskStatusResponse(
            task_id=task_id,
            status=self.status,
            result=self.result,
            # Defensive: entries are created tz-aware above, but normalise to
            # UTC in case a naive datetime was assigned elsewhere.
            expires_at=self.expires_at if self.expires_at.tzinfo else self.expires_at.replace(tzinfo=timezone.utc),
            retry_after=self.retry_after,
        )
@dataclass
class SubmitResult:
    """Outcome of AgentService.submit_task, ready for the HTTP layer."""

    # Serialisable status payload for the response body.
    response: TaskStatusResponse
    # HTTP status code the router should emit.
    status_code: int
    # Optional extra detail string merged into the response body.
    detail: Optional[str] = None
class AgentService:
    """
    Coordinates the life cycles of tasks, the worker adapter and the LLM client.
    """

    def __init__(
        self,
        *,
        worker_adapter: Optional[WorkerAdapter] = None,
        llm_client: Optional[LLMClient] = None,
    ) -> None:
        # RLock: public methods call helpers that acquire the lock again.
        self._lock = threading.RLock()
        # In-memory registry; entries are never evicted automatically.
        # NOTE(review): tasks accumulate for the process lifetime — confirm
        # whether an eviction strategy is needed.
        self._tasks: Dict[str, _TaskEntry] = {}
        self._worker = worker_adapter or WorkerAdapter()
        self._llm_client = llm_client or LLMClient()

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #
    def submit_task(self, payload: TaskCreate) -> SubmitResult:
        """
        Create a new task and trigger processing according to the configured
        execution strategy.

        Args:
            payload: Validated task-creation payload.

        Returns:
            SubmitResult: Response model plus the HTTP status code to emit.
        """
        config = self._current_config()
        execution_cfg = self._execution_config(config)
        queue_ttl = int(execution_cfg.get("queue_ttl_seconds", 300))
        retry_after = int(execution_cfg.get("heartbeat_interval_seconds", 10))
        timeout_seconds = float(execution_cfg.get("response_timeout_seconds", 30))
        default_mode = str(execution_cfg.get("mode", "async")).lower()
        # A per-request tri-state "sync" flag overrides the configured mode.
        mode = default_mode
        if payload.sync is True:
            mode = "sync"
        elif payload.sync is False:
            mode = "async"
        now = datetime.now(timezone.utc)
        task_id = str(uuid.uuid4())
        entry = _TaskEntry(
            payload=payload,
            status=_PROCESSING_STATUS,
            created_at=now,
            expires_at=now + timedelta(seconds=queue_ttl),
            last_heartbeat=now,
            retry_after=retry_after,
        )
        with self._lock:
            self._tasks[task_id] = entry
        # Processing always runs on a daemon worker thread; sync mode simply
        # joins that thread with a timeout below.
        thread = threading.Thread(
            target=self.process_task,
            args=(task_id,),
            name=f"agent-task-{task_id}",
            daemon=True,
        )
        thread.start()
        if mode == "async":
            return SubmitResult(
                response=entry.to_response(task_id),
                status_code=202,
                detail=None,
            )
        # Synchronous mode
        thread.join(timeout_seconds)
        if thread.is_alive():
            # Worker did not finish in time: report 408 but keep it running;
            # the client can continue polling the status endpoint.
            return SubmitResult(
                response=self._task_response(task_id),
                status_code=408,
                detail="still_running",
            )
        entry = self._task_entry(task_id)
        if entry.status == _SUCCEEDED_STATUS:
            return SubmitResult(
                response=entry.to_response(task_id),
                status_code=200,
                detail=None,
            )
        if entry.status == _FAILED_STATUS:
            return SubmitResult(
                response=entry.to_response(task_id),
                status_code=500,
                detail=entry.detail or "task_failed",
            )
        if entry.status == _CANCELLED_STATUS:
            return SubmitResult(
                response=entry.to_response(task_id),
                status_code=409,
                detail=entry.detail or "task_cancelled",
            )
        if entry.status == _EXPIRED_STATUS:
            return SubmitResult(
                response=entry.to_response(task_id),
                status_code=410,
                detail=entry.detail or "task_expired",
            )
        # Fallback: thread finished but the entry is still "processing".
        return SubmitResult(
            response=entry.to_response(task_id),
            status_code=202,
            detail=None,
        )

    def process_task(self, task_id: str) -> None:
        """
        Run the worker + LLM workflow for the given task.

        Executed on a background thread; all terminal-state transitions are
        applied under the lock and respect cancellation/expiry that happened
        while the pipeline was running.
        """
        try:
            entry = self._task_entry(task_id)
        except KeyError:
            # Task vanished (e.g. clear_tasks was called); nothing to do.
            return
        if entry.status in {_CANCELLED_STATUS, _EXPIRED_STATUS}:
            return
        self.update_heartbeat(task_id)
        try:
            prepared = self._worker.pre_process(entry.payload)
            prompt = self._worker.run_task(prepared)
            llm_response = self._llm_client.generate(prompt)
            result_payload = {
                "prepared_prompt": prompt,
                "llm_response": llm_response,
            }
            now = datetime.now(timezone.utc)
            with self._lock:
                stored = self._tasks.get(task_id)
                # Do not overwrite a cancellation/expiry applied meanwhile.
                if stored is None or stored.status in {_CANCELLED_STATUS, _EXPIRED_STATUS}:
                    return
                stored.status = _SUCCEEDED_STATUS
                stored.result = result_payload
                stored.last_heartbeat = now
                stored.detail = None
                stored.retry_after = None
        except Exception as exc:  # pragma: no cover - error path
            now = datetime.now(timezone.utc)
            with self._lock:
                stored = self._tasks.get(task_id)
                if stored is None:
                    return
                if stored.status not in {_CANCELLED_STATUS, _EXPIRED_STATUS}:
                    stored.status = _FAILED_STATUS
                    stored.result = {"error": str(exc)}
                    stored.detail = exc.__class__.__name__
                    stored.error = repr(exc)
                    stored.last_heartbeat = now
                    stored.retry_after = None
        finally:
            self._finalize_task(task_id)

    def get_status(self, task_id: str) -> TaskStatusResponse:
        """
        Return the current status of a task.

        Raises:
            KeyError: If the task id is unknown.
        """
        entry = self._task_entry(task_id)
        # Expiry is applied lazily on read; there is no background reaper.
        self._evaluate_expiration(task_id, entry)
        return self._task_response(task_id)

    def cancel_task(self, task_id: str) -> TaskStatusResponse:
        """
        Mark a running task as cancelled.

        Tasks already in a terminal state (succeeded/failed/expired) are
        returned unchanged.

        Raises:
            KeyError: If the task id is unknown.
        """
        entry = self._task_entry(task_id)
        with self._lock:
            if entry.status in {_SUCCEEDED_STATUS, _FAILED_STATUS, _EXPIRED_STATUS}:
                return entry.to_response(task_id)
            entry.status = _CANCELLED_STATUS
            entry.result = entry.result or {"detail": "Task cancelled by client"}
            entry.detail = "task_cancelled"
            entry.last_heartbeat = datetime.now(timezone.utc)
            entry.retry_after = None
        return self._task_response(task_id)

    def update_heartbeat(self, task_id: str) -> None:
        """
        Refresh a task's heartbeat and extend its TTL from the current config.

        Raises:
            KeyError: If the task id is unknown.
        """
        entry = self._task_entry(task_id)
        config = self._current_config()
        execution_cfg = self._execution_config(config)
        queue_ttl = int(execution_cfg.get("queue_ttl_seconds", 300))
        now = datetime.now(timezone.utc)
        with self._lock:
            stored = self._tasks.get(task_id)
            if stored is None:
                return
            stored.last_heartbeat = now
            stored.expires_at = now + timedelta(seconds=queue_ttl)

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _task_entry(self, task_id: str) -> _TaskEntry:
        # Raises KeyError for unknown ids; the router maps this to 404.
        with self._lock:
            entry = self._tasks.get(task_id)
            if entry is None:
                raise KeyError(f"Task '{task_id}' not found")
            return entry

    def _task_response(self, task_id: str) -> TaskStatusResponse:
        # Fetch a fresh entry so state changes made meanwhile are reflected.
        entry = self._task_entry(task_id)
        return entry.to_response(task_id)

    def _finalize_task(self, task_id: str) -> None:
        # Called from the worker thread after processing to apply lazy expiry.
        entry = self._task_entry(task_id)
        self._evaluate_expiration(task_id, entry)

    def _evaluate_expiration(self, task_id: str, entry: _TaskEntry) -> None:
        # Terminal states are never re-labelled as expired.
        now = datetime.now(timezone.utc)
        if entry.status in {_SUCCEEDED_STATUS, _FAILED_STATUS, _CANCELLED_STATUS}:
            return
        if now > entry.expires_at:
            with self._lock:
                stored = self._tasks.get(task_id)
                if stored is None:
                    return
                stored.status = _EXPIRED_STATUS
                stored.result = stored.result or {"detail": "Task expired"}
                stored.detail = "task_expired"
                stored.last_heartbeat = now
                stored.retry_after = None

    @staticmethod
    def _current_config() -> Dict[str, Any]:
        # Merged configuration (defaults + YAML + ENV) from the config module.
        return get_config()

    @staticmethod
    def _execution_config(config: Mapping[str, Any]) -> Mapping[str, Any]:
        # Returns the "agent_api.execution" sub-mapping, or {} when absent.
        agent_cfg = config.get("agent_api", {})
        return agent_cfg.get("execution", {})

    # ------------------------------------------------------------------ #
    # Debug/test helpers
    # ------------------------------------------------------------------ #
    def clear_tasks(self) -> None:
        """
        Remove all tracked tasks. Useful for tests.
        """
        with self._lock:
            self._tasks.clear()

    def list_tasks(self) -> Dict[str, _TaskEntry]:
        """
        Return a shallow snapshot of the current tasks.
        """
        with self._lock:
            return dict(self._tasks)

View File

@@ -0,0 +1,50 @@
"""Worker-Adapter für das Agenten-API-Modul."""
from __future__ import annotations
import datetime as _dt
from typing import Any, Dict
from .models import TaskCreate
class WorkerAdapter:
    """
    Simple inline worker adapter.

    Stub implementation intended for later extension: it accepts the incoming
    payload, enriches it with metadata and returns a prompt for the LLM client.
    """

    @staticmethod
    def pre_process(payload: TaskCreate) -> Dict[str, Any]:
        """
        Prepare the task payload for the LLM call.

        Args:
            payload: Validated TaskCreate payload.

        Returns:
            Dict[str, Any]: Structured prompt including metadata and a
            UTC creation timestamp.
        """
        now_utc = _dt.datetime.now(_dt.timezone.utc)
        return {
            "user_input": payload.user_input,
            # Normalise missing context/metadata to empty dicts.
            "context": payload.context or {},
            "metadata": payload.metadata or {},
            "created_at": now_utc.isoformat(),
        }

    @staticmethod
    def run_task(prepared: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute optional worker logic and hand the prompt to the LLM client.

        Currently a pass-through.

        Args:
            prepared: Prompt produced by WorkerAdapter.pre_process.

        Returns:
            Dict[str, Any]: Prompt for the LLM client.
        """
        return prepared