Playwright with Async Python: Concurrent Browser Automation for AI Agents
Learn how to use Playwright's async API with Python asyncio to run concurrent browser sessions, parallelize page interactions, and build high-throughput AI agent automation pipelines.
Why Async Matters for Browser Automation
Browser automation is inherently I/O-bound — most of the time is spent waiting for pages to load, elements to appear, and network requests to complete. Synchronous Playwright wastes this idle time by blocking the Python thread. Async Playwright, using Python's asyncio, lets your AI agent do useful work while waiting: processing data from a previous page, launching another browser tab, or calling an LLM API.
For agents that need to scrape multiple sites, interact with multiple accounts, or run parallel browser sessions, async Playwright can deliver 5-10x throughput improvements over synchronous code.
Async Playwright Basics
The async API mirrors the sync API exactly, but every method that performs I/O becomes a coroutine:
```python
import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.goto("https://example.com")

        title = await page.title()
        print(f"Title: {title}")

        content = await page.locator("h1").text_content()
        print(f"Heading: {content}")

        await browser.close()

asyncio.run(main())
```
Notice the pattern: sync_playwright() becomes async_playwright(), and every Playwright method gets an await prefix. The import changes from playwright.sync_api to playwright.async_api.
Running Multiple Pages Concurrently
The real power of async Playwright is running multiple pages at the same time:
```python
import asyncio
from playwright.async_api import async_playwright

async def scrape_page(browser, url: str) -> dict:
    """Scrape a single page in its own context."""
    context = await browser.new_context()
    page = await context.new_page()
    try:
        await page.goto(url, wait_until="networkidle", timeout=15000)
        return {
            "url": url,
            "title": await page.title(),
            "heading": await page.locator("h1").text_content()
                if await page.locator("h1").count() > 0 else None,
        }
    except Exception as e:
        return {"url": url, "error": str(e)}
    finally:
        await context.close()

async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org",
        "https://jsonplaceholder.typicode.com",
        "https://reqres.in",
        "https://dummyjson.com",
    ]

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        # Scrape all pages concurrently
        tasks = [scrape_page(browser, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            if "error" in result:
                print(f"FAILED: {result['url']} - {result['error']}")
            else:
                print(f"OK: {result['title']} ({result['url']})")

        await browser.close()

asyncio.run(main())
```
This scrapes all five pages simultaneously rather than sequentially. On a fast connection, this completes in roughly the time of the slowest single page load, not the sum of all five.
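The timing claim is easy to verify without a browser. This stdlib-only sketch uses asyncio.sleep as a stand-in for page loads: the sequential pass takes the sum of all delays, while asyncio.gather takes roughly the longest one:

```python
import asyncio
import time

async def fake_page_load(delay: float) -> float:
    """Stand-in for page.goto(): pure waiting, like network I/O."""
    await asyncio.sleep(delay)
    return delay

async def main():
    delays = [0.1, 0.2, 0.3, 0.4, 0.5]

    # Sequential: total time is the sum of all delays (~1.5s)
    start = time.perf_counter()
    for d in delays:
        await fake_page_load(d)
    sequential = time.perf_counter() - start

    # Concurrent: total time is roughly the slowest delay (~0.5s)
    start = time.perf_counter()
    await asyncio.gather(*(fake_page_load(d) for d in delays))
    concurrent = time.perf_counter() - start

    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
    return sequential, concurrent

sequential, concurrent = asyncio.run(main())
```

The same shape holds with real pages, except the delays are page-load times you don't control.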
Controlling Concurrency with Semaphores
Unlimited concurrency can overwhelm the browser or trigger rate limiting. Use an asyncio.Semaphore to cap parallel sessions:
```python
import asyncio
from playwright.async_api import async_playwright

async def scrape_with_limit(browser, url: str, semaphore: asyncio.Semaphore):
    async with semaphore:
        context = await browser.new_context()
        page = await context.new_page()
        try:
            await page.goto(url, wait_until="networkidle")
            title = await page.title()
            return {"url": url, "title": title}
        except Exception as e:
            return {"url": url, "error": str(e)}
        finally:
            await context.close()

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(20)]

    # Allow at most 5 concurrent browser contexts
    semaphore = asyncio.Semaphore(5)

    async with async_playwright() as p:
        browser = await p.chromium.launch()
        tasks = [scrape_with_limit(browser, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

        success = sum(1 for r in results if "error" not in r)
        print(f"Completed: {success}/{len(urls)} pages")

        await browser.close()

asyncio.run(main())
```
The semaphore ensures that no more than 5 contexts are active at any time, preventing memory exhaustion while still maintaining significant parallelism.
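The cap is observable without a browser. This stdlib-only sketch launches 20 tasks behind a Semaphore(5) and records the high-water mark of simultaneously active tasks, which never exceeds five:

```python
import asyncio

active = 0
peak = 0

async def limited_task(semaphore: asyncio.Semaphore):
    global active, peak
    async with semaphore:
        active += 1
        peak = max(peak, active)  # record the high-water mark
        await asyncio.sleep(0.05)  # stand-in for page work
        active -= 1

async def main():
    semaphore = asyncio.Semaphore(5)
    await asyncio.gather(*(limited_task(semaphore) for _ in range(20)))
    print(f"Peak concurrency: {peak}")

asyncio.run(main())
```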
Async Event Handling
Handle network events and page events asynchronously:
```python
import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()

        api_responses = []

        async def on_response(response):
            if "/api/" in response.url and response.status == 200:
                try:
                    data = await response.json()
                    api_responses.append({
                        "url": response.url,
                        "data": data,
                    })
                except Exception:
                    pass

        page.on("response", on_response)

        await page.goto("https://example.com")
        await page.wait_for_load_state("networkidle")

        print(f"Captured {len(api_responses)} API responses")
        await browser.close()

asyncio.run(main())
```
Combining Playwright with Other Async Operations
The real power of async comes from combining browser automation with other I/O operations — API calls, database queries, and LLM requests:
```python
import asyncio
from openai import AsyncOpenAI
from playwright.async_api import async_playwright

client = AsyncOpenAI()

async def scrape_and_analyze(browser, url: str) -> dict:
    """Scrape a page and analyze its content with an LLM."""
    context = await browser.new_context()
    page = await context.new_page()
    try:
        await page.goto(url, wait_until="networkidle")
        title = await page.title()
        body_text = await page.locator("body").text_content()

        # Truncate to avoid token limits
        body_text = body_text[:3000] if body_text else ""

        # Analyze with LLM while we have the page data
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "Summarize the following web page content "
                               "in 2-3 sentences.",
                },
                {"role": "user", "content": f"Title: {title}\n{body_text}"},
            ],
            max_tokens=200,
        )
        summary = response.choices[0].message.content

        return {"url": url, "title": title, "summary": summary}
    except Exception as e:
        return {"url": url, "error": str(e)}
    finally:
        await context.close()

async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org",
    ]

    async with async_playwright() as p:
        browser = await p.chromium.launch()
        tasks = [scrape_and_analyze(browser, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for r in results:
            if "summary" in r:
                print(f"\n{r['title']}:")
                print(f"  {r['summary']}")

        await browser.close()

asyncio.run(main())
```
Async Producer-Consumer Pattern
For high-throughput scraping, use a queue-based producer-consumer pattern:
```python
import asyncio
from playwright.async_api import async_playwright

async def worker(name: str, browser, queue: asyncio.Queue, results: list):
    """Worker that processes URLs from a shared queue."""
    while True:
        url = await queue.get()
        if url is None:
            queue.task_done()
            break

        context = await browser.new_context()
        page = await context.new_page()
        try:
            await page.goto(url, wait_until="networkidle", timeout=10000)
            results.append({
                "url": url,
                "title": await page.title(),
                "worker": name,
            })
            print(f"[{name}] Scraped: {url}")
        except Exception as e:
            print(f"[{name}] Failed: {url} ({e})")
        finally:
            await context.close()
            queue.task_done()

async def main():
    urls = [f"https://example.com/item/{i}" for i in range(15)]
    num_workers = 3

    queue = asyncio.Queue()
    results = []

    for url in urls:
        await queue.put(url)

    # Add poison pills to stop workers
    for _ in range(num_workers):
        await queue.put(None)

    async with async_playwright() as p:
        browser = await p.chromium.launch()
        workers = [
            asyncio.create_task(
                worker(f"W{i}", browser, queue, results)
            )
            for i in range(num_workers)
        ]
        await asyncio.gather(*workers)

        print(f"\nTotal scraped: {len(results)}")
        await browser.close()

asyncio.run(main())
```
FAQ
When should I use async vs sync Playwright?
Use sync Playwright for simple scripts, debugging, and prototyping — it is easier to read and write. Switch to async when you need concurrent page operations, integration with other async libraries (FastAPI, aiohttp, OpenAI async client), or high-throughput automation with many pages. If your AI agent framework is already async (most modern ones are), use async Playwright to avoid blocking the event loop.
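The blocking problem is easy to demonstrate with stdlib asyncio alone. Here a synchronous time.sleep stands in for a sync Playwright call: two "loads" of 0.2s each serialize to roughly 0.4s because the sync call never yields the loop, while the asyncio.sleep version overlaps them:

```python
import asyncio
import time

async def sync_style_load():
    time.sleep(0.2)  # sync call: blocks the whole event loop

async def async_style_load():
    await asyncio.sleep(0.2)  # yields to the loop while waiting

async def main():
    start = time.perf_counter()
    await asyncio.gather(sync_style_load(), sync_style_load())
    blocked = time.perf_counter() - start  # ~0.4s: the loads serialized

    start = time.perf_counter()
    await asyncio.gather(async_style_load(), async_style_load())
    cooperative = time.perf_counter() - start  # ~0.2s: the loads overlapped

    print(f"blocking: {blocked:.2f}s, cooperative: {cooperative:.2f}s")
    return blocked, cooperative

blocked, cooperative = asyncio.run(main())
```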
Does asyncio.gather run tasks in separate threads?
No. asyncio.gather runs coroutines concurrently within a single thread using cooperative multitasking. When one coroutine hits an await (waiting for a page to load, for example), the event loop switches to another coroutine that is ready to run. This works well for I/O-bound tasks like browser automation. For CPU-bound work, you would need asyncio.to_thread() or ProcessPoolExecutor.
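As a minimal sketch of that escape hatch, asyncio.to_thread pushes a blocking function onto a worker thread so the event loop stays free for other coroutines (the repeated-hashing function here is a made-up stand-in for real CPU-bound work like parsing scraped HTML):

```python
import asyncio
import hashlib

def cpu_heavy(data: bytes) -> str:
    """CPU-bound work: repeated hashing, standing in for heavy parsing."""
    digest = data
    for _ in range(200_000):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()

async def main():
    # Offload the CPU-bound call to a worker thread; the event loop
    # stays free to run other coroutines (e.g. a page navigation).
    result, _ = await asyncio.gather(
        asyncio.to_thread(cpu_heavy, b"page-content"),
        asyncio.sleep(0.01),  # stand-in for concurrent browser I/O
    )
    print(f"digest: {result[:16]}...")
    return result

result = asyncio.run(main())
```

For truly parallel CPU work (not just an unblocked loop), ProcessPoolExecutor via loop.run_in_executor sidesteps the GIL entirely.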
How many concurrent browser pages can async Playwright handle?
The practical limit depends on RAM and the complexity of the pages being loaded. Each page/context uses roughly 20-50 MB. On a 16 GB machine, you can comfortably run 50-100 concurrent lightweight pages. Use a semaphore to cap concurrency at a level your machine can handle, and monitor memory usage during development to find the right number.
CallSphere Team
Expert insights on AI voice agents and customer communication automation.