Building a Social Media Automation Agent: Content Posting, Scheduling, and Engagement
Learn to build an AI agent for social media automation covering platform API integration versus browser automation, content scheduling, engagement monitoring, and rate limiting strategies.
API-First vs Browser Automation
Social media automation faces a fundamental architectural choice: use the platform's official API or automate a browser to interact with the web interface directly. The answer is almost always API-first, with browser automation reserved for specific actions that APIs do not support.
Official APIs provide stable endpoints, documented rate limits, proper authentication, and compliance with platform terms of service. Browser automation is fragile, harder to scale, and risks account suspension. However, some platforms restrict API access or lag behind their web UI in feature coverage. A well-designed agent handles both pathways through a unified interface.
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum


class Platform(Enum):
    TWITTER = "twitter"
    LINKEDIN = "linkedin"
    INSTAGRAM = "instagram"


@dataclass
class SocialPost:
    content: str
    platform: Platform
    media_urls: list[str] = field(default_factory=list)
    scheduled_time: Optional[datetime] = None
    hashtags: list[str] = field(default_factory=list)
    status: str = "draft"
    post_id: Optional[str] = None


class SocialPlatformAdapter(ABC):
    """Unified interface for platform interactions."""

    @abstractmethod
    async def publish(self, post: SocialPost) -> str:
        """Publish a post and return its platform ID."""
        ...

    @abstractmethod
    async def get_engagement(self, post_id: str) -> dict:
        """Get likes, comments, shares for a post."""
        ...

    @abstractmethod
    async def get_rate_limit_status(self) -> dict:
        """Check remaining API quota."""
        ...
```
Platform Adapters with Rate Limiting
Each platform gets its own adapter. The critical piece is rate limiting — every social media API enforces strict request quotas, and exceeding them can result in temporary blocks or, in repeated cases, API key revocation.
```python
import asyncio
import time

import httpx


class RateLimiter:
    """Sliding-window rate limiter for API calls."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: list[float] = []

    async def acquire(self):
        """Wait until a request slot is available."""
        while True:
            now = time.time()
            # Remove expired timestamps
            self.requests = [
                t for t in self.requests
                if now - t < self.window_seconds
            ]
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return
            # Wait for the oldest request to expire
            sleep_time = (
                self.requests[0] + self.window_seconds - now + 0.1
            )
            await asyncio.sleep(sleep_time)


class TwitterAdapter(SocialPlatformAdapter):
    """Twitter/X API v2 adapter with rate limiting."""

    def __init__(self, bearer_token: str):
        self.client = httpx.AsyncClient(
            base_url="https://api.twitter.com/2",
            headers={"Authorization": f"Bearer {bearer_token}"},
        )
        # Twitter allows 300 tweets per 3 hours (per-app)
        self.post_limiter = RateLimiter(
            max_requests=300, window_seconds=10800
        )
        # 300 reads per 15 minutes
        self.read_limiter = RateLimiter(
            max_requests=300, window_seconds=900
        )

    async def publish(self, post: SocialPost) -> str:
        await self.post_limiter.acquire()
        text = post.content
        if post.hashtags:
            text += "\n\n" + " ".join(
                f"#{tag}" for tag in post.hashtags
            )
        response = await self.client.post(
            "/tweets",
            json={"text": text},
        )
        response.raise_for_status()
        return response.json()["data"]["id"]

    async def get_engagement(self, post_id: str) -> dict:
        await self.read_limiter.acquire()
        response = await self.client.get(
            f"/tweets/{post_id}",
            params={"tweet.fields": "public_metrics"},
        )
        response.raise_for_status()
        metrics = response.json()["data"]["public_metrics"]
        return {
            "likes": metrics["like_count"],
            "retweets": metrics["retweet_count"],
            "replies": metrics["reply_count"],
            "impressions": metrics.get("impression_count", 0),
        }

    async def get_rate_limit_status(self) -> dict:
        return {
            "post_slots_remaining": (
                self.post_limiter.max_requests
                - len(self.post_limiter.requests)
            ),
            "read_slots_remaining": (
                self.read_limiter.max_requests
                - len(self.read_limiter.requests)
            ),
        }
```
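The sliding-window behaviour of `RateLimiter` is easy to verify in isolation. This sketch condenses the same logic and shows that a third `acquire()` against a limit of two requests per second blocks for roughly one window length:

```python
import asyncio
import time


class RateLimiter:
    """Sliding-window limiter (same logic as above, condensed)."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: list[float] = []

    async def acquire(self):
        while True:
            now = time.monotonic()
            self.requests = [
                t for t in self.requests if now - t < self.window_seconds
            ]
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return
            await asyncio.sleep(
                self.requests[0] + self.window_seconds - now + 0.1
            )


async def demo() -> float:
    limiter = RateLimiter(max_requests=2, window_seconds=1)
    start = time.monotonic()
    for _ in range(3):  # two acquires pass immediately, the third waits
        await limiter.acquire()
    return time.monotonic() - start


elapsed = asyncio.run(demo())
print(f"3 acquires at 2/sec took {elapsed:.2f}s")
```

The sketch uses `time.monotonic()` rather than `time.time()` so the measurement cannot jump if the system clock is adjusted mid-test; either works for the limiter itself.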
Content Scheduling Engine
The scheduling engine stores posts in a queue and publishes them at the right time. It handles timezone conversion, optimal posting time suggestions, and retry logic for failed publishes.
```python
import heapq
import itertools
from datetime import timezone


class ContentScheduler:
    def __init__(self, adapters: dict[Platform, SocialPlatformAdapter]):
        self.adapters = adapters
        # (timestamp, sequence, post): the sequence number breaks ties so
        # heapq never tries to compare SocialPost instances directly
        self.queue: list[tuple[float, int, SocialPost]] = []
        self._counter = itertools.count()
        self.published: list[SocialPost] = []
        self.failed: list[tuple[SocialPost, str]] = []

    def schedule(self, post: SocialPost):
        """Add a post to the schedule queue."""
        if post.scheduled_time is None:
            raise ValueError("Post must have a scheduled_time")
        when = post.scheduled_time
        if when.tzinfo is None:
            # Treat naive datetimes as UTC to avoid local-timezone surprises
            when = when.replace(tzinfo=timezone.utc)
        heapq.heappush(
            self.queue, (when.timestamp(), next(self._counter), post)
        )
        post.status = "scheduled"

    async def run(self):
        """Main scheduler loop — publishes posts when due."""
        while True:
            now = datetime.now(timezone.utc).timestamp()
            while self.queue and self.queue[0][0] <= now:
                _, _, post = heapq.heappop(self.queue)
                adapter = self.adapters.get(post.platform)
                if not adapter:
                    self.failed.append(
                        (post, f"No adapter for {post.platform}")
                    )
                    continue
                try:
                    post_id = await adapter.publish(post)
                    post.post_id = post_id
                    post.status = "published"
                    self.published.append(post)
                except Exception as e:
                    post.status = "failed"
                    self.failed.append((post, str(e)))
            await asyncio.sleep(30)  # Check every 30 seconds

    def get_optimal_times(self, platform: Platform,
                          timezone_name: str = "UTC") -> list[str]:
        """Suggest posting times (local to timezone_name) for a platform."""
        # These are general best practices; production systems
        # should learn from actual engagement data
        optimal = {
            Platform.TWITTER: ["09:00", "12:00", "17:00"],
            Platform.LINKEDIN: ["07:30", "12:00", "17:30"],
            Platform.INSTAGRAM: ["11:00", "14:00", "19:00"],
        }
        return optimal.get(platform, ["12:00"])
```
Engagement Monitoring
The engagement monitor tracks how published posts perform over time, collecting metrics at configurable intervals and flagging posts that are performing unusually well or poorly.
```python
from datetime import timezone


class EngagementMonitor:
    def __init__(self, adapters: dict[Platform, SocialPlatformAdapter]):
        self.adapters = adapters

    async def collect_metrics(self, posts: list[SocialPost]) -> list[dict]:
        """Collect engagement metrics for published posts."""
        results = []
        for post in posts:
            if post.status != "published" or not post.post_id:
                continue
            adapter = self.adapters.get(post.platform)
            if not adapter:
                continue
            try:
                metrics = await adapter.get_engagement(post.post_id)
                metrics["post_id"] = post.post_id
                metrics["platform"] = post.platform.value
                metrics["collected_at"] = (
                    datetime.now(timezone.utc).isoformat()
                )
                results.append(metrics)
            except Exception as e:
                print(f"Failed to get metrics for {post.post_id}: {e}")
        return results

    def detect_viral_posts(self, metrics_history: list[dict],
                           threshold_multiplier: float = 3.0) -> list[dict]:
        """Detect posts performing significantly above average."""
        if len(metrics_history) < 5:
            return []
        avg_likes = sum(
            m["likes"] for m in metrics_history
        ) / len(metrics_history)
        return [
            m for m in metrics_history
            if m["likes"] > avg_likes * threshold_multiplier
        ]
```
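The viral-detection threshold is simple arithmetic, which makes it easy to sanity-check with made-up numbers:

```python
# Hypothetical metrics history: five ordinary posts and one outlier
metrics_history = [{"likes": n} for n in (10, 12, 8, 11, 300, 9)]

avg_likes = sum(m["likes"] for m in metrics_history) / len(metrics_history)
viral = [m for m in metrics_history if m["likes"] > avg_likes * 3.0]

print(avg_likes, viral)  # avg ≈ 58.3; only the 300-like post clears 3x
```

One caveat worth noting: a single runaway post inflates the mean it is compared against, so a median-based baseline is often more robust once the history grows.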
AI-Powered Content Generation
The agent can generate post content tailored to each platform's conventions — character limits, hashtag norms, and tone expectations.
```python
import json

from openai import AsyncOpenAI


class ContentGenerator:
    def __init__(self, client: AsyncOpenAI):
        self.client = client

    async def generate_post(self, topic: str,
                            platform: Platform) -> SocialPost:
        """Generate platform-appropriate content."""
        platform_rules = {
            Platform.TWITTER: "Max 280 characters. Concise, punchy.",
            Platform.LINKEDIN: "Professional tone. 1-3 paragraphs.",
            Platform.INSTAGRAM: "Visual-first. Use emojis. 5-10 hashtags.",
        }
        response = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": (
                    f"Write a social media post for "
                    f"{platform.value}.\n"
                    f"Rules: {platform_rules[platform]}\n"
                    "Return JSON: content, hashtags (array)"
                )},
                {"role": "user", "content": f"Topic: {topic}"},
            ],
            response_format={"type": "json_object"},
        )
        data = json.loads(response.choices[0].message.content)
        return SocialPost(
            content=data["content"],
            platform=platform,
            hashtags=data.get("hashtags", []),
        )
```
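Models do not reliably respect character limits, so it is worth validating generated posts before they enter the queue. `fits_platform` below is an illustrative helper (not part of any platform SDK) that assembles the final text the same way `TwitterAdapter.publish` does — content plus a blank line plus hashtags:

```python
def fits_platform(content: str, hashtags: list[str],
                  limit: int = 280) -> bool:
    """Check the assembled post text fits a platform's character limit."""
    text = content
    if hashtags:
        # Mirror the adapter: blank line, then space-separated hashtags
        text += "\n\n" + " ".join(f"#{tag}" for tag in hashtags)
    return len(text) <= limit
```

A post that fails the check can be sent back to the model with a "shorten this" instruction rather than truncated, which preserves the message.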
FAQ
Is it safe to use browser automation for social media posting?
Most social media platforms explicitly prohibit automated access through their web interfaces in their terms of service. Using browser automation risks account suspension. Always prefer official APIs for posting, scheduling, and analytics. Browser automation should only be used for internal tools or platforms that explicitly allow it.
How do I handle multi-platform posting where each platform has different character limits?
Create platform-specific content variants from a single source message. Use an LLM to adapt the core message to each platform's constraints rather than simply truncating a long post. Store the original message and platform-specific variants together so you can track which version performed best.
What rate limits should I implement beyond the platform's requirements?
Add your own conservative limits on top of platform limits. A good rule of thumb is to use no more than 80% of the stated API quota to leave headroom for retries and other tools that share the same API key. Also implement exponential backoff when you receive 429 (Too Many Requests) responses.
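The backoff advice above can be sketched as a small retry wrapper. `with_backoff` and `TooManyRequests` are illustrative names, not part of httpx or any platform SDK; in production you would key retries off the real 429 response:

```python
import asyncio
import random


async def with_backoff(call, is_retryable, max_retries=5, base_delay=0.05):
    """Retry an async call with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return await call()
        except Exception as e:
            if not is_retryable(e) or attempt == max_retries - 1:
                raise
            # Delay doubles each attempt; jitter avoids synchronized retries
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            await asyncio.sleep(delay)


# Simulated endpoint: fails twice with a 429-style error, then succeeds
class TooManyRequests(Exception):
    pass


attempts = 0


async def flaky():
    global attempts
    attempts += 1
    if attempts < 3:
        raise TooManyRequests()
    return "ok"


result = asyncio.run(
    with_backoff(flaky, lambda e: isinstance(e, TooManyRequests))
)
print(result, attempts)  # → ok 3
```

With httpx, `is_retryable` would check for `httpx.HTTPStatusError` with `response.status_code == 429`, and a `Retry-After` header, when present, should override the computed delay.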
CallSphere Team
Expert insights on AI voice agents and customer communication automation.