Files
docker/scripts/render_prometheus_docs.py
T

594 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""Render Prometheus inventory into documentation and Mermaid diagrams."""
from __future__ import annotations
import argparse
import json
import re
from collections import defaultdict
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
# HTML-comment markers that delimit the generated block inside the architecture
# markdown file. upsert_generated_section() replaces everything from the BEGIN
# marker through the END marker when both are present in the file.
GENERATED_BEGIN = "<!-- BEGIN GENERATED PROMETHEUS SECTION -->"
GENERATED_END = "<!-- END GENERATED PROMETHEUS SECTION -->"
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the renderer.

    Options are registered in a fixed order so the ``--help`` listing stays
    stable.

    Returns:
        Namespace with one attribute per option (dashes become underscores).
    """
    cli = argparse.ArgumentParser(description=__doc__)

    def option(flag: str, default: str, help_text: str) -> None:
        # Value-taking option with a default.
        cli.add_argument(flag, default=default, help=help_text)

    def switch(flag: str, help_text: str) -> None:
        # Boolean flag that is False unless given on the command line.
        cli.add_argument(flag, action="store_true", help=help_text)

    option(
        "--inventory-file",
        "docs/runtime/prometheus-inventory.json",
        "Path to normalized Prometheus inventory JSON.",
    )
    option("--docs-dir", "docs", "Documentation directory.")
    option("--diagrams-dir", "docs/diagrams", "Diagram output directory.")
    option("--readme-file", "README.md", "README path for regeneration notes.")
    option("--architecture-file", "docs/architecture.md", "Architecture markdown path.")
    option(
        "--dynu-dns-inventory-file",
        "infrastructure/terraform/dynu/generated/dynu_dns_records_inventory.json",
        "Path to Dynu DNS brownfield inventory JSON.",
    )
    switch("--skip-dynu-dns", "Skip Dynu DNS inventory loading/rendering.")
    option("--network-file", "docs/network.md", "Network markdown path.")
    option("--coverage-file", "docs/monitoring-coverage.md", "Coverage markdown path.")
    switch("--dry-run", "Print changes instead of writing files.")
    switch("--verbose", "Print detailed processing output.")

    return cli.parse_args()
def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and require a top-level object.

    Args:
        path: Location of the inventory file.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the document's top level is not a JSON object.
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    if isinstance(payload, dict):
        return payload
    raise ValueError(f"Inventory must be a JSON object: {path}")
def load_optional_json(path: Path) -> Any | None:
    """Decode *path* as UTF-8 JSON, or return ``None`` when it does not exist.

    Args:
        path: Location of the optional JSON file.

    Returns:
        The decoded JSON value (any JSON type), or ``None`` if the path is
        absent. Decoding errors from ``json`` propagate to the caller.
    """
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    return None
def normalize_dynu_dns_records(payload: Any) -> list[dict[str, Any]]:
    """Flatten a Dynu DNS inventory payload into a sorted list of records.

    The payload may be a bare list, or an object wrapping the list under
    ``records`` (checked first) or ``value``. Entries that are not objects are
    skipped.

    Args:
        payload: Decoded JSON inventory.

    Returns:
        One dict per record with a fixed key set, ordered by domain name,
        hostname, record type and id (all compared as strings).

    Raises:
        ValueError: If no list of records can be located in *payload*.
    """
    entries = payload
    if isinstance(payload, dict):
        for wrapper_key in ("records", "value"):
            if isinstance(payload.get(wrapper_key), list):
                entries = payload[wrapper_key]
                break
    if not isinstance(entries, list):
        raise ValueError("Dynu DNS inventory must be a list or object with list field `records`/`value`.")

    def text_or_unknown(raw: Any) -> str:
        # Missing or falsy text fields are reported as the literal "unknown".
        return str(raw or "unknown")

    cleaned: list[dict[str, Any]] = [
        {
            "id": item.get("id"),
            "domain_id": item.get("domain_id"),
            "domain_name": text_or_unknown(item.get("domain_name")),
            "hostname": text_or_unknown(item.get("hostname")),
            "node_name": text_or_unknown(item.get("node_name")),
            "record_type": text_or_unknown(item.get("record_type")),
            "content": item.get("content"),
            # Prefer an explicit `state` key; fall back to `enabled` otherwise.
            "state": item["state"] if "state" in item else item.get("enabled"),
            "ttl": item.get("ttl"),
            "updated_on": item.get("updated_on"),
        }
        for item in entries
        if isinstance(item, dict)
    ]

    sort_fields = ("domain_name", "hostname", "record_type", "id")
    cleaned.sort(key=lambda rec: tuple(str(rec.get(name) or "") for name in sort_fields))
    return cleaned
def render_dynu_dns_architecture_section(records: list[dict[str, Any]], inventory_path: Path | None) -> str:
    """Render the "Dynu DNS brownfield inventory" markdown subsection.

    A record is classified as *dynamic* when it is an A/AAAA record whose
    content is empty after stripping. Every other record counts towards the
    static total; its mode column reads ``static`` when it has content and
    ``record`` when it does not.

    Args:
        records: Normalized Dynu records (each must carry ``domain_name``).
        inventory_path: Path shown as the inventory source, or a falsy value
            to print ``unknown``.

    Returns:
        Markdown text with summary bullets and one table row per record.
    """
    address_types = {"A", "AAAA"}
    observed_domains = sorted({rec["domain_name"] for rec in records if rec["domain_name"] != "unknown"})

    dynamic_total = 0
    static_total = 0
    disabled_total = 0
    table_rows: list[list[str]] = []

    for rec in records:
        kind = str(rec.get("record_type") or "").upper()
        raw_body = rec.get("content")
        body = str(raw_body).strip() if raw_body is not None else ""

        if kind in address_types and not body:
            mode = target = "dynamic"
            dynamic_total += 1
        else:
            # Reaching here without content implies a non-address record type.
            mode = "static" if body else "record"
            target = body or "unknown"
            static_total += 1

        state = rec.get("state")
        if state is False:
            disabled_total += 1
        enabled_cell = "unknown" if state is None else str(bool(state)).lower()

        ttl = rec.get("ttl")
        record_id = rec.get("id")
        table_rows.append(
            [
                str(rec.get("hostname") or "unknown"),
                kind or "unknown",
                target,
                mode,
                "unknown" if ttl is None else str(ttl),
                enabled_cell,
                "unknown" if record_id is None else str(record_id),
                str(rec.get("updated_on") or "unknown"),
            ]
        )

    if inventory_path:
        source_line = f"- Inventory source: `{inventory_path}`"
    else:
        source_line = "- Inventory source: `unknown`"
    domain_text = ", ".join(observed_domains) if observed_domains else "unknown"
    records_table = markdown_table(
        ["hostname", "type", "target/content", "mode", "ttl", "enabled", "record id", "updated"],
        table_rows or [["none", "", "", "", "", "", "", ""]],
    )

    return "\n".join(
        [
            "### Dynu DNS brownfield inventory",
            "",
            "Dynu DNS is managed as a brownfield reconciliation source. Terraform imports the root domain and individual DNS records into state, while generated configuration provides reviewable management intent.",
            "",
            source_line,
            f"- Records observed: `{len(records)}`",
            f"- Domains observed: `{domain_text}`",
            f"- Dynamic A/AAAA records: `{dynamic_total}`",
            f"- Static records: `{static_total}`",
            f"- Disabled records: `{disabled_total}`",
            "",
            "#### DNS records",
            "",
            records_table,
            "",
        ]
    )
def merged_labels(target: dict[str, Any]) -> dict[str, str]:
    """Combine a target's discovered and final label sets.

    Args:
        target: Raw target object; ``discovered_labels`` and ``labels`` may be
            absent or null.

    Returns:
        A single mapping with every value stringified. Entries from ``labels``
        override same-named entries from ``discovered_labels``.
    """
    base = {name: str(value) for name, value in (target.get("discovered_labels") or {}).items()}
    overrides = {name: str(value) for name, value in (target.get("labels") or {}).items()}
    return {**base, **overrides}
def normalize_targets(inventory: dict[str, Any]) -> list[dict[str, Any]]:
    """Convert raw inventory targets into flat, sorted summary records.

    Non-object entries under ``targets`` are ignored. Host and metrics path
    come from the scrape URL; the host falls back to the ``hostname`` label and
    the path to ``/metrics``. Descriptive labels that are absent are reported
    as ``unknown``.

    Args:
        inventory: Mapping whose ``targets`` entry lists raw target objects.

    Returns:
        Normalized targets ordered by job, instance and scrape URL.
    """
    descriptive_labels = ("service", "role", "hypervisor", "network", "exposure")
    records: list[dict[str, Any]] = []

    for raw in inventory.get("targets") or []:
        if not isinstance(raw, dict):
            continue
        labels = merged_labels(raw)
        scrape_url = str(raw.get("scrape_url") or "")
        url = urlparse(scrape_url)

        record: dict[str, Any] = {
            "job": str(raw.get("job") or labels.get("job") or "<missing>"),
            "instance": str(raw.get("instance") or labels.get("instance") or "<missing>"),
            "health": str(raw.get("health") or "unknown"),
            "scrape_url": scrape_url,
            "last_error": str(raw.get("last_error") or ""),
            "host": url.hostname or labels.get("hostname") or "unknown",
            "endpoint": url.path or "/metrics",
        }
        for label_name in descriptive_labels:
            record[label_name] = labels.get(label_name, "unknown")
        records.append(record)

    return sorted(records, key=lambda rec: (rec["job"], rec["instance"], rec["scrape_url"]))
def markdown_table(headers: list[str], rows: list[list[str]]) -> str:
    """Render a pipe-delimited markdown table.

    Cell text is sanitized so that arbitrary runtime values (scrape errors,
    DNS record content, ...) cannot break the table layout: literal ``|``
    characters are backslash-escaped and line breaks are replaced by a single
    space. Non-string cells are converted with ``str()``.

    Args:
        headers: Column titles.
        rows: Table body, one list of cells per row.

    Returns:
        The header line, the ``---`` separator line and one line per row,
        joined with newlines (no trailing newline).
    """

    def _cell(value: object) -> str:
        text = str(value)
        # A raw line break would end the table row early.
        text = text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
        # An unescaped pipe would be read as a column boundary.
        return text.replace("|", "\\|")

    def _line(cells: list) -> str:
        return "| " + " | ".join(_cell(cell) for cell in cells) + " |"

    separator = "| " + " | ".join(["---"] * len(headers)) + " |"
    return "\n".join([_line(headers), separator, *(_line(row) for row in rows)])
def summarize_targets(targets: list[dict[str, Any]], unhealthy: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate normalized targets per job, instance, service and exposure.

    A target counts as unhealthy when its (job, instance, scrape URL) triple
    appears in *unhealthy* or when its ``health`` is anything other than
    ``"up"``. A service of ``"<missing>"`` is folded into ``"unknown"``.

    Args:
        targets: Normalized active targets.
        unhealthy: Normalized targets reported as unhealthy.

    Returns:
        Dict with ``by_job``, ``by_instance``, ``by_service`` and
        ``by_exposure`` summaries, each ordered by key; collected job and
        instance names are returned as sorted lists.
    """

    def identity(entry: dict[str, Any]) -> tuple[Any, Any, Any]:
        return (entry["job"], entry["instance"], entry["scrape_url"])

    flagged = {identity(item) for item in unhealthy}

    job_stats: dict[str, dict[str, int]] = defaultdict(lambda: {"active": 0, "unhealthy": 0})
    instance_stats: dict[str, dict[str, Any]] = defaultdict(lambda: {"jobs": set(), "unhealthy": 0, "total": 0})
    service_stats: dict[str, dict[str, Any]] = defaultdict(lambda: {"instances": set(), "unhealthy": 0, "total": 0})
    exposure_counts: dict[str, int] = defaultdict(int)

    for entry in targets:
        job = entry["job"]
        instance = entry["instance"]
        service = "unknown" if entry["service"] == "<missing>" else entry["service"]
        # 1 when the target is unhealthy, 0 otherwise, so it can be added directly.
        failing = int(identity(entry) in flagged or entry["health"] != "up")

        per_job = job_stats[job]
        per_job["active"] += 1
        per_job["unhealthy"] += failing

        per_instance = instance_stats[instance]
        per_instance["jobs"].add(job)
        per_instance["total"] += 1
        per_instance["unhealthy"] += failing

        per_service = service_stats[service]
        per_service["instances"].add(instance)
        per_service["total"] += 1
        per_service["unhealthy"] += failing

        exposure_counts[entry["exposure"]] += 1

    by_instance = {}
    for name, stats in sorted(instance_stats.items()):
        by_instance[name] = {
            "jobs": sorted(stats["jobs"]),
            "unhealthy": stats["unhealthy"],
            "total": stats["total"],
        }

    by_service = {}
    for name, stats in sorted(service_stats.items()):
        by_service[name] = {
            "instances": sorted(stats["instances"]),
            "unhealthy": stats["unhealthy"],
            "total": stats["total"],
        }

    return {
        "by_job": dict(sorted(job_stats.items())),
        "by_instance": by_instance,
        "by_service": by_service,
        "by_exposure": dict(sorted(exposure_counts.items())),
    }
def render_monitoring_coverage(inventory: dict[str, Any], targets: list[dict[str, Any]]) -> str:
    """Render the complete "Monitoring Coverage" markdown page.

    Args:
        inventory: Raw inventory; ``generated_at``, ``prometheus_url``,
            ``unhealthy_targets`` and ``unknowns.missing_label_counts`` are read.
        targets: Normalized active targets.

    Returns:
        Markdown with an overview, one table each per job, instance, service,
        unhealthy target and missing label, plus regeneration instructions.
    """
    unhealthy = normalize_targets({"targets": inventory.get("unhealthy_targets") or []})
    summaries = summarize_targets(targets, unhealthy)
    missing = (inventory.get("unknowns") or {}).get("missing_label_counts") or {}

    def up_ratio(stats: dict[str, Any]) -> str:
        return f"{stats['total'] - stats['unhealthy']}/{stats['total']} up"

    # Each entry: (section title, column headers, rows or a placeholder row).
    sections = [
        (
            "Coverage by job",
            ["job", "active targets", "unhealthy targets"],
            [[name, str(stats["active"]), str(stats["unhealthy"])] for name, stats in summaries["by_job"].items()]
            or [["none", "0", "0"]],
        ),
        (
            "Coverage by instance",
            ["instance", "jobs", "health"],
            [[name, ", ".join(stats["jobs"]), up_ratio(stats)] for name, stats in summaries["by_instance"].items()]
            or [["none", "", ""]],
        ),
        (
            "Coverage by service",
            ["service", "instances", "health"],
            [[name, ", ".join(stats["instances"]), up_ratio(stats)] for name, stats in summaries["by_service"].items()]
            or [["unknown", "", ""]],
        ),
        (
            "Unhealthy targets",
            ["job", "instance", "scrape URL", "health", "last error"],
            [
                [item["job"], item["instance"], item["scrape_url"], item["health"], item["last_error"] or "none"]
                for item in unhealthy
            ]
            or [["none", "", "", "", ""]],
        ),
        (
            "Unknowns / missing metadata",
            ["label", "targets missing"],
            [[label, str(count)] for label, count in sorted(missing.items())] or [["none", "0"]],
        ),
    ]

    doc = [
        "# Monitoring Coverage",
        "",
        "## Overview",
        "",
        "This page is generated from Prometheus-observed runtime inventory. It supplements declared architecture docs and does not replace static source-of-truth configuration.",
        "",
        f"- Inventory timestamp: `{inventory.get('generated_at', 'unknown')}`",
        f"- Prometheus URL: `{inventory.get('prometheus_url', 'unknown')}`",
        f"- Active scrape targets observed: `{len(targets)}`",
        f"- Unhealthy scrape targets observed: `{len(unhealthy)}`",
        "",
    ]
    for title, headers, rows in sections:
        doc.extend([f"## {title}", "", markdown_table(headers, rows), ""])
    doc.extend(
        [
            "Unknown or missing metadata is treated as `unknown` in generated summaries to avoid over-claiming topology.",
            "",
            "## Regeneration instructions",
            "",
            "```bash",
            "python3 scripts/render_prometheus_docs.py --inventory-file docs/runtime/prometheus-inventory.json",
            "```",
            "",
        ]
    )
    return "\n".join(doc)
def render_network_doc(inventory: dict[str, Any], targets: list[dict[str, Any]]) -> str:
    """Render the "Network and Exposure View" markdown page.

    Args:
        inventory: Raw inventory; ``generated_at`` and ``unhealthy_targets``
            are read.
        targets: Normalized active targets.

    Returns:
        Markdown listing every scrape endpoint, the number of targets per
        exposure label and the number of targets per metrics path.
    """
    unhealthy = normalize_targets({"targets": inventory.get("unhealthy_targets") or []})
    exposure_summary = summarize_targets(targets, unhealthy)["by_exposure"]

    endpoint_rows = [
        [entry["job"], entry["instance"], entry["scrape_url"], entry["network"], entry["exposure"]]
        for entry in targets
    ]
    if not endpoint_rows:
        endpoint_rows = [["none", "", "", "", ""]]

    exposure_rows = [[label, str(total)] for label, total in exposure_summary.items()]
    if not exposure_rows:
        exposure_rows = [["unknown", "0"]]

    # Tally targets per metrics path in one pass, then list the paths sorted.
    path_totals: dict[str, int] = defaultdict(int)
    for entry in targets:
        path_totals[entry["endpoint"]] += 1
    path_rows = [[metrics_path, str(path_totals[metrics_path])] for metrics_path in sorted(path_totals)]
    if not path_rows:
        path_rows = [["/metrics", "0"]]

    endpoint_table = markdown_table(["job", "instance", "scrape URL", "network label", "exposure label"], endpoint_rows)
    exposure_table = markdown_table(["exposure label", "targets"], exposure_rows)
    path_table = markdown_table(["metrics path", "observed targets"], path_rows)

    doc = [
        "# Network and Exposure View (Prometheus Observed)",
        "",
        "## Overview",
        "",
        "This document is generated from Prometheus scrape metadata and endpoint URLs. It is an observed monitoring view and not a physical network map.",
        "",
        f"- Inventory timestamp: `{inventory.get('generated_at', 'unknown')}`",
        "- Physical topology, VLAN mapping, and bridge membership remain unknown unless explicitly documented elsewhere.",
        "",
        "## Observed scrape endpoints",
        "",
        endpoint_table,
        "",
        "## Internal vs public indicators",
        "",
        exposure_table,
        "",
        "All indicators above are label-derived. Missing labels are rendered as `unknown`.",
        "",
        "## Monitoring paths",
        "",
        path_table,
        "",
        "## Unknowns and limits",
        "",
        "- Prometheus can confirm scrape reachability but not ownership or placement boundaries.",
        "- No VLAN, switch, or hypervisor placement is inferred unless present in inventory labels.",
        "- Treat this as runtime evidence to pair with declared architecture docs.",
        "",
    ]
    return "\n".join(doc)
def render_architecture_section(
    inventory: dict[str, Any],
    targets: list[dict[str, Any]],
    dynu_records: list[dict[str, Any]] | None = None,
    dynu_inventory_path: Path | None = None,
    dynu_inventory_missing: bool = False,
) -> str:
    """Render the generated runtime/infrastructure section for the architecture doc.

    The result starts with the section heading, followed by the generated
    block wrapped in GENERATED_BEGIN / GENERATED_END.

    Args:
        inventory: Raw inventory; ``generated_at``, ``unhealthy_targets`` and
            ``notes`` are read.
        targets: Normalized active targets.
        dynu_records: Normalized Dynu DNS records, or ``None`` when none were
            loaded.
        dynu_inventory_path: Path reported as the Dynu inventory location.
        dynu_inventory_missing: When true and *dynu_records* is ``None``, emit
            a hint on how to generate the Dynu inventory instead.

    Returns:
        The markdown section as a single string.
    """
    unhealthy = normalize_targets({"targets": inventory.get("unhealthy_targets") or []})
    summaries = summarize_targets(targets, unhealthy)
    notes = inventory.get("notes") or []

    job_rows = [
        [job, str(stats["active"]), str(stats["unhealthy"])]
        for job, stats in summaries["by_job"].items()
    ]
    job_table = markdown_table(["job", "targets", "unhealthy"], job_rows or [["none", "0", "0"]])

    # Resolve the Dynu-specific body and data-source bullet in one place.
    dynu_block: list[str] = []
    dynu_source: list[str] = []
    if dynu_records is not None:
        dynu_block = ["", render_dynu_dns_architecture_section(dynu_records, dynu_inventory_path).rstrip(), ""]
        dynu_source = [f"- `{dynu_inventory_path}` (Dynu DNS brownfield inventory)"]
    elif dynu_inventory_missing:
        dynu_block = [
            "### Dynu DNS brownfield inventory",
            "",
            f"Dynu DNS inventory was not found at `{dynu_inventory_path}`.",
            "",
            "Generate it with:",
            "",
            "```bash",
            "cd infrastructure/terraform/dynu",
            "python3 scripts/generate-brownfield-records.py --overwrite",
            "```",
            "",
        ]
        dynu_source = ["- Dynu DNS inventory not available; run the Dynu brownfield generator."]

    notes_block: list[str] = []
    if notes:
        notes_block = ["", "### Notes from inventory", "", *(f"- {note}" for note in notes)]

    section = [
        "## Runtime and infrastructure inventory",
        "",
        GENERATED_BEGIN,
        "",
        "Prometheus inventory provides **observed runtime coverage** of scrape targets. It complements (but does not replace) declared architecture in Compose files and static docs.",
        "",
        f"- Inventory timestamp: `{inventory.get('generated_at', 'unknown')}`",
        f"- Observed jobs: `{len(summaries['by_job'])}`",
        f"- Observed instances: `{len(summaries['by_instance'])}`",
        f"- Observed services (label-derived): `{len(summaries['by_service'])}`",
        "",
        "### Observed monitoring view",
        "",
        job_table,
        "",
        *dynu_block,
        "### Data sources",
        "",
        "- `docs/runtime/prometheus-inventory.json` (normalized runtime export)",
        *dynu_source,
        "- Prometheus scrape metadata (`targets` + label sets)",
        "- Existing repository architecture docs for declared topology",
        *notes_block,
        "",
        GENERATED_END,
        "",
    ]
    return "\n".join(section)
def upsert_generated_section(path: Path, section_markdown: str, dry_run: bool, verbose: bool) -> None:
    """Insert or refresh the generated section inside a markdown file.

    When the file already contains both GENERATED_BEGIN and GENERATED_END, the
    text from the begin marker through the end marker is replaced by
    *section_markdown* minus its own section heading line, so the heading
    already present in the file is kept. Otherwise the whole section is
    appended to the file, or becomes its sole content when the file is empty
    or missing.

    Args:
        path: Markdown file to update; it need not exist yet.
        section_markdown: Freshly rendered section, including the markers.
        dry_run: Passed through to write_file.
        verbose: Passed through to write_file.
    """
    existing = path.read_text(encoding="utf-8") if path.exists() else ""

    if GENERATED_BEGIN in existing and GENERATED_END in existing:
        marker_span = re.compile(
            rf"{re.escape(GENERATED_BEGIN)}.*?{re.escape(GENERATED_END)}",
            re.DOTALL,
        )
        heading_lines = {"## Runtime visibility from Prometheus", "## Runtime and infrastructure inventory"}
        replacement = "\n".join(
            line for line in section_markdown.splitlines() if line.strip() not in heading_lines
        ).strip()
        # Pass a callable: a plain string would be parsed as a regex template,
        # so backslashes or group references in the generated markdown (notes,
        # DNS record content, ...) would raise re.error or be rewritten.
        updated = marker_span.sub(lambda _match: replacement, existing)
    else:
        preceding = existing.rstrip()
        section_text = section_markdown.strip() + "\n"
        # Avoid starting a new or empty file with two blank lines.
        updated = f"{preceding}\n\n{section_text}" if preceding else section_text

    write_file(path, updated, dry_run=dry_run, verbose=verbose)
def mermaid_safe_id(value: str) -> str:
    """Reduce *value* to an identifier made of ASCII letters, digits and ``_``.

    Every run of other characters (and of underscores) collapses to a single
    underscore, and leading/trailing underscores are dropped.

    Args:
        value: Arbitrary text, e.g. a job or instance name.

    Returns:
        The sanitized identifier, or ``"unknown"`` if nothing remains.
    """
    collapsed = re.sub(r"[^a-zA-Z0-9]+", "_", value)
    trimmed = collapsed.strip("_")
    if trimmed:
        return trimmed
    return "unknown"
def render_monitoring_mermaid(targets: list[dict[str, Any]]) -> str:
    """Render a Mermaid flowchart of Prometheus scraping each target.

    Targets are grouped into one subgraph per host (hosts sorted by name,
    nodes sorted by job then instance). A dashed scrape edge from the
    ``Prom`` node is then emitted for every target in input order.

    Args:
        targets: Normalized targets carrying ``host``, ``job`` and ``instance``.

    Returns:
        Mermaid source ending with a newline.
    """

    def node_id(entry: dict[str, Any]) -> str:
        return mermaid_safe_id(f"{entry['job']}_{entry['instance']}")

    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for entry in targets:
        grouped[entry["host"]].append(entry)

    out = [
        "flowchart LR",
        " Prom[Prometheus]",
        "",
        " classDef scrape stroke-dasharray: 5 5;",
    ]

    for host in sorted(grouped):
        subgraph_id = mermaid_safe_id(f"host_{host}")
        out.append(f' subgraph {subgraph_id}["Host: {host}"]')
        members = sorted(grouped[host], key=lambda item: (item["job"], item["instance"]))
        for entry in members:
            caption = f"{entry['job']}<br/>{entry['instance']}"
            out.append(f' {node_id(entry)}["{caption}"]')
        out.append(" end")

    out.append("")
    for entry in targets:
        ident = node_id(entry)
        out.extend([f" Prom -. scrape .-> {ident}", f" class {ident} scrape;"])

    return "\n".join(out) + "\n"
def render_architecture_mermaid(targets: list[dict[str, Any]]) -> str:
    """Render a Mermaid flowchart linking declared architecture to observed jobs.

    Args:
        targets: Normalized targets; only the ``job`` field is used.

    Returns:
        Mermaid source with one node per distinct job (sorted by name) inside
        the ``Monitoring`` subgraph.
    """
    job_nodes = []
    for job in sorted({entry["job"] for entry in targets}):
        node = mermaid_safe_id(f"job_{job}")
        job_nodes.append(f' {node}["{job}"]')

    out = [
        "flowchart TB",
        " Declared[Declared architecture<br/>(Compose + docs)]",
        " Runtime[Observed runtime<br/>(Prometheus inventory)]",
        " Declared --> Runtime",
        "",
        ' subgraph Monitoring["Prometheus observed jobs"]',
        *job_nodes,
        " end",
        "",
        " Runtime --> Monitoring",
        "",
    ]
    return "\n".join(out)
def write_file(path: Path, content: str, dry_run: bool, verbose: bool) -> None:
    """Write *content* to *path* as UTF-8, creating parent directories.

    Args:
        path: Destination file.
        content: Text to write.
        dry_run: When true, only announce the write on stdout and touch nothing.
        verbose: When true, report the written path on stdout.
    """
    if not dry_run:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content, encoding="utf-8")
        if verbose:
            print(f"Wrote {path}")
        return
    print(f"[DRY RUN] Would write: {path}")
def update_readme(path: Path, dry_run: bool, verbose: bool) -> None:
    """Add regeneration instructions beneath the README's inventory heading.

    Nothing happens when the README is missing, when it lacks the
    ``## Prometheus Runtime Inventory Export`` heading, or when it already
    mentions ``scripts/render_prometheus_docs.py``.

    Args:
        path: README file to update.
        dry_run: Passed through to write_file.
        verbose: Passed through to write_file.
    """
    if not path.exists():
        return

    heading = "## Prometheus Runtime Inventory Export"
    readme_text = path.read_text(encoding="utf-8")
    if heading not in readme_text or "scripts/render_prometheus_docs.py" in readme_text:
        return

    instructions = "\n".join(
        [
            "",
            "Regenerate derived docs/diagrams from inventory:",
            "",
            "```bash",
            "python3 scripts/render_prometheus_docs.py --inventory-file docs/runtime/prometheus-inventory.json",
            "```",
            "",
        ]
    )
    write_file(path, readme_text.replace(heading, heading + instructions), dry_run=dry_run, verbose=verbose)
def main() -> int:
    """Run the renderer: load inventories, render all outputs, write them.

    Returns:
        Process exit status (always ``0``; failures surface as exceptions).

    Raises:
        ValueError: If the Dynu DNS inventory file holds invalid JSON, in
            addition to whatever the loaders and normalizers raise.
    """
    args = parse_args()
    docs_dir = Path(args.docs_dir)
    diagrams_dir = Path(args.diagrams_dir)
    inventory_path = Path(args.inventory_file)
    dynu_inventory_path = Path(args.dynu_dns_inventory_file)

    def resolve_doc_path(path: Path) -> Path:
        # Only a bare, relative file name is placed under the docs directory.
        if path.is_absolute() or len(path.parts) != 1:
            return path
        return docs_dir / path

    inventory = load_json(inventory_path)
    targets = normalize_targets(inventory)

    coverage_md = render_monitoring_coverage(inventory, targets)
    network_md = render_network_doc(inventory, targets)

    dynu_records: list[dict[str, Any]] | None = None
    dynu_inventory_missing = False
    if not args.skip_dynu_dns:
        try:
            dynu_payload = load_optional_json(dynu_inventory_path)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid JSON in Dynu DNS inventory file {dynu_inventory_path}: {exc}") from exc
        dynu_inventory_missing = dynu_payload is None
        if not dynu_inventory_missing:
            dynu_records = normalize_dynu_dns_records(dynu_payload)

    architecture_section = render_architecture_section(
        inventory,
        targets,
        dynu_records=dynu_records,
        dynu_inventory_path=dynu_inventory_path,
        dynu_inventory_missing=dynu_inventory_missing,
    )
    monitoring_mmd = render_monitoring_mermaid(targets)
    architecture_mmd = render_architecture_mermaid(targets)

    # Everything is rendered before the first write.
    standalone_docs = [
        (resolve_doc_path(Path(args.coverage_file)), coverage_md),
        (resolve_doc_path(Path(args.network_file)), network_md),
    ]
    for destination, text in standalone_docs:
        write_file(destination, text, args.dry_run, args.verbose)

    upsert_generated_section(
        resolve_doc_path(Path(args.architecture_file)),
        architecture_section,
        args.dry_run,
        args.verbose,
    )

    diagrams = [
        ("monitoring-coverage.mmd", monitoring_mmd),
        ("architecture.mmd", architecture_mmd),
    ]
    for file_name, text in diagrams:
        write_file(diagrams_dir / file_name, text, args.dry_run, args.verbose)

    update_readme(Path(args.readme_file), args.dry_run, args.verbose)

    if args.verbose:
        print(f"Processed {len(targets)} targets from {inventory_path}")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())