Building Collaborative AI Agents: Real-Time Multi-User Agent Interactions
Design systems where multiple users interact with shared AI agents simultaneously, covering session sharing, conflict resolution, turn management, and maintaining consistent state across concurrent participants.
The Multi-User AI Agent Challenge
Single-user AI agents are straightforward: one user sends a message, the agent responds. But collaborative scenarios — a team brainstorming with an AI agent, multiple support agents watching the same AI handle a customer, or a classroom where students interact with a shared AI tutor — require fundamentally different architecture. Multiple participants must see the same agent state, their messages must be ordered consistently, and the agent must handle concurrent inputs without confusion.
The core challenges are state synchronization (all participants see the same conversation), conflict resolution (what happens when two people prompt the agent simultaneously), and presence awareness (who is currently in the session and what are they doing).
Shared Session Architecture
Build a session manager that maintains a single source of truth for the conversation state, with WebSocket connections for each participant broadcasting updates.
import asyncio
import time
import uuid
from dataclasses import dataclass, field

from fastapi import FastAPI, WebSocket, WebSocketDisconnect


@dataclass
class Participant:
    user_id: str
    display_name: str
    ws: WebSocket
    joined_at: float = field(default_factory=time.time)
    is_typing: bool = False


@dataclass
class SharedMessage:
    id: str
    role: str  # "user", "assistant", "system"
    content: str
    author_id: str
    author_name: str
    timestamp: float


class CollaborativeSession:
    def __init__(self, session_id: str):
        self.session_id = session_id
        self.participants: dict[str, Participant] = {}
        self.messages: list[SharedMessage] = []
        self.lock = asyncio.Lock()
        self.agent_busy = False
        self.message_queue: asyncio.Queue = asyncio.Queue()

    async def add_participant(self, user_id: str, name: str, ws: WebSocket):
        participant = Participant(user_id=user_id, display_name=name, ws=ws)
        self.participants[user_id] = participant
        # Send current state to the new participant
        await ws.send_json({
            "type": "session_state",
            "messages": [self._serialize_msg(m) for m in self.messages],
            "participants": [
                {"user_id": p.user_id, "name": p.display_name}
                for p in self.participants.values()
            ],
        })
        # Notify others
        await self._broadcast({
            "type": "participant_joined",
            "user_id": user_id,
            "name": name,
        }, exclude=user_id)

    async def remove_participant(self, user_id: str):
        self.participants.pop(user_id, None)
        await self._broadcast({
            "type": "participant_left",
            "user_id": user_id,
        })

    async def submit_message(self, user_id: str, content: str):
        async with self.lock:
            participant = self.participants.get(user_id)
            if not participant:
                return
            msg = SharedMessage(
                id=str(uuid.uuid4()),
                role="user",
                content=content,
                author_id=user_id,
                author_name=participant.display_name,
                timestamp=time.time(),
            )
            self.messages.append(msg)
            await self._broadcast({
                "type": "new_message",
                "message": self._serialize_msg(msg),
            })
            # Queue for agent processing
            await self.message_queue.put(msg)

    async def _broadcast(self, data: dict, exclude: str = ""):
        disconnected = []
        for uid, participant in self.participants.items():
            if uid == exclude:
                continue
            try:
                await participant.ws.send_json(data)
            except Exception:
                disconnected.append(uid)
        for uid in disconnected:
            self.participants.pop(uid, None)

    def _serialize_msg(self, msg: SharedMessage) -> dict:
        return {
            "id": msg.id,
            "role": msg.role,
            "content": msg.content,
            "author_id": msg.author_id,
            "author_name": msg.author_name,
            "timestamp": msg.timestamp,
        }
The asyncio.Lock on message submission prevents race conditions where two simultaneous submissions could create inconsistent message ordering. Every participant's WebSocket receives the same broadcast, ensuring state consistency.
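To make thatguarantee concrete, here is a toy reproduction of the pattern (`TinySession` and `FakeWS` are illustrative stand-ins for this demo, not part of the session code above): many concurrent submissions race for the lock, yet every fake socket observes broadcasts in exactly the same order as the canonical message log, because append-plus-broadcast is atomic under the lock.

```python
import asyncio


class FakeWS:
    """Stand-in for a WebSocket: records every broadcast payload it receives."""
    def __init__(self):
        self.received: list[dict] = []

    async def send_json(self, data: dict):
        await asyncio.sleep(0)  # yield to the event loop, as a real send would
        self.received.append(data)


class TinySession:
    """Minimal append-and-broadcast core of the session pattern."""
    def __init__(self, sockets: list[FakeWS]):
        self.sockets = sockets
        self.lock = asyncio.Lock()
        self.messages: list[str] = []

    async def submit(self, content: str):
        async with self.lock:  # append + broadcast happen atomically
            self.messages.append(content)
            for ws in self.sockets:
                await ws.send_json({"type": "new_message", "content": content})


async def demo() -> bool:
    sockets = [FakeWS(), FakeWS(), FakeWS()]
    session = TinySession(sockets)
    # 20 submissions racing concurrently
    await asyncio.gather(*(session.submit(f"msg-{i}") for i in range(20)))
    canonical = [d["content"] for d in sockets[0].received]
    # All sockets saw the same order, and it matches the message log
    return (
        canonical == session.messages
        and all([d["content"] for d in ws.received] == canonical for ws in sockets)
    )
```

Without the lock, a submitter could append and then be preempted mid-broadcast by another submitter, leaving participants with interleaved, divergent views of the conversation.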
Turn Management Strategies
When multiple users can prompt an AI agent, you need a strategy for handling concurrent inputs. Three common approaches are queuing, merging, and explicit turn-taking.
class TurnManager:
    """Manages concurrent user inputs to a shared agent."""

    def __init__(self, session: CollaborativeSession):
        self.session = session
        self.processing = False

    async def start_processing_loop(self):
        """Sequential processing: messages are queued and handled in order."""
        while True:
            msg = await self.session.message_queue.get()
            self.processing = True
            await self.session._broadcast({
                "type": "agent_status",
                "status": "thinking",
                "triggered_by": msg.author_name,
            })
            response = await self._run_agent(self.session.messages)
            agent_msg = SharedMessage(
                id=str(uuid.uuid4()),
                role="assistant",
                content=response,
                author_id="agent",
                author_name="AI Assistant",
                timestamp=time.time(),
            )
            async with self.session.lock:
                self.session.messages.append(agent_msg)
                await self.session._broadcast({
                    "type": "new_message",
                    "message": self.session._serialize_msg(agent_msg),
                })
            self.processing = False
            await self.session._broadcast({
                "type": "agent_status",
                "status": "idle",
            })

    async def _run_agent(self, messages: list[SharedMessage]) -> str:
        # Build conversation context from all messages, tagging each
        # user message with its author so the model knows who said what
        conversation = [
            {"role": m.role, "content": f"[{m.author_name}]: {m.content}"}
            if m.role == "user"
            else {"role": m.role, "content": m.content}
            for m in messages
        ]
        # Call your LLM here -- call_llm is a placeholder for your model client
        return await call_llm(conversation)
The sequential queue approach is the simplest and most predictable. Messages from different users arrive in the queue in order, and the agent processes them one at a time. While the agent is processing, the UI shows who triggered the current response, and new messages queue up naturally.
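Of the three approaches named above, only queuing is shown. As a sketch of the turn-taking alternative, here is a round-robin fairness queue (`FairnessQueue` and its method names are my own for this illustration, not an existing library API) that alternates between users instead of processing strictly FIFO. It could stand in for the plain `asyncio.Queue` in `CollaborativeSession`:

```python
import asyncio
from collections import defaultdict, deque


class FairnessQueue:
    """Round-robin message queue: each user with pending messages
    gets one turn per rotation, so no single user can flood the agent."""

    def __init__(self):
        self._per_user: dict[str, deque] = defaultdict(deque)
        self._rotation: deque[str] = deque()  # users with pending messages
        self._available = asyncio.Event()

    def put(self, user_id: str, msg) -> None:
        if not self._per_user[user_id]:
            self._rotation.append(user_id)  # user enters the rotation
        self._per_user[user_id].append(msg)
        self._available.set()

    async def get(self):
        while not self._rotation:
            self._available.clear()
            await self._available.wait()
        user = self._rotation.popleft()
        msg = self._per_user[user].popleft()
        if self._per_user[user]:
            self._rotation.append(user)  # rejoin at the back of the rotation
        return msg
```

With this queue, a user who submits five prompts in a burst still only gets one agent response per rotation while other users are waiting.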
Presence and Typing Indicators
Real-time presence makes the collaborative experience feel alive. Broadcast typing indicators and cursor state so participants know what others are doing.
// Client-side presence broadcasting
class CollaborativeClient {
  private ws: WebSocket;
  private typingTimeout: ReturnType<typeof setTimeout> | null = null;

  constructor(private sessionId: string, private userId: string) {
    this.ws = new WebSocket(
      `/ws/collaborative/${sessionId}?user_id=${userId}`
    );
    this.ws.onmessage = (event) => this.handleMessage(JSON.parse(event.data));
  }

  sendTypingIndicator(): void {
    this.ws.send(JSON.stringify({ type: "typing_start" }));
    if (this.typingTimeout) clearTimeout(this.typingTimeout);
    this.typingTimeout = setTimeout(() => {
      this.ws.send(JSON.stringify({ type: "typing_stop" }));
    }, 2000);
  }

  submitMessage(content: string): void {
    if (this.typingTimeout) {
      clearTimeout(this.typingTimeout);
      this.typingTimeout = null;
    }
    this.ws.send(JSON.stringify({ type: "message", content }));
  }

  private handleMessage(data: any): void {
    switch (data.type) {
      case "new_message":
        this.renderMessage(data.message);
        break;
      case "participant_joined":
        this.addParticipant(data.user_id, data.name);
        break;
      case "participant_left":
        this.removeParticipant(data.user_id);
        break;
      case "typing_indicator":
        this.showTyping(data.user_id, data.is_typing);
        break;
      case "agent_status":
        this.updateAgentStatus(data.status, data.triggered_by);
        break;
    }
  }

  // UI hooks -- wire these to your rendering layer
  private renderMessage(message: any): void { /* ... */ }
  private addParticipant(userId: string, name: string): void { /* ... */ }
  private removeParticipant(userId: string): void { /* ... */ }
  private showTyping(userId: string, isTyping: boolean): void { /* ... */ }
  private updateAgentStatus(status: string, triggeredBy?: string): void { /* ... */ }
}
Debouncing the typing indicator (2-second timeout) prevents flooding the WebSocket with rapid keystrokes. The stop signal is sent after 2 seconds of inactivity or immediately when the user submits the message.
Handling Late Joiners
When a participant joins an active session, they need the full conversation history and current state. The add_participant method sends a session_state snapshot. For long sessions, paginate the history to avoid sending thousands of messages at once.
async def add_participant_with_pagination(
    self, user_id: str, name: str, ws: WebSocket, page_size: int = 50
):
    participant = Participant(user_id=user_id, display_name=name, ws=ws)
    self.participants[user_id] = participant
    # Send recent messages first, older messages on demand
    recent = self.messages[-page_size:]
    await ws.send_json({
        "type": "session_state",
        "messages": [self._serialize_msg(m) for m in recent],
        "total_messages": len(self.messages),
        "has_more": len(self.messages) > page_size,
        "participants": [
            {"user_id": p.user_id, "name": p.display_name}
            for p in self.participants.values()
        ],
    })
FAQ
How do you prevent one user from monopolizing the AI agent in a shared session?
Implement rate limiting per user within the session. Track how many messages each user has sent in the last N seconds and either delay or reject messages that exceed the limit. You can also implement a fairness queue that alternates between users rather than processing strictly FIFO. For classroom scenarios, add a "hand raise" feature where users request a turn and the moderator (or the agent itself) decides who goes next.
What happens to the session state if the server restarts?
Persist session state to a durable store (Redis, PostgreSQL) on every message. On restart, reload active sessions from the store. Participants will reconnect automatically (via WebSocket reconnection logic) and receive the full session state on reconnection. The message queue may lose pending items, but since all submitted messages are already persisted, you can rebuild the queue from messages that have not yet received an agent response.
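As a sketch of the persist-on-every-message idea, here is a minimal durable store (using SQLite so the example is self-contained; the answer above suggests Redis or PostgreSQL for production, and the schema here is an assumption):

```python
import json
import sqlite3


class SessionStore:
    """Durable, ordered message log keyed by session. On restart,
    load(session_id) rebuilds the in-memory conversation."""

    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            " session_id TEXT, seq INTEGER, payload TEXT,"
            " PRIMARY KEY (session_id, seq))"
        )

    def append(self, session_id: str, seq: int, message: dict) -> None:
        # INSERT OR REPLACE makes retries after a crash idempotent
        self.db.execute(
            "INSERT OR REPLACE INTO messages VALUES (?, ?, ?)",
            (session_id, seq, json.dumps(message)),
        )
        self.db.commit()

    def load(self, session_id: str) -> list[dict]:
        rows = self.db.execute(
            "SELECT payload FROM messages WHERE session_id = ? ORDER BY seq",
            (session_id,),
        )
        return [json.loads(p) for (p,) in rows]
```

The sequence number doubles as the recovery cursor: any persisted user message with no assistant message after it is a candidate for re-queuing.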
How do you handle conflicting instructions from different users in a collaborative session?
Include all user identities in the conversation context so the agent knows who said what. Add system instructions that guide the agent on conflict resolution — for example, "When users give contradictory instructions, acknowledge both perspectives and ask for clarification." The sequential queue naturally prevents true conflicts since the agent processes one message at a time with the full prior context. For more structured collaboration, assign roles (moderator, contributor, observer) with different permission levels.
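The role idea can start as nothing more than a permission table (`Role`, `PERMISSIONS`, and the action names below are illustrative assumptions, not a fixed scheme):

```python
from enum import Enum


class Role(Enum):
    MODERATOR = "moderator"
    CONTRIBUTOR = "contributor"
    OBSERVER = "observer"


# Which session actions each role may perform
PERMISSIONS: dict[Role, set] = {
    Role.MODERATOR: {"send_message", "interrupt_agent", "grant_turn"},
    Role.CONTRIBUTOR: {"send_message"},
    Role.OBSERVER: set(),
}


def can(role: Role, action: str) -> bool:
    return action in PERMISSIONS[role]
```

A role field on `Participant` plus a `can()` check at the top of `submit_message` is enough to turn observers into read-only spectators.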
CallSphere Team
Expert insights on AI voice agents and customer communication automation.