#!/usr/bin/env python3
"""Generate Terraform dynu_dns_record resources/import commands from Dynu inventory outputs."""

from __future__ import annotations

import argparse
import json
import re
import subprocess
import sys
from pathlib import Path

SCRIPT_PATH = Path(__file__).resolve()
TF_ROOT = SCRIPT_PATH.parents[1]
GENERATED_DIR = TF_ROOT / "generated"
TF_FILE = GENERATED_DIR / "dynu_dns_records.generated.tf"
IMPORT_SCRIPT = GENERATED_DIR / "import-dynu-dns-records.sh"
INVENTORY_FILE = GENERATED_DIR / "dynu_dns_records_inventory.json"
DEFAULT_RECORDS_OUTPUT = "dynu_dns_records"
REQUIRED_RECORD_FIELDS = ("id", "domain_id", "hostname", "record_type")

HEADER_TF = """# ---------------------------------------------------------------------------
# GENERATED FILE - REVIEW BEFORE USE
#
# Generated from Dynu brownfield DNS inventory.
# Do not blindly apply this file to production DNS.
# Import records into Terraform state before allowing Terraform to manage them.
# ---------------------------------------------------------------------------
"""

HEADER_SH = """#!/usr/bin/env bash
# ---------------------------------------------------------------------------
# GENERATED FILE - REVIEW BEFORE USE
#
# Imports existing Dynu DNS records into Terraform state.
# Does not apply changes.
# ---------------------------------------------------------------------------
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TF_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
cd "${TF_ROOT}"

# Re-running imports will fail for resources already in state.
# This script skips imports when state already contains the resource address.
"""

OPTIONAL_FIELDS = ["group", "host", "priority", "weight", "port", "flags", "tag", "value", "node_name"]

# Characters that may not appear in the generated resource names.
_IDENT_JUNK_RE = re.compile(r"[^a-z0-9_]+")
_UNDERSCORE_RUN_RE = re.compile(r"_+")
# Column width used to align "key = value" attribute lines in generated HCL.
_ATTR_WIDTH = 11


def run_terraform_output() -> dict:
    """Run ``terraform output -json`` in TF_ROOT and return the parsed JSON.

    Raises:
        RuntimeError: if Terraform is not initialised in TF_ROOT, the
            ``terraform`` executable cannot be started, the command exits
            non-zero, or its stdout is not valid JSON.
    """
    if not (TF_ROOT / ".terraform").exists():
        raise RuntimeError("Terraform is not initialized in infrastructure/terraform/dynu. Run: terraform init")

    cmd = ["terraform", "output", "-json"]
    try:
        proc = subprocess.run(cmd, cwd=TF_ROOT, capture_output=True, text=True, check=False)
    except FileNotFoundError as err:
        # Without this the user only sees "[Errno 2] No such file or directory".
        raise RuntimeError(f"Failed to run {' '.join(cmd)}: terraform executable not found on PATH.") from err

    if proc.returncode != 0:
        raise RuntimeError(f"Failed to run {' '.join(cmd)}:\n{proc.stderr.strip()}")

    try:
        return json.loads(proc.stdout)
    except json.JSONDecodeError as err:
        raise RuntimeError(f"Failed to parse output of {' '.join(cmd)} as JSON: {err}") from err


def type_shape_name(value: object) -> str:
    """Return a short label for *value*: "list", "object" (dict), or its type name."""
    if isinstance(value, list):
        return "list"
    if isinstance(value, dict):
        return "object"
    return type(value).__name__


def _records_under_value(container: dict) -> list | None:
    """Return ``container["value"]["records"]`` when that path holds a list, else None."""
    value = container.get("value")
    if isinstance(value, dict) and isinstance(value.get("records"), list):
        return value["records"]
    return None


def extract_records(payload: object, output_name: str) -> list[dict]:
    """Locate the list of DNS records inside a decoded JSON payload.

    Accepted shapes, checked in this order:

    * the payload itself is a list;
    * ``payload["value"]`` is a list;
    * ``payload["records"]`` is a list;
    * ``payload["value"]["records"]`` is a list;
    * ``payload[output_name]`` is a dict holding one of ``value`` (list),
      ``value.records`` (list) or ``records`` (list), in that order.

    Raises:
        RuntimeError: if no records list can be found or the payload is
            neither a list nor a dict.
    """
    if isinstance(payload, list):
        return payload
    if not isinstance(payload, dict):
        raise RuntimeError(f"Unsupported JSON payload type: {type(payload).__name__}")

    # Direct shapes: the payload is itself a single output wrapper / inventory.
    if isinstance(payload.get("value"), list):
        return payload["value"]
    if isinstance(payload.get("records"), list):
        return payload["records"]
    nested = _records_under_value(payload)
    if nested is not None:
        return nested

    # Full `terraform output -json` shape: look inside the named output.
    output_wrapper = payload.get(output_name)
    if output_name not in payload or not isinstance(output_wrapper, dict):
        raise RuntimeError(f"Output '{output_name}' not found and no records list discovered.")

    if isinstance(output_wrapper.get("value"), list):
        return output_wrapper["value"]
    nested = _records_under_value(output_wrapper)
    if nested is not None:
        return nested
    if isinstance(output_wrapper.get("records"), list):
        return output_wrapper["records"]
    raise RuntimeError(f"Output '{output_name}' does not contain a records list.")


def validate_records(records: list[dict], output_name: str) -> None:
    """Check that every record is a dict carrying all REQUIRED_RECORD_FIELDS keys.

    Only key presence is checked, not the values.

    Raises:
        RuntimeError: on the first record that is not a dict or lacks a
            required field; the message names *output_name* and the index.
    """
    for i, record in enumerate(records):
        if not isinstance(record, dict):
            raise RuntimeError(
                f"Selected output '{output_name}' has non-object record at index {i}: {type(record).__name__}."
            )
        missing = [field for field in REQUIRED_RECORD_FIELDS if field not in record]
        if missing:
            missing_text = ", ".join(missing)
            raise RuntimeError(
                f"Selected output '{output_name}' contains records, but they are not importable Dynu provider records. "
                f"Record #{i} is missing required fields: {missing_text}. "
                "Choose an output sourced from data.dynu_dns_records.root, such as dynu_dns_records or dynu_dns_inventory."
            )


def describe_output(output_name: str, output_wrapper: object, full_outputs: dict) -> dict:
    """Summarise one output for the interactive chooser.

    Returns a dict with the keys ``name``, ``usable``, ``shape``,
    ``record_count`` (an int, or the string "none" when no records list was
    found) and ``error`` (None when the output is usable).
    """
    has_value = isinstance(output_wrapper, dict) and "value" in output_wrapper
    value = output_wrapper.get("value") if has_value else None

    details = {
        "name": output_name,
        "usable": False,
        "shape": type_shape_name(value if has_value else output_wrapper),
        "record_count": "none",
        "error": "no records list found",
    }

    try:
        records = extract_records(full_outputs, output_name)
    except RuntimeError as exc:
        details["error"] = str(exc)
        return details

    details["record_count"] = len(records)
    # Once a records list has been found, describe the wrapper more precisely.
    if isinstance(value, dict) and isinstance(value.get("records"), list):
        details["shape"] = "object with records list"
    elif isinstance(value, list):
        details["shape"] = "list"

    try:
        validate_records(records, output_name)
    except RuntimeError as exc:
        details["error"] = str(exc)
        return details

    details["usable"] = True
    details["error"] = None
    return details


def choose_output_interactively(outputs: dict, descriptions: list[dict]) -> str | None:
    """Print the available outputs and prompt for one on stdin.

    The user may answer with the list number or the exact output name.

    Args:
        outputs: the raw outputs mapping (kept for interface compatibility;
            the listing is driven by *descriptions*).
        descriptions: dicts as produced by ``describe_output``.

    Returns:
        The chosen output name, or None when the user cancels (empty answer,
        Ctrl-C, or end-of-file on stdin).

    Raises:
        RuntimeError: after three invalid or unusable selections.
    """
    print("\nAvailable Terraform outputs:\n")
    indexed = {str(i): item for i, item in enumerate(descriptions, 1)}
    by_name = {item["name"]: item for item in descriptions}

    for number, item in indexed.items():
        print(f"  {number}) {item['name']}")
        print(f"     usable: {'yes' if item['usable'] else 'no'}")
        print(f"     shape: {item['shape']}")
        print(f"     record count: {item['record_count']}")
        if item["error"]:
            print(f"     reason: {item['error']}")
        print()

    prompt = f"Choose an output to use for DNS records [1-{len(descriptions)}], or press Enter to cancel: "
    for _ in range(3):
        try:
            selection = input(prompt).strip()
        except (KeyboardInterrupt, EOFError):
            # EOFError is raised by input() on Ctrl-D / closed stdin; treat it
            # like Ctrl-C instead of surfacing an empty error message.
            print("\nSelection cancelled.")
            return None

        if selection == "":
            print("Selection cancelled.")
            return None

        candidate = indexed.get(selection) or by_name.get(selection)
        if candidate is None:
            print("Invalid selection. Enter a number from the list or an exact output name.")
            continue
        if not candidate["usable"]:
            print(f"Output '{candidate['name']}' is not usable: {candidate['error']}")
            continue
        return candidate["name"]

    raise RuntimeError("Too many invalid selections. Exiting without writing files.")


def _sanitize_identifier(text: str) -> str:
    """Lower-case *text* and reduce it to ``[a-z0-9_]`` with single underscores."""
    text = text.lower().replace("*", "wildcard")
    text = _IDENT_JUNK_RE.sub("_", text)
    return _UNDERSCORE_RUN_RE.sub("_", text).strip("_")


def tf_name(record: dict) -> str:
    """Build a resource name from a record's hostname, record type and id.

    The result contains only ``[a-z0-9_]``, starts with a letter (a
    ``record_`` prefix is added when needed) and ends with the sanitised id.
    """
    base = _sanitize_identifier(
        f"{record.get('hostname', '')}_{record.get('record_type', '')}_{record.get('id', '')}"
    )
    if not base or not re.match(r"^[a-z]", base):
        base = f"record_{base}" if base else "record"

    # Compare against the *sanitised* id: the base has already been
    # lower-cased and stripped of punctuation, so comparing with the raw id
    # would append it a second time, unsanitised.
    id_part = _sanitize_identifier(str(record.get("id", "")))
    if not base.endswith(id_part):
        base = f"{base}_{id_part}"
    return base


def hcl_value(value: object) -> str:
    """Render *value* as an HCL literal.

    Booleans become ``true``/``false``, numbers are emitted bare, and
    everything else is JSON-encoded. Template introducers inside strings are
    escaped (``${`` -> ``$${``, ``%{`` -> ``%%{``) so Terraform keeps them as
    literal text instead of evaluating them.
    """
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    # "$" and "%" can only occur inside string literals in JSON output, so a
    # plain replace on the encoded text also covers nested lists/objects.
    return json.dumps(value).replace("${", "$${").replace("%{", "%%{")


def _attr_line(key: str, rendered: str) -> str:
    """Return one aligned ``key = value`` attribute line for a resource body."""
    return f"  {key.ljust(_ATTR_WIDTH)} = {rendered}"


def generate_resources(records: list[dict]) -> str:
    """Render one ``dynu_dns_record`` resource block per record.

    Each block carries ``prevent_destroy = true``. ``enabled`` falls back to
    the record's ``state`` key. A and AAAA records with empty content get
    ``dynamic = true`` instead of a ``content`` attribute.
    """
    chunks = [HEADER_TF.rstrip(), ""]
    for rec in records:
        lines = [
            f'resource "dynu_dns_record" "{tf_name(rec)}" {{',
            _attr_line("hostname", hcl_value(rec.get("hostname"))),
            _attr_line("record_type", hcl_value(rec.get("record_type"))),
        ]

        if rec.get("ttl") is not None:
            lines.append(_attr_line("ttl", hcl_value(rec.get("ttl"))))

        enabled = rec.get("enabled")
        if enabled is None:
            enabled = rec.get("state")
        if enabled is not None:
            lines.append(_attr_line("enabled", hcl_value(enabled)))

        content = rec.get("content")
        rtype = str(rec.get("record_type", "")).upper()
        if content not in (None, ""):
            lines.append(_attr_line("content", hcl_value(content)))
        elif rtype in {"A", "AAAA"}:
            lines.append(_attr_line("dynamic", "true"))

        for field in OPTIONAL_FIELDS:
            value = rec.get(field)
            if value not in (None, ""):
                lines.append(_attr_line(field, hcl_value(value)))

        lines.extend([
            "",
            "  lifecycle {",
            "    prevent_destroy = true",
            "  }",
            "}",
            "",
        ])
        chunks.extend(lines)
    return "\n".join(chunks).rstrip() + "\n"


def _sh_single_quote(text: str) -> str:
    """Wrap *text* in single quotes for bash, escaping embedded single quotes."""
    return "'" + text.replace("'", "'\"'\"'") + "'"


def generate_import_script(records: list[dict]) -> str:
    """Render a bash script that imports each record unless already in state.

    The import id is ``<domain_id>/<id>``. All interpolated values are
    single-quote escaped so inventory data cannot break out of its argument.

    Raises:
        KeyError: if a record lacks ``domain_id`` or ``id``.
    """
    lines = [HEADER_SH.rstrip(), ""]
    for rec in records:
        addr = f"dynu_dns_record.{tf_name(rec)}"
        import_id = f"{rec['domain_id']}/{rec['id']}"
        lines.extend([
            f"if terraform state show {_sh_single_quote(addr)} >/dev/null 2>&1; then",
            f"  echo {_sh_single_quote('Skipping already imported: ' + addr)}",
            "else",
            f"  terraform import {_sh_single_quote(addr)} {_sh_single_quote(import_id)}",
            "fi",
            "",
        ])
    return "\n".join(lines).rstrip() + "\n"


def _refuse_if_exists(path: Path, overwrite: bool) -> None:
    """Raise RuntimeError when *path* exists and overwriting was not requested."""
    if path.exists() and not overwrite:
        raise RuntimeError(f"Refusing to overwrite existing file: {path}. Re-run with --overwrite.")


def write_file(path: Path, content: str, dry_run: bool, overwrite: bool) -> None:
    """Write *content* to *path* as UTF-8, or only announce it when *dry_run*.

    Raises:
        RuntimeError: if *path* exists and *overwrite* is false (checked even
            in dry-run mode).
    """
    _refuse_if_exists(path, overwrite)
    if dry_run:
        print(f"[dry-run] Would write {path}")
        return
    path.write_text(content, encoding="utf-8")
    print(f"Wrote {path}")


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point; returns the process exit code (0 or 1).

    Args:
        argv: argument list to parse; defaults to ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="Print intended output paths without writing files.")
    parser.add_argument(
        "--overwrite",
        "--force",
        action="store_true",
        dest="overwrite",
        help="Overwrite existing generated files.",
    )
    parser.add_argument(
        "--from-file",
        type=Path,
        help="Load inventory JSON from a file instead of calling terraform output.",
    )
    parser.add_argument(
        "--records-output",
        default=None,
        help=(
            "Terraform output name containing Dynu DNS records. "
            f"Defaults to {DEFAULT_RECORDS_OUTPUT}; if missing in an interactive terminal, "
            "the script prompts you to choose from available outputs."
        ),
    )
    parser.add_argument("--no-interactive", action="store_true", help="Disable interactive output selection.")
    args = parser.parse_args(argv)

    records_output_explicit = args.records_output is not None
    selected_output = args.records_output or DEFAULT_RECORDS_OUTPUT

    try:
        if args.from_file:
            payload = json.loads(args.from_file.read_text(encoding="utf-8"))
        else:
            payload = run_terraform_output()

        try:
            records = extract_records(payload, selected_output)
            validate_records(records, selected_output)
        except RuntimeError as exc:
            is_interactive = sys.stdin.isatty() and not args.no_interactive
            should_prompt = isinstance(payload, dict) and not records_output_explicit and is_interactive
            if should_prompt:
                print(f"Terraform output '{selected_output}' was not found or is unusable.\n")
                # Descriptions are only needed for the chooser, so build them here.
                descriptions = [describe_output(name, payload[name], payload) for name in sorted(payload)]
                chosen = choose_output_interactively(payload, descriptions)
                if chosen is None:
                    print("Exiting without writing files.")
                    return 1
                selected_output = chosen
                records = extract_records(payload, selected_output)
                validate_records(records, selected_output)
            elif isinstance(payload, dict):
                available = ", ".join(sorted(payload.keys())) or "(none)"
                if records_output_explicit:
                    raise RuntimeError(
                        f"Missing or unusable Terraform output '{selected_output}'. "
                        f"Available outputs: {available}. Details: {exc}"
                    ) from exc
                raise RuntimeError(
                    f"Missing or unusable Terraform output '{selected_output}'. "
                    f"Available outputs: {available}.\n\n"
                    "Run interactively to choose an output, or pass one explicitly, for example:\n\n"
                    "  python3 scripts/generate-brownfield-records.py --records-output dynu_dns_inventory --dry-run"
                ) from exc
            else:
                raise

        targets = [
            (INVENTORY_FILE, json.dumps(records, indent=2, sort_keys=True) + "\n"),
            (TF_FILE, generate_resources(records)),
            (IMPORT_SCRIPT, generate_import_script(records)),
        ]
        # Check every target up front so a refusal never leaves a partially
        # regenerated set of files behind.
        for path, _ in targets:
            _refuse_if_exists(path, args.overwrite)

        if not args.dry_run:
            # A dry run must not touch the filesystem, so only create the
            # directory when files are really going to be written.
            GENERATED_DIR.mkdir(parents=True, exist_ok=True)
        for path, content in targets:
            write_file(path, content, args.dry_run, args.overwrite)
        if not args.dry_run:
            IMPORT_SCRIPT.chmod(0o755)
        return 0
    except Exception as exc:  # noqa: BLE001
        print(f"Error: {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())