Add Dynu brownfield DNS inventory outputs and generator

This commit is contained in:
beatz174-bit
2026-05-13 06:03:32 +10:00
parent 034ad17cf9
commit 52bd2d9fa2
5 changed files with 276 additions and 38 deletions
@@ -0,0 +1,166 @@
#!/usr/bin/env python3
"""Generate Terraform dynu_dns_record resources/import commands from Dynu inventory outputs."""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from pathlib import Path
# Filesystem layout: the script is assumed to live one directory below the
# Terraform root (so parents[1] is the directory where `terraform output`
# must run). NOTE(review): confirm against actual repo layout.
SCRIPT_PATH = Path(__file__).resolve()
TF_ROOT = SCRIPT_PATH.parents[1]
# All generated artifacts are written under <tf-root>/generated/.
GENERATED_DIR = TF_ROOT / "generated"
TF_FILE = GENERATED_DIR / "dynu_dns_records.generated.tf"
IMPORT_SCRIPT = GENERATED_DIR / "import-dynu-dns-records.sh"
INVENTORY_FILE = GENERATED_DIR / "dynu_dns_records_inventory.json"
# Warning banner prepended to the generated .tf file (emitted verbatim at runtime).
HEADER_TF = """# ---------------------------------------------------------------------------
# GENERATED FILE - REVIEW BEFORE USE
#
# Generated from Dynu brownfield DNS inventory.
# Do not blindly apply this file to production DNS.
# Import records into Terraform state before allowing Terraform to manage them.
# ---------------------------------------------------------------------------
"""
# Preamble of the generated bash import script: fail fast, resolve and cd to
# TF_ROOT, and note that already-imported resources are skipped, not re-imported.
HEADER_SH = """#!/usr/bin/env bash
# ---------------------------------------------------------------------------
# GENERATED FILE - REVIEW BEFORE USE
#
# Imports existing Dynu DNS records into Terraform state.
# Does not apply changes.
# ---------------------------------------------------------------------------
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TF_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
cd "${TF_ROOT}"
# Re-running imports will fail for resources already in state.
# This script skips imports when state already contains the resource address.
"""
# Record attributes copied into a generated resource only when present and non-empty.
OPTIONAL_FIELDS = ["group", "host", "priority", "weight", "port", "flags", "tag", "value", "node_name"]
def run_terraform_output() -> dict:
    """Run `terraform output -json` in TF_ROOT and return the parsed JSON.

    Raises:
        RuntimeError: if the directory has no .terraform/ (not initialized)
            or if the terraform command exits non-zero.
    """
    if not (TF_ROOT / ".terraform").exists():
        raise RuntimeError("Terraform is not initialized in infrastructure/terraform/dynu. Run: terraform init")
    command = ["terraform", "output", "-json"]
    result = subprocess.run(command, cwd=TF_ROOT, capture_output=True, text=True)
    if result.returncode != 0:
        # Surface terraform's own stderr so the operator sees the real cause.
        raise RuntimeError(f"Failed to run {' '.join(command)}:\n{result.stderr.strip()}")
    return json.loads(result.stdout)
def tf_name(record: dict) -> str:
    """Derive a valid Terraform resource label from a DNS record dict.

    Lowercases hostname/type/id, maps '*' to 'wildcard', squeezes every other
    illegal character run into '_', guarantees the label starts with a letter,
    and appends the record id when it is not already the suffix (uniqueness).
    """
    rec_id = str(record.get("id", ""))
    candidate = "_".join(
        str(record.get(key, "")).lower() for key in ("hostname", "record_type", "id")
    )
    candidate = candidate.replace("*", "wildcard")
    candidate = re.sub(r"[^a-z0-9_]+", "_", candidate)
    candidate = re.sub(r"_+", "_", candidate).strip("_")
    if not candidate:
        candidate = "record"
    elif not candidate[0].isalpha():
        # After the substitutions only [a-z0-9_] remain, so isalpha() == [a-z].
        candidate = f"record_{candidate}"
    if not candidate.endswith(rec_id):
        candidate = f"{candidate}_{rec_id}"
    return candidate
def hcl_value(value):
    """Format a Python value as an HCL literal.

    Booleans become bare true/false, numbers are rendered bare, and anything
    else is emitted as a JSON string (which is valid, correctly escaped HCL).
    """
    # bool must be tested before int: True/False are int instances in Python.
    if isinstance(value, bool):
        return str(value).lower()
    if isinstance(value, (int, float)):
        return str(value)
    return json.dumps(value)
def generate_resources(records: list[dict]) -> str:
    """Render the generated .tf file: one dynu_dns_record resource per record.

    A/AAAA records with empty content are emitted as `dynamic = true`
    (presumably dynamic-IP records — the provider docs should confirm);
    every resource carries a prevent_destroy lifecycle guard.
    """
    rendered = [HEADER_TF.rstrip(), ""]
    for record in records:
        label = tf_name(record)
        body = [f'resource "dynu_dns_record" "{label}" {{']
        body.append(f" hostname = {hcl_value(record.get('hostname'))}")
        body.append(f" record_type = {hcl_value(record.get('record_type'))}")
        if record.get("ttl") is not None:
            body.append(f" ttl = {hcl_value(record.get('ttl'))}")
        if record.get("enabled") is not None:
            body.append(f" enabled = {hcl_value(record.get('enabled'))}")
        record_content = record.get("content")
        record_kind = str(record.get("record_type", "")).upper()
        if record_content in (None, "") and record_kind in {"A", "AAAA"}:
            body.append(" dynamic = true")
        elif record_content not in (None, ""):
            body.append(f" content = {hcl_value(record_content)}")
        # Optional attributes are copied only when present and non-empty.
        for field in OPTIONAL_FIELDS:
            field_value = record.get(field)
            if field_value not in (None, ""):
                body.append(f" {field.ljust(11)}= {hcl_value(field_value)}")
        body += [
            "",
            " lifecycle {",
            " prevent_destroy = true",
            " }",
            "}",
            "",
        ]
        rendered.extend(body)
    return "\n".join(rendered).rstrip() + "\n"
def generate_import_script(records: list[dict]) -> str:
    """Render an idempotent bash script importing each record into TF state.

    Each import is wrapped in a `terraform state show` guard so re-running
    the script skips resources that are already in state instead of failing.
    """
    script_lines = [HEADER_SH.rstrip(), ""]
    for record in records:
        address = f"dynu_dns_record.{tf_name(record)}"
        import_id = f"{record['domain_id']}/{record['id']}"
        script_lines += [
            f"if terraform state show '{address}' >/dev/null 2>&1; then",
            f" echo 'Skipping already imported: {address}'",
            "else",
            f" terraform import '{address}' '{import_id}'",
            "fi",
            "",
        ]
    return "\n".join(script_lines).rstrip() + "\n"
def write_file(path: Path, content: str, dry_run: bool, overwrite: bool) -> None:
    """Write *content* to *path*, honouring dry-run and overwrite protection.

    Raises:
        RuntimeError: if *path* exists and *overwrite* is False. The check
            runs before the dry-run branch, so a dry run surfaces the same
            failure a real run would.
    """
    if path.exists() and not overwrite:
        raise RuntimeError(f"Refusing to overwrite existing file: {path}. Re-run with --overwrite.")
    if dry_run:
        print(f"[dry-run] Would write {path}")
    else:
        path.write_text(content, encoding="utf-8")
        print(f"Wrote {path}")
def main() -> int:
    """CLI entry point: read the Dynu inventory from `terraform output` and
    write the inventory JSON, generated .tf resources, and import script.

    Returns:
        0 on success, 1 on any failure (message printed to stderr).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--dry-run", action="store_true", help="Print intended output paths without writing files.")
    parser.add_argument("--overwrite", "--force", action="store_true", dest="overwrite", help="Overwrite existing generated files.")
    args = parser.parse_args()
    try:
        out = run_terraform_output()
        # Without this check a missing output key raises a bare KeyError that
        # the boundary handler prints as the cryptic "Error: 'dynu_dns_records'".
        if "dynu_dns_records" not in out:
            raise RuntimeError(
                "terraform output does not define dynu_dns_records. "
                "Apply the inventory outputs first, then re-run this script."
            )
        records = out["dynu_dns_records"]["value"]
        if not isinstance(records, list):
            raise RuntimeError("terraform output dynu_dns_records did not return a list.")
        GENERATED_DIR.mkdir(parents=True, exist_ok=True)
        write_file(INVENTORY_FILE, json.dumps(records, indent=2, sort_keys=True) + "\n", args.dry_run, args.overwrite)
        write_file(TF_FILE, generate_resources(records), args.dry_run, args.overwrite)
        write_file(IMPORT_SCRIPT, generate_import_script(records), args.dry_run, args.overwrite)
        if not args.dry_run:
            # Generated import script must be directly executable.
            IMPORT_SCRIPT.chmod(0o755)
        return 0
    except Exception as exc:  # noqa: BLE001 - top-level CLI boundary: report and exit non-zero
        print(f"Error: {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())