#!/usr/bin/env python3 """Export Prometheus runtime inventory for documentation/diagram workflows.""" from __future__ import annotations import argparse import json import os import ssl import sys from collections import Counter, defaultdict from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any from urllib import error, parse, request DEFAULT_TIMEOUT = 10.0 class ExportError(RuntimeError): """Raised for expected hard failures in export flow.""" @dataclass class Config: prometheus_url: str output_dir: Path timeout: float verify_tls: bool bearer_token: str | None username: str | None password: str | None verbose: bool def parse_bool(value: str | None, default: bool = True) -> bool: if value is None: return default normalized = value.strip().lower() if normalized in {"1", "true", "yes", "on"}: return True if normalized in {"0", "false", "no", "off"}: return False raise ExportError(f"Invalid boolean value: {value!r}") def parse_args(argv: list[str]) -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--output-dir", default="docs/runtime", help="Directory for JSON artifacts") parser.add_argument("--prometheus-url", help="Prometheus base URL") parser.add_argument("--timeout", type=float, help="HTTP timeout in seconds") parser.add_argument("--insecure", action="store_true", help="Disable TLS verification") parser.add_argument("--verbose", action="store_true", help="Print progress details") return parser.parse_args(argv) def load_config(args: argparse.Namespace) -> Config: prometheus_url = args.prometheus_url or os.environ.get("PROMETHEUS_URL") if not prometheus_url: raise ExportError( "PROMETHEUS_URL is required. Set PROMETHEUS_URL or pass --prometheus-url." ) timeout_value = args.timeout if timeout_value is None: timeout_raw = os.environ.get("PROMETHEUS_TIMEOUT") timeout_value = float(timeout_raw) if timeout_raw else DEFAULT_TIMEOUT verify_tls = parse_bool(os.environ.get("PROMETHEUS_VERIFY_TLS"), default=True) if args.insecure: verify_tls = False return Config( prometheus_url=prometheus_url.rstrip("/"), output_dir=Path(args.output_dir), timeout=timeout_value, verify_tls=verify_tls, bearer_token=os.environ.get("PROMETHEUS_BEARER_TOKEN"), username=os.environ.get("PROMETHEUS_USERNAME"), password=os.environ.get("PROMETHEUS_PASSWORD"), verbose=args.verbose, ) def build_opener(config: Config) -> request.OpenerDirector: handlers: list[Any] = [] if config.username and config.password and not config.bearer_token: password_manager = request.HTTPPasswordMgrWithDefaultRealm() password_manager.add_password(None, config.prometheus_url, config.username, config.password) handlers.append(request.HTTPBasicAuthHandler(password_manager)) if config.prometheus_url.startswith("https://") and not config.verify_tls: insecure_context = ssl.create_default_context() insecure_context.check_hostname = False insecure_context.verify_mode = ssl.CERT_NONE handlers.append(request.HTTPSHandler(context=insecure_context)) return request.build_opener(*handlers) def api_get_json( opener: request.OpenerDirector, config: Config, endpoint: str, params: dict[str, str] | None = None, ) -> dict[str, Any]: url = f"{config.prometheus_url}{endpoint}" if params: url = f"{url}?{parse.urlencode(params)}" req = request.Request(url) req.add_header("Accept", "application/json") if config.bearer_token: req.add_header("Authorization", f"Bearer {config.bearer_token}") try: with opener.open(req, timeout=config.timeout) as response: body = response.read().decode("utf-8") except error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace") raise ExportError(f"Prometheus API error for {endpoint}: HTTP {exc.code} - {detail}") from exc except error.URLError as exc: raise ExportError( f"Failed to reach Prometheus at {config.prometheus_url}: {exc.reason}" ) from exc except TimeoutError as exc: raise ExportError( f"Timed out reaching Prometheus at {config.prometheus_url} after {config.timeout}s" ) from exc try: payload = json.loads(body) except json.JSONDecodeError as exc: raise ExportError(f"Invalid JSON returned by {endpoint}") from exc if payload.get("status") != "success": raise ExportError(f"Prometheus API returned non-success status for {endpoint}: {payload}") return payload def write_json(path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as handle: json.dump(payload, handle, indent=2, sort_keys=True) handle.write("\n") def extract_vector(query_response: dict[str, Any]) -> list[dict[str, Any]]: data = query_response.get("data", {}) if data.get("resultType") != "vector": return [] return data.get("result", []) def counter_from_vector(results: list[dict[str, Any]], label: str) -> dict[str, int]: output: dict[str, int] = {} for item in results: metric = item.get("metric", {}) key = metric.get(label, "") value_raw = item.get("value", [None, "0"])[1] try: output[key] = int(float(value_raw)) except (TypeError, ValueError): output[key] = 0 return dict(sorted(output.items(), key=lambda kv: kv[0])) def nested_counter_from_vector( results: list[dict[str, Any]], parent_label: str, child_label: str, ) -> dict[str, dict[str, int]]: nested: dict[str, dict[str, int]] = defaultdict(dict) for item in results: metric = item.get("metric", {}) parent = metric.get(parent_label, "") child = metric.get(child_label, "") value_raw = item.get("value", [None, "0"])[1] try: value = int(float(value_raw)) except (TypeError, ValueError): value = 0 nested[parent][child] = value return { parent: dict(sorted(children.items(), key=lambda kv: kv[0])) for parent, children in sorted(nested.items(), key=lambda kv: kv[0]) } def summarize_targets(targets_payload: dict[str, Any]) -> tuple[list[dict[str, Any]], list[dict[str, Any]], Counter]: data = targets_payload.get("data", {}) active_targets = data.get("activeTargets", []) normalized_targets: list[dict[str, Any]] = [] unhealthy_targets: list[dict[str, Any]] = [] missing_labels = Counter() important_labels = ["hostname", "service", "role", "hypervisor", "network", "exposure"] for target in sorted(active_targets, key=lambda t: (t.get("labels", {}).get("job", ""), t.get("scrapeUrl", ""))): labels = target.get("labels", {}) discovered = target.get("discoveredLabels", {}) merged = dict(discovered) merged.update(labels) for label in important_labels: if label not in merged: missing_labels[label] += 1 normalized = { "job": labels.get("job", ""), "instance": labels.get("instance", ""), "health": target.get("health", "unknown"), "scrape_pool": target.get("scrapePool"), "scrape_url": target.get("scrapeUrl"), "last_error": target.get("lastError") or "", "labels": labels, "discovered_labels": discovered, } normalized_targets.append(normalized) if target.get("health") != "up": unhealthy_targets.append(normalized) return normalized_targets, unhealthy_targets, missing_labels def build_inventory( config: Config, targets_payload: dict[str, Any], query_payloads: dict[str, dict[str, Any]], ) -> dict[str, Any]: now = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") targets, unhealthy_targets, missing_labels = summarize_targets(targets_payload) up_results = extract_vector(query_payloads["up"]) jobs_results = extract_vector(query_payloads["jobs"]) job_instance_results = extract_vector(query_payloads["job_instance"]) optional = { "services": counter_from_vector(extract_vector(query_payloads["service"]), "service"), "roles": counter_from_vector(extract_vector(query_payloads["role"]), "role"), "hypervisors": counter_from_vector(extract_vector(query_payloads["hypervisor"]), "hypervisor"), "networks": counter_from_vector(extract_vector(query_payloads["network"]), "network"), "exposures": counter_from_vector(extract_vector(query_payloads["exposure"]), "exposure"), } unknowns = { "missing_label_counts": dict(sorted(missing_labels.items(), key=lambda kv: kv[0])), "notes": [ "Prometheus runtime data is observational and not authoritative for placement/topology.", "Do not infer Proxmox host placement, VM placement, VLAN layout, or public/internal boundaries without explicit labels or additional inventory sources.", ], } return { "generated_at": now, "prometheus_url": config.prometheus_url, "targets": targets, "jobs": counter_from_vector(jobs_results, "job"), "instances": nested_counter_from_vector(job_instance_results, "job", "instance"), "services": optional["services"], "roles": optional["roles"], "hypervisors": optional["hypervisors"], "networks": optional["networks"], "exposures": optional["exposures"], "unhealthy_targets": unhealthy_targets, "unknowns": unknowns, "notes": [ "The `up` query indicates scrape success from Prometheus perspective only.", "Use static repository architecture docs and deployment configs with this runtime export for complete diagrams.", ], "query_observations": { "up_series_count": len(up_results), "job_count": len(counter_from_vector(jobs_results, "job")), }, } def markdown_table(headers: list[str], rows: list[list[str]]) -> str: table = ["| " + " | ".join(headers) + " |", "| " + " | ".join(["---"] * len(headers)) + " |"] for row in rows: table.append("| " + " | ".join(row) + " |") return "\n".join(table) def generate_markdown(inventory: dict[str, Any]) -> str: jobs_rows = [[job, str(count)] for job, count in inventory["jobs"].items()] or [["", "0"]] targets_rows = [ [target["job"], target["instance"], target["health"], target.get("scrape_url") or ""] for target in inventory["targets"] ] or [["", "", "unknown", ""]] unhealthy_rows = [ [target["job"], target["instance"], target["health"], target.get("last_error", "")] for target in inventory["unhealthy_targets"] ] or [["None", "", "", ""]] host_rows: list[list[str]] = [] for job, instances in inventory["instances"].items(): for instance, count in instances.items(): host_rows.append([job, instance, str(count)]) if not host_rows: host_rows = [["", "", "0"]] service_rows = [[name, str(value)] for name, value in inventory["services"].items()] or [["", "0"]] network_rows: list[list[str]] = [] for section in ["networks", "exposures", "roles", "hypervisors"]: for name, value in inventory[section].items(): network_rows.append([section[:-1], name, str(value)]) if not network_rows: network_rows = [["", "", "0"]] unknown_rows = [ [label, str(count)] for label, count in inventory["unknowns"].get("missing_label_counts", {}).items() ] or [["", "0"]] lines = [ "# Prometheus Runtime Inventory", "", "## 1. Overview", "", f"- Generated at: `{inventory['generated_at']}`", f"- Prometheus URL: `{inventory['prometheus_url']}`", f"- Total active targets: `{len(inventory['targets'])}`", f"- Unhealthy targets: `{len(inventory['unhealthy_targets'])}`", "- Source type: Observed runtime telemetry (not sole source of truth).", "", "## 2. Scrape jobs", "", markdown_table(["Job", "Observed target count"], jobs_rows), "", "## 3. Active targets", "", markdown_table(["Job", "Instance", "Health", "Scrape URL"], targets_rows), "", "## 4. Unhealthy targets", "", markdown_table(["Job", "Instance", "Health", "Last error"], unhealthy_rows), "", "## 5. Hosts / instances observed", "", markdown_table(["Job", "Instance", "Series count"], host_rows), "", "## 6. Services observed", "", markdown_table(["Service", "Series count"], service_rows), "", "## 7. Network / exposure metadata observed", "", markdown_table(["Category", "Label", "Series count"], network_rows), "", "## 8. Unknowns / missing metadata", "", markdown_table(["Missing label", "Targets missing"], unknown_rows), "", "Notes:", "", ] for note in inventory["unknowns"].get("notes", []): lines.append(f"- {note}") lines.extend( [ "", "## 9. Regeneration instructions", "", "```bash", "export PROMETHEUS_URL=\"https://prometheus.example.com\"", "# Optional auth:", "# export PROMETHEUS_BEARER_TOKEN=\"...\"", "# or export PROMETHEUS_USERNAME=\"...\"; export PROMETHEUS_PASSWORD=\"...\"", "python3 scripts/export_prometheus_inventory.py --output-dir docs/runtime", "```", "", "This inventory feeds documentation and diagram workflows as an observed-runtime input alongside static repo configuration.", "", ] ) return "\n".join(lines) def write_markdown(path: Path, text: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(text, encoding="utf-8") def log(config: Config, message: str) -> None: if config.verbose: print(message, file=sys.stderr) def main(argv: list[str]) -> int: try: args = parse_args(argv) config = load_config(args) opener = build_opener(config) output_dir = config.output_dir output_dir.mkdir(parents=True, exist_ok=True) endpoint_map = { "targets": ("/api/v1/targets", None, "prometheus-targets.json"), "up": ("/api/v1/query", {"query": "up"}, "prometheus-query-up.json"), "jobs": ( "/api/v1/query", {"query": "count by (job) (up)"}, "prometheus-query-jobs.json", ), "job_instance": ( "/api/v1/query", {"query": "count by (job, instance) (up)"}, "prometheus-query-job-instance.json", ), "service": ( "/api/v1/query", {"query": "count by (service) (up)"}, "prometheus-query-service.json", ), "role": ( "/api/v1/query", {"query": "count by (role) (up)"}, "prometheus-query-role.json", ), "hypervisor": ( "/api/v1/query", {"query": "count by (hypervisor) (up)"}, "prometheus-query-hypervisor.json", ), "network": ( "/api/v1/query", {"query": "count by (network) (up)"}, "prometheus-query-network.json", ), "exposure": ( "/api/v1/query", {"query": "count by (exposure) (up)"}, "prometheus-query-exposure.json", ), } payloads: dict[str, dict[str, Any]] = {} for name, (endpoint, params, filename) in endpoint_map.items(): log(config, f"Querying {name}: {endpoint}") payload = api_get_json(opener, config, endpoint, params=params) payloads[name] = payload write_json(output_dir / filename, payload) inventory = build_inventory(config, payloads["targets"], payloads) write_json(output_dir / "prometheus-inventory.json", inventory) markdown = generate_markdown(inventory) markdown_path = output_dir.parent / "prometheus-inventory.md" write_markdown(markdown_path, markdown) log(config, f"Wrote inventory artifacts to {output_dir} and {markdown_path}") return 0 except ExportError as exc: print(f"ERROR: {exc}", file=sys.stderr) return 1 if __name__ == "__main__": sys.exit(main(sys.argv[1:]))