Files
docker/scripts/docs/generate-diagrams.py
T

361 lines
14 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import re
import subprocess
import shutil
from pathlib import Path
import yaml
# Matches a hostname that ends in the internal ``.lan.ddnsgeek.com`` suffix,
# with at least one character of ``[a-zA-Z0-9.-]`` in front of it.
INTERNAL_DOMAIN_RE = re.compile(r"\b[a-zA-Z0-9.-]+\.lan\.ddnsgeek\.com\b")
# Captures the argument text between the parentheses of a ``Host(...)`` matcher.
HOST_MATCH_RE = re.compile(r"Host\(([^)]*)\)")
# Captures each backtick-quoted value; used on the text captured by HOST_MATCH_RE.
TICKED_HOST_RE = re.compile(r"`([^`]+)`")
def require_dot() -> None:
    """Abort unless the Graphviz ``dot`` executable can be found on PATH.

    Raises:
        SystemExit: when ``shutil.which("dot")`` finds nothing.
    """
    dot_executable = shutil.which("dot")
    if dot_executable:
        return
    raise SystemExit(
        "Graphviz 'dot' not found in environment. Install graphviz before running docs generation."
    )
def load_compose(path: Path) -> dict:
    """Parse a Docker Compose YAML file into a dictionary.

    Args:
        path: Location of the compose file.

    Returns:
        The parsed top-level mapping, or ``{}`` when the document is empty.

    Raises:
        SystemExit: when the top-level YAML node is not a mapping.
    """
    # Decode as UTF-8 explicitly so parsing does not depend on the locale encoding.
    with path.open(encoding="utf-8") as handle:
        document = yaml.safe_load(handle) or {}
    # Every caller uses ``.get`` on the result; fail here with a clear message
    # instead of an AttributeError deep inside a generator.
    if not isinstance(document, dict):
        raise SystemExit(f"Compose file {path} must contain a YAML mapping at the top level.")
    return document
# Record keys that parent_name() checks, in this order, to find a record's parent host.
PARENT_KEYS = ("node", "node_name", "host", "physical_host", "hypervisor_host", "proxmox_node")
def display_domain(value: str, mode: str) -> str:
    """Render a hostname according to the requested domain display mode.

    Args:
        value: Hostname (or text containing one) to display.
        mode: ``"full"`` returns ``value`` untouched. ``"placeholder"`` replaces
            any value containing an internal hostname with
            ``<internal-domain>``. Any other mode keeps the leading host labels
            and swaps the internal domain suffix for ``.<domain>``.

    Returns:
        The text to show in the diagram; values without an internal hostname
        are returned unchanged in every mode.
    """
    if mode == "full":
        return value
    if not INTERNAL_DOMAIN_RE.search(value):
        return value
    if mode == "placeholder":
        return "<internal-domain>"
    # Redact every internal hostname where it occurs. Anchoring the suffix at
    # the end of the string would leave "host.lan.ddnsgeek.com:443" unredacted.
    suffix = ".lan.ddnsgeek.com"
    return INTERNAL_DOMAIN_RE.sub(
        lambda match: match.group(0).removesuffix(suffix) + ".<domain>",
        value,
    )
def parse_labels(service: dict) -> dict[str, str]:
    """Normalise a service's ``labels`` entry to a ``{str: str}`` mapping.

    A mapping has its keys and values stringified. A list is read as
    ``key=value`` items split on the first ``=``; items without ``=`` are
    dropped. Anything else (including a missing or empty entry) gives ``{}``.
    """
    raw = service.get("labels") or {}
    if isinstance(raw, dict):
        return {str(key): str(val) for key, val in raw.items()}
    if not isinstance(raw, list):
        return {}
    # partition() yields an empty separator when the item has no "=".
    pieces = (str(entry).partition("=") for entry in raw)
    return {key: val for key, sep, val in pieces if sep}
def extract_hosts(rule: str) -> list[str]:
    """Return every backtick-quoted hostname found inside ``Host(...)`` matchers.

    Args:
        rule: Router rule text; a falsy value is treated as an empty rule.

    Returns:
        Hostnames in order of appearance, stripped of surrounding whitespace.
    """
    return [
        name.strip()
        for arguments in HOST_MATCH_RE.findall(rule or "")
        for name in TICKED_HOST_RE.findall(arguments)
    ]
def render_svg(dot_path: Path, svg_path: Path) -> None:
    """Run ``dot -Tsvg`` to render ``dot_path`` into ``svg_path``.

    Raises:
        subprocess.CalledProcessError: when ``dot`` exits with a non-zero status.
    """
    command = ["dot", "-Tsvg", str(dot_path), "-o", str(svg_path)]
    subprocess.run(command, check=True)
def write_dot(path: Path, lines: list[str]) -> None:
    """Write DOT source lines to ``path``, one per line with a trailing newline.

    Args:
        path: Destination file; overwritten if it exists.
        lines: DOT statements without line terminators.
    """
    # Graphviz reads its input as UTF-8 by default, so do not rely on the
    # locale encoding when labels contain non-ASCII characters.
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def infer_host(service_name: str, service: dict) -> str:
    """Guess which host a service runs on.

    The first non-empty label among ``com.docker.compose.project``,
    ``infra.host`` and ``host`` wins (lower-cased). Otherwise the service
    name is inspected for Raspberry Pi or Proxmox markers.

    Args:
        service_name: Compose service name.
        service: Compose service definition (its ``labels`` are consulted).

    Returns:
        The label value, ``"raspberrypi"``, ``"proxmox"`` or ``"docker"``.
    """
    labels = parse_labels(service)
    for key in ("com.docker.compose.project", "infra.host", "host"):
        value = labels.get(key)
        if value:
            return value.lower()

    lowered = service_name.lower()
    # Treat "pi"/"rpi" (optionally followed by digits) as a marker only when it
    # stands alone between separators; a bare substring test would classify
    # names such as "api" or "pipeline" as Raspberry Pi services.
    standalone_pi = re.search(r"(?<![a-z0-9])r?pi\d*(?![a-z0-9])", lowered)
    if "raspi" in lowered or "raspberry" in lowered or standalone_pi:
        return "raspberrypi"
    if "proxmox" in lowered:
        return "proxmox"
    return "docker"
def categorize_service(service_name: str) -> str:
    """Classify a service by keywords contained in its lower-cased name.

    Groups are tried in the order listed and the first group with a matching
    keyword wins; a name matching no keyword is classified as ``"apps"``.
    """
    lowered = service_name.lower()
    keyword_groups = (
        ("edge/proxy/auth", ("traefik", "authelia", "oauth", "auth", "proxy", "nginx", "caddy")),
        ("monitoring", ("prometheus", "grafana", "loki", "promtail", "alert", "node-exporter", "cadvisor")),
        ("automation", ("watchtower", "diun", "ansible", "cron", "runner", "backup")),
        ("storage/database/support", ("postgres", "mariadb", "mysql", "redis", "minio", "nfs", "storage", "db", "queue")),
    )
    for category, keywords in keyword_groups:
        for keyword in keywords:
            if keyword in lowered:
                return category
    return "apps"
def load_inventory(path: Path | None) -> dict:
if not path or not path.exists():
return {}
payload = json.loads(path.read_text())
return payload.get("value", payload) if isinstance(payload, dict) else {}
def to_records(data) -> list[dict]:
    """Flatten a ``{key: record}`` mapping into a list of record copies.

    Non-mapping input yields ``[]`` and non-mapping values are skipped. Each
    copy gains ``_key`` (the stringified mapping key) and ``name`` (taken from
    ``hostname``, then ``vm_name``, then the key) unless already present.
    """
    if not isinstance(data, dict):
        return []
    records = []
    for key, value in data.items():
        if not isinstance(value, dict):
            continue
        key_text = str(key)
        record = {**value}  # shallow copy so the caller's mapping is not mutated
        if "_key" not in record:
            record["_key"] = key_text
        if "name" not in record:
            record["name"] = record.get("hostname") or record.get("vm_name") or key_text
        records.append(record)
    return records
def parent_name(item: dict) -> str:
    """Return the first truthy ``PARENT_KEYS`` value of ``item`` as a string.

    Keys are tried in ``PARENT_KEYS`` order; ``""`` is returned when none of
    them holds a truthy value.
    """
    candidates = (item.get(key) for key in PARENT_KEYS)
    return next((str(found) for found in candidates if found), "")
def generate_physical_topology(compose: dict, inventory: dict, out_dot: Path, out_svg: Path) -> None:
    """Write and render the physical-host / virtual-machine topology diagram.

    Physical hosts come from ``inventory["physical_hosts"]``; virtual machines
    from ``inventory["virtual_hosts"]`` plus ``inventory["vms"]``. Each VM is
    drawn inside the cluster of the physical host named by ``parent_name``;
    VMs whose parent is not a known physical host go into an
    "Unmapped virtual hosts" cluster. When the inventory has neither kind of
    record, a single placeholder note is rendered instead.

    Args:
        compose: Parsed compose mapping; accepted but not read by this function.
        inventory: Host inventory mapping as returned by ``load_inventory``.
        out_dot: Path the DOT source is written to.
        out_svg: Path the rendered SVG is written to.
    """

    def esc(text: str) -> str:
        # Inside a DOT quoted string the double quote is the only character
        # that must be escaped; an unescaped one would end the string early.
        return text.replace('"', '\\"')

    def by_name(record: dict) -> str:
        return str(record.get("name", "")).lower()

    physical = to_records(inventory.get("physical_hosts", {}))
    virtual = to_records(inventory.get("virtual_hosts", {})) + to_records(inventory.get("vms", {}))

    if not physical and not virtual:
        lines = [
            "digraph PhysicalTopology {",
            " graph [rankdir=LR, fontname=\"Helvetica\", nodesep=1.0, ranksep=1.5];",
            ' "placeholder:inventory" [shape=note, style="filled", fillcolor="#fef3c7", label="Host inventory JSON not found.\\nGenerate terraform inventory and rerun scripts/docs/generate-all.sh\\n(--host-inventory <path>)."];',
            "}",
        ]
        write_dot(out_dot, lines)
        render_svg(out_dot, out_svg)
        return

    lines = [
        "digraph PhysicalTopology {",
        " graph [rankdir=LR, compound=true, splines=polyline, nodesep=0.95, ranksep=1.7, ratio=compress, fontname=\"Helvetica\", fontsize=13, concentrate=true, newrank=true];",
        " node [fontname=\"Helvetica\", fontsize=12, style=\"rounded,filled\", fillcolor=\"#ffffff\"];",
        " edge [fontname=\"Helvetica\", fontsize=10, color=\"#64748b\"];",
    ]

    # Group VMs under the physical host they name as parent.
    phys_names = {str(p.get("name")): p for p in physical}
    children: dict[str, list[dict]] = {name: [] for name in phys_names}
    orphans: list[dict] = []
    for vm in virtual:
        parent = parent_name(vm)
        if parent in children:
            children[parent].append(vm)
        else:
            orphans.append(vm)

    for host, record in sorted(phys_names.items()):
        host_q = esc(host)
        host_role = esc(str(record.get("role", "") or ""))
        cluster_label = f"{host_q}\\n{host_role}" if host_role else host_q
        lines.extend([
            f' subgraph "cluster_{host_q}" {{',
            f' label="{cluster_label}";',
            ' style="rounded,filled";',
            ' color="#60a5fa";',
            ' fillcolor="#eff6ff";',
            f' "phys:{host_q}" [label="{host_q}", shape=box3d, fillcolor="#bfdbfe"];',
        ])
        for vm in sorted(children.get(host, []), key=by_name):
            raw_name = str(vm.get("name"))
            raw_role = str(vm.get("role", "") or "virtual host")
            vm_name = esc(raw_name)
            vm_role = esc(raw_role)
            safe_suffix = re.sub(r"[^a-zA-Z0-9]+", "_", raw_name)
            cluster_id = f"cluster_{host_q}_{safe_suffix}"
            lines.extend([
                f' subgraph "{cluster_id}" {{',
                f' label="{vm_name}";',
                ' style="rounded,dashed";',
                ' color="#bfdbfe";',
                ' fillcolor="#f8fbff";',
                f' "vm:{vm_name}" [label="{vm_name}\\n{vm_role}", shape=component, fillcolor="#dcfce7"];',
            ])
            # VMs whose role or name mentions docker get an extra "Docker host" node.
            if "docker" in raw_role.lower() or "docker" in raw_name.lower():
                lines.append(f' "role:{vm_name}" [label="Docker host", shape=box, fillcolor="#fef3c7"];')
                lines.append(f' "vm:{vm_name}" -> "role:{vm_name}" [style=dashed, label="runs"];')
            lines.append(' }')  # close the VM cluster
            lines.append(f' "phys:{host_q}" -> "vm:{vm_name}" [label="hosts"];')
        lines.append(' }')  # close the physical-host cluster

    if orphans:
        lines.extend([
            ' subgraph "cluster_orphans" {',
            ' label="Unmapped virtual hosts"; style="rounded,dashed"; color="#d1d5db";',
        ])
        for vm in sorted(orphans, key=by_name):
            vm_name = esc(str(vm.get("name")))
            lines.append(f' "vm:{vm_name}" [label="{vm_name}", shape=component, fillcolor="#fee2e2"];')
        lines.append(" }")

    lines.extend([
        ' subgraph "cluster_legend" {',
        ' label="Legend"; style="rounded"; color="#d1d5db";',
        ' "leg_host" [label="Physical host", shape=box3d, fillcolor="#eff6ff"];',
        ' "leg_vm" [label="Virtual machine", shape=component, fillcolor="#dcfce7"];',
        ' "leg_role" [label="Hosted role", shape=box, fillcolor="#fef3c7"];',
        ' "leg_host" -> "leg_vm" [label="hosts"];',
        ' "leg_vm" -> "leg_role" [style=dashed, label="runs"];',
        ' }',
        "}",
    ])
    write_dot(out_dot, lines)
    render_svg(out_dot, out_svg)
def generate_docker_traefik_dynu(compose: dict, dns_inventory: dict, domain_mode: str, out_dot: Path, out_svg: Path) -> None:
    """Write and render the Dynu -> Traefik -> service routing diagram.

    For every compose service, routers are discovered from
    ``traefik.http.routers.<router>.*`` labels. Routers whose rule contains at
    least one ``Host(`...`)`` name contribute the service to the diagram,
    together with its load-balancer port and TLS / authelia / mTLS badges.
    Hostnames from ``dns_inventory["records"]`` that are not already shown are
    added with dashed edges.

    Args:
        compose: Parsed compose mapping (``services`` and ``networks`` are read).
        dns_inventory: DNS inventory mapping; ignored unless it is a dict.
        domain_mode: Display mode passed to ``display_domain`` for each hostname.
        out_dot: Path the DOT source is written to.
        out_svg: Path the rendered SVG is written to.
    """

    def esc(text) -> str:
        # Inside a DOT quoted string the double quote is the only character
        # that must be escaped; an unescaped one would end the string early.
        return str(text).replace('"', '\\"')

    router_prefix = "traefik.http.routers."
    service_prefix = "traefik.http.services."
    port_suffix = ".loadbalancer.server.port"

    services = compose.get("services") or {}
    networks = compose.get("networks") or {}
    lines = [
        "digraph DockerTraefikDynu {",
        " graph [rankdir=LR, compound=true, splines=polyline, nodesep=0.9, ranksep=1.6, fontname=\"Helvetica\", concentrate=true, newrank=true];",
        " node [fontname=\"Helvetica\", fontsize=11, style=\"rounded,filled\"];",
        " edge [fontname=\"Helvetica\", fontsize=9, color=\"#334155\"];",
        ' "dynu" [label="Dynu / Public DNS", shape=box, fillcolor="#fde68a"];',
        ' "svc:traefik" [label="Traefik", shape=box, fillcolor="#bfdbfe"];',
        ' "dynu" -> "svc:traefik" [penwidth=1.6];',
    ]
    routes: dict[str, dict] = {}
    dns_nodes: set[str] = set()

    # Pass 1: collect hosts, port and badges per routed service.
    for svc_name, svc in sorted(services.items()):
        labels = parse_labels(svc)
        lb_ports = {
            key[len(service_prefix):].split(".", 1)[0]: value
            for key, value in labels.items()
            if key.startswith(service_prefix) and key.endswith(port_suffix)
        }
        routers = sorted({key[len(router_prefix):].split(".", 1)[0] for key in labels if key.startswith(router_prefix)})
        for router in routers:
            base = f"{router_prefix}{router}"
            hosts = [display_domain(h, domain_mode) for h in extract_hosts(labels.get(f"{base}.rule", ""))]
            if not hosts:
                continue
            target = labels.get(f"{base}.service", svc_name)
            port = lb_ports.get(target, "")
            middlewares = labels.get(f"{base}.middlewares", "").lower()

            badges = []
            # A router is TLS-enabled when ``tls`` is true, or when any
            # ``tls.*`` option (e.g. ``tls.certresolver``) is set for it.
            tls_flag = str(labels.get(f"{base}.tls", "false")).lower() in ("true", "1")
            tls_options = any(key.startswith(f"{base}.tls.") for key in labels)
            if tls_flag or tls_options:
                badges.append("TLS")
            if "authelia" in middlewares:
                badges.append("authelia")
            if "mtls" in middlewares:
                badges.append("mTLS")

            info = routes.setdefault(svc_name, {"hosts": set(), "port": port, "badges": set()})
            info["hosts"].update(hosts)
            if port:
                info["port"] = port
            info["badges"].update(badges)

    # Pass 2: emit service nodes and the hostnames that route to them.
    for svc_name, info in sorted(routes.items()):
        label_parts = [esc(svc_name)]
        if info.get("port"):
            label_parts.append(":" + esc(info["port"]))
        if info.get("badges"):
            label_parts.append("[" + ", ".join(sorted(info["badges"])) + "]")
        # Join with the DOT "\n" escape so each statement stays on one line.
        label = "\\n".join(label_parts)
        node_id = esc(svc_name)
        lines.append(f' "svc:{node_id}" [label="{label}", shape=box, fillcolor="#dcfce7"];')
        lines.append(f' "svc:traefik" -> "svc:{node_id}" [penwidth=1.4];')
        for host in sorted(info["hosts"]):
            # Several services can share a displayed host (always the case in
            # placeholder mode); declare the node and its edge only once.
            if host in dns_nodes:
                continue
            dns_nodes.add(host)
            host_q = esc(host)
            lines.append(f' "dns:{host_q}" [label="{host_q}", shape=note, fillcolor="#fef3c7"];')
            lines.append(f' "dns:{host_q}" -> "dynu";')

    # Pass 3: DNS records from the inventory that no router referenced.
    records = dns_inventory.get("records") if isinstance(dns_inventory, dict) else None
    for record in records if isinstance(records, list) else []:
        if not isinstance(record, dict):
            continue
        host = record.get("hostname") or record.get("name")
        if not host:
            continue
        host_disp = display_domain(str(host), domain_mode)
        if host_disp in dns_nodes:
            continue
        dns_nodes.add(host_disp)
        host_q = esc(host_disp)
        lines.append(f' "dns:{host_q}" [label="{host_q}", shape=note, fillcolor="#fef3c7"];')
        lines.append(f' "dns:{host_q}" -> "dynu" [style=dashed, color="#94a3b8"];')

    if dns_nodes:
        same_rank = "; ".join(f'"dns:{esc(name)}"' for name in sorted(dns_nodes))
        lines.append(" { rank=same; " + same_rank + "; }")

    lines.append(' subgraph "cluster_networks" {')
    lines.append(' label="Docker backend networks"; style="rounded,dashed"; color="#d1d5db";')
    for net in sorted(networks.keys()):
        net_q = esc(net)
        lines.append(f' "net:{net_q}" [label="{net_q}", shape=ellipse, fillcolor="#f8fafc"];')
    lines.append(' }')

    for svc_name in sorted(routes.keys()):
        svc = services.get(svc_name, {})
        # ``networks`` may be a list of names or a mapping keyed by name;
        # iterating either yields the network names.
        svc_nets = svc.get("networks") or []
        for net in svc_nets:
            lines.append(f' "svc:{esc(svc_name)}" -> "net:{esc(net)}" [style=dashed, color="#94a3b8", arrowsize=0.7];')

    lines.append("}")
    write_dot(out_dot, lines)
    render_svg(out_dot, out_svg)
def generate_compose_topology(compose: dict, out_dot: Path, out_svg: Path) -> None:
    """Write and render a graph of compose services and their networks.

    One box node is emitted per service, one ellipse per top-level network,
    and one edge per network listed on a service.

    Args:
        compose: Parsed compose mapping (``services`` and ``networks`` are read).
        out_dot: Path the DOT source is written to.
        out_svg: Path the rendered SVG is written to.
    """
    services = compose.get("services") or {}
    networks = compose.get("networks") or {}

    service_nodes = [
        f' "svc:{name}" [label="{name}", shape=box, style=filled, fillcolor="#dfefff"];'
        for name in sorted(services)
    ]
    network_nodes = [
        f' "net:{name}" [label="{name}", shape=ellipse, style=filled, fillcolor="#f4f4f4"];'
        for name in sorted(networks)
    ]
    edges = []
    for name in sorted(services):
        # A list yields its names; a mapping yields its keys when iterated.
        attached = services[name].get("networks") or []
        edges.extend(f' "svc:{name}" -> "net:{net}";' for net in attached)

    lines = [
        "digraph Compose {",
        " rankdir=LR;",
        ' node [fontname="Helvetica"];',
        *service_nodes,
        *network_nodes,
        *edges,
        "}",
    ]
    write_dot(out_dot, lines)
    render_svg(out_dot, out_svg)
def main() -> None:
    """Parse the command line and generate all three diagrams.

    Either ``--compose`` and ``--out-dir`` are given, or exactly three legacy
    positional arguments, of which the first is the compose file and the
    parent directory of the second is the output directory.

    Raises:
        SystemExit: on unusable arguments, or when Graphviz ``dot`` is missing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("legacy", nargs="*")
    for flag in ("--compose", "--out-dir", "--host-inventory", "--dns-inventory"):
        parser.add_argument(flag)
    parser.add_argument(
        "--domain-display",
        choices=["full", "redacted-label", "placeholder"],
        default="redacted-label",
    )
    args = parser.parse_args()
    require_dot()

    if args.compose and args.out_dir:
        compose_path, out_dir = Path(args.compose), Path(args.out_dir)
    elif len(args.legacy) == 3:
        compose_path, out_dir = Path(args.legacy[0]), Path(args.legacy[1]).parent
    else:
        raise SystemExit("Usage: generate-diagrams.py --compose <compose.yml> --out-dir <dir>")
    out_dir.mkdir(parents=True, exist_ok=True)

    compose = load_compose(compose_path)
    host_inventory = {}
    if args.host_inventory:
        host_inventory = load_inventory(Path(args.host_inventory))
    dns_inventory = {}
    if args.dns_inventory:
        dns_inventory = load_inventory(Path(args.dns_inventory))

    def outputs(stem: str) -> tuple[Path, Path]:
        # Each diagram is written as <stem>.dot next to its rendered <stem>.svg.
        return out_dir / f"{stem}.dot", out_dir / f"{stem}.svg"

    generate_docker_traefik_dynu(compose, dns_inventory, args.domain_display, *outputs("docker-traefik-dynu"))
    generate_physical_topology(compose, host_inventory, *outputs("physical-topology"))
    generate_compose_topology(compose, *outputs("docker-compose"))
# Run the generator only when this file is executed as a script.
if __name__ == "__main__":
    main()