Skip to content
Learn Agentic AI11 min read0 views

Building a Conversations API: CRUD Operations for Agent Chat Sessions

Design and implement a full Conversations API for AI agent chat sessions. Covers resource modeling, conversation lifecycle, message threading, metadata management, and FastAPI implementation patterns.

Designing the Conversation Resource Model

A Conversations API is the backbone of any AI agent platform. It manages the lifecycle of chat sessions, organizes messages into threads, tracks metadata like token usage and model configuration, and provides the history that agents use for context.

The resource hierarchy follows a natural pattern: an agent has many conversations, and each conversation has many messages. Messages can have different roles (user, assistant, system, tool) and may include structured metadata like tool call results.

Database Schema

Start with the data model. Two core tables handle the conversation and message resources.

from sqlalchemy import (
    Column, String, Text, JSON, DateTime, Integer,
    ForeignKey, Enum as SAEnum, func,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
import uuid
import enum

class ConversationStatus(str, enum.Enum):
    active = "active"
    archived = "archived"
    deleted = "deleted"

class Conversation(Base):
    __tablename__ = "conversations"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    agent_id = Column(String(100), nullable=False, index=True)
    title = Column(String(500), nullable=True)
    status = Column(
        SAEnum(ConversationStatus),
        default=ConversationStatus.active,
        nullable=False,
    )
    metadata_ = Column("metadata", JSON, default=dict)
    model = Column(String(100), nullable=True)
    total_tokens = Column(Integer, default=0)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(
        DateTime, server_default=func.now(), onupdate=func.now()
    )

    messages = relationship("Message", back_populates="conversation")

class Message(Base):
    __tablename__ = "messages"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(
        UUID(as_uuid=True),
        ForeignKey("conversations.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    role = Column(String(20), nullable=False)  # user, assistant, system, tool
    content = Column(Text, nullable=True)
    tool_calls = Column(JSON, nullable=True)
    tool_call_id = Column(String(100), nullable=True)
    tokens = Column(Integer, default=0)
    created_at = Column(DateTime, server_default=func.now())

    conversation = relationship("Conversation", back_populates="messages")

CRUD Endpoints

The API follows RESTful conventions with conversations as the top-level resource and messages nested beneath them.

from fastapi import FastAPI, HTTPException, Depends, Query
from pydantic import BaseModel, Field

app = FastAPI()

class CreateConversation(BaseModel):
    agent_id: str
    title: str | None = None
    model: str = "gpt-4o"
    metadata: dict = Field(default_factory=dict)

class CreateMessage(BaseModel):
    role: str
    content: str | None = None
    tool_calls: list[dict] | None = None
    tool_call_id: str | None = None

@app.post("/v1/conversations", status_code=201)
async def create_conversation(
    body: CreateConversation,
    db: AsyncSession = Depends(get_db),
):
    conv = Conversation(
        agent_id=body.agent_id,
        title=body.title,
        model=body.model,
        metadata_=body.metadata,
    )
    db.add(conv)
    await db.commit()
    await db.refresh(conv)
    return conv.to_dict()

@app.get("/v1/conversations/{conversation_id}")
async def get_conversation(
    conversation_id: str,
    db: AsyncSession = Depends(get_db),
):
    conv = await db.get(Conversation, conversation_id)
    if not conv or conv.status == ConversationStatus.deleted:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return conv.to_dict()

@app.patch("/v1/conversations/{conversation_id}")
async def update_conversation(
    conversation_id: str,
    body: dict,
    db: AsyncSession = Depends(get_db),
):
    conv = await db.get(Conversation, conversation_id)
    if not conv:
        raise HTTPException(status_code=404, detail="Conversation not found")

    allowed_fields = {"title", "metadata", "status"}
    for key, value in body.items():
        if key in allowed_fields:
            setattr(conv, key if key != "metadata" else "metadata_", value)

    await db.commit()
    await db.refresh(conv)
    return conv.to_dict()

Adding Messages to a Conversation

Messages are appended to a conversation and ordered by creation time. The endpoint also updates the conversation's token count and timestamp.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

@app.post(
    "/v1/conversations/{conversation_id}/messages",
    status_code=201,
)
async def add_message(
    conversation_id: str,
    body: CreateMessage,
    db: AsyncSession = Depends(get_db),
):
    conv = await db.get(Conversation, conversation_id)
    if not conv or conv.status != ConversationStatus.active:
        raise HTTPException(
            status_code=404,
            detail="Active conversation not found",
        )

    msg = Message(
        conversation_id=conv.id,
        role=body.role,
        content=body.content,
        tool_calls=body.tool_calls,
        tool_call_id=body.tool_call_id,
    )
    db.add(msg)

    conv.updated_at = func.now()
    await db.commit()
    await db.refresh(msg)

    return msg.to_dict()

@app.get("/v1/conversations/{conversation_id}/messages")
async def list_messages(
    conversation_id: str,
    cursor: str | None = Query(None),
    limit: int = Query(50, ge=1, le=200),
    db: AsyncSession = Depends(get_db),
):
    query = (
        select(Message)
        .where(Message.conversation_id == conversation_id)
        .order_by(Message.created_at.asc())
    )

    if cursor:
        decoded = decode_cursor(cursor)
        query = query.where(Message.created_at > decoded["created_at"])

    rows = await db.execute(query.limit(limit + 1))
    messages = rows.scalars().all()

    has_more = len(messages) > limit
    messages = messages[:limit]

    return {
        "data": [m.to_dict() for m in messages],
        "has_more": has_more,
        "next_cursor": encode_cursor(
            messages[-1].created_at.isoformat(),
            str(messages[-1].id),
        ) if has_more else None,
    }

Conversation Lifecycle: Archive and Soft Delete

Rather than hard-deleting conversations, use status transitions. Active conversations can be archived (hidden from default listings but still accessible) or soft-deleted (excluded from all queries, eligible for permanent deletion after a retention period).

@app.post("/v1/conversations/{conversation_id}/archive")
async def archive_conversation(
    conversation_id: str,
    db: AsyncSession = Depends(get_db),
):
    conv = await db.get(Conversation, conversation_id)
    if not conv:
        raise HTTPException(status_code=404)
    conv.status = ConversationStatus.archived
    await db.commit()
    return {"status": "archived"}

@app.delete("/v1/conversations/{conversation_id}", status_code=204)
async def delete_conversation(
    conversation_id: str,
    db: AsyncSession = Depends(get_db),
):
    conv = await db.get(Conversation, conversation_id)
    if not conv:
        raise HTTPException(status_code=404)
    conv.status = ConversationStatus.deleted
    await db.commit()

FAQ

How should I handle conversation context windows for LLM calls?

Store all messages in the database for audit and history, but only send the most recent messages to the LLM, respecting the model's context window. Implement a context builder that trims from the oldest messages first while always preserving the system prompt. Track token counts per message so you can calculate the window without re-tokenizing.

Should I use UUIDs or auto-increment integers for conversation IDs?

Use UUIDs for external-facing IDs. They are non-guessable (important for security), globally unique (simplifies distributed systems), and do not leak information about the total number of conversations. Use auto-increment integers internally if you need efficient keyset pagination. You can expose the UUID and use the integer for internal ordering.

How do I handle concurrent writes to the same conversation?

Use database-level ordering by relying on created_at timestamps with sufficient precision (microseconds) combined with the message UUID as a tiebreaker. For the conversation's updated_at field, use the database's NOW() function rather than application time to avoid clock skew. If multiple agents write to the same conversation, consider optimistic locking with a version column to detect conflicts.


#ConversationsAPI #CRUD #ChatSessions #FastAPI #APIDesign #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.