Files
docker/scripts/dynu/correlate_dynu_with_traefik.py
T
2026-04-21 12:31:52 +10:00

308 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""Correlate Dynu DNS data with Traefik host rules in compose sources.
This integration is intentionally read-only.
No Dynu mutations are permitted in this repo at this stage.
"""
from __future__ import annotations
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Tuple
BASE_DOMAIN = "lan.ddnsgeek.com"
DYN_DATA = Path("data/dns/dynu_live.json")
OUT_JSON = Path("data/dns/dynu_traefik_inventory.json")
OUT_MD = Path("docs/generated/dns-inventory.md")
HOST_RULE_RE = re.compile(r"Host\((.*?)\)")
DOMAIN_RE = re.compile(r"[`\"']([^`\"']+)[`\"']")
class ReadOnlyError(RuntimeError):
    """Raised when the mandatory read-only guard is not satisfied."""
def require_read_only() -> None:
    """Abort with ReadOnlyError unless DYNU_READ_ONLY is exactly 'true'.

    The env var acts as an explicit operator acknowledgement that this
    integration never mutates Dynu state.
    """
    if os.environ.get("DYNU_READ_ONLY") == "true":
        return
    raise ReadOnlyError(
        "Refusing to run: DYNU_READ_ONLY must be exactly 'true'. "
        "This integration is intentionally read-only."
    )
def compose_files(root: Path) -> List[Path]:
    """Collect compose files under *root*, plus the shared network file.

    Scans exactly one directory level below each of apps/, monitoring/ and
    core/ (pattern ``*/*``); the shared ``default-network.yml`` is always
    listed first, whether or not it exists on disk.
    """
    compose_names = {"docker-compose.yml", "docker-compose.yaml"}
    found: List[Path] = [root / "default-network.yml"]
    for area in ("apps", "monitoring", "core"):
        area_dir = root / area
        if not area_dir.exists():
            continue
        found.extend(
            candidate
            for candidate in sorted(area_dir.glob("*/*"))
            if candidate.is_file() and candidate.name in compose_names
        )
    return found
def parse_hosts_from_label(label_value: str) -> List[str]:
    """Extract normalised hostnames from a Traefik router rule value.

    Pulls every ``Host(...)`` fragment out of *label_value*, then every
    quoted/backticked domain inside each fragment; hostnames are lowercased,
    stripped of trailing dots, deduplicated and returned sorted.
    """
    hosts = {
        domain.strip().strip(".").lower()
        for fragment in HOST_RULE_RE.findall(label_value)
        for domain in DOMAIN_RE.findall(fragment)
    }
    return sorted(host for host in hosts if host)
def extract_traefik_hosts(path: Path) -> List[Dict[str, str]]:
    """Scan one compose file for Traefik router rules and return host entries.

    Uses a lightweight line-based parse of the ``services:`` section instead
    of a YAML library: it tracks the current service name (two-space-indented
    ``name:`` lines) and the indent of that service's ``labels:`` key, then
    extracts hostnames from any label whose key contains
    ``traefik.http.routers.<name>.rule``. Both label syntaxes are handled:
    the list form (``- "key=value"``) and the map form (``key: value``).

    Returns a list of dicts with keys ``fqdn``, ``stack`` (first component of
    *path*), ``service`` and ``source_compose_file``.
    """
    lines = path.read_text(encoding="utf-8").splitlines()
    entries: List[Dict[str, str]] = []
    in_services = False
    current_service = ""
    current_labels_indent = None  # indent of the active "labels:" key, if any
    for raw in lines:
        line = raw.rstrip("\n")
        stripped = line.strip()
        if stripped == "services:":
            in_services = True
            current_service = ""
            current_labels_indent = None
            continue
        if not in_services:
            continue
        # A two-space-indented "name:" line starts a new service definition.
        service_match = re.match(r"^(\s{2})([A-Za-z0-9_.-]+):\s*$", line)
        if service_match:
            current_service = service_match.group(2)
            current_labels_indent = None
            continue
        # Any line starting at column 0 ends the services: section.
        if re.match(r"^\S", line):
            in_services = False
            current_service = ""
            current_labels_indent = None
            continue
        labels_match = re.match(r"^(\s+)labels:\s*$", line)
        if labels_match and current_service:
            current_labels_indent = len(labels_match.group(1))
            continue
        if current_labels_indent is None:
            continue
        indent = len(line) - len(line.lstrip(" "))
        if indent <= current_labels_indent:
            # De-indented past the labels block: the block is finished.
            current_labels_indent = None
            continue
        if "traefik.http.routers." not in line or ".rule" not in line:
            continue
        label_value = ""
        # List form:  - "traefik.http.routers.app.rule=Host(`x`)"
        list_match = re.match(r"^\s*-\s*([\"']?)(.+)\1\s*$", stripped)
        if list_match:
            payload = list_match.group(2)
            label_value = payload.split("=", 1)[1] if "=" in payload else payload
        else:
            # Map form:  traefik.http.routers.app.rule: Host(`x`)
            # BUGFIX: the previous pattern put the \1 backreference inside
            # group 1 itself, which Python's re rejects at compile time with
            # "cannot refer to open group" — crashing on any map-form label.
            # The optional quote is now its own closed group (1), the key is
            # group 2, and the value is group 3.
            map_match = re.match(r"^\s*([\"']?)([^:]+)\1:\s*(.+)$", line)
            if map_match:
                label_value = map_match.group(3).strip().strip("\"'")
        for fqdn in parse_hosts_from_label(label_value):
            entries.append(
                {
                    "fqdn": fqdn,
                    "stack": path.parts[0],
                    "service": current_service,
                    "source_compose_file": str(path),
                }
            )
    return entries
def load_dynu(path: Path) -> Dict[str, List[Dict[str, str]]]:
    """Load the fetched Dynu JSON and index its records by hostname.

    Raises RuntimeError if the payload's base_domain does not equal
    BASE_DOMAIN. Hostnames are lowercased and stripped of trailing dots;
    each host's record list is sorted for stable output.
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    got_domain = payload.get("base_domain")
    if got_domain != BASE_DOMAIN:
        raise RuntimeError(
            f"Dynu JSON base_domain mismatch. Expected {BASE_DOMAIN}, got {got_domain}"
        )
    index: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    for domain in payload.get("domains", []):
        for record in domain.get("records", []):
            hostname = str(record.get("hostname", "")).strip(".").lower()
            if not hostname:
                continue
            ttl = record.get("ttl")
            index[hostname].append(
                {
                    "type": str(record.get("type", "")),
                    "value": str(record.get("value", "")),
                    "target": str(record.get("target") or ""),
                    "ttl": "" if ttl is None else str(ttl),
                }
            )
    for hostname in index:
        index[hostname].sort(
            key=lambda r: (r["type"], r["value"], r["target"], r["ttl"])
        )
    return index
def write_markdown(data: Dict) -> None:
    """Render the correlation inventory as a Markdown report at OUT_MD.

    Emits a header, summary counters, the raw Dynu record table, the full
    correlation table and one bullet-list section per status bucket.
    """
    inventory = data["inventory"]
    summary = data["summary"]
    out = [
        "# DNS Inventory (Dynu + Traefik)",
        "",
        "> This integration is intentionally read-only. No Dynu mutations are permitted in this repo at this stage.",
        "",
        f"- Base domain: `{data['base_domain']}`",
        f"- Dynu fetched at: `{data['dynu_fetched_at']}`",
        f"- Inventory generated at: `{data['generated_at']}`",
        "",
        "## Summary",
        "",
        f"- Traefik hostnames discovered: **{summary['traefik_hostnames']}**",
        f"- Dynu hostnames discovered: **{summary['dynu_hostnames']}**",
        f"- Matched: **{summary['matched']}**",
        f"- Missing in Dynu: **{summary['missing_in_dynu']}**",
        f"- Dynu DNS only: **{summary['dns_only']}**",
        f"- Duplicate Traefik hostnames: **{summary['duplicate_traefik_hostnames']}**",
        "",
        "## Dynu Records",
        "",
        "| Hostname | Type | Value | TTL |",
        "|---|---|---|---|",
    ]
    for rec in data["dynu_records_table"]:
        out.append(f"| `{rec['hostname']}` | `{rec['type']}` | `{rec['value']}` | `{rec['ttl']}` |")
    out.extend(
        [
            "",
            "## Correlation",
            "",
            "| Hostname | Status | Service(s) | Source compose file(s) | DNS records |",
            "|---|---|---|---|---|",
        ]
    )
    for row in inventory:
        traefik_entries = row.get("traefik_entries", [])
        services = ", ".join(sorted({f"{e['stack']}/{e['service']}" for e in traefik_entries})) or "-"
        sources = ", ".join(sorted({e["source_compose_file"] for e in traefik_entries})) or "-"
        records = ", ".join([f"{r['type']}:{r['value']}" for r in row.get("dynu_records", [])]) or "-"
        out.append(f"| `{row['fqdn']}` | `{row['status']}` | {services} | {sources} | {records} |")
    sections = (
        ("Matched records", "matched"),
        ("Traefik hostnames missing in Dynu", "missing_in_dynu"),
        ("Dynu DNS records not mapped to known Traefik services", "dns_only"),
    )
    for title, wanted_status in sections:
        out.extend(["", f"## {title}", ""])
        bucket = [row for row in inventory if row["status"] == wanted_status]
        if bucket:
            out.extend(f"- `{row['fqdn']}`" for row in bucket)
        else:
            out.append("_None._")
    OUT_MD.parent.mkdir(parents=True, exist_ok=True)
    OUT_MD.write_text("\n".join(out) + "\n", encoding="utf-8")
def main() -> int:
    """Correlate Dynu DNS data with Traefik host rules and write reports.

    Exit codes: 0 on success, 2 when the read-only guard fails, 3 when the
    fetched Dynu data file is missing. Writes OUT_JSON and OUT_MD.
    """
    try:
        require_read_only()
    except ReadOnlyError as exc:
        print(str(exc), file=sys.stderr)
        return 2
    if not DYN_DATA.exists():
        print(f"Missing {DYN_DATA}. Run fetch_dynu_dns.py first.", file=sys.stderr)
        return 3

    dyn_payload = json.loads(DYN_DATA.read_text(encoding="utf-8"))
    dynu_index = load_dynu(DYN_DATA)

    # Compose files are scanned via repo-relative paths so that the "stack"
    # field (first path component) and source file strings stay portable.
    repo_root = Path(__file__).resolve().parents[2]
    discovered: List[Dict[str, str]] = []
    for compose_path in compose_files(repo_root):
        discovered.extend(extract_traefik_hosts(compose_path.relative_to(repo_root)))

    by_fqdn: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    for entry in discovered:
        if entry["fqdn"].endswith(BASE_DOMAIN):
            by_fqdn[entry["fqdn"]].append(entry)
    duplicate_hosts = {fqdn for fqdn, group in by_fqdn.items() if len(group) > 1}

    inventory = []
    for fqdn in sorted(set(by_fqdn) | set(dynu_index)):
        matched_entries = sorted(
            by_fqdn.get(fqdn, []),
            key=lambda e: (e["stack"], e["service"], e["source_compose_file"]),
        )
        dns_records = dynu_index.get(fqdn, [])
        if matched_entries:
            status = "matched" if dns_records else "missing_in_dynu"
        else:
            status = "dns_only"
        inventory.append(
            {
                "fqdn": fqdn,
                "status": status,
                "duplicate": fqdn in duplicate_hosts,
                "traefik_entries": matched_entries,
                "dynu_records": dns_records,
            }
        )

    dynu_rows = [
        {"hostname": fqdn, "type": rec["type"], "value": rec["value"], "ttl": rec["ttl"]}
        for fqdn in sorted(dynu_index)
        for rec in dynu_index[fqdn]
    ]

    status_counts: Dict[str, int] = defaultdict(int)
    for item in inventory:
        status_counts[item["status"]] += 1

    output = {
        "source": "dynu+traefik",
        "read_only": True,
        "base_domain": BASE_DOMAIN,
        "dynu_fetched_at": dyn_payload.get("fetched_at"),
        "generated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
        "summary": {
            "traefik_hostnames": len(by_fqdn),
            "dynu_hostnames": len(dynu_index),
            "matched": status_counts["matched"],
            "missing_in_dynu": status_counts["missing_in_dynu"],
            "dns_only": status_counts["dns_only"],
            "duplicate_traefik_hostnames": len(duplicate_hosts),
        },
        "inventory": inventory,
        "dynu_records_table": dynu_rows,
    }
    OUT_JSON.parent.mkdir(parents=True, exist_ok=True)
    OUT_JSON.write_text(json.dumps(output, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    write_markdown(output)
    print(f"Wrote {OUT_JSON}")
    print(f"Wrote {OUT_MD}")
    return 0
if __name__ == "__main__":
    # sys.exit(x) raises SystemExit(x), matching the original behavior.
    sys.exit(main())