From 47a22ac2a67b1afa2798f285e7beb40734d7020c Mon Sep 17 00:00:00 2001 From: madgerm Date: Thu, 13 Nov 2025 13:09:07 +0100 Subject: [PATCH] new --- .../Entworfener_Code/app/code/app/main.py | 8 + .../app/config/agent_api.yaml | 46 +++ .../app/src/agent_api/__init__.py | 6 + .../app/src/agent_api/config.py | 130 ++++++++ .../app/src/agent_api/llm_client.py | 113 +++++++ .../app/src/agent_api/models.py | 49 +++ .../app/src/agent_api/router.py | 88 +++++ .../app/src/agent_api/service.py | 310 ++++++++++++++++++ .../app/src/agent_api/worker_adapter.py | 50 +++ 01_Modulerweiterungen/Planung/Agenten_API.md | 247 ++++++++++++++ git_sync.sh | 72 ++++ roadmap_readme.md | 13 +- 12 files changed, 1126 insertions(+), 6 deletions(-) create mode 100644 01_Modulerweiterungen/Entworfener_Code/app/config/agent_api.yaml create mode 100644 01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/__init__.py create mode 100644 01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/config.py create mode 100644 01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/llm_client.py create mode 100644 01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/models.py create mode 100644 01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/router.py create mode 100644 01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/service.py create mode 100644 01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/worker_adapter.py create mode 100644 01_Modulerweiterungen/Planung/Agenten_API.md create mode 100755 git_sync.sh diff --git a/00_Globale_Richtlinien/Entworfener_Code/app/code/app/main.py b/00_Globale_Richtlinien/Entworfener_Code/app/code/app/main.py index a37c1aa..30339ea 100644 --- a/00_Globale_Richtlinien/Entworfener_Code/app/code/app/main.py +++ b/00_Globale_Richtlinien/Entworfener_Code/app/code/app/main.py @@ -49,6 +49,14 @@ def create_app(config: Optional[Dict[str, Any]] = None) -> FastAPI: # Router existiert evtl. noch nicht beim ersten Scaffold pass + try: + from agent_api.router import agent_router + + app.include_router(agent_router, prefix="/api") + except Exception: + # Agenten-API ist optional und wird bei fehlender Implementierung ignoriert + pass + @app.get("/health", tags=["health"]) async def health() -> Dict[str, str]: """ diff --git a/01_Modulerweiterungen/Entworfener_Code/app/config/agent_api.yaml b/01_Modulerweiterungen/Entworfener_Code/app/config/agent_api.yaml new file mode 100644 index 0000000..44aa776 --- /dev/null +++ b/01_Modulerweiterungen/Entworfener_Code/app/config/agent_api.yaml @@ -0,0 +1,46 @@ +agent_api: + metadata: + version: "0.1.0" + description: "Agent Gateway für Worker + OpenAI-kompatible LLMs" + + http: + base_path: "/api/agent/v1" + timeout_seconds: 60 + rate_limit_per_minute: 120 + enable_cors: true + + auth: + api_key_header: "x-agent-api-key" + allowed_keys: [] + allow_unauthenticated: true + + worker: + adapter: "inline" + endpoint: null + timeout_seconds: 30 + + llm: + provider: "openai" + base_url: "https://api.openai.com/v1" + model: "gpt-4o-mini" + temperature: 0.2 + max_tokens: 1200 + api_key: null + request_timeout_seconds: 45 + retry: + max_attempts: 3 + backoff_seconds: 2 + + execution: + mode: "async" + response_timeout_seconds: 30 + queue_ttl_seconds: 300 + heartbeat_interval_seconds: 10 + allow_long_polling: true + + logging: + enabled: true + log_payloads: false + redact_fields: + - "user_input" + - "llm_response" \ No newline at end of file diff --git a/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/__init__.py b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/__init__.py new file mode 100644 index 0000000..d073767 --- /dev/null +++ b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/__init__.py @@ -0,0 +1,6 @@ +"""Initialisierungspaket für das Agenten-API-Modul.""" +from __future__ import annotations + +from .router import agent_router + +__all__ = ["agent_router"] \ No newline at end of file diff --git a/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/config.py b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/config.py new file mode 100644 index 0000000..222d142 --- /dev/null +++ b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/config.py @@ -0,0 +1,130 @@ +"""Konfigurationslader für das Agenten-API-Modul.""" +from __future__ import annotations + +import copy +import os +import threading +from pathlib import Path +from typing import Any, Dict, Mapping, cast + +import yaml + +_ENV_LLM_KEY = "AGENT_API_LLM_KEY" + +_DEFAULT_CONFIG: Dict[str, Any] = { + "agent_api": { + "metadata": { + "version": "0.1.0", + "description": "Agent Gateway für Worker + OpenAI-kompatible LLMs", + }, + "http": { + "base_path": "/api/agent/v1", + "timeout_seconds": 60, + "rate_limit_per_minute": 120, + "enable_cors": True, + }, + "auth": { + "api_key_header": "x-agent-api-key", + "allowed_keys": [], + "allow_unauthenticated": True, + }, + "worker": { + "adapter": "inline", + "endpoint": None, + "timeout_seconds": 30, + }, + "llm": { + "provider": "openai", + "base_url": "https://api.openai.com/v1", + "model": "gpt-4o-mini", + "temperature": 0.2, + "max_tokens": 1_200, + "api_key": None, + "request_timeout_seconds": 45, + "retry": { + "max_attempts": 3, + "backoff_seconds": 2, + }, + }, + "execution": { + "mode": "async", + "response_timeout_seconds": 30, + "queue_ttl_seconds": 300, + "heartbeat_interval_seconds": 10, + "allow_long_polling": True, + }, + "logging": { + "enabled": True, + "log_payloads": False, + "redact_fields": ["user_input", "llm_response"], + }, + } +} + +_config_cache: Dict[str, Any] | None = None +_CACHE_LOCK = threading.Lock() + + +def _config_path() -> Path: + base_dir = Path(__file__).resolve().parents[2] + return base_dir / "config" / "agent_api.yaml" + + +def _load_file(path: Path) -> Dict[str, Any]: + if not path.exists(): + return {} + with path.open("r", encoding="utf-8") as handle: + raw_data: Any = yaml.safe_load(handle) or {} + if not isinstance(raw_data, dict): + raise ValueError(f"Ungültiges Konfigurationsformat in {path}") + return cast(Dict[str, Any], raw_data) + + +def _deep_merge(base: Dict[str, Any], override: Mapping[str, Any]) -> Dict[str, Any]: + merged: Dict[str, Any] = copy.deepcopy(base) + for key, value in override.items(): + if ( + key in merged + and isinstance(merged[key], dict) + and isinstance(value, Mapping) + ): + merged[key] = _deep_merge(merged[key], value) # type: ignore[arg-type] + else: + merged[key] = copy.deepcopy(value) + return merged + + +def _apply_env_overrides(config: Dict[str, Any]) -> None: + api_key = os.getenv(_ENV_LLM_KEY) + if api_key: + config.setdefault("agent_api", {}).setdefault("llm", {})["api_key"] = api_key + + +def _load_config_uncached() -> Dict[str, Any]: + file_data = _load_file(_config_path()) + merged = _deep_merge(_DEFAULT_CONFIG, file_data) + _apply_env_overrides(merged) + return merged + + +def get_config(force_reload: bool = False) -> Dict[str, Any]: + """ + Liefert die Agenten-API-Konfiguration als Dictionary. + + Args: + force_reload: Wenn True, wird die Datei erneut eingelesen. + + Returns: + Dict[str, Any]: Zusammengeführte Konfiguration aus Defaults, + YAML-Datei und ENV-Overrides. + """ + global _config_cache + if force_reload: + with _CACHE_LOCK: + _config_cache = _load_config_uncached() + return copy.deepcopy(_config_cache) + if _config_cache is None: + with _CACHE_LOCK: + if _config_cache is None: + _config_cache = _load_config_uncached() + return copy.deepcopy(_config_cache) \ No newline at end of file diff --git a/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/llm_client.py b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/llm_client.py new file mode 100644 index 0000000..52dfd2a --- /dev/null +++ b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/llm_client.py @@ -0,0 +1,113 @@ +"""HTTP-basierter LLM-Client für das Agenten-API-Modul.""" +from __future__ import annotations + +import json +import time +from typing import Any, Dict, Mapping, Optional + +import httpx + +from .config import get_config + + +class LLMClient: + """ + Kapselt Aufrufe an ein OpenAI-kompatibles REST-Interface. + + Die Implementierung unterstützt einfache Retries, Timeout-Handling sowie + einen deterministischen Stub, falls kein API-Key konfiguriert wurde. + """ + + def __init__(self, config: Optional[Mapping[str, Any]] = None) -> None: + cfg = dict(config or get_config()) + agent_cfg = cfg.get("agent_api", {}) + llm_cfg: Dict[str, Any] = dict(agent_cfg.get("llm", {})) + + self.base_url: str = str(llm_cfg.get("base_url", "https://api.openai.com/v1")).rstrip("/") + self.model: str = str(llm_cfg.get("model", "gpt-4o-mini")) + self.temperature: float = float(llm_cfg.get("temperature", 0.2)) + self.max_tokens: int = int(llm_cfg.get("max_tokens", 1_200)) + self.api_key: Optional[str] = llm_cfg.get("api_key") + self.request_timeout_seconds: float = float(llm_cfg.get("request_timeout_seconds", 45)) + retry_cfg: Dict[str, Any] = dict(llm_cfg.get("retry", {})) + self.max_attempts: int = max(1, int(retry_cfg.get("max_attempts", 3))) + self.backoff_seconds: float = float(retry_cfg.get("backoff_seconds", 2)) + + def generate(self, prompt: Dict[str, Any]) -> Dict[str, Any]: + """ + Sendet den Prompt an den konfigurierten LLM-Endpunkt oder liefert eine Stub-Antwort. + + Args: + prompt: Strukturierter Prompt, erzeugt durch den WorkerAdapter. + + Returns: + Dict[str, Any]: Antwort-Payload, der den OpenAI-ähnlichen Response widerspiegelt. + """ + if not self.api_key: + return self._local_stub(prompt) + + url = f"{self.base_url}/chat/completions" + payload = { + "model": self.model, + "messages": self._build_messages(prompt), + "temperature": self.temperature, + "max_tokens": self.max_tokens, + } + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + + last_error: Optional[BaseException] = None + for attempt in range(1, self.max_attempts + 1): + try: + with httpx.Client(timeout=self.request_timeout_seconds) as client: + response = client.post(url, headers=headers, json=payload) + response.raise_for_status() + return response.json() + except (httpx.HTTPError, httpx.TimeoutException) as exc: + last_error = exc + if attempt >= self.max_attempts: + break + time.sleep(self.backoff_seconds) + + if last_error: + raise RuntimeError(f"LLM request failed after {self.max_attempts} attempts") from last_error + raise RuntimeError("LLM request failed unexpectedly without additional context") + + @staticmethod + def _build_messages(prompt: Mapping[str, Any]) -> Any: + user_input = str(prompt.get("user_input", "")) + context = prompt.get("context") or {} + metadata = prompt.get("metadata") or {} + parts = [ + {"role": "system", "content": json.dumps({"context": context, "metadata": metadata})}, + {"role": "user", "content": user_input}, + ] + return parts + + @staticmethod + def _local_stub(prompt: Mapping[str, Any]) -> Dict[str, Any]: + user_input = prompt.get("user_input", "") + return { + "id": "local_stub_completion", + "object": "chat.completion", + "created": int(time.time()), + "model": "local-stub", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": f"[local_stub] Echo: {user_input}", + }, + "finish_reason": "stop", + } + ], + "usage": { + "prompt_tokens": len(json.dumps(prompt)), + "completion_tokens": len(str(user_input)), + "total_tokens": len(json.dumps(prompt)) + len(str(user_input)), + }, + "meta": {"note": "local_stub"}, + } \ No newline at end of file diff --git a/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/models.py b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/models.py new file mode 100644 index 0000000..d42931f --- /dev/null +++ b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/models.py @@ -0,0 +1,49 @@ +"""Pydantic-Modelle für das Agenten-API-Modul.""" +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, Optional + +from pydantic import BaseModel, Field + + +class TaskCreate(BaseModel): + """ + Repräsentiert den Payload zur Erstellung eines neuen Agenten-Tasks. + """ + + user_input: str = Field(..., description="Freitext-Anfrage oder Prompt des Nutzers.") + context: Optional[Dict[str, Any]] = Field( + None, description="Optionaler Kontext mit zusätzlichen strukturierten Daten." + ) + metadata: Optional[Dict[str, Any]] = Field( + None, description="Beliebige Metadaten zur Nachverfolgung von Tasks." + ) + sync: Optional[bool] = Field( + None, + description=( + "Optionaler Override, um den Task synchron zu verarbeiten, " + "selbst wenn die Ausführung standardmäßig asynchron erfolgt." + ), + ) + + +class TaskStatusResponse(BaseModel): + """ + Antwortmodell für den Status eines bestehenden Tasks. + """ + + task_id: str = Field(..., description="Eindeutige Kennung des Tasks.") + status: str = Field(..., description="Aktueller Status des Tasks.") + result: Optional[Dict[str, Any]] = Field( + None, + description="Ergebnisdaten des Tasks, sobald verfügbar.", + ) + expires_at: Optional[datetime] = Field( + None, + description="Zeitpunkt, an dem der Task automatisch ausläuft.", + ) + retry_after: Optional[int] = Field( + None, + description="Empfohlene Wartezeit in Sekunden für den nächsten Statusabruf.", + ) \ No newline at end of file diff --git a/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/router.py b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/router.py new file mode 100644 index 0000000..3fbba21 --- /dev/null +++ b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/router.py @@ -0,0 +1,88 @@ +"""FastAPI-Router für das Agenten-API-Modul.""" +from __future__ import annotations + +from typing import Dict, Optional + +from fastapi import APIRouter, HTTPException, Response, status +from fastapi.responses import JSONResponse + +from .models import TaskCreate +from .service import AgentService + +agent_router = APIRouter(prefix="/agent/v1", tags=["agent_api"]) + +_service = AgentService() + + +def _build_response( + payload: Dict[str, Optional[str]], + status_code: int, + retry_after: Optional[int] = None, +) -> JSONResponse: + headers = {} + if retry_after is not None: + headers["Retry-After"] = str(retry_after) + return JSONResponse(content=payload, status_code=status_code, headers=headers) + + +@agent_router.post( + "/tasks", + response_model_exclude_none=True, + summary="Neuen Task einreichen", +) +async def submit_task(task: TaskCreate) -> JSONResponse: + """ + Legt einen neuen Task an und startet die Verarbeitung. + """ + result = _service.submit_task(task) + response_payload = result.response.model_dump() + if result.detail: + response_payload["detail"] = result.detail + return _build_response(response_payload, result.status_code, result.response.retry_after) + + +@agent_router.get( + "/tasks/{task_id}", + response_model_exclude_none=True, + summary="Status eines Tasks abrufen", +) +async def get_status(task_id: str) -> JSONResponse: + """ + Liefert den Status eines Tasks. + """ + status_map = { + "processing": status.HTTP_202_ACCEPTED, + "succeeded": status.HTTP_200_OK, + "failed": status.HTTP_500_INTERNAL_SERVER_ERROR, + "cancelled": status.HTTP_409_CONFLICT, + "expired": status.HTTP_410_GONE, + } + try: + status_response = _service.get_status(task_id) + except KeyError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc + + status_code = status_map.get(status_response.status, status.HTTP_200_OK) + payload = status_response.model_dump() + if status_response.status in {"failed", "cancelled", "expired"}: + payload.setdefault("detail", status_response.status) + return _build_response(payload, status_code, status_response.retry_after) + + +@agent_router.post( + "/tasks/{task_id}/cancel", + response_model_exclude_none=True, + summary="Laufenden Task abbrechen", +) +async def cancel_task(task_id: str) -> JSONResponse: + """ + Markiert einen laufenden Task als abgebrochen. + """ + try: + status_response = _service.cancel_task(task_id) + except KeyError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc + + payload = status_response.model_dump() + payload.setdefault("detail", "task_cancelled") + return _build_response(payload, status.HTTP_200_OK, status_response.retry_after) \ No newline at end of file diff --git a/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/service.py b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/service.py new file mode 100644 index 0000000..886b2aa --- /dev/null +++ b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/service.py @@ -0,0 +1,310 @@ +"""Service-Schicht für das Agenten-API-Modul.""" +from __future__ import annotations + +import threading +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Mapping, Optional + +from .config import get_config +from .llm_client import LLMClient +from .models import TaskCreate, TaskStatusResponse +from .worker_adapter import WorkerAdapter + +_PROCESSING_STATUS = "processing" +_SUCCEEDED_STATUS = "succeeded" +_FAILED_STATUS = "failed" +_CANCELLED_STATUS = "cancelled" +_EXPIRED_STATUS = "expired" + + +@dataclass +class _TaskEntry: + payload: TaskCreate + status: str = _PROCESSING_STATUS + result: Optional[Dict[str, Any]] = None + created_at: datetime = field( + default_factory=lambda: datetime.now(timezone.utc) + ) + expires_at: datetime = field( + default_factory=lambda: datetime.now(timezone.utc) + timedelta(minutes=5) + ) + last_heartbeat: datetime = field( + default_factory=lambda: datetime.now(timezone.utc) + ) + detail: Optional[str] = None + retry_after: Optional[int] = None + error: Optional[str] = None + + def to_response(self, task_id: str) -> TaskStatusResponse: + return TaskStatusResponse( + task_id=task_id, + status=self.status, + result=self.result, + expires_at=self.expires_at if self.expires_at.tzinfo else self.expires_at.replace(tzinfo=timezone.utc), + retry_after=self.retry_after, + ) + + +@dataclass +class SubmitResult: + response: TaskStatusResponse + status_code: int + detail: Optional[str] = None + + +class AgentService: + """ + Koordiniert die Lebenszyklen von Tasks, Worker-Adapter und LLM-Client. + """ + + def __init__( + self, + *, + worker_adapter: Optional[WorkerAdapter] = None, + llm_client: Optional[LLMClient] = None, + ) -> None: + self._lock = threading.RLock() + self._tasks: Dict[str, _TaskEntry] = {} + self._worker = worker_adapter or WorkerAdapter() + self._llm_client = llm_client or LLMClient() + + # ------------------------------------------------------------------ # + # Öffentliche API + # ------------------------------------------------------------------ # + + def submit_task(self, payload: TaskCreate) -> SubmitResult: + """ + Legt einen neuen Task an und triggert die Verarbeitung entsprechend + der konfigurierten Ausführungsstrategie. + """ + config = self._current_config() + execution_cfg = self._execution_config(config) + queue_ttl = int(execution_cfg.get("queue_ttl_seconds", 300)) + retry_after = int(execution_cfg.get("heartbeat_interval_seconds", 10)) + timeout_seconds = float(execution_cfg.get("response_timeout_seconds", 30)) + default_mode = str(execution_cfg.get("mode", "async")).lower() + + mode = default_mode + if payload.sync is True: + mode = "sync" + elif payload.sync is False: + mode = "async" + + now = datetime.now(timezone.utc) + task_id = str(uuid.uuid4()) + entry = _TaskEntry( + payload=payload, + status=_PROCESSING_STATUS, + created_at=now, + expires_at=now + timedelta(seconds=queue_ttl), + last_heartbeat=now, + retry_after=retry_after, + ) + + with self._lock: + self._tasks[task_id] = entry + + thread = threading.Thread( + target=self.process_task, + args=(task_id,), + name=f"agent-task-{task_id}", + daemon=True, + ) + thread.start() + + if mode == "async": + return SubmitResult( + response=entry.to_response(task_id), + status_code=202, + detail=None, + ) + + # synchroner Modus + thread.join(timeout_seconds) + if thread.is_alive(): + return SubmitResult( + response=self._task_response(task_id), + status_code=408, + detail="still_running", + ) + + entry = self._task_entry(task_id) + if entry.status == _SUCCEEDED_STATUS: + return SubmitResult( + response=entry.to_response(task_id), + status_code=200, + detail=None, + ) + if entry.status == _FAILED_STATUS: + return SubmitResult( + response=entry.to_response(task_id), + status_code=500, + detail=entry.detail or "task_failed", + ) + if entry.status == _CANCELLED_STATUS: + return SubmitResult( + response=entry.to_response(task_id), + status_code=409, + detail=entry.detail or "task_cancelled", + ) + if entry.status == _EXPIRED_STATUS: + return SubmitResult( + response=entry.to_response(task_id), + status_code=410, + detail=entry.detail or "task_expired", + ) + return SubmitResult( + response=entry.to_response(task_id), + status_code=202, + detail=None, + ) + + def process_task(self, task_id: str) -> None: + """ + Führt den Worker- und LLM-Workflow für einen gegebenen Task aus. + """ + try: + entry = self._task_entry(task_id) + except KeyError: + return + + if entry.status in {_CANCELLED_STATUS, _EXPIRED_STATUS}: + return + + self.update_heartbeat(task_id) + try: + prepared = self._worker.pre_process(entry.payload) + prompt = self._worker.run_task(prepared) + llm_response = self._llm_client.generate(prompt) + + result_payload = { + "prepared_prompt": prompt, + "llm_response": llm_response, + } + now = datetime.now(timezone.utc) + with self._lock: + stored = self._tasks.get(task_id) + if stored is None or stored.status in {_CANCELLED_STATUS, _EXPIRED_STATUS}: + return + stored.status = _SUCCEEDED_STATUS + stored.result = result_payload + stored.last_heartbeat = now + stored.detail = None + stored.retry_after = None + except Exception as exc: # pragma: no cover - Fehlerpfad + now = datetime.now(timezone.utc) + with self._lock: + stored = self._tasks.get(task_id) + if stored is None: + return + if stored.status not in {_CANCELLED_STATUS, _EXPIRED_STATUS}: + stored.status = _FAILED_STATUS + stored.result = {"error": str(exc)} + stored.detail = exc.__class__.__name__ + stored.error = repr(exc) + stored.last_heartbeat = now + stored.retry_after = None + finally: + self._finalize_task(task_id) + + def get_status(self, task_id: str) -> TaskStatusResponse: + """ + Liefert den aktuellen Status eines Tasks. + """ + entry = self._task_entry(task_id) + self._evaluate_expiration(task_id, entry) + return self._task_response(task_id) + + def cancel_task(self, task_id: str) -> TaskStatusResponse: + """ + Markiert einen laufenden Task als abgebrochen. + """ + entry = self._task_entry(task_id) + with self._lock: + if entry.status in {_SUCCEEDED_STATUS, _FAILED_STATUS, _EXPIRED_STATUS}: + return entry.to_response(task_id) + entry.status = _CANCELLED_STATUS + entry.result = entry.result or {"detail": "Task cancelled by client"} + entry.detail = "task_cancelled" + entry.last_heartbeat = datetime.now(timezone.utc) + entry.retry_after = None + return self._task_response(task_id) + + def update_heartbeat(self, task_id: str) -> None: + """ + Aktualisiert den Heartbeat eines Tasks und verlängert dessen TTL. + """ + entry = self._task_entry(task_id) + config = self._current_config() + execution_cfg = self._execution_config(config) + queue_ttl = int(execution_cfg.get("queue_ttl_seconds", 300)) + now = datetime.now(timezone.utc) + with self._lock: + stored = self._tasks.get(task_id) + if stored is None: + return + stored.last_heartbeat = now + stored.expires_at = now + timedelta(seconds=queue_ttl) + + # ------------------------------------------------------------------ # + # Interne Hilfsfunktionen + # ------------------------------------------------------------------ # + + def _task_entry(self, task_id: str) -> _TaskEntry: + with self._lock: + entry = self._tasks.get(task_id) + if entry is None: + raise KeyError(f"Task '{task_id}' not found") + return entry + + def _task_response(self, task_id: str) -> TaskStatusResponse: + entry = self._task_entry(task_id) + return entry.to_response(task_id) + + def _finalize_task(self, task_id: str) -> None: + entry = self._task_entry(task_id) + self._evaluate_expiration(task_id, entry) + + def _evaluate_expiration(self, task_id: str, entry: _TaskEntry) -> None: + now = datetime.now(timezone.utc) + if entry.status in {_SUCCEEDED_STATUS, _FAILED_STATUS, _CANCELLED_STATUS}: + return + if now > entry.expires_at: + with self._lock: + stored = self._tasks.get(task_id) + if stored is None: + return + stored.status = _EXPIRED_STATUS + stored.result = stored.result or {"detail": "Task expired"} + stored.detail = "task_expired" + stored.last_heartbeat = now + stored.retry_after = None + + @staticmethod + def _current_config() -> Dict[str, Any]: + return get_config() + + @staticmethod + def _execution_config(config: Mapping[str, Any]) -> Mapping[str, Any]: + agent_cfg = config.get("agent_api", {}) + return agent_cfg.get("execution", {}) + + # ------------------------------------------------------------------ # + # Debug-/Test-Hilfen + # ------------------------------------------------------------------ # + + def clear_tasks(self) -> None: + """ + Löscht alle gemerkten Tasks. Hilfreich für Tests. + """ + with self._lock: + self._tasks.clear() + + def list_tasks(self) -> Dict[str, _TaskEntry]: + """ + Gibt einen Snapshot der aktuellen Tasks zurück. + """ + with self._lock: + return dict(self._tasks) \ No newline at end of file diff --git a/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/worker_adapter.py b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/worker_adapter.py new file mode 100644 index 0000000..e95f7a7 --- /dev/null +++ b/01_Modulerweiterungen/Entworfener_Code/app/src/agent_api/worker_adapter.py @@ -0,0 +1,50 @@ +"""Worker-Adapter für das Agenten-API-Modul.""" +from __future__ import annotations + +import datetime as _dt +from typing import Any, Dict + +from .models import TaskCreate + + +class WorkerAdapter: + """ + Einfacher Inline-Worker-Adapter. + + Die Implementierung dient als Stub für spätere Erweiterungen. Sie nimmt den + eingehenden Payload entgegen, reichert diesen mit Metadaten an und liefert + einen Prompt für den LLM-Client zurück. + """ + + @staticmethod + def pre_process(payload: TaskCreate) -> Dict[str, Any]: + """ + Bereitet den Task-Payload für den LLM-Aufruf vor. + + Args: + payload: Valider TaskCreate-Payload. + + Returns: + Dict[str, Any]: Strukturierter Prompt inklusive Metadaten. + """ + timestamp = _dt.datetime.now(_dt.timezone.utc) + prepared_prompt: Dict[str, Any] = { + "user_input": payload.user_input, + "context": payload.context or {}, + "metadata": payload.metadata or {}, + "created_at": timestamp.isoformat(), + } + return prepared_prompt + + @staticmethod + def run_task(prepared: Dict[str, Any]) -> Dict[str, Any]: + """ + Führt optionale Worker-Logik aus und liefert den Prompt für den LLM-Client. + + Args: + prepared: Vom WorkerAdapter.pre_process erzeugter Prompt. + + Returns: + Dict[str, Any]: Prompt für den LLM-Client. + """ + return prepared \ No newline at end of file diff --git a/01_Modulerweiterungen/Planung/Agenten_API.md b/01_Modulerweiterungen/Planung/Agenten_API.md new file mode 100644 index 0000000..1aa92fe --- /dev/null +++ b/01_Modulerweiterungen/Planung/Agenten_API.md @@ -0,0 +1,247 @@ +# Agenten-API – Planung & Architektur + +## Zielsetzung + +Die Agenten-API erweitert die Basis-API aus [`00_Globale_Richtlinien`](../00_Globale_Richtlinien/README.md) um OpenAPI-kompatible Endpunkte, die als Vermittlungsschicht zwischen externen Anfragenden, internen Worker/Agenten und einem LLM mit OpenAI-kompatibler Schnittstelle fungieren. Sie übernimmt folgende Aufgaben: + +1. **Entgegennahme von Nutzeranfragen** (z. B. Anweisungen, Aufgabenbeschreibungen, Kontextdaten). +2. **Anreicherung, Validierung und Normalisierung** der Anfragen. +3. **Koordination eines Worker/Agenten**, der Vorverarbeitungsschritte ausführt (z. B. Kontextsuche, Prompt-Erweiterung, Werkzeugaufrufe). +4. **Weiterleitung** der aufbereiteten Anfrage an ein LLM mit OpenAI-kompatibler REST-API. +5. **Postprocessing der LLM-Antwort** (z. B. Extraktion strukturierter Resultate, Fehlerbehandlung, Logging) und Rückgabe an den ursprünglichen Anfragenden. + +Die API bleibt streng modular: Planung unter `/Planung/`, Umsetzung unter `/Entworfener_Code/`. + +--- + +## Anwendungsfälle + +| Use-Case | Beschreibung | +|----------|---------------| +| `submit_task` | Externe Systeme übermitteln komplexe Arbeitsaufträge an den Agenten. | +| `status_query` | Abfrage, ob ein Auftrag bereits verarbeitet wurde (Polling). | +| `cancel_task` | Steuerung laufender Aufträge (Cancel/Abort). | +| `interactive_chat` | Ad-hoc-Konversationen mit dem LLM inkl. Kontextanreicherung. | +| `tool_feedback` | Worker meldet Zwischenergebnisse zurück, die an das LLM angehängt werden. | + +--- + +## API-Design (OpenAPI) + +### Basisdaten + +- **Base Path**: `/api/agent` +- **Version**: `v1` +- **Schema**: JSON (application/json) +- **Authentifizierung**: zukünftige Erweiterung; initial optionales `api_key`-Header-Feld. +- **Fehlerformat**: RFC 7807-kompatibles JSON (`type`, `title`, `detail`, `instance`, optionale `meta`). + +### Endpunkte (first iteration) + +| Methode & Pfad | Zweck | Status | +|----------------|-------|--------| +| `POST /v1/tasks` | Erstellt neuen Auftrag. | Implementieren | +| `GET /v1/tasks/{task_id}` | Status & Ergebnis abrufen. | Implementieren | +| `POST /v1/chat` | Direkter Chat (Streaming optional). | Optional (Stretch) | +| `POST /v1/tasks/{task_id}/cancel` | Laufenden Auftrag abbrechen. | Optional | +| `POST /v1/tasks/{task_id}/feedback` | Worker liefert Zwischenstände/Tool-Ausgaben. | Optional | + +--- + +## Komponentenarchitektur + +```plaintext +FastAPI Router (agent_api/router.py) +└── AgentService (agent_api/service.py) + ├── WorkerAdapter (agent_api/worker_adapter.py) + ├── LLMClient (agent_api/llm_client.py) + ├── Persistence (optional spätere Erweiterung) + └── Config Provider (agent_api/config.py) +``` + +### 1. Router (`router.py`) +- Definiert die FastAPI-Routen und Pydantic-Modelle. +- Validiert Eingaben (z. B. Pflichtfelder, Limits). +- Übersetzt Exceptions in HTTP-Fehler (Problem Details). + +### 2. Service (`service.py`) +- Zentrale Koordinationslogik. +- Verantwortlich für: + - Generierung Task-ID. + - Übergabe an WorkerAdapter (Vorverarbeitung). + - Aufruf des LLMClient. + - Zusammenführung von Ergebnissen, Logging, Telemetrie. +- Kapselt Retries, Timeout-Handling, Circuit-Breaker (später). + +### 3. WorkerAdapter (`worker_adapter.py`) +- Abstraktionslayer für Worker/Agenten (lokal, remote, Message-Bus). +- Erste Version: synchroner Stub, der eingehende Daten durchreicht. +- Später: Integration in Task-Queue (Celery, Arq, RQ) oder Event-System. + +### 4. LLMClient (`llm_client.py`) +- Kapselt HTTP-Aufrufe gegen OpenAI-kompatible APIs. +- Unterstützt: + - Text Completion / Chat Completion. + - Streaming (später). + - Fehlerbehandlung (Rate-Limit, Timeout, Invalid Request). +- Konfiguration aus agenten-spezifischem Config-File (API-Key, Endpoint, Model). + +### 5. Config (`config.py`) +- Lädt `agent_api.yaml` (siehe unten) und ermöglicht Zugriffe auf Parameter. +- Option auf Umgebungsvariablen (Override sensibler Felder, z. B. API-Key). +- Links zur globalen Config (`00_Globale_Richtlinien/Entworfener_Code/app/src/config_loader.py`). + +--- + +## Konfigurationsschema (`agent_api.yaml`) + +```yaml +agent_api: + metadata: + version: "0.1.0" + description: "Agent Gateway für Worker + OpenAI-kompatible LLMs" + + http: + base_path: "/api/agent/v1" + timeout_seconds: 60 + rate_limit_per_minute: 120 + enable_cors: true + + auth: + api_key_header: "x-agent-api-key" + allowed_keys: [] # Optional: Liste statischer Schlüssel + allow_unauthenticated: true + + worker: + adapter: "inline" # inline | queue | external + endpoint: null # URL falls adapter=external + timeout_seconds: 30 + + llm: + provider: "openai" # openai | azure_openai | anthropic (kompatibel) + base_url: "https://api.openai.com/v1" + model: "gpt-4o-mini" + temperature: 0.2 + max_tokens: 1200 + api_key: null # via ENV: AGENT_API_LLM_KEY + request_timeout_seconds: 45 + retry: + max_attempts: 3 + backoff_seconds: 2 + + execution: + mode: "async" # async | sync + response_timeout_seconds: 30 + queue_ttl_seconds: 300 + heartbeat_interval_seconds: 10 + allow_long_polling: true + + logging: + enabled: true + log_payloads: false + redact_fields: + - "user_input" + - "llm_response" +``` + +- Sensible Werte (API-Key) werden über Umgebungsvariablen überschrieben. +- `adapter` kann später für Worker-Typen erweitert werden. + +--- + +## Timeout- und TTL-Strategie + +### Verhalten von OpenAI-kompatiblen APIs +- Der Standard-Endpunkt von OpenAI (und kompatiblen Anbietern) ist request/response-basiert. Wenn ein LLM länger braucht, bestimmt der Client, wie lange er wartet, bevor er mit einem Timeout abbricht. +- HTTP-Clients sowie Proxys besitzen meist ein Standard-Timeout (z. B. 30–60 Sekunden). Bleibt die Antwort länger aus, wird die Verbindung beendet – unabhängig davon, ob der LLM-Aufruf intern noch läuft. +- Streaming-Endpunkte (Server-Sent Events) halten die Verbindung offen. Sobald keine Daten übertragen werden, trennen viele Proxys nach ihrer Idle-Timeout-Regel. + +### Anforderungen für die Agenten-API +- Verhindern, dass externe Clients wegen langer LLM-Laufzeiten abbrechen. +- Aufträge weiterbearbeiten können, auch wenn der initiale HTTP-Request beendet wurde. +- Ergebnisse bereitstellen, sobald sie fertig sind (Polling, später Webhook/Streaming). + +### Konzept und API-Verhalten +1. **Async-Standardmodus (`execution.mode = "async"`)** + - `POST /v1/tasks` antwortet unmittelbar mit `202 Accepted` + `task_id`. + - Verarbeitung geschieht im Hintergrund (WorkerAdapter). + - Client ruft periodisch `GET /v1/tasks/{task_id}` auf. Optional ermöglicht `allow_long_polling` längeres Offenhalten der Verbindung mit `Retry-After`-Headern. +2. **Synchroner Modus (`execution.mode = "sync"`)** + - Die API wartet maximal `response_timeout_seconds`. + - Benötigt das LLM länger, wird eine Antwort mit `408 Request Timeout` oder erneut `202 Accepted` plus Hinweis zurückgegeben. + - Die Aufgabe bleibt aktiv; Clients können später den Status abrufen. +3. **TTL und Heartbeat** + - `queue_ttl_seconds` begrenzt die Gesamtlebensdauer eines Tasks (z. B. 5 Minuten). + - `heartbeat_interval_seconds` definiert, wie oft Worker/Adapter Aktivität melden. Bleibt der Heartbeat aus, wird der Task als `expired` markiert. + +### Status-Codes und Rückmeldungen +- `202 Accepted`: Aufgabe angenommen, Verarbeitung läuft. Response enthält `task_id`, `status = processing`, optional `retry_after`. +- `200 OK`: Aufgabe abgeschlossen. Antwort enthält fertige `result`-Payload sowie Metadaten (`duration_ms`, `completed_at`). +- `408 Request Timeout`: Synchroner Modus, Ergebnis liegt noch nicht vor. Response enthält `task_id`, `status = processing`, `detail = "still_running"`. +- `410 Gone`: Aufgabe abgelaufen (`expired`) oder bewusst entfernt. +- `500 Internal Server Error`: Verarbeitung fehlgeschlagen. Response enthält Fehlerdetails und Hinweise zum Retrying. + +### Integration in die Implementierung +- `AgentService` verwaltet Task-Status (`processing`, `succeeded`, `failed`, `expired`) und entscheidet anhand der Konfiguration über Sync/Async-Verhalten. +- `WorkerAdapter` sendet Heartbeats; bleibt dieser aus, sorgt die TTL für automatisches Aufräumen. +- `LLMClient` respektiert `request_timeout_seconds` und `retry`-Parameter; nach ausgeschöpften Retries wird der Task auf `failed` gesetzt. +- Persistenzschicht (später Redis/DB) speichert `expires_at` und `last_heartbeat`, sodass Expiration sauber umgesetzt werden kann. + +Damit ist festgelegt, wie der Agent weiterarbeitet, obwohl ein Client den ursprünglichen HTTP-Request beendet oder ein Timeout erreicht. Die Agenten-API übernimmt das Handling von Time-to-Live und Task-Lebenszyklen und stellt sicher, dass LLM-Laufzeiten den Client nicht blockieren. + +--- + +## Sequenzfluss (High-Level) + +1. **Client** sendet POST `/v1/tasks` mit Payload (`user_input`, optional `context`, `metadata`). +2. **Router** validiert Payload → ruft `AgentService.submit_task`. +3. **AgentService**: + 1. erzeugt Task-ID, speichert Minimalzustand (in-memory, später DB). + 2. ruft `WorkerAdapter.pre_process` für Kontextanreicherung. + 3. erstellt Request an `LLMClient.generate`. +4. **LLMClient**: + - baut ChatCompletion- oder Completion-Request. + - sendet HTTP-Request (mit API-Key, Timeout, Retries). +5. **AgentService** sammelt Antwort, führt optional Postprocessing aus (z. B. extrahierte Schritte). +6. **Router** liefert HTTP-Response mit Task-ID, Status, generiertem Output. + +Für `GET /v1/tasks/{task_id}` werden Ergebnisse aus internem Store (in-memory, Pydantic-`Dict[str, TaskStatus]`) gelesen; langfristig Persistenz (Redis/DB). + +--- + +## Integration mit Haupt-App + +- `00_Globale_Richtlinien/Entworfener_Code/app/code/app/main.py` erweitert Try-Import: + ```python + try: + from agent_api.router import agent_router + app.include_router(agent_router, prefix="/api") + except Exception: + ... + ``` +- `agent_api.router` sorgt selbst für Prefix `/agent`. +- Konfiguration: `agent_api.yaml` wird in `start.py` oder neuem Initialisierungsschritt geladen (via `config_loader.Settings` oder separatem Loader). + +--- + +## Tests & Qualität + +- Unit-Tests für Service + LLMClient (Mock HTTPX). +- FastAPI TestClient für Endpunkte (Beispiel-Request + Response). +- Konfig-Validierung (Fehlende API-Keys → Fehler). +- Logging-Integration (ggf. in SQLite/External Logging einspeisen). + +--- + +## Offene Punkte / Nächste Schritte + +1. **Planung finalisieren** (dieses Dokument). +2. **Konfigurationsdatei** `app/config/agent_api.yaml` erstellen. +3. **Codegerüst** unter `app/src/agent_api/` implementieren: + - `config.py`, `models.py`, `service.py`, `worker_adapter.py`, `llm_client.py`, `router.py`. +4. **Tests**: `app/tests/test_agent_api.py`. +5. **Integration** in `create_app` + `start.py`. +6. **Security**: API-Key-Check und Rate-Limiting (Basis). +7. **Dokumentation**: README + Beispiel-Requests. + +Dieses Dokument dient als Referenz und Abstimmungspunkt, bevor mit der Implementierung fortgefahren wird. \ No newline at end of file diff --git a/git_sync.sh b/git_sync.sh new file mode 100755 index 0000000..4fef472 --- /dev/null +++ b/git_sync.sh @@ -0,0 +1,72 @@ +#!/bin/bash + +# ===================================================== +# Git Projekt Management Script +# ===================================================== +# Funktionen: +# 1) Erstmaliges Hochladen (Repo initialisieren + push) +# 2) Änderungen hochladen (commit + push) +# 3) Projekt vom Server herunterladen (clone) +# 4) Benutzername für Git-Server speichern +# ===================================================== + +# === Konfiguration === +REMOTE_URL="http://192.168.19.10:3020/madgerm/KI-Cluster-Roadmap.git" +# Wenn dein Login eine E-Mail ist, benutze %40 statt @ im REMOTE_URL: +# Beispiel: http://madgerm%40msn.com@192.168.19.10:3020/madgerm/MiniMal.git + +echo "---------------------------------------------" +echo " GIT PROJECT MANAGER" +echo "---------------------------------------------" +echo "1) Projekt erstmalig hochladen" +echo "2) Änderungen hochladen (Standard)" +echo "3) Projekt vom Server herunterladen" +echo "4) Git-Login (Benutzername) speichern" +echo +read -p "Wähle eine Option [1-4, Standard=2]: " choice + +# Wenn der Benutzer einfach Enter drückt, wird 2 gesetzt +choice=${choice:-2} + +case $choice in + 1) + echo ">>> Projekt wird initialisiert und hochgeladen..." + git init + git branch -M main + git add . + git commit -m "Initial commit" + git remote add origin "$REMOTE_URL" + git push -u origin main + ;; + 2) + echo ">>> Änderungen werden zum Server hochgeladen..." + git add . + git commit -m "Auto sync: $(date '+%Y-%m-%d %H:%M:%S')" + git push origin main + ;; + 3) + echo ">>> Projekt wird vom Server heruntergeladen..." + read -p "Zielordner (Standard: aktuelles Verzeichnis): " TARGET_DIR + TARGET_DIR=${TARGET_DIR:-.} + git clone "$REMOTE_URL" "$TARGET_DIR" + ;; + 4) + echo ">>> Git-Benutzernamen speichern..." + read -p "Gib deinen Git-Login (z.B. madgerm@msn.com) ein: " GITUSER + if [ -n "$GITUSER" ]; then + git config --global user.name "$GITUSER" + git config --global credential.username "$GITUSER" + echo "✅ Benutzername gespeichert:" + echo " user.name = $(git config --global user.name)" + echo " credential.username = $(git config --global credential.username)" + echo + echo "💡 Hinweis: Wenn du dein Passwort speichern möchtest:" + echo " git config --global credential.helper store" + else + echo "⚠️ Kein Benutzername eingegeben. Vorgang abgebrochen." + fi + ;; + *) + echo "Ungültige Eingabe. Abbruch." + ;; +esac diff --git a/roadmap_readme.md b/roadmap_readme.md index f2b1316..2e5c592 100644 --- a/roadmap_readme.md +++ b/roadmap_readme.md @@ -55,14 +55,15 @@ Diese Trennung erlaubt eine saubere Zusammenarbeit zwischen **Strategie- und Bui │ ├── config_loader.py │ └── logging_setup.py │ -├── 01_Zieldefinition_und_Subsysteme/ +├── 01_Modulerweiterungen/ │ ├── Planung/ -│ │ ├── Zieldefinition.md -│ │ ├── Systemvision.md -│ │ ├── Subsystem_Uebersicht.md -│ │ └── Agenten_Uebersicht.md +│ │ ├── Architektur.md +│ │ └── (weitere Modul-Spezifikationen folgen) │ ├── Entworfener_Code/ -│ │ └── (spätere Prototypen oder System-Demos) +│ │ └── app/ +│ │ └── src/ +│ │ ├── logging_internal.py +│ │ └── logging_external.py │ └── README.md │ ├── 02_API_Design_und_Kommunikation/