Skip to content
Learn Agentic AI14 min read0 views

Cross-Organizational Multi-Agent Systems: Federated Agents Across Company Boundaries

Design multi-agent systems that span organizational boundaries with proper API contracts, trust boundaries, data sharing controls, and compliance frameworks. Build federated agent architectures safely.

When Agents Cross Company Boundaries

Multi-agent systems become significantly more complex when agents from different organizations need to collaborate. A supply chain optimization system might involve a manufacturer's demand forecasting agent, a logistics provider's routing agent, and a retailer's inventory management agent — each owned by a different company with different data policies, security requirements, and business objectives.

This is not a theoretical concern. As AI agent ecosystems mature, federated multi-agent architectures are becoming necessary for any workflow that spans organizational boundaries. The challenge is building trust, enforcing data boundaries, and maintaining compliance when you do not control the other side.

Trust Boundaries and the Agent Gateway

The first principle of cross-organizational agent design: never trust the other organization's agents directly. All communication goes through a gateway that validates, sanitizes, and logs every interaction.

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
import hashlib
import json

class TrustLevel(Enum):
    UNTRUSTED = "untrusted"
    BASIC = "basic"
    VERIFIED = "verified"
    PRIVILEGED = "privileged"

@dataclass
class OrganizationProfile:
    org_id: str
    name: str
    trust_level: TrustLevel
    allowed_operations: list[str]
    data_classification_ceiling: str  # "public", "internal", "confidential"
    rate_limit_per_minute: int = 60
    api_key_hash: str = ""

@dataclass
class AgentMessage:
    source_org: str
    source_agent: str
    target_org: str
    target_agent: str
    operation: str
    payload: dict[str, Any]
    timestamp: str = field(
        default_factory=lambda: datetime.now().isoformat()
    )
    message_id: str = ""

class AgentGateway:
    def __init__(self, own_org_id: str):
        self.own_org_id = own_org_id
        self.org_registry: dict[str, OrganizationProfile] = {}
        self.audit_log: list[dict] = []

    def register_org(self, profile: OrganizationProfile):
        self.org_registry[profile.org_id] = profile

    def process_inbound(self, message: AgentMessage) -> dict:
        org = self.org_registry.get(message.source_org)
        if not org:
            return self._reject("Unknown organization", message)

        if org.trust_level == TrustLevel.UNTRUSTED:
            return self._reject("Organization not trusted", message)

        if message.operation not in org.allowed_operations:
            return self._reject(
                f"Operation '{message.operation}' not permitted",
                message,
            )

        sanitized = self._sanitize_payload(message.payload, org)

        self._audit(message, "accepted")
        return {
            "status": "accepted",
            "sanitized_payload": sanitized,
            "trust_level": org.trust_level.value,
        }

    def process_outbound(
        self, message: AgentMessage, data_classification: str
    ) -> dict:
        org = self.org_registry.get(message.target_org)
        if not org:
            return self._reject("Unknown target org", message)

        classification_rank = {
            "public": 0, "internal": 1, "confidential": 2
        }
        if classification_rank.get(data_classification, 99) >            classification_rank.get(org.data_classification_ceiling, 0):
            return self._reject(
                f"Data classification '{data_classification}' exceeds "
                f"ceiling '{org.data_classification_ceiling}'",
                message,
            )

        filtered = self._filter_outbound_data(
            message.payload, org.data_classification_ceiling
        )
        self._audit(message, "sent")
        return {"status": "sent", "filtered_payload": filtered}

    def _sanitize_payload(self, payload: dict, org) -> dict:
        sanitized = {}
        for key, value in payload.items():
            if isinstance(value, str) and len(value) > 10000:
                sanitized[key] = value[:10000]
            else:
                sanitized[key] = value
        return sanitized

    def _filter_outbound_data(self, payload, ceiling):
        return {k: v for k, v in payload.items()
                if not k.startswith("_internal")}

    def _reject(self, reason, message):
        self._audit(message, f"rejected: {reason}")
        return {"status": "rejected", "reason": reason}

    def _audit(self, message, action):
        self.audit_log.append({
            "timestamp": datetime.now().isoformat(),
            "source_org": message.source_org,
            "target_org": message.target_org,
            "operation": message.operation,
            "action": action,
        })

API Contracts for Agent Interoperability

When two organizations agree to let their agents communicate, they need a formal contract defining the operations, data schemas, SLAs, and failure modes.

@dataclass
class AgentAPIContract:
    contract_id: str
    party_a: str
    party_b: str
    operations: list[dict]  # name, request_schema, response_schema
    sla: dict  # max_latency_ms, availability_percent, etc.
    data_policy: dict  # retention, allowed_fields, redacted_fields
    effective_date: str
    expiration_date: str

    def validate_request(self, operation: str, payload: dict) -> dict:
        op_spec = next(
            (o for o in self.operations if o["name"] == operation),
            None,
        )
        if not op_spec:
            return {"valid": False, "error": "Operation not in contract"}

        required_fields = op_spec.get("request_schema", {}).get(
            "required", []
        )
        missing = [f for f in required_fields if f not in payload]
        if missing:
            return {
                "valid": False,
                "error": f"Missing fields: {missing}",
            }

        redacted = self.data_policy.get("redacted_fields", [])
        for field_name in redacted:
            if field_name in payload:
                return {
                    "valid": False,
                    "error": f"Field '{field_name}' must not be sent",
                }

        return {"valid": True}

# Define a contract between two organizations
supply_chain_contract = AgentAPIContract(
    contract_id="SC-2026-001",
    party_a="manufacturer_co",
    party_b="logistics_co",
    operations=[
        {
            "name": "request_shipping_quote",
            "request_schema": {
                "required": ["origin", "destination", "weight_kg"],
            },
            "response_schema": {
                "required": ["quote_id", "price_usd", "eta_days"],
            },
        },
        {
            "name": "track_shipment",
            "request_schema": {"required": ["tracking_id"]},
            "response_schema": {
                "required": ["status", "current_location"],
            },
        },
    ],
    sla={"max_latency_ms": 5000, "availability_percent": 99.5},
    data_policy={
        "retention_days": 90,
        "redacted_fields": ["customer_ssn", "internal_cost"],
    },
    effective_date="2026-01-01",
    expiration_date="2026-12-31",
)

Data Sharing With Privacy Controls

Cross-organizational data sharing requires explicit controls over what data leaves your boundary, how it is transformed, and what the receiving party can do with it.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

class DataSharingController:
    def __init__(self):
        self.sharing_rules: dict[str, dict] = {}

    def add_rule(
        self, target_org: str, allowed_fields: list[str],
        transforms: dict[str, str] | None = None,
    ):
        self.sharing_rules[target_org] = {
            "allowed_fields": set(allowed_fields),
            "transforms": transforms or {},
        }

    def prepare_for_sharing(
        self, data: dict, target_org: str
    ) -> dict:
        rule = self.sharing_rules.get(target_org)
        if not rule:
            return {}  # share nothing by default

        filtered = {
            k: v for k, v in data.items()
            if k in rule["allowed_fields"]
        }

        for field_name, transform_type in rule["transforms"].items():
            if field_name in filtered:
                filtered[field_name] = self._apply_transform(
                    filtered[field_name], transform_type
                )

        return filtered

    def _apply_transform(self, value, transform_type: str):
        if transform_type == "hash":
            return hashlib.sha256(str(value).encode()).hexdigest()[:16]
        elif transform_type == "round":
            return round(float(value), 0)
        elif transform_type == "redact":
            return "[REDACTED]"
        return value

# Only share specific fields, with transforms for sensitive values
controller = DataSharingController()
controller.add_rule(
    target_org="logistics_co",
    allowed_fields=["order_id", "weight_kg", "destination_zip", "customer_id"],
    transforms={"customer_id": "hash"},  # pseudonymize
)

Compliance and Audit Trail

Every cross-organizational interaction must be auditable. Regulations like GDPR, HIPAA, and SOC2 require proof of what data was shared, with whom, and under what authority. The audit log in the gateway provides this, but you should also maintain a compliance checker that validates ongoing adherence to contracts and policies.

FAQ

How do I handle version mismatches when the other organization updates their agent API?

Use semantic versioning in your API contracts and support at least the current and previous major version simultaneously. Include a version field in every agent message. The gateway should reject messages with unsupported versions and log them for debugging. Negotiate upgrade timelines in your contract — typically 90 days of overlap between versions.

What happens when a cross-organizational agent call fails or times out?

Design for failure at every level. Set aggressive timeouts (the SLA max latency), implement circuit breakers that stop calling a failing external agent after 3 consecutive failures, and always have a local fallback. For a shipping quote, the fallback might be a cached recent quote or an estimated range. Never let an external agent failure cascade into your internal system going down.

How do I verify that the other organization's agent is not sending manipulated data?

Use cryptographic signing for all inter-organizational messages. Each organization signs outbound messages with its private key, and the receiving gateway verifies the signature. For high-stakes operations, add a mutual attestation step where both parties agree on the message contents before either acts on them. This prevents replay attacks and tampered payloads.


#FederatedAgents #CrossOrganization #APIContracts #TrustBoundaries #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.