#!/usr/bin/env python3
"""Fetch Dynu DNS inventory in strict read-only mode.

This integration is intentionally read-only.
No Dynu mutations are permitted in this repo at this stage.
"""

from __future__ import annotations

import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

BASE_DOMAIN = "lan.ddnsgeek.com"
DEFAULT_BASE_URL = "https://api.dynu.com"
OUT_PATH = Path("data/dns/dynu_live.json")


class DynuReadOnlyError(RuntimeError):
    """Raised when read-only safety guardrails are not met."""


def require_read_only() -> None:
    """Abort unless the ``DYNU_READ_ONLY`` environment variable is exactly ``"true"``.

    Raises:
        DynuReadOnlyError: if the variable is unset or has any other value.
    """
    if os.environ.get("DYNU_READ_ONLY") != "true":
        raise DynuReadOnlyError(
            "Refusing to run: DYNU_READ_ONLY must be exactly 'true'. "
            "This integration is intentionally read-only."
        )


def get_json(base_url: str, api_key: str, path: str, query: Optional[Dict[str, Any]] = None) -> Any:
    """HTTP GET helper. Any non-GET usage is blocked by design.

    Args:
        base_url: API root; a trailing ``/`` is removed before ``path`` is appended.
        api_key: value sent in the ``API-Key`` request header.
        path: request path appended verbatim to ``base_url``.
        query: optional mapping URL-encoded into the query string.

    Returns:
        The decoded JSON document.

    Raises:
        RuntimeError: on an HTTP error status, a transport failure, or a
            response body that is not valid UTF-8 JSON.
    """
    url = f"{base_url.rstrip('/')}{path}"
    if query:
        url = f"{url}?{urlencode(query)}"

    req = Request(
        url=url,
        headers={
            "accept": "application/json",
            "API-Key": api_key,
        },
        method="GET",
    )

    try:
        with urlopen(req, timeout=30) as resp:
            raw = resp.read()
    except HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        hint = ""
        if exc.code == 401:
            hint = (
                " Check DYNU_API_KEY from secrets/dynu.env, verify it is a valid Dynu API key, "
                "and ensure DYNU_BASE_URL points to the Dynu API endpoint."
            )
        raise RuntimeError(f"Dynu API GET failed at {path}: HTTP {exc.code} {detail}.{hint}") from exc
    except OSError as exc:
        # URLError is an OSError subclass; catching OSError additionally covers
        # timeouts and connection resets raised while reading the body, which
        # urllib does not wrap in URLError.
        raise RuntimeError(f"Dynu API GET failed at {path}: {exc}") from exc

    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise RuntimeError(f"Dynu API returned non-JSON response at {path}") from exc


def _extract_dicts(payload: Any, keys: Iterable[str]) -> List[Dict[str, Any]]:
    """Return the dict items of a list payload, or of the first list-valued key in ``keys``."""
    if isinstance(payload, list):
        return [item for item in payload if isinstance(item, dict)]
    if isinstance(payload, dict):
        for key in keys:
            val = payload.get(key)
            if isinstance(val, list):
                return [item for item in val if isinstance(item, dict)]
    return []


def list_domains(base_url: str, api_key: str) -> List[Dict[str, Any]]:
    """Return every domain entry reported by ``GET /v2/dns``.

    The response may be a bare list or an object wrapping the list under one
    of several candidate keys. If the first response carries integer paging
    fields, the remaining pages are fetched as well.

    Raises:
        RuntimeError: propagated from :func:`get_json`.
    """
    domain_keys = ("domains", "items", "data", "dnsDomains")

    first = get_json(base_url, api_key, "/v2/dns")
    domains = _extract_dicts(first, domain_keys)

    if isinstance(first, dict):
        page = first.get("pageNumber") or first.get("page") or 1
        total = first.get("totalPages") or first.get("pages")
        if isinstance(page, int) and isinstance(total, int) and total > page:
            # NOTE(review): assumes the API accepts a ``page`` query parameter -- confirm.
            for p in range(page + 1, total + 1):
                nxt = get_json(base_url, api_key, "/v2/dns", {"page": p})
                domains.extend(_extract_dicts(nxt, domain_keys))

    return domains


def list_records(base_url: str, api_key: str, domain_id: Any) -> List[Dict[str, Any]]:
    """Return the record entries from ``GET /v2/dns/{domain_id}/record``.

    Raises:
        RuntimeError: propagated from :func:`get_json`.
    """
    payload = get_json(base_url, api_key, f"/v2/dns/{domain_id}/record")
    return _extract_dicts(payload, ("dnsRecords", "records", "items", "data"))


def normalize_hostname(record: Dict[str, Any], domain_name: str) -> str:
    """Return the fully qualified hostname for ``record`` within ``domain_name``.

    The node name is read from ``nodeName``, ``hostname`` or ``node`` (first
    truthy value). An empty node, ``"@"``, or the zone name itself maps to the
    zone apex. A node that already ends with ``.<zone>`` is returned as-is;
    anything else is treated as relative and has the zone appended.
    Surrounding whitespace and dots are stripped from both inputs.
    """
    node_name = record.get("nodeName") or record.get("hostname") or record.get("node") or ""
    node_name = str(node_name).strip().strip(".")
    base = domain_name.strip().strip(".")

    # DNS names are case-insensitive, so compare folded copies while returning
    # the caller's original casing.
    node_folded = node_name.lower()
    base_folded = base.lower()

    if not node_name or node_name == "@" or node_folded == base_folded:
        return base
    # Require a label boundary: "notexample.com" is NOT inside "example.com".
    # With an empty zone there is nothing to append, so keep the node unchanged.
    if not base or node_folded.endswith(f".{base_folded}"):
        return node_name
    return f"{node_name}.{base}"


def normalize_records(records: Iterable[Dict[str, Any]], domain_name: str) -> List[Dict[str, Any]]:
    """Project raw record dicts onto a stable schema, sorted deterministically.

    Each output item has the keys ``id``, ``hostname``, ``type``, ``value``,
    ``target``, ``ttl``, ``priority`` and ``raw`` (a whitelisted subset of the
    input fields). Items are sorted by hostname, type, value, then id.
    """
    raw_keys = (
        "id",
        "nodeName",
        "recordType",
        "state",
        "group",
        "host",
        "ipv4Address",
        "ipv6Address",
        "textData",
        "ttl",
        "priority",
        "weight",
        "port",
    )

    normalized = []
    for rec in records:
        record_type = rec.get("recordType") or rec.get("type") or ""
        # The first truthy candidate field is used as the record's value.
        value = (
            rec.get("value")
            or rec.get("ipv4Address")
            or rec.get("ipv6Address")
            or rec.get("host")
            or rec.get("textData")
            or ""
        )
        normalized.append(
            {
                "id": rec.get("id"),
                "hostname": normalize_hostname(rec, domain_name),
                "type": str(record_type),
                "value": str(value),
                "target": rec.get("host") or rec.get("target"),
                "ttl": rec.get("ttl"),
                "priority": rec.get("priority"),
                "raw": {k: rec.get(k) for k in raw_keys if k in rec},
            }
        )

    return sorted(normalized, key=lambda x: (x["hostname"], x["type"], x["value"], str(x.get("id") or "")))


def main() -> int:
    """Fetch the inventory for ``BASE_DOMAIN`` and write it to ``OUT_PATH``.

    Returns:
        Process exit status: 0 on success, 1 if a Dynu API request fails,
        2 if the read-only guard or API key check fails, 3 if the base domain
        is not listed, 4 if a matching domain entry has no ``id``.
    """
    try:
        require_read_only()
    except DynuReadOnlyError as exc:
        print(str(exc), file=sys.stderr)
        return 2

    api_key = os.environ.get("DYNU_API_KEY")
    if not api_key:
        print("Missing DYNU_API_KEY. Refusing to call Dynu API.", file=sys.stderr)
        return 2
    # Tolerate values copied from an env file together with their quotes.
    api_key = api_key.strip().strip("'").strip('"')
    if not api_key:
        print("DYNU_API_KEY is empty after trimming quotes/whitespace.", file=sys.stderr)
        return 2

    base_url = os.environ.get("DYNU_BASE_URL", DEFAULT_BASE_URL).strip().strip("'").strip('"')

    try:
        domains = list_domains(base_url, api_key)
    except RuntimeError as exc:
        # Report API failures as a one-line message instead of a traceback;
        # status 1 matches what an uncaught exception would have produced.
        print(str(exc), file=sys.stderr)
        return 1

    target = [d for d in domains if str(d.get("name", "")).strip(".").lower() == BASE_DOMAIN]
    if not target:
        print(
            f"Could not find required domain '{BASE_DOMAIN}' in Dynu /v2/dns response.",
            file=sys.stderr,
        )
        return 3

    normalized_domains = []
    for d in target:
        domain_id = d.get("id")
        if domain_id is None:
            print(f"Domain entry for {BASE_DOMAIN} is missing 'id'; cannot fetch records.", file=sys.stderr)
            return 4

        try:
            records = list_records(base_url, api_key, domain_id)
        except RuntimeError as exc:
            print(str(exc), file=sys.stderr)
            return 1

        normalized_domains.append(
            {
                "name": str(d.get("name", BASE_DOMAIN)).strip().strip("."),
                "id": domain_id,
                "records": normalize_records(records, str(d.get("name", BASE_DOMAIN))),
            }
        )

    normalized_domains.sort(key=lambda x: (x["name"], str(x.get("id") or "")))

    output = {
        "source": "dynu",
        "read_only": True,
        "fetched_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
        "base_domain": BASE_DOMAIN,
        "domains": normalized_domains,
    }

    OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    OUT_PATH.write_text(json.dumps(output, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    print(f"Wrote {OUT_PATH}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())