Cross-Organizational Multi-Agent Systems: Federated Agents Across Company Boundaries
Design multi-agent systems that span organizational boundaries with proper API contracts, trust boundaries, data sharing controls, and compliance frameworks. Build federated agent architectures safely.
When Agents Cross Company Boundaries
Multi-agent systems become significantly more complex when agents from different organizations need to collaborate. A supply chain optimization system might involve a manufacturer's demand forecasting agent, a logistics provider's routing agent, and a retailer's inventory management agent — each owned by a different company with different data policies, security requirements, and business objectives.
This is not a theoretical concern. As AI agent ecosystems mature, federated multi-agent architectures are becoming necessary for any workflow that spans organizational boundaries. The challenge is building trust, enforcing data boundaries, and maintaining compliance when you do not control the other side.
Trust Boundaries and the Agent Gateway
The first principle of cross-organizational agent design: never trust the other organization's agents directly. All communication goes through a gateway that validates, sanitizes, and logs every interaction.
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any
import hashlib
import json


class TrustLevel(Enum):
    UNTRUSTED = "untrusted"
    BASIC = "basic"
    VERIFIED = "verified"
    PRIVILEGED = "privileged"


@dataclass
class OrganizationProfile:
    org_id: str
    name: str
    trust_level: TrustLevel
    allowed_operations: list[str]
    data_classification_ceiling: str  # "public", "internal", "confidential"
    rate_limit_per_minute: int = 60
    api_key_hash: str = ""


@dataclass
class AgentMessage:
    source_org: str
    source_agent: str
    target_org: str
    target_agent: str
    operation: str
    payload: dict[str, Any]
    # Timezone-aware UTC timestamps stay comparable across organizations.
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    message_id: str = ""


class AgentGateway:
    def __init__(self, own_org_id: str):
        self.own_org_id = own_org_id
        self.org_registry: dict[str, OrganizationProfile] = {}
        self.audit_log: list[dict] = []

    def register_org(self, profile: OrganizationProfile):
        self.org_registry[profile.org_id] = profile

    def process_inbound(self, message: AgentMessage) -> dict:
        # Reject anything from an unknown or untrusted organization.
        org = self.org_registry.get(message.source_org)
        if not org:
            return self._reject("Unknown organization", message)
        if org.trust_level == TrustLevel.UNTRUSTED:
            return self._reject("Organization not trusted", message)
        # Only operations explicitly granted to this organization pass.
        if message.operation not in org.allowed_operations:
            return self._reject(
                f"Operation '{message.operation}' not permitted",
                message,
            )
        sanitized = self._sanitize_payload(message.payload, org)
        self._audit(message, "accepted")
        return {
            "status": "accepted",
            "sanitized_payload": sanitized,
            "trust_level": org.trust_level.value,
        }

    def process_outbound(
        self, message: AgentMessage, data_classification: str
    ) -> dict:
        org = self.org_registry.get(message.target_org)
        if not org:
            return self._reject("Unknown target org", message)
        classification_rank = {
            "public": 0, "internal": 1, "confidential": 2
        }
        # Unknown data classifications rank highest (99) and are rejected;
        # an unknown ceiling is treated as "public" (0).
        data_rank = classification_rank.get(data_classification, 99)
        ceiling_rank = classification_rank.get(
            org.data_classification_ceiling, 0
        )
        if data_rank > ceiling_rank:
            return self._reject(
                f"Data classification '{data_classification}' exceeds "
                f"ceiling '{org.data_classification_ceiling}'",
                message,
            )
        filtered = self._filter_outbound_data(
            message.payload, org.data_classification_ceiling
        )
        self._audit(message, "sent")
        return {"status": "sent", "filtered_payload": filtered}

    def _sanitize_payload(self, payload: dict, org) -> dict:
        # Truncate oversized string values; other values pass through as-is.
        sanitized = {}
        for key, value in payload.items():
            if isinstance(value, str) and len(value) > 10000:
                sanitized[key] = value[:10000]
            else:
                sanitized[key] = value
        return sanitized

    def _filter_outbound_data(self, payload, ceiling):
        # Drops keys prefixed with "_internal"; `ceiling` is not used yet.
        return {k: v for k, v in payload.items()
                if not k.startswith("_internal")}

    def _reject(self, reason, message):
        self._audit(message, f"rejected: {reason}")
        return {"status": "rejected", "reason": reason}

    def _audit(self, message, action):
        self.audit_log.append({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "source_org": message.source_org,
            "target_org": message.target_org,
            "operation": message.operation,
            "action": action,
        })
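The OrganizationProfile above carries a rate_limit_per_minute field, but the gateway as written never enforces it. One way to close that gap, as a minimal sketch, is a sliding-window limiter keyed by organization ID. The class name SlidingWindowRateLimiter and its injectable clock parameter are assumptions made for illustration; they are not part of the gateway code above.

```python
import time
from collections import defaultdict, deque


class SlidingWindowRateLimiter:
    """Hypothetical helper: at most `limit` calls per org within the window."""

    def __init__(self, window_seconds: float = 60.0, clock=time.monotonic):
        self.window_seconds = window_seconds
        # The clock is injectable so the limiter can be tested without sleeping.
        self.clock = clock
        self._calls = defaultdict(deque)  # org_id -> timestamps of allowed calls

    def allow(self, org_id: str, limit: int) -> bool:
        now = self.clock()
        calls = self._calls[org_id]
        # Drop timestamps that have aged out of the window.
        while calls and now - calls[0] >= self.window_seconds:
            calls.popleft()
        if len(calls) >= limit:
            return False
        calls.append(now)
        return True


if __name__ == "__main__":
    limiter = SlidingWindowRateLimiter()
    # With a limit of 2, the third call inside the same minute is refused.
    print([limiter.allow("logistics_co", 2) for _ in range(3)])
```

In process_inbound, a call such as limiter.allow(org.org_id, org.rate_limit_per_minute) could run right after the trust-level check, with the message rejected when it returns False.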
API Contracts for Agent Interoperability
When two organizations agree to let their agents communicate, they need a formal contract defining the operations, data schemas, SLAs, and failure modes.
@dataclass
class AgentAPIContract:
    contract_id: str
    party_a: str
    party_b: str
    operations: list[dict]  # name, request_schema, response_schema
    sla: dict  # max_latency_ms, availability_percent, etc.
    data_policy: dict  # retention, allowed_fields, redacted_fields
    effective_date: str
    expiration_date: str

    def validate_request(self, operation: str, payload: dict) -> dict:
        op_spec = next(
            (o for o in self.operations if o["name"] == operation),
            None,
        )
        if not op_spec:
            return {"valid": False, "error": "Operation not in contract"}
        required_fields = op_spec.get("request_schema", {}).get(
            "required", []
        )
        missing = [f for f in required_fields if f not in payload]
        if missing:
            return {
                "valid": False,
                "error": f"Missing fields: {missing}",
            }
        redacted = self.data_policy.get("redacted_fields", [])
        for field_name in redacted:
            if field_name in payload:
                return {
                    "valid": False,
                    "error": f"Field '{field_name}' must not be sent",
                }
        return {"valid": True}


# Define a contract between two organizations
supply_chain_contract = AgentAPIContract(
    contract_id="SC-2026-001",
    party_a="manufacturer_co",
    party_b="logistics_co",
    operations=[
        {
            "name": "request_shipping_quote",
            "request_schema": {
                "required": ["origin", "destination", "weight_kg"],
            },
            "response_schema": {
                "required": ["quote_id", "price_usd", "eta_days"],
            },
        },
        {
            "name": "track_shipment",
            "request_schema": {"required": ["tracking_id"]},
            "response_schema": {
                "required": ["status", "current_location"],
            },
        },
    ],
    sla={"max_latency_ms": 5000, "availability_percent": 99.5},
    data_policy={
        "retention_days": 90,
        "redacted_fields": ["customer_ssn", "internal_cost"],
    },
    effective_date="2026-01-01",
    expiration_date="2026-12-31",
)
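The contract records effective_date and expiration_date, but validate_request does not consult them. A small sketch of a validity-window check follows, assuming both dates are ISO 8601 strings as in the example contract. The function name contract_is_active and its on parameter are hypothetical.

```python
from datetime import date
from typing import Optional


def contract_is_active(
    effective_date: str, expiration_date: str, on: Optional[date] = None
) -> bool:
    """Return True if `on` (default: today) falls within the contract window.

    Both boundary dates are treated as inclusive, which is an assumption;
    a real contract should state this explicitly.
    """
    day = on or date.today()
    start = date.fromisoformat(effective_date)
    end = date.fromisoformat(expiration_date)
    return start <= day <= end


if __name__ == "__main__":
    print(contract_is_active("2026-01-01", "2026-12-31", on=date(2026, 6, 1)))
```

A gateway could run this check before validate_request and reject every operation once the contract has lapsed.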
Data Sharing With Privacy Controls
Cross-organizational data sharing requires explicit controls over what data leaves your boundary, how it is transformed, and what the receiving party can do with it.
class DataSharingController:
    def __init__(self):
        self.sharing_rules: dict[str, dict] = {}

    def add_rule(
        self, target_org: str, allowed_fields: list[str],
        transforms: dict[str, str] | None = None,
    ):
        self.sharing_rules[target_org] = {
            "allowed_fields": set(allowed_fields),
            "transforms": transforms or {},
        }

    def prepare_for_sharing(
        self, data: dict, target_org: str
    ) -> dict:
        rule = self.sharing_rules.get(target_org)
        if not rule:
            return {}  # share nothing by default
        # Allow-list: only explicitly permitted fields leave the boundary.
        filtered = {
            k: v for k, v in data.items()
            if k in rule["allowed_fields"]
        }
        for field_name, transform_type in rule["transforms"].items():
            if field_name in filtered:
                filtered[field_name] = self._apply_transform(
                    filtered[field_name], transform_type
                )
        return filtered

    def _apply_transform(self, value, transform_type: str):
        if transform_type == "hash":
            # Truncated SHA-256 (first 16 hex characters) of the string value.
            return hashlib.sha256(str(value).encode()).hexdigest()[:16]
        elif transform_type == "round":
            return round(float(value), 0)
        elif transform_type == "redact":
            return "[REDACTED]"
        # Fail closed: a misspelled transform name must not leak the raw value.
        raise ValueError(f"Unknown transform type: {transform_type!r}")


# Only share specific fields, with transforms for sensitive values
controller = DataSharingController()
controller.add_rule(
    target_org="logistics_co",
    allowed_fields=["order_id", "weight_kg", "destination_zip", "customer_id"],
    transforms={"customer_id": "hash"},  # pseudonymize
)
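The "hash" transform above applies plain SHA-256. When identifiers come from a small or guessable space, the receiving party can hash candidate values and compare, which undoes the pseudonymization. A keyed hash is one alternative, sketched below under the assumption that a secret key is held only on the sharing side. The function name pseudonymize and the key shown are illustrative, not part of the controller above.

```python
import hashlib
import hmac


def pseudonymize(value: str, secret_key: bytes, length: int = 16) -> str:
    """Keyed pseudonym: stable for a given key, not recomputable without it."""
    digest = hmac.new(
        secret_key, value.encode("utf-8"), hashlib.sha256
    ).hexdigest()
    # Truncation mirrors the 16-character output of the "hash" transform.
    return digest[:length]


if __name__ == "__main__":
    key = b"example-key-kept-inside-your-org"  # placeholder, not a real secret
    print(pseudonymize("customer-1042", key))
```

Using a different key per partner organization also keeps two partners from joining their datasets on the shared pseudonym.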
Compliance and Audit Trail
Every cross-organizational interaction must be auditable. Regulations and assurance frameworks such as GDPR, HIPAA, and SOC 2 expect evidence of what data was shared, with whom, and under what authority. The audit log in the gateway is the starting point for that evidence, but you should also maintain a compliance checker that validates ongoing adherence to contracts and policies.
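A compliance checker can start as a periodic scan of the audit log against the contract terms. The sketch below assumes audit entries shaped like those the gateway writes (timestamp, operation, action) and checks two things only: that every accepted or sent operation is named in the contract, and that it happened before the contract expired. The function name find_contract_violations is hypothetical.

```python
from datetime import date, datetime


def find_contract_violations(
    audit_log: list[dict],
    allowed_operations: set[str],
    expiration_date: str,
) -> list[dict]:
    """Return accepted or sent audit entries that fall outside the contract."""
    expires = date.fromisoformat(expiration_date)
    violations = []
    for entry in audit_log:
        if entry["action"].startswith("rejected"):
            continue  # rejected traffic never crossed the boundary
        reasons = []
        if entry["operation"] not in allowed_operations:
            reasons.append("operation not in contract")
        if datetime.fromisoformat(entry["timestamp"]).date() > expires:
            reasons.append("contract expired")
        if reasons:
            violations.append({**entry, "violations": reasons})
    return violations


if __name__ == "__main__":
    sample_log = [
        {"timestamp": "2027-01-05T09:00:00+00:00",
         "operation": "track_shipment", "action": "sent"},
    ]
    print(find_contract_violations(
        sample_log, {"track_shipment"}, "2026-12-31"
    ))
```

Retention limits, redacted-field checks, and SLA adherence would need additional data that this minimal audit format does not record.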
FAQ
How do I handle version mismatches when the other organization updates their agent API?
Use semantic versioning in your API contracts and support at least the current and previous major versions simultaneously. Include a version field in every agent message. The gateway should reject messages with unsupported versions and log them for debugging. Negotiate upgrade timelines in your contract, for example 90 days of overlap between versions.
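The "current and previous major version" rule can be expressed as a short gateway check. This is a sketch that assumes versions are strings of the form MAJOR.MINOR.PATCH; the function names are hypothetical.

```python
def parse_major(version: str) -> int:
    """Extract the major component from a MAJOR.MINOR.PATCH string."""
    major, _, _ = version.partition(".")
    return int(major)  # raises ValueError on malformed input


def is_supported_version(message_version: str, current_version: str) -> bool:
    """Accept the current major version and the one immediately before it."""
    try:
        major = parse_major(message_version)
    except ValueError:
        return False  # malformed version strings are treated as unsupported
    current_major = parse_major(current_version)
    return current_major - 1 <= major <= current_major


if __name__ == "__main__":
    for candidate in ("3.1.0", "2.9.4", "1.0.0", "not-a-version"):
        print(candidate, is_supported_version(candidate, "3.0.0"))
```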
What happens when a cross-organizational agent call fails or times out?
Design for failure at every level. Set aggressive timeouts (the SLA max latency), implement circuit breakers that stop calling a failing external agent after 3 consecutive failures, and always have a local fallback. For a shipping quote, the fallback might be a cached recent quote or an estimated range. Never let an external agent failure cascade into your internal system going down.
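The circuit-breaker-with-fallback pattern described above can be sketched in a few lines. The threshold of 3 comes from the text; the 30-second cool-down, the class name, and the call signature are assumptions for illustration.

```python
import time


class CircuitBreaker:
    """Stops calling a failing dependency after consecutive failures."""

    def __init__(self, failure_threshold: int = 3,
                 reset_after_seconds: float = 30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after_seconds = reset_after_seconds
        self.clock = clock  # injectable for testing
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def call(self, func, fallback):
        if self._opened_at is not None:
            if self.clock() - self._opened_at < self.reset_after_seconds:
                return fallback()  # open: skip the external call entirely
            # Cool-down elapsed: fall through and allow one trial call.
        try:
            result = func()
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self.clock()  # open (or re-open) the circuit
            return fallback()
        # Success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result


if __name__ == "__main__":
    def flaky_quote():
        raise TimeoutError("external agent did not answer")

    breaker = CircuitBreaker()
    for _ in range(4):
        print(breaker.call(flaky_quote, lambda: {"price_usd": None, "cached": True}))
```

Here func would wrap the call to the external agent with its own timeout, and fallback would return the cached quote or estimated range.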
How do I verify that the other organization's agent is not sending manipulated data?
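Use cryptographic signing for all inter-organizational messages. Each organization signs outbound messages with its private key, and the receiving gateway verifies the signature with the sender's public key. A valid signature proves the payload was not altered in transit, but it does not stop an attacker from resending a captured message, so include a nonce or timestamp in the signed content and reject duplicates and stale messages. For high-stakes operations, add a mutual attestation step where both parties agree on the message contents before either acts on them. Note that signing proves origin and integrity only; it cannot show that the sender's data is accurate.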
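A minimal sketch of signed messages with replay protection follows. It swaps in HMAC with a shared secret for the private-key signatures described above, because Python's standard library has no asymmetric signing; a production system would use a vetted cryptography library instead. The names sign_message and SignatureVerifier, the nonce and timestamp fields, and the 300-second freshness window are all assumptions.

```python
import hashlib
import hmac
import json
import time


def sign_message(payload: dict, nonce: str, timestamp: float, key: bytes) -> str:
    """Sign payload, nonce, and timestamp together over a canonical encoding."""
    body = json.dumps(
        {"payload": payload, "nonce": nonce, "timestamp": timestamp},
        sort_keys=True, separators=(",", ":"),
    )
    return hmac.new(key, body.encode("utf-8"), hashlib.sha256).hexdigest()


class SignatureVerifier:
    def __init__(self, key: bytes, max_age_seconds: float = 300.0,
                 clock=time.time):
        self.key = key
        self.max_age_seconds = max_age_seconds
        self.clock = clock
        # Unbounded in this sketch; real code would expire old nonces.
        self.seen_nonces = set()

    def verify(self, payload: dict, nonce: str, timestamp: float,
               signature: str) -> bool:
        expected = sign_message(payload, nonce, timestamp, self.key)
        if not hmac.compare_digest(expected, signature):
            return False  # tampered payload, or wrong key
        if abs(self.clock() - timestamp) > self.max_age_seconds:
            return False  # stale or far-future message
        if nonce in self.seen_nonces:
            return False  # replay of a message already accepted
        self.seen_nonces.add(nonce)
        return True


if __name__ == "__main__":
    key = b"shared-secret-for-illustration"
    now = time.time()
    sig = sign_message({"tracking_id": "T1"}, "nonce-1", now, key)
    verifier = SignatureVerifier(key)
    print(verifier.verify({"tracking_id": "T1"}, "nonce-1", now, sig))
    print(verifier.verify({"tracking_id": "T1"}, "nonce-1", now, sig))
```

The second verify call is the same message presented again, which is what the nonce check exists to refuse.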
CallSphere Team