docs: add Prometheus inventory export script for diagram/documentation pipeline

This commit is contained in:
beatz174-bit
2026-04-13 16:29:10 +10:00
parent 7646f8187b
commit c0ed8cfc5f
4 changed files with 530 additions and 0 deletions
+11
View File
@@ -44,3 +44,14 @@ flowchart TB
```
For a request-flow/network view and architecture notes, see [docs/architecture.md](docs/architecture.md).
## Prometheus Runtime Inventory Export
Use `scripts/export_prometheus_inventory.py` to snapshot Prometheus-observed runtime inventory into versionable artifacts for docs/diagram workflows.
```bash
export PROMETHEUS_URL="https://prometheus.example.com"
python3 scripts/export_prometheus_inventory.py --output-dir docs/runtime
```
This writes raw API snapshots and a normalized inventory JSON under `docs/runtime/`, and updates `docs/prometheus-inventory.md`.
+6
View File
@@ -95,6 +95,12 @@ Prometheus scrape targets indicate additional infrastructure outside the local C
- `nix-cache`
- `kuma.lan.ddnsgeek.com`
## Runtime Inventory Input
Prometheus runtime inventory snapshots are exported with `scripts/export_prometheus_inventory.py` and committed under `docs/runtime/`. The latest human-readable summary is in [docs/prometheus-inventory.md](prometheus-inventory.md).
These artifacts are an observed-runtime input for architecture diagrams/docs and should be combined with repository configuration, not treated as sole source of truth.
## Assumptions / Unknowns
The repository provides enough detail to infer **container-level architecture**, but not full **Proxmox host/VM topology**.
+38
View File
@@ -0,0 +1,38 @@
# Prometheus Runtime Inventory
This document is generated by `scripts/export_prometheus_inventory.py` from Prometheus runtime data. Commit the generated update whenever architecture/runtime inventory changes.
## Usage
Required:
- `PROMETHEUS_URL`
Optional:
- `PROMETHEUS_TIMEOUT` (default: `10` seconds)
- `PROMETHEUS_BEARER_TOKEN`
- `PROMETHEUS_USERNAME`
- `PROMETHEUS_PASSWORD`
- `PROMETHEUS_VERIFY_TLS` (default: `true`)
Run:
```bash
export PROMETHEUS_URL="https://prometheus.example.com"
python3 scripts/export_prometheus_inventory.py --output-dir docs/runtime
```
## Outputs
- Raw API snapshots: `docs/runtime/prometheus-targets.json`, query result snapshots, and optional label-query snapshots.
- Normalized inventory: `docs/runtime/prometheus-inventory.json`
- Human summary: `docs/prometheus-inventory.md` (this file)
## Diagram/Docs pipeline note
Treat Prometheus data as observed runtime telemetry. Do not infer placement/topology/network boundaries unless labels or other repo sources explicitly provide that information.
## Status
Inventory has not been generated in this environment because no `PROMETHEUS_URL` was provided for execution.
+475
View File
@@ -0,0 +1,475 @@
#!/usr/bin/env python3
"""Export Prometheus runtime inventory for documentation/diagram workflows."""
from __future__ import annotations
import argparse
import json
import os
import ssl
import sys
from collections import Counter, defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib import error, parse, request
DEFAULT_TIMEOUT = 10.0
class ExportError(RuntimeError):
    """Raised for expected hard failures in the export flow (missing/invalid
    configuration, unreachable or erroring Prometheus API); caught in main()
    and reported as a clean one-line error instead of a traceback."""
@dataclass
class Config:
    """Resolved exporter configuration, merged from CLI args and PROMETHEUS_* env vars."""

    # Base URL of the Prometheus server; load_config() strips any trailing slash.
    prometheus_url: str
    # Directory where JSON artifacts are written (created if missing).
    output_dir: Path
    # HTTP timeout in seconds applied to each API request.
    timeout: float
    # Whether TLS certificates are verified for https:// URLs.
    verify_tls: bool
    # Optional bearer token; when set, basic auth is skipped (see build_opener).
    bearer_token: str | None
    # Optional basic-auth credentials; both must be set to take effect.
    username: str | None
    password: str | None
    # When True, log() prints progress messages to stderr.
    verbose: bool
def parse_bool(value: str | None, default: bool = True) -> bool:
    """Interpret *value* as a boolean flag, returning *default* when unset.

    Accepts the usual truthy/falsy spellings (case-insensitive, whitespace
    ignored); anything else raises ExportError.
    """
    if value is None:
        return default
    token = value.strip().lower()
    truthy = {"1", "true", "yes", "on"}
    falsy = {"0", "false", "no", "off"}
    if token in truthy:
        return True
    if token in falsy:
        return False
    raise ExportError(f"Invalid boolean value: {value!r}")
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line options for the exporter from *argv* (no program name)."""
    parser = argparse.ArgumentParser(description=__doc__)
    add = parser.add_argument
    add("--output-dir", default="docs/runtime", help="Directory for JSON artifacts")
    add("--prometheus-url", help="Prometheus base URL")
    add("--timeout", type=float, help="HTTP timeout in seconds")
    add("--insecure", action="store_true", help="Disable TLS verification")
    add("--verbose", action="store_true", help="Print progress details")
    return parser.parse_args(argv)
def load_config(args: argparse.Namespace) -> Config:
    """Merge CLI arguments with PROMETHEUS_* environment variables into a Config.

    CLI flags take precedence over environment variables; --insecure always
    forces TLS verification off.

    Raises:
        ExportError: If no Prometheus URL is supplied, or if
            PROMETHEUS_TIMEOUT / PROMETHEUS_VERIFY_TLS cannot be parsed.
    """
    prometheus_url = args.prometheus_url or os.environ.get("PROMETHEUS_URL")
    if not prometheus_url:
        raise ExportError(
            "PROMETHEUS_URL is required. Set PROMETHEUS_URL or pass --prometheus-url."
        )
    timeout_value = args.timeout
    if timeout_value is None:
        timeout_raw = os.environ.get("PROMETHEUS_TIMEOUT")
        if timeout_raw:
            try:
                timeout_value = float(timeout_raw)
            except ValueError as exc:
                # Surface a readable ExportError instead of an unhandled
                # ValueError traceback from a malformed env value.
                raise ExportError(
                    f"Invalid PROMETHEUS_TIMEOUT value: {timeout_raw!r}"
                ) from exc
        else:
            timeout_value = DEFAULT_TIMEOUT
    verify_tls = parse_bool(os.environ.get("PROMETHEUS_VERIFY_TLS"), default=True)
    if args.insecure:
        verify_tls = False
    return Config(
        prometheus_url=prometheus_url.rstrip("/"),
        output_dir=Path(args.output_dir),
        timeout=timeout_value,
        verify_tls=verify_tls,
        bearer_token=os.environ.get("PROMETHEUS_BEARER_TOKEN"),
        username=os.environ.get("PROMETHEUS_USERNAME"),
        password=os.environ.get("PROMETHEUS_PASSWORD"),
        verbose=args.verbose,
    )
def build_opener(config: Config) -> request.OpenerDirector:
    """Create a urllib opener honoring the basic-auth and TLS settings in *config*.

    Basic auth is installed only when both credentials are present and no
    bearer token is configured (bearer tokens are attached per-request).
    """
    handlers: list[Any] = []
    use_basic_auth = bool(config.username and config.password and not config.bearer_token)
    if use_basic_auth:
        manager = request.HTTPPasswordMgrWithDefaultRealm()
        manager.add_password(None, config.prometheus_url, config.username, config.password)
        handlers.append(request.HTTPBasicAuthHandler(manager))
    wants_insecure_tls = config.prometheus_url.startswith("https://") and not config.verify_tls
    if wants_insecure_tls:
        # Deliberately unverified context for --insecure / PROMETHEUS_VERIFY_TLS=false.
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        handlers.append(request.HTTPSHandler(context=context))
    return request.build_opener(*handlers)
def api_get_json(
    opener: request.OpenerDirector,
    config: Config,
    endpoint: str,
    params: dict[str, str] | None = None,
) -> dict[str, Any]:
    """GET a Prometheus API endpoint and return its decoded JSON payload.

    Args:
        opener: Opener from build_opener() (carries auth/TLS handlers).
        config: Supplies the base URL, timeout, and optional bearer token.
        endpoint: API path beginning with "/", e.g. "/api/v1/targets".
        params: Optional query parameters, urlencoded onto the URL.

    Returns:
        The parsed JSON body; guaranteed to have status == "success".

    Raises:
        ExportError: On HTTP errors, connection failures, timeouts, invalid
            JSON, or a non-success Prometheus API status.
    """
    url = f"{config.prometheus_url}{endpoint}"
    if params:
        url = f"{url}?{parse.urlencode(params)}"
    req = request.Request(url)
    req.add_header("Accept", "application/json")
    if config.bearer_token:
        # Bearer auth is attached per-request; basic auth (if any) lives in the opener.
        req.add_header("Authorization", f"Bearer {config.bearer_token}")
    try:
        with opener.open(req, timeout=config.timeout) as response:
            body = response.read().decode("utf-8")
    except error.HTTPError as exc:
        # Must precede the URLError clause: HTTPError subclasses URLError.
        detail = exc.read().decode("utf-8", errors="replace")
        raise ExportError(f"Prometheus API error for {endpoint}: HTTP {exc.code} - {detail}") from exc
    except error.URLError as exc:
        raise ExportError(
            f"Failed to reach Prometheus at {config.prometheus_url}: {exc.reason}"
        ) from exc
    except TimeoutError as exc:
        # NOTE(review): covers timeouts raised as bare TimeoutError (e.g. during
        # the socket read) rather than wrapped in URLError — confirm per Python version.
        raise ExportError(
            f"Timed out reaching Prometheus at {config.prometheus_url} after {config.timeout}s"
        ) from exc
    try:
        payload = json.loads(body)
    except json.JSONDecodeError as exc:
        raise ExportError(f"Invalid JSON returned by {endpoint}") from exc
    if payload.get("status") != "success":
        raise ExportError(f"Prometheus API returned non-success status for {endpoint}: {payload}")
    return payload
def write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write *payload* to *path* as pretty, key-sorted JSON with a trailing newline.

    Parent directories are created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, sort_keys=True)
    path.write_text(serialized + "\n", encoding="utf-8")
def extract_vector(query_response: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the instant-vector samples from a query response, or [] for any
    other (or missing) result type."""
    data = query_response.get("data", {})
    is_vector = data.get("resultType") == "vector"
    return data.get("result", []) if is_vector else []
def counter_from_vector(results: list[dict[str, Any]], label: str) -> dict[str, int]:
    """Map each sample's *label* value to its integer sample value, key-sorted.

    Samples lacking the label are bucketed under "<missing>"; unparseable
    sample values become 0.
    """
    collected: dict[str, int] = {}
    for sample in results:
        key = sample.get("metric", {}).get(label, "<missing>")
        raw = sample.get("value", [None, "0"])[1]
        try:
            collected[key] = int(float(raw))
        except (TypeError, ValueError):
            collected[key] = 0
    return {key: collected[key] for key in sorted(collected)}
def nested_counter_from_vector(
    results: list[dict[str, Any]],
    parent_label: str,
    child_label: str,
) -> dict[str, dict[str, int]]:
    """Group sample values into {parent: {child: count}}, both levels key-sorted.

    Missing labels fall into a "<missing>" bucket; unparseable values count as 0.
    """
    grouped: dict[str, dict[str, int]] = {}
    for sample in results:
        metric = sample.get("metric", {})
        parent_key = metric.get(parent_label, "<missing>")
        child_key = metric.get(child_label, "<missing>")
        raw = sample.get("value", [None, "0"])[1]
        try:
            count = int(float(raw))
        except (TypeError, ValueError):
            count = 0
        grouped.setdefault(parent_key, {})[child_key] = count
    return {
        parent_key: {child: grouped[parent_key][child] for child in sorted(grouped[parent_key])}
        for parent_key in sorted(grouped)
    }
def summarize_targets(targets_payload: dict[str, Any]) -> tuple[list[dict[str, Any]], list[dict[str, Any]], Counter]:
    """Normalize active targets, collect unhealthy ones, and count absent labels.

    Returns (normalized_targets, unhealthy_targets, missing_label_counts).
    Targets are ordered by (job, scrape URL); a label counts as missing when
    it appears in neither the discovered nor the relabeled label set.
    """
    active = targets_payload.get("data", {}).get("activeTargets", [])
    inventory_labels = ("hostname", "service", "role", "hypervisor", "network", "exposure")
    normalized_all: list[dict[str, Any]] = []
    unhealthy: list[dict[str, Any]] = []
    missing = Counter()

    def sort_key(target: dict[str, Any]) -> tuple[str, str]:
        return (target.get("labels", {}).get("job", ""), target.get("scrapeUrl", ""))

    for target in sorted(active, key=sort_key):
        labels = target.get("labels", {})
        discovered = target.get("discoveredLabels", {})
        # Relabeled labels win over discovered labels on key collisions.
        combined = {**discovered, **labels}
        missing.update(name for name in inventory_labels if name not in combined)
        entry = {
            "job": labels.get("job", "<missing>"),
            "instance": labels.get("instance", "<missing>"),
            "health": target.get("health", "unknown"),
            "scrape_pool": target.get("scrapePool"),
            "scrape_url": target.get("scrapeUrl"),
            "last_error": target.get("lastError") or "",
            "labels": labels,
            "discovered_labels": discovered,
        }
        normalized_all.append(entry)
        if target.get("health") != "up":
            unhealthy.append(entry)
    return normalized_all, unhealthy, missing
def build_inventory(
    config: Config,
    targets_payload: dict[str, Any],
    query_payloads: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the normalized runtime-inventory document from raw API payloads.

    Args:
        config: Resolved exporter configuration (only ``prometheus_url`` is read).
        targets_payload: Raw ``/api/v1/targets`` response.
        query_payloads: Raw query responses keyed by the names used in main():
            "up", "jobs", "job_instance", "service", "role", "hypervisor",
            "network", "exposure".

    Returns:
        A JSON-serializable dict with targets, per-label counters, unknowns,
        and caveat notes for downstream docs/diagram tooling.
    """
    # Compact UTC ISO-8601 timestamp with trailing "Z", second precision.
    now = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
    targets, unhealthy_targets, missing_labels = summarize_targets(targets_payload)
    up_results = extract_vector(query_payloads["up"])
    jobs_results = extract_vector(query_payloads["jobs"])
    job_instance_results = extract_vector(query_payloads["job_instance"])
    # Computed once and reused for both the "jobs" section and
    # query_observations (the original recomputed it a second time).
    jobs_counter = counter_from_vector(jobs_results, "job")
    optional = {
        "services": counter_from_vector(extract_vector(query_payloads["service"]), "service"),
        "roles": counter_from_vector(extract_vector(query_payloads["role"]), "role"),
        "hypervisors": counter_from_vector(extract_vector(query_payloads["hypervisor"]), "hypervisor"),
        "networks": counter_from_vector(extract_vector(query_payloads["network"]), "network"),
        "exposures": counter_from_vector(extract_vector(query_payloads["exposure"]), "exposure"),
    }
    unknowns = {
        "missing_label_counts": dict(sorted(missing_labels.items(), key=lambda kv: kv[0])),
        "notes": [
            "Prometheus runtime data is observational and not authoritative for placement/topology.",
            "Do not infer Proxmox host placement, VM placement, VLAN layout, or public/internal boundaries without explicit labels or additional inventory sources.",
        ],
    }
    return {
        "generated_at": now,
        "prometheus_url": config.prometheus_url,
        "targets": targets,
        "jobs": jobs_counter,
        "instances": nested_counter_from_vector(job_instance_results, "job", "instance"),
        "services": optional["services"],
        "roles": optional["roles"],
        "hypervisors": optional["hypervisors"],
        "networks": optional["networks"],
        "exposures": optional["exposures"],
        "unhealthy_targets": unhealthy_targets,
        "unknowns": unknowns,
        "notes": [
            "The `up` query indicates scrape success from Prometheus perspective only.",
            "Use static repository architecture docs and deployment configs with this runtime export for complete diagrams.",
        ],
        "query_observations": {
            "up_series_count": len(up_results),
            "job_count": len(jobs_counter),
        },
    }
def markdown_table(headers: list[str], rows: list[list[str]]) -> str:
    """Render *headers* and *rows* as a GitHub-flavored markdown table string."""

    def format_row(cells: list[str]) -> str:
        return "| " + " | ".join(cells) + " |"

    rendered = [format_row(headers), format_row(["---"] * len(headers))]
    rendered.extend(format_row(row) for row in rows)
    return "\n".join(rendered)
def generate_markdown(inventory: dict[str, Any]) -> str:
    """Render the normalized inventory dict (from build_inventory) as the
    docs/prometheus-inventory.md document.

    Every table gets a placeholder row when its source section is empty so
    the markdown structure stays stable across regenerations.
    """
    jobs_rows = [[job, str(count)] for job, count in inventory["jobs"].items()] or [["<none>", "0"]]
    targets_rows = [
        [target["job"], target["instance"], target["health"], target.get("scrape_url") or ""]
        for target in inventory["targets"]
    ] or [["<none>", "<none>", "unknown", ""]]
    unhealthy_rows = [
        [target["job"], target["instance"], target["health"], target.get("last_error", "")]
        for target in inventory["unhealthy_targets"]
    ] or [["None", "", "", ""]]
    host_rows: list[list[str]] = []
    # Flatten the nested {job: {instance: count}} map into table rows.
    for job, instances in inventory["instances"].items():
        for instance, count in instances.items():
            host_rows.append([job, instance, str(count)])
    if not host_rows:
        host_rows = [["<none>", "<none>", "0"]]
    service_rows = [[name, str(value)] for name, value in inventory["services"].items()] or [["<none>", "0"]]
    network_rows: list[list[str]] = []
    # Merge the four optional-label sections into one table; section[:-1]
    # turns the plural section name into a singular category label.
    for section in ["networks", "exposures", "roles", "hypervisors"]:
        for name, value in inventory[section].items():
            network_rows.append([section[:-1], name, str(value)])
    if not network_rows:
        network_rows = [["<none>", "<none>", "0"]]
    unknown_rows = [
        [label, str(count)]
        for label, count in inventory["unknowns"].get("missing_label_counts", {}).items()
    ] or [["<none>", "0"]]
    lines = [
        "# Prometheus Runtime Inventory",
        "",
        "## 1. Overview",
        "",
        f"- Generated at: `{inventory['generated_at']}`",
        f"- Prometheus URL: `{inventory['prometheus_url']}`",
        f"- Total active targets: `{len(inventory['targets'])}`",
        f"- Unhealthy targets: `{len(inventory['unhealthy_targets'])}`",
        "- Source type: Observed runtime telemetry (not sole source of truth).",
        "",
        "## 2. Scrape jobs",
        "",
        markdown_table(["Job", "Observed target count"], jobs_rows),
        "",
        "## 3. Active targets",
        "",
        markdown_table(["Job", "Instance", "Health", "Scrape URL"], targets_rows),
        "",
        "## 4. Unhealthy targets",
        "",
        markdown_table(["Job", "Instance", "Health", "Last error"], unhealthy_rows),
        "",
        "## 5. Hosts / instances observed",
        "",
        markdown_table(["Job", "Instance", "Series count"], host_rows),
        "",
        "## 6. Services observed",
        "",
        markdown_table(["Service", "Series count"], service_rows),
        "",
        "## 7. Network / exposure metadata observed",
        "",
        markdown_table(["Category", "Label", "Series count"], network_rows),
        "",
        "## 8. Unknowns / missing metadata",
        "",
        markdown_table(["Missing label", "Targets missing"], unknown_rows),
        "",
        "Notes:",
        "",
    ]
    # Append the caveat notes recorded by build_inventory as bullet items.
    for note in inventory["unknowns"].get("notes", []):
        lines.append(f"- {note}")
    lines.extend(
        [
            "",
            "## 9. Regeneration instructions",
            "",
            "```bash",
            "export PROMETHEUS_URL=\"https://prometheus.example.com\"",
            "# Optional auth:",
            "# export PROMETHEUS_BEARER_TOKEN=\"...\"",
            "# or export PROMETHEUS_USERNAME=\"...\"; export PROMETHEUS_PASSWORD=\"...\"",
            "python3 scripts/export_prometheus_inventory.py --output-dir docs/runtime",
            "```",
            "",
            "This inventory feeds documentation and diagram workflows as an observed-runtime input alongside static repo configuration.",
            "",
        ]
    )
    return "\n".join(lines)
def write_markdown(path: Path, text: str) -> None:
    """Write *text* to *path* as UTF-8, creating parent directories as needed."""
    parent = path.parent
    parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding="utf-8")
def log(config: Config, message: str) -> None:
    """Print *message* to stderr when verbose mode is enabled; otherwise no-op."""
    if not config.verbose:
        return
    print(message, file=sys.stderr)
def main(argv: list[str]) -> int:
    """Entry point: fetch Prometheus inventory data and write all artifacts.

    Args:
        argv: Command-line arguments, excluding the program name.

    Returns:
        0 on success; 1 on any ExportError (message printed to stderr).

    Side effects: creates the output directory, writes one raw JSON snapshot
    per API call, the normalized inventory JSON, and the markdown summary.
    """
    try:
        args = parse_args(argv)
        config = load_config(args)
        opener = build_opener(config)
        output_dir = config.output_dir
        output_dir.mkdir(parents=True, exist_ok=True)
        # name -> (API endpoint, query params or None, snapshot filename).
        # The label-specific queries (service/role/...) are optional metadata
        # probes; targets without those labels land in "<missing>" buckets.
        endpoint_map = {
            "targets": ("/api/v1/targets", None, "prometheus-targets.json"),
            "up": ("/api/v1/query", {"query": "up"}, "prometheus-query-up.json"),
            "jobs": (
                "/api/v1/query",
                {"query": "count by (job) (up)"},
                "prometheus-query-jobs.json",
            ),
            "job_instance": (
                "/api/v1/query",
                {"query": "count by (job, instance) (up)"},
                "prometheus-query-job-instance.json",
            ),
            "service": (
                "/api/v1/query",
                {"query": "count by (service) (up)"},
                "prometheus-query-service.json",
            ),
            "role": (
                "/api/v1/query",
                {"query": "count by (role) (up)"},
                "prometheus-query-role.json",
            ),
            "hypervisor": (
                "/api/v1/query",
                {"query": "count by (hypervisor) (up)"},
                "prometheus-query-hypervisor.json",
            ),
            "network": (
                "/api/v1/query",
                {"query": "count by (network) (up)"},
                "prometheus-query-network.json",
            ),
            "exposure": (
                "/api/v1/query",
                {"query": "count by (exposure) (up)"},
                "prometheus-query-exposure.json",
            ),
        }
        payloads: dict[str, dict[str, Any]] = {}
        for name, (endpoint, params, filename) in endpoint_map.items():
            log(config, f"Querying {name}: {endpoint}")
            payload = api_get_json(opener, config, endpoint, params=params)
            payloads[name] = payload
            # Persist each raw response so VCS diffs show exactly what changed upstream.
            write_json(output_dir / filename, payload)
        inventory = build_inventory(config, payloads["targets"], payloads)
        write_json(output_dir / "prometheus-inventory.json", inventory)
        markdown = generate_markdown(inventory)
        # The human-readable summary lives one level above the raw artifacts
        # (e.g. docs/prometheus-inventory.md for --output-dir docs/runtime).
        markdown_path = output_dir.parent / "prometheus-inventory.md"
        write_markdown(markdown_path, markdown)
        log(config, f"Wrote inventory artifacts to {output_dir} and {markdown_path}")
        return 0
    except ExportError as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1
if __name__ == "__main__":
    # Propagate main()'s status code as the process exit code when run directly.
    sys.exit(main(sys.argv[1:]))