#!/usr/bin/env python3 """Render Prometheus inventory into documentation and Mermaid diagrams.""" from __future__ import annotations import argparse import json import re from collections import defaultdict from pathlib import Path from typing import Any from urllib.parse import urlparse GENERATED_BEGIN = "" GENERATED_END = "" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--inventory-file", default="docs/runtime/prometheus-inventory.json", help="Path to normalized Prometheus inventory JSON.", ) parser.add_argument("--docs-dir", default="docs", help="Documentation directory.") parser.add_argument("--diagrams-dir", default="docs/diagrams", help="Diagram output directory.") parser.add_argument("--readme-file", default="README.md", help="README path for regeneration notes.") parser.add_argument("--architecture-file", default="docs/architecture.md", help="Architecture markdown path.") parser.add_argument("--network-file", default="docs/network.md", help="Network markdown path.") parser.add_argument("--coverage-file", default="docs/monitoring-coverage.md", help="Coverage markdown path.") parser.add_argument("--dry-run", action="store_true", help="Print changes instead of writing files.") parser.add_argument("--verbose", action="store_true", help="Print detailed processing output.") return parser.parse_args() def load_json(path: Path) -> dict[str, Any]: with path.open("r", encoding="utf-8") as handle: data = json.load(handle) if not isinstance(data, dict): raise ValueError(f"Inventory must be a JSON object: {path}") return data def merged_labels(target: dict[str, Any]) -> dict[str, str]: discovered = target.get("discovered_labels") or {} labels = target.get("labels") or {} merged = {k: str(v) for k, v in discovered.items()} merged.update({k: str(v) for k, v in labels.items()}) return merged def normalize_targets(inventory: dict[str, Any]) -> list[dict[str, Any]]: normalized: list[dict[str, Any]] = [] for target in inventory.get("targets") or []: if not isinstance(target, dict): continue labels = merged_labels(target) parsed = urlparse(str(target.get("scrape_url") or "")) host = parsed.hostname or labels.get("hostname") or "unknown" endpoint = parsed.path or "/metrics" normalized.append( { "job": str(target.get("job") or labels.get("job") or ""), "instance": str(target.get("instance") or labels.get("instance") or ""), "health": str(target.get("health") or "unknown"), "scrape_url": str(target.get("scrape_url") or ""), "last_error": str(target.get("last_error") or ""), "host": host, "endpoint": endpoint, "service": labels.get("service", "unknown"), "role": labels.get("role", "unknown"), "hypervisor": labels.get("hypervisor", "unknown"), "network": labels.get("network", "unknown"), "exposure": labels.get("exposure", "unknown"), } ) normalized.sort(key=lambda t: (t["job"], t["instance"], t["scrape_url"])) return normalized def markdown_table(headers: list[str], rows: list[list[str]]) -> str: line = "| " + " | ".join(headers) + " |" sep = "| " + " | ".join(["---"] * len(headers)) + " |" body = ["| " + " | ".join(row) + " |" for row in rows] return "\n".join([line, sep, *body]) def summarize_targets(targets: list[dict[str, Any]], unhealthy: list[dict[str, Any]]) -> dict[str, Any]: by_job: dict[str, dict[str, int]] = defaultdict(lambda: {"active": 0, "unhealthy": 0}) by_instance: dict[str, dict[str, Any]] = defaultdict(lambda: {"jobs": set(), "unhealthy": 0, "total": 0}) by_service: dict[str, dict[str, Any]] = defaultdict(lambda: {"instances": set(), "unhealthy": 0, "total": 0}) by_exposure: dict[str, int] = defaultdict(int) unhealthy_set = {(u["job"], u["instance"], u["scrape_url"]) for u in unhealthy} for target in targets: job = target["job"] instance = target["instance"] service = target["service"] if target["service"] != "" else "unknown" by_job[job]["active"] += 1 key = (target["job"], target["instance"], target["scrape_url"]) if key in unhealthy_set or target["health"] != "up": by_job[job]["unhealthy"] += 1 by_instance[instance]["unhealthy"] += 1 by_service[service]["unhealthy"] += 1 by_instance[instance]["jobs"].add(job) by_instance[instance]["total"] += 1 by_service[service]["instances"].add(instance) by_service[service]["total"] += 1 by_exposure[target["exposure"]] += 1 return { "by_job": dict(sorted(by_job.items())), "by_instance": { key: { "jobs": sorted(value["jobs"]), "unhealthy": value["unhealthy"], "total": value["total"], } for key, value in sorted(by_instance.items()) }, "by_service": { key: { "instances": sorted(value["instances"]), "unhealthy": value["unhealthy"], "total": value["total"], } for key, value in sorted(by_service.items()) }, "by_exposure": dict(sorted(by_exposure.items())), } def render_monitoring_coverage(inventory: dict[str, Any], targets: list[dict[str, Any]]) -> str: unhealthy = normalize_targets({"targets": inventory.get("unhealthy_targets") or []}) summaries = summarize_targets(targets, unhealthy) missing = (inventory.get("unknowns") or {}).get("missing_label_counts") or {} lines = [ "# Monitoring Coverage", "", "## Overview", "", "This page is generated from Prometheus-observed runtime inventory. It supplements declared architecture docs and does not replace static source-of-truth configuration.", "", f"- Inventory timestamp: `{inventory.get('generated_at', 'unknown')}`", f"- Prometheus URL: `{inventory.get('prometheus_url', 'unknown')}`", f"- Active scrape targets observed: `{len(targets)}`", f"- Unhealthy scrape targets observed: `{len(unhealthy)}`", "", "## Coverage by job", "", ] job_rows = [ [job, str(data["active"]), str(data["unhealthy"])] for job, data in summaries["by_job"].items() ] or [["none", "0", "0"]] lines.append(markdown_table(["job", "active targets", "unhealthy targets"], job_rows)) lines.extend(["", "## Coverage by instance", ""]) instance_rows = [ [instance, ", ".join(data["jobs"]), f"{data['total'] - data['unhealthy']}/{data['total']} up"] for instance, data in summaries["by_instance"].items() ] or [["none", "", ""]] lines.append(markdown_table(["instance", "jobs", "health"], instance_rows)) lines.extend(["", "## Coverage by service", ""]) service_rows = [ [service, ", ".join(data["instances"]), f"{data['total'] - data['unhealthy']}/{data['total']} up"] for service, data in summaries["by_service"].items() ] or [["unknown", "", ""]] lines.append(markdown_table(["service", "instances", "health"], service_rows)) lines.extend(["", "## Unhealthy targets", ""]) unhealthy_rows = [ [u["job"], u["instance"], u["scrape_url"], u["health"], u["last_error"] or "none"] for u in unhealthy ] or [["none", "", "", "", ""]] lines.append(markdown_table(["job", "instance", "scrape URL", "health", "last error"], unhealthy_rows)) lines.extend(["", "## Unknowns / missing metadata", ""]) missing_rows = [[k, str(v)] for k, v in sorted(missing.items())] or [["none", "0"]] lines.append(markdown_table(["label", "targets missing"], missing_rows)) lines.extend( [ "", "Unknown or missing metadata is treated as `unknown` in generated summaries to avoid over-claiming topology.", "", "## Regeneration instructions", "", "```bash", "python3 scripts/render_prometheus_docs.py --inventory-file docs/runtime/prometheus-inventory.json", "```", "", ] ) return "\n".join(lines) def render_network_doc(inventory: dict[str, Any], targets: list[dict[str, Any]]) -> str: summaries = summarize_targets(targets, normalize_targets({"targets": inventory.get("unhealthy_targets") or []})) endpoint_rows = [[t["job"], t["instance"], t["scrape_url"], t["network"], t["exposure"]] for t in targets] endpoint_rows = endpoint_rows or [["none", "", "", "", ""]] exposure_rows = [[exp, str(count)] for exp, count in summaries["by_exposure"].items()] or [["unknown", "0"]] paths = sorted({t["endpoint"] for t in targets}) path_rows = [[path, str(sum(1 for t in targets if t["endpoint"] == path))] for path in paths] or [["/metrics", "0"]] lines = [ "# Network and Exposure View (Prometheus Observed)", "", "## Overview", "", "This document is generated from Prometheus scrape metadata and endpoint URLs. It is an observed monitoring view and not a physical network map.", "", f"- Inventory timestamp: `{inventory.get('generated_at', 'unknown')}`", "- Physical topology, VLAN mapping, and bridge membership remain unknown unless explicitly documented elsewhere.", "", "## Observed scrape endpoints", "", markdown_table(["job", "instance", "scrape URL", "network label", "exposure label"], endpoint_rows), "", "## Internal vs public indicators", "", markdown_table(["exposure label", "targets"], exposure_rows), "", "All indicators above are label-derived. Missing labels are rendered as `unknown`.", "", "## Monitoring paths", "", markdown_table(["metrics path", "observed targets"], path_rows), "", "## Unknowns and limits", "", "- Prometheus can confirm scrape reachability but not ownership or placement boundaries.", "- No VLAN, switch, or hypervisor placement is inferred unless present in inventory labels.", "- Treat this as runtime evidence to pair with declared architecture docs.", "", ] return "\n".join(lines) def render_architecture_section(inventory: dict[str, Any], targets: list[dict[str, Any]]) -> str: summaries = summarize_targets(targets, normalize_targets({"targets": inventory.get("unhealthy_targets") or []})) notes = inventory.get("notes") or [] lines = [ "## Runtime visibility from Prometheus", "", GENERATED_BEGIN, "", "Prometheus inventory provides **observed runtime coverage** of scrape targets. It complements (but does not replace) declared architecture in Compose files and static docs.", "", f"- Inventory timestamp: `{inventory.get('generated_at', 'unknown')}`", f"- Observed jobs: `{len(summaries['by_job'])}`", f"- Observed instances: `{len(summaries['by_instance'])}`", f"- Observed services (label-derived): `{len(summaries['by_service'])}`", "", "### Observed monitoring view", "", markdown_table( ["job", "targets", "unhealthy"], [[job, str(data["active"]), str(data["unhealthy"])] for job, data in summaries["by_job"].items()] or [["none", "0", "0"]], ), "", "### Data sources", "", "- `docs/runtime/prometheus-inventory.json` (normalized runtime export)", "- Prometheus scrape metadata (`targets` + label sets)", "- Existing repository architecture docs for declared topology", ] if notes: lines.extend(["", "### Notes from inventory", ""]) for note in notes: lines.append(f"- {note}") lines.extend(["", GENERATED_END, ""]) return "\n".join(lines) def upsert_generated_section(path: Path, section_markdown: str, dry_run: bool, verbose: bool) -> None: existing = path.read_text(encoding="utf-8") if path.exists() else "" section_body = section_markdown if GENERATED_BEGIN in existing and GENERATED_END in existing: pattern = re.compile( rf"{re.escape(GENERATED_BEGIN)}.*?{re.escape(GENERATED_END)}", re.DOTALL, ) replacement = "\n".join( line for line in section_body.splitlines() if line.strip() not in {"## Runtime visibility from Prometheus"} ) updated = pattern.sub(replacement.strip(), existing) else: updated = existing.rstrip() + "\n\n" + section_body.strip() + "\n" write_file(path, updated, dry_run=dry_run, verbose=verbose) def mermaid_safe_id(value: str) -> str: safe = re.sub(r"[^a-zA-Z0-9_]", "_", value) safe = re.sub(r"_+", "_", safe).strip("_") return safe or "unknown" def render_monitoring_mermaid(targets: list[dict[str, Any]]) -> str: by_host: dict[str, list[dict[str, Any]]] = defaultdict(list) for target in targets: by_host[target["host"]].append(target) lines = [ "flowchart LR", " Prom[Prometheus]", "", " classDef scrape stroke-dasharray: 5 5;", ] for host, host_targets in sorted(by_host.items()): host_id = mermaid_safe_id(f"host_{host}") lines.append(f' subgraph {host_id}["Host: {host}"]') for target in sorted(host_targets, key=lambda t: (t["job"], t["instance"])): tid = mermaid_safe_id(f"{target['job']}_{target['instance']}") label = f"{target['job']}
{target['instance']}" lines.append(f' {tid}["{label}"]') lines.append(" end") lines.append("") for target in targets: tid = mermaid_safe_id(f"{target['job']}_{target['instance']}") lines.append(f" Prom -. scrape .-> {tid}") lines.append(f" class {tid} scrape;") return "\n".join(lines) + "\n" def render_architecture_mermaid(targets: list[dict[str, Any]]) -> str: jobs = sorted({t["job"] for t in targets}) lines = [ "flowchart TB", " Declared[Declared architecture
(Compose + docs)]", " Runtime[Observed runtime
(Prometheus inventory)]", " Declared --> Runtime", "", ' subgraph Monitoring["Prometheus observed jobs"]', ] for job in jobs: jid = mermaid_safe_id(f"job_{job}") lines.append(f' {jid}["{job}"]') lines.extend([" end", "", " Runtime --> Monitoring", ""]) return "\n".join(lines) def write_file(path: Path, content: str, dry_run: bool, verbose: bool) -> None: if dry_run: print(f"[DRY RUN] Would write: {path}") return path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") if verbose: print(f"Wrote {path}") def update_readme(path: Path, dry_run: bool, verbose: bool) -> None: if not path.exists(): return existing = path.read_text(encoding="utf-8") marker = "## Prometheus Runtime Inventory Export" snippet = ( "\nRegenerate derived docs/diagrams from inventory:\n\n" "```bash\n" "python3 scripts/render_prometheus_docs.py --inventory-file docs/runtime/prometheus-inventory.json\n" "```\n" ) if marker in existing and "scripts/render_prometheus_docs.py" not in existing: updated = existing.replace(marker, marker + snippet) write_file(path, updated, dry_run=dry_run, verbose=verbose) def main() -> int: args = parse_args() inventory_path = Path(args.inventory_file) docs_dir = Path(args.docs_dir) diagrams_dir = Path(args.diagrams_dir) inventory = load_json(inventory_path) targets = normalize_targets(inventory) coverage_path = Path(args.coverage_file) network_path = Path(args.network_file) architecture_path = Path(args.architecture_file) coverage_md = render_monitoring_coverage(inventory, targets) network_md = render_network_doc(inventory, targets) architecture_section = render_architecture_section(inventory, targets) monitoring_mmd = render_monitoring_mermaid(targets) architecture_mmd = render_architecture_mermaid(targets) def resolve_doc_path(path: Path) -> Path: if path.is_absolute(): return path if len(path.parts) == 1: return docs_dir / path return path write_file(resolve_doc_path(coverage_path), coverage_md, args.dry_run, args.verbose) write_file(resolve_doc_path(network_path), network_md, args.dry_run, args.verbose) upsert_generated_section( resolve_doc_path(architecture_path), architecture_section, args.dry_run, args.verbose, ) write_file(diagrams_dir / "monitoring-coverage.mmd", monitoring_mmd, args.dry_run, args.verbose) write_file(diagrams_dir / "architecture.mmd", architecture_mmd, args.dry_run, args.verbose) update_readme(Path(args.readme_file), args.dry_run, args.verbose) if args.verbose: print(f"Processed {len(targets)} targets from {inventory_path}") return 0 if __name__ == "__main__": raise SystemExit(main())