#!/usr/bin/env python3 import curses import json import os import subprocess from dataclasses import dataclass, asdict from pathlib import Path from typing import List, Tuple from deps import check_dependencies, install_dependencies PLAYBOOK_ROOT = Path("/playbook") DATA_DIR_NAME = "data" CONFIG_DIR = Path("config") HOSTS_FILE = CONFIG_DIR / "hosts.json" @dataclass class TargetHost: name: str # Anzeigename host: str # Host/IP user: str # Benutzername password: str # (optional, eher für SSH-Key-Setup gedacht) ssh_key_setup: bool is_local: bool = False def load_hosts() -> List[TargetHost]: CONFIG_DIR.mkdir(parents=True, exist_ok=True) if not HOSTS_FILE.exists(): # Default: nur "lokal" hosts = [TargetHost(name="lokal", host="localhost", user=os.getenv("USER", "pi"), password="", ssh_key_setup=False, is_local=True)] save_hosts(hosts) return hosts with HOSTS_FILE.open("r", encoding="utf-8") as f: data = json.load(f) hosts = [TargetHost(**item) for item in data] # Falls keine lokalen dabei sind, Standard hinzufügen if not any(h.is_local for h in hosts): hosts.append(TargetHost(name="lokal", host="localhost", user=os.getenv("USER", "pi"), password="", ssh_key_setup=False, is_local=True)) return hosts def save_hosts(hosts: List[TargetHost]) -> None: CONFIG_DIR.mkdir(parents=True, exist_ok=True) with HOSTS_FILE.open("w", encoding="utf-8") as f: json.dump([asdict(h) for h in hosts], f, indent=2) def collect_playbooks(root: Path) -> List[Tuple[str, Path, int]]: """ Sucht nach .yml/.yaml unterhalb von /playbook, ignoriert /playbook/data. Rückgabe: Liste aus (Anzeigetext, voller Pfad, Level) """ playbooks: List[Tuple[str, Path, int]] = [] if not root.exists(): return playbooks for dirpath, dirnames, filenames in os.walk(root): # /playbook/data ignorieren if Path(dirpath).name == DATA_DIR_NAME and Path(dirpath).parent == root: # Unterordner von /playbook/data auch überspringen dirnames[:] = [] continue rel = Path(dirpath).relative_to(root) level = 0 if rel == Path(".") else len(rel.parts) for filename in filenames: if not (filename.endswith(".yml") or filename.endswith(".yaml")): continue full_path = Path(dirpath) / filename indent = " " * level display = f"{indent}{filename}" playbooks.append((display, full_path, level)) return playbooks def run_playbook(playbook_path: Path, target: TargetHost, stdscr) -> None: """ Führt ansible-playbook aus und zeigt die Ausgabe in der TUI. Sehr einfache Variante, synchron/blockierend. """ stdscr.clear() stdscr.addstr(0, 0, f"Starte Playbook: {playbook_path}") stdscr.addstr(1, 0, f"Ziel: {target.name} ({'lokal' if target.is_local else target.host})") stdscr.refresh() cmd = ["ansible-playbook", str(playbook_path)] # Für Remote-Target könnte man ein Inventory oder --limit host ergänzen, # hier nur eine sehr simple Beispiel-Variante: if not target.is_local: # Erwartung: target.host ist im Inventory definiert cmd.extend(["-l", target.host]) # Ausgabe direkt anzeigen try: proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) except FileNotFoundError: stdscr.addstr(3, 0, "Fehler: ansible-playbook wurde nicht gefunden.") stdscr.getch() return row = 3 max_y, max_x = stdscr.getmaxyx() for line in proc.stdout: # Wenn zu weit unten, etwas nach oben "scrollen" if row >= max_y - 1: stdscr.scroll(1) row = max_y - 2 stdscr.addstr(row, 0, line[: max_x - 1]) row += 1 stdscr.refresh() proc.wait() stdscr.addstr(row + 1, 0, "Playbook abgeschlossen. Taste drücken um zurückzukehren.") stdscr.refresh() stdscr.getch() def server_menu(stdscr, hosts: List[TargetHost], current_index: int) -> int: """ Einfaches Server-Menü: - Hosts auflisten - Auswahl mit Pfeiltasten - Enter = Host auswählen - N = neuen Host hinzufügen (sehr basic, text-basiert) Gibt den Index des ausgewählten Hosts zurück (oder unverändert bei Abbruch). """ curses.curs_set(0) selected = current_index while True: stdscr.clear() stdscr.addstr(0, 0, "Server / Zielsysteme") stdscr.addstr(1, 0, "↑/↓: Auswahl | Enter: wählen | n: neuen Host anlegen | q: zurück") row = 3 for i, host in enumerate(hosts): prefix = "> " if i == selected else " " label = f"{host.name} ({'lokal' if host.is_local else host.host})" stdscr.addstr(row, 0, prefix + label) row += 1 stdscr.refresh() key = stdscr.getch() if key in (curses.KEY_UP, ord('k')): selected = (selected - 1) % len(hosts) elif key in (curses.KEY_DOWN, ord('j')): selected = (selected + 1) % len(hosts) elif key in (ord('q'), 27): # q oder ESC return current_index elif key in (curses.KEY_ENTER, 10, 13): return selected elif key in (ord('n'), ord('N')): # sehr simple Eingabe über die Statuszeile curses.echo() stdscr.clear() stdscr.addstr(0, 0, "Neuen Host anlegen. Leere Eingabe = Abbruch.") stdscr.addstr(2, 0, "Anzeigename: ") name = stdscr.getstr(2, 13, 40).decode("utf-8").strip() if not name: curses.noecho() continue stdscr.addstr(3, 0, "Host/IP: ") host_ip = stdscr.getstr(3, 9, 40).decode("utf-8").strip() if not host_ip: curses.noecho() continue stdscr.addstr(4, 0, "Benutzer (default: pi): ") user = stdscr.getstr(4, 23, 40).decode("utf-8").strip() or "pi" stdscr.addstr(5, 0, "SSH-Key automatisch einrichten? (j/N): ") ssh_key_ans = stdscr.getstr(5, 39, 3).decode("utf-8").strip().lower() ssh_key_setup = ssh_key_ans == "j" curses.noecho() new_host = TargetHost( name=name, host=host_ip, user=user, password="", ssh_key_setup=ssh_key_setup, is_local=False, ) hosts.append(new_host) save_hosts(hosts) selected = len(hosts) - 1 def main_tui(stdscr): curses.curs_set(0) stdscr.nodelay(False) hosts = load_hosts() current_host_index = 0 playbooks = collect_playbooks(PLAYBOOK_ROOT) selected_index = 0 if playbooks else -1 while True: stdscr.clear() max_y, max_x = stdscr.getmaxyx() # Header current_host = hosts[current_host_index] header = f"TUI Ansible Installer – Ziel: {current_host.name} ({'lokal' if current_host.is_local else current_host.host})" stdscr.addstr(0, 0, header[: max_x - 1]) # Hilfe help_line = "[↑/↓] Navigieren [Enter] Playbook ausführen [s] Server wählen [q] Beenden" stdscr.addstr(1, 0, help_line[: max_x - 1]) # Playbooks zeichnen start_row = 3 for i, (display, path, level) in enumerate(playbooks): row = start_row + i if row >= max_y - 1: break prefix = "➤ " if i == selected_index else " " line = f"{prefix}{display}" stdscr.addstr(row, 0, line[: max_x - 1]) if not playbooks: stdscr.addstr(start_row, 0, f"Keine Playbooks unter {PLAYBOOK_ROOT} gefunden.") stdscr.refresh() key = stdscr.getch() if key in (ord('q'), 27): # q oder ESC break if key in (curses.KEY_UP, ord('k')): if playbooks: selected_index = (selected_index - 1) % len(playbooks) elif key in (curses.KEY_DOWN, ord('j')): if playbooks: selected_index = (selected_index + 1) % len(playbooks) elif key in (ord('s'), ord('S')): current_host_index = server_menu(stdscr, hosts, current_host_index) elif key in (curses.KEY_ENTER, 10, 13): if playbooks and selected_index >= 0: _, pb_path, _ = playbooks[selected_index] run_playbook(pb_path, hosts[current_host_index], stdscr) def startup_check_and_run(): # Dependencies prüfen VOR curses, damit Frage klar sichtbar ist. missing = check_dependencies() if missing: print("Folgende benötigte Komponenten fehlen:") for m in missing: print(" -", m) ans = input("System vorbereiten und fehlende Komponenten installieren? [J/n]: ").strip().lower() if ans in ("", "j", "ja", "y", "yes"): install_dependencies(missing) else: print("Abhängigkeiten werden NICHT automatisch installiert. Die TUI kann eingeschränkt sein.") input("Weiter mit Enter...") curses.wrapper(main_tui) if __name__ == "__main__": startup_check_and_run()