360 lines
14 KiB
Python
360 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import shutil
|
|
from pathlib import Path
|
|
import yaml
|
|
|
|
INTERNAL_DOMAIN_RE = re.compile(r"\b[a-zA-Z0-9.-]+\.lan\.ddnsgeek\.com\b")
|
|
HOST_MATCH_RE = re.compile(r"Host\(([^)]*)\)")
|
|
TICKED_HOST_RE = re.compile(r"`([^`]+)`")
|
|
|
|
|
|
def require_dot() -> None:
|
|
if not shutil.which("dot"):
|
|
raise SystemExit(
|
|
"Graphviz 'dot' not found in environment. Install graphviz before running docs generation."
|
|
)
|
|
|
|
|
|
def load_compose(path: Path) -> dict:
|
|
with path.open() as handle:
|
|
return yaml.safe_load(handle) or {}
|
|
|
|
|
|
PARENT_KEYS = ("node", "node_name", "host", "physical_host", "hypervisor_host", "proxmox_node")
|
|
|
|
|
|
def display_domain(value: str, mode: str) -> str:
|
|
if mode == "full":
|
|
return value
|
|
if mode == "placeholder":
|
|
return "<internal-domain>" if INTERNAL_DOMAIN_RE.search(value) else value
|
|
if INTERNAL_DOMAIN_RE.search(value):
|
|
label = re.sub(r"\.lan\.ddnsgeek\.com$", "", value)
|
|
return f"{label}.<domain>"
|
|
return value
|
|
|
|
|
|
def parse_labels(service: dict) -> dict[str, str]:
|
|
labels = service.get("labels") or {}
|
|
if isinstance(labels, list):
|
|
mapped = {}
|
|
for item in labels:
|
|
if "=" in str(item):
|
|
k, v = str(item).split("=", 1)
|
|
mapped[k] = v
|
|
return mapped
|
|
if isinstance(labels, dict):
|
|
return {str(k): str(v) for k, v in labels.items()}
|
|
return {}
|
|
|
|
|
|
def extract_hosts(rule: str) -> list[str]:
|
|
hosts: list[str] = []
|
|
for m in HOST_MATCH_RE.findall(rule or ""):
|
|
for host in TICKED_HOST_RE.findall(m):
|
|
hosts.append(host.strip())
|
|
return hosts
|
|
|
|
|
|
def render_svg(dot_path: Path, svg_path: Path) -> None:
|
|
subprocess.run(["dot", "-Tsvg", str(dot_path), "-o", str(svg_path)], check=True)
|
|
|
|
|
|
def write_dot(path: Path, lines: list[str]) -> None:
|
|
path.write_text("\n".join(lines) + "\n")
|
|
|
|
|
|
def infer_host(service_name: str, service: dict) -> str:
|
|
labels = parse_labels(service)
|
|
for key in ("com.docker.compose.project", "infra.host", "host"):
|
|
value = labels.get(key)
|
|
if value:
|
|
return value.lower()
|
|
s = service_name.lower()
|
|
if "raspi" in s or "pi" in s:
|
|
return "raspberrypi"
|
|
if "proxmox" in s:
|
|
return "proxmox"
|
|
return "docker"
|
|
|
|
|
|
def categorize_service(service_name: str) -> str:
|
|
s = service_name.lower()
|
|
if any(k in s for k in ["traefik", "authelia", "oauth", "auth", "proxy", "nginx", "caddy"]):
|
|
return "edge/proxy/auth"
|
|
if any(k in s for k in ["prometheus", "grafana", "loki", "promtail", "alert", "node-exporter", "cadvisor"]):
|
|
return "monitoring"
|
|
if any(k in s for k in ["watchtower", "diun", "ansible", "cron", "runner", "backup"]):
|
|
return "automation"
|
|
if any(k in s for k in ["postgres", "mariadb", "mysql", "redis", "minio", "nfs", "storage", "db", "queue"]):
|
|
return "storage/database/support"
|
|
return "apps"
|
|
|
|
|
|
def load_inventory(path: Path | None) -> dict:
|
|
if not path or not path.exists():
|
|
return {}
|
|
payload = json.loads(path.read_text())
|
|
return payload.get("value", payload) if isinstance(payload, dict) else {}
|
|
|
|
|
|
def to_records(data) -> list[dict]:
|
|
if not isinstance(data, dict):
|
|
return []
|
|
out = []
|
|
for key, value in data.items():
|
|
if isinstance(value, dict):
|
|
rec = dict(value)
|
|
rec.setdefault("_key", str(key))
|
|
rec.setdefault("name", rec.get("hostname") or rec.get("vm_name") or str(key))
|
|
out.append(rec)
|
|
return out
|
|
|
|
|
|
def parent_name(item: dict) -> str:
|
|
for k in PARENT_KEYS:
|
|
v = item.get(k)
|
|
if v:
|
|
return str(v)
|
|
return ""
|
|
|
|
|
|
def generate_physical_topology(compose: dict, inventory: dict, out_dot: Path, out_svg: Path) -> None:
|
|
physical = to_records(inventory.get("physical_hosts", {}))
|
|
virtual = to_records(inventory.get("virtual_hosts", {})) + to_records(inventory.get("vms", {}))
|
|
if not physical and not virtual:
|
|
lines = [
|
|
"digraph PhysicalTopology {",
|
|
" graph [rankdir=LR, fontname=\"Helvetica\", nodesep=1.0, ranksep=1.5];",
|
|
' "placeholder:inventory" [shape=note, style="filled", fillcolor="#fef3c7", label="Host inventory JSON not found.\\nGenerate terraform inventory and rerun scripts/docs/generate-all.sh\\n(--host-inventory <path>)."];',
|
|
]
|
|
lines.append("}")
|
|
write_dot(out_dot, lines)
|
|
render_svg(out_dot, out_svg)
|
|
return
|
|
|
|
lines = [
|
|
"digraph PhysicalTopology {",
|
|
" graph [rankdir=LR, compound=true, splines=polyline, nodesep=0.95, ranksep=1.7, ratio=compress, fontname=\"Helvetica\", fontsize=13, concentrate=true, newrank=true];",
|
|
" node [fontname=\"Helvetica\", fontsize=12, style=\"rounded,filled\", fillcolor=\"#ffffff\"];",
|
|
" edge [fontname=\"Helvetica\", fontsize=10, color=\"#64748b\"];",
|
|
]
|
|
phys_names = {str(p.get("name")): p for p in physical}
|
|
children: dict[str, list[dict]] = {k: [] for k in phys_names}
|
|
orphans: list[dict] = []
|
|
for vm in virtual:
|
|
parent = parent_name(vm)
|
|
if parent in children:
|
|
children[parent].append(vm)
|
|
else:
|
|
orphans.append(vm)
|
|
for host, record in sorted(phys_names.items()):
|
|
host_role = record.get("role", "")
|
|
lines.extend([
|
|
f' subgraph "cluster_{host}" {{',
|
|
f' label="{host}\\n{host_role}".strip();',
|
|
' style="rounded,filled";',
|
|
' color="#60a5fa";',
|
|
' fillcolor="#eff6ff";',
|
|
f' "phys:{host}" [label="{host}", shape=box3d, fillcolor="#bfdbfe"];',
|
|
])
|
|
for vm in sorted(children.get(host, []), key=lambda x: str(x.get("name", "")).lower()):
|
|
vm_name = str(vm.get("name"))
|
|
vm_role = str(vm.get("role", "") or "virtual host")
|
|
cluster_id = f"cluster_{host}_{re.sub(r'[^a-zA-Z0-9]+', '_', vm_name)}"
|
|
lines.extend([
|
|
f' subgraph "{cluster_id}" {{',
|
|
f' label="{vm_name}";',
|
|
' style="rounded,dashed";',
|
|
' color="#bfdbfe";',
|
|
' fillcolor="#f8fbff";',
|
|
f' "vm:{vm_name}" [label="{vm_name}\\n{vm_role}", shape=component, fillcolor="#dcfce7"];',
|
|
])
|
|
if "docker" in vm_role.lower() or "docker" in vm_name.lower():
|
|
lines.append(f' "role:{vm_name}" [label="Docker host", shape=box, fillcolor="#fef3c7"];')
|
|
lines.append(f' "vm:{vm_name}" -> "role:{vm_name}" [style=dashed, label="runs"];')
|
|
lines.append(' }')
|
|
lines.append(f' "phys:{host}" -> "vm:{vm_name}" [label="hosts"];')
|
|
lines.append(' }')
|
|
if orphans:
|
|
lines.extend([
|
|
' subgraph "cluster_orphans" {',
|
|
' label="Unmapped virtual hosts"; style="rounded,dashed"; color="#d1d5db";',
|
|
])
|
|
for vm in sorted(orphans, key=lambda x: str(x.get("name", "")).lower()):
|
|
vm_name = str(vm.get("name"))
|
|
lines.append(f' "vm:{vm_name}" [label="{vm_name}", shape=component, fillcolor="#fee2e2"];')
|
|
lines.append(" }")
|
|
|
|
lines.extend([
|
|
' subgraph "cluster_legend" {',
|
|
' label="Legend"; style="rounded"; color="#d1d5db";',
|
|
' "leg_host" [label="Physical host", shape=box3d, fillcolor="#eff6ff"];',
|
|
' "leg_vm" [label="Virtual machine", shape=component, fillcolor="#dcfce7"];',
|
|
' "leg_role" [label="Hosted role", shape=box, fillcolor="#fef3c7"];',
|
|
' "leg_host" -> "leg_vm" [label="hosts"];',
|
|
' "leg_vm" -> "leg_role" [style=dashed, label="runs"];',
|
|
' }',
|
|
"}",
|
|
])
|
|
write_dot(out_dot, lines)
|
|
render_svg(out_dot, out_svg)
|
|
|
|
|
|
def generate_docker_traefik_dynu(compose: dict, dns_inventory: dict, domain_mode: str, out_dot: Path, out_svg: Path) -> None:
|
|
services = compose.get("services") or {}
|
|
networks = compose.get("networks") or {}
|
|
lines = [
|
|
"digraph DockerTraefikDynu {",
|
|
" graph [rankdir=LR, compound=true, splines=polyline, nodesep=0.9, ranksep=1.6, fontname=\"Helvetica\", concentrate=true, newrank=true];",
|
|
" node [fontname=\"Helvetica\", fontsize=11, style=\"rounded,filled\"];",
|
|
" edge [fontname=\"Helvetica\", fontsize=9, color=\"#334155\"];",
|
|
' "dynu" [label="Dynu / Public DNS", shape=box, fillcolor="#fde68a"];',
|
|
' "svc:traefik" [label="Traefik", shape=box, fillcolor="#bfdbfe"];',
|
|
' "dynu" -> "svc:traefik" [penwidth=1.6];',
|
|
]
|
|
|
|
routes: dict[str, dict] = {}
|
|
dns_nodes: set[str] = set()
|
|
|
|
for svc_name, svc in sorted(services.items()):
|
|
labels = parse_labels(svc)
|
|
router_prefix = "traefik.http.routers."
|
|
service_prefix = "traefik.http.services."
|
|
lb_ports = {}
|
|
for k, v in labels.items():
|
|
if k.startswith(service_prefix) and k.endswith(".loadbalancer.server.port"):
|
|
lb_ports[k[len(service_prefix):].split(".", 1)[0]] = v
|
|
|
|
routers = sorted({k[len(router_prefix):].split(".", 1)[0] for k in labels if k.startswith(router_prefix)})
|
|
for router in routers:
|
|
rule = labels.get(f"{router_prefix}{router}.rule", "")
|
|
target = labels.get(f"{router_prefix}{router}.service", svc_name)
|
|
middlewares = labels.get(f"{router_prefix}{router}.middlewares", "")
|
|
tls = labels.get(f"{router_prefix}{router}.tls", "false")
|
|
port = lb_ports.get(target, "")
|
|
badges = []
|
|
if str(tls).lower() in ("true", "1"):
|
|
badges.append("TLS")
|
|
mw_low = middlewares.lower()
|
|
if "authelia" in mw_low:
|
|
badges.append("authelia")
|
|
if "mtls" in mw_low:
|
|
badges.append("mTLS")
|
|
hosts = [display_domain(h, domain_mode) for h in extract_hosts(rule)]
|
|
if not hosts:
|
|
continue
|
|
info = routes.setdefault(svc_name, {"hosts": set(), "port": port, "badges": set()})
|
|
info["hosts"].update(hosts)
|
|
if port:
|
|
info["port"] = port
|
|
info["badges"].update(badges)
|
|
|
|
for svc_name, info in sorted(routes.items()):
|
|
label = svc_name
|
|
if info.get("port"):
|
|
label += f"\n:{info['port']}"
|
|
if info.get("badges"):
|
|
label += "\n[" + ", ".join(sorted(info["badges"])) + "]"
|
|
lines.append(f' "svc:{svc_name}" [label="{label}", shape=box, fillcolor="#dcfce7"];')
|
|
lines.append(f' "svc:traefik" -> "svc:{svc_name}" [penwidth=1.4];')
|
|
for host in sorted(info["hosts"]):
|
|
dns_nodes.add(host)
|
|
lines.append(f' "dns:{host}" [label="{host}", shape=note, fillcolor="#fef3c7"];')
|
|
lines.append(f' "dns:{host}" -> "dynu";')
|
|
for record in (dns_inventory.get("records", []) if isinstance(dns_inventory, dict) else []):
|
|
host = record.get("hostname") or record.get("name")
|
|
if not host:
|
|
continue
|
|
host_disp = display_domain(str(host), domain_mode)
|
|
if host_disp in dns_nodes:
|
|
continue
|
|
dns_nodes.add(host_disp)
|
|
lines.append(f' "dns:{host_disp}" [label="{host_disp}", shape=note, fillcolor="#fef3c7"];')
|
|
lines.append(f' "dns:{host_disp}" -> "dynu" [style=dashed, color=\"#94a3b8\"];')
|
|
|
|
lines.append(' { rank=same; ' + '; '.join([f'"dns:{d}"' for d in sorted(dns_nodes)]) + '; }' if dns_nodes else '')
|
|
|
|
lines.append(' subgraph "cluster_networks" {')
|
|
lines.append(' label="Docker backend networks"; style="rounded,dashed"; color="#d1d5db";')
|
|
for net in sorted(networks.keys()):
|
|
lines.append(f' "net:{net}" [label="{net}", shape=ellipse, fillcolor="#f8fafc"];')
|
|
lines.append(' }')
|
|
|
|
for svc_name in sorted(routes.keys()):
|
|
svc = services.get(svc_name, {})
|
|
svc_nets = svc.get("networks") or []
|
|
if isinstance(svc_nets, dict):
|
|
svc_nets = svc_nets.keys()
|
|
for net in svc_nets:
|
|
lines.append(f' "svc:{svc_name}" -> "net:{net}" [style=dashed, color="#94a3b8", arrowsize=0.7];')
|
|
|
|
lines.append("}")
|
|
write_dot(out_dot, lines)
|
|
render_svg(out_dot, out_svg)
|
|
|
|
|
|
|
|
def generate_compose_topology(compose: dict, out_dot: Path, out_svg: Path) -> None:
|
|
services = compose.get("services") or {}
|
|
networks = compose.get("networks") or {}
|
|
lines = [
|
|
"digraph Compose {",
|
|
" rankdir=LR;",
|
|
' node [fontname="Helvetica"];',
|
|
]
|
|
for service in sorted(services):
|
|
lines.append(f' "svc:{service}" [label="{service}", shape=box, style=filled, fillcolor="#dfefff"];')
|
|
for net in sorted(networks):
|
|
lines.append(f' "net:{net}" [label="{net}", shape=ellipse, style=filled, fillcolor="#f4f4f4"];')
|
|
|
|
for service, svc in sorted(services.items()):
|
|
svc_nets = svc.get("networks") or []
|
|
if isinstance(svc_nets, dict):
|
|
svc_nets = svc_nets.keys()
|
|
for net in svc_nets:
|
|
lines.append(f' "svc:{service}" -> "net:{net}";')
|
|
|
|
lines.append("}")
|
|
write_dot(out_dot, lines)
|
|
render_svg(out_dot, out_svg)
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("legacy", nargs="*")
|
|
parser.add_argument("--compose")
|
|
parser.add_argument("--out-dir")
|
|
parser.add_argument("--host-inventory")
|
|
parser.add_argument("--dns-inventory")
|
|
parser.add_argument("--domain-display", choices=["full", "redacted-label", "placeholder"], default="redacted-label")
|
|
args = parser.parse_args()
|
|
|
|
require_dot()
|
|
|
|
if args.compose and args.out_dir:
|
|
compose_path = Path(args.compose)
|
|
out_dir = Path(args.out_dir)
|
|
elif len(args.legacy) == 3:
|
|
compose_path = Path(args.legacy[0])
|
|
out_dir = Path(args.legacy[1]).parent
|
|
else:
|
|
raise SystemExit("Usage: generate-diagrams.py --compose <compose.yml> --out-dir <dir>")
|
|
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
compose = load_compose(compose_path)
|
|
host_inventory = load_inventory(Path(args.host_inventory)) if args.host_inventory else {}
|
|
dns_inventory = load_inventory(Path(args.dns_inventory)) if args.dns_inventory else {}
|
|
|
|
generate_docker_traefik_dynu(compose, dns_inventory, args.domain_display, out_dir / "docker-traefik-dynu.dot", out_dir / "docker-traefik-dynu.svg")
|
|
generate_physical_topology(compose, host_inventory, out_dir / "physical-topology.dot", out_dir / "physical-topology.svg")
|
|
generate_compose_topology(compose, out_dir / "docker-compose.dot", out_dir / "docker-compose.svg")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|