Add strict read-only Dynu DNS inventory integration

This commit is contained in:
beatz174-bit
2026-04-21 12:31:52 +10:00
parent c77db36865
commit 580e9b9aed
9 changed files with 653 additions and 0 deletions
+13
View File
@@ -0,0 +1,13 @@
#!/usr/bin/env bash
set -euo pipefail

# This integration is intentionally read-only.
# No Dynu mutations are permitted in this repo at this stage.

# Guard clause: refuse to do anything unless the caller explicitly
# opted in to read-only mode with DYNU_READ_ONLY=true.
[[ "${DYNU_READ_ONLY:-}" == "true" ]] || {
  echo "Refusing to run: DYNU_READ_ONLY must be exactly 'true'." >&2
  exit 2
}

# Fetch the Dynu snapshot, then correlate it with Traefik host rules.
python3 scripts/dynu/fetch_dynu_dns.py
python3 scripts/dynu/correlate_dynu_with_traefik.py
+307
View File
@@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""Correlate Dynu DNS data with Traefik host rules in compose sources.
This integration is intentionally read-only.
No Dynu mutations are permitted in this repo at this stage.
"""
from __future__ import annotations
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Tuple
# Only hostnames under this base domain participate in the correlation.
BASE_DOMAIN = "lan.ddnsgeek.com"
# Input snapshot; produced by fetch_dynu_dns.py (main() errors if missing).
DYN_DATA = Path("data/dns/dynu_live.json")
# Outputs: machine-readable inventory JSON and a human-readable markdown report.
OUT_JSON = Path("data/dns/dynu_traefik_inventory.json")
OUT_MD = Path("docs/generated/dns-inventory.md")
# Captures the argument list of a Traefik Host(...) router rule.
HOST_RULE_RE = re.compile(r"Host\((.*?)\)")
# Captures each backtick- or quote-delimited domain literal inside those args.
DOMAIN_RE = re.compile(r"[`\"']([^`\"']+)[`\"']")
class ReadOnlyError(RuntimeError):
    """Raised when the DYNU_READ_ONLY guardrail is not satisfied."""
def require_read_only() -> None:
    """Abort unless the environment explicitly opts in to read-only mode.

    Raises ReadOnlyError when DYNU_READ_ONLY is anything other than the
    exact string 'true'.
    """
    if os.environ.get("DYNU_READ_ONLY") == "true":
        return
    raise ReadOnlyError(
        "Refusing to run: DYNU_READ_ONLY must be exactly 'true'. "
        "This integration is intentionally read-only."
    )
def compose_files(root: Path) -> List[Path]:
    """Return default-network.yml plus every stack compose file under root.

    Scans apps/, monitoring/ and core/ one directory level deep for
    docker-compose.yml / docker-compose.yaml files, in sorted order.
    """
    compose_names = {"docker-compose.yml", "docker-compose.yaml"}
    result: List[Path] = [root / "default-network.yml"]
    for area in ("apps", "monitoring", "core"):
        area_dir = root / area
        if not area_dir.exists():
            continue
        result.extend(
            candidate
            for candidate in sorted(area_dir.glob("*/*"))
            if candidate.is_file() and candidate.name in compose_names
        )
    return result
def parse_hosts_from_label(label_value: str) -> List[str]:
    """Extract a sorted, de-duplicated list of hostnames from a Traefik rule.

    Looks for Host(...) fragments, then for each quoted/backticked literal
    inside them; literals are lowercased and stripped of dots/whitespace.
    """
    host_args = re.compile(r"Host\((.*?)\)")
    domain_literal = re.compile(r"[`\"']([^`\"']+)[`\"']")
    hosts: set = set()
    for args in host_args.findall(label_value):
        for literal in domain_literal.findall(args):
            cleaned = literal.strip().strip(".").lower()
            if cleaned:
                hosts.add(cleaned)
    return sorted(hosts)
def extract_traefik_hosts(path: Path) -> List[Dict[str, str]]:
    """Scan one compose file line-by-line for Traefik router Host() rules.

    Returns one entry dict per hostname found, tagged with the owning
    service name and the source file. This is a deliberately small
    line-oriented state machine, not a YAML parser, so it only understands
    the layout conventions used by this repo's compose files.
    """
    lines = path.read_text(encoding="utf-8").splitlines()
    entries: List[Dict[str, str]] = []
    in_services = False  # inside the top-level `services:` mapping
    current_service = ""  # name of the service block currently being scanned
    current_labels_indent = None  # indent of the active `labels:` key, or None
    for raw in lines:
        line = raw.rstrip("\n")
        stripped = line.strip()
        if stripped == "services:":
            in_services = True
            current_service = ""
            current_labels_indent = None
            continue
        if not in_services:
            continue
        # Service names are expected at exactly two spaces of indent.
        service_match = re.match(r"^(\s{2})([A-Za-z0-9_.-]+):\s*$", line)
        if service_match:
            current_service = service_match.group(2)
            current_labels_indent = None
            continue
        # Any line back at column 0 ends the services section.
        if re.match(r"^\S", line):
            in_services = False
            current_service = ""
            current_labels_indent = None
            continue
        labels_match = re.match(r"^(\s+)labels:\s*$", line)
        if labels_match and current_service:
            current_labels_indent = len(labels_match.group(1))
            continue
        if current_labels_indent is None:
            continue
        # Dedenting to (or past) the `labels:` key closes the labels block.
        indent = len(line) - len(line.lstrip(" "))
        if indent <= current_labels_indent:
            current_labels_indent = None
            continue
        # Only router rule labels can carry Host() expressions.
        if "traefik.http.routers." not in line or ".rule" not in line:
            continue
        label_value = ""
        # Labels may be a YAML list item ("- key=value") or a mapping ("key: value").
        list_match = re.match(r"^\s*-\s*([\"']?)(.+)\1\s*$", stripped)
        if list_match:
            payload = list_match.group(2)
            label_value = payload.split("=", 1)[1] if "=" in payload else payload
        else:
            map_match = re.match(r"^\s*([\"']?[^:]+\1):\s*(.+)$", line)
            if map_match:
                label_value = map_match.group(2).strip().strip("\"'")
        for fqdn in parse_hosts_from_label(label_value):
            entries.append(
                {
                    "fqdn": fqdn,
                    # NOTE(review): parts[0] is only the stack/area name when
                    # `path` is relative to the repo root — main() passes
                    # relative paths; confirm for any new caller.
                    "stack": path.parts[0],
                    "service": current_service,
                    "source_compose_file": str(path),
                }
            )
    return entries
def load_dynu(path: Path) -> Dict[str, List[Dict[str, str]]]:
    """Load the fetched Dynu snapshot and index records by hostname.

    Validates that the snapshot's base_domain matches BASE_DOMAIN, then
    builds a mapping of lowercased hostname -> sorted list of normalized
    record dicts (type/value/target/ttl, all stringified).
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    if payload.get("base_domain") != BASE_DOMAIN:
        raise RuntimeError(
            f"Dynu JSON base_domain mismatch. Expected {BASE_DOMAIN}, got {payload.get('base_domain')}"
        )
    index: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    for domain in payload.get("domains", []):
        for record in domain.get("records", []):
            host = str(record.get("hostname", "")).strip(".").lower()
            if not host:
                continue
            ttl = record.get("ttl")
            index[host].append(
                {
                    "type": str(record.get("type", "")),
                    "value": str(record.get("value", "")),
                    "target": str(record.get("target") or ""),
                    "ttl": "" if ttl is None else str(ttl),
                }
            )
    # Deterministic ordering within each hostname for stable output files.
    for host in index:
        index[host] = sorted(
            index[host],
            key=lambda rec: (rec["type"], rec["value"], rec["target"], rec["ttl"]),
        )
    return index
def write_markdown(data: Dict) -> None:
    """Render the correlation result as a human-readable markdown report.

    `data` is the same structure main() serializes to OUT_JSON. Writes to
    OUT_MD, creating parent directories as needed.
    """
    # Partition the inventory once by correlation status.
    matched = [x for x in data["inventory"] if x["status"] == "matched"]
    missing = [x for x in data["inventory"] if x["status"] == "missing_in_dynu"]
    dns_only = [x for x in data["inventory"] if x["status"] == "dns_only"]
    lines = [
        "# DNS Inventory (Dynu + Traefik)",
        "",
        "> This integration is intentionally read-only. No Dynu mutations are permitted in this repo at this stage.",
        "",
        f"- Base domain: `{data['base_domain']}`",
        f"- Dynu fetched at: `{data['dynu_fetched_at']}`",
        f"- Inventory generated at: `{data['generated_at']}`",
        "",
        "## Summary",
        "",
        f"- Traefik hostnames discovered: **{data['summary']['traefik_hostnames']}**",
        f"- Dynu hostnames discovered: **{data['summary']['dynu_hostnames']}**",
        f"- Matched: **{data['summary']['matched']}**",
        f"- Missing in Dynu: **{data['summary']['missing_in_dynu']}**",
        f"- Dynu DNS only: **{data['summary']['dns_only']}**",
        f"- Duplicate Traefik hostnames: **{data['summary']['duplicate_traefik_hostnames']}**",
        "",
        "## Dynu Records",
        "",
        "| Hostname | Type | Value | TTL |",
        "|---|---|---|---|",
    ]
    for row in data["dynu_records_table"]:
        lines.append(f"| `{row['hostname']}` | `{row['type']}` | `{row['value']}` | `{row['ttl']}` |")
    lines.extend(["", "## Correlation", "", "| Hostname | Status | Service(s) | Source compose file(s) | DNS records |", "|---|---|---|---|---|"])
    for row in data["inventory"]:
        # "-" keeps the table cells non-empty when a side has no entries.
        svc = ", ".join(sorted({f"{e['stack']}/{e['service']}" for e in row.get('traefik_entries', [])})) or "-"
        src = ", ".join(sorted({e['source_compose_file'] for e in row.get('traefik_entries', [])})) or "-"
        dns = ", ".join([f"{r['type']}:{r['value']}" for r in row.get("dynu_records", [])]) or "-"
        lines.append(f"| `{row['fqdn']}` | `{row['status']}` | {svc} | {src} | {dns} |")
    # Appends one "## <title>" bullet-list section to the enclosing `lines`.
    def section(title: str, rows: List[Dict]) -> None:
        lines.extend(["", f"## {title}", ""])
        if not rows:
            lines.append("_None._")
            return
        for row in rows:
            lines.append(f"- `{row['fqdn']}`")
    section("Matched records", matched)
    section("Traefik hostnames missing in Dynu", missing)
    section("Dynu DNS records not mapped to known Traefik services", dns_only)
    OUT_MD.parent.mkdir(parents=True, exist_ok=True)
    OUT_MD.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
    """Correlate Dynu DNS records with Traefik host rules; write both reports.

    Exit codes: 0 on success, 2 when the read-only guard fails, 3 when the
    Dynu snapshot is missing. Assumes the current working directory is the
    repo root, since DYN_DATA/OUT_JSON/OUT_MD are all relative paths.
    """
    try:
        require_read_only()
    except ReadOnlyError as exc:
        print(str(exc), file=sys.stderr)
        return 2
    if not DYN_DATA.exists():
        print(f"Missing {DYN_DATA}. Run fetch_dynu_dns.py first.", file=sys.stderr)
        return 3
    # Raw payload is kept only for its fetched_at stamp; load_dynu re-reads
    # the same file and builds the hostname index.
    dyn_payload = json.loads(DYN_DATA.read_text(encoding="utf-8"))
    dynu_index = load_dynu(DYN_DATA)
    repo_root = Path(__file__).resolve().parents[2]
    hosts = []
    for cf in compose_files(repo_root):
        # Relative paths keep "stack" (parts[0]) and source file names short.
        # NOTE(review): reading a relative path assumes CWD == repo root.
        hosts.extend(extract_traefik_hosts(cf.relative_to(repo_root)))
    # Group Traefik hostnames under the base domain; anything else is ignored.
    by_fqdn: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    for entry in hosts:
        if entry["fqdn"].endswith(BASE_DOMAIN):
            by_fqdn[entry["fqdn"]].append(entry)
    duplicate_hosts = {k for k, v in by_fqdn.items() if len(v) > 1}
    # Union of both sides so the inventory covers unmatched entries too.
    combined_fqdns = sorted(set(by_fqdn.keys()) | set(dynu_index.keys()))
    inventory = []
    for fqdn in combined_fqdns:
        traefik_entries = sorted(
            by_fqdn.get(fqdn, []),
            key=lambda x: (x["stack"], x["service"], x["source_compose_file"]),
        )
        dns_records = dynu_index.get(fqdn, [])
        # Classify: present on both sides, Traefik-only, or DNS-only.
        if traefik_entries and dns_records:
            status = "matched"
        elif traefik_entries and not dns_records:
            status = "missing_in_dynu"
        else:
            status = "dns_only"
        inventory.append(
            {
                "fqdn": fqdn,
                "status": status,
                "duplicate": fqdn in duplicate_hosts,
                "traefik_entries": traefik_entries,
                "dynu_records": dns_records,
            }
        )
    # Flat per-record rows for the markdown "Dynu Records" table.
    dynu_rows = []
    for fqdn in sorted(dynu_index.keys()):
        for rec in dynu_index[fqdn]:
            dynu_rows.append(
                {
                    "hostname": fqdn,
                    "type": rec["type"],
                    "value": rec["value"],
                    "ttl": rec["ttl"],
                }
            )
    output = {
        "source": "dynu+traefik",
        "read_only": True,
        "base_domain": BASE_DOMAIN,
        "dynu_fetched_at": dyn_payload.get("fetched_at"),
        "generated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
        "summary": {
            "traefik_hostnames": len(by_fqdn),
            "dynu_hostnames": len(dynu_index),
            "matched": sum(1 for x in inventory if x["status"] == "matched"),
            "missing_in_dynu": sum(1 for x in inventory if x["status"] == "missing_in_dynu"),
            "dns_only": sum(1 for x in inventory if x["status"] == "dns_only"),
            "duplicate_traefik_hostnames": len(duplicate_hosts),
        },
        "inventory": inventory,
        "dynu_records_table": dynu_rows,
    }
    OUT_JSON.parent.mkdir(parents=True, exist_ok=True)
    # sort_keys keeps the JSON diff-stable between runs.
    OUT_JSON.write_text(json.dumps(output, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    write_markdown(output)
    print(f"Wrote {OUT_JSON}")
    print(f"Wrote {OUT_MD}")
    return 0
if __name__ == "__main__":
    raise SystemExit(main())
+226
View File
@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""Fetch Dynu DNS inventory in strict read-only mode.
This integration is intentionally read-only.
No Dynu mutations are permitted in this repo at this stage.
"""
from __future__ import annotations
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
# Only this Dynu zone is fetched; any other domains on the account are skipped.
BASE_DOMAIN = "lan.ddnsgeek.com"
# Dynu REST API root; can be overridden via the DYNU_BASE_URL env var.
DEFAULT_BASE_URL = "https://api.dynu.com"
# Normalized snapshot written for correlate_dynu_with_traefik.py to consume.
OUT_PATH = Path("data/dns/dynu_live.json")
class DynuReadOnlyError(RuntimeError):
    """Signals that the DYNU_READ_ONLY safety guardrail was not satisfied."""
def require_read_only() -> None:
    """Refuse to proceed unless DYNU_READ_ONLY is exactly the string 'true'.

    Raises DynuReadOnlyError otherwise; callers treat this as a hard stop.
    """
    flag = os.environ.get("DYNU_READ_ONLY")
    if flag != "true":
        raise DynuReadOnlyError(
            "Refusing to run: DYNU_READ_ONLY must be exactly 'true'. "
            "This integration is intentionally read-only."
        )
def get_json(base_url: str, api_key: str, path: str, query: Optional[Dict[str, Any]] = None) -> Any:
    """Issue a read-only HTTP GET against the Dynu API and decode the JSON body.

    The request method is pinned to GET by construction; there is no code
    path that can mutate remote state. Raises RuntimeError on transport
    errors, non-2xx responses, or a non-JSON response body.
    """
    target = base_url.rstrip("/") + path
    if query:
        target = target + "?" + urlencode(query)
    headers = {
        "accept": "application/json",
        "API-Key": api_key,
    }
    request = Request(url=target, headers=headers, method="GET")
    try:
        with urlopen(request, timeout=30) as response:
            raw_body = response.read().decode("utf-8")
    except HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"Dynu API GET failed at {path}: HTTP {exc.code} {detail}") from exc
    except URLError as exc:
        raise RuntimeError(f"Dynu API GET failed at {path}: {exc}") from exc
    try:
        return json.loads(raw_body)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Dynu API returned non-JSON response at {path}") from exc
def list_domains(base_url: str, api_key: str) -> List[Dict[str, Any]]:
    """Return every DNS domain visible to the API key, following pagination.

    Tolerates both a bare list response and several observed wrapper-object
    shapes; unknown shapes yield an empty list rather than an error.
    """
    def _domains_in(payload: Any) -> List[Dict[str, Any]]:
        if isinstance(payload, list):
            return [item for item in payload if isinstance(item, dict)]
        if isinstance(payload, dict):
            for wrapper_key in ("domains", "items", "data", "dnsDomains"):
                wrapped = payload.get(wrapper_key)
                if isinstance(wrapped, list):
                    return [item for item in wrapped if isinstance(item, dict)]
        return []
    first_page = get_json(base_url, api_key, "/v2/dns")
    collected = _domains_in(first_page)
    # Follow pagination hints when the wrapper advertises more pages.
    if isinstance(first_page, dict):
        current = first_page.get("pageNumber") or first_page.get("page") or 1
        last = first_page.get("totalPages") or first_page.get("pages")
        if isinstance(current, int) and isinstance(last, int) and last > current:
            for page_no in range(current + 1, last + 1):
                next_page = get_json(base_url, api_key, "/v2/dns", {"page": page_no})
                collected.extend(_domains_in(next_page))
    return collected
def list_records(base_url: str, api_key: str, domain_id: Any) -> List[Dict[str, Any]]:
    """Return the raw DNS record dicts for a single Dynu domain id.

    Accepts a bare list or any of the observed wrapper-object keys; an
    unrecognized response shape yields an empty list.
    """
    payload = get_json(base_url, api_key, f"/v2/dns/{domain_id}/record")
    if isinstance(payload, list):
        return [item for item in payload if isinstance(item, dict)]
    if not isinstance(payload, dict):
        return []
    for wrapper_key in ("dnsRecords", "records", "items", "data"):
        candidate = payload.get(wrapper_key)
        if isinstance(candidate, list):
            return [item for item in candidate if isinstance(item, dict)]
    return []
def normalize_hostname(record: Dict[str, Any], domain_name: str) -> str:
    """Return the fully-qualified hostname for a Dynu record.

    The record's node name ("@" or empty means the apex) is joined with
    domain_name unless it is already a FQDN under that domain.

    Bug fix: the original used ``node_name.endswith(base)``, which treated
    a node like "xlan.ddnsgeek.com" as already fully qualified under
    "lan.ddnsgeek.com". Require a dot boundary before the base domain so
    only genuine sub-names of the zone are passed through unchanged.
    """
    node_name = record.get("nodeName") or record.get("hostname") or record.get("node") or ""
    node_name = str(node_name).strip().strip(".")
    base = domain_name.strip().strip(".")
    # Apex record: no node, "@", or the zone name itself.
    if not node_name or node_name in {"@", base}:
        return base
    # Already a FQDN within the zone (dot boundary required).
    if node_name.endswith("." + base):
        return node_name
    return f"{node_name}.{base}"
def normalize_records(records: Iterable[Dict[str, Any]], domain_name: str) -> List[Dict[str, Any]]:
    """Normalize raw Dynu record dicts into a stable, sorted shape.

    Each output row has hostname/type/value/target/ttl/priority plus a
    "raw" subset of the original fields, sorted deterministically so the
    snapshot file diffs cleanly between runs.
    """
    raw_keys = (
        "id",
        "nodeName",
        "recordType",
        "state",
        "group",
        "host",
        "ipv4Address",
        "ipv6Address",
        "textData",
        "ttl",
        "priority",
        "weight",
        "port",
    )
    out: List[Dict[str, Any]] = []
    for rec in records:
        # First truthy candidate wins; different record types store their
        # payload under different field names.
        value: Any = ""
        for value_key in ("value", "ipv4Address", "ipv6Address", "host", "textData"):
            candidate = rec.get(value_key)
            if candidate:
                value = candidate
                break
        out.append(
            {
                "id": rec.get("id"),
                "hostname": normalize_hostname(rec, domain_name),
                "type": str(rec.get("recordType") or rec.get("type") or ""),
                "value": str(value),
                "target": rec.get("host") or rec.get("target"),
                "ttl": rec.get("ttl"),
                "priority": rec.get("priority"),
                "raw": {k: rec.get(k) for k in raw_keys if k in rec},
            }
        )
    out.sort(key=lambda entry: (entry["hostname"], entry["type"], entry["value"], str(entry.get("id") or "")))
    return out
def main() -> int:
    """Fetch the Dynu zone snapshot and write it to OUT_PATH.

    Exit codes: 0 on success; 2 when the read-only guard fails or the API
    key is absent; 3 when BASE_DOMAIN is not in the account; 4 when the
    domain entry lacks an id. OUT_PATH is relative, so this assumes the
    current working directory is the repo root.
    """
    try:
        require_read_only()
    except DynuReadOnlyError as exc:
        print(str(exc), file=sys.stderr)
        return 2
    api_key = os.environ.get("DYNU_API_KEY")
    if not api_key:
        print("Missing DYNU_API_KEY. Refusing to call Dynu API.", file=sys.stderr)
        return 2
    base_url = os.environ.get("DYNU_BASE_URL", DEFAULT_BASE_URL)
    domains = list_domains(base_url, api_key)
    # Keep only the required zone; any other account domains are ignored.
    target = [d for d in domains if str(d.get("name", "")).strip(".").lower() == BASE_DOMAIN]
    if not target:
        print(
            f"Could not find required domain '{BASE_DOMAIN}' in Dynu /v2/dns response.",
            file=sys.stderr,
        )
        return 3
    normalized_domains = []
    for d in target:
        domain_id = d.get("id")
        if domain_id is None:
            # Records are fetched per domain id, so a missing id is fatal.
            print(f"Domain entry for {BASE_DOMAIN} is missing 'id'; cannot fetch records.", file=sys.stderr)
            return 4
        records = list_records(base_url, api_key, domain_id)
        normalized_domains.append(
            {
                "name": str(d.get("name", BASE_DOMAIN)).strip().strip("."),
                "id": domain_id,
                "records": normalize_records(records, str(d.get("name", BASE_DOMAIN))),
            }
        )
    # Deterministic ordering so the snapshot diffs cleanly between runs.
    normalized_domains.sort(key=lambda x: (x["name"], str(x.get("id") or "")))
    output = {
        "source": "dynu",
        "read_only": True,
        "fetched_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
        "base_domain": BASE_DOMAIN,
        "domains": normalized_domains,
    }
    OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    OUT_PATH.write_text(json.dumps(output, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    print(f"Wrote {OUT_PATH}")
    return 0
if __name__ == "__main__":
    raise SystemExit(main())