docs: fix Mermaid labels for GitHub parser compatibility
This commit is contained in:
Executable
+427
@@ -0,0 +1,427 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Render Prometheus inventory into documentation and Mermaid diagrams."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
GENERATED_BEGIN = "<!-- BEGIN GENERATED PROMETHEUS SECTION -->"
|
||||
GENERATED_END = "<!-- END GENERATED PROMETHEUS SECTION -->"
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument(
|
||||
"--inventory-file",
|
||||
default="docs/runtime/prometheus-inventory.json",
|
||||
help="Path to normalized Prometheus inventory JSON.",
|
||||
)
|
||||
parser.add_argument("--docs-dir", default="docs", help="Documentation directory.")
|
||||
parser.add_argument("--diagrams-dir", default="docs/diagrams", help="Diagram output directory.")
|
||||
parser.add_argument("--readme-file", default="README.md", help="README path for regeneration notes.")
|
||||
parser.add_argument("--architecture-file", default="docs/architecture.md", help="Architecture markdown path.")
|
||||
parser.add_argument("--network-file", default="docs/network.md", help="Network markdown path.")
|
||||
parser.add_argument("--coverage-file", default="docs/monitoring-coverage.md", help="Coverage markdown path.")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Print changes instead of writing files.")
|
||||
parser.add_argument("--verbose", action="store_true", help="Print detailed processing output.")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def load_json(path: Path) -> dict[str, Any]:
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
data = json.load(handle)
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError(f"Inventory must be a JSON object: {path}")
|
||||
return data
|
||||
|
||||
|
||||
def merged_labels(target: dict[str, Any]) -> dict[str, str]:
|
||||
discovered = target.get("discovered_labels") or {}
|
||||
labels = target.get("labels") or {}
|
||||
merged = {k: str(v) for k, v in discovered.items()}
|
||||
merged.update({k: str(v) for k, v in labels.items()})
|
||||
return merged
|
||||
|
||||
|
||||
def normalize_targets(inventory: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
normalized: list[dict[str, Any]] = []
|
||||
for target in inventory.get("targets") or []:
|
||||
if not isinstance(target, dict):
|
||||
continue
|
||||
labels = merged_labels(target)
|
||||
parsed = urlparse(str(target.get("scrape_url") or ""))
|
||||
host = parsed.hostname or labels.get("hostname") or "unknown"
|
||||
endpoint = parsed.path or "/metrics"
|
||||
normalized.append(
|
||||
{
|
||||
"job": str(target.get("job") or labels.get("job") or "<missing>"),
|
||||
"instance": str(target.get("instance") or labels.get("instance") or "<missing>"),
|
||||
"health": str(target.get("health") or "unknown"),
|
||||
"scrape_url": str(target.get("scrape_url") or ""),
|
||||
"last_error": str(target.get("last_error") or ""),
|
||||
"host": host,
|
||||
"endpoint": endpoint,
|
||||
"service": labels.get("service", "unknown"),
|
||||
"role": labels.get("role", "unknown"),
|
||||
"hypervisor": labels.get("hypervisor", "unknown"),
|
||||
"network": labels.get("network", "unknown"),
|
||||
"exposure": labels.get("exposure", "unknown"),
|
||||
}
|
||||
)
|
||||
normalized.sort(key=lambda t: (t["job"], t["instance"], t["scrape_url"]))
|
||||
return normalized
|
||||
|
||||
|
||||
def markdown_table(headers: list[str], rows: list[list[str]]) -> str:
|
||||
line = "| " + " | ".join(headers) + " |"
|
||||
sep = "| " + " | ".join(["---"] * len(headers)) + " |"
|
||||
body = ["| " + " | ".join(row) + " |" for row in rows]
|
||||
return "\n".join([line, sep, *body])
|
||||
|
||||
|
||||
def summarize_targets(targets: list[dict[str, Any]], unhealthy: list[dict[str, Any]]) -> dict[str, Any]:
|
||||
by_job: dict[str, dict[str, int]] = defaultdict(lambda: {"active": 0, "unhealthy": 0})
|
||||
by_instance: dict[str, dict[str, Any]] = defaultdict(lambda: {"jobs": set(), "unhealthy": 0, "total": 0})
|
||||
by_service: dict[str, dict[str, Any]] = defaultdict(lambda: {"instances": set(), "unhealthy": 0, "total": 0})
|
||||
by_exposure: dict[str, int] = defaultdict(int)
|
||||
|
||||
unhealthy_set = {(u["job"], u["instance"], u["scrape_url"]) for u in unhealthy}
|
||||
|
||||
for target in targets:
|
||||
job = target["job"]
|
||||
instance = target["instance"]
|
||||
service = target["service"] if target["service"] != "<missing>" else "unknown"
|
||||
by_job[job]["active"] += 1
|
||||
key = (target["job"], target["instance"], target["scrape_url"])
|
||||
if key in unhealthy_set or target["health"] != "up":
|
||||
by_job[job]["unhealthy"] += 1
|
||||
by_instance[instance]["unhealthy"] += 1
|
||||
by_service[service]["unhealthy"] += 1
|
||||
|
||||
by_instance[instance]["jobs"].add(job)
|
||||
by_instance[instance]["total"] += 1
|
||||
by_service[service]["instances"].add(instance)
|
||||
by_service[service]["total"] += 1
|
||||
by_exposure[target["exposure"]] += 1
|
||||
|
||||
return {
|
||||
"by_job": dict(sorted(by_job.items())),
|
||||
"by_instance": {
|
||||
key: {
|
||||
"jobs": sorted(value["jobs"]),
|
||||
"unhealthy": value["unhealthy"],
|
||||
"total": value["total"],
|
||||
}
|
||||
for key, value in sorted(by_instance.items())
|
||||
},
|
||||
"by_service": {
|
||||
key: {
|
||||
"instances": sorted(value["instances"]),
|
||||
"unhealthy": value["unhealthy"],
|
||||
"total": value["total"],
|
||||
}
|
||||
for key, value in sorted(by_service.items())
|
||||
},
|
||||
"by_exposure": dict(sorted(by_exposure.items())),
|
||||
}
|
||||
|
||||
|
||||
def render_monitoring_coverage(inventory: dict[str, Any], targets: list[dict[str, Any]]) -> str:
|
||||
unhealthy = normalize_targets({"targets": inventory.get("unhealthy_targets") or []})
|
||||
summaries = summarize_targets(targets, unhealthy)
|
||||
missing = (inventory.get("unknowns") or {}).get("missing_label_counts") or {}
|
||||
|
||||
lines = [
|
||||
"# Monitoring Coverage",
|
||||
"",
|
||||
"## Overview",
|
||||
"",
|
||||
"This page is generated from Prometheus-observed runtime inventory. It supplements declared architecture docs and does not replace static source-of-truth configuration.",
|
||||
"",
|
||||
f"- Inventory timestamp: `{inventory.get('generated_at', 'unknown')}`",
|
||||
f"- Prometheus URL: `{inventory.get('prometheus_url', 'unknown')}`",
|
||||
f"- Active scrape targets observed: `{len(targets)}`",
|
||||
f"- Unhealthy scrape targets observed: `{len(unhealthy)}`",
|
||||
"",
|
||||
"## Coverage by job",
|
||||
"",
|
||||
]
|
||||
|
||||
job_rows = [
|
||||
[job, str(data["active"]), str(data["unhealthy"])]
|
||||
for job, data in summaries["by_job"].items()
|
||||
] or [["none", "0", "0"]]
|
||||
lines.append(markdown_table(["job", "active targets", "unhealthy targets"], job_rows))
|
||||
|
||||
lines.extend(["", "## Coverage by instance", ""])
|
||||
instance_rows = [
|
||||
[instance, ", ".join(data["jobs"]), f"{data['total'] - data['unhealthy']}/{data['total']} up"]
|
||||
for instance, data in summaries["by_instance"].items()
|
||||
] or [["none", "", ""]]
|
||||
lines.append(markdown_table(["instance", "jobs", "health"], instance_rows))
|
||||
|
||||
lines.extend(["", "## Coverage by service", ""])
|
||||
service_rows = [
|
||||
[service, ", ".join(data["instances"]), f"{data['total'] - data['unhealthy']}/{data['total']} up"]
|
||||
for service, data in summaries["by_service"].items()
|
||||
] or [["unknown", "", ""]]
|
||||
lines.append(markdown_table(["service", "instances", "health"], service_rows))
|
||||
|
||||
lines.extend(["", "## Unhealthy targets", ""])
|
||||
unhealthy_rows = [
|
||||
[u["job"], u["instance"], u["scrape_url"], u["health"], u["last_error"] or "none"]
|
||||
for u in unhealthy
|
||||
] or [["none", "", "", "", ""]]
|
||||
lines.append(markdown_table(["job", "instance", "scrape URL", "health", "last error"], unhealthy_rows))
|
||||
|
||||
lines.extend(["", "## Unknowns / missing metadata", ""])
|
||||
missing_rows = [[k, str(v)] for k, v in sorted(missing.items())] or [["none", "0"]]
|
||||
lines.append(markdown_table(["label", "targets missing"], missing_rows))
|
||||
lines.extend(
|
||||
[
|
||||
"",
|
||||
"Unknown or missing metadata is treated as `unknown` in generated summaries to avoid over-claiming topology.",
|
||||
"",
|
||||
"## Regeneration instructions",
|
||||
"",
|
||||
"```bash",
|
||||
"python3 scripts/render_prometheus_docs.py --inventory-file docs/runtime/prometheus-inventory.json",
|
||||
"```",
|
||||
"",
|
||||
]
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def render_network_doc(inventory: dict[str, Any], targets: list[dict[str, Any]]) -> str:
|
||||
summaries = summarize_targets(targets, normalize_targets({"targets": inventory.get("unhealthy_targets") or []}))
|
||||
|
||||
endpoint_rows = [[t["job"], t["instance"], t["scrape_url"], t["network"], t["exposure"]] for t in targets]
|
||||
endpoint_rows = endpoint_rows or [["none", "", "", "", ""]]
|
||||
|
||||
exposure_rows = [[exp, str(count)] for exp, count in summaries["by_exposure"].items()] or [["unknown", "0"]]
|
||||
|
||||
paths = sorted({t["endpoint"] for t in targets})
|
||||
path_rows = [[path, str(sum(1 for t in targets if t["endpoint"] == path))] for path in paths] or [["/metrics", "0"]]
|
||||
|
||||
lines = [
|
||||
"# Network and Exposure View (Prometheus Observed)",
|
||||
"",
|
||||
"## Overview",
|
||||
"",
|
||||
"This document is generated from Prometheus scrape metadata and endpoint URLs. It is an observed monitoring view and not a physical network map.",
|
||||
"",
|
||||
f"- Inventory timestamp: `{inventory.get('generated_at', 'unknown')}`",
|
||||
"- Physical topology, VLAN mapping, and bridge membership remain unknown unless explicitly documented elsewhere.",
|
||||
"",
|
||||
"## Observed scrape endpoints",
|
||||
"",
|
||||
markdown_table(["job", "instance", "scrape URL", "network label", "exposure label"], endpoint_rows),
|
||||
"",
|
||||
"## Internal vs public indicators",
|
||||
"",
|
||||
markdown_table(["exposure label", "targets"], exposure_rows),
|
||||
"",
|
||||
"All indicators above are label-derived. Missing labels are rendered as `unknown`.",
|
||||
"",
|
||||
"## Monitoring paths",
|
||||
"",
|
||||
markdown_table(["metrics path", "observed targets"], path_rows),
|
||||
"",
|
||||
"## Unknowns and limits",
|
||||
"",
|
||||
"- Prometheus can confirm scrape reachability but not ownership or placement boundaries.",
|
||||
"- No VLAN, switch, or hypervisor placement is inferred unless present in inventory labels.",
|
||||
"- Treat this as runtime evidence to pair with declared architecture docs.",
|
||||
"",
|
||||
]
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def render_architecture_section(inventory: dict[str, Any], targets: list[dict[str, Any]]) -> str:
|
||||
summaries = summarize_targets(targets, normalize_targets({"targets": inventory.get("unhealthy_targets") or []}))
|
||||
notes = inventory.get("notes") or []
|
||||
|
||||
lines = [
|
||||
"## Runtime visibility from Prometheus",
|
||||
"",
|
||||
GENERATED_BEGIN,
|
||||
"",
|
||||
"Prometheus inventory provides **observed runtime coverage** of scrape targets. It complements (but does not replace) declared architecture in Compose files and static docs.",
|
||||
"",
|
||||
f"- Inventory timestamp: `{inventory.get('generated_at', 'unknown')}`",
|
||||
f"- Observed jobs: `{len(summaries['by_job'])}`",
|
||||
f"- Observed instances: `{len(summaries['by_instance'])}`",
|
||||
f"- Observed services (label-derived): `{len(summaries['by_service'])}`",
|
||||
"",
|
||||
"### Observed monitoring view",
|
||||
"",
|
||||
markdown_table(
|
||||
["job", "targets", "unhealthy"],
|
||||
[[job, str(data["active"]), str(data["unhealthy"])] for job, data in summaries["by_job"].items()] or [["none", "0", "0"]],
|
||||
),
|
||||
"",
|
||||
"### Data sources",
|
||||
"",
|
||||
"- `docs/runtime/prometheus-inventory.json` (normalized runtime export)",
|
||||
"- Prometheus scrape metadata (`targets` + label sets)",
|
||||
"- Existing repository architecture docs for declared topology",
|
||||
]
|
||||
if notes:
|
||||
lines.extend(["", "### Notes from inventory", ""])
|
||||
for note in notes:
|
||||
lines.append(f"- {note}")
|
||||
lines.extend(["", GENERATED_END, ""])
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def upsert_generated_section(path: Path, section_markdown: str, dry_run: bool, verbose: bool) -> None:
|
||||
existing = path.read_text(encoding="utf-8") if path.exists() else ""
|
||||
section_body = section_markdown
|
||||
|
||||
if GENERATED_BEGIN in existing and GENERATED_END in existing:
|
||||
pattern = re.compile(
|
||||
rf"{re.escape(GENERATED_BEGIN)}.*?{re.escape(GENERATED_END)}",
|
||||
re.DOTALL,
|
||||
)
|
||||
replacement = "\n".join(
|
||||
line for line in section_body.splitlines() if line.strip() not in {"## Runtime visibility from Prometheus"}
|
||||
)
|
||||
updated = pattern.sub(replacement.strip(), existing)
|
||||
else:
|
||||
updated = existing.rstrip() + "\n\n" + section_body.strip() + "\n"
|
||||
|
||||
write_file(path, updated, dry_run=dry_run, verbose=verbose)
|
||||
|
||||
|
||||
def mermaid_safe_id(value: str) -> str:
|
||||
safe = re.sub(r"[^a-zA-Z0-9_]", "_", value)
|
||||
safe = re.sub(r"_+", "_", safe).strip("_")
|
||||
return safe or "unknown"
|
||||
|
||||
|
||||
def render_monitoring_mermaid(targets: list[dict[str, Any]]) -> str:
|
||||
by_host: dict[str, list[dict[str, Any]]] = defaultdict(list)
|
||||
for target in targets:
|
||||
by_host[target["host"]].append(target)
|
||||
|
||||
lines = [
|
||||
"flowchart LR",
|
||||
" Prom[Prometheus]",
|
||||
"",
|
||||
" classDef scrape stroke-dasharray: 5 5;",
|
||||
]
|
||||
|
||||
for host, host_targets in sorted(by_host.items()):
|
||||
host_id = mermaid_safe_id(f"host_{host}")
|
||||
lines.append(f' subgraph {host_id}["Host: {host}"]')
|
||||
for target in sorted(host_targets, key=lambda t: (t["job"], t["instance"])):
|
||||
tid = mermaid_safe_id(f"{target['job']}_{target['instance']}")
|
||||
label = f"{target['job']}<br/>{target['instance']}"
|
||||
lines.append(f' {tid}["{label}"]')
|
||||
lines.append(" end")
|
||||
|
||||
lines.append("")
|
||||
for target in targets:
|
||||
tid = mermaid_safe_id(f"{target['job']}_{target['instance']}")
|
||||
lines.append(f" Prom -. scrape .-> {tid}")
|
||||
lines.append(f" class {tid} scrape;")
|
||||
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
def render_architecture_mermaid(targets: list[dict[str, Any]]) -> str:
|
||||
jobs = sorted({t["job"] for t in targets})
|
||||
lines = [
|
||||
"flowchart TB",
|
||||
" Declared[Declared architecture<br/>(Compose + docs)]",
|
||||
" Runtime[Observed runtime<br/>(Prometheus inventory)]",
|
||||
" Declared --> Runtime",
|
||||
"",
|
||||
' subgraph Monitoring["Prometheus observed jobs"]',
|
||||
]
|
||||
for job in jobs:
|
||||
jid = mermaid_safe_id(f"job_{job}")
|
||||
lines.append(f' {jid}["{job}"]')
|
||||
lines.extend([" end", "", " Runtime --> Monitoring", ""])
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def write_file(path: Path, content: str, dry_run: bool, verbose: bool) -> None:
|
||||
if dry_run:
|
||||
print(f"[DRY RUN] Would write: {path}")
|
||||
return
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(content, encoding="utf-8")
|
||||
if verbose:
|
||||
print(f"Wrote {path}")
|
||||
|
||||
|
||||
def update_readme(path: Path, dry_run: bool, verbose: bool) -> None:
|
||||
if not path.exists():
|
||||
return
|
||||
existing = path.read_text(encoding="utf-8")
|
||||
marker = "## Prometheus Runtime Inventory Export"
|
||||
snippet = (
|
||||
"\nRegenerate derived docs/diagrams from inventory:\n\n"
|
||||
"```bash\n"
|
||||
"python3 scripts/render_prometheus_docs.py --inventory-file docs/runtime/prometheus-inventory.json\n"
|
||||
"```\n"
|
||||
)
|
||||
if marker in existing and "scripts/render_prometheus_docs.py" not in existing:
|
||||
updated = existing.replace(marker, marker + snippet)
|
||||
write_file(path, updated, dry_run=dry_run, verbose=verbose)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
inventory_path = Path(args.inventory_file)
|
||||
docs_dir = Path(args.docs_dir)
|
||||
diagrams_dir = Path(args.diagrams_dir)
|
||||
|
||||
inventory = load_json(inventory_path)
|
||||
targets = normalize_targets(inventory)
|
||||
|
||||
coverage_path = Path(args.coverage_file)
|
||||
network_path = Path(args.network_file)
|
||||
architecture_path = Path(args.architecture_file)
|
||||
|
||||
coverage_md = render_monitoring_coverage(inventory, targets)
|
||||
network_md = render_network_doc(inventory, targets)
|
||||
architecture_section = render_architecture_section(inventory, targets)
|
||||
monitoring_mmd = render_monitoring_mermaid(targets)
|
||||
architecture_mmd = render_architecture_mermaid(targets)
|
||||
|
||||
def resolve_doc_path(path: Path) -> Path:
|
||||
if path.is_absolute():
|
||||
return path
|
||||
if len(path.parts) == 1:
|
||||
return docs_dir / path
|
||||
return path
|
||||
|
||||
write_file(resolve_doc_path(coverage_path), coverage_md, args.dry_run, args.verbose)
|
||||
write_file(resolve_doc_path(network_path), network_md, args.dry_run, args.verbose)
|
||||
upsert_generated_section(
|
||||
resolve_doc_path(architecture_path),
|
||||
architecture_section,
|
||||
args.dry_run,
|
||||
args.verbose,
|
||||
)
|
||||
|
||||
write_file(diagrams_dir / "monitoring-coverage.mmd", monitoring_mmd, args.dry_run, args.verbose)
|
||||
write_file(diagrams_dir / "architecture.mmd", architecture_mmd, args.dry_run, args.verbose)
|
||||
update_readme(Path(args.readme_file), args.dry_run, args.verbose)
|
||||
|
||||
if args.verbose:
|
||||
print(f"Processed {len(targets)} targets from {inventory_path}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user