350 lines
14 KiB
Python
350 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""Generate Terraform dynu_dns_record resources/import commands from Dynu inventory outputs."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Absolute, symlink-resolved location of this script; the paths below derive from it.
SCRIPT_PATH = Path(__file__).resolve()
# Terraform root: the directory one level above the directory containing this script.
TF_ROOT = SCRIPT_PATH.parents[1]
# Directory that receives every generated artifact.
GENERATED_DIR = TF_ROOT / "generated"
# Generated Terraform resource definitions.
TF_FILE = GENERATED_DIR / "dynu_dns_records.generated.tf"
# Generated bash script containing the `terraform import` commands.
IMPORT_SCRIPT = GENERATED_DIR / "import-dynu-dns-records.sh"
# JSON dump of the records that the other two files were generated from.
INVENTORY_FILE = GENERATED_DIR / "dynu_dns_records_inventory.json"
# Terraform output name used when --records-output is not passed.
DEFAULT_RECORDS_OUTPUT = "dynu_dns_records"
# Keys every record must contain to pass validation.
REQUIRED_RECORD_FIELDS = ("id", "domain_id", "hostname", "record_type")
|
|
|
|
# Comment banner placed at the top of the generated Terraform file.
HEADER_TF = """# ---------------------------------------------------------------------------
# GENERATED FILE - REVIEW BEFORE USE
#
# Generated from Dynu brownfield DNS inventory.
# Do not blindly apply this file to production DNS.
# Import records into Terraform state before allowing Terraform to manage them.
# ---------------------------------------------------------------------------
"""

# Preamble of the generated import script: shebang, warning banner, strict
# mode, and a `cd` into the directory above the script's own directory.
HEADER_SH = """#!/usr/bin/env bash
# ---------------------------------------------------------------------------
# GENERATED FILE - REVIEW BEFORE USE
#
# Imports existing Dynu DNS records into Terraform state.
# Does not apply changes.
# ---------------------------------------------------------------------------
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TF_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
cd "${TF_ROOT}"

# Re-running imports will fail for resources already in state.
# This script skips imports when state already contains the resource address.
"""

# Record keys copied into a generated resource, in this order, whenever the
# record holds a value other than None or "" for them.
OPTIONAL_FIELDS = ["group", "host", "priority", "weight", "port", "flags", "tag", "value", "node_name"]
|
|
|
|
|
|
def run_terraform_output() -> dict:
    """Run ``terraform output -json`` in ``TF_ROOT`` and return the decoded JSON.

    Returns:
        The JSON document Terraform printed on stdout, decoded with ``json.loads``.

    Raises:
        RuntimeError: If ``TF_ROOT/.terraform`` does not exist, the ``terraform``
            executable cannot be started, the command exits with a non-zero
            status, or its stdout is not valid JSON.
    """
    if not (TF_ROOT / ".terraform").exists():
        raise RuntimeError("Terraform is not initialized in infrastructure/terraform/dynu. Run: terraform init")

    cmd = ["terraform", "output", "-json"]
    try:
        proc = subprocess.run(cmd, cwd=TF_ROOT, capture_output=True, text=True, check=False)
    except OSError as exc:
        # Most commonly FileNotFoundError because terraform is not on PATH;
        # report it the same way as every other failure of this function.
        raise RuntimeError(f"Failed to run {' '.join(cmd)}: {exc}") from exc

    if proc.returncode != 0:
        raise RuntimeError(f"Failed to run {' '.join(cmd)}:\n{proc.stderr.strip()}")

    try:
        return json.loads(proc.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"{' '.join(cmd)} did not print valid JSON: {exc}") from exc
|
|
|
|
|
|
def type_shape_name(value: object) -> str:
    """Return a short label describing the JSON shape of ``value``.

    Lists are reported as ``"list"`` and dicts as ``"object"``; anything else
    is reported by its Python type name.
    """
    for kind, label in ((list, "list"), (dict, "object")):
        if isinstance(value, kind):
            return label
    return type(value).__name__
|
|
|
|
|
|
def extract_records(payload: object, output_name: str) -> list[dict]:
    """Locate the list of records inside a decoded JSON payload.

    Candidates are tried in this order and the first list found wins:

    1. ``payload`` itself, when it is a list.
    2. ``payload["value"]``, ``payload["records"]``, then
       ``payload["value"]["records"]``.
    3. ``payload[output_name]`` when it is a dict: its ``"value"``, then
       ``"value"]["records"``, then ``"records"``.

    Args:
        payload: Decoded JSON document (list or dict).
        output_name: Key to look up in ``payload`` when no top-level list exists.

    Returns:
        The list that was found (returned as-is, not copied).

    Raises:
        RuntimeError: If ``payload`` is neither a list nor a dict, if
            ``output_name`` is not a dict entry of ``payload``, or if that
            entry holds no records list.
    """
    if isinstance(payload, list):
        return payload
    if not isinstance(payload, dict):
        raise RuntimeError(f"Unsupported JSON payload type: {type(payload).__name__}")

    # Payload that is itself a single output wrapper or a bare records object.
    top_value = payload.get("value")
    if isinstance(top_value, list):
        return top_value
    top_records = payload.get("records")
    if isinstance(top_records, list):
        return top_records
    if isinstance(top_value, dict) and isinstance(top_value.get("records"), list):
        return top_value["records"]

    # Payload keyed by output name.
    wrapper = payload.get(output_name)
    if not isinstance(wrapper, dict):
        raise RuntimeError(f"Output '{output_name}' not found and no records list discovered.")

    wrapped_value = wrapper.get("value")
    if isinstance(wrapped_value, list):
        return wrapped_value
    if isinstance(wrapped_value, dict) and isinstance(wrapped_value.get("records"), list):
        return wrapped_value["records"]
    if isinstance(wrapper.get("records"), list):
        return wrapper["records"]
    raise RuntimeError(f"Output '{output_name}' does not contain a records list.")
|
|
|
|
|
|
def validate_records(records: list[dict], output_name: str) -> None:
    """Check that every record is a dict containing all required fields.

    Args:
        records: Candidate records taken from the selected output.
        output_name: Name of that output; used only in error messages.

    Raises:
        RuntimeError: On the first entry that is not a dict, or that lacks one
            or more keys listed in ``REQUIRED_RECORD_FIELDS``.
    """
    for index, entry in enumerate(records):
        if not isinstance(entry, dict):
            raise RuntimeError(f"Selected output '{output_name}' has non-object record at index {index}: {type(entry).__name__}.")

        absent = [name for name in REQUIRED_RECORD_FIELDS if name not in entry]
        if not absent:
            continue

        raise RuntimeError(
            f"Selected output '{output_name}' contains records, but they are not importable Dynu provider records. "
            f"Record #{index} is missing required fields: {', '.join(absent)}. "
            "Choose an output sourced from data.dynu_dns_records.root, such as dynu_dns_records or dynu_dns_inventory."
        )
|
|
|
|
|
|
def describe_output(output_name: str, output_wrapper: object, full_outputs: dict) -> dict:
    """Summarise whether one Terraform output can serve as the records source.

    Args:
        output_name: Name of the output being described.
        output_wrapper: The entry stored under that name.
        full_outputs: The complete outputs mapping; records are extracted from
            it with ``extract_records(full_outputs, output_name)``.

    Returns:
        A dict with the keys ``name``, ``usable`` (bool), ``shape`` (str),
        ``record_count`` (an int, or the string ``"none"`` when extraction
        failed) and ``error`` (a message, or ``None`` when usable).
    """
    wrapper_is_dict = isinstance(output_wrapper, dict)

    # Describe the wrapped value when there is one, otherwise the entry itself.
    if wrapper_is_dict and "value" in output_wrapper:
        shape_source = output_wrapper.get("value")
    else:
        shape_source = output_wrapper

    summary = {
        "name": output_name,
        "usable": False,
        "shape": type_shape_name(shape_source),
        "record_count": "none",
        "error": "no records list found",
    }

    try:
        found = extract_records(full_outputs, output_name)
    except RuntimeError as exc:
        summary["error"] = str(exc)
        return summary

    summary["record_count"] = len(found)

    # Once records were found, report a more specific shape where it applies.
    inner = output_wrapper.get("value") if wrapper_is_dict else None
    if isinstance(inner, dict) and isinstance(inner.get("records"), list):
        summary["shape"] = "object with records list"
    elif isinstance(inner, list):
        summary["shape"] = "list"

    try:
        validate_records(found, output_name)
    except RuntimeError as exc:
        summary["error"] = str(exc)
    else:
        summary["usable"] = True
        summary["error"] = None
    return summary
|
|
|
|
|
|
def choose_output_interactively(outputs: dict, descriptions: list[dict]) -> str | None:
    """List the available outputs and ask the user to pick one.

    The user may answer with the 1-based number shown in the list or with an
    exact output name. Up to three answers are read.

    Args:
        outputs: The full outputs mapping. Not read by this function; kept so
            the signature stays unchanged for callers.
        descriptions: One dict per output with the keys ``name``, ``usable``,
            ``shape``, ``record_count`` and ``error``.

    Returns:
        The chosen output name, or ``None`` when the user cancels by entering
        an empty line, pressing Ctrl-C, or closing stdin (EOF).

    Raises:
        RuntimeError: After three answers that were invalid or named an
            unusable output.
    """
    print("\nAvailable Terraform outputs:\n")
    indexed = {str(i): item for i, item in enumerate(descriptions, 1)}
    by_name = {item["name"]: item for item in descriptions}

    for i, item in enumerate(descriptions, 1):
        print(f" {i}) {item['name']}")
        print(f" usable: {'yes' if item['usable'] else 'no'}")
        print(f" shape: {item['shape']}")
        print(f" record count: {item['record_count']}")
        if item["error"]:
            print(f" reason: {item['error']}")
        print()

    for _ in range(3):
        try:
            selection = input(f"Choose an output to use for DNS records [1-{len(descriptions)}], or press Enter to cancel: ").strip()
        except (KeyboardInterrupt, EOFError):
            # EOF (Ctrl-D or a closed stdin) is a cancellation too, not an error.
            print("\nSelection cancelled.")
            return None

        if selection == "":
            print("Selection cancelled.")
            return None

        # A list number takes precedence over an output that happens to be named like one.
        candidate = indexed.get(selection) or by_name.get(selection)
        if candidate is None:
            print("Invalid selection. Enter a number from the list or an exact output name.")
            continue

        if not candidate["usable"]:
            print(f"Output '{candidate['name']}' is not usable: {candidate['error']}")
            continue

        return candidate["name"]

    raise RuntimeError("Too many invalid selections. Exiting without writing files.")
|
|
|
|
|
|
def _sanitize_identifier(text: str) -> str:
    """Lower-case ``text`` and reduce it to ``[a-z0-9_]`` with single, inner underscores."""
    text = text.lower().replace("*", "wildcard")
    text = re.sub(r"[^a-z0-9_]+", "_", text)
    return re.sub(r"_+", "_", text).strip("_")


def tf_name(record: dict) -> str:
    """Build a Terraform resource name from a record's hostname, type and id.

    The three values are joined with underscores and sanitized: lower-cased,
    ``*`` spelled out as ``wildcard``, every other character outside
    ``[a-z0-9_]`` replaced by ``_``, and underscore runs collapsed. Names that
    would be empty or start with a digit get a ``record`` prefix.

    The id is compared and, if needed, appended in its sanitized form, so an
    id containing characters outside ``[a-z0-9_]`` can never put raw text into
    the name.

    Args:
        record: Record dict; ``hostname``, ``record_type`` and ``id`` are read
            and each defaults to an empty string when missing.

    Returns:
        The resource name, e.g. ``www_example_com_a_123``.
    """
    base = _sanitize_identifier(
        f"{record.get('hostname', '')}_{record.get('record_type', '')}_{record.get('id', '')}"
    )
    if not base:
        base = "record"
    elif not "a" <= base[0] <= "z":
        base = f"record_{base}"

    # Safety net: keep the id as the trailing component of the name.
    safe_id = _sanitize_identifier(str(record.get("id", "")))
    if safe_id and not base.endswith(safe_id):
        base = f"{base}_{safe_id}"
    return base
|
|
|
|
|
|
def hcl_value(value: object) -> str:
    """Render a Python value as a Terraform (HCL) literal.

    Booleans become ``true``/``false`` and numbers are written with ``str``.
    Everything else is JSON-encoded, with the template markers ``${`` and
    ``%{`` escaped as ``$${`` and ``%%{`` so Terraform reads them as literal
    text instead of evaluating them as interpolation or directives.

    Args:
        value: The value to render.

    Returns:
        The literal as it should appear on the right-hand side of an attribute.
    """
    # bool is tested first because it is a subclass of int.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    rendered = json.dumps(value)
    # "$" and "%" are never structural in JSON, so these sequences can only
    # occur inside string contents.
    return rendered.replace("${", "$${").replace("%{", "%%{")
|
|
|
|
|
|
def generate_resources(records: list[dict]) -> str:
    """Render the generated Terraform file: banner plus one resource per record.

    Each resource always gets ``hostname`` and ``record_type``. ``ttl`` is
    added when not None, ``enabled`` is taken from the record's ``enabled``
    key or, failing that, its ``state`` key. A non-empty ``content`` is
    emitted as-is; A/AAAA records without content get ``dynamic = true``.
    Non-empty ``OPTIONAL_FIELDS`` follow, then a ``prevent_destroy`` lifecycle.

    Args:
        records: Record dicts to render.

    Returns:
        The file content, ending in exactly one newline.
    """
    output = [HEADER_TF.rstrip(), ""]

    for record in records:
        block = [
            f'resource "dynu_dns_record" "{tf_name(record)}" {{',
            f" hostname = {hcl_value(record.get('hostname'))}",
            f" record_type = {hcl_value(record.get('record_type'))}",
        ]

        ttl = record.get("ttl")
        if ttl is not None:
            block.append(f" ttl = {hcl_value(ttl)}")

        # "state" stands in when "enabled" is absent or None.
        enabled = record.get("enabled")
        if enabled is None:
            enabled = record.get("state")
        if enabled is not None:
            block.append(f" enabled = {hcl_value(enabled)}")

        content = record.get("content")
        if content not in (None, ""):
            block.append(f" content = {hcl_value(content)}")
        elif str(record.get("record_type", "")).upper() in {"A", "AAAA"}:
            block.append(" dynamic = true")

        for field in OPTIONAL_FIELDS:
            value = record.get(field)
            if value in (None, ""):
                continue
            block.append(f" {field.ljust(11)}= {hcl_value(value)}")

        block += ["", " lifecycle {", " prevent_destroy = true", " }", "}", ""]
        output += block

    return "\n".join(output).rstrip() + "\n"
|
|
|
|
|
|
def generate_import_script(records: list[dict]) -> str:
    """Render the bash script that imports each record into Terraform state.

    For every record the script runs ``terraform state show`` on the resource
    address and only calls ``terraform import`` when that fails, so records
    already in state are skipped. The import id is ``<domain_id>/<id>``.

    All values taken from record data are passed through ``shlex.quote`` so a
    quote or other shell metacharacter in an id cannot break out of its
    argument in the generated script.

    Args:
        records: Record dicts; each must contain ``domain_id`` and ``id``.

    Returns:
        The script content, ending in exactly one newline.

    Raises:
        KeyError: If a record lacks ``domain_id`` or ``id``.
    """
    import shlex  # local import: the module's import block does not provide shlex

    lines = [HEADER_SH.rstrip(), ""]
    for rec in records:
        raw_addr = f"dynu_dns_record.{tf_name(rec)}"
        addr = shlex.quote(raw_addr)
        import_id = shlex.quote(f"{rec['domain_id']}/{rec['id']}")
        skip_message = shlex.quote(f"Skipping already imported: {raw_addr}")
        lines.extend(
            [
                f"if terraform state show {addr} >/dev/null 2>&1; then",
                f" echo {skip_message}",
                "else",
                f" terraform import {addr} {import_id}",
                "fi",
                "",
            ]
        )
    return "\n".join(lines).rstrip() + "\n"
|
|
|
|
|
|
def write_file(path: Path, content: str, dry_run: bool, overwrite: bool) -> None:
    """Write ``content`` to ``path`` as UTF-8, honouring dry-run and overwrite flags.

    Args:
        path: Destination file.
        content: Text to write.
        dry_run: When true, only print what would be written.
        overwrite: When false, an existing ``path`` is never replaced.

    Raises:
        RuntimeError: If ``path`` exists and ``overwrite`` is false. This is
            checked before the dry-run shortcut, so it also applies to dry runs.
    """
    if path.exists() and not overwrite:
        raise RuntimeError(f"Refusing to overwrite existing file: {path}. Re-run with --overwrite.")
    if dry_run:
        print(f"[dry-run] Would write {path}")
        return
    # newline="\n" switches off platform newline translation, so a bash script
    # written on Windows does not end up with CRLF line endings.
    with path.open("w", encoding="utf-8", newline="\n") as handle:
        handle.write(content)
    print(f"Wrote {path}")
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: load inventory, pick the records, write the files.

    The payload comes from ``--from-file`` or from ``run_terraform_output()``.
    Records are taken from ``--records-output`` (default
    ``DEFAULT_RECORDS_OUTPUT``). If that fails, no output name was given
    explicitly, and stdin is a terminal without ``--no-interactive``, the user
    is asked to choose among the available outputs.

    All target files are checked for the overwrite rule before anything is
    written, and a dry run does not create the output directory.

    Returns:
        0 on success; 1 when the user cancels the selection or any error
        occurs (the error message is printed to stderr).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="Print intended output paths without writing files.")
    parser.add_argument("--overwrite", "--force", action="store_true", dest="overwrite", help="Overwrite existing generated files.")
    parser.add_argument("--from-file", type=Path, help="Load inventory JSON from a file instead of calling terraform output.")
    parser.add_argument(
        "--records-output",
        default=None,
        help=(
            "Terraform output name containing Dynu DNS records. "
            f"Defaults to {DEFAULT_RECORDS_OUTPUT}; if missing in an interactive terminal, "
            "the script prompts you to choose from available outputs."
        ),
    )
    parser.add_argument("--no-interactive", action="store_true", help="Disable interactive output selection.")
    args = parser.parse_args()

    records_output_explicit = args.records_output is not None
    records_output = args.records_output or DEFAULT_RECORDS_OUTPUT

    try:
        payload = json.loads(args.from_file.read_text(encoding="utf-8")) if args.from_file else run_terraform_output()

        selected_output = records_output
        descriptions: list[dict] = []
        if isinstance(payload, dict):
            descriptions = [describe_output(name, payload[name], payload) for name in sorted(payload)]

        try:
            records = extract_records(payload, selected_output)
            validate_records(records, selected_output)
        except RuntimeError as exc:
            is_interactive = sys.stdin.isatty() and not args.no_interactive
            # Prompt only when the user did not name an output themselves.
            should_prompt = isinstance(payload, dict) and not records_output_explicit and is_interactive
            if should_prompt:
                print(f"Terraform output '{selected_output}' was not found or is unusable.\n")
                chosen = choose_output_interactively(payload, descriptions)
                if chosen is None:
                    print("Exiting without writing files.")
                    return 1
                selected_output = chosen
                records = extract_records(payload, selected_output)
                validate_records(records, selected_output)
            else:
                if isinstance(payload, dict):
                    available = ", ".join(sorted(payload.keys())) or "(none)"
                    if records_output_explicit:
                        raise RuntimeError(
                            f"Missing or unusable Terraform output '{selected_output}'. "
                            f"Available outputs: {available}. Details: {exc}"
                        ) from exc
                    raise RuntimeError(
                        f"Missing or unusable Terraform output '{selected_output}'. "
                        f"Available outputs: {available}.\n\n"
                        "Run interactively to choose an output, or pass one explicitly, for example:\n\n"
                        " python3 scripts/generate-brownfield-records.py --records-output dynu_dns_inventory --dry-run"
                    ) from exc
                raise

        # Refuse up front, so a conflict on a later file cannot leave a
        # partially written set of generated files behind.
        if not args.overwrite:
            for target in (INVENTORY_FILE, TF_FILE, IMPORT_SCRIPT):
                if target.exists():
                    raise RuntimeError(f"Refusing to overwrite existing file: {target}. Re-run with --overwrite.")

        # A dry run must not touch the filesystem, so the directory is only
        # created when files are really going to be written.
        if not args.dry_run:
            GENERATED_DIR.mkdir(parents=True, exist_ok=True)

        write_file(INVENTORY_FILE, json.dumps(records, indent=2, sort_keys=True) + "\n", args.dry_run, args.overwrite)
        write_file(TF_FILE, generate_resources(records), args.dry_run, args.overwrite)
        write_file(IMPORT_SCRIPT, generate_import_script(records), args.dry_run, args.overwrite)
        if not args.dry_run:
            IMPORT_SCRIPT.chmod(0o755)
        return 0
    except Exception as exc:  # noqa: BLE001
        # Top-level boundary: report any failure as a one-line error and exit code 1.
        print(f"Error: {exc}", file=sys.stderr)
        return 1
|
|
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|