
AI Agent for Wait Time Management: Real-Time Updates and Queue Position Notifications

Build an AI agent that tracks patient queue positions in real time, estimates accurate wait times using historical data, sends proactive notifications, and offers rebooking options when delays occur.

Why Wait Time Transparency Matters

Patient satisfaction scores drop when perceived wait times exceed expectations. The key word is "perceived": patients who receive proactive updates about delays report higher satisfaction than those who wait the same amount of time without any communication. A wait time management agent addresses this with real-time visibility into the queue, accurate time estimates, and actionable options when delays occur.

Queue Tracking System

The queue system tracks each patient's position from check-in through being called back. It monitors the actual flow of patients through each stage of their visit.

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
from enum import Enum
import uuid


class PatientStage(Enum):
    CHECKED_IN = "checked_in"
    IN_WAITING_ROOM = "in_waiting_room"
    IN_OPERATORY = "in_operatory"
    WITH_PROVIDER = "with_provider"
    CHECKOUT = "checkout"
    DEPARTED = "departed"


@dataclass
class QueueEntry:
    id: str = field(
        default_factory=lambda: str(uuid.uuid4())
    )
    patient_id: str = ""
    patient_name: str = ""
    appointment_id: str = ""
    appointment_time: Optional[datetime] = None
    check_in_time: Optional[datetime] = None
    called_back_time: Optional[datetime] = None
    provider_id: str = ""
    appointment_type: str = ""
    estimated_duration_minutes: int = 30
    stage: PatientStage = PatientStage.CHECKED_IN
    position: int = 0
    estimated_wait_minutes: int = 0


class QueueManager:
    def __init__(self, db):
        self.db = db

    async def check_in_patient(
        self, appointment_id: str,
    ) -> QueueEntry:
        appt = await self.db.fetchrow("""
            SELECT a.id, a.patient_id,
                   p.first_name || ' ' || p.last_name AS name,
                   a.start_time, a.provider_id, a.type,
                   a.duration_minutes
            FROM appointments a
            JOIN patients p ON p.id = a.patient_id
            WHERE a.id = $1
        """, appointment_id)

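        if appt is None:
            # fetchrow returns None when no row matches.
            raise ValueError(
                f"No appointment found for id {appointment_id}"
            )

        now = datetime.utcnow()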
        position = await self._calculate_position(
            appt["provider_id"], now
        )

        entry = QueueEntry(
            patient_id=appt["patient_id"],
            patient_name=appt["name"],
            appointment_id=appointment_id,
            appointment_time=appt["start_time"],
            check_in_time=now,
            provider_id=appt["provider_id"],
            appointment_type=appt["type"],
            estimated_duration_minutes=appt["duration_minutes"],
            stage=PatientStage.CHECKED_IN,
            position=position,
        )

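        # _estimate_wait is defined on WaitTimeEstimator (shown
        # later), not on QueueManager, so delegate to it here.
        entry.estimated_wait_minutes = (
            await WaitTimeEstimator(self.db)._estimate_wait(entry)
        )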

        await self.db.execute("""
            INSERT INTO queue_entries
                (id, patient_id, appointment_id,
                 check_in_time, provider_id,
                 stage, position, estimated_wait)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        """, entry.id, entry.patient_id, appointment_id,
             now, entry.provider_id, entry.stage.value,
             position, entry.estimated_wait_minutes)

        return entry

    async def _calculate_position(
        self, provider_id: str, now: datetime,
    ) -> int:
        count = await self.db.fetchrow("""
            SELECT COUNT(*) AS ahead
            FROM queue_entries
            WHERE provider_id = $1
              AND stage IN ('checked_in', 'in_waiting_room')
              AND check_in_time < $2
              AND DATE(check_in_time) = DATE($2)
        """, provider_id, now)
        return (count["ahead"] or 0) + 1

    async def update_stage(
        self, queue_id: str, new_stage: PatientStage,
    ):
        now = datetime.utcnow()
        updates = {"stage": new_stage.value}
        if new_stage == PatientStage.IN_OPERATORY:
            updates["called_back_time"] = now

        set_clause = ", ".join(
            f"{k} = ${i+2}" for i, k in enumerate(updates)
        )
        values = [queue_id] + list(updates.values())
        await self.db.execute(
            f"UPDATE queue_entries SET {set_clause} "
            f"WHERE id = $1",
            *values,
        )

        if new_stage in (
            PatientStage.IN_OPERATORY,
            PatientStage.DEPARTED,
        ):
            await self._recalculate_positions(
                queue_id
            )

    async def _recalculate_positions(self, queue_id):
        entry = await self.db.fetchrow(
            "SELECT provider_id FROM queue_entries "
            "WHERE id = $1", queue_id,
        )
        waiting = await self.db.fetch("""
            SELECT id FROM queue_entries
            WHERE provider_id = $1
              AND stage IN ('checked_in', 'in_waiting_room')
              AND DATE(check_in_time) = CURRENT_DATE
            ORDER BY check_in_time
        """, entry["provider_id"])

        for i, row in enumerate(waiting):
            await self.db.execute(
                "UPDATE queue_entries SET position = $2 "
                "WHERE id = $1",
                row["id"], i + 1,
            )
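
The renumbering done by _recalculate_positions can be tried without a database. The following is a minimal sketch rather than the article's implementation: renumber_positions and the dictionary-based entries are hypothetical stand-ins for rows in queue_entries.

```python
from datetime import datetime

# Stages that still count as "waiting", matching the SQL filters above.
WAITING_STAGES = {"checked_in", "in_waiting_room"}


def renumber_positions(entries):
    """Return {entry_id: position} for waiting entries, earliest check-in first."""
    waiting = [e for e in entries if e["stage"] in WAITING_STAGES]
    waiting.sort(key=lambda e: e["check_in_time"])
    return {e["id"]: i for i, e in enumerate(waiting, start=1)}


entries = [
    {"id": "a", "stage": "in_operatory",
     "check_in_time": datetime(2024, 1, 8, 9, 0)},
    {"id": "b", "stage": "checked_in",
     "check_in_time": datetime(2024, 1, 8, 9, 10)},
    {"id": "c", "stage": "in_waiting_room",
     "check_in_time": datetime(2024, 1, 8, 9, 5)},
]
positions = renumber_positions(entries)
print(positions)
```

Patient "a" has already been called back, so only "c" and "b" receive positions, ordered by check-in time.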

Wait Time Estimation

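Accurate estimates require more than a single clinic-wide average. The estimator below uses the last 90 days of history for the same provider and day of week, scales the average wait by the patient's queue position, and adds schedule drift: the minutes, beyond a 10-minute allowance, since the patient currently with the provider was called back.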

class WaitTimeEstimator:
    def __init__(self, db):
        self.db = db

    async def _estimate_wait(
        self, entry: QueueEntry,
    ) -> int:
        historical = await self.db.fetchrow("""
            SELECT
                AVG(
                    EXTRACT(EPOCH FROM (
                        called_back_time - check_in_time
                    )) / 60
                ) AS avg_wait,
                PERCENTILE_CONT(0.75) WITHIN GROUP (
                    ORDER BY EXTRACT(EPOCH FROM (
                        called_back_time - check_in_time
                    )) / 60
                ) AS p75_wait
            FROM queue_entries
            WHERE provider_id = $1
              AND EXTRACT(DOW FROM check_in_time) = $2
              AND called_back_time IS NOT NULL
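              AND check_in_time > CURRENT_DATE
                  - INTERVAL '90 days'
        """, entry.provider_id,
             # PostgreSQL DOW runs 0=Sunday..6=Saturday, while
             # Python's weekday() runs 0=Monday..6=Sunday, so
             # convert with isoweekday() % 7 to match.
             datetime.utcnow().isoweekday() % 7)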

        if not historical or not historical["avg_wait"]:
            return entry.position * 15  # fallback

        base_wait = float(historical["avg_wait"])

        current_behind = await self.db.fetchrow("""
            SELECT
                SUM(
                    CASE WHEN stage = 'with_provider'
                    THEN EXTRACT(EPOCH FROM (
                        CURRENT_TIMESTAMP - called_back_time
                    )) / 60
                    ELSE 0 END
                ) AS current_overrun
            FROM queue_entries
            WHERE provider_id = $1
              AND stage = 'with_provider'
        """, entry.provider_id)

        overrun = float(
            current_behind["current_overrun"] or 0
        )
        schedule_drift = max(0, overrun - 10)

        estimated = (
            base_wait * entry.position + schedule_drift
        )
        return max(1, round(estimated))

    async def get_current_wait(
        self, patient_id: str,
    ) -> Optional[dict]:
        entry = await self.db.fetchrow("""
            SELECT * FROM queue_entries
            WHERE patient_id = $1
              AND stage IN ('checked_in', 'in_waiting_room')
              AND DATE(check_in_time) = CURRENT_DATE
        """, patient_id)

        if not entry:
            return None

        elapsed = (
            datetime.utcnow() - entry["check_in_time"]
        ).total_seconds() / 60

        return {
            "position": entry["position"],
            "estimated_wait": entry["estimated_wait"],
            "elapsed_minutes": round(elapsed),
            "remaining_minutes": max(
                0,
                entry["estimated_wait"] - round(elapsed),
            ),
            "stage": entry["stage"],
        }
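
The arithmetic inside _estimate_wait is easier to reason about when separated from the queries. This is a sketch under stated assumptions: estimate_wait_minutes is a hypothetical helper, and its grace_minutes and fallback_per_patient parameters simply name the constants 10 and 15 that appear in the code above.

```python
from typing import Optional


def estimate_wait_minutes(
    avg_wait: Optional[float],
    position: int,
    overrun_minutes: float,
    grace_minutes: float = 10,
    fallback_per_patient: int = 15,
) -> int:
    """Mirror the arithmetic in _estimate_wait as a pure function."""
    if not avg_wait:
        # No usable history: fall back to a flat per-patient estimate.
        return position * fallback_per_patient
    # Only time beyond the grace period counts as schedule drift.
    drift = max(0.0, overrun_minutes - grace_minutes)
    return max(1, round(avg_wait * position + drift))


with_history = estimate_wait_minutes(12.5, position=2, overrun_minutes=25)
no_history = estimate_wait_minutes(None, position=3, overrun_minutes=0)
print(with_history, no_history)
```

With a 12.5-minute historical average, position 2, and a 25-minute overrun, the estimate is 12.5 × 2 + (25 − 10) = 40 minutes. With no history, position 3 falls back to 45 minutes.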

Proactive Notification System

The agent sends notifications at key moments: when the patient checks in, when their estimated wait changes significantly, and when they are about to be called back.

class WaitTimeNotifier:
    def __init__(self, db, sms_client, push_service):
        self.db = db
        self.sms = sms_client
        self.push = push_service

    async def send_check_in_confirmation(
        self, entry: QueueEntry,
    ):
        message = (
            f"Hi {entry.patient_name.split()[0]}, "
            f"you are checked in. Your estimated wait is "
            f"about {entry.estimated_wait_minutes} minutes. "
            f"You are #{entry.position} in line. "
            f"We will text you when the provider is ready."
        )
        patient = await self.db.fetchrow(
            "SELECT phone FROM patients WHERE id = $1",
            entry.patient_id,
        )
        await self.sms.send(patient["phone"], message)

    async def check_for_delay_updates(self):
        waiting = await self.db.fetch("""
            SELECT qe.*, p.phone, p.first_name
            FROM queue_entries qe
            JOIN patients p ON p.id = qe.patient_id
            WHERE qe.stage IN ('checked_in', 'in_waiting_room')
              AND DATE(qe.check_in_time) = CURRENT_DATE
        """)

        estimator = WaitTimeEstimator(self.db)

        for entry_row in waiting:
            queue_entry = QueueEntry(
                id=entry_row["id"],
                patient_id=entry_row["patient_id"],
                provider_id=entry_row["provider_id"],
                position=entry_row["position"],
            )
            new_estimate = await estimator._estimate_wait(
                queue_entry
            )
            old_estimate = entry_row["estimated_wait"]

            if abs(new_estimate - old_estimate) >= 10:
                await self.db.execute(
                    "UPDATE queue_entries "
                    "SET estimated_wait = $2 WHERE id = $1",
                    entry_row["id"], new_estimate,
                )

                if new_estimate > old_estimate:
                    await self.sms.send(
                        entry_row["phone"],
                        f"Hi {entry_row['first_name']}, "
                        f"we are running a bit behind. "
                        f"Your updated wait is about "
                        f"{new_estimate} minutes. "
                        f"Thank you for your patience."
                    )

    async def send_ready_notification(
        self, queue_id: str,
    ):
        entry = await self.db.fetchrow("""
            SELECT qe.patient_id, p.phone, p.first_name
            FROM queue_entries qe
            JOIN patients p ON p.id = qe.patient_id
            WHERE qe.id = $1
        """, queue_id)

        await self.sms.send(
            entry["phone"],
            f"Hi {entry['first_name']}, we are ready "
            f"for you! Please come to the front desk.",
        )
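
The decision rule inside check_for_delay_updates can be isolated as a small function, which makes the threshold behavior easy to test. This is a minimal sketch: delay_update_action and its return labels are hypothetical names, and the 10-minute threshold is taken from the code above.

```python
def delay_update_action(
    old_estimate: int, new_estimate: int, threshold: int = 10,
) -> str:
    """Decide what to do when a wait estimate is recomputed."""
    if abs(new_estimate - old_estimate) < threshold:
        # Small changes are ignored to avoid noisy updates.
        return "none"
    if new_estimate > old_estimate:
        # The wait got longer: store the new value and text the patient.
        return "update_and_notify"
    # The wait got shorter: store the new value without sending a text.
    return "update_only"


longer = delay_update_action(20, 35)
shorter = delay_update_action(35, 20)
small_change = delay_update_action(20, 25)
print(longer, shorter, small_change)
```

As in the original loop, a shorter estimate is stored silently; only a longer wait triggers an SMS.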

Rebooking Options for Excessive Delays

When the estimated wait reaches a threshold (30 minutes in the code below, set by DELAY_THRESHOLD_MINUTES), the agent proactively offers the patient an option to reschedule rather than continuing to wait.

class RebookingManager:
    DELAY_THRESHOLD_MINUTES = 30

    def __init__(self, db, schedule_manager, sms_client):
        self.db = db
        self.scheduler = schedule_manager
        self.sms = sms_client

    async def offer_rebooking(self, queue_id: str):
        entry = await self.db.fetchrow("""
            SELECT qe.*, p.phone, p.first_name,
                   a.type, a.provider_id
            FROM queue_entries qe
            JOIN patients p ON p.id = qe.patient_id
            JOIN appointments a ON a.id = qe.appointment_id
            WHERE qe.id = $1
        """, queue_id)

        if entry["estimated_wait"] < self.DELAY_THRESHOLD_MINUTES:
            return

        from datetime import date as date_type
        next_slots = await self.scheduler.find_available_slots(
            appointment_type=entry["type"],
            preferred_date=date_type.today() + timedelta(days=1),
            provider_id=entry["provider_id"],
            search_days=5,
        )

        if next_slots:
            next_option = next_slots[0]
            await self.sms.send(
                entry["phone"],
                f"Hi {entry['first_name']}, we apologize "
                f"for the extended wait. If you would "
                f"prefer, we have an opening on "
                f"{next_option.start:%A at %I:%M %p}. "
                f"Reply REBOOK to reschedule or WAIT to "
                f"stay. Your current position is unchanged "
                f"either way."
            )

            await self.db.execute("""
                INSERT INTO rebooking_offers
                    (queue_id, offered_slot, offered_at)
                VALUES ($1, $2, $3)
            """, queue_id, next_option.start,
                 datetime.utcnow())

    async def process_rebooking_response(
        self, patient_id: str, response: str,
    ):
        if response.strip().upper() != "REBOOK":
            return {"action": "staying"}

        offer = await self.db.fetchrow("""
            SELECT rb.*, qe.appointment_id
            FROM rebooking_offers rb
            JOIN queue_entries qe ON qe.id = rb.queue_id
            WHERE qe.patient_id = $1
            ORDER BY rb.offered_at DESC LIMIT 1
        """, patient_id)

        if not offer:
            return {"action": "no_offer_found"}

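        # Mark the original appointment as rescheduled. Creating
        # the new appointment for the offered slot is not shown
        # in this example.
        await self.db.execute(
            "UPDATE appointments SET status = 'rescheduled' "
            "WHERE id = $1", offer["appointment_id"],
        )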
        await self.db.execute(
            "UPDATE queue_entries SET stage = 'departed' "
            "WHERE id = $1", offer["queue_id"],
        )

        return {
            "action": "rebooked",
            "new_time": offer["offered_slot"],
        }
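
process_rebooking_response treats every reply other than REBOOK as a decision to stay. One possible refinement, sketched here with a hypothetical parse_rebooking_reply helper, is to distinguish an explicit WAIT from an unrecognized reply so the agent can ask again instead of guessing.

```python
def parse_rebooking_reply(text: str) -> str:
    """Classify an SMS reply as 'rebook', 'wait', or 'unknown'."""
    normalized = text.strip().upper()
    if normalized == "REBOOK":
        return "rebook"
    if normalized == "WAIT":
        return "wait"
    # Anything else is ambiguous; the caller can send a clarifying prompt.
    return "unknown"


replies = [" rebook ", "Wait", "maybe later"]
parsed = [parse_rebooking_reply(r) for r in replies]
print(parsed)
```

Whether an unknown reply should default to staying in the queue, as the original code does, is a policy choice for the practice.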

FAQ

How does the agent estimate wait times accurately when procedures run longer than expected?

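The estimator shown above combines two data sources: the historical average wait for the specific provider and day of week over the last 90 days, and the real-time status of the patient currently with the provider. The average is scaled by the patient's queue position, and once the current visit runs past a 10-minute allowance, the extra minutes are added as schedule drift. Because check_for_delay_updates recomputes the estimate for every waiting patient, that drift flows through to all downstream estimates each time it runs. The query also computes the 75th-percentile (P75) wait; the code as written uses the average, but substituting P75 gives a more conservative estimate that patients exceed less often. The scheduled durations of patients ahead in the queue are stored on each QueueEntry and are not yet used in the calculation.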
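
To make the difference between the average and P75 concrete, here is a small sketch using Python's statistics module on made-up wait times (the sample values are illustrative, not real data). The "inclusive" method interpolates linearly between neighboring values, which is intended to mirror what PERCENTILE_CONT does in the SQL query.

```python
import statistics

# Hypothetical historical waits in minutes for one provider and weekday.
waits = [8, 10, 12, 14, 20, 22]

mean_wait = statistics.mean(waits)
# quantiles(n=4) returns the three quartile cut points; index 2 is P75.
p75_wait = statistics.quantiles(waits, n=4, method="inclusive")[2]

print(round(mean_wait, 1), p75_wait)
```

For this sample the mean is about 14.3 minutes and P75 is 18.5 minutes, so quoting P75 would leave roughly a quarter of patients waiting longer than promised. P75 is not guaranteed to exceed the mean: a few extreme outliers can pull the mean above it.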

What if patients leave the waiting room without telling the front desk?

The system integrates with check-in kiosks and can optionally use Bluetooth beacons or Wi-Fi presence detection to estimate whether a patient is still in the waiting area. If the system detects that a patient may have left, it sends a confirmation message asking if they are still waiting. After 15 minutes with no response and no detected presence, the queue entry is marked as "no show" and downstream patients' positions are updated automatically.
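
The timeout rule described above can be sketched as a pure function. Everything here is an assumption for illustration: should_mark_no_show, its parameters, and the idea that presence detection reports a last-seen timestamp are hypothetical, since the article does not show this part of the system.

```python
from datetime import datetime, timedelta
from typing import Optional

NO_SHOW_TIMEOUT = timedelta(minutes=15)


def should_mark_no_show(
    prompt_sent_at: datetime,
    replied: bool,
    last_seen_at: Optional[datetime],
    now: datetime,
) -> bool:
    """True when the patient neither replied nor was detected in time."""
    if replied:
        return False
    if last_seen_at is not None and last_seen_at >= prompt_sent_at:
        # Presence was detected after the confirmation message went out.
        return False
    return now - prompt_sent_at >= NO_SHOW_TIMEOUT


sent = datetime(2024, 1, 8, 10, 0)
silent = should_mark_no_show(sent, False, None, datetime(2024, 1, 8, 10, 15))
too_early = should_mark_no_show(sent, False, None, datetime(2024, 1, 8, 10, 5))
seen = should_mark_no_show(
    sent, False, datetime(2024, 1, 8, 10, 3), datetime(2024, 1, 8, 10, 20),
)
print(silent, too_early, seen)
```

Only the first case, with no reply and no detected presence after the full 15 minutes, would be marked as a no-show.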

Can the wait time system work across multiple providers and operatories simultaneously?

Yes. The queue tracks each provider independently, so a delay with one provider does not affect the wait estimates for another. The system also accounts for shared resources like operatories and hygienists. When multiple providers share operatories, the estimator factors in room availability as a constraint on top of provider availability, providing a more accurate picture of actual wait times.
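
The room constraint can be expressed compactly: a visit cannot start until both the provider and at least one operatory are free. The sketch below assumes a hypothetical earliest_start helper and a list of times at which each shared operatory becomes available; it is not part of the code shown earlier.

```python
from datetime import datetime
from typing import List


def earliest_start(
    provider_free_at: datetime,
    operatory_free_at: List[datetime],
) -> datetime:
    """Earliest time when the provider and some operatory are both free."""
    if not operatory_free_at:
        raise ValueError("at least one operatory is required")
    # The first room to open up is the binding room constraint.
    first_room = min(operatory_free_at)
    return max(provider_free_at, first_room)


rooms = [datetime(2024, 1, 8, 10, 20), datetime(2024, 1, 8, 10, 5)]
room_bound = earliest_start(datetime(2024, 1, 8, 10, 0), rooms)
provider_bound = earliest_start(datetime(2024, 1, 8, 10, 30), rooms)
print(room_bound.time(), provider_bound.time())
```

In the first call the provider is free at 10:00 but no room opens until 10:05, so the room is the bottleneck. In the second the provider is the bottleneck at 10:30.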


#WaitTime #QueueManagement #PatientExperience #HealthcareAI #Python #AgenticAI #LearnAI #AIEngineering

CallSphere Team
