#!/usr/bin/env python3 import argparse import os import re import time import json import logging import docker import yaml from prometheus_client import Gauge, start_http_server # --- Logging --- LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper() logging.basicConfig( level=getattr(logging, LOG_LEVEL, logging.DEBUG), format="%(asctime)s [%(levelname)s] %(message)s" ) logger = logging.getLogger("docker-update-exporter") # --- Config --- EXPORTER_PORT = 9105 CHECK_INTERVAL = 3600 CACHE_TTL = int(os.getenv("CACHE_TTL", "300")) SERVICES_UP_SCRIPT = os.getenv("SERVICES_UP_SCRIPT", "/compose/services-up.sh") CACHE_FILE = os.getenv("CACHE_FILE", "/data/remote_digest_cache.json") DRY_RUN = os.getenv("DRY_RUN", "false").lower() in ("1", "true", "yes") try: client = docker.from_env() except Exception as e: logger.warning(f"Docker client unavailable at startup: {e}") client = None # --- Metrics --- CONTAINER_UPDATE = Gauge( "docker_container_update_available", "1 if container image is out of date (compose drift or registry), 0 otherwise", ["container", "compose_image", "running_image", "com_docker_compose_project"] ) LAST_CHECK = Gauge( "docker_image_update_last_check_timestamp", "Last time the update check ran (unix timestamp)" ) def set_container_update_metric(container_name, compose_image, running_image, project_name, update_flag): """Set update metric for a container and log the emitted metric payload.""" metric_labels = { "container": container_name, "compose_image": compose_image or "unknown", "running_image": running_image, "com_docker_compose_project": project_name, } CONTAINER_UPDATE.labels(**metric_labels).set(update_flag) logger.info( "Metric emitted: docker_container_update_available=%s labels=%s", update_flag, metric_labels, ) def set_last_check_metric(): """Set and log the timestamp for the most recent check cycle.""" ts = time.time() LAST_CHECK.set(ts) logger.info("Metric emitted: docker_image_update_last_check_timestamp=%s", ts) # --- Persistent Cache --- def load_cache(): if not os.path.exists(CACHE_FILE): logger.info(f"Cache file does not exist yet: {CACHE_FILE}") return {} try: with open(CACHE_FILE, "r") as f: cache = json.load(f) logger.info(f"Loaded {len(cache)} cached remote digests") return cache except Exception as e: logger.error(f"Failed to load cache: {e}") return {} def save_cache(): try: os.makedirs(os.path.dirname(CACHE_FILE), exist_ok=True) with open(CACHE_FILE, "w") as f: json.dump(REMOTE_DIGEST_CACHE, f) logger.debug(f"Saved {len(REMOTE_DIGEST_CACHE)} remote digests to cache") except Exception as e: logger.error(f"Failed to save cache: {e}") REMOTE_DIGEST_CACHE = load_cache() now = time.time() REMOTE_DIGEST_CACHE = { image: (digest, ts) for image, (digest, ts) in REMOTE_DIGEST_CACHE.items() if now - ts < CACHE_TTL } # --- Helpers --- def get_project_prefix_from_script(script_path): prefix = "core-" if not os.path.exists(script_path): return prefix try: with open(script_path) as f: for line in f: m = re.match(r'PROJECT\s*=\s*["\']?([^"\']+)', line) if m: return m.group(1) + "-" except Exception as e: logger.warning(f"Failed reading project prefix: {e}") return prefix def get_local_digest(image_name): """ Return the local digest for the specific image reference. """ if client is None: return None try: img = client.images.get(image_name) digests = img.attrs.get("RepoDigests", []) logger.debug(f"RepoDigests for {image_name}: {digests}") for entry in digests: if "@" in entry: digest = entry.split("@", 1)[1] logger.debug(f"Local digest for {image_name}: {digest}") return digest logger.debug(f"No RepoDigest found for {image_name}") except Exception as e: logger.debug(f"Could not get local digest for {image_name}: {e}") return None def get_remote_digest(image_name): """ Return the upstream digest for the exact platform-specific image that Docker would pull on this host. This avoids false positives with multi-arch images where the registry manifest-list digest differs from the pulled image digest. """ now = time.time() cached = REMOTE_DIGEST_CACHE.get(image_name) if cached: digest, ts = cached if now - ts < CACHE_TTL: logger.debug(f"Using cached remote digest for {image_name}: {digest}") return digest if client is None: return None try: registry_data = client.images.get_registry_data(image_name) digest = None # docker SDK versions differ; try the common fields in order if hasattr(registry_data, "id") and registry_data.id: digest = registry_data.id elif hasattr(registry_data, "attrs"): digest = ( registry_data.attrs.get("Descriptor", {}).get("digest") or registry_data.attrs.get("digest") ) if digest: REMOTE_DIGEST_CACHE[image_name] = (digest, now) save_cache() logger.debug(f"Remote digest for {image_name}: {digest}") return digest logger.warning(f"No remote digest found for {image_name}") return None except Exception as e: logger.debug(f"Error fetching remote digest for {image_name}: {e}") return None # --- Dockerfile Image Extraction --- def parse_dockerfile_for_image(dockerfile_path): if not os.path.exists(dockerfile_path): return None try: arg_defaults = {} last_from = None with open(dockerfile_path) as df: for line in df: line = line.strip() if not line or line.startswith("#"): continue if line.upper().startswith("ARG "): arg_body = line[4:].strip() if "=" in arg_body: key, value = arg_body.split("=", 1) arg_defaults[key.strip()] = value.strip() continue # Prefer LABEL with image if present. if "LABEL" in line and "image=" in line: match = re.search(r'image=["\']?([^"\']+)["\']?', line) if match: image_name = normalize_image_name(substitute_dockerfile_args(match.group(1), arg_defaults)) logger.debug(f"Found LABEL image={image_name} in {dockerfile_path}") return image_name if line.upper().startswith("FROM "): from_clause = line[5:].strip() if from_clause.startswith("--"): split_clause = from_clause.split(None, 1) if len(split_clause) < 2: continue from_clause = split_clause[1] parts = from_clause.split() if not parts: continue candidate = substitute_dockerfile_args(parts[0], arg_defaults) if candidate and candidate.lower() != "scratch": last_from = normalize_image_name(candidate) if last_from: logger.debug(f"Found base FROM {last_from} in {dockerfile_path}") return last_from except Exception as e: logger.debug(f"Error reading Dockerfile {dockerfile_path}: {e}") return None def normalize_image_name(image_name): if not image_name: return None if "@" in image_name: return image_name if ":" in image_name.rsplit("/", 1)[-1]: return image_name return f"{image_name}:latest" def is_compose_build_placeholder(image_name, project_name): if not image_name: return False candidate = str(image_name) project_prefix = f"{project_name}-" if candidate.startswith(project_prefix): return True # Keep backward-compatible behavior for historical default project prefix. return candidate.startswith("core-") def substitute_dockerfile_args(value, arg_defaults): if not value: return value pattern = re.compile(r"\$\{([^}]+)\}|\$([A-Za-z_][A-Za-z0-9_]*)") def replacer(match): expr = match.group(1) simple = match.group(2) if simple: return arg_defaults.get(simple, "") if ":-" in expr: var_name, default_value = expr.split(":-", 1) return arg_defaults.get(var_name, default_value) if "-" in expr: var_name, default_value = expr.split("-", 1) return arg_defaults.get(var_name, default_value) return arg_defaults.get(expr, "") return pattern.sub(replacer, value) def expand_compose_path(path_value, project_root): raw = str(path_value) raw = raw.replace("${PROJECT_ROOT}", project_root).replace("$PROJECT_ROOT", project_root) return os.path.expandvars(raw) def get_project_root_from_script(script_path): if not script_path: return os.getcwd() return os.path.dirname(os.path.abspath(script_path)) # --- Compose parsing --- def get_compose_files_from_script(script_path): files = [] if not os.path.exists(script_path): return files base_dir = get_project_root_from_script(script_path) def _clean_compose_path(raw_path): cleaned = str(raw_path).strip().strip(",") if (cleaned.startswith('"') and cleaned.endswith('"')) or ( cleaned.startswith("'") and cleaned.endswith("'") ): cleaned = cleaned[1:-1] expanded = expand_compose_path(cleaned, base_dir) if os.path.isabs(expanded): return os.path.normpath(expanded) return os.path.normpath(os.path.join(base_dir, expanded)) try: with open(script_path) as f: content = f.read() match = re.search(r'FILES\s*=\s*\((.*?)\)', content, re.DOTALL) if match: for line in match.group(1).splitlines(): line = line.strip() if line.startswith("-f"): path = line[2:].strip() if path: full = _clean_compose_path(path) files.append(full) # services-up.sh can append many compose files at runtime via: # FILES+=(-f "$file") done < <(find "$PROJECT_ROOT/apps" ...) # Mirror that behavior here so we can map service->compose image. root_dirs = [] find_match = re.search(r'find\s+(.*?)\s+\\\s*\n', content) if find_match: for token in re.findall(r'"([^"]+)"|\'([^\']+)\'', find_match.group(1)): candidate = token[0] or token[1] if candidate: root_dirs.append(_clean_compose_path(candidate)) else: root_dirs = [ os.path.join(base_dir, "apps"), os.path.join(base_dir, "monitoring"), os.path.join(base_dir, "core"), ] for root_dir in root_dirs: if not os.path.isdir(root_dir): continue for candidate in sorted(os.listdir(root_dir)): svc_dir = os.path.join(root_dir, candidate) if not os.path.isdir(svc_dir): continue for compose_name in ("docker-compose.yml", "docker-compose.yaml"): compose_path = os.path.join(svc_dir, compose_name) if os.path.exists(compose_path): files.append(compose_path) # Preserve order while removing duplicates. deduped = [] seen = set() for path in files: if path in seen: continue seen.add(path) deduped.append(path) files = deduped except Exception as e: logger.warning(f"Failed parsing services-up.sh: {e}") return files def parse_project_name_from_script(script_path): project = "core" if not os.path.exists(script_path): return project try: with open(script_path) as f: for line in f: m = re.match(r'PROJECT\s*=\s*["\']?([^"\']+)', line) if m: project = m.group(1) break except Exception as e: logger.warning(f"Failed reading project name: {e}") return project def resolve_local_build_image(service_name, project_name): if client is None: return None try: images = client.images.list(filters={"label": f"com.docker.compose.service={service_name}"}) for image in images: labels = image.attrs.get("Config", {}).get("Labels", {}) or {} if labels.get("com.docker.compose.project") != project_name: continue for tag in image.tags: if tag and "" not in tag: logger.debug(f"Resolved local compose image for {service_name}: {tag}") return normalize_image_name(tag) except Exception as e: logger.debug(f"Could not inspect local build metadata for {service_name}: {e}") return None def parse_compose_services(compose_files, project_name, project_root): svc_map = {} for f in compose_files: if not os.path.exists(f): logger.warning(f"Compose file from services-up.sh is missing: {f}") continue try: with open(f) as stream: data = yaml.safe_load(stream) or {} for svc_name, svc_def in data.get("services", {}).items(): image = normalize_image_name(svc_def.get("image")) profiles = svc_def.get("profiles", []) build_ctx = svc_def.get("build") dockerfile_path = None from_dockerfile = None local_built_image = None if build_ctx: if isinstance(build_ctx, dict): context = build_ctx.get("context", ".") dockerfile = build_ctx.get("dockerfile", "Dockerfile") else: context = build_ctx dockerfile = "Dockerfile" compose_dir = os.path.dirname(f) context_expanded = expand_compose_path(context, project_root) if os.path.isabs(context_expanded): context_path = context_expanded else: context_path = os.path.normpath(os.path.join(compose_dir, context_expanded)) dockerfile_expanded = expand_compose_path(dockerfile, project_root) dockerfile_path = os.path.normpath(os.path.join(context_path, dockerfile_expanded)) from_dockerfile = normalize_image_name(parse_dockerfile_for_image(dockerfile_path)) local_built_image = resolve_local_build_image(svc_name, project_name) placeholder_image = is_compose_build_placeholder(image, project_name) or is_compose_build_placeholder(local_built_image, project_name) if placeholder_image: resolved_image = from_dockerfile or image or local_built_image or f"{project_name}-{svc_name}:latest" else: resolved_image = image or local_built_image or from_dockerfile or f"{project_name}-{svc_name}:latest" svc_map[svc_name] = { "image": resolved_image, "profiles": profiles, "build_context": build_ctx, "compose_file": f, "dockerfile": dockerfile_path } except Exception as e: logger.warning(f"Failed parsing {f}: {e}") logger.debug(f"Service image mapping: {svc_map}") return svc_map # --- Main check --- def check_containers(): if client is None: logger.error("Docker client is unavailable; skipping check cycle") return set_last_check_metric() CONTAINER_UPDATE.clear() project_name = parse_project_name_from_script(SERVICES_UP_SCRIPT) project_root = get_project_root_from_script(SERVICES_UP_SCRIPT) compose_files = get_compose_files_from_script(SERVICES_UP_SCRIPT) svc_map = parse_compose_services(compose_files, project_name, project_root) containers = client.containers.list() pending_metrics = [] remote_targets = set() for container in containers: proj = container.labels.get("com.docker.compose.project") if not proj: continue svc = container.labels.get("com.docker.compose.service") running = container.attrs["Config"]["Image"] compose_image = None if svc in svc_map: compose_image = svc_map[svc]["image"] local_digest = get_local_digest(running) remote_target = compose_image or running # If we cannot determine a local digest, we cannot compare and should # avoid spending a registry lookup for this container. if local_digest: remote_targets.add(remote_target) pending_metrics.append({ "container_name": container.name, "service": svc, "compose_image": compose_image, "running_image": running, "project_name": proj, "remote_target": remote_target, "local_digest": local_digest, }) remote_digests = {target: get_remote_digest(target) for target in remote_targets} for payload in pending_metrics: local_digest = payload["local_digest"] remote_target = payload["remote_target"] remote_digest = remote_digests.get(remote_target) update_flag = 1 if (local_digest and remote_digest and local_digest != remote_digest) else 0 logger.info( "Digest comparison: container=%s service=%s running=%s target=%s local=%s remote=%s", payload["container_name"], payload["service"], payload["running_image"], remote_target, local_digest, remote_digest, ) set_container_update_metric( container_name=payload["container_name"], compose_image=payload["compose_image"], running_image=payload["running_image"], project_name=payload["project_name"], update_flag=update_flag, ) def dump_service_image_mapping(): project_name = parse_project_name_from_script(SERVICES_UP_SCRIPT) project_root = get_project_root_from_script(SERVICES_UP_SCRIPT) compose_files = get_compose_files_from_script(SERVICES_UP_SCRIPT) svc_map = parse_compose_services(compose_files, project_name, project_root) mapping = {name: data["image"] for name, data in sorted(svc_map.items())} logger.info("Service to image mapping:") logger.info(json.dumps(mapping, indent=2, sort_keys=True)) return mapping # --- Runner --- if __name__ == "__main__": parser = argparse.ArgumentParser(description="Docker image update exporter") parser.add_argument("--dry-run", action="store_true", help="Only print service->image mapping and exit") parser.add_argument( "--services-up-script", default=SERVICES_UP_SCRIPT, help=f"Path to services-up script (default: {SERVICES_UP_SCRIPT})", ) parser.add_argument( "--cache-file", default=CACHE_FILE, help=f"Path to digest cache file (default: {CACHE_FILE})", ) parser.add_argument( "--log-level", default=LOG_LEVEL, help=f"Logging level (default: {LOG_LEVEL})", ) args = parser.parse_args() effective_log_level = str(args.log_level).upper() logging.getLogger().setLevel(getattr(logging, effective_log_level, logging.DEBUG)) logger.setLevel(getattr(logging, effective_log_level, logging.DEBUG)) SERVICES_UP_SCRIPT = args.services_up_script CACHE_FILE = args.cache_file REMOTE_DIGEST_CACHE = load_cache() now = time.time() REMOTE_DIGEST_CACHE = { image: (digest, ts) for image, (digest, ts) in REMOTE_DIGEST_CACHE.items() if now - ts < CACHE_TTL } if DRY_RUN or args.dry_run: dump_service_image_mapping() raise SystemExit(0) start_http_server(EXPORTER_PORT) while True: try: check_containers() except Exception as e: logger.exception(f"update check failed: {e}") time.sleep(CHECK_INTERVAL)