Files
docker/scripts/dynu/fetch_dynu_dns.py
T
2026-04-21 12:31:52 +10:00

227 lines
7.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""Fetch Dynu DNS inventory in strict read-only mode.
This integration is intentionally read-only.
No Dynu mutations are permitted in this repo at this stage.
"""
from __future__ import annotations
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
BASE_DOMAIN = "lan.ddnsgeek.com"
DEFAULT_BASE_URL = "https://api.dynu.com"
OUT_PATH = Path("data/dns/dynu_live.json")
class DynuReadOnlyError(RuntimeError):
"""Raised when read-only safety guardrails are not met."""
def require_read_only() -> None:
if os.environ.get("DYNU_READ_ONLY") != "true":
raise DynuReadOnlyError(
"Refusing to run: DYNU_READ_ONLY must be exactly 'true'. "
"This integration is intentionally read-only."
)
def get_json(base_url: str, api_key: str, path: str, query: Optional[Dict[str, Any]] = None) -> Any:
"""HTTP GET helper. Any non-GET usage is blocked by design."""
url = f"{base_url.rstrip('/')}{path}"
if query:
url = f"{url}?{urlencode(query)}"
req = Request(
url=url,
headers={
"accept": "application/json",
"API-Key": api_key,
},
method="GET",
)
try:
with urlopen(req, timeout=30) as resp:
body = resp.read().decode("utf-8")
except HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"Dynu API GET failed at {path}: HTTP {exc.code} {detail}") from exc
except URLError as exc:
raise RuntimeError(f"Dynu API GET failed at {path}: {exc}") from exc
try:
return json.loads(body)
except json.JSONDecodeError as exc:
raise RuntimeError(f"Dynu API returned non-JSON response at {path}") from exc
def list_domains(base_url: str, api_key: str) -> List[Dict[str, Any]]:
first = get_json(base_url, api_key, "/v2/dns")
def extract_domains(payload: Any) -> List[Dict[str, Any]]:
if isinstance(payload, list):
return [x for x in payload if isinstance(x, dict)]
if isinstance(payload, dict):
for key in ("domains", "items", "data", "dnsDomains"):
val = payload.get(key)
if isinstance(val, list):
return [x for x in val if isinstance(x, dict)]
return []
domains = extract_domains(first)
if isinstance(first, dict):
page = first.get("pageNumber") or first.get("page") or 1
total = first.get("totalPages") or first.get("pages")
if isinstance(page, int) and isinstance(total, int) and total > page:
for p in range(page + 1, total + 1):
nxt = get_json(base_url, api_key, "/v2/dns", {"page": p})
domains.extend(extract_domains(nxt))
return domains
def list_records(base_url: str, api_key: str, domain_id: Any) -> List[Dict[str, Any]]:
payload = get_json(base_url, api_key, f"/v2/dns/{domain_id}/record")
if isinstance(payload, list):
return [x for x in payload if isinstance(x, dict)]
if isinstance(payload, dict):
for key in ("dnsRecords", "records", "items", "data"):
val = payload.get(key)
if isinstance(val, list):
return [x for x in val if isinstance(x, dict)]
return []
def normalize_hostname(record: Dict[str, Any], domain_name: str) -> str:
node_name = record.get("nodeName") or record.get("hostname") or record.get("node") or ""
node_name = str(node_name).strip().strip(".")
base = domain_name.strip().strip(".")
if not node_name or node_name in {"@", base}:
return base
if node_name.endswith(base):
return node_name
return f"{node_name}.{base}"
def normalize_records(records: Iterable[Dict[str, Any]], domain_name: str) -> List[Dict[str, Any]]:
normalized = []
for rec in records:
record_type = rec.get("recordType") or rec.get("type") or ""
value = (
rec.get("value")
or rec.get("ipv4Address")
or rec.get("ipv6Address")
or rec.get("host")
or rec.get("textData")
or ""
)
target = rec.get("host") or rec.get("target")
priority = rec.get("priority")
ttl = rec.get("ttl")
raw_subset = {
k: rec.get(k)
for k in (
"id",
"nodeName",
"recordType",
"state",
"group",
"host",
"ipv4Address",
"ipv6Address",
"textData",
"ttl",
"priority",
"weight",
"port",
)
if k in rec
}
normalized.append(
{
"id": rec.get("id"),
"hostname": normalize_hostname(rec, domain_name),
"type": str(record_type),
"value": str(value),
"target": target,
"ttl": ttl,
"priority": priority,
"raw": raw_subset,
}
)
return sorted(normalized, key=lambda x: (x["hostname"], x["type"], x["value"], str(x.get("id") or "")))
def main() -> int:
try:
require_read_only()
except DynuReadOnlyError as exc:
print(str(exc), file=sys.stderr)
return 2
api_key = os.environ.get("DYNU_API_KEY")
if not api_key:
print("Missing DYNU_API_KEY. Refusing to call Dynu API.", file=sys.stderr)
return 2
base_url = os.environ.get("DYNU_BASE_URL", DEFAULT_BASE_URL)
domains = list_domains(base_url, api_key)
target = [d for d in domains if str(d.get("name", "")).strip(".").lower() == BASE_DOMAIN]
if not target:
print(
f"Could not find required domain '{BASE_DOMAIN}' in Dynu /v2/dns response.",
file=sys.stderr,
)
return 3
normalized_domains = []
for d in target:
domain_id = d.get("id")
if domain_id is None:
print(f"Domain entry for {BASE_DOMAIN} is missing 'id'; cannot fetch records.", file=sys.stderr)
return 4
records = list_records(base_url, api_key, domain_id)
normalized_domains.append(
{
"name": str(d.get("name", BASE_DOMAIN)).strip().strip("."),
"id": domain_id,
"records": normalize_records(records, str(d.get("name", BASE_DOMAIN))),
}
)
normalized_domains.sort(key=lambda x: (x["name"], str(x.get("id") or "")))
output = {
"source": "dynu",
"read_only": True,
"fetched_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
"base_domain": BASE_DOMAIN,
"domains": normalized_domains,
}
OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
OUT_PATH.write_text(json.dumps(output, indent=2, sort_keys=True) + "\n", encoding="utf-8")
print(f"Wrote {OUT_PATH}")
return 0
if __name__ == "__main__":
raise SystemExit(main())