This commit is contained in:
2025-11-13 01:02:07 +01:00
parent 67e5ed3807
commit 8ce890cae5
18 changed files with 767 additions and 442 deletions

0
src/__init__.py Normal file
View File

0
src/services/__init__.py Normal file
View File

57
src/services/ansible.py Normal file
View File

@@ -0,0 +1,57 @@
from __future__ import annotations
import os
import subprocess
from pathlib import Path
from typing import Iterable, Iterator, List, Optional, Tuple
from .hosts import TargetHost
def build_cmd(playbook_path: Path, host: TargetHost, inventory: Optional[str] = "") -> List[str]:
cmd: List[str] = ["ansible-playbook", str(playbook_path)]
if host.is_local:
# Lokalmode: Inventar auf localhost, connection=local
cmd.extend(["-i", "localhost,", "-c", "local"])
else:
if inventory:
cmd.extend(["-i", str(inventory)])
# System-Inventory, dann -l host zur Limitierung
cmd.extend(["-l", host.host])
return cmd
def run_playbook_stream(playbook_path: Path, host: TargetHost, inventory: Optional[str] = "") -> Iterator[str]:
"""
Führt ansible-playbook aus und liefert stdout/stderr zeilenweise.
"""
env = os.environ.copy()
# stdout Callback default für einfache, kontinuierliche Ausgabe
env.setdefault("ANSIBLE_STDOUT_CALLBACK", "default")
cmd = build_cmd(playbook_path, host, inventory)
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
)
except FileNotFoundError:
yield "Fehler: ansible-playbook nicht gefunden. Bitte Installation prüfen."
return
assert proc.stdout is not None
for line in proc.stdout:
yield line.rstrip("\n")
proc.wait()
yield f"[EXIT] Code: {proc.returncode}"
def run_multiple_playbooks_stream(paths: Iterable[Path], host: TargetHost, inventory: Optional[str] = "") -> Iterator[Tuple[Path, str]]:
"""
Führt mehrere Playbooks nacheinander aus, streamed (path, line).
"""
for p in paths:
yield (p, f"=== Starte Playbook: {p} ===")
for line in run_playbook_stream(p, host, inventory):
yield (p, line)
yield (p, f"=== Fertig: {p} ===")

90
src/services/config.py Normal file
View File

@@ -0,0 +1,90 @@
from __future__ import annotations
import os
import json
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Dict, Any
CONFIG_DIR = Path("config")
CONFIG_FILE = CONFIG_DIR / "config.json"
@dataclass
class Config:
playbook_root: Path
data_subdir: str = "data"
inventory: str = ""
remote_data_path: str = "/opt/tui-data"
sync_tool: str = "auto" # rsync|scp|auto
logging_dir: str = "logs"
@property
def data_root(self) -> Path:
return self.playbook_root / self.data_subdir
def _load_file_config() -> Dict[str, Any]:
if CONFIG_FILE.exists():
with CONFIG_FILE.open("r", encoding="utf-8") as f:
return json.load(f)
return {}
def _apply_env_overrides(cfg: Dict[str, Any]) -> Dict[str, Any]:
env_map = {
"ANSIBLE_TUI_PLAYBOOK_ROOT": "playbook_root",
"ANSIBLE_TUI_DATA_SUBDIR": "data_subdir",
"ANSIBLE_TUI_INVENTORY": "inventory",
"ANSIBLE_TUI_REMOTE_DATA_PATH": "remote_data_path",
"ANSIBLE_TUI_SYNC_TOOL": "sync_tool",
"ANSIBLE_TUI_LOGGING_DIR": "logging_dir",
}
for env, key in env_map.items():
val = os.environ.get(env)
if val:
cfg[key] = val
return cfg
def _with_defaults(cfg: Dict[str, Any]) -> Dict[str, Any]:
defaults = {
"playbook_root": "/playbook",
"data_subdir": "data",
"inventory": "",
"remote_data_path": "/opt/tui-data",
"sync_tool": "auto",
"logging_dir": "logs",
}
out = {**defaults, **cfg}
return out
def _validate(cfg: Dict[str, Any]) -> None:
root = Path(cfg["playbook_root"]).expanduser()
if not root.exists():
# Falls nicht vorhanden, anlegen (nicht fatal)
root.mkdir(parents=True, exist_ok=True)
@lru_cache(maxsize=1)
def get_config() -> Config:
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
file_cfg = _load_file_config()
cfg = _with_defaults(file_cfg)
cfg = _apply_env_overrides(cfg)
_validate(cfg)
playbook_root = Path(cfg["playbook_root"]).expanduser().resolve()
return Config(
playbook_root=playbook_root,
data_subdir=str(cfg["data_subdir"]),
inventory=str(cfg["inventory"]),
remote_data_path=str(cfg["remote_data_path"]),
sync_tool=str(cfg["sync_tool"]),
logging_dir=str(cfg["logging_dir"]),
)
def ensure_runtime_dirs() -> None:
logs_dir = Path(get_config().logging_dir)
logs_dir.mkdir(parents=True, exist_ok=True)

88
src/services/deps_ext.py Normal file
View File

@@ -0,0 +1,88 @@
from __future__ import annotations
import shutil
import subprocess
import sys
import socket
from typing import List, Dict
from deps import check_dependencies as base_check
def check_python_version(min_major: int = 3, min_minor: int = 9) -> bool:
return sys.version_info >= (min_major, min_minor)
def check_pip() -> bool:
return shutil.which("pip3") is not None or shutil.which("pip") is not None
def check_module_installed(mod_name: str) -> bool:
try:
__import__(mod_name)
return True
except Exception:
return False
def check_rsync() -> bool:
return shutil.which("rsync") is not None
def check_network() -> bool:
try:
socket.gethostbyname("pypi.org")
return True
except Exception:
return False
def check_dependencies_ext() -> Dict[str, List[str]]:
missing_bins = list(base_check()) # ansible, ssh, scp
if not check_rsync():
missing_bins.append("rsync")
missing_pip: List[str] = []
if not check_module_installed("textual"):
missing_pip.append("textual")
messages: List[str] = []
if not check_python_version():
messages.append("Python >= 3.9 benötigt")
if not check_pip():
messages.append("pip/pip3 nicht gefunden")
if not check_network():
messages.append("Netzwerk/DNS Auflösung scheint nicht zu funktionieren")
return {
"missing_bins": sorted(set(missing_bins)),
"missing_pip": sorted(set(missing_pip)),
"messages": messages,
}
def install_dependencies_ext(missing_bins: List[str], missing_pip: List[str]) -> None:
cmds = []
if missing_bins:
cmds.append(["sudo", "apt", "update"])
bin_map = {
"ansible": ["ansible"],
"ssh": ["openssh-client"],
"scp": ["openssh-client"],
"rsync": ["rsync"],
}
apt_pkgs = set()
for b in set(missing_bins):
for pkg in bin_map.get(b, [b]):
apt_pkgs.add(pkg)
cmds.append(["sudo", "apt", "install", "-y", *sorted(apt_pkgs)])
for cmd in cmds:
try:
subprocess.run(cmd, check=False)
except Exception as exc:
print("Fehler bei:", " ".join(cmd), exc)
if missing_pip:
pip = shutil.which("pip3") or shutil.which("pip") or "pip3"
try:
subprocess.run([pip, "install", *missing_pip], check=False)
except Exception as exc:
print("pip install Fehler:", exc)

31
src/services/discovery.py Normal file
View File

@@ -0,0 +1,31 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import List, Tuple
def discover_playbooks(root: Path, data_subdir: str = "data") -> List[Tuple[str, Path, int]]:
"""
Liefert Liste aus (Display-Text, voller Pfad, Level).
Ignoriert das data-Verzeichnis auf oberster Ebene.
"""
playbooks: List[Tuple[str, Path, int]] = []
if not root.exists():
return playbooks
for dirpath, dirnames, filenames in os.walk(root):
path = Path(dirpath)
# ignore top-level data dir
if path.parent == root and path.name == data_subdir:
dirnames[:] = []
continue
rel = path.relative_to(root)
level = 0 if rel == Path(".") else len(rel.parts)
for fn in filenames:
if fn.endswith(".yml") or fn.endswith(".yaml"):
full = path / fn
indent = " " * level
display = f"{indent}{fn}"
playbooks.append((display, full, level))
return playbooks

118
src/services/hosts.py Normal file
View File

@@ -0,0 +1,118 @@
from __future__ import annotations
import json
import os
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import List, Optional
CONFIG_DIR = Path("config")
HOSTS_FILE = CONFIG_DIR / "hosts.json"
@dataclass
class TargetHost:
name: str
host: str
user: str
password: str = ""
ssh_key_setup: bool = False
is_local: bool = False
is_default: bool = False
def _ensure_config_dir() -> None:
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
def save_hosts(hosts: List[TargetHost]) -> None:
_ensure_config_dir()
with HOSTS_FILE.open("w", encoding="utf-8") as f:
json.dump([asdict(h) for h in hosts], f, indent=2, ensure_ascii=False)
def load_hosts() -> List[TargetHost]:
_ensure_config_dir()
if not HOSTS_FILE.exists():
user = os.getenv("USER", "pi")
hosts = [TargetHost(name="lokal", host="localhost", user=user, is_local=True, is_default=True)]
save_hosts(hosts)
return hosts
with HOSTS_FILE.open("r", encoding="utf-8") as f:
data = json.load(f)
hosts = [TargetHost(**item) for item in data]
# Sicherstellen: lokaler Host vorhanden
if not any(h.is_local for h in hosts):
hosts.append(TargetHost(name="lokal", host="localhost", user=os.getenv("USER", "pi"), is_local=True))
# Sicherstellen: ein Default gesetzt
if not any(h.is_default for h in hosts) and hosts:
hosts[0].is_default = True
return hosts
def get_default_index(hosts: Optional[List[TargetHost]] = None) -> int:
hs = hosts or load_hosts()
for i, h in enumerate(hs):
if h.is_default:
return i
return 0 if hs else -1
_current_index: Optional[int] = None
def get_current_index() -> int:
global _current_index
hs = load_hosts()
if _current_index is None:
_current_index = get_default_index(hs)
# clamp
if not hs:
_current_index = -1
else:
if _current_index >= len(hs):
_current_index = len(hs) - 1
if _current_index < 0:
_current_index = 0
return _current_index
def set_current_index(idx: int) -> None:
global _current_index
_current_index = idx
def get_current_host() -> Optional[TargetHost]:
hs = load_hosts()
idx = get_current_index()
if 0 <= idx < len(hs):
return hs[idx]
return None
def set_default(idx: int) -> None:
hs = load_hosts()
if not hs:
return
for i, h in enumerate(hs):
h.is_default = (i == idx)
save_hosts(hs)
def add_host(name: str, host: str, user: str, ssh_key_setup: bool = False, password: str = "") -> None:
hs = load_hosts()
hs.append(TargetHost(name=name, host=host, user=user, password=password, ssh_key_setup=ssh_key_setup))
save_hosts(hs)
def remove_host(idx: int) -> None:
hs = load_hosts()
if 0 <= idx < len(hs):
was_default = hs[idx].is_default
del hs[idx]
if hs:
if was_default or not any(h.is_default for h in hs):
hs[0].is_default = True
save_hosts(hs)

0
src/tui/__init__.py Normal file
View File

57
src/tui/app.py Normal file
View File

@@ -0,0 +1,57 @@
from __future__ import annotations
from textual.app import App, ComposeResult
from textual.widgets import Header, Footer, Static
from .screens.startup_check import StartupCheckScreen
class TuiAnsibleApp(App):
BINDINGS = [
("q", "quit", "Beenden"),
("s", "server", "Server"),
("a", "select_all", "Alle markieren"),
("d", "toggle_sync", "Daten-Sync"),
("h", "help", "Hilfe"),
]
TITLE = "TUI Ansible Installer"
SUB_TITLE = "textual"
def compose(self) -> ComposeResult:
yield Header()
yield Static("", id="body")
yield Footer()
async def on_mount(self) -> None:
await self.push_screen(StartupCheckScreen())
def action_quit(self) -> None:
self.exit()
def action_server(self) -> None:
# Wird im PlaybookBrowser umgesetzt
pass
def action_select_all(self) -> None:
screen = self.screen
if hasattr(screen, "action_select_all"):
try:
screen.action_select_all()
except Exception:
pass
def action_toggle_sync(self) -> None:
screen = self.screen
if hasattr(screen, "action_toggle_sync"):
try:
screen.action_toggle_sync()
except Exception:
pass
def main() -> None:
app = TuiAnsibleApp()
app.run()
if __name__ == "__main__":
main()

View File

View File

@@ -0,0 +1,100 @@
from __future__ import annotations
from pathlib import Path
from typing import List, Set, Tuple
from textual.screen import Screen
from textual.app import ComposeResult
from textual.widgets import Static, ListView, ListItem, Footer, Header
from ...services.config import get_config
from ...services.discovery import discover_playbooks
from ...services.hosts import get_current_host
from .server_management import ServerManagementScreen
from .run_output import RunOutputScreen
class PlaybookBrowserScreen(Screen):
BINDINGS = [
("space", "toggle_select", "Auswahl toggeln"),
("a", "select_all", "Alle markieren"),
("enter", "run", "Ausführen"),
("s", "server", "Server"),
("q", "app.quit", "Beenden"),
]
def compose(self) -> ComposeResult:
cfg = get_config()
self.header = Header()
yield self.header
host = get_current_host()
host_text = "Ziel: unbekannt"
if host:
host_text = f"Ziel: {host.name} ({lokal if host.is_local else host.host})"
self.hostline = Static(host_text, id="hostline")
yield self.hostline
yield Static(f"Playbooks unter {cfg.playbook_root}", id="title")
self.items: List[Tuple[str, Path, int]] = discover_playbooks(cfg.playbook_root, cfg.data_subdir)
self.selected: Set[int] = set()
self.list_view = ListView()
for i, (display, _path, _lvl) in enumerate(self.items):
self.list_view.append(ListItem(Static(self._render_label(i, display))))
if not self.items:
self.list_view.append(ListItem(Static("Keine Playbooks gefunden")))
yield self.list_view
yield Footer()
def _render_label(self, idx: int, display: str) -> str:
mark = "[x]" if idx in self.selected else "[ ]"
return f"{mark} {display}"
def action_toggle_select(self) -> None:
if not self.items:
return
idx = self.list_view.index or 0
if idx >= len(self.items):
return
if idx in self.selected:
self.selected.remove(idx)
else:
self.selected.add(idx)
display, _p, _l = self.items[idx]
li = self.list_view.children[idx]
if isinstance(li, ListItem) and li.children:
w = li.children[0]
if isinstance(w, Static):
w.update(self._render_label(idx, display))
def action_select_all(self) -> None:
if not self.items:
return
if len(self.selected) == len(self.items):
self.selected.clear()
else:
self.selected = set(range(len(self.items)))
for i, (display, _p, _l) in enumerate(self.items):
li = self.list_view.children[i]
if isinstance(li, ListItem) and li.children:
w = li.children[0]
if isinstance(w, Static):
w.update(self._render_label(i, display))
def action_server(self) -> None:
self.app.push_screen(ServerManagementScreen())
def action_run(self) -> None:
if not self.items:
return
if not self.selected:
idx = self.list_view.index or 0
if idx < len(self.items):
self.selected.add(idx)
host = get_current_host()
if not host:
return
paths: List[Path] = [self.items[i][1] for i in sorted(self.selected)]
self.app.push_screen(RunOutputScreen(paths, host))
def on_show(self) -> None:
host = get_current_host()
if host:
self.hostline.update(f"Ziel: {host.name} ({lokal if host.is_local else host.host})")
else:
self.hostline.update("Ziel: unbekannt")

View File

@@ -0,0 +1,37 @@
from __future__ import annotations
from typing import List
from pathlib import Path
from textual.screen import Screen
from textual.app import ComposeResult
from textual.widgets import Header, Footer, Static, TextLog
from ...services.config import get_config
from ...services.hosts import TargetHost
from ...services.ansible import run_multiple_playbooks_stream
class RunOutputScreen(Screen):
BINDINGS = [
("q", "app.pop_screen", "Zurück"),
("escape", "app.pop_screen", "Zurück"),
]
def __init__(self, playbooks: List[Path], host: TargetHost) -> None:
super().__init__()
self.playbooks = playbooks
self.host = host
def compose(self) -> ComposeResult:
yield Header()
yield Static(f"Ziel: {self.host.name} ({lokal if self.host.is_local else self.host.host})", id="hostline")
self.log = TextLog(highlight=False, wrap=True)
yield self.log
yield Footer()
async def on_mount(self) -> None:
self.call_later(self._run)
async def _run(self) -> None:
cfg = get_config()
for p, line in run_multiple_playbooks_stream(self.playbooks, self.host, cfg.inventory or ""):
self.log.write(line)
self.log.write("Alle Playbooks beendet. Drücke q/ESC zum Zurückkehren.")

View File

@@ -0,0 +1,58 @@
from __future__ import annotations
from textual.screen import Screen
from textual.app import ComposeResult
from textual.widgets import Header, Footer, ListView, ListItem, Static
from ...services.hosts import load_hosts, get_current_index, set_current_index, set_default, remove_host
class ServerManagementScreen(Screen):
BINDINGS = [
("q", "app.pop_screen", "Zurück"),
("enter", "select", "Auswählen"),
("f", "make_default", "Als Standard setzen"),
("delete", "delete", "Löschen"),
]
def compose(self) -> ComposeResult:
yield Header()
self.title = Static("Server / Zielsysteme ↑/↓ Auswahl, Enter wählen, f Standard, Entf löschen, q zurück")
yield self.title
self.list = ListView(id="hosts")
yield self.list
yield Footer()
def on_mount(self) -> None:
self.refresh_list()
def refresh_list(self) -> None:
hosts = load_hosts()
items = []
current_idx = get_current_index()
for i, h in enumerate(hosts):
parts = []
parts.append(">" if i == current_idx else " ")
parts.append(f"{h.name} ({lokal if h.is_local else h.host})")
if h.is_default:
parts.append("[default]")
label = " ".join(parts)
items.append(ListItem(Static(label)))
self.list.clear()
for item in items:
self.list.append(item)
if 0 <= current_idx < len(items):
self.list.index = current_idx
def action_select(self) -> None:
idx = self.list.index or 0
set_current_index(idx)
self.app.pop_screen()
def action_make_default(self) -> None:
idx = self.list.index or 0
set_default(idx)
self.refresh_list()
def action_delete(self) -> None:
idx = self.list.index or 0
remove_host(idx)
self.refresh_list()

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
from textual.screen import Screen
from textual.widgets import Button, Static
from textual.containers import Vertical
from textual.app import ComposeResult
from ...services.deps_ext import check_dependencies_ext, install_dependencies_ext
from ...services.config import get_config, ensure_runtime_dirs
from .playbook_browser import PlaybookBrowserScreen
class StartupCheckScreen(Screen):
def compose(self) -> ComposeResult:
self.status = Static("", id="status")
yield Vertical(
Static("Autocheck der Abhängigkeiten", id="title"),
self.status,
Button("Installieren und fortfahren", id="install", variant="primary"),
Button("Ohne Installation fortfahren", id="continue"),
)
def on_mount(self) -> None:
ensure_runtime_dirs()
_ = get_config() # Konfig initialisieren
info = check_dependencies_ext()
lines = []
if info["messages"]:
lines.extend([f"- {m}" for m in info["messages"]])
if info["missing_bins"]:
lines.append("Fehlende Pakete: " + ", ".join(info["missing_bins"]))
if info["missing_pip"]:
lines.append("Fehlende Python-Module: " + ", ".join(info["missing_pip"]))
if not lines:
lines = ["Alle benötigten Komponenten scheinen vorhanden zu sein."]
self.status.update("\\n".join(lines))
def on_button_pressed(self, event: Button.Pressed) -> None:
info = check_dependencies_ext()
if event.button.id == "install":
install_dependencies_ext(info["missing_bins"], info["missing_pip"])
# Weiter zum Browser
self.app.push_screen(PlaybookBrowserScreen())