#!/usr/bin/env python3
"""Prometheus exporter that flags running Docker containers whose images are
out of date, either versus their compose definition or the remote registry."""

import json
import logging
import os
import re
import time

import docker
import requests
import yaml
from prometheus_client import Gauge, start_http_server

# --- Logging ---
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL, logging.DEBUG),
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("docker-update-exporter")

# --- Config ---
EXPORTER_PORT = 9105
CHECK_INTERVAL = 60          # seconds between update checks
CACHE_TTL = 6 * 3600         # seconds a cached remote digest stays fresh
SERVICES_UP_SCRIPT = "/compose/services-up.sh"
CACHE_FILE = "/data/remote_digest_cache.json"

client = docker.from_env()

# --- Metrics ---
CONTAINER_UPDATE = Gauge(
    "docker_container_update_available",
    "1 if container image is out of date (compose drift or registry), 0 otherwise",
    ["container", "compose_image", "running_image", "com_docker_compose_project"],
)
LAST_CHECK = Gauge(
    "docker_image_update_last_check_timestamp",
    "Last time the update check ran (unix timestamp)",
)


# --- Persistent Cache ---
def load_cache():
    """Load the remote-digest cache from CACHE_FILE; return {} on any failure."""
    if not os.path.exists(CACHE_FILE):
        logger.info(f"Cache file does not exist yet: {CACHE_FILE}")
        return {}
    try:
        with open(CACHE_FILE, "r") as f:
            cache = json.load(f)
        logger.info(f"Loaded {len(cache)} cached remote digests")
        return cache
    except Exception as e:
        logger.error(f"Failed to load cache: {e}")
        return {}


def save_cache():
    """Persist REMOTE_DIGEST_CACHE to CACHE_FILE (best effort, errors logged)."""
    try:
        os.makedirs(os.path.dirname(CACHE_FILE), exist_ok=True)
        with open(CACHE_FILE, "w") as f:
            json.dump(REMOTE_DIGEST_CACHE, f)
        logger.debug(f"Saved {len(REMOTE_DIGEST_CACHE)} remote digests to cache")
    except Exception as e:
        logger.error(f"Failed to save cache: {e}")


# Maps image reference -> (digest, fetch_timestamp); JSON round-trips tuples
# as lists, which unpack the same way.
REMOTE_DIGEST_CACHE = load_cache()


# --- Helpers ---
def get_project_prefix_from_script(script_path):
    """Read the compose PROJECT name from services-up.sh and return it as an
    image-name prefix ("<project>-"). Falls back to "core-" when the script
    is missing, unreadable, or has no PROJECT= line."""
    prefix = "core-"
    if not os.path.exists(script_path):
        return prefix
    try:
        with open(script_path) as f:
            for line in f:
                m = re.match(r'PROJECT\s*=\s*["\']?([^"\']+)', line)
                if m:
                    return m.group(1) + "-"
    except Exception as e:
        logger.warning(f"Failed reading project prefix: {e}")
    return prefix


def get_local_digest(image_name):
    """Return the repo digest (sha256:...) of the locally present image,
    or None when the image is absent or has no RepoDigests (locally built)."""
    try:
        img = client.images.get(image_name)
        digests = img.attrs.get("RepoDigests", [])
        if digests:
            digest = digests[0].split("@")[1]
            logger.debug(f"Local digest for {image_name}: {digest}")
            return digest
        logger.debug(f"No local digest found for {image_name}")
    except Exception as e:
        # Best effort: e.g. ImageNotFound for locally built/retagged images.
        logger.debug(f"Could not read local digest for {image_name}: {e}")
    return None


def get_remote_digest(image_name):
    """Return the current registry digest for *image_name*, with TTL caching.

    Supports Docker Hub and ghcr.io; returns None for other registries or
    on any error (network failures are logged at debug level only).
    """
    now = time.time()
    if image_name in REMOTE_DIGEST_CACHE:
        digest, ts = REMOTE_DIGEST_CACHE[image_name]
        if now - ts < CACHE_TTL:
            return digest
    try:
        # Split the reference into registry host + repository path.
        if "/" not in image_name:
            registry = "docker.io"
            repo = "library/" + image_name
        else:
            parts = image_name.split("/")
            # A dot or port in the first component marks an explicit registry.
            if "." in parts[0] or ":" in parts[0]:
                registry = parts[0]
                repo = "/".join(parts[1:])
            else:
                registry = "docker.io"
                repo = image_name
        if ":" in repo:
            repo, tag = repo.rsplit(":", 1)
        else:
            tag = "latest"
        # Official Hub images live in the implicit "library/" namespace, also
        # when referenced explicitly as e.g. "docker.io/nginx".
        if registry in ("docker.io", "registry-1.docker.io") and "/" not in repo:
            repo = "library/" + repo

        if registry in ["docker.io", "registry-1.docker.io"]:
            token_res = requests.get(
                "https://auth.docker.io/token",
                params={"service": "registry.docker.io",
                        "scope": f"repository:{repo}:pull"},
                timeout=10,
            )
            token = token_res.json().get("token")
            manifest_url = f"https://registry-1.docker.io/v2/{repo}/manifests/{tag}"
        elif registry == "ghcr.io":
            token_res = requests.get(
                "https://ghcr.io/token",
                params={"service": "ghcr.io",
                        "scope": f"repository:{repo}:pull"},
                timeout=10,
            )
            token = token_res.json().get("token")
            manifest_url = f"https://ghcr.io/v2/{repo}/manifests/{tag}"
        else:
            logger.warning(f"Unsupported registry {registry} for {image_name}")
            return None
        if not token:
            return None
        res = requests.get(
            manifest_url,
            headers={
                "Authorization": f"Bearer {token}",
                # Accept multi-arch manifest lists too: for multi-arch images
                # the local RepoDigest is the *list* digest, so requesting only
                # the single-arch type yields a mismatching digest and a
                # permanent false "update available".
                "Accept": ", ".join([
                    "application/vnd.docker.distribution.manifest.list.v2+json",
                    "application/vnd.oci.image.index.v1+json",
                    "application/vnd.docker.distribution.manifest.v2+json",
                    "application/vnd.oci.image.manifest.v1+json",
                ]),
            },
            timeout=10,
        )
        if res.status_code == 200:
            digest = res.headers.get("Docker-Content-Digest")
            if digest:  # never cache a missing header as a valid digest
                REMOTE_DIGEST_CACHE[image_name] = (digest, now)
                save_cache()
            return digest
    except Exception as e:
        logger.debug(f"Error fetching remote digest for {image_name}: {e}")
    return None


# --- Dockerfile Image Extraction ---
def parse_dockerfile_for_image(dockerfile_path):
    """Derive an image name for a built service from its Dockerfile.

    Prefers an explicit `LABEL ... image=<name>` hint; otherwise falls back
    to the first FROM base image. Returns None when the file is missing or
    unreadable.
    """
    if not os.path.exists(dockerfile_path):
        return None
    image_name = None
    try:
        with open(dockerfile_path) as df:
            # First pass: prefer a LABEL with an image= hint.
            for line in df:
                line = line.strip()
                if "LABEL" in line and "image=" in line:
                    match = re.search(r'image=["\']?([^"\']+)["\']?', line)
                    if match:
                        image_name = match.group(1)
                        logger.debug(f"Found LABEL image={image_name} in {dockerfile_path}")
                        return image_name
            # If no LABEL, use the FROM line as fallback.
            df.seek(0)
            for line in df:
                line = line.strip()
                if line.upper().startswith("FROM "):
                    parts = line.split()
                    if len(parts) >= 2:
                        base_image = parts[1]
                        logger.debug(f"Found base FROM {base_image} in {dockerfile_path}")
                        return base_image
    except Exception as e:
        logger.debug(f"Error reading Dockerfile {dockerfile_path}: {e}")
    return image_name


# --- Compose parsing ---
def get_compose_files_from_script(script_path):
    """Extract the compose file paths from the FILES=( ... ) array in
    services-up.sh, resolved relative to the script's directory."""
    files = []
    if not os.path.exists(script_path):
        return files
    base_dir = os.path.dirname(script_path)
    try:
        with open(script_path) as f:
            content = f.read()
        match = re.search(r'FILES\s*=\s*\((.*?)\)', content, re.DOTALL)
        if match:
            for line in match.group(1).splitlines():
                line = line.strip()
                if line.startswith("-f"):
                    path = line[2:].strip()
                    if path:
                        full = os.path.normpath(os.path.join(base_dir, path))
                        files.append(full)
    except Exception as e:
        logger.warning(f"Failed parsing services-up.sh: {e}")
    return files


def parse_compose_services(compose_files):
    """Map each compose service name to (image, is_built) across all files.

    For build-only services the image name comes from the Dockerfile (LABEL
    or FROM), defaulting to "<service>:latest" when undeterminable.
    """
    svc_map = {}
    for f in compose_files:
        try:
            with open(f) as stream:
                data = yaml.safe_load(stream) or {}
            for svc_name, svc_def in data.get("services", {}).items():
                image = svc_def.get("image")
                is_built = False
                if not image and "build" in svc_def:
                    is_built = True
                    build_ctx = svc_def["build"]
                    dockerfile_path = None
                    if isinstance(build_ctx, dict):
                        context = build_ctx.get("context", ".")
                        dockerfile_path = os.path.join(
                            context, build_ctx.get("dockerfile", "Dockerfile"))
                    elif isinstance(build_ctx, str):
                        dockerfile_path = os.path.join(build_ctx, "Dockerfile")
                    # Guard: build may be neither dict nor str, leaving no path.
                    image = (parse_dockerfile_for_image(dockerfile_path)
                             if dockerfile_path else None)
                    if not image:
                        logger.info(f"Defaulting build image for {svc_name} to {svc_name}:latest")
                        image = f"{svc_name}:latest"
                svc_map[svc_name] = (image, is_built)
        except Exception as e:
            logger.warning(f"Failed parsing {f}: {e}")
    logger.debug(f"Service image mapping: {svc_map}")
    return svc_map


# --- Main check ---
def check_containers():
    """Recompute docker_container_update_available for every compose-managed
    running container and stamp the last-check gauge."""
    CONTAINER_UPDATE.clear()
    prefix = get_project_prefix_from_script(SERVICES_UP_SCRIPT)
    compose_files = get_compose_files_from_script(SERVICES_UP_SCRIPT)
    svc_map = parse_compose_services(compose_files)
    for container in client.containers.list():
        proj = container.labels.get("com.docker.compose.project")
        if not proj:
            continue  # not a compose-managed container
        svc = container.labels.get("com.docker.compose.service")
        running = container.attrs["Config"]["Image"]
        compose_image = None
        is_built = False
        if svc in svc_map:
            compose_image, is_built = svc_map[svc]
            if is_built:
                # Built images get the project prefix; drop any tag first.
                name, _, _ = compose_image.partition(":")
                compose_image = f"{prefix}{name}"
        update_flag = 0
        local_digest = get_local_digest(running)
        remote_digest = get_remote_digest(compose_image if is_built else running)
        if local_digest and remote_digest and local_digest != remote_digest:
            update_flag = 1
        CONTAINER_UPDATE.labels(
            container=container.name,
            compose_image=compose_image or "unknown",
            running_image=running,
            com_docker_compose_project=proj,
        ).set(update_flag)
    # Previously declared but never set anywhere — record completion time.
    LAST_CHECK.set(time.time())


# --- Runner ---
if __name__ == "__main__":
    start_http_server(EXPORTER_PORT)
    while True:
        try:
            check_containers()
        except Exception as e:
            logger.exception(f"update check failed: {e}")
        time.sleep(CHECK_INTERVAL)