Skip to content
Learn Agentic AI13 min read0 views

Playwright with Async Python: Concurrent Browser Automation for AI Agents

Learn how to use Playwright's async API with Python asyncio to run concurrent browser sessions, parallelize page interactions, and build high-throughput AI agent automation pipelines.

Why Async Matters for Browser Automation

Browser automation is inherently I/O-bound — most of the time is spent waiting for pages to load, elements to appear, and network requests to complete. Synchronous Playwright wastes this idle time by blocking the Python thread. Async Playwright, using Python's asyncio, lets your AI agent do useful work while waiting: processing data from a previous page, launching another browser tab, or calling an LLM API.

For agents that need to scrape multiple sites, interact with multiple accounts, or run parallel browser sessions, async Playwright can deliver 5-10x throughput improvements over synchronous code.

Async Playwright Basics

The async API mirrors the sync API exactly, but every method that performs I/O becomes a coroutine:

import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.goto("https://example.com")

        title = await page.title()
        print(f"Title: {title}")

        content = await page.locator("h1").text_content()
        print(f"Heading: {content}")

        await browser.close()

asyncio.run(main())

Notice the pattern: sync_playwright() becomes async_playwright(), and every Playwright method gets an await prefix. The import changes from playwright.sync_api to playwright.async_api.

Running Multiple Pages Concurrently

The real power of async Playwright is running multiple pages at the same time:

import asyncio
from playwright.async_api import async_playwright

async def scrape_page(browser, url: str) -> dict:
    """Scrape a single page in its own context."""
    context = await browser.new_context()
    page = await context.new_page()

    try:
        await page.goto(url, wait_until="networkidle", timeout=15000)
        return {
            "url": url,
            "title": await page.title(),
            "heading": await page.locator("h1").text_content()
            if await page.locator("h1").count() > 0 else None,
        }
    except Exception as e:
        return {"url": url, "error": str(e)}
    finally:
        await context.close()

async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org",
        "https://jsonplaceholder.typicode.com",
        "https://reqres.in",
        "https://dummyjson.com",
    ]

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        # Scrape all pages concurrently
        tasks = [scrape_page(browser, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            if "error" in result:
                print(f"FAILED: {result['url']} - {result['error']}")
            else:
                print(f"OK: {result['title']} ({result['url']})")

        await browser.close()

asyncio.run(main())

This scrapes all five pages simultaneously rather than sequentially. On a fast connection, this completes in roughly the time of the slowest single page load, not the sum of all five.

Controlling Concurrency with Semaphores

Unlimited concurrency can overwhelm the browser or trigger rate limiting. Use an asyncio.Semaphore to cap parallel sessions:

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

import asyncio
from playwright.async_api import async_playwright

async def scrape_with_limit(browser, url: str, semaphore: asyncio.Semaphore):
    async with semaphore:
        context = await browser.new_context()
        page = await context.new_page()
        try:
            await page.goto(url, wait_until="networkidle")
            title = await page.title()
            return {"url": url, "title": title}
        except Exception as e:
            return {"url": url, "error": str(e)}
        finally:
            await context.close()

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(20)]

    # Allow at most 5 concurrent browser contexts
    semaphore = asyncio.Semaphore(5)

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        tasks = [scrape_with_limit(browser, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

        success = sum(1 for r in results if "error" not in r)
        print(f"Completed: {success}/{len(urls)} pages")

        await browser.close()

asyncio.run(main())

The semaphore ensures that no more than 5 contexts are active at any time, preventing memory exhaustion while still maintaining significant parallelism.

Async Event Handling

Handle network events and page events asynchronously:

import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()

        api_responses = []

        async def on_response(response):
            if "/api/" in response.url and response.status == 200:
                try:
                    data = await response.json()
                    api_responses.append({
                        "url": response.url,
                        "data": data,
                    })
                except Exception:
                    pass

        page.on("response", on_response)
        await page.goto("https://example.com")
        await page.wait_for_load_state("networkidle")

        print(f"Captured {len(api_responses)} API responses")
        await browser.close()

asyncio.run(main())

Combining Playwright with Other Async Operations

The real power of async comes from combining browser automation with other I/O operations — API calls, database queries, and LLM requests:

import asyncio
from openai import AsyncOpenAI
from playwright.async_api import async_playwright

client = AsyncOpenAI()

async def scrape_and_analyze(browser, url: str) -> dict:
    """Scrape a page and analyze its content with an LLM."""
    context = await browser.new_context()
    page = await context.new_page()

    try:
        await page.goto(url, wait_until="networkidle")
        title = await page.title()
        body_text = await page.locator("body").text_content()

        # Truncate to avoid token limits
        body_text = body_text[:3000] if body_text else ""

        # Analyze with LLM while we have the page data
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "Summarize the following web page content "
                               "in 2-3 sentences.",
                },
                {"role": "user", "content": f"Title: {title}\n{body_text}"},
            ],
            max_tokens=200,
        )

        summary = response.choices[0].message.content
        return {"url": url, "title": title, "summary": summary}

    except Exception as e:
        return {"url": url, "error": str(e)}
    finally:
        await context.close()

async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org",
    ]

    async with async_playwright() as p:
        browser = await p.chromium.launch()
        tasks = [scrape_and_analyze(browser, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for r in results:
            if "summary" in r:
                print(f"\n{r['title']}:")
                print(f"  {r['summary']}")

        await browser.close()

asyncio.run(main())

Async Producer-Consumer Pattern

For high-throughput scraping, use a queue-based producer-consumer pattern:

import asyncio
from playwright.async_api import async_playwright

async def worker(name: str, browser, queue: asyncio.Queue, results: list):
    """Worker that processes URLs from a shared queue."""
    while True:
        url = await queue.get()
        if url is None:
            queue.task_done()
            break

        context = await browser.new_context()
        page = await context.new_page()
        try:
            await page.goto(url, wait_until="networkidle", timeout=10000)
            results.append({
                "url": url,
                "title": await page.title(),
                "worker": name,
            })
            print(f"[{name}] Scraped: {url}")
        except Exception as e:
            print(f"[{name}] Failed: {url} ({e})")
        finally:
            await context.close()
            queue.task_done()

async def main():
    urls = [f"https://example.com/item/{i}" for i in range(15)]
    num_workers = 3

    queue = asyncio.Queue()
    results = []

    for url in urls:
        await queue.put(url)

    # Add poison pills to stop workers
    for _ in range(num_workers):
        await queue.put(None)

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        workers = [
            asyncio.create_task(
                worker(f"W{i}", browser, queue, results)
            )
            for i in range(num_workers)
        ]

        await asyncio.gather(*workers)
        print(f"\nTotal scraped: {len(results)}")

        await browser.close()

asyncio.run(main())

FAQ

When should I use async vs sync Playwright?

Use sync Playwright for simple scripts, debugging, and prototyping — it is easier to read and write. Switch to async when you need concurrent page operations, integration with other async libraries (FastAPI, aiohttp, OpenAI async client), or high-throughput automation with many pages. If your AI agent framework is already async (most modern ones are), use async Playwright to avoid blocking the event loop.

Does asyncio.gather run tasks in separate threads?

No. asyncio.gather runs coroutines concurrently within a single thread using cooperative multitasking. When one coroutine hits an await (waiting for a page to load, for example), the event loop switches to another coroutine that is ready to run. This works well for I/O-bound tasks like browser automation. For CPU-bound work, you would need asyncio.to_thread() or ProcessPoolExecutor.

How many concurrent browser pages can async Playwright handle?

The practical limit depends on RAM and the complexity of the pages being loaded. Each page/context uses roughly 20-50 MB. On a 16 GB machine, you can comfortably run 50-100 concurrent lightweight pages. Use a semaphore to cap concurrency at a level your machine can handle, and monitor memory usage during development to find the right number.


#AsyncPython #Playwright #Asyncio #ConcurrentAutomation #AIAgents #ParallelScraping #EventLoop

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.