AI Agent for Runbook Automation: Converting Human Procedures to Automated Workflows
Build an AI agent that parses human-written runbooks, converts them into executable automation workflows with verification steps, handles edge cases, and supports safe rollback.
The Runbook Problem
Every operations team has runbooks: Markdown documents or wiki pages that describe step-by-step procedures for common tasks, such as "How to restart the payment service," "How to failover the database," or "How to clear the cache during a traffic spike." The problem is that runbooks are written for humans and executed inconsistently. Steps get skipped. Verification checks are forgotten. The on-call engineer at 3 AM misreads step 7 and makes things worse. An AI runbook automation agent converts these human procedures into executable, verifiable workflows.
Parsing Human-Written Runbooks
The agent reads runbooks in markdown format and extracts structured steps using an LLM.
import openai
import json
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class StepType(Enum):
    """Kinds of step a runbook can contain.

    COMMAND       shell command to execute
    VERIFICATION  check that something is true
    DECISION      branch based on a condition
    WAIT          pause for a duration
    NOTIFY        alert a human or channel
    MANUAL        requires human action
    """

    COMMAND = "command"
    VERIFICATION = "verify"
    DECISION = "decision"
    WAIT = "wait"
    NOTIFY = "notify"
    MANUAL = "manual"
@dataclass
class RunbookStep:
    """One step of a runbook, in the structured form the executor consumes."""

    # Position of the step within the runbook and its human-readable summary.
    order: int
    description: str
    step_type: StepType

    # Shell command to run and the pattern its output is verified against.
    command: Optional[str] = None
    expected_output: Optional[str] = None

    # Command timeout in seconds; WAIT steps use it as the pause duration.
    timeout_seconds: int = 60

    # Command that undoes this step, if any.
    rollback_command: Optional[str] = None

    # Failure policy: "abort", "skip", "retry" or "rollback".
    on_failure: str = "abort"

    # Number of additional attempts after the first one fails.
    retries: int = 0

    # Free-form extra conditions; every instance gets its own dict.
    conditions: dict = field(default_factory=dict)
@dataclass
class ParsedRunbook:
    """A runbook after conversion into structured, executable form."""

    # Heading and summary of the procedure.
    title: str
    description: str

    # Steps in the order they are listed in the source.
    steps: list[RunbookStep]

    # Conditions listed as required before the runbook is run.
    prerequisites: list[str]

    # Duration estimate, in minutes.
    estimated_duration_minutes: int

    # Risk label, e.g. "medium".
    risk_level: str
# Spellings the prompt's prose invites ("verification", "notification"),
# mapped onto the canonical StepType values.
_STEP_TYPE_ALIASES = {
    "verification": "verify",
    "notification": "notify",
}


def _coerce_step_type(raw) -> StepType:
    """Map a step-type string returned by the model onto a StepType member.

    Matching is case-insensitive and tolerant of the aliases above.

    Raises:
        ValueError: if the value does not name any known step type.
    """
    value = str(raw).strip().lower()
    return StepType(_STEP_TYPE_ALIASES.get(value, value))


def _value_or(mapping: dict, key: str, default):
    """Return mapping[key], or default when the key is absent or JSON null."""
    found = mapping.get(key)
    return default if found is None else found


async def parse_runbook(markdown_content: str) -> ParsedRunbook:
    """Convert a human-written markdown runbook into a ParsedRunbook via an LLM.

    Args:
        markdown_content: The runbook text to parse.

    Returns:
        The structured runbook built from the model's JSON answer.

    Raises:
        ValueError: if the model returns no content, or a step has an
            unknown step_type.
        KeyError: if a required field (title, description, steps, or a
            step's order/description/step_type) is missing.
    """
    client = openai.AsyncOpenAI()
    # Tell the model the exact enum values so its answer can be loaded as-is.
    allowed_types = ", ".join(t.value for t in StepType)
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": """You convert human-written runbooks into
structured automation specs. For each step, determine:
- Whether it is a command, verification, decision, wait, notification, or manual step
- The exact shell command (if applicable)
- What output to expect for verification
- What to do on failure
- The rollback command if the step needs to be undone"""},
            {"role": "user", "content": f"""Parse this runbook into structured steps.
{markdown_content}
Return JSON with: title, description, steps (array of objects with: order,
description, step_type, command, expected_output, timeout_seconds,
rollback_command, on_failure, retries), prerequisites, estimated_duration_minutes,
risk_level.
step_type must be exactly one of: {allowed_types}."""},
        ],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    content = response.choices[0].message.content
    if not content:
        raise ValueError("Model returned no content for the runbook")
    data = json.loads(content)

    # The model may emit explicit nulls for optional fields; treat those the
    # same as a missing key so the executor never sees None where it expects
    # a number or a policy string.
    steps = [
        RunbookStep(
            order=s["order"],
            description=s["description"],
            step_type=_coerce_step_type(s["step_type"]),
            command=s.get("command"),
            expected_output=s.get("expected_output"),
            timeout_seconds=_value_or(s, "timeout_seconds", 60),
            rollback_command=s.get("rollback_command"),
            on_failure=_value_or(s, "on_failure", "abort"),
            retries=_value_or(s, "retries", 0),
        )
        for s in data["steps"]
    ]
    return ParsedRunbook(
        title=data["title"],
        description=data["description"],
        steps=steps,
        prerequisites=_value_or(data, "prerequisites", []),
        estimated_duration_minutes=_value_or(data, "estimated_duration_minutes", 10),
        risk_level=_value_or(data, "risk_level", "medium"),
    )
The Execution Engine
The engine runs each step, verifies outcomes, handles failures, and supports rollback.
import subprocess
import asyncio
import logging
import re
from datetime import datetime
logger = logging.getLogger("runbook-agent")
@dataclass
class StepResult:
    """Outcome of attempting one runbook step."""

    # The step this result belongs to.
    step: RunbookStep

    # Whether the step is considered successful.
    success: bool

    # Captured output and error text of the step.
    output: str
    error: str

    # Time the step took, in seconds.
    duration_seconds: float

    # Set once the step's rollback command has run successfully.
    rolled_back: bool = False

    # Set when the step was not actually executed (e.g. manual steps).
    skipped: bool = False
class RunbookExecutor:
    """Execute a parsed runbook step by step.

    Steps run in order. Manual steps are logged and recorded as skipped, wait
    steps sleep, notify steps send a notification, and every other step runs
    its command through the shell. Verification steps additionally match the
    command's stdout against ``expected_output``. A failed step is handled
    according to its ``on_failure`` policy: ``"abort"`` and ``"rollback"``
    undo the completed steps and stop; any other value moves on to the next
    step.

    Attributes:
        completed_steps: Results of every step attempted so far, in order.
        execution_log: Timestamped event dictionaries written by ``_log``.
        retry_delay_seconds: Pause between attempts of a retried step.
        timeout_returncode: Return code reported for a command that timed out.
    """

    retry_delay_seconds = 5
    timeout_returncode = 124

    def __init__(self):
        self.completed_steps: list[StepResult] = []
        self.execution_log: list[dict] = []

    async def execute(self, runbook: ParsedRunbook) -> list[StepResult]:
        """Run every step of ``runbook`` and return the accumulated results.

        Stops at the first failed step whose policy is "abort" or "rollback",
        after rolling back the steps completed so far.
        """
        logger.info(f"Starting runbook: {runbook.title}")
        self._log("start", f"Beginning execution of: {runbook.title}")
        for step in runbook.steps:
            result = await self._execute_step(step)
            self.completed_steps.append(result)
            if result.success:
                continue
            if step.on_failure == "abort":
                logger.error(f"Step {step.order} failed. Aborting.")
                self._log("abort", f"Aborted at step {step.order}")
                await self._rollback()
                break
            if step.on_failure == "rollback":
                logger.error(f"Step {step.order} failed. Rolling back.")
                await self._rollback()
                break
            # Any other policy ("skip", "retry", ...) continues with the next step.
        self._log("complete", "Runbook execution finished")
        return self.completed_steps

    async def _execute_step(self, step: RunbookStep) -> StepResult:
        """Execute a single step and return its result.

        Command steps are attempted ``step.retries + 1`` times, pausing
        ``retry_delay_seconds`` between attempts. This method always returns a
        StepResult; a step without a command is reported as failed.
        """
        self._log("step_start", f"Step {step.order}: {step.description}")
        start = datetime.utcnow()

        if step.step_type == StepType.MANUAL:
            self._log("manual", f"Manual step required: {step.description}")
            return StepResult(
                step=step, success=True, output="Manual step - skipped in auto mode",
                error="", duration_seconds=0, skipped=True,
            )

        if step.step_type == StepType.WAIT:
            wait_secs = step.timeout_seconds
            logger.info(f"Waiting {wait_secs} seconds...")
            await asyncio.sleep(wait_secs)
            return StepResult(
                step=step, success=True, output=f"Waited {wait_secs}s",
                error="", duration_seconds=wait_secs,
            )

        if step.step_type == StepType.NOTIFY:
            await self._send_notification(step.description)
            return StepResult(
                step=step, success=True, output="Notification sent",
                error="", duration_seconds=0,
            )

        # A command-like step without a command cannot run; report it as a
        # failure so the step's on_failure policy decides what happens next.
        if not step.command:
            return StepResult(
                step=step, success=False, output="",
                error="No command specified for this step", duration_seconds=0,
            )

        # Execute command with retries. Guard against a negative retry count
        # so at least one attempt is always made.
        attempts = max(step.retries, 0) + 1
        last_result = None
        for attempt in range(attempts):
            result = await self._run_command(step.command, step.timeout_seconds)
            elapsed = (datetime.utcnow() - start).total_seconds()

            succeeded = result.returncode == 0
            error = result.stderr
            # Verify expected output if specified
            if succeeded and step.expected_output and step.step_type == StepType.VERIFICATION:
                if not self._verify_output(result.stdout, step.expected_output):
                    succeeded = False
                    error = "Output verification failed"

            if succeeded:
                return StepResult(
                    step=step, success=True,
                    output=result.stdout, error=result.stderr,
                    duration_seconds=elapsed,
                )

            last_result = StepResult(
                step=step, success=False,
                output=result.stdout, error=error,
                duration_seconds=elapsed,
            )
            # Failed verifications wait before retrying too, so the system
            # being checked has time to converge.
            if attempt < attempts - 1:
                logger.warning(f"Step {step.order} attempt {attempt + 1} failed, retrying...")
                await asyncio.sleep(self.retry_delay_seconds)
        return last_result

    async def _run_command(self, command: str, timeout: int):
        """Run ``command`` through the shell without blocking the event loop.

        Returns a ``subprocess.CompletedProcess`` with text stdout/stderr. A
        timeout does not raise: it is reported as a result whose return code
        is ``timeout_returncode``.
        """
        # NOTE(security): the command string is executed by the shell as-is.
        # Only run workflows from a trusted, reviewed source.
        def _as_text(data) -> str:
            if data is None:
                return ""
            if isinstance(data, bytes):
                return data.decode(errors="replace")
            return data

        def _run_blocking():
            try:
                return subprocess.run(
                    command, shell=True,
                    capture_output=True, text=True, timeout=timeout,
                )
            except subprocess.TimeoutExpired as exc:
                return subprocess.CompletedProcess(
                    args=command,
                    returncode=self.timeout_returncode,
                    stdout=_as_text(exc.stdout),
                    stderr=f"Command timed out after {timeout}s",
                )

        return await asyncio.to_thread(_run_blocking)

    def _verify_output(self, actual: str, expected: str) -> bool:
        """Check if actual output contains expected pattern.

        ``expected`` is used as a case-insensitive regular expression. If it
        is not a valid pattern, it is matched as a case-insensitive substring
        instead.
        """
        try:
            return bool(re.search(expected, actual, re.IGNORECASE))
        except re.error:
            return expected.lower() in actual.lower()

    async def _rollback(self):
        """Undo completed steps in reverse order using their rollback commands.

        Only steps that actually ran, succeeded and have not already been
        rolled back are undone. A failing rollback command is logged and the
        remaining steps are still processed.
        """
        logger.warning("Initiating rollback...")
        self._log("rollback_start", "Beginning rollback of completed steps")
        for result in reversed(self.completed_steps):
            if not (result.success and result.step.rollback_command):
                continue
            # Skipped steps (e.g. manual ones) were never executed, and a
            # step must not be undone twice.
            if result.skipped or result.rolled_back:
                continue
            logger.info(f"Rolling back step {result.step.order}")
            rb_result = await self._run_command(
                result.step.rollback_command, result.step.timeout_seconds
            )
            result.rolled_back = rb_result.returncode == 0
            if not result.rolled_back:
                logger.error(
                    f"Rollback failed for step {result.step.order}: "
                    f"{rb_result.stderr}"
                )

    async def _send_notification(self, message: str):
        """Send a notification; this implementation only writes it to the log."""
        logger.info(f"Notification: {message}")

    def _log(self, event: str, message: str):
        """Append a timestamped event to ``execution_log``."""
        self.execution_log.append({
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "message": message,
        })
Converting Runbooks to Reusable Workflows
The agent stores parsed runbooks as YAML workflow definitions that can be version-controlled and reused.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
# workflows/restart-payment-service.yaml
# Rolling restart of the payment service with verification and rollback.
name: Restart Payment Service
description: Safely restart the payment service with zero downtime
risk_level: medium
estimated_duration: 5m
prerequisites:
  - kubectl access to production namespace
  - payment-service has at least 2 replicas
steps:
  - order: 1
    type: verify
    description: Confirm service has multiple replicas
    command: "kubectl get deploy payment-service -n production -o jsonpath='{.spec.replicas}'"
    # Anchored so that any replica count >= 2 passes (including 10, 11, ...)
    # and a count of 1 fails.
    expected_output: "^([2-9]|[1-9][0-9]+)$"
    on_failure: abort
  - order: 2
    type: command
    description: Initiate rolling restart
    command: "kubectl rollout restart deploy/payment-service -n production"
    rollback_command: "kubectl rollout undo deploy/payment-service -n production"
    on_failure: rollback
  - order: 3
    type: wait
    description: Wait for pods to start rolling
    timeout_seconds: 30
  - order: 4
    type: verify
    description: Check rollout status
    command: "kubectl rollout status deploy/payment-service -n production --timeout=120s"
    expected_output: "successfully rolled out"
    # Must exceed kubectl's own 120s timeout; the default step timeout of
    # 60s would kill the command before the rollout can finish.
    timeout_seconds: 130
    on_failure: rollback
    retries: 2
  - order: 5
    type: verify
    description: Verify service health
    command: "curl -s http://payment-service.production.svc/health"
    expected_output: "ok"
    on_failure: rollback
    retries: 3
Loading and Running YAML Workflows
import yaml
def load_workflow(path: str) -> ParsedRunbook:
    """Load a YAML workflow definition from ``path`` into a ParsedRunbook.

    Args:
        path: Location of the workflow YAML file.

    Returns:
        The runbook described by the file. Optional keys that are missing or
        left empty fall back to their defaults.

    Raises:
        KeyError: if ``name``, ``description``, ``steps`` or a step's
            ``order``/``description``/``type`` is missing.
        ValueError: if a step ``type`` is unknown or ``estimated_duration``
            is not a number of minutes.
    """

    def _value_or(mapping, key, default):
        # A key written with no value loads as None; treat it like a missing key.
        found = mapping.get(key)
        return default if found is None else found

    def _minutes(raw) -> int:
        # Accept both a bare number (``5``) and the "5m" string form.
        if isinstance(raw, (int, float)):
            return int(raw)
        return int(str(raw).strip().rstrip("m").strip())

    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)

    steps = [
        RunbookStep(
            order=s["order"],
            description=s["description"],
            step_type=StepType(s["type"]),
            command=s.get("command"),
            expected_output=s.get("expected_output"),
            timeout_seconds=_value_or(s, "timeout_seconds", 60),
            rollback_command=s.get("rollback_command"),
            on_failure=_value_or(s, "on_failure", "abort"),
            retries=_value_or(s, "retries", 0),
        )
        for s in data["steps"]
    ]
    return ParsedRunbook(
        title=data["name"],
        description=data["description"],
        steps=steps,
        prerequisites=_value_or(data, "prerequisites", []),
        estimated_duration_minutes=_minutes(_value_or(data, "estimated_duration", "10m")),
        risk_level=_value_or(data, "risk_level", "medium"),
    )
async def main():
    """Run the sample payment-service workflow and print a per-step summary."""
    runbook = load_workflow("workflows/restart-payment-service.yaml")
    results = await RunbookExecutor().execute(runbook)

    succeeded = len([outcome for outcome in results if outcome.success])
    failed = len(results) - succeeded
    print(f"Results: {succeeded} succeeded, {failed} failed")

    for outcome in results:
        if outcome.success:
            status = "OK"
        else:
            status = "FAIL"
        print(f" Step {outcome.step.order} [{status}]: {outcome.step.description}")
FAQ
How do I handle runbooks that require interactive input or judgment calls?
Mark those steps as StepType.MANUAL during parsing. Note that the example executor above only logs manual steps and records them as skipped. In a production setup, the executor should pause at manual steps and send a notification to the on-call engineer with the context of what has been done so far and what decision is needed. The engineer provides input through Slack or a web UI, and the agent continues with the remaining automated steps.
What if the original runbook is outdated or contains wrong commands?
The agent should validate each command in a dry-run mode before executing it; the example executor above does not implement this, so add it as a separate pre-flight pass. For kubectl commands, use --dry-run=client. For destructive commands, check preconditions first. If a command fails during the dry run, the agent flags the step as needing review and suggests an updated command based on the current cluster state.
How do I version-control the automated workflows alongside the original runbooks?
Store both in the same Git repository. The original markdown runbook lives in docs/runbooks/ and the generated YAML workflow lives in workflows/. The agent includes a comment in each YAML file linking to the source runbook and the parse date. When the markdown is updated, a CI job re-parses it and creates a PR with the updated workflow for human review.
#RunbookAutomation #DevOps #SRE #Workflow #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.