Files
docker/scripts/dynu/correlate_dynu_with_traefik.py
2026-04-21 14:11:25 +10:00

466 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
"""Correlate Dynu DNS data with Traefik host rules in compose sources.
This integration is intentionally read-only.
No Dynu mutations are permitted in this repo at this stage.
"""
from __future__ import annotations
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Set
import yaml
# All hostnames managed by this tool live under this Dynu zone.
BASE_DOMAIN = "lan.ddnsgeek.com"
# DNS records permitted to exist with no matching Traefik router.
ALLOWED_UNMAPPED_HOSTNAMES = ["edge.lan.ddnsgeek.com"]
# Input produced by fetch_dynu_dns.py; NOTE: relative path, so the script
# presumably must be run from the directory containing data/ and docs/.
DYN_DATA = Path("data/dns/dynu_live.json")
# Generated outputs: machine-readable inventory and human-readable report.
OUT_JSON = Path("data/dns/dynu_traefik_inventory.json")
OUT_MD = Path("docs/generated/dns-inventory.md")
# Matches Host(...) calls inside a Traefik rule string (case-insensitive).
HOST_CALL_RE = re.compile(r"Host\s*\(([^)]*)\)", re.IGNORECASE)
# Extracts backtick/quote-delimited hostnames inside a Host(...) call.
QUOTED_HOST_RE = re.compile(r"[`\"']([^`\"']+)[`\"']")
# Captures (router_name, field_name) from traefik.http.routers.<name>.<field>.
ROUTER_LABEL_RE = re.compile(r"^traefik\.http\.routers\.([^.]+)\.(.+)$")
class ReadOnlyError(RuntimeError):
    """Raised when the mandatory DYNU_READ_ONLY environment guard fails."""

    pass
def require_read_only() -> None:
    """Abort unless the caller has explicitly opted in to read-only mode.

    Raises:
        ReadOnlyError: when the DYNU_READ_ONLY environment variable is
            anything other than the exact string 'true'.
    """
    if os.environ.get("DYNU_READ_ONLY") == "true":
        return
    raise ReadOnlyError(
        "Refusing to run: DYNU_READ_ONLY must be exactly 'true'. "
        "This integration is intentionally read-only."
    )
def compose_files(root: Path) -> List[Path]:
    """Collect every compose file of interest under *root*.

    Picks up the root-level default-network.yml plus any
    docker-compose.yml / docker-compose.yaml below apps/, monitoring/
    and core/. Results are de-duplicated and returned sorted.
    """
    found: Set[Path] = set()
    network_file = root / "default-network.yml"
    if network_file.exists():
        found.add(network_file)
    for area_dir in (root / area for area in ("apps", "monitoring", "core")):
        if not area_dir.exists():
            continue
        for glob_pattern in ("**/docker-compose.yml", "**/docker-compose.yaml"):
            found.update(
                candidate
                for candidate in area_dir.glob(glob_pattern)
                if candidate.is_file()
            )
    return sorted(found)
def parse_hosts_from_rule(rule: str) -> List[str]:
    """Extract hostnames from a Traefik rule such as ``Host(`a`, `b`)``.

    Quoted hostnames inside each Host(...) call win; if a call contains
    no quoted values the raw comma-separated tokens are used instead.
    Hostnames are lower-cased, stripped of surrounding dots/quotes,
    de-duplicated and returned sorted.
    """
    found: Set[str] = set()
    for fragment in HOST_CALL_RE.findall(rule):
        quoted_values = QUOTED_HOST_RE.findall(fragment)
        if quoted_values:
            cleaned = (value.strip().strip(".").lower() for value in quoted_values)
        else:
            cleaned = (
                token.strip().strip(".`\"'").lower()
                for token in fragment.split(",")
            )
        found.update(name for name in cleaned if name)
    return sorted(found)
def load_env_defaults(repo_root: Path) -> Dict[str, str]:
    """Read KEY=VALUE defaults from default-environment.env and .env.

    Files are processed in that order, so .env overrides earlier keys.
    Blank lines, comments and lines without '=' are skipped; surrounding
    single/double quotes are stripped from values.
    """
    defaults: Dict[str, str] = {}
    for env_file in (repo_root / "default-environment.env", repo_root / ".env"):
        if not env_file.exists():
            continue
        for raw_line in env_file.read_text(encoding="utf-8").splitlines():
            entry = raw_line.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            name, _, raw_value = entry.partition("=")
            defaults[name.strip()] = raw_value.strip().strip("'\"")
    return defaults
def resolve_rule_variables(rule: str, env_values: Dict[str, str]) -> str:
    """Expand ``${VAR}`` placeholders in a Traefik rule string.

    Live process environment variables take precedence over *env_values*
    parsed from env files; unknown variables are left as-is.
    """
    placeholder = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")

    def substitute(match: re.Match[str]) -> str:
        name = match.group(1)
        env_override = os.environ.get(name)
        if env_override is not None:
            return env_override
        return env_values.get(name, match.group(0))

    return placeholder.sub(substitute, rule)
def normalize_labels(raw_labels: Any) -> Dict[str, str]:
    """Coerce compose labels (mapping or list form) into a str->str dict.

    Mapping values of None become empty strings. List items of the form
    'k=v' are split once; items with no '=' become keys with empty
    values; non-string items are ignored. Any other input yields {}.
    """
    normalized: Dict[str, str] = {}
    if isinstance(raw_labels, dict):
        for key, value in raw_labels.items():
            normalized[str(key)] = "" if value is None else str(value)
    elif isinstance(raw_labels, list):
        for item in raw_labels:
            if not isinstance(item, str):
                continue
            key, separator, value = item.partition("=")
            normalized[key.strip()] = value.strip() if separator else ""
    return normalized
def infer_stack(compose_file: Path) -> str:
    """Return the first path component of *compose_file* as the stack name."""
    components = compose_file.parts
    if not components:
        return "unknown"
    return components[0]
def boolish(value: str) -> bool:
    """Interpret the common truthy strings '1'/'true'/'yes'/'on' as True."""
    normalized = value.strip().lower()
    return normalized in ("1", "true", "yes", "on")
def parse_middlewares(raw_value: str) -> List[str]:
    """Split a comma-separated middleware list, dropping empty entries."""
    stripped = (chunk.strip() for chunk in raw_value.split(","))
    return [name for name in stripped if name]
def extract_traefik_hosts(path: Path, env_values: Dict[str, str]) -> List[Dict[str, Any]]:
    """Parse one compose file and return one entry per (router, hostname).

    Args:
        path: compose file path; read as-is (so relative paths resolve
            against the current working directory) and also fed to
            infer_stack, whose first component names the stack.
        env_values: KEY=VALUE defaults used to expand ${VAR} in rules.

    Returns:
        A list of dicts, one per hostname found in a
        ``traefik.http.routers.<name>.rule`` label, carrying router,
        TLS and middleware metadata.

    Raises:
        RuntimeError: when the YAML cannot be parsed.
    """
    try:
        payload = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    except yaml.YAMLError as exc:
        raise RuntimeError(f"Failed to parse compose YAML in {path}: {exc}") from exc
    services = payload.get("services")
    if not isinstance(services, dict):
        # Empty or non-compose YAML: nothing to extract.
        return []
    entries: List[Dict[str, Any]] = []
    stack = infer_stack(path)
    for service_name, service_payload in services.items():
        if not isinstance(service_payload, dict):
            continue
        labels = normalize_labels(service_payload.get("labels"))
        # Group label values per router: traefik.http.routers.<router>.<field>
        router_fields: Dict[str, Dict[str, str]] = defaultdict(dict)
        for label_key, label_value in labels.items():
            match = ROUTER_LABEL_RE.match(label_key)
            if not match:
                continue
            router_name, field_name = match.groups()
            router_fields[router_name][field_name] = label_value
        for router_name, fields in router_fields.items():
            rule = fields.get("rule", "")
            if not rule:
                # A router without a rule contributes no hostnames.
                continue
            router_label_key = f"traefik.http.routers.{router_name}.rule"
            middlewares = parse_middlewares(fields.get("middlewares", ""))
            tls_options = fields.get("tls.options", "")
            # TLS counts as enabled when any tls-related label is set.
            tls_enabled = boolish(fields.get("tls", "")) or bool(tls_options) or bool(fields.get("tls.certresolver", ""))
            # Substring scan over tls options + middleware names: "mtls" or
            # "authelia" anywhere in that text flags the router.
            lowered_metadata = " ".join([tls_options, ",".join(middlewares)]).lower()
            uses_mtls = "mtls" in lowered_metadata
            uses_authelia = "authelia" in lowered_metadata
            resolved_rule = resolve_rule_variables(rule, env_values)
            for fqdn in parse_hosts_from_rule(resolved_rule):
                entries.append(
                    {
                        "fqdn": fqdn,
                        "service": str(service_name),
                        "stack": stack,
                        "source_compose_file": str(path),
                        "router": router_name,
                        "router_label_keys": [router_label_key],
                        "raw_rule": rule,
                        "resolved_rule": resolved_rule,
                        "uses_tls": tls_enabled,
                        "tls_options": tls_options,
                        "middlewares": middlewares,
                        "uses_mtls": uses_mtls,
                        "uses_authelia": uses_authelia,
                    }
                )
    return entries
def load_dynu(path: Path) -> Dict[str, List[Dict[str, str]]]:
    """Load the fetched Dynu JSON and index its records by hostname.

    Hostnames are lower-cased and stripped of surrounding dots; each
    hostname maps to a deterministically sorted list of record dicts
    with string-valued 'type', 'value', 'target' and 'ttl' keys.

    Raises:
        RuntimeError: when the payload's base_domain is not BASE_DOMAIN.
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    if payload.get("base_domain") != BASE_DOMAIN:
        raise RuntimeError(
            f"Dynu JSON base_domain mismatch. Expected {BASE_DOMAIN}, got {payload.get('base_domain')}"
        )
    index: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    for domain in payload.get("domains", []):
        for record in domain.get("records", []):
            hostname = str(record.get("hostname", "")).strip(".").lower()
            if not hostname:
                continue
            ttl = record.get("ttl")
            index[hostname].append(
                {
                    "type": str(record.get("type", "")),
                    "value": str(record.get("value", "")),
                    "target": str(record.get("target") or ""),
                    "ttl": "" if ttl is None else str(ttl),
                }
            )
    for hostname in index:
        index[hostname].sort(
            key=lambda rec: (rec["type"], rec["value"], rec["target"], rec["ttl"])
        )
    return index
def is_subdomain_of_base(fqdn: str) -> bool:
    """Return True when *fqdn* is a strict subdomain of BASE_DOMAIN."""
    suffix = "." + BASE_DOMAIN
    return fqdn.endswith(suffix)
def summarize_reasons(
    has_traefik: bool,
    has_dns: bool,
    is_allowed_unmapped: bool,
    is_ambiguous: bool,
    is_enforced_dns_subdomain: bool,
) -> List[str]:
    """Build the ordered list of reason tags explaining a hostname's status.

    Several tags can apply at once (e.g. 'allowed_unmapped' plus the
    generic 'dns_only'); ambiguity always appends the
    'duplicate_mapping'/'ambiguous_mapping' pair at the end.
    """
    reasons: List[str] = []
    dns_without_traefik = has_dns and not has_traefik
    if has_traefik and has_dns:
        reasons.append("mapped")
    if dns_without_traefik and is_allowed_unmapped:
        reasons.append("allowed_unmapped")
    if dns_without_traefik and is_enforced_dns_subdomain and not is_allowed_unmapped:
        reasons.append("unexpected_unmapped")
    if dns_without_traefik:
        reasons.append("dns_only")
    if has_traefik and not has_dns:
        reasons.append("traefik_only")
    if is_ambiguous:
        reasons.extend(["duplicate_mapping", "ambiguous_mapping"])
    return reasons
def write_markdown(data: Dict[str, Any]) -> None:
    """Render the correlation payload as a Markdown report at OUT_MD.

    *data* is the same structure serialized to OUT_JSON: it must carry
    'inventory', 'summary', 'validation', 'base_domain',
    'dynu_fetched_at' and 'generated_at' keys. Parent directories are
    created as needed.
    """
    inventory = data["inventory"]
    # Header, summary counters and validation counters, one bullet per line.
    lines = [
        "# DNS Inventory (Dynu + Traefik)",
        "",
        "> This integration is intentionally read-only. No Dynu mutations are permitted in this repo at this stage.",
        "",
        f"- Base domain: `{data['base_domain']}`",
        f"- Dynu fetched at: `{data['dynu_fetched_at']}`",
        f"- Inventory generated at: `{data['generated_at']}`",
        "",
        "## Summary",
        "",
        f"- Traefik hostnames discovered: **{data['summary']['traefik_hostnames']}**",
        f"- Dynu hostnames discovered: **{data['summary']['dynu_hostnames']}**",
        f"- Mapped hostnames: **{data['summary']['mapped_hostnames']}**",
        f"- DNS-only hostnames: **{data['summary']['dns_only_hostnames']}**",
        f"- Traefik-only hostnames: **{data['summary']['traefik_only_hostnames']}**",
        f"- Ambiguous hostnames: **{len(data['validation']['ambiguous_hostnames'])}**",
        "",
        "## Validation",
        "",
        f"- Validation ok: **{str(data['validation']['validation_ok']).lower()}**",
        f"- Allowed unmapped hostnames: `{', '.join(data['validation']['allowed_unmapped_hostnames'])}`",
        f"- Unexpected unmapped hostnames: **{len(data['validation']['unexpected_unmapped_hostnames'])}**",
        f"- Duplicate hostnames: **{len(data['validation']['duplicate_hostnames'])}**",
        f"- Ambiguous hostnames: **{len(data['validation']['ambiguous_hostnames'])}**",
        "",
    ]

    def bullet_list(title: str, values: Iterable[str]) -> None:
        # Append a "### <title>" section to the enclosing `lines` list,
        # with one bullet per value or a "_None._" placeholder.
        rows = list(values)
        lines.extend([f"### {title}", ""])
        if not rows:
            lines.append("_None._")
        else:
            for value in rows:
                lines.append(f"- `{value}`")
        lines.append("")

    bullet_list("Allowed unmapped hostnames", data["validation"]["allowed_unmapped_hostnames"])
    bullet_list("Unexpected unmapped hostnames", data["validation"]["unexpected_unmapped_hostnames"])
    bullet_list("Duplicate hostnames", data["validation"]["duplicate_hostnames"])
    bullet_list("Ambiguous hostnames", data["validation"]["ambiguous_hostnames"])
    # Per-hostname correlation table: one row per inventory entry.
    lines.extend(
        [
            "## Correlation",
            "",
            "| Hostname | Status | Reasons | Service(s) | Route metadata | DNS records |",
            "|---|---|---|---|---|---|",
        ]
    )
    for row in inventory:
        services = sorted({f"{entry['stack']}/{entry['service']}" for entry in row["traefik_entries"]})
        service_cell = ", ".join(services) if services else "-"
        reason_cell = ", ".join(row["reasons"]) if row["reasons"] else "-"
        route_chunks = []
        for entry in row["traefik_entries"]:
            middlewares = ",".join(entry.get("middlewares", [])) or "-"
            route_chunks.append(
                f"{entry['router']} [tls={str(entry['uses_tls']).lower()}, mtls={str(entry['uses_mtls']).lower()}, authelia={str(entry['uses_authelia']).lower()}, tls_options={entry.get('tls_options') or '-'}, middlewares={middlewares}]"
            )
        # Multiple routers for one hostname are stacked with <br> in the cell.
        route_cell = "<br>".join(route_chunks) if route_chunks else "-"
        dns_cell = ", ".join(f"{item['type']}:{item['value']}" for item in row["dynu_records"]) if row["dynu_records"] else "-"
        lines.append(f"| `{row['fqdn']}` | `{row['status']}` | `{reason_cell}` | {service_cell} | {route_cell} | {dns_cell} |")
    OUT_MD.parent.mkdir(parents=True, exist_ok=True)
    OUT_MD.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
    """Correlate Dynu DNS data with Traefik host rules and write reports.

    Exit codes: 0 success, 2 read-only guard failure, 3 missing Dynu
    data file, 4 validation failure (only when DYNU_ENFORCE_VALIDATION
    is 'true').
    """
    try:
        require_read_only()
    except ReadOnlyError as exc:
        print(str(exc), file=sys.stderr)
        return 2
    if not DYN_DATA.exists():
        print(f"Missing {DYN_DATA}. Run fetch_dynu_dns.py first.", file=sys.stderr)
        return 3
    # Raw payload kept only for its fetched_at timestamp; load_dynu re-reads
    # the same file to build the hostname index.
    dyn_payload = json.loads(DYN_DATA.read_text(encoding="utf-8"))
    dynu_index = load_dynu(DYN_DATA)
    # parents[2] of docker/scripts/dynu/<script>.py is the docker/ directory.
    repo_root = Path(__file__).resolve().parents[2]
    env_values = load_env_defaults(repo_root)
    hosts: List[Dict[str, Any]] = []
    for cf in compose_files(repo_root):
        # Relative paths make infer_stack yield "apps"/"monitoring"/"core".
        # NOTE(review): reading a repo-relative path assumes the process cwd
        # is repo_root — confirm how this script is invoked.
        hosts.extend(extract_traefik_hosts(cf.relative_to(repo_root), env_values))
    # Keep only hostnames at or under BASE_DOMAIN, grouped by FQDN.
    by_fqdn: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for entry in hosts:
        if entry["fqdn"] == BASE_DOMAIN or is_subdomain_of_base(entry["fqdn"]):
            by_fqdn[entry["fqdn"]].append(entry)
    duplicate_hostnames = sorted(k for k, v in by_fqdn.items() if len(v) > 1)
    # Union of hostnames known to Traefik and/or Dynu drives the inventory.
    combined_fqdns = sorted(set(by_fqdn.keys()) | set(dynu_index.keys()))
    inventory = []
    ambiguous_hostnames: List[str] = []
    for fqdn in combined_fqdns:
        traefik_entries = sorted(
            by_fqdn.get(fqdn, []),
            key=lambda x: (x["stack"], x["service"], x["source_compose_file"], x["router"]),
        )
        dns_records = dynu_index.get(fqdn, [])
        is_allowed_unmapped = fqdn in ALLOWED_UNMAPPED_HOSTNAMES
        has_traefik = bool(traefik_entries)
        has_dns = bool(dns_records)
        # Ambiguous = the same hostname is claimed by more than one service.
        service_keys = {f"{item['stack']}/{item['service']}" for item in traefik_entries}
        is_ambiguous = len(service_keys) > 1
        if is_ambiguous:
            ambiguous_hostnames.append(fqdn)
        is_enforced_dns_subdomain = is_subdomain_of_base(fqdn)
        # Single primary status; summarize_reasons may attach extra tags.
        if has_traefik and has_dns:
            status = "mapped"
        elif has_dns and is_allowed_unmapped:
            status = "allowed_unmapped"
        elif has_dns and not has_traefik and is_enforced_dns_subdomain:
            status = "unexpected_unmapped"
        elif has_dns and not has_traefik:
            status = "dns_only"
        else:
            status = "traefik_only"
        reasons = summarize_reasons(
            has_traefik, has_dns, is_allowed_unmapped, is_ambiguous, is_enforced_dns_subdomain
        )
        inventory.append(
            {
                "fqdn": fqdn,
                "status": status,
                "reasons": reasons,
                "duplicate": fqdn in duplicate_hostnames,
                "traefik_entries": traefik_entries,
                "dynu_records": dns_records,
            }
        )
    # Validation: every DNS subdomain of the base must be routed by Traefik
    # unless explicitly allow-listed.
    subdomain_dns_hosts = sorted(host for host in dynu_index if is_subdomain_of_base(host))
    unexpected_unmapped_hostnames = sorted(
        host for host in subdomain_dns_hosts if host not in by_fqdn and host not in ALLOWED_UNMAPPED_HOSTNAMES
    )
    validation = {
        "allowed_unmapped_hostnames": sorted(ALLOWED_UNMAPPED_HOSTNAMES),
        "unexpected_unmapped_hostnames": unexpected_unmapped_hostnames,
        "duplicate_hostnames": duplicate_hostnames,
        "ambiguous_hostnames": sorted(set(ambiguous_hostnames)),
        "validation_ok": len(unexpected_unmapped_hostnames) == 0,
    }
    # Flat per-record table for consumers that don't want the nested index.
    dynu_rows = []
    for fqdn in sorted(dynu_index.keys()):
        for rec in dynu_index[fqdn]:
            dynu_rows.append(
                {
                    "hostname": fqdn,
                    "type": rec["type"],
                    "value": rec["value"],
                    "ttl": rec["ttl"],
                }
            )
    output = {
        "source": "dynu+traefik",
        "read_only": True,
        "base_domain": BASE_DOMAIN,
        "dynu_fetched_at": dyn_payload.get("fetched_at"),
        "generated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
        "summary": {
            "traefik_hostnames": len(by_fqdn),
            "dynu_hostnames": len(dynu_index),
            "mapped_hostnames": sum(1 for x in inventory if x["status"] == "mapped"),
            "dns_only_hostnames": sum(1 for x in inventory if "dns_only" in x["reasons"]),
            "traefik_only_hostnames": sum(1 for x in inventory if x["status"] == "traefik_only"),
        },
        "validation": validation,
        "inventory": inventory,
        "dynu_records_table": dynu_rows,
    }
    OUT_JSON.parent.mkdir(parents=True, exist_ok=True)
    OUT_JSON.write_text(json.dumps(output, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    write_markdown(output)
    print(f"Wrote {OUT_JSON}")
    print(f"Wrote {OUT_MD}")
    # Validation is advisory by default; opt in to a hard failure via env var.
    if os.environ.get("DYNU_ENFORCE_VALIDATION") == "true" and not validation["validation_ok"]:
        print(
            "Validation failed: unexpected unmapped hostnames were found: "
            + ", ".join(validation["unexpected_unmapped_hostnames"]),
            file=sys.stderr,
        )
        return 4
    return 0


if __name__ == "__main__":
    raise SystemExit(main())