Skip to content
Learn Agentic AI14 min read0 views

Video Frame Analysis Agents: Object Tracking, Event Detection, and Timeline Generation

Learn how to build a video analysis agent that samples frames intelligently, detects and tracks objects across time, classifies events, and generates structured timelines for surveillance, sports, and content analysis applications.

From Continuous Video to Structured Events

Video is the richest data source available — a single security camera generates millions of frames per day. But raw video is nearly useless for automation. What you need is structured data: "Person entered at 14:32, stayed for 47 minutes, interacted with the checkout counter at 14:45."

A video analysis agent bridges this gap. It samples frames intelligently (not every frame — that would be wasteful), detects objects, tracks them across time, classifies events, and produces a structured timeline that downstream systems can query, alert on, or analyze.

Architecture of the Video Agent

The pipeline has four stages:

  1. Intelligent frame sampling — select frames that contain meaningful changes
  2. Object detection — identify objects of interest in each sampled frame
  3. Object tracking — maintain identity across frames as objects move
  4. Event classification and timeline generation — interpret object behaviors as events

Intelligent Frame Sampling

Processing every frame of a 30fps video is wasteful when most consecutive frames are nearly identical. Sample based on visual change:

import cv2
import numpy as np
from dataclasses import dataclass


@dataclass
class SampledFrame:
    frame_number: int
    timestamp: float       # seconds
    image: np.ndarray
    change_score: float    # how different from previous sample


def sample_frames_by_change(
    video_path: str,
    change_threshold: float = 30.0,
    min_interval: float = 0.5,   # minimum seconds between samples
    max_interval: float = 5.0,   # maximum seconds between samples
) -> list[SampledFrame]:
    """Sample frames based on visual change detection."""
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)

    samples = []
    prev_gray = None
    frame_num = 0
    last_sample_time = -max_interval  # Force first frame

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        timestamp = frame_num / fps
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (21, 21), 0)

        if prev_gray is not None:
            # Compute frame difference
            diff = cv2.absdiff(prev_gray, gray)
            change_score = float(np.mean(diff))

            time_since_last = timestamp - last_sample_time

            should_sample = (
                (change_score > change_threshold and
                 time_since_last >= min_interval) or
                time_since_last >= max_interval
            )

            if should_sample:
                samples.append(SampledFrame(
                    frame_number=frame_num,
                    timestamp=timestamp,
                    image=frame.copy(),
                    change_score=change_score,
                ))
                last_sample_time = timestamp
        else:
            # Always sample the first frame
            samples.append(SampledFrame(
                frame_number=0,
                timestamp=0.0,
                image=frame.copy(),
                change_score=0.0,
            ))
            last_sample_time = 0.0

        prev_gray = gray
        frame_num += 1

    cap.release()
    return samples

Object Detection on Sampled Frames

Use a pre-trained detection model to find objects in each frame:

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

from dataclasses import field


@dataclass
class Detection:
    class_name: str
    confidence: float
    bbox: tuple          # (x1, y1, x2, y2)
    center: tuple        # (cx, cy)
    frame_number: int
    timestamp: float
    track_id: int = -1   # assigned during tracking


def detect_objects_yolo(
    frame: SampledFrame,
) -> list[Detection]:
    """Detect objects using YOLO (via OpenCV DNN)."""
    blob = cv2.dnn.blobFromImage(
        frame.image, 1/255.0, (416, 416),
        swapRB=True, crop=False
    )

    # Load YOLO network (cache in production)
    net = cv2.dnn.readNetFromDarknet(
        "yolov4.cfg", "yolov4.weights"
    )
    layer_names = net.getUnconnectedOutLayersNames()

    net.setInput(blob)
    outputs = net.forward(layer_names)

    detections = []
    h, w = frame.image.shape[:2]
    conf_threshold = 0.5

    for output in outputs:
        for detection in output:
            scores = detection[5:]
            class_id = int(np.argmax(scores))
            confidence = float(scores[class_id])

            if confidence > conf_threshold:
                cx = int(detection[0] * w)
                cy = int(detection[1] * h)
                bw = int(detection[2] * w)
                bh = int(detection[3] * h)

                x1 = cx - bw // 2
                y1 = cy - bh // 2

                detections.append(Detection(
                    class_name=COCO_CLASSES[class_id],
                    confidence=confidence,
                    bbox=(x1, y1, x1 + bw, y1 + bh),
                    center=(cx, cy),
                    frame_number=frame.frame_number,
                    timestamp=frame.timestamp,
                ))

    return apply_nms(detections)


def apply_nms(
    detections: list[Detection],
    iou_threshold: float = 0.4,
) -> list[Detection]:
    """Apply non-maximum suppression to remove overlapping boxes."""
    if not detections:
        return []

    boxes = np.array([d.bbox for d in detections])
    scores = np.array([d.confidence for d in detections])

    indices = cv2.dnn.NMSBoxes(
        boxes.tolist(), scores.tolist(),
        score_threshold=0.5,
        nms_threshold=iou_threshold,
    )

    if len(indices) > 0:
        indices = indices.flatten()
        return [detections[i] for i in indices]
    return []

Simple Object Tracking Across Frames

Track objects by matching detections across consecutive frames using IoU (Intersection over Union):

def compute_iou(box1: tuple, box2: tuple) -> float:
    """Compute IoU between two bounding boxes."""
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    intersection = max(0, x2 - x1) * max(0, y2 - y1)
    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area1 + area2 - intersection

    return intersection / union if union > 0 else 0.0


class SimpleTracker:
    """Track objects across frames using IoU matching."""

    def __init__(self, iou_threshold: float = 0.3):
        self.next_id = 0
        self.active_tracks: dict[int, Detection] = {}
        self.iou_threshold = iou_threshold

    def update(
        self, detections: list[Detection]
    ) -> list[Detection]:
        """Match new detections to existing tracks."""
        if not self.active_tracks:
            for det in detections:
                det.track_id = self.next_id
                self.active_tracks[self.next_id] = det
                self.next_id += 1
            return detections

        # Compute IoU matrix
        track_ids = list(self.active_tracks.keys())
        matched = set()
        matched_tracks = set()

        for i, det in enumerate(detections):
            best_iou = 0.0
            best_track = -1

            for track_id in track_ids:
                if track_id in matched_tracks:
                    continue
                prev = self.active_tracks[track_id]
                if prev.class_name != det.class_name:
                    continue

                iou = compute_iou(prev.bbox, det.bbox)
                if iou > best_iou:
                    best_iou = iou
                    best_track = track_id

            if best_iou >= self.iou_threshold:
                det.track_id = best_track
                self.active_tracks[best_track] = det
                matched.add(i)
                matched_tracks.add(best_track)
            else:
                det.track_id = self.next_id
                self.active_tracks[self.next_id] = det
                self.next_id += 1

        # Remove tracks that were not matched
        for track_id in track_ids:
            if track_id not in matched_tracks:
                del self.active_tracks[track_id]

        return detections

Event Detection and Classification

Convert tracked object movements into semantic events:

@dataclass
class Event:
    event_type: str
    start_time: float
    end_time: float | None
    track_id: int
    object_class: str
    description: str
    metadata: dict = field(default_factory=dict)


class EventDetector:
    """Detect events from tracked object sequences."""

    def __init__(self):
        self.track_history: dict[int, list[Detection]] = {}
        self.events: list[Event] = []

    def process_detections(
        self, detections: list[Detection]
    ) -> list[Event]:
        """Process new detections and detect events."""
        new_events = []

        for det in detections:
            if det.track_id not in self.track_history:
                # New object appeared — entry event
                self.track_history[det.track_id] = [det]
                new_events.append(Event(
                    event_type="entry",
                    start_time=det.timestamp,
                    end_time=None,
                    track_id=det.track_id,
                    object_class=det.class_name,
                    description=f"{det.class_name} entered the scene",
                ))
            else:
                history = self.track_history[det.track_id]
                history.append(det)

                # Detect stopped/stationary objects
                if len(history) >= 5:
                    recent = history[-5:]
                    movement = np.mean([
                        np.sqrt(
                            (recent[j].center[0] - recent[j-1].center[0])**2 +
                            (recent[j].center[1] - recent[j-1].center[1])**2
                        )
                        for j in range(1, len(recent))
                    ])

                    if movement < 10:
                        duration = det.timestamp - recent[0].timestamp
                        if duration > 30:  # Stationary for 30+ seconds
                            new_events.append(Event(
                                event_type="stationary",
                                start_time=recent[0].timestamp,
                                end_time=det.timestamp,
                                track_id=det.track_id,
                                object_class=det.class_name,
                                description=(
                                    f"{det.class_name} stationary for "
                                    f"{duration:.0f}s"
                                ),
                                metadata={"duration": duration},
                            ))

        self.events.extend(new_events)
        return new_events

Timeline Generation

Compile all events into a structured, queryable timeline:

import json
from datetime import datetime, timedelta


def generate_timeline(
    events: list[Event],
    video_start_time: datetime | None = None,
) -> dict:
    """Generate a structured timeline from detected events."""
    base_time = video_start_time or datetime.utcnow()

    timeline = {
        "video_start": base_time.isoformat(),
        "total_events": len(events),
        "event_types": {},
        "events": [],
    }

    for event in sorted(events, key=lambda e: e.start_time):
        abs_start = base_time + timedelta(seconds=event.start_time)
        abs_end = (
            base_time + timedelta(seconds=event.end_time)
            if event.end_time else None
        )

        timeline["events"].append({
            "type": event.event_type,
            "timestamp": abs_start.isoformat(),
            "end_timestamp": abs_end.isoformat() if abs_end else None,
            "relative_seconds": event.start_time,
            "object": event.object_class,
            "track_id": event.track_id,
            "description": event.description,
            "metadata": event.metadata,
        })

        # Count by type
        timeline["event_types"][event.event_type] = (
            timeline["event_types"].get(event.event_type, 0) + 1
        )

    return timeline

FAQ

How do I choose the right frame sampling rate?

It depends on the speed of events you need to capture. For surveillance with slow-moving people, sampling every 1-2 seconds (or on change detection) is sufficient. For sports analysis with fast action, you may need 5-10 fps. Start with change-based sampling and tune the threshold: too low captures noise, too high misses events. Monitor your event detection accuracy and adjust.

What is the difference between IoU-based tracking and deep learning trackers?

IoU-based tracking is simple, fast, and works well when objects move slowly between frames. It fails when objects move far between samples, overlap frequently, or leave and re-enter the frame. Deep learning trackers like DeepSORT add appearance features (a Re-ID model) so they can re-identify objects even after occlusion or camera cuts. For production surveillance, DeepSORT or ByteTrack is strongly recommended.

How do I handle multiple camera views of the same scene?

Multi-camera tracking requires re-identification across views. Each camera runs its own detection and tracking pipeline, then a cross-camera matching stage uses appearance features and spatial calibration to link tracks across views. This is an active research area — the simplest approach is to use a shared Re-ID embedding model and match tracks by visual similarity when an object disappears from one camera and appears in another within a plausible time window.


#VideoAnalysis #ObjectTracking #EventDetection #ComputerVision #Surveillance #TimelineGeneration #AgenticAI #Python

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.