AI Agent for Runbook Automation: Converting Human Procedures to Automated Workflows

Build an AI agent that parses human-written runbooks, converts them into executable automation workflows with verification steps, handles edge cases, and supports safe rollback.

The Runbook Problem

Every operations team has runbooks: Markdown documents or wiki pages that describe step-by-step procedures for common tasks such as "How to restart the payment service," "How to fail over the database," and "How to clear the cache during a traffic spike." The problem is that runbooks are written for humans and executed inconsistently. Steps get skipped. Verification checks are forgotten. The on-call engineer at 3 AM misreads step 7 and makes things worse. An AI runbook automation agent converts these human procedures into executable, verifiable workflows.

Parsing Human-Written Runbooks

The agent reads runbooks in markdown format and extracts structured steps using an LLM.

import openai
import json
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

class StepType(Enum):
    COMMAND = "command"       # Shell command to execute
    VERIFICATION = "verify"  # Check that something is true
    DECISION = "decision"    # Branch based on condition
    WAIT = "wait"            # Pause for a duration
    NOTIFY = "notify"        # Alert a human or channel
    MANUAL = "manual"        # Requires human action

@dataclass
class RunbookStep:
    order: int
    description: str
    step_type: StepType
    command: Optional[str] = None
    expected_output: Optional[str] = None
    timeout_seconds: int = 60
    rollback_command: Optional[str] = None
    on_failure: str = "abort"  # "abort", "skip", "retry", "rollback"
    retries: int = 0
    conditions: dict = field(default_factory=dict)

@dataclass
class ParsedRunbook:
    title: str
    description: str
    steps: list[RunbookStep]
    prerequisites: list[str]
    estimated_duration_minutes: int
    risk_level: str

async def parse_runbook(markdown_content: str) -> ParsedRunbook:
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": """You convert human-written runbooks into
structured automation specs. For each step, determine:
- Whether it is a command, verification, decision, wait, notification, or manual step
- The exact shell command (if applicable)
- What output to expect for verification
- What to do on failure
- The rollback command if the step needs to be undone"""},
            {"role": "user", "content": f"""Parse this runbook into structured steps.

{markdown_content}

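Return JSON with: title, description, steps (array of objects with: order,
description, step_type, command, expected_output, timeout_seconds,
rollback_command, on_failure, retries), prerequisites, estimated_duration_minutes,
risk_level.

step_type must be exactly one of: command, verify, decision, wait, notify, manual.
on_failure must be exactly one of: abort, skip, retry, rollback."""},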
        ],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    data = json.loads(response.choices[0].message.content)

    steps = [
        RunbookStep(
            order=s["order"],
            description=s["description"],
            step_type=StepType(s["step_type"]),
            command=s.get("command"),
            expected_output=s.get("expected_output"),
            timeout_seconds=s.get("timeout_seconds", 60),
            rollback_command=s.get("rollback_command"),
            on_failure=s.get("on_failure", "abort"),
            retries=s.get("retries", 0),
        ) for s in data["steps"]
    ]

    return ParsedRunbook(
        title=data["title"],
        description=data["description"],
        steps=steps,
        prerequisites=data.get("prerequisites", []),
        estimated_duration_minutes=data.get("estimated_duration_minutes", 10),
        risk_level=data.get("risk_level", "medium"),
    )
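
Because the structured steps come from an LLM, it may be worth checking the returned JSON before building dataclasses from it. The following is a minimal sketch, not part of the original agent: `validate_spec` and the three constant sets are hypothetical names, and the rules simply mirror the `StepType` values and `on_failure` options defined above.

```python
# Hypothetical pre-flight check for the JSON returned by the LLM.
ALLOWED_STEP_TYPES = {"command", "verify", "decision", "wait", "notify", "manual"}
ALLOWED_ON_FAILURE = {"abort", "skip", "retry", "rollback"}
NEEDS_COMMAND = {"command", "verify", "decision"}


def validate_spec(data: dict) -> list[str]:
    """Return a list of problems; an empty list means the spec looks usable."""
    steps = data.get("steps")
    if not isinstance(steps, list) or not steps:
        return ["spec has no steps"]

    problems: list[str] = []
    seen_orders = set()
    for position, step in enumerate(steps, start=1):
        if not isinstance(step, dict):
            problems.append(f"step at position {position}: not an object")
            continue

        order = step.get("order", position)
        label = f"step {order}"
        if order in seen_orders:
            problems.append(f"{label}: duplicate order")
        seen_orders.add(order)

        step_type = step.get("step_type")
        if step_type not in ALLOWED_STEP_TYPES:
            problems.append(f"{label}: unknown step_type {step_type!r}")
        elif step_type in NEEDS_COMMAND and not step.get("command"):
            problems.append(f"{label}: {step_type} step has no command")

        on_failure = step.get("on_failure", "abort")
        if on_failure not in ALLOWED_ON_FAILURE:
            problems.append(f"{label}: unknown on_failure {on_failure!r}")
    return problems
```

One way to use it is to call `validate_spec(data)` right after `json.loads(...)` in `parse_runbook` and refuse to continue (or re-prompt the model) when the list is not empty.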

The Execution Engine

The engine runs each step, verifies outcomes, handles failures, and supports rollback.

import subprocess
import asyncio
import logging
import re
from datetime import datetime

logger = logging.getLogger("runbook-agent")

@dataclass
class StepResult:
    step: RunbookStep
    success: bool
    output: str
    error: str
    duration_seconds: float
    rolled_back: bool = False
    skipped: bool = False

class RunbookExecutor:
    def __init__(self):
        self.completed_steps: list[StepResult] = []
        self.execution_log: list[dict] = []

    async def execute(self, runbook: ParsedRunbook) -> list[StepResult]:
        logger.info(f"Starting runbook: {runbook.title}")
        self._log("start", f"Beginning execution of: {runbook.title}")

        for step in runbook.steps:
            result = await self._execute_step(step)
            self.completed_steps.append(result)

            if not result.success and step.on_failure == "abort":
                logger.error(f"Step {step.order} failed. Aborting.")
                self._log("abort", f"Aborted at step {step.order}")
                await self._rollback()
                break
            elif not result.success and step.on_failure == "rollback":
                await self._rollback()
                break

        self._log("complete", "Runbook execution finished")
        return self.completed_steps

    async def _execute_step(self, step: RunbookStep) -> StepResult:
        self._log("step_start", f"Step {step.order}: {step.description}")
        start = datetime.utcnow()

        if step.step_type == StepType.MANUAL:
            self._log("manual", f"Manual step required: {step.description}")
            return StepResult(
                step=step, success=True, output="Manual step - skipped in auto mode",
                error="", duration_seconds=0, skipped=True,
            )

        if step.step_type == StepType.WAIT:
            wait_secs = step.timeout_seconds
            logger.info(f"Waiting {wait_secs} seconds...")
            await asyncio.sleep(wait_secs)
            return StepResult(
                step=step, success=True, output=f"Waited {wait_secs}s",
                error="", duration_seconds=wait_secs,
            )

        if step.step_type == StepType.NOTIFY:
            await self._send_notification(step.description)
            return StepResult(
                step=step, success=True, output="Notification sent",
                error="", duration_seconds=0,
            )

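        # Command, verification, and decision steps all need a shell command.
        if not step.command:
            return StepResult(
                step=step, success=False, output="",
                error="No command specified for this step", duration_seconds=0,
            )

        # Execute command with retries
        last_result = None
        for attempt in range(step.retries + 1):
            result = await self._run_command(step.command, step.timeout_seconds)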
            elapsed = (datetime.utcnow() - start).total_seconds()

            if result.returncode == 0:
                # Verify expected output if specified
                if step.expected_output and step.step_type == StepType.VERIFICATION:
                    if not self._verify_output(result.stdout, step.expected_output):
                        last_result = StepResult(
                            step=step, success=False,
                            output=result.stdout, error="Output verification failed",
                            duration_seconds=elapsed,
                        )
                        continue

                return StepResult(
                    step=step, success=True,
                    output=result.stdout, error=result.stderr,
                    duration_seconds=elapsed,
                )

            last_result = StepResult(
                step=step, success=False,
                output=result.stdout, error=result.stderr,
                duration_seconds=elapsed,
            )
            if attempt < step.retries:
                logger.warning(f"Step {step.order} attempt {attempt + 1} failed, retrying...")
                await asyncio.sleep(5)

        return last_result

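    async def _run_command(self, command: str, timeout: int):
        # subprocess.run blocks, so run it in a worker thread to keep the event loop free.
        try:
            return await asyncio.to_thread(
                subprocess.run, command, shell=True,
                capture_output=True, text=True, timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            # Report a timeout as a failed command instead of crashing the executor.
            return subprocess.CompletedProcess(
                command, returncode=124, stdout="", stderr=f"Timed out after {timeout}s",
            )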

    def _verify_output(self, actual: str, expected: str) -> bool:
        """Check if actual output contains expected pattern."""
        return bool(re.search(expected, actual, re.IGNORECASE))

    async def _rollback(self):
        logger.warning("Initiating rollback...")
        self._log("rollback_start", "Beginning rollback of completed steps")

        for result in reversed(self.completed_steps):
            if result.success and result.step.rollback_command:
                logger.info(f"Rolling back step {result.step.order}")
                rb_result = await self._run_command(
                    result.step.rollback_command, result.step.timeout_seconds
                )
                result.rolled_back = rb_result.returncode == 0
                if not result.rolled_back:
                    logger.error(
                        f"Rollback failed for step {result.step.order}: "
                        f"{rb_result.stderr}"
                    )

    async def _send_notification(self, message: str):
        logger.info(f"Notification: {message}")

    def _log(self, event: str, message: str):
        self.execution_log.append({
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "message": message,
        })
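
The rollback behaviour is easier to see in isolation. The sketch below is not the executor above but a stripped-down illustration of the same idea: completed steps that have a rollback command are undone newest-first, and the failed step itself is not undone. `run_with_rollback` and the injected `run` callable are hypothetical names chosen for this example.

```python
from typing import Callable, Optional

# Each step is (command, rollback_command or None).
Step = tuple[str, Optional[str]]


def run_with_rollback(
    steps: list[Step], run: Callable[[str], bool]
) -> tuple[bool, list[str]]:
    """Run steps in order; on the first failure, undo completed steps newest-first.

    `run` is a hypothetical callable that executes a command and returns True
    on success. Returns (overall_success, rollback_commands_that_were_run).
    """
    undo_stack: list[str] = []
    for command, rollback_command in steps:
        if run(command):
            if rollback_command:
                undo_stack.append(rollback_command)
            continue
        # Failure: only the steps that completed before this one are undone.
        executed: list[str] = []
        for rollback in reversed(undo_stack):
            run(rollback)
            executed.append(rollback)
        return False, executed
    return True, []


if __name__ == "__main__":
    # Fake runner: every command succeeds except "deploy v2".
    def fake_run(command: str) -> bool:
        return command != "deploy v2"

    steps = [
        ("drain node", "uncordon node"),
        ("check disk", None),
        ("scale down", "scale up"),
        ("deploy v2", "deploy v1"),
    ]
    print(run_with_rollback(steps, fake_run))
    # prints (False, ['scale up', 'uncordon node'])
```

Injecting the runner keeps the ordering logic testable without touching a real cluster, which is useful when rehearsing a risky runbook.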

Converting Runbooks to Reusable Workflows

The agent stores parsed runbooks as YAML workflow definitions that can be version-controlled and reused.

# workflows/restart-payment-service.yaml
name: Restart Payment Service
description: Safely restart the payment service with zero downtime
risk_level: medium
estimated_duration: 5m
prerequisites:
  - kubectl access to production namespace
  - payment-service has at least 2 replicas

steps:
  - order: 1
    type: verify
    description: Confirm service has multiple replicas
    command: "kubectl get deploy payment-service -n production -o jsonpath='{.spec.replicas}'"
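    # Anchored regex: 2-9, or any number with two or more digits (a bare [2-9] rejects 10)
    expected_output: "^([2-9]|[1-9][0-9]+)$"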
    on_failure: abort

  - order: 2
    type: command
    description: Initiate rolling restart
    command: "kubectl rollout restart deploy/payment-service -n production"
    rollback_command: "kubectl rollout undo deploy/payment-service -n production"
    on_failure: rollback

  - order: 3
    type: wait
    description: Wait for pods to start rolling
    timeout_seconds: 30

  - order: 4
    type: verify
    description: Check rollout status
    command: "kubectl rollout status deploy/payment-service -n production --timeout=120s"
    expected_output: "successfully rolled out"
    on_failure: rollback
    retries: 2
    timeout_seconds: 150  # must exceed kubectl's own --timeout=120s (the default is 60)

  - order: 5
    type: verify
    description: Verify service health
    command: "curl -sf http://payment-service.production.svc/health"
    expected_output: "ok"
    on_failure: rollback
    retries: 3
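
Going the other way, a parsed runbook could be written out in this layout with something like the sketch below. It is an illustration under assumptions: the dataclasses are trimmed stand-ins for the ones defined earlier, `step_to_workflow` and `runbook_to_workflow` are hypothetical helper names, and fields that match the loader's defaults are omitted to keep the file short.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class StepType(Enum):  # trimmed stand-in for the enum defined earlier
    COMMAND = "command"
    VERIFICATION = "verify"
    WAIT = "wait"


@dataclass
class RunbookStep:  # stand-in with the same fields the YAML layout uses
    order: int
    description: str
    step_type: StepType
    command: Optional[str] = None
    expected_output: Optional[str] = None
    timeout_seconds: int = 60
    rollback_command: Optional[str] = None
    on_failure: str = "abort"
    retries: int = 0


@dataclass
class ParsedRunbook:  # stand-in
    title: str
    description: str
    steps: list[RunbookStep]
    prerequisites: list[str]
    estimated_duration_minutes: int
    risk_level: str


# Values the loader falls back to, so they can be left out of the file.
DEFAULTS = {"timeout_seconds": 60, "on_failure": "abort", "retries": 0}
OPTIONAL_KEYS = (
    "command", "expected_output", "timeout_seconds",
    "rollback_command", "on_failure", "retries",
)


def step_to_workflow(step: RunbookStep) -> dict:
    """Map one step to the YAML layout: `type` instead of `step_type`, no empty fields."""
    out = {"order": step.order, "type": step.step_type.value,
           "description": step.description}
    for key in OPTIONAL_KEYS:
        value = getattr(step, key)
        if value is None or DEFAULTS.get(key) == value:
            continue
        out[key] = value
    return out


def runbook_to_workflow(runbook: ParsedRunbook) -> dict:
    """Build a plain dict that mirrors the workflow file shown above."""
    ordered = sorted(runbook.steps, key=lambda s: s.order)
    return {
        "name": runbook.title,
        "description": runbook.description,
        "risk_level": runbook.risk_level,
        "estimated_duration": f"{runbook.estimated_duration_minutes}m",
        "prerequisites": list(runbook.prerequisites),
        "steps": [step_to_workflow(s) for s in ordered],
    }
```

With PyYAML installed, passing the result to `yaml.safe_dump(..., sort_keys=False)` should give a file in the same key order as the example above.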

Loading and Running YAML Workflows

import yaml

def load_workflow(path: str) -> ParsedRunbook:
    with open(path) as f:
        data = yaml.safe_load(f)

    steps = [
        RunbookStep(
            order=s["order"],
            description=s["description"],
            step_type=StepType(s["type"]),
            command=s.get("command"),
            expected_output=s.get("expected_output"),
            timeout_seconds=s.get("timeout_seconds", 60),
            rollback_command=s.get("rollback_command"),
            on_failure=s.get("on_failure", "abort"),
            retries=s.get("retries", 0),
        ) for s in data["steps"]
    ]

    return ParsedRunbook(
        title=data["name"],
        description=data["description"],
        steps=steps,
        prerequisites=data.get("prerequisites", []),
        estimated_duration_minutes=int(data.get("estimated_duration", "10m").rstrip("m")),
        risk_level=data.get("risk_level", "medium"),
    )

async def main():
    runbook = load_workflow("workflows/restart-payment-service.yaml")
    executor = RunbookExecutor()
    results = await executor.execute(runbook)

    succeeded = sum(1 for r in results if r.success)
    failed = sum(1 for r in results if not r.success)
    print(f"Results: {succeeded} succeeded, {failed} failed")

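    for r in results:
        status = "OK" if r.success else "FAIL"
        print(f"  Step {r.step.order} [{status}]: {r.step.description}")

if __name__ == "__main__":
    # asyncio is imported alongside the executor code above
    asyncio.run(main())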
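
The loader's `rstrip("m")` handles values like `5m` but would raise on anything else. If workflow authors might write other forms, a more tolerant parser could look like the sketch below. The helper name `parse_duration_minutes` and the `s` and `h` suffixes are assumptions for illustration; the workflow format above only shows minutes.

```python
import re

# Seconds per supported unit suffix ("s" and "h" are assumed extensions).
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_duration_minutes(value, default: int = 10) -> int:
    """Parse 5, "5", "5m", "90s", or "1h" into whole minutes, rounding up."""
    if value is None:
        return default
    if isinstance(value, int):
        return value  # YAML may already have parsed a bare number
    match = re.fullmatch(r"\s*(\d+)\s*([smh]?)\s*", str(value))
    if not match:
        raise ValueError(f"unrecognised duration: {value!r}")
    amount = int(match.group(1))
    unit = match.group(2) or "m"  # a bare number is treated as minutes
    seconds = amount * _UNIT_SECONDS[unit]
    return -(-seconds // 60)  # ceiling division
```

In `load_workflow`, this would replace the `int(...rstrip("m"))` expression with `parse_duration_minutes(data.get("estimated_duration"))`.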

FAQ

How do I handle runbooks that require interactive input or judgment calls?

Mark those steps as StepType.MANUAL during parsing. The executor shown above only logs manual steps and marks them as skipped, so pausing needs an extension: at a manual step, the executor stops, notifies the on-call engineer with the context of what has been done so far and what decision is needed, and waits. Once the engineer responds through Slack or a web UI, the agent continues with the remaining automated steps.
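
One possible shape for that pause is an awaitable gate that the executor waits on and a chat or web handler resolves. This is a sketch under assumptions: `ApprovalGate` and its methods are hypothetical names, and the Slack or web integration that would call `respond` is not shown.

```python
import asyncio


class ApprovalGate:
    """Hypothetical pause point: the executor awaits it, a handler resolves it."""

    def __init__(self) -> None:
        self._pending: dict[int, asyncio.Future] = {}

    async def wait_for(self, step_order: int, timeout: float) -> str:
        """Block until someone responds for this step, or raise asyncio.TimeoutError."""
        future = asyncio.get_running_loop().create_future()
        self._pending[step_order] = future
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(step_order, None)

    def respond(self, step_order: int, decision: str) -> bool:
        """Called from the event loop by the chat/web handler.

        Returns False if no step with this order is currently waiting.
        """
        future = self._pending.get(step_order)
        if future is None or future.done():
            return False
        future.set_result(decision)
        return True


async def demo() -> str:
    gate = ApprovalGate()
    # Simulate an engineer approving step 3 shortly after the executor pauses.
    asyncio.get_running_loop().call_later(0.05, gate.respond, 3, "approved")
    return await gate.wait_for(3, timeout=1.0)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints approved
```

In the executor, the `StepType.MANUAL` branch would await `gate.wait_for(step.order, step.timeout_seconds)` and treat a timeout as a failed step, so the runbook's `on_failure` policy still applies.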

What if the original runbook is outdated or contains wrong commands?

Validate each command in a dry-run mode before executing it. The executor shown above does not do this yet, so treat it as a pre-flight stage to add. For kubectl commands that support it, use --dry-run=client. For destructive commands, check preconditions first. If a command fails during dry-run, flag the step as needing review; the agent can then suggest an updated command based on the current cluster state.
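
A conservative version of the kubectl part could be sketched as follows. The helper name `to_dry_run` is hypothetical, and the allowlist of subcommands is an assumption that should be checked against `kubectl <subcommand> --help` for the kubectl version in use. Anything the helper does not recognise returns `None`, which a caller can treat as "needs human review".

```python
import shlex
from typing import Optional

# Assumption: subcommands treated here as accepting --dry-run=client.
DRY_RUN_SUBCOMMANDS = {"apply", "create", "delete", "scale"}
# Commands using shell operators are left for manual review.
SHELL_OPERATORS = {"|", "||", "&&", ";", ">", ">>", "<", "&"}


def to_dry_run(command: str) -> Optional[str]:
    """Return a dry-run variant of a simple kubectl command, or None if unsure."""
    try:
        tokens = shlex.split(command)
    except ValueError:
        return None  # unbalanced quotes
    if len(tokens) < 2 or tokens[0] != "kubectl":
        return None
    if any(token in SHELL_OPERATORS for token in tokens):
        return None
    # Only handles the form "kubectl <subcommand> ..."; leading flags are rejected.
    if tokens[1] not in DRY_RUN_SUBCOMMANDS:
        return None
    if any(token.startswith("--dry-run") for token in tokens):
        return command  # already a dry run
    return shlex.join(tokens + ["--dry-run=client"])
```

For example, `to_dry_run("kubectl apply -f deploy.yaml")` returns `kubectl apply -f deploy.yaml --dry-run=client`, while read-only or non-kubectl commands return `None`.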

How do I version-control the automated workflows alongside the original runbooks?

Store both in the same Git repository. The original Markdown runbook lives in docs/runbooks/ and the generated YAML workflow lives in workflows/. Have the agent add a comment to each YAML file that links to the source runbook and records the parse date. When the Markdown is updated, a CI job re-parses it and opens a PR with the updated workflow for human review.
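
The header comment and the CI check could be sketched like this. The function names are hypothetical, and recording a SHA-256 of the source Markdown is an added assumption that gives the CI job a cheap way to tell whether a workflow is out of date.

```python
import hashlib
import re
from datetime import date


def provenance_header(source_path: str, markdown: str, parsed_on: date) -> str:
    """Build the comment block placed at the top of a generated workflow file."""
    digest = hashlib.sha256(markdown.encode("utf-8")).hexdigest()
    return (
        f"# source: {source_path}\n"
        f"# parsed: {parsed_on.isoformat()}\n"
        f"# source-sha256: {digest}\n"
    )


def is_stale(workflow_text: str, markdown: str) -> bool:
    """True if the workflow was generated from a different version of the runbook."""
    match = re.search(
        r"^# source-sha256: ([0-9a-f]{64})$", workflow_text, re.MULTILINE
    )
    if not match:
        return True  # no provenance recorded; treat as needing a re-parse
    current = hashlib.sha256(markdown.encode("utf-8")).hexdigest()
    return match.group(1) != current
```

A CI job could call `is_stale` for each runbook and workflow pair and trigger a re-parse only for the pairs that return `True`.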


CallSphere Team