#!/usr/bin/env python3
"""Correlate Dynu DNS data with Traefik host rules in compose sources.

This integration is intentionally read-only. No Dynu mutations are permitted
in this repo at this stage.
"""

from __future__ import annotations

import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Set

import yaml

BASE_DOMAIN = "lan.ddnsgeek.com"
# DNS hostnames that are allowed to exist in Dynu without a Traefik router.
ALLOWED_UNMAPPED_HOSTNAMES = ["edge.lan.ddnsgeek.com"]
# NOTE(review): these paths are relative, so the script assumes it is run from
# the repo root (main() also derives repo_root from __file__) — confirm.
DYN_DATA = Path("data/dns/dynu_live.json")
OUT_JSON = Path("data/dns/dynu_traefik_inventory.json")
OUT_MD = Path("docs/generated/dns-inventory.md")

# Argument list of a Traefik Host(...) matcher. HostRegexp matchers are not parsed.
HOST_CALL_RE = re.compile(r"Host\s*\(([^)]*)\)", re.IGNORECASE)
# A hostname wrapped in backticks or quotes. The backreference (\1) requires the
# closing delimiter to match the opening one; mismatched pairs such as `host"
# are no longer accepted as quoted and instead hit the comma-split fallback.
QUOTED_HOST_RE = re.compile(r"([`\"'])([^`\"']+)\1")
# Captures (router_name, field_name) from traefik.http.routers.<name>.<field>.
ROUTER_LABEL_RE = re.compile(r"^traefik\.http\.routers\.([^.]+)\.(.+)$")


class ReadOnlyError(RuntimeError):
    """Raised when the read-only guard environment variable is not set."""


def require_read_only() -> None:
    """Abort unless DYNU_READ_ONLY is exactly 'true'.

    Raises:
        ReadOnlyError: if the guard variable is missing or has any other value.
    """
    if os.environ.get("DYNU_READ_ONLY") != "true":
        raise ReadOnlyError(
            "Refusing to run: DYNU_READ_ONLY must be exactly 'true'. "
            "This integration is intentionally read-only."
        )


def compose_files(root: Path) -> List[Path]:
    """Return the sorted set of compose files to scan under *root*.

    Includes the top-level default-network.yml (if present) plus every
    docker-compose.yml/.yaml under apps/, monitoring/ and core/.
    """
    files: Set[Path] = set()
    if (root / "default-network.yml").exists():
        files.add(root / "default-network.yml")
    for area in ("apps", "monitoring", "core"):
        base = root / area
        if not base.exists():
            continue
        for pattern in ("**/docker-compose.yml", "**/docker-compose.yaml"):
            files.update(p for p in base.glob(pattern) if p.is_file())
    return sorted(files)


def parse_hosts_from_rule(rule: str) -> List[str]:
    """Extract hostnames from every Host(...) call in a Traefik rule.

    Quoted/backticked hostnames are preferred; if a Host() argument list has
    no properly quoted entries, it is split on commas and stray quote
    characters are stripped. Hostnames are lowercased, trailing dots removed,
    and returned sorted and de-duplicated.
    """
    hosts: Set[str] = set()
    for call_fragment in HOST_CALL_RE.findall(rule):
        # findall yields (delimiter, hostname) tuples because of the
        # backreference group; keep only the hostname.
        quoted_hosts = [m[1] for m in QUOTED_HOST_RE.findall(call_fragment)]
        for host in quoted_hosts:
            clean = host.strip().strip(".").lower()
            if clean:
                hosts.add(clean)
        if not quoted_hosts:
            # Fallback: unquoted (or mis-quoted) comma-separated host list.
            for token in call_fragment.split(","):
                clean = token.strip().strip(".`\"'").lower()
                if clean:
                    hosts.add(clean)
    return sorted(hosts)


def load_env_defaults(repo_root: Path) -> Dict[str, str]:
    """Read KEY=VALUE defaults from default-environment.env and .env.

    Later files override earlier keys (.env wins). Blank lines, comments and
    lines without '=' are skipped; surrounding quotes on values are stripped.
    """
    env_values: Dict[str, str] = {}
    for candidate in (repo_root / "default-environment.env", repo_root / ".env"):
        if not candidate.exists():
            continue
        for line in candidate.read_text(encoding="utf-8").splitlines():
            stripped = line.strip()
            if not stripped or stripped.startswith("#") or "=" not in stripped:
                continue
            key, value = stripped.split("=", 1)
            env_values[key.strip()] = value.strip().strip("'\"")
    return env_values


def resolve_rule_variables(rule: str, env_values: Dict[str, str]) -> str:
    """Substitute ${VAR} placeholders in *rule*.

    Precedence: process environment first, then *env_values*; unknown
    variables are left untouched (the literal ${VAR} text is kept).
    """
    var_re = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")

    def replacer(match: re.Match[str]) -> str:
        key = match.group(1)
        if key in os.environ:
            return os.environ[key]
        return env_values.get(key, match.group(0))

    return var_re.sub(replacer, rule)


def normalize_labels(raw_labels: Any) -> Dict[str, str]:
    """Normalize compose 'labels' (dict or list form) to a str->str dict.

    Dict values of None become "". List items of the form "k=v" are split on
    the first '='; bare string items map to "". Anything else yields {}.
    """
    labels: Dict[str, str] = {}
    if isinstance(raw_labels, dict):
        for key, value in raw_labels.items():
            labels[str(key)] = "" if value is None else str(value)
        return labels
    if isinstance(raw_labels, list):
        for item in raw_labels:
            if isinstance(item, str) and "=" in item:
                key, value = item.split("=", 1)
                labels[key.strip()] = value.strip()
            elif isinstance(item, str):
                labels[item.strip()] = ""
        return labels
    return labels


def infer_stack(compose_file: Path) -> str:
    """Return the first path component as the stack name.

    NOTE(review): assumes *compose_file* is repo-relative (main() passes
    cf.relative_to(repo_root)); an absolute path would yield "/" — confirm.
    """
    parts = compose_file.parts
    return parts[0] if parts else "unknown"


def boolish(value: str) -> bool:
    """Loosely interpret a label value as a boolean truthy flag."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


def parse_middlewares(raw_value: str) -> List[str]:
    """Split a comma-separated middlewares label into a clean list."""
    return [item.strip() for item in raw_value.split(",") if item.strip()]


def extract_traefik_hosts(path: Path, env_values: Dict[str, str]) -> List[Dict[str, Any]]:
    """Parse one compose file and return one entry per (router, hostname).

    Each entry records the service, stack, router metadata (TLS, mTLS,
    Authelia, middlewares) and both the raw and variable-resolved rule.

    Raises:
        RuntimeError: if the YAML cannot be parsed.
    """
    try:
        payload = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    except yaml.YAMLError as exc:
        raise RuntimeError(f"Failed to parse compose YAML in {path}: {exc}") from exc
    services = payload.get("services")
    if not isinstance(services, dict):
        return []
    entries: List[Dict[str, Any]] = []
    stack = infer_stack(path)
    for service_name, service_payload in services.items():
        if not isinstance(service_payload, dict):
            continue
        labels = normalize_labels(service_payload.get("labels"))
        # Group router label fields by router name, e.g. "rule", "tls.options".
        router_fields: Dict[str, Dict[str, str]] = defaultdict(dict)
        for label_key, label_value in labels.items():
            match = ROUTER_LABEL_RE.match(label_key)
            if not match:
                continue
            router_name, field_name = match.groups()
            router_fields[router_name][field_name] = label_value
        for router_name, fields in router_fields.items():
            rule = fields.get("rule", "")
            if not rule:
                continue
            router_label_key = f"traefik.http.routers.{router_name}.rule"
            middlewares = parse_middlewares(fields.get("middlewares", ""))
            tls_options = fields.get("tls.options", "")
            # TLS counts as enabled if any of the tls flags/options are set.
            tls_enabled = (
                boolish(fields.get("tls", ""))
                or bool(tls_options)
                or bool(fields.get("tls.certresolver", ""))
            )
            # Heuristic: substring search over tls options + middleware names.
            lowered_metadata = " ".join([tls_options, ",".join(middlewares)]).lower()
            uses_mtls = "mtls" in lowered_metadata
            uses_authelia = "authelia" in lowered_metadata
            resolved_rule = resolve_rule_variables(rule, env_values)
            for fqdn in parse_hosts_from_rule(resolved_rule):
                entries.append(
                    {
                        "fqdn": fqdn,
                        "service": str(service_name),
                        "stack": stack,
                        "source_compose_file": str(path),
                        "router": router_name,
                        "router_label_keys": [router_label_key],
                        "raw_rule": rule,
                        "resolved_rule": resolved_rule,
                        "uses_tls": tls_enabled,
                        "tls_options": tls_options,
                        "middlewares": middlewares,
                        "uses_mtls": uses_mtls,
                        "uses_authelia": uses_authelia,
                    }
                )
    return entries


def load_dynu(path: Path) -> Dict[str, List[Dict[str, str]]]:
    """Load the Dynu export and index its records by lowercase hostname.

    Record lists are sorted for stable output.

    Raises:
        RuntimeError: if the export's base_domain does not match BASE_DOMAIN.
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    if payload.get("base_domain") != BASE_DOMAIN:
        raise RuntimeError(
            f"Dynu JSON base_domain mismatch. Expected {BASE_DOMAIN}, got {payload.get('base_domain')}"
        )
    index: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    for domain in payload.get("domains", []):
        for record in domain.get("records", []):
            host = str(record.get("hostname", "")).strip(".").lower()
            if host:
                index[host].append(
                    {
                        "type": str(record.get("type", "")),
                        "value": str(record.get("value", "")),
                        "target": str(record.get("target") or ""),
                        "ttl": str(record.get("ttl") if record.get("ttl") is not None else ""),
                    }
                )
    for host in index:
        index[host] = sorted(index[host], key=lambda x: (x["type"], x["value"], x["target"], x["ttl"]))
    return index


def is_subdomain_of_base(fqdn: str) -> bool:
    """True if *fqdn* is a strict subdomain of BASE_DOMAIN (not the apex)."""
    return fqdn.endswith(f".{BASE_DOMAIN}")


def summarize_reasons(
    has_traefik: bool,
    has_dns: bool,
    is_allowed_unmapped: bool,
    is_ambiguous: bool,
    is_enforced_dns_subdomain: bool,
) -> List[str]:
    """Return every reason tag that applies to a hostname.

    Unlike the single 'status' in main(), reasons are cumulative: an allowed
    unmapped host also carries 'dns_only', and ambiguity adds two tags.
    """
    reasons: List[str] = []
    if has_traefik and has_dns:
        reasons.append("mapped")
    if has_dns and not has_traefik and is_allowed_unmapped:
        reasons.append("allowed_unmapped")
    if has_dns and not has_traefik and is_enforced_dns_subdomain and not is_allowed_unmapped:
        reasons.append("unexpected_unmapped")
    if has_dns and not has_traefik:
        reasons.append("dns_only")
    if has_traefik and not has_dns:
        reasons.append("traefik_only")
    if is_ambiguous:
        reasons.append("duplicate_mapping")
        reasons.append("ambiguous_mapping")
    return reasons


def write_markdown(data: Dict[str, Any]) -> None:
    """Render the inventory dict to OUT_MD as a human-readable report."""
    inventory = data["inventory"]
    lines = [
        "# DNS Inventory (Dynu + Traefik)",
        "",
        "> This integration is intentionally read-only. No Dynu mutations are permitted in this repo at this stage.",
        "",
        f"- Base domain: `{data['base_domain']}`",
        f"- Dynu fetched at: `{data['dynu_fetched_at']}`",
        f"- Inventory generated at: `{data['generated_at']}`",
        "",
        "## Summary",
        "",
        f"- Traefik hostnames discovered: **{data['summary']['traefik_hostnames']}**",
        f"- Dynu hostnames discovered: **{data['summary']['dynu_hostnames']}**",
        f"- Mapped hostnames: **{data['summary']['mapped_hostnames']}**",
        f"- DNS-only hostnames: **{data['summary']['dns_only_hostnames']}**",
        f"- Traefik-only hostnames: **{data['summary']['traefik_only_hostnames']}**",
        f"- Ambiguous hostnames: **{len(data['validation']['ambiguous_hostnames'])}**",
        "",
        "## Validation",
        "",
        f"- Validation ok: **{str(data['validation']['validation_ok']).lower()}**",
        f"- Allowed unmapped hostnames: `{', '.join(data['validation']['allowed_unmapped_hostnames'])}`",
        f"- Unexpected unmapped hostnames: **{len(data['validation']['unexpected_unmapped_hostnames'])}**",
        f"- Duplicate hostnames: **{len(data['validation']['duplicate_hostnames'])}**",
        f"- Ambiguous hostnames: **{len(data['validation']['ambiguous_hostnames'])}**",
        "",
    ]

    def bullet_list(title: str, values: Iterable[str]) -> None:
        # Appends a "### title" section to the enclosing `lines` list.
        rows = list(values)
        lines.extend([f"### {title}", ""])
        if not rows:
            lines.append("_None._")
        else:
            for value in rows:
                lines.append(f"- `{value}`")
        lines.append("")

    bullet_list("Allowed unmapped hostnames", data["validation"]["allowed_unmapped_hostnames"])
    bullet_list("Unexpected unmapped hostnames", data["validation"]["unexpected_unmapped_hostnames"])
    bullet_list("Duplicate hostnames", data["validation"]["duplicate_hostnames"])
    bullet_list("Ambiguous hostnames", data["validation"]["ambiguous_hostnames"])
    lines.extend(
        [
            "## Correlation",
            "",
            "| Hostname | Status | Reasons | Service(s) | Route metadata | DNS records |",
            "|---|---|---|---|---|---|",
        ]
    )
    for row in inventory:
        services = sorted({f"{entry['stack']}/{entry['service']}" for entry in row["traefik_entries"]})
        service_cell = ", ".join(services) if services else "-"
        reason_cell = ", ".join(row["reasons"]) if row["reasons"] else "-"
        route_chunks = []
        for entry in row["traefik_entries"]:
            middlewares = ",".join(entry.get("middlewares", [])) or "-"
            route_chunks.append(
                f"{entry['router']} [tls={str(entry['uses_tls']).lower()}, mtls={str(entry['uses_mtls']).lower()}, authelia={str(entry['uses_authelia']).lower()}, tls_options={entry.get('tls_options') or '-'}, middlewares={middlewares}]"
            )
        route_cell = " ".join(route_chunks) if route_chunks else "-"
        dns_cell = ", ".join(f"{item['type']}:{item['value']}" for item in row["dynu_records"]) if row["dynu_records"] else "-"
        lines.append(f"| `{row['fqdn']}` | `{row['status']}` | `{reason_cell}` | {service_cell} | {route_cell} | {dns_cell} |")
    OUT_MD.parent.mkdir(parents=True, exist_ok=True)
    OUT_MD.write_text("\n".join(lines) + "\n", encoding="utf-8")


def main() -> int:
    """Correlate Dynu DNS records with Traefik routers and write reports.

    Returns a process exit code:
        0 success, 2 read-only guard failure, 3 missing Dynu data,
        4 validation enforced (DYNU_ENFORCE_VALIDATION=true) and failed.
    """
    try:
        require_read_only()
    except ReadOnlyError as exc:
        print(str(exc), file=sys.stderr)
        return 2
    if not DYN_DATA.exists():
        print(f"Missing {DYN_DATA}. Run fetch_dynu_dns.py first.", file=sys.stderr)
        return 3
    # NOTE(review): DYN_DATA is read twice (here and inside load_dynu); only
    # fetched_at is taken from this payload.
    dyn_payload = json.loads(DYN_DATA.read_text(encoding="utf-8"))
    dynu_index = load_dynu(DYN_DATA)
    repo_root = Path(__file__).resolve().parents[2]
    env_values = load_env_defaults(repo_root)
    hosts: List[Dict[str, Any]] = []
    for cf in compose_files(repo_root):
        # Relative paths keep report entries repo-relative; reading them
        # assumes CWD == repo_root (see the NOTE on DYN_DATA above).
        hosts.extend(extract_traefik_hosts(cf.relative_to(repo_root), env_values))
    # Only hostnames under (or equal to) the managed base domain participate.
    by_fqdn: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for entry in hosts:
        if entry["fqdn"] == BASE_DOMAIN or is_subdomain_of_base(entry["fqdn"]):
            by_fqdn[entry["fqdn"]].append(entry)
    duplicate_hostnames = sorted(k for k, v in by_fqdn.items() if len(v) > 1)
    combined_fqdns = sorted(set(by_fqdn.keys()) | set(dynu_index.keys()))
    inventory = []
    ambiguous_hostnames: List[str] = []
    for fqdn in combined_fqdns:
        traefik_entries = sorted(
            by_fqdn.get(fqdn, []),
            key=lambda x: (x["stack"], x["service"], x["source_compose_file"], x["router"]),
        )
        dns_records = dynu_index.get(fqdn, [])
        is_allowed_unmapped = fqdn in ALLOWED_UNMAPPED_HOSTNAMES
        has_traefik = bool(traefik_entries)
        has_dns = bool(dns_records)
        # Ambiguous = one hostname routed by more than one stack/service.
        service_keys = {f"{item['stack']}/{item['service']}" for item in traefik_entries}
        is_ambiguous = len(service_keys) > 1
        if is_ambiguous:
            ambiguous_hostnames.append(fqdn)
        is_enforced_dns_subdomain = is_subdomain_of_base(fqdn)
        if has_traefik and has_dns:
            status = "mapped"
        elif has_dns and is_allowed_unmapped:
            status = "allowed_unmapped"
        elif has_dns and not has_traefik and is_enforced_dns_subdomain:
            status = "unexpected_unmapped"
        elif has_dns and not has_traefik:
            status = "dns_only"
        else:
            status = "traefik_only"
        reasons = summarize_reasons(
            has_traefik, has_dns, is_allowed_unmapped, is_ambiguous, is_enforced_dns_subdomain
        )
        inventory.append(
            {
                "fqdn": fqdn,
                "status": status,
                "reasons": reasons,
                "duplicate": fqdn in duplicate_hostnames,
                "traefik_entries": traefik_entries,
                "dynu_records": dns_records,
            }
        )
    subdomain_dns_hosts = sorted(host for host in dynu_index if is_subdomain_of_base(host))
    unexpected_unmapped_hostnames = sorted(
        host
        for host in subdomain_dns_hosts
        if host not in by_fqdn and host not in ALLOWED_UNMAPPED_HOSTNAMES
    )
    validation = {
        "allowed_unmapped_hostnames": sorted(ALLOWED_UNMAPPED_HOSTNAMES),
        "unexpected_unmapped_hostnames": unexpected_unmapped_hostnames,
        "duplicate_hostnames": duplicate_hostnames,
        "ambiguous_hostnames": sorted(set(ambiguous_hostnames)),
        "validation_ok": len(unexpected_unmapped_hostnames) == 0,
    }
    # Flat per-record table, convenient for downstream diffing.
    dynu_rows = []
    for fqdn in sorted(dynu_index.keys()):
        for rec in dynu_index[fqdn]:
            dynu_rows.append(
                {
                    "hostname": fqdn,
                    "type": rec["type"],
                    "value": rec["value"],
                    "ttl": rec["ttl"],
                }
            )
    output = {
        "source": "dynu+traefik",
        "read_only": True,
        "base_domain": BASE_DOMAIN,
        "dynu_fetched_at": dyn_payload.get("fetched_at"),
        "generated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
        "summary": {
            "traefik_hostnames": len(by_fqdn),
            "dynu_hostnames": len(dynu_index),
            "mapped_hostnames": sum(1 for x in inventory if x["status"] == "mapped"),
            "dns_only_hostnames": sum(1 for x in inventory if "dns_only" in x["reasons"]),
            "traefik_only_hostnames": sum(1 for x in inventory if x["status"] == "traefik_only"),
        },
        "validation": validation,
        "inventory": inventory,
        "dynu_records_table": dynu_rows,
    }
    OUT_JSON.parent.mkdir(parents=True, exist_ok=True)
    OUT_JSON.write_text(json.dumps(output, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    write_markdown(output)
    print(f"Wrote {OUT_JSON}")
    print(f"Wrote {OUT_MD}")
    if os.environ.get("DYNU_ENFORCE_VALIDATION") == "true" and not validation["validation_ok"]:
        print(
            "Validation failed: unexpected unmapped hostnames were found: "
            + ", ".join(validation["unexpected_unmapped_hostnames"]),
            file=sys.stderr,
        )
        return 4
    return 0


if __name__ == "__main__":
    raise SystemExit(main())