Multi-Environment Agent Deployment: Managing Different Configs Across Clusters
Manage AI agent configurations across multiple Kubernetes clusters using GitOps workflows, config synchronization, drift detection, and environment promotion pipelines.
The Multi-Cluster Challenge
Production AI agent systems rarely run in a single cluster. You might have a development cluster for rapid iteration, a staging cluster for integration testing, and one or more production clusters across regions. Each cluster runs the same agent code but with different configuration: different models, different token limits, different tool endpoints, different guardrail thresholds.
Without a systematic approach, configuration drift becomes inevitable: staging silently diverges from production, and a change that passed staging tests fails in production because the two configs were never actually equivalent.
GitOps Configuration Structure
The foundation of multi-environment config management is a git repository where each environment has its own directory, with a shared base that all environments inherit from.
# Directory structure representation
CONFIG_STRUCTURE = """
agent-configs/
  base/
    agent.toml           # Shared defaults
    tools.toml           # Tool definitions
    guardrails.toml      # Safety settings
  overlays/
    development/
      kustomization.yaml
      agent-patch.toml   # Dev overrides
    staging/
      kustomization.yaml
      agent-patch.toml   # Staging overrides
    production/
      kustomization.yaml
      agent-patch.toml   # Prod overrides
    production-eu/
      kustomization.yaml
      agent-patch.toml   # EU region overrides
"""
Config Merger for Environments
Build a tool that merges base configuration with environment-specific overlays, producing the final resolved config for each environment.
from pathlib import Path
from copy import deepcopy
from typing import Any

try:
    import tomllib  # Python 3.11+
except ImportError:
    import tomli as tomllib  # Backport for older Python versions


class EnvironmentConfigBuilder:
    def __init__(self, config_root: str):
        self._root = Path(config_root)
        self._base_dir = self._root / "base"
        self._overlays_dir = self._root / "overlays"

    def build(self, environment: str) -> dict[str, Any]:
        # Load base configs
        base: dict[str, Any] = {}
        for toml_file in sorted(self._base_dir.glob("*.toml")):
            with open(toml_file, "rb") as f:
                section = tomllib.load(f)
            base = self._deep_merge(base, section)

        # Load environment overlay
        overlay_dir = self._overlays_dir / environment
        if not overlay_dir.exists():
            raise ValueError(f"Unknown environment: {environment}")
        for toml_file in sorted(overlay_dir.glob("*.toml")):
            with open(toml_file, "rb") as f:
                overlay = tomllib.load(f)
            base = self._deep_merge(base, overlay)
        return base

    def _deep_merge(self, base: dict, overlay: dict) -> dict:
        result = deepcopy(base)
        for key, value in overlay.items():
            if (
                key in result
                and isinstance(result[key], dict)
                and isinstance(value, dict)
            ):
                result[key] = self._deep_merge(result[key], value)
            else:
                result[key] = deepcopy(value)
        return result

    def list_environments(self) -> list[str]:
        return [
            d.name for d in self._overlays_dir.iterdir() if d.is_dir()
        ]
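The merge semantics matter: nested tables merge key by key, while scalars and lists are replaced wholesale. A standalone sketch of the same deep-merge logic shows both behaviors:

```python
from copy import deepcopy


def deep_merge(base: dict, overlay: dict) -> dict:
    # Nested dicts merge recursively; any other value is replaced outright.
    result = deepcopy(base)
    for key, value in overlay.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = deepcopy(value)
    return result


base = {"agent": {"model": "gpt-4o", "max_tokens": 4096}, "tools": ["search"]}
overlay = {"agent": {"max_tokens": 8192}, "tools": ["search", "browser"]}

merged = deep_merge(base, overlay)
print(merged)
# {'agent': {'model': 'gpt-4o', 'max_tokens': 8192}, 'tools': ['search', 'browser']}
```

Note that the `tools` list is replaced, not concatenated. If you need additive list semantics, that is a deliberate design decision you would have to encode explicitly.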
Drift Detection
Drift occurs when the actual running configuration diverges from what the git repository says it should be. A drift detector compares the expected config with what is actually deployed.
import json
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class DriftReport:
    environment: str
    checked_at: datetime
    expected_hash: str
    actual_hash: str
    has_drift: bool
    drifted_fields: list[dict]


class DriftDetector:
    def __init__(self, config_builder: EnvironmentConfigBuilder):
        self._builder = config_builder

    def check(
        self, environment: str, actual_config: dict
    ) -> DriftReport:
        expected = self._builder.build(environment)
        expected_hash = self._hash_config(expected)
        actual_hash = self._hash_config(actual_config)

        drifted = []
        if expected_hash != actual_hash:
            drifted = self._find_differences(expected, actual_config)

        return DriftReport(
            environment=environment,
            checked_at=datetime.now(timezone.utc),
            expected_hash=expected_hash,
            actual_hash=actual_hash,
            has_drift=expected_hash != actual_hash,
            drifted_fields=drifted,
        )

    def _hash_config(self, config: dict) -> str:
        serialized = json.dumps(config, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()[:12]

    def _find_differences(
        self, expected: dict, actual: dict, prefix: str = ""
    ) -> list[dict]:
        diffs = []
        all_keys = set(expected.keys()) | set(actual.keys())
        for key in sorted(all_keys):
            full_key = f"{prefix}.{key}" if prefix else key
            exp_val = expected.get(key)
            act_val = actual.get(key)
            if isinstance(exp_val, dict) and isinstance(act_val, dict):
                diffs.extend(
                    self._find_differences(exp_val, act_val, full_key)
                )
            elif exp_val != act_val:
                diffs.append({
                    "field": full_key,
                    "expected": exp_val,
                    "actual": act_val,
                })
        return diffs
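The recursive diff reports each drifted field as a dotted path, which makes alerts readable. The core logic, stripped to a standalone function with hypothetical config values:

```python
def find_differences(expected: dict, actual: dict, prefix: str = "") -> list[dict]:
    # Walk both dicts together, recursing into shared nested tables and
    # recording any leaf value that differs, keyed by its dotted path.
    diffs = []
    for key in sorted(set(expected) | set(actual)):
        full_key = f"{prefix}.{key}" if prefix else key
        exp_val = expected.get(key)
        act_val = actual.get(key)
        if isinstance(exp_val, dict) and isinstance(act_val, dict):
            diffs.extend(find_differences(exp_val, act_val, full_key))
        elif exp_val != act_val:
            diffs.append({"field": full_key, "expected": exp_val, "actual": act_val})
    return diffs


expected = {"agent": {"model": "gpt-4o", "temperature": 0.2}}
actual = {"agent": {"model": "gpt-4o", "temperature": 0.7}}

diffs = find_differences(expected, actual)
print(diffs)
# [{'field': 'agent.temperature', 'expected': 0.2, 'actual': 0.7}]
```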
Promotion Workflow
Changes should flow through environments in order: development to staging to production. A promotion pipeline ensures configs are tested at each stage before advancing.
from enum import Enum
from datetime import timezone


class PromotionStatus(Enum):
    PENDING = "pending"
    TESTING = "testing"
    APPROVED = "approved"
    PROMOTED = "promoted"
    REJECTED = "rejected"


@dataclass
class PromotionRequest:
    id: str
    source_env: str
    target_env: str
    config_hash: str
    status: PromotionStatus
    created_by: str
    created_at: datetime
    approved_by: Optional[str] = None
    test_results: Optional[dict] = None


PROMOTION_ORDER = ["development", "staging", "production"]


class PromotionManager:
    def __init__(self, config_builder: EnvironmentConfigBuilder):
        self._builder = config_builder
        self._requests: list[PromotionRequest] = []

    def request_promotion(
        self, source_env: str, target_env: str, requested_by: str
    ) -> PromotionRequest:
        # Validate promotion order
        src_idx = PROMOTION_ORDER.index(source_env)
        tgt_idx = PROMOTION_ORDER.index(target_env)
        if tgt_idx != src_idx + 1:
            raise ValueError(
                f"Cannot promote from {source_env} to {target_env}. "
                f"Must follow order: {' -> '.join(PROMOTION_ORDER)}"
            )

        source_config = self._builder.build(source_env)
        config_hash = hashlib.sha256(
            json.dumps(source_config, sort_keys=True).encode()
        ).hexdigest()[:12]

        request = PromotionRequest(
            id=f"promo_{config_hash}_{target_env}",
            source_env=source_env,
            target_env=target_env,
            config_hash=config_hash,
            status=PromotionStatus.PENDING,
            created_by=requested_by,
            created_at=datetime.now(timezone.utc),
        )
        self._requests.append(request)
        return request

    def approve(self, request_id: str, approver: str):
        req = next((r for r in self._requests if r.id == request_id), None)
        if not req:
            raise KeyError(f"Request not found: {request_id}")
        if req.created_by == approver:
            raise ValueError("Cannot self-approve promotions")
        req.status = PromotionStatus.APPROVED
        req.approved_by = approver
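The order check is the heart of the pipeline: a promotion is legal only if the target is the immediate next stage, so nothing can skip staging. Isolated as a standalone function:

```python
PROMOTION_ORDER = ["development", "staging", "production"]


def validate_promotion(source_env: str, target_env: str) -> None:
    # Legal only when the target is the stage immediately after the source.
    src_idx = PROMOTION_ORDER.index(source_env)
    tgt_idx = PROMOTION_ORDER.index(target_env)
    if tgt_idx != src_idx + 1:
        raise ValueError(f"Cannot promote from {source_env} to {target_env}")


validate_promotion("development", "staging")   # allowed
validate_promotion("staging", "production")    # allowed
try:
    validate_promotion("development", "production")  # skips staging
except ValueError as exc:
    print(exc)  # Cannot promote from development to production
```

Note that `ValueError` from `list.index` also rejects environment names outside the ordered list, such as `production-eu`; region-specific environments would need their own rule (for example, promoting from `production` sideways).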
Config Sync to Clusters
After approval, the sync engine pushes the configuration to the target cluster. In a Kubernetes environment, this typically means updating a ConfigMap or Secret.
class ConfigSyncer:
    def __init__(self, config_builder: EnvironmentConfigBuilder):
        self._builder = config_builder

    def sync_to_cluster(self, environment: str) -> dict:
        config = self._builder.build(environment)
        config_json = json.dumps(config, sort_keys=True, indent=2)

        # In a real implementation, this manifest would be applied
        # through the Kubernetes API
        configmap = {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {
                "name": f"agent-config-{environment}",
                "namespace": "ai-agents",
                "labels": {
                    "app": "ai-agent",
                    "environment": environment,
                    "config-hash": hashlib.sha256(
                        config_json.encode()
                    ).hexdigest()[:8],
                },
            },
            "data": {
                "agent-config.json": config_json,
            },
        }
        return configmap

    def generate_all(self) -> dict[str, dict]:
        return {
            env: self.sync_to_cluster(env)
            for env in self._builder.list_environments()
        }
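The `config-hash` label is what makes sync and drift detection cheap: two configs with the same content always produce the same hash, because `sort_keys=True` gives a canonical serialization regardless of key order. A minimal sketch:

```python
import hashlib
import json


def config_hash(config: dict) -> str:
    # Canonical serialization (sorted keys) so that semantically identical
    # configs hash identically, regardless of insertion order.
    serialized = json.dumps(config, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:8]


a = {"model": "gpt-4o", "max_tokens": 4096}
b = {"max_tokens": 4096, "model": "gpt-4o"}  # same content, different order

print(config_hash(a) == config_hash(b))  # True
```

A deployment controller can then compare the label on the running ConfigMap against the hash of the freshly built config and skip the update when they match.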
Automated Drift Alerts
Run drift detection on a schedule and alert when configuration has diverged from the expected state.
async def drift_check_job(
    detector: DriftDetector,
    environments: list[str],
    get_actual_config,  # Async function to fetch running config from cluster
    alert_fn,  # Async function to send alerts
):
    for env in environments:
        actual = await get_actual_config(env)
        report = detector.check(env, actual)
        if report.has_drift:
            await alert_fn(
                f"Config drift detected in {env}",
                f"Fields: {json.dumps(report.drifted_fields, indent=2)}",
            )
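In production you would typically drive this job from a Kubernetes CronJob or an external scheduler. For a single long-running process, a minimal in-process loop is enough; the sketch below uses a stand-in job and a short interval purely for illustration:

```python
import asyncio

results: list[str] = []


async def fake_drift_check() -> None:
    # Stand-in for calling drift_check_job against real clusters.
    results.append("checked")


async def run_periodically(job, interval_seconds: float, iterations: int) -> None:
    # Minimal scheduler: run the job, sleep, repeat. A real deployment would
    # loop forever and add error handling so one failed check does not stop
    # subsequent ones.
    for _ in range(iterations):
        await job()
        await asyncio.sleep(interval_seconds)


asyncio.run(run_periodically(fake_drift_check, 0.01, 3))
print(len(results))  # 3
```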
FAQ
How do I handle secrets that differ across environments?
Never store secrets in the config repository. Use Kubernetes Secrets or an external secrets manager like HashiCorp Vault. Reference secrets by name in your config files, and let the cluster-specific secrets provider inject the actual values. This keeps the git repository free of sensitive data while still tracking which secrets each environment needs.
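One way to reference secrets by name is a sentinel prefix in config values that a resolver swaps for the injected value at startup. The `secretref:` convention below is an invented example, not a standard; the environment variable would normally be populated by Kubernetes or your secrets manager:

```python
import os


def resolve_secret_refs(config: dict) -> dict:
    # Walk the config tree; any string value of the form "secretref:NAME"
    # is replaced with the NAME environment variable, which the cluster's
    # secrets provider is expected to have injected.
    resolved: dict = {}
    for key, value in config.items():
        if isinstance(value, dict):
            resolved[key] = resolve_secret_refs(value)
        elif isinstance(value, str) and value.startswith("secretref:"):
            resolved[key] = os.environ[value.removeprefix("secretref:")]
        else:
            resolved[key] = value
    return resolved


os.environ["SEARCH_API_KEY"] = "injected-by-cluster"  # normally set by Kubernetes
config = {"tools": {"search": {"api_key": "secretref:SEARCH_API_KEY"}}}
print(resolve_secret_refs(config))
# {'tools': {'search': {'api_key': 'injected-by-cluster'}}}
```

The git repository then only ever contains the reference string, never the secret value itself.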
What happens if I need to hotfix production without going through the promotion pipeline?
Support an emergency bypass path that still requires approval from two team members. Log the bypass event prominently, and require a follow-up PR that backfills the change into the development and staging configurations within 24 hours. The goal is to keep environments in sync even after emergency changes.
How do I handle config changes that are not backward compatible?
Treat non-backward-compatible config changes the same way you treat database migrations. Version your config schema, and include a migration script that transforms old config format to new. During the transition, support both formats with a compatibility layer that reads old keys and maps them to new ones.
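A migration script can be a plain function keyed off the schema version. The v1/v2 key names below are hypothetical, chosen only to show the old-key-to-new-key mapping:

```python
def migrate_v1_to_v2(config: dict) -> dict:
    # Hypothetical migration: v1 stored a flat "model" key, v2 nests it
    # under an "agent" table as "model_id".
    migrated = dict(config)
    migrated["schema_version"] = 2
    if "model" in migrated:
        migrated.setdefault("agent", {})["model_id"] = migrated.pop("model")
    return migrated


old = {"schema_version": 1, "model": "gpt-4o"}
new = migrate_v1_to_v2(old)
print(new)
# {'schema_version': 2, 'agent': {'model_id': 'gpt-4o'}}
```

Running the migration as part of the promotion pipeline keeps every environment on a known schema version, and the compatibility layer can be deleted once all clusters report version 2.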
#MultiEnvironment #AIAgents #GitOps #Kubernetes #Python #AgenticAI #LearnAI #AIEngineering