Skip to content
Learn Agentic AI12 min read0 views

Multi-Environment Agent Deployment: Managing Different Configs Across Clusters

Manage AI agent configurations across multiple Kubernetes clusters using GitOps workflows, config synchronization, drift detection, and environment promotion pipelines.

The Multi-Cluster Challenge

Production AI agent systems rarely run in a single cluster. You might have a development cluster for rapid iteration, a staging cluster for integration testing, and one or more production clusters across regions. Each cluster runs the same agent code but with different configuration: different models, different token limits, different tool endpoints, different guardrail thresholds.

Without a systematic approach, configuration drift becomes inevitable. Staging might silently diverge from production, and a change that passed staging tests fails in production because the configs were not actually equivalent.

GitOps Configuration Structure

The foundation of multi-environment config management is a git repository where each environment has its own directory, with a shared base that all environments inherit from.

# Directory structure representation
CONFIG_STRUCTURE = """
agent-configs/
  base/
    agent.toml          # Shared defaults
    tools.toml          # Tool definitions
    guardrails.toml     # Safety settings
  overlays/
    development/
      kustomization.yaml
      agent-patch.toml  # Dev overrides
    staging/
      kustomization.yaml
      agent-patch.toml  # Staging overrides
    production/
      kustomization.yaml
      agent-patch.toml  # Prod overrides
    production-eu/
      kustomization.yaml
      agent-patch.toml  # EU region overrides
"""

Config Merger for Environments

Build a tool that merges base configuration with environment-specific overlays, producing the final resolved config for each environment.

from pathlib import Path
from copy import deepcopy
from typing import Any

try:
    import tomllib
except ImportError:
    import tomli as tomllib


class EnvironmentConfigBuilder:
    def __init__(self, config_root: str):
        self._root = Path(config_root)
        self._base_dir = self._root / "base"
        self._overlays_dir = self._root / "overlays"

    def build(self, environment: str) -> dict[str, Any]:
        # Load base configs
        base = {}
        for toml_file in sorted(self._base_dir.glob("*.toml")):
            with open(toml_file, "rb") as f:
                section = tomllib.load(f)
            base = self._deep_merge(base, section)

        # Load environment overlay
        overlay_dir = self._overlays_dir / environment
        if not overlay_dir.exists():
            raise ValueError(f"Unknown environment: {environment}")

        for toml_file in sorted(overlay_dir.glob("*.toml")):
            with open(toml_file, "rb") as f:
                overlay = tomllib.load(f)
            base = self._deep_merge(base, overlay)

        return base

    def _deep_merge(self, base: dict, overlay: dict) -> dict:
        result = deepcopy(base)
        for key, value in overlay.items():
            if (
                key in result
                and isinstance(result[key], dict)
                and isinstance(value, dict)
            ):
                result[key] = self._deep_merge(result[key], value)
            else:
                result[key] = deepcopy(value)
        return result

    def list_environments(self) -> list[str]:
        return [
            d.name for d in self._overlays_dir.iterdir() if d.is_dir()
        ]

Drift Detection

Drift occurs when the actual running configuration diverges from what the git repository says it should be. A drift detector compares the expected config with what is actually deployed.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

import json
import hashlib
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class DriftReport:
    environment: str
    checked_at: datetime
    expected_hash: str
    actual_hash: str
    has_drift: bool
    drifted_fields: list[dict]


class DriftDetector:
    def __init__(self, config_builder: EnvironmentConfigBuilder):
        self._builder = config_builder

    def check(
        self, environment: str, actual_config: dict
    ) -> DriftReport:
        expected = self._builder.build(environment)
        expected_hash = self._hash_config(expected)
        actual_hash = self._hash_config(actual_config)

        drifted = []
        if expected_hash != actual_hash:
            drifted = self._find_differences(expected, actual_config)

        return DriftReport(
            environment=environment,
            checked_at=datetime.utcnow(),
            expected_hash=expected_hash,
            actual_hash=actual_hash,
            has_drift=expected_hash != actual_hash,
            drifted_fields=drifted,
        )

    def _hash_config(self, config: dict) -> str:
        serialized = json.dumps(config, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()[:12]

    def _find_differences(
        self, expected: dict, actual: dict, prefix: str = ""
    ) -> list[dict]:
        diffs = []
        all_keys = set(expected.keys()) | set(actual.keys())

        for key in sorted(all_keys):
            full_key = f"{prefix}.{key}" if prefix else key
            exp_val = expected.get(key)
            act_val = actual.get(key)

            if isinstance(exp_val, dict) and isinstance(act_val, dict):
                diffs.extend(
                    self._find_differences(exp_val, act_val, full_key)
                )
            elif exp_val != act_val:
                diffs.append({
                    "field": full_key,
                    "expected": exp_val,
                    "actual": act_val,
                })

        return diffs

Promotion Workflow

Changes should flow through environments in order: development to staging to production. A promotion pipeline ensures configs are tested at each stage before advancing.

from enum import Enum


class PromotionStatus(Enum):
    PENDING = "pending"
    TESTING = "testing"
    APPROVED = "approved"
    PROMOTED = "promoted"
    REJECTED = "rejected"


@dataclass
class PromotionRequest:
    id: str
    source_env: str
    target_env: str
    config_hash: str
    status: PromotionStatus
    created_by: str
    created_at: datetime
    approved_by: Optional[str] = None
    test_results: Optional[dict] = None


PROMOTION_ORDER = ["development", "staging", "production"]


class PromotionManager:
    def __init__(self, config_builder: EnvironmentConfigBuilder):
        self._builder = config_builder
        self._requests: list[PromotionRequest] = []

    def request_promotion(
        self, source_env: str, target_env: str, requested_by: str
    ) -> PromotionRequest:
        # Validate promotion order
        src_idx = PROMOTION_ORDER.index(source_env)
        tgt_idx = PROMOTION_ORDER.index(target_env)
        if tgt_idx != src_idx + 1:
            raise ValueError(
                f"Cannot promote from {source_env} to {target_env}. "
                f"Must follow order: {' -> '.join(PROMOTION_ORDER)}"
            )

        source_config = self._builder.build(source_env)
        config_hash = hashlib.sha256(
            json.dumps(source_config, sort_keys=True).encode()
        ).hexdigest()[:12]

        request = PromotionRequest(
            id=f"promo_{config_hash}_{target_env}",
            source_env=source_env,
            target_env=target_env,
            config_hash=config_hash,
            status=PromotionStatus.PENDING,
            created_by=requested_by,
            created_at=datetime.utcnow(),
        )
        self._requests.append(request)
        return request

    def approve(self, request_id: str, approver: str):
        req = next((r for r in self._requests if r.id == request_id), None)
        if not req:
            raise KeyError(f"Request not found: {request_id}")
        if req.created_by == approver:
            raise ValueError("Cannot self-approve promotions")
        req.status = PromotionStatus.APPROVED
        req.approved_by = approver

Config Sync to Clusters

After approval, the sync engine pushes the configuration to the target cluster. In a Kubernetes environment, this typically means updating a ConfigMap or Secret.

class ConfigSyncer:
    def __init__(self, config_builder: EnvironmentConfigBuilder):
        self._builder = config_builder

    def sync_to_cluster(self, environment: str) -> dict:
        config = self._builder.build(environment)
        config_json = json.dumps(config, sort_keys=True, indent=2)

        # In real implementation, this would use the Kubernetes API
        configmap = {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {
                "name": f"agent-config-{environment}",
                "namespace": "ai-agents",
                "labels": {
                    "app": "ai-agent",
                    "environment": environment,
                    "config-hash": hashlib.sha256(
                        config_json.encode()
                    ).hexdigest()[:8],
                },
            },
            "data": {
                "agent-config.json": config_json,
            },
        }
        return configmap

    def generate_all(self) -> dict[str, dict]:
        return {
            env: self.sync_to_cluster(env)
            for env in self._builder.list_environments()
        }

Automated Drift Alerts

Run drift detection on a schedule and alert when configuration has diverged from the expected state.

async def drift_check_job(
    detector: DriftDetector,
    environments: list[str],
    get_actual_config,  # Function to fetch running config from cluster
    alert_fn,           # Function to send alerts
):
    for env in environments:
        actual = await get_actual_config(env)
        report = detector.check(env, actual)

        if report.has_drift:
            await alert_fn(
                f"Config drift detected in {env}",
                f"Fields: {json.dumps(report.drifted_fields, indent=2)}",
            )

FAQ

How do I handle secrets that differ across environments?

Never store secrets in the config repository. Use Kubernetes Secrets or an external secrets manager like HashiCorp Vault. Reference secrets by name in your config files, and let the cluster-specific secrets provider inject the actual values. This keeps the git repository free of sensitive data while still tracking which secrets each environment needs.

What happens if I need to hotfix production without going through the promotion pipeline?

Support an emergency bypass path that still requires approval from two team members. Log the bypass event prominently, and require a follow-up PR that backfills the change into the development and staging configurations within 24 hours. The goal is to keep environments in sync even after emergency changes.

How do I handle config changes that are not backward compatible?

Treat non-backward-compatible config changes the same way you treat database migrations. Version your config schema, and include a migration script that transforms old config format to new. During the transition, support both formats with a compatibility layer that reads old keys and maps them to new ones.


#MultiEnvironment #AIAgents #GitOps #Kubernetes #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.