Asynchronous Processing and Message Queues in Agentic AI Systems

1. Introduction

Modern agentic AI systems behave less like monolithic LLM applications and more like distributed, autonomous workers making decisions, invoking tools, coordinating tasks, and reacting to events. This autonomy introduces unpredictable timing, variable workloads, and long-running operations—all of which traditional synchronous architectures struggle to handle.

Figure 1: Modern agentic AI system architecture, showing six autonomous agents coordinated through a central hub

Asynchronous processing and message queues solve these problems elegantly. They allow agentic AI systems to scale, stay responsive, and coordinate multiple agents working in parallel. Let’s break down how they do this.

2. Core Architectural Roles of Async & Queues

2.1 Handling Long-Running Agent Operations

Agentic AI workflows often include:

  • multiple LLM calls
  • tool invocation chains
  • web scraping
  • data extraction
  • reasoning loops
  • reflection cycles

These tasks can take anywhere from a few seconds to several minutes.

If executed synchronously:

  • user requests block
  • system threads get stuck
  • timeouts become common
  • overall throughput collapses

Async + Queues Fix This

The main thread:

  • accepts the request
  • places it in a queue
  • immediately responds with a task ID

Meanwhile, workers execute the long-running agent task independently.

Figure 2: Long-running agent tasks using async workers (sequence of user, API, queue, worker, and LLM components)

2.2 Basic Async Agent Task with Celery

Key Features Demonstrated:

  • ✅ Non-blocking task submission
  • ✅ Progress tracking with state updates
  • ✅ Automatic retry logic with exponential backoff
  • ✅ Timeout protection
  • ✅ RESTful API integration
  • ✅ Task result retrieval
# tasks.py - Define async agent tasks
from celery import Celery
from typing import Dict, Any
import time
from openai import OpenAI

# Initialize Celery with Redis as broker
app = Celery('agentic_tasks', 
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

# Configure task settings
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,  # 1 hour max
    task_soft_time_limit=3300,  # Warning at 55 minutes
)

@app.task(bind=True, max_retries=3)
def execute_agent_workflow(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute a long-running agent workflow asynchronously.
    
    Args:
        query: User's query or task
        context: Additional context for the agent
        
    Returns:
        Dict containing agent's response and metadata
    """
    try:
        # Update task state to indicate progress
        self.update_state(
            state='PROCESSING',
            meta={'step': 'initializing', 'progress': 10}
        )
        
        # Initialize LLM client
        client = OpenAI()
        
        # Step 1: Initial reasoning
        self.update_state(
            state='PROCESSING',
            meta={'step': 'reasoning', 'progress': 25}
        )
        
        reasoning_response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful research agent."},
                {"role": "user", "content": f"Analyze this query: {query}"}
            ],
            timeout=60
        )
        
        # Step 2: Tool invocation (simulated)
        self.update_state(
            state='PROCESSING',
            meta={'step': 'tool_execution', 'progress': 50}
        )
        
        # Simulate web scraping or API calls
        time.sleep(2)
        tool_results = {"data": "scraped content"}
        
        # Step 3: Final synthesis
        self.update_state(
            state='PROCESSING',
            meta={'step': 'synthesis', 'progress': 75}
        )
        
        final_response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "Synthesize the findings."},
                {"role": "user", "content": f"Results: {tool_results}"}
            ],
            timeout=60
        )
        
        # Complete
        self.update_state(
            state='SUCCESS',
            meta={'step': 'complete', 'progress': 100}
        )
        
        return {
            'status': 'success',
            'result': final_response.choices[0].message.content,
            'metadata': {
                'reasoning': reasoning_response.choices[0].message.content,
                'tools_used': ['web_search', 'scraper']
            }
        }
        
    except Exception as exc:
        # Retry with exponential backoff
        self.update_state(
            state='FAILURE',
            meta={'error': str(exc)}
        )
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
# API endpoint integration
# api.py
from fastapi import FastAPI
from pydantic import BaseModel
from celery.result import AsyncResult

# Import the Celery app and task defined in tasks.py
from tasks import app as celery_app, execute_agent_workflow

app_api = FastAPI()

class AgentRequest(BaseModel):
    query: str
    context: dict = {}

class TaskResponse(BaseModel):
    task_id: str
    status: str
    message: str

@app_api.post("/agent/execute", response_model=TaskResponse)
async def execute_agent(request: AgentRequest):
    """
    Submit agent task to queue and return immediately.
    """
    # Enqueue the task
    task = execute_agent_workflow.delay(
        query=request.query,
        context=request.context
    )
    
    return TaskResponse(
        task_id=task.id,
        status="queued",
        message=f"Task submitted. Check status at /agent/status/{task.id}"
    )

@app_api.get("/agent/status/{task_id}")
async def get_agent_status(task_id: str):
    """
    Check the status of a running agent task.
    """
    task_result = AsyncResult(task_id, app=celery_app)
    
    if task_result.state == 'PENDING':
        response = {
            'task_id': task_id,
            'state': task_result.state,
            'status': 'Task is waiting in queue...'
        }
    elif task_result.state == 'PROCESSING':
        response = {
            'task_id': task_id,
            'state': task_result.state,
            'progress': task_result.info.get('progress', 0),
            'current_step': task_result.info.get('step', 'unknown')
        }
    elif task_result.state == 'SUCCESS':
        response = {
            'task_id': task_id,
            'state': task_result.state,
            'result': task_result.result
        }
    else:
        # Something went wrong
        response = {
            'task_id': task_id,
            'state': task_result.state,
            'error': str(task_result.info)
        }
    
    return response

@app_api.get("/agent/result/{task_id}")
async def get_agent_result(task_id: str):
    """
    Retrieve the final result of a completed agent task.
    """
    task_result = AsyncResult(task_id, app=celery_app)
    
    if not task_result.ready():
        return {
            'task_id': task_id,
            'status': 'not_ready',
            'message': 'Task is still processing'
        }
    
    return {
        'task_id': task_id,
        'status': 'complete',
        'result': task_result.get()
    }

2.3 Managing Concurrent Multi-Agent Behavior

In agentic ecosystems, you often have many agents working at once:

  • Research agent
  • Scraper agent
  • Reviewer agent
  • Planner agent
  • Tool agent

Without queues, simultaneous operations could overwhelm:

  • LLM API rate limits
  • vector database
  • external APIs
  • CPU-bound local inference

Queues allow:

  • throttling
  • prioritization
  • buffering
  • safe parallel execution

Figure 3: Multi-agent system coordinated via queues (Reviewer, Scraper, and Research agents feeding a central queue that distributes work to three workers)

Workers share the load instead of agents fighting for resources.

2.4 Multi-Agent Coordination with Dedicated Queues

Key Features Demonstrated:

  • ✅ Dedicated queues per agent type
  • ✅ Rate limiting for external API calls
  • ✅ Parallel execution with group()
  • ✅ Sequential workflows with chain()
  • ✅ Result aggregation with chord()
  • ✅ Automatic load balancing across workers
# multi_agent_system.py
from celery import Celery, group, chain, chord
from typing import List, Dict, Any
import logging
import time

logger = logging.getLogger(__name__)

app = Celery('multi_agent',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

# Configure multiple queues for different agent types
app.conf.task_routes = {
    'agents.research.*': {'queue': 'research'},
    'agents.scraper.*': {'queue': 'scraper'},
    'agents.reviewer.*': {'queue': 'reviewer'},
    'agents.planner.*': {'queue': 'planner'},
    'agents.tool.*': {'queue': 'tools'},
}

# Configure rate limits per queue
app.conf.task_annotations = {
    'agents.scraper.*': {'rate_limit': '10/m'},  # 10 per minute
    'agents.tool.api_call': {'rate_limit': '30/m'},  # Respect API limits
}

# Research Agent
@app.task(queue='research', bind=True)
def research_agent(self, topic: str) -> Dict[str, Any]:
    """
    Research agent: Gathers information on a topic.
    """
    logger.info(f"Research agent processing: {topic}")
    
    try:
        # Simulate research (replace with actual LLM call)
        import time
        time.sleep(2)
        
        findings = {
            'topic': topic,
            'sources': ['source1.com', 'source2.com'],
            'summary': f'Research findings for {topic}'
        }
        
        return {
            'agent': 'research',
            'status': 'success',
            'data': findings
        }
    except Exception as e:
        logger.error(f"Research agent failed: {e}")
        raise

# Scraper Agent
@app.task(queue='scraper', bind=True, max_retries=5)
def scraper_agent(self, urls: List[str]) -> Dict[str, Any]:
    """
    Scraper agent: Extracts content from URLs.
    """
    logger.info(f"Scraper agent processing {len(urls)} URLs")
    
    try:
        scraped_data = []
        for url in urls:
            # Simulate scraping (replace with actual scraping logic)
            content = f"Content from {url}"
            scraped_data.append({'url': url, 'content': content})
        
        return {
            'agent': 'scraper',
            'status': 'success',
            'data': scraped_data
        }
    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))

# Reviewer Agent
@app.task(queue='reviewer', bind=True)
def reviewer_agent(self, content: Dict[str, Any]) -> Dict[str, Any]:
    """
    Reviewer agent: Validates and scores content quality.
    """
    logger.info("Reviewer agent processing content")
    
    try:
        # Simulate review (replace with actual LLM evaluation)
        quality_score = 0.85
        issues = []
        
        return {
            'agent': 'reviewer',
            'status': 'success',
            'data': {
                'quality_score': quality_score,
                'issues': issues,
                'approved': quality_score > 0.7
            }
        }
    except Exception as e:
        logger.error(f"Reviewer agent failed: {e}")
        raise

# Planner Agent
@app.task(queue='planner', bind=True)
def planner_agent(self, goal: str, available_agents: List[str]) -> Dict[str, Any]:
    """
    Planner agent: Creates execution plan for multi-agent workflow.
    """
    logger.info(f"Planner agent creating plan for: {goal}")
    
    try:
        # Create execution plan
        plan = {
            'goal': goal,
            'steps': [
                {'agent': 'research', 'action': 'gather_info'},
                {'agent': 'scraper', 'action': 'extract_data'},
                {'agent': 'reviewer', 'action': 'validate'},
            ]
        }
        
        return {
            'agent': 'planner',
            'status': 'success',
            'data': plan
        }
    except Exception as e:
        logger.error(f"Planner agent failed: {e}")
        raise

# Tool Agent
@app.task(queue='tools', bind=True, rate_limit='30/m')
def tool_agent_api_call(self, endpoint: str, params: Dict) -> Dict[str, Any]:
    """
    Tool agent: Makes external API calls with rate limiting.
    """
    logger.info(f"Tool agent calling: {endpoint}")
    
    try:
        # Simulate API call (replace with actual API client)
        import requests
        response = requests.get(endpoint, params=params, timeout=10)
        
        return {
            'agent': 'tool',
            'status': 'success',
            'data': response.json()
        }
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)


# Orchestration: Coordinating Multiple Agents
@app.task
def orchestrate_multi_agent_workflow(query: str) -> Dict[str, Any]:
    """
    Orchestrate a complex workflow involving multiple agents.
    
    Execution pattern:
    1. Planner creates the plan
    2. Research and Scraper work in parallel
    3. Reviewer validates the combined results
    """
    logger.info(f"Orchestrating workflow for query: {query}")
    
    # Step 1: Create plan
    plan_task = planner_agent.s(
        goal=query,
        available_agents=['research', 'scraper', 'reviewer']
    )
    
    # Step 2: Execute research and scraping in parallel.
    # Immutable signatures (.si) prevent Celery from prepending the
    # planner's return value to these already fully-specified calls.
    parallel_tasks = group(
        research_agent.si(topic=query),
        scraper_agent.si(urls=['http://example.com/1', 'http://example.com/2'])
    )
    
    # Step 3: Review results after parallel execution completes
    # (the reviewer receives the group's results as a list)
    review_task = reviewer_agent.s()
    
    # Chain the workflow: plan -> parallel execution -> review
    workflow = chain(
        plan_task,
        parallel_tasks,
        review_task
    )
    
    # Execute asynchronously
    result = workflow.apply_async()
    
    return {
        'workflow_id': result.id,
        'status': 'submitted',
        'message': 'Multi-agent workflow initiated'
    }


# Advanced: Chord pattern for aggregation
@app.task
def aggregate_agent_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Aggregate results from multiple agents.
    Called after all parallel tasks complete.
    """
    logger.info("Aggregating results from multiple agents")
    
    aggregated = {
        'total_agents': len(results),
        'successful': sum(1 for r in results if r.get('status') == 'success'),
        'combined_data': [r.get('data') for r in results],
        'timestamp': time.time()
    }
    
    return aggregated

@app.task
def complex_multi_agent_workflow(query: str) -> str:
    """
    Advanced workflow using chord pattern for parallel execution + aggregation.
    """
    # Create a chord: parallel tasks + callback
    workflow = chord(
        group(
            research_agent.s(topic=query),
            scraper_agent.s(urls=['http://example.com']),
            tool_agent_api_call.s(endpoint='http://api.example.com', params={})
        )
    )(aggregate_agent_results.s())
    
    return workflow.id

Starting Workers for Different Queues:

# Terminal 1: Research queue worker
celery -A multi_agent_system worker -Q research -n research_worker@%h -c 2

# Terminal 2: Scraper queue worker (more concurrency for I/O)
celery -A multi_agent_system worker -Q scraper -n scraper_worker@%h -c 5

# Terminal 3: Reviewer queue worker
celery -A multi_agent_system worker -Q reviewer -n reviewer_worker@%h -c 2

# Terminal 4: Planner queue worker
celery -A multi_agent_system worker -Q planner -n planner_worker@%h -c 1

# Terminal 5: Tool queue worker (with rate limiting)
celery -A multi_agent_system worker -Q tools -n tool_worker@%h -c 3

# Or start all queues with one command (development only)
celery -A multi_agent_system worker -Q research,scraper,reviewer,planner,tools -c 10

2.5 Decoupling Application Logic from Agent Execution

Decoupling is essential for:

  • responsiveness
  • fault isolation
  • easier maintenance
  • retry logic
  • observability

A synchronous model ties the lifespan of the user request to the agent’s operation. An async/queue architecture breaks that dependency.

Benefits:

  • The system can acknowledge user requests instantly.
  • Agent execution happens independently.
  • Failures do not crash the main application.
  • The same job can be retried, resumed, or distributed.
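
As a minimal sketch of this decoupling (assuming the execute_agent_workflow task and Celery app from section 2.2), the producer only enqueues the job and records a task ID; any other process can later inspect or act on that job by ID, entirely outside the lifetime of the original request:

# decoupled_submission.py - hypothetical sketch reusing the task from section 2.2
from celery.result import AsyncResult

from tasks import app as celery_app, execute_agent_workflow

def submit_job(query: str) -> str:
    """Producer side: enqueue the agent job and return immediately."""
    async_result = execute_agent_workflow.apply_async(
        kwargs={'query': query, 'context': {}},
        expires=3600,  # drop the job if no worker picks it up within an hour
    )
    return async_result.id  # persist this ID; the HTTP request can end here

def inspect_job(task_id: str) -> dict:
    """Any other process can check on the job later using only the ID."""
    result = AsyncResult(task_id, app=celery_app)
    return {'state': result.state, 'ready': result.ready()}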

3. Practical Applications of Async & Queues in Agentic AI

3.1 Tool Execution Buffering

Agents make frequent tool calls:

  • DB queries
  • URL fetches
  • external API calls
  • scraping
  • long-running computations

Queues help:

  • enforce rate limits
  • batch similar requests
  • retry failures
  • distribute load across workers

3.2 Rate-Limited Tool Execution with Retry Logic

Key Features Demonstrated:

  • ✅ Rate limiting with Redis
  • ✅ Result caching to reduce redundant calls
  • ✅ Retry logic with exponential backoff
  • ✅ Batch processing for efficiency
  • ✅ Priority queues for critical tasks
  • ✅ Connection pooling and timeout handling
# tool_executor.py
from celery import Celery
from typing import Dict, Any, List, Optional
import time
import logging
from functools import wraps
from redis import Redis
import hashlib

logger = logging.getLogger(__name__)

app = Celery('tool_executor')

# Redis for caching and rate limiting
redis_client = Redis(host='localhost', port=6379, db=1, decode_responses=True)

def rate_limit(key_prefix: str, max_calls: int, time_window: int):
    """
    Decorator for rate limiting tool calls.
    
    Args:
        key_prefix: Redis key prefix for this rate limit
        max_calls: Maximum number of calls allowed
        time_window: Time window in seconds
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create rate limit key
            rate_key = f"rate_limit:{key_prefix}"
            
            # Check current count
            current_count = redis_client.get(rate_key)
            
            if current_count and int(current_count) >= max_calls:
                wait_time = redis_client.ttl(rate_key)
                raise Exception(
                    f"Rate limit exceeded. Try again in {wait_time} seconds"
                )
            
            # Increment counter
            pipe = redis_client.pipeline()
            pipe.incr(rate_key)
            pipe.expire(rate_key, time_window)
            pipe.execute()
            
            # Execute function
            return func(*args, **kwargs)
        return wrapper
    return decorator

def cache_result(ttl: int = 3600):
    """
    Decorator to cache tool results.
    
    Args:
        ttl: Time to live in seconds (default 1 hour)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key from function name and arguments
            cache_key = f"cache:{func.__name__}:{hashlib.md5(str(args).encode()).hexdigest()}"
            
            # Check cache
            cached_result = redis_client.get(cache_key)
            if cached_result:
                logger.info(f"Cache hit for {func.__name__}")
                import json
                return json.loads(cached_result)
            
            # Execute function
            result = func(*args, **kwargs)
            
            # Store in cache
            import json
            redis_client.setex(cache_key, ttl, json.dumps(result))
            logger.info(f"Cached result for {func.__name__}")
            
            return result
        return wrapper
    return decorator


@app.task(bind=True, max_retries=3, default_retry_delay=60)
@rate_limit(key_prefix="external_api", max_calls=30, time_window=60)
@cache_result(ttl=1800)  # Cache for 30 minutes
def call_external_api(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """
    Call external API with rate limiting, caching, and retry logic.
    """
    logger.info(f"Calling external API: {endpoint}")
    
    try:
        import requests
        
        response = requests.get(
            endpoint,
            params=params,
            timeout=30,
            headers={'User-Agent': 'AgenticAI/1.0'}
        )
        
        response.raise_for_status()
        
        return {
            'status': 'success',
            'data': response.json(),
            'cached': False
        }
        
    except requests.exceptions.RequestException as exc:
        logger.error(f"API call failed: {exc}")
        
        # Retry with exponential backoff
        countdown = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=countdown)


@app.task(bind=True, max_retries=5)
@rate_limit(key_prefix="web_scraping", max_calls=10, time_window=60)
def scrape_url(self, url: str) -> Dict[str, Any]:
    """
    Scrape URL with rate limiting and retry on failure.
    """
    logger.info(f"Scraping: {url}")
    
    try:
        import requests
        from bs4 import BeautifulSoup
        
        response = requests.get(
            url,
            timeout=30,
            headers={
                'User-Agent': 'Mozilla/5.0 (compatible; AgenticBot/1.0)'
            }
        )
        
        response.raise_for_status()
        
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # Extract title and main content
        title = soup.find('title').text if soup.find('title') else 'No title'
        
        # Remove script and style elements
        for script in soup(['script', 'style']):
            script.decompose()
        
        text_content = soup.get_text(separator=' ', strip=True)
        
        return {
            'status': 'success',
            'url': url,
            'title': title,
            'content': text_content[:5000],  # Limit content size
            'length': len(text_content)
        }
        
    except Exception as exc:
        logger.error(f"Scraping failed for {url}: {exc}")
        
        # Exponential backoff: 1min, 2min, 4min, 8min, 16min
        countdown = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=countdown, max_retries=5)


@app.task(bind=True)
@rate_limit(key_prefix="database_query", max_calls=100, time_window=60)
def execute_database_query(self, query: str, params: Optional[Dict] = None) -> Dict[str, Any]:
    """
    Execute database query with rate limiting.
    """
    logger.info("Executing database query")
    
    try:
        import psycopg2
        from psycopg2.extras import RealDictCursor
        
        conn = psycopg2.connect(
            host='localhost',
            database='agent_db',
            user='agent_user',
            password='secure_password',
            connect_timeout=10
        )
        
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params or {})
            results = cursor.fetchall()
        
        conn.close()
        
        return {
            'status': 'success',
            'count': len(results),
            'data': results
        }
        
    except Exception as exc:
        logger.error(f"Database query failed: {exc}")
        raise self.retry(exc=exc, countdown=30)


# Batch processing for efficiency
@app.task
def batch_api_calls(endpoints: List[str]) -> Dict[str, Any]:
    """
    Process multiple API calls efficiently using Celery groups.
    """
    from celery import group
    
    # Create a group of parallel API call tasks
    job = group(
        call_external_api.s(endpoint=endpoint, params={})
        for endpoint in endpoints
    )
    
    # Execute all in parallel
    result = job.apply_async()
    
    # Wait for all to complete (or use result.get() in a callback)
    return {
        'batch_id': result.id,
        'total_tasks': len(endpoints),
        'status': 'processing'
    }


@app.task
def batch_url_scraping(urls: List[str], callback_task: Optional[str] = None) -> str:
    """
    Scrape multiple URLs with automatic batching and rate limiting.
    """
    from celery import chord, group
    
    # Create scraping tasks
    scrape_tasks = group(scrape_url.s(url) for url in urls)
    
    if callback_task:
        # Use chord for aggregation; the callback task name is turned
        # into a signature before being attached to the chord
        workflow = chord(scrape_tasks)(app.signature(callback_task))
    else:
        # Just execute in parallel
        workflow = scrape_tasks.apply_async()
    
    return workflow.id

Configuration for Production:

# celeryconfig.py
from kombu import Queue, Exchange

# Broker settings
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'

# Task execution settings
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
timezone = 'UTC'
enable_utc = True

# Performance settings
worker_prefetch_multiplier = 4
worker_max_tasks_per_child = 1000  # Restart worker after 1000 tasks

# Queue configuration
task_default_queue = 'default'
task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('high_priority'), routing_key='high_priority'),
    Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
)

# Route tasks by priority
task_routes = {
    'tool_executor.call_external_api': {'queue': 'high_priority'},
    'tool_executor.scrape_url': {'queue': 'low_priority'},
}

# Result expiration
result_expires = 3600  # Results expire after 1 hour

# Task retry settings
task_acks_late = True  # Acknowledge tasks after completion
task_reject_on_worker_lost = True  # Requeue if worker crashes

3.3 State Management & Checkpointing

Agentic workflows are multi-step:

  1. Think
  2. Search
  3. Analyze
  4. Act
  5. Reflect
  6. Continue

If step 4 fails, you don’t want to restart steps 1–3.

Queues + async let you:

  • save intermediate state
  • resume partial workflows
  • persist progress
  • recover from failures gracefully

Figure 4: Checkpoint-enabled agent workflow (multi-stage flow with a checkpoint decision point for resuming after failures)

3.4 Checkpoint-Enabled Agent Workflow

Key Features Demonstrated:

  • ✅ Multi-stage workflow with checkpointing
  • ✅ Automatic resume from failure point
  • ✅ Persistent state in PostgreSQL
  • ✅ Workflow history tracking
  • ✅ Graceful failure handling
  • ✅ No redundant work on retry
# checkpoint_workflow.py
from celery import Celery, Task
from typing import Dict, Any, List, Optional
import json
import logging
from datetime import datetime
from enum import Enum
import psycopg2
from psycopg2.extras import Json

logger = logging.getLogger(__name__)

app = Celery('checkpoint_workflow')

class WorkflowStage(Enum):
    """Workflow stages for checkpointing."""
    INITIALIZED = "initialized"
    REASONING = "reasoning"
    SEARCHING = "searching"
    ANALYZING = "analyzing"
    ACTING = "acting"
    REFLECTING = "reflecting"
    COMPLETED = "completed"
    FAILED = "failed"


class CheckpointDB:
    """Database handler for workflow checkpoints."""
    
    def __init__(self):
        self.conn_params = {
            'host': 'localhost',
            'database': 'agent_workflows',
            'user': 'agent_user',
            'password': 'secure_password'
        }
    
    def save_checkpoint(
        self,
        workflow_id: str,
        stage: WorkflowStage,
        state: Dict[str, Any],
        metadata: Optional[Dict] = None
    ) -> None:
        """Save workflow checkpoint to database."""
        try:
            with psycopg2.connect(**self.conn_params) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                        INSERT INTO workflow_checkpoints 
                        (workflow_id, stage, state, metadata, created_at)
                        VALUES (%s, %s, %s, %s, %s)
                        ON CONFLICT (workflow_id, stage) 
                        DO UPDATE SET 
                            state = EXCLUDED.state,
                            metadata = EXCLUDED.metadata,
                            updated_at = CURRENT_TIMESTAMP
                    """, (
                        workflow_id,
                        stage.value,
                        Json(state),
                        Json(metadata or {}),
                        datetime.utcnow()
                    ))
                conn.commit()
            
            logger.info(f"Checkpoint saved: {workflow_id} at stage {stage.value}")
            
        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")
            raise
    
    def load_checkpoint(
        self,
        workflow_id: str,
        stage: Optional[WorkflowStage] = None
    ) -> Optional[Dict[str, Any]]:
        """Load workflow checkpoint from database."""
        try:
            with psycopg2.connect(**self.conn_params) as conn:
                with conn.cursor() as cursor:
                    if stage:
                        cursor.execute("""
                            SELECT stage, state, metadata, created_at
                            FROM workflow_checkpoints
                            WHERE workflow_id = %s AND stage = %s
                            ORDER BY created_at DESC
                            LIMIT 1
                        """, (workflow_id, stage.value))
                    else:
                        # Get latest checkpoint
                        cursor.execute("""
                            SELECT stage, state, metadata, created_at
                            FROM workflow_checkpoints
                            WHERE workflow_id = %s
                            ORDER BY created_at DESC
                            LIMIT 1
                        """, (workflow_id,))
                    
                    result = cursor.fetchone()
                    
                    if result:
                        return {
                            'stage': result[0],
                            'state': result[1],
                            'metadata': result[2],
                            'created_at': result[3]
                        }
            
            return None
            
        except Exception as e:
            logger.error(f"Failed to load checkpoint: {e}")
            return None
    
    def get_workflow_history(self, workflow_id: str) -> List[Dict[str, Any]]:
        """Get complete history of workflow checkpoints."""
        try:
            with psycopg2.connect(**self.conn_params) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                        SELECT stage, state, metadata, created_at
                        FROM workflow_checkpoints
                        WHERE workflow_id = %s
                        ORDER BY created_at ASC
                    """, (workflow_id,))
                    
                    results = cursor.fetchall()
                    
                    return [
                        {
                            'stage': r[0],
                            'state': r[1],
                            'metadata': r[2],
                            'created_at': r[3]
                        }
                        for r in results
                    ]
        except Exception as e:
            logger.error(f"Failed to get workflow history: {e}")
            return []


checkpoint_db = CheckpointDB()


class CheckpointableTask(Task):
    """Base task class with checkpoint support."""
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure by saving checkpoint."""
        workflow_id = kwargs.get('workflow_id')
        if workflow_id:
            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.FAILED,
                state={'error': str(exc)},
                metadata={'task_id': task_id, 'traceback': str(einfo)}
            )


@app.task(base=CheckpointableTask, bind=True, max_retries=3)
def execute_checkpointed_workflow(
    self,
    workflow_id: str,
    query: str,
    context: Dict[str, Any],
    resume_from: Optional[str] = None
) -> Dict[str, Any]:
    """
    Execute a multi-stage workflow with checkpoint support.
    
    If the workflow fails at any stage, it can resume from the last checkpoint.
    """
    logger.info(f"Starting workflow {workflow_id}")
    
    # Check if we're resuming from a checkpoint
    if resume_from:
        # Load the checkpoint for the requested stage rather than the
        # newest row, which may be a FAILED marker without usable state
        checkpoint = checkpoint_db.load_checkpoint(
            workflow_id, WorkflowStage(resume_from)
        )
        if checkpoint:
            logger.info(f"Resuming from stage: {checkpoint['stage']}")
            current_stage = WorkflowStage(checkpoint['stage'])
            accumulated_state = checkpoint['state']
        else:
            current_stage = WorkflowStage.INITIALIZED
            accumulated_state = {}
    else:
        current_stage = WorkflowStage.INITIALIZED
        accumulated_state = {}
    
    try:
        # Stage 1: Reasoning
        if current_stage.value in [WorkflowStage.INITIALIZED.value, WorkflowStage.REASONING.value]:
            logger.info("Stage 1: Reasoning")
            
            reasoning_result = perform_reasoning(query, context)
            accumulated_state['reasoning'] = reasoning_result
            
            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.REASONING,
                state=accumulated_state,
                metadata={'completed_at': datetime.utcnow().isoformat()}
            )
            
            current_stage = WorkflowStage.SEARCHING
        
        # Stage 2: Searching
        if current_stage.value == WorkflowStage.SEARCHING.value:
            logger.info("Stage 2: Searching")
            
            search_queries = accumulated_state['reasoning'].get('search_queries', [])
            search_results = perform_search(search_queries)
            accumulated_state['search_results'] = search_results
            
            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.SEARCHING,
                state=accumulated_state,
                metadata={'completed_at': datetime.utcnow().isoformat()}
            )
            
            current_stage = WorkflowStage.ANALYZING
        
        # Stage 3: Analyzing
        if current_stage.value == WorkflowStage.ANALYZING.value:
            logger.info("Stage 3: Analyzing")
            
            analysis = perform_analysis(
                accumulated_state['search_results'],
                accumulated_state['reasoning']
            )
            accumulated_state['analysis'] = analysis
            
            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.ANALYZING,
                state=accumulated_state,
                metadata={'completed_at': datetime.utcnow().isoformat()}
            )
            
            current_stage = WorkflowStage.ACTING
        
        # Stage 4: Acting
        if current_stage.value == WorkflowStage.ACTING.value:
            logger.info("Stage 4: Acting")
            
            action_result = perform_action(accumulated_state['analysis'])
            accumulated_state['action_result'] = action_result
            
            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.ACTING,
                state=accumulated_state,
                metadata={'completed_at': datetime.utcnow().isoformat()}
            )
            
            current_stage = WorkflowStage.REFLECTING
        
        # Stage 5: Reflecting
        if current_stage.value == WorkflowStage.REFLECTING.value:
            logger.info("Stage 5: Reflecting")
            
            reflection = perform_reflection(accumulated_state)
            accumulated_state['reflection'] = reflection
            
            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.REFLECTING,
                state=accumulated_state,
                metadata={'completed_at': datetime.utcnow().isoformat()}
            )
            
            current_stage = WorkflowStage.COMPLETED
        
        # Final checkpoint
        checkpoint_db.save_checkpoint(
            workflow_id=workflow_id,
            stage=WorkflowStage.COMPLETED,
            state=accumulated_state,
            metadata={
                'completed_at': datetime.utcnow().isoformat(),
                'success': True
            }
        )
        
        return {
            'workflow_id': workflow_id,
            'status': 'completed',
            'result': accumulated_state
        }
        
    except Exception as exc:
        logger.error(f"Workflow failed at stage {current_stage.value}: {exc}")
        
        # Save failure checkpoint
        checkpoint_db.save_checkpoint(
            workflow_id=workflow_id,
            stage=current_stage,
            state=accumulated_state,
            metadata={
                'error': str(exc),
                'failed_at': datetime.utcnow().isoformat()
            }
        )
        
        # Retry from current stage
        raise self.retry(
            exc=exc,
            countdown=120,  # Wait 2 minutes before retry
            kwargs={
                'workflow_id': workflow_id,
                'query': query,
                'context': context,
                'resume_from': current_stage.value
            }
        )


# Helper functions for each stage
def perform_reasoning(query: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """Stage 1: Initial reasoning and planning."""
    from openai import OpenAI
    
    client = OpenAI()
    
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a reasoning agent. Plan the search strategy."},
            {"role": "user", "content": f"Query: {query}\nContext: {context}"}
        ],
        timeout=60
    )
    
    return {
        'reasoning': response.choices[0].message.content,
        'search_queries': ['query1', 'query2'],  # Extract from reasoning
        'approach': 'comprehensive'
    }


def perform_search(queries: List[str]) -> List[Dict[str, Any]]:
    """Stage 2: Execute search queries."""
    # Simulate search (replace with actual search implementation)
    import time
    time.sleep(1)
    
    return [
        {'query': q, 'results': [f'result for {q}']}
        for q in queries
    ]


def perform_analysis(search_results: List[Dict], reasoning: Dict) -> Dict[str, Any]:
    """Stage 3: Analyze search results."""
    from openai import OpenAI
    
    client = OpenAI()
    
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Analyze the search results."},
            {"role": "user", "content": f"Results: {search_results}"}
        ],
        timeout=60
    )
    
    return {
        'analysis': response.choices[0].message.content,
        'confidence': 0.85,
        'key_findings': ['finding1', 'finding2']
    }


def perform_action(analysis: Dict[str, Any]) -> Dict[str, Any]:
    """Stage 4: Take action based on analysis."""
    # Simulate action (API call, database update, etc.)
    import time
    time.sleep(1)
    
    return {
        'action_taken': 'generated_report',
        'status': 'success'
    }


def perform_reflection(state: Dict[str, Any]) -> Dict[str, Any]:
    """Stage 5: Reflect on the entire process."""
    return {
        'quality_assessment': 'high',
        'improvements': ['More sources needed'],
        'success_rate': 0.9
    }


# API endpoint for resuming workflows
@app.task
def resume_workflow(workflow_id: str) -> Dict[str, Any]:
    """Resume a failed or interrupted workflow from last checkpoint."""
    checkpoint = checkpoint_db.load_checkpoint(workflow_id)
    
    if not checkpoint:
        return {
            'status': 'error',
            'message': f'No checkpoint found for workflow {workflow_id}'
        }
    
    logger.info(f"Resuming workflow {workflow_id} from stage {checkpoint['stage']}")
    
    # Resume execution
    result = execute_checkpointed_workflow.delay(
        workflow_id=workflow_id,
        query=checkpoint['state'].get('query', ''),
        context=checkpoint['state'].get('context', {}),
        resume_from=checkpoint['stage']
    )
    
    return {
        'status': 'resumed',
        'task_id': result.id,
        'resumed_from_stage': checkpoint['stage']
    }

Database Schema for Checkpoints:

-- Create checkpoints table
CREATE TABLE IF NOT EXISTS workflow_checkpoints (
    id SERIAL PRIMARY KEY,
    workflow_id VARCHAR(255) NOT NULL,
    stage VARCHAR(50) NOT NULL,
    state JSONB NOT NULL,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(workflow_id, stage)
);

-- Create indexes for performance
CREATE INDEX idx_workflow_id ON workflow_checkpoints(workflow_id);
CREATE INDEX idx_workflow_stage ON workflow_checkpoints(workflow_id, stage);
CREATE INDEX idx_created_at ON workflow_checkpoints(created_at DESC);

-- Create function to update updated_at automatically
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_workflow_checkpoints_updated_at 
    BEFORE UPDATE ON workflow_checkpoints 
    FOR EACH ROW 
    EXECUTE FUNCTION update_updated_at_column();

4. Scaling & Load Distribution

Horizontal scaling is the backbone of robust agent systems.

With queues:

  • Add more workers = handle more tasks
  • Remove workers = lower costs
  • System auto-balances workloads

Scaling doesn’t require changing the main app.
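
Because workers pull from the broker, scaling decisions can be driven entirely by what the broker reports. The sketch below is an illustrative monitor, not a production autoscaler: it assumes the Redis broker from earlier sections, Celery's default queue name 'celery', and arbitrary thresholds, and it only suggests an action (the actual scale-out could be docker-compose up -d --scale celery_worker=N or a Kubernetes autoscaler).

# scaling_monitor.py - illustrative sketch of a queue-depth-based scaling signal
from celery import Celery
from redis import Redis

app = Celery('multi_agent', broker='redis://localhost:6379/0')
redis_client = Redis(host='localhost', port=6379, db=0)

def queue_depth(queue_name: str = 'celery') -> int:
    """With the Redis broker, each queue is a Redis list keyed by its name."""
    return redis_client.llen(queue_name)

def active_task_count() -> int:
    """Ask running workers how many tasks they are currently executing."""
    active = app.control.inspect().active() or {}
    return sum(len(tasks) for tasks in active.values())

def scaling_decision(max_backlog: int = 50) -> str:
    """Suggest 'scale_out', 'scale_in', or 'hold' based on backlog size."""
    backlog = queue_depth()
    if backlog > max_backlog:
        return 'scale_out'
    if backlog == 0 and active_task_count() == 0:
        return 'scale_in'
    return 'hold'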

5. Event-Driven Agent Architectures

5.1 Architecture

Many agent tasks are triggered by:

  • new data arriving
  • changes in the environment
  • user updates
  • periodic schedules (Celery Beat)
  • external webhooks

Message queues make this possible:

  • agents can subscribe to events
  • workflows run asynchronously
  • each agent wakes up only when relevant work arrives

Figure 5: Event-driven agent pipeline (an event source triggering three agents through a central message queue)

5.2 Event-Driven Agent System with Webhooks

Key Features Demonstrated:

  • ✅ Event-driven architecture with pub/sub
  • ✅ Webhook endpoints for external integrations
  • ✅ Periodic tasks with Celery Beat
  • ✅ Event routing to appropriate agents
  • ✅ Health monitoring and cleanup
  • ✅ Signature verification for webhooks
  • ✅ Redis-based event bus
# event_driven_agents.py
from celery import Celery
from celery.schedules import crontab
from typing import Dict, Any, List, Callable
import logging
import json
from datetime import datetime
from redis import Redis
from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib

logger = logging.getLogger(__name__)

app = Celery('event_driven_agents')
api = FastAPI()

# Redis for event pub/sub
redis_client = Redis(host='localhost', port=6379, db=2, decode_responses=True)

# Configure periodic tasks
app.conf.beat_schedule = {
    'monitor-data-sources-every-hour': {
        'task': 'event_driven_agents.monitor_data_sources',
        'schedule': crontab(minute=0),  # Every hour
    },
    'cleanup-old-events-daily': {
        'task': 'event_driven_agents.cleanup_old_events',
        'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
    },
    'health-check-every-5-minutes': {
        'task': 'event_driven_agents.health_check',
        'schedule': 300.0,  # Every 5 minutes
    },
}


class EventBus:
    """Event bus for publish/subscribe pattern."""
    
    def __init__(self):
        self.redis = redis_client
        self.subscribers: Dict[str, List[Callable]] = {}
    
    def publish(self, event_type: str, data: Dict[str, Any]) -> None:
        """Publish an event to all subscribers."""
        event = {
            'type': event_type,
            'data': data,
            'timestamp': datetime.utcnow().isoformat(),
            'event_id': f"{event_type}_{int(datetime.utcnow().timestamp())}"
        }
        
        # Store event in Redis
        event_key = f"events:{event_type}:{event['event_id']}"
        self.redis.setex(event_key, 86400, json.dumps(event))  # 24 hour TTL
        
        # Publish to channel
        self.redis.publish(f"channel:{event_type}", json.dumps(event))
        
        logger.info(f"Published event: {event_type}")
    
    def subscribe(self, event_type: str, handler: Callable) -> None:
        """Subscribe a handler to an event type."""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
        logger.info(f"Subscribed handler to {event_type}")


event_bus = EventBus()


# Event-triggered agents
@app.task(bind=True)
def agent_on_new_data(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Agent triggered when new data arrives.
    """
    logger.info("Agent 1: Processing new data event")
    
    try:
        data_source = event_data.get('source')
        data_content = event_data.get('content')
        
        # Process the new data
        processed_result = {
            'source': data_source,
            'processed_at': datetime.utcnow().isoformat(),
            'summary': f"Processed data from {data_source}",
            'status': 'success'
        }
        
        # Publish processed event for downstream agents
        event_bus.publish('data_processed', processed_result)
        
        return processed_result
        
    except Exception as e:
        logger.error(f"Agent 1 failed: {e}")
        raise


@app.task(bind=True)
def agent_on_environment_change(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Agent triggered when environment changes.
    """
    logger.info("Agent 2: Responding to environment change")
    
    try:
        change_type = event_data.get('change_type')
        impact = event_data.get('impact')
        
        # Adapt strategy based on change
        adaptation = {
            'change_detected': change_type,
            'adaptation_strategy': f"Adjusted for {change_type}",
            'timestamp': datetime.utcnow().isoformat()
        }
        
        # Notify other systems
        event_bus.publish('agent_adapted', adaptation)
        
        return adaptation
        
    except Exception as e:
        logger.error(f"Agent 2 failed: {e}")
        raise


@app.task(bind=True)
def agent_on_user_update(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Agent triggered when user provides updates.
    """
    logger.info("Agent 3: Processing user update")
    
    try:
        user_id = event_data.get('user_id')
        update_type = event_data.get('update_type')
        
        # Handle user update
        response = {
            'user_id': user_id,
            'acknowledgment': f"Processed {update_type} update",
            'next_action': 'user_notified',
            'timestamp': datetime.utcnow().isoformat()
        }
        
        return response
        
    except Exception as e:
        logger.error(f"Agent 3 failed: {e}")
        raise


# Event router
@app.task
def route_event(event_type: str, event_data: Dict[str, Any]) -> List[str]:
    """
    Route events to appropriate agent handlers.
    """
    logger.info(f"Routing event: {event_type}")
    
    event_handlers = {
        'new_data_arrived': agent_on_new_data,
        'environment_changed': agent_on_environment_change,
        'user_updated': agent_on_user_update,
    }
    
    handler = event_handlers.get(event_type)
    
    if handler:
        # Trigger the appropriate agent asynchronously
        result = handler.delay(event_data)
        return [result.id]
    else:
        logger.warning(f"No handler found for event: {event_type}")
        return []


# Webhook endpoint for external events
@api.post("/webhook/github")
async def github_webhook(request: Request):
    """
    Receive GitHub webhook events and trigger appropriate agents.
    """
    # Verify webhook signature before accepting the payload
    signature = request.headers.get('X-Hub-Signature-256')
    if not verify_github_signature(await request.body(), signature):
        raise HTTPException(status_code=401, detail='Invalid signature')
    
    payload = await request.json()
    event_type = request.headers.get('X-GitHub-Event')
    
    logger.info(f"Received GitHub webhook: {event_type}")
    
    # Transform webhook to internal event
    event_data = {
        'source': 'github',
        'event_type': event_type,
        'payload': payload,
        'received_at': datetime.utcnow().isoformat()
    }
    
    # Route to appropriate agent
    route_event.delay('new_data_arrived', event_data)
    
    return {'status': 'accepted'}


@api.post("/webhook/slack")
async def slack_webhook(request: Request):
    """
    Receive Slack events and trigger agents.
    """
    payload = await request.json()
    
    # Handle Slack URL verification
    if payload.get('type') == 'url_verification':
        return {'challenge': payload['challenge']}
    
    event = payload.get('event', {})
    event_type = event.get('type')
    
    logger.info(f"Received Slack event: {event_type}")
    
    # Transform to internal event
    event_data = {
        'source': 'slack',
        'event_type': event_type,
        'user': event.get('user'),
        'text': event.get('text'),
        'channel': event.get('channel')
    }
    
    # Trigger user update agent
    route_event.delay('user_updated', event_data)
    
    return {'status': 'ok'}


@api.post("/webhook/custom")
async def custom_webhook(request: Request):
    """
    Generic webhook endpoint for custom integrations.
    """
    payload = await request.json()
    
    event_type = payload.get('event_type', 'environment_changed')
    event_data = payload.get('data', {})
    
    logger.info(f"Received custom webhook: {event_type}")
    
    # Route to appropriate agent; delay() returns an AsyncResult handle
    task = route_event.delay(event_type, event_data)
    
    return {
        'status': 'accepted',
        'task_id': task.id
    }


# Periodic monitoring tasks
@app.task
def monitor_data_sources():
    """
    Periodically check data sources for changes.
    Runs every hour via Celery Beat.
    """
    logger.info("Monitoring data sources for changes")
    
    # Check various data sources
    data_sources = ['database', 'api', 's3_bucket']
    
    for source in data_sources:
        # Simulate checking for changes
        has_changes = check_data_source(source)
        
        if has_changes:
            event_bus.publish('new_data_arrived', {
                'source': source,
                'content': 'New data detected',
                'priority': 'high'
            })


@app.task
def cleanup_old_events():
    """
    Clean up old events from Redis.
    Runs daily at 2 AM.
    """
    logger.info("Cleaning up old events")
    
    # Get all event keys
    pattern = "events:*"
    cursor = 0
    deleted_count = 0
    
    while True:
        cursor, keys = redis_client.scan(
            cursor=cursor,
            match=pattern,
            count=100
        )
        
        for key in keys:
            # Delete event keys that were stored without an expiration;
            # events published through the bus normally carry a 24-hour TTL
            ttl = redis_client.ttl(key)
            if ttl == -1:  # No expiration set
                redis_client.delete(key)
                deleted_count += 1
        
        if cursor == 0:
            break
    
    logger.info(f"Deleted {deleted_count} old events")


@app.task
def health_check():
    """
    Perform health check on all agents.
    Runs every 5 minutes.
    """
    logger.info("Performing health check")
    
    # Check Redis connection
    try:
        redis_client.ping()
        redis_status = 'healthy'
    except Exception:
        redis_status = 'unhealthy'
    
    # Publish health status
    event_bus.publish('health_check_completed', {
        'redis': redis_status,
        'timestamp': datetime.utcnow().isoformat()
    })


# Utility functions
def verify_github_signature(payload: bytes, signature: str) -> bool:
    """Verify GitHub webhook signature."""
    secret = b'your_webhook_secret'
    
    if not signature:
        return False
    
    expected_signature = 'sha256=' + hmac.new(
        secret,
        payload,
        hashlib.sha256
    ).hexdigest()
    
    return hmac.compare_digest(signature, expected_signature)


def check_data_source(source: str) -> bool:
    """Check if a data source has new data."""
    # Implement actual checking logic
    import random
    return random.choice([True, False])

Starting the Event-Driven System:

# Terminal 1: Start Celery worker for event processing
celery -A event_driven_agents worker --loglevel=info -Q default -c 4

# Terminal 2: Start Celery Beat for periodic tasks
celery -A event_driven_agents beat --loglevel=info

# Terminal 3: Start FastAPI webhook server
uvicorn event_driven_agents:api --host 0.0.0.0 --port 8000 --reload

# Terminal 4: Monitor with Flower
celery -A event_driven_agents flower --port=5555

Testing the Event System:

# test_events.py
import requests
import json

# Test custom webhook
response = requests.post(
    'http://localhost:8000/webhook/custom',
    json={
        'event_type': 'environment_changed',
        'data': {
            'change_type': 'api_rate_limit_increased',
            'impact': 'high',
            'details': 'Rate limit increased to 1000/hour'
        }
    }
)

print(f"Response: {response.json()}")

# Test direct event publishing
from event_driven_agents import event_bus

event_bus.publish('new_data_arrived', {
    'source': 'manual_trigger',
    'content': 'Test data',
    'priority': 'low'
})

6. Production Deployment with Docker

docker-compose.yml

# docker-compose.yml
version: '3.8'

services:
  # Redis - Message broker and cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  # PostgreSQL - State management
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: agent_workflows
      POSTGRES_USER: agent_user
      POSTGRES_PASSWORD: secure_password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init_db.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U agent_user"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Celery Worker - Agent execution
  celery_worker:
    build: .
    command: celery -A multi_agent_system worker --loglevel=info -c 4
    depends_on:
      - redis
      - postgres
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/0
      DATABASE_URL: postgresql://agent_user:secure_password@postgres:5432/agent_workflows
      OPENAI_API_KEY: ${OPENAI_API_KEY}
    volumes:
      - ./:/app
    restart: unless-stopped

  # Celery Beat - Periodic tasks
  celery_beat:
    build: .
    command: celery -A event_driven_agents beat --loglevel=info
    depends_on:
      - redis
      - postgres
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/0
    volumes:
      - ./:/app
    restart: unless-stopped

  # Flower - Monitoring dashboard
  flower:
    build: .
    command: celery -A multi_agent_system flower --port=5555
    ports:
      - "5555:5555"
    depends_on:
      - redis
      - celery_worker
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/0
    restart: unless-stopped

  # FastAPI - Web server for webhooks
  api:
    build: .
    command: uvicorn event_driven_agents:api --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - postgres
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      DATABASE_URL: postgresql://agent_user:secure_password@postgres:5432/agent_workflows
    volumes:
      - ./:/app
    restart: unless-stopped

volumes:
  redis_data:
  postgres_data:

Dockerfile

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Run as non-root user
RUN useradd -m -u 1000 celeryuser && chown -R celeryuser:celeryuser /app
USER celeryuser

CMD ["celery", "-A", "multi_agent_system", "worker", "--loglevel=info"]

requirements.txt

# requirements.txt
celery[redis]==5.3.4
redis==5.0.1
psycopg2-binary==2.9.9
openai==1.3.0
fastapi==0.104.1
uvicorn[standard]==0.24.0
flower==2.0.1
requests==2.31.0
beautifulsoup4==4.12.2
pydantic==2.5.0
python-multipart==0.0.6

Start the entire system:

# Build and start all services
docker-compose up --build -d

# View logs
docker-compose logs -f celery_worker

# Scale workers
docker-compose up -d --scale celery_worker=5

# Stop all services
docker-compose down

7. Frequently Asked Questions

7.1 What is asynchronous processing in agentic AI?

Asynchronous processing in agentic AI allows autonomous agents to execute tasks without blocking the main application thread. Instead of waiting for long-running operations like LLM calls, tool invocations, or web scraping to complete, the system places these tasks in a queue and immediately returns control to the user. Worker processes handle the actual execution independently, enabling the system to remain responsive while agents perform complex, time-consuming operations in the background.
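To make this concrete, here is a minimal sketch of the submission side (the run_agent_task name and the /tasks endpoints are illustrative, not taken from the earlier examples):

# submit_api.py - minimal sketch of non-blocking task submission
from celery import Celery
from celery.result import AsyncResult
from fastapi import FastAPI
from pydantic import BaseModel

celery_app = Celery('sketch',
                    broker='redis://localhost:6379/0',
                    backend='redis://localhost:6379/0')

@celery_app.task(name='sketch.run_agent_task')
def run_agent_task(query: str) -> dict:
    # Placeholder for a long-running agent workflow (LLM calls, tools, etc.)
    return {'query': query, 'answer': 'done'}

api = FastAPI()

class TaskRequest(BaseModel):
    query: str

@api.post('/tasks')
def submit_task(request: TaskRequest):
    # Enqueue the work and return immediately with a tracking ID
    result = run_agent_task.delay(request.query)
    return {'task_id': result.id, 'status': 'queued'}

@api.get('/tasks/{task_id}')
def task_status(task_id: str):
    # Poll the result backend; this never blocks on the worker
    result = AsyncResult(task_id, app=celery_app)
    return {'task_id': task_id,
            'state': result.state,
            'result': result.result if result.ready() else None}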

7.2 Why do AI agents need message queues?

AI agents need message queues to handle three critical challenges: unpredictable timing (agent operations can take seconds to minutes), variable workloads (multiple agents may need resources simultaneously), and coordination complexity (agents must communicate without conflicts). Message queues act as buffers that throttle requests, prioritize tasks, enable retry logic, and distribute workload across multiple workers, preventing system overload and resource contention.
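A minimal sketch of how a queue can throttle and retry agent work, assuming Redis as the broker (the task and queue names are illustrative):

# throttled_tasks.py - sketch: the queue as a buffer and throttle
from celery import Celery

app = Celery('throttle_sketch', broker='redis://localhost:6379/0')

# Route tool-calling work to its own queue so a burst of agent activity
# cannot starve other task types.
app.conf.task_routes = {
    'throttle_sketch.call_external_tool': {'queue': 'tools'},
}

@app.task(rate_limit='100/m',
          autoretry_for=(ConnectionError,),
          retry_backoff=True, retry_jitter=True, max_retries=5)
def call_external_tool(payload: dict) -> dict:
    # Each worker executes at most 100 of these per minute, and transient
    # connection errors are retried automatically with exponential backoff.
    # ... call the rate-limited external API here ...
    return {'ok': True, 'payload': payload}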

7.3 What’s the difference between synchronous and asynchronous agent execution?

In synchronous execution, the system waits for each agent operation to complete before proceeding, causing user requests to block, threads to get stuck, and timeouts to occur frequently. In asynchronous execution, the system immediately acknowledges requests, places tasks in a queue with a tracking ID, and allows worker processes to handle operations independently. This decoupling means failures don’t crash the main application, tasks can be retried automatically, and the system scales by simply adding more workers.

7.4 Which message broker is best for agentic AI systems?

The choice depends on your requirements:

  • Redis – Best for simple, high-speed queuing with low latency; ideal for prototypes and moderate-scale systems
  • RabbitMQ – Excellent for complex routing, reliable delivery guarantees, and fine-grained control; suited for enterprise production systems
  • Apache Kafka – Optimal for event streaming, high-throughput scenarios, and when you need message replay capabilities
  • AWS SQS – Best for cloud-native applications requiring minimal infrastructure management

Most production agentic AI systems start with Redis for simplicity and scale to RabbitMQ or Kafka as requirements grow.
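In Celery, switching brokers is mostly a connection-URL change, as the sketch below illustrates (hostnames and credentials are placeholders). Kafka is the exception: it is usually consumed through its own client libraries rather than as a Celery broker.

# broker_config.py - sketch: broker choice as a connection-URL change
from celery import Celery

# Redis: simple and fast; fine for prototypes and moderate scale
app = Celery('agents', broker='redis://localhost:6379/0')

# RabbitMQ: swap in an AMQP URL for reliable delivery and richer routing
# app = Celery('agents', broker='amqp://agent_user:secure_password@rabbitmq:5672//')

# AWS SQS: managed and cloud-native; requires the celery[sqs] extra,
# with credentials typically picked up from the environment or IAM role
# app = Celery('agents', broker='sqs://')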

7.5 How do queues enable multi-agent coordination?

Queues enable multi-agent coordination by providing a centralized task distribution mechanism. Instead of agents competing directly for resources like API rate limits, database connections, or external services, they submit work to specialized queues. Workers pull tasks at a controlled rate, which prevents downstream services from being overwhelmed. Different agent types (research, scraper, reviewer, planner) can have dedicated queues with different priorities, and the system automatically load-balances work across available workers.
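A small sketch of this routing pattern, with illustrative queue and task names:

# agent_queues.py - sketch: dedicated queues per agent role
from celery import Celery
from kombu import Queue

app = Celery('coordination_sketch', broker='redis://localhost:6379/0')

# One queue per agent role, so each role can be scaled and throttled independently
app.conf.task_queues = (
    Queue('research'),
    Queue('scraper'),
    Queue('reviewer'),
)
app.conf.task_routes = {
    'coordination_sketch.research_task': {'queue': 'research'},
    'coordination_sketch.scrape_task': {'queue': 'scraper'},
    'coordination_sketch.review_task': {'queue': 'reviewer'},
}

@app.task(name='coordination_sketch.research_task')
def research_task(topic: str) -> dict:
    # Placeholder for the research agent's work
    return {'topic': topic, 'findings': '...'}

# Start role-specific workers with different concurrency, e.g.:
#   celery -A agent_queues worker -Q research -c 4
#   celery -A agent_queues worker -Q scraper -c 2   # gentler on scraped sites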

7.6 What happens if an agent task fails in a queue-based system?

Queue-based systems provide robust failure handling through several mechanisms:

  1. Automatic retries – Failed tasks return to the queue with exponential backoff
  2. Dead letter queues – Tasks failing repeatedly move to a separate queue for investigation
  3. State persistence – Intermediate results are checkpointed, so work doesn’t need to restart from scratch
  4. Circuit breakers – Repeated failures can temporarily disable problematic agents
  5. Monitoring – Failed tasks generate alerts for investigation

This graceful degradation ensures one failing agent doesn’t bring down the entire system.
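A minimal sketch of the first two mechanisms, using a Redis list as an improvised dead-letter queue (task names, backoff values, and keys are illustrative):

# failure_handling.py - sketch: retries with backoff plus a simple dead-letter queue
import json
import redis
from celery import Celery
from celery.exceptions import MaxRetriesExceededError

app = Celery('failure_sketch',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')
dead_letters = redis.Redis(host='localhost', port=6379, db=1)

@app.task(bind=True, max_retries=3)
def flaky_agent_step(self, payload: dict) -> dict:
    try:
        return run_step(payload)  # hypothetical step that may fail transiently
    except Exception as exc:
        try:
            # Exponential backoff: 10s, 20s, 40s
            raise self.retry(exc=exc, countdown=10 * (2 ** self.request.retries))
        except MaxRetriesExceededError:
            # Out of retries: park the task for offline investigation
            dead_letters.lpush('dead_letter:agent_steps', json.dumps({
                'task_id': self.request.id,
                'payload': payload,
                'error': str(exc),
            }))
            raise

def run_step(payload: dict) -> dict:
    # Placeholder for the real agent step (LLM call, tool invocation, ...)
    return {'ok': True, **payload}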

7.7 How does async processing improve agent scalability?

Async processing enables horizontal scalability, typically the simplest and most cost-effective scaling strategy. When demand increases, you simply add more worker processes without modifying application code. The queue automatically distributes work across all available workers. When demand decreases, you reduce the worker count to save costs. This kind of elastic scaling is very hard to achieve with synchronous architectures, where each additional concurrent user ties up a dedicated thread that stays blocked for the duration of long operations.

7.8 Can I use async processing for real-time agent interactions?

Yes, but with careful architecture. For truly real-time interactions (sub-second responses), use async processing for heavy operations while keeping lightweight responses synchronous. Implement streaming responses: the API acknowledges the request immediately and holds a connection open, then streams results as the agent produces them. Use WebSockets or Server-Sent Events (SSE) to push updates to users. Reserve synchronous execution for simple queries that complete in milliseconds, and route everything else through queues.
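As a rough sketch of the SSE approach (endpoint names are illustrative; the worker still does the heavy lifting while the API only polls the result backend):

# stream_status.py - sketch: pushing task progress to the client with Server-Sent Events
import asyncio
import json
from celery import Celery
from celery.result import AsyncResult
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

celery_app = Celery('sketch',
                    broker='redis://localhost:6379/0',
                    backend='redis://localhost:6379/0')
api = FastAPI()

@api.get('/tasks/{task_id}/stream')
async def stream_task(task_id: str):
    async def event_stream():
        while True:
            result = AsyncResult(task_id, app=celery_app)
            payload = {
                'state': result.state,
                'meta': result.info if isinstance(result.info, dict) else None,
            }
            yield f"data: {json.dumps(payload)}\n\n"
            if result.state in ('SUCCESS', 'FAILURE', 'REVOKED'):
                break
            await asyncio.sleep(1)  # poll the backend; never block on the worker
    return StreamingResponse(event_stream(), media_type='text/event-stream')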

7.9 What tools do I need to implement async agent processing?

A production-ready async agent system typically requires:

Task Queue Framework:

  • Celery (Python) – Most popular, mature ecosystem
  • RQ (Redis Queue) – Simpler alternative for smaller projects
  • Dramatiq – Modern alternative with better defaults

Message Broker:

  • Redis – Fast, simple setup
  • RabbitMQ – Enterprise-grade reliability
  • AWS SQS – Cloud-native managed service

State Management:

  • PostgreSQL – Structured data and ACID guarantees
  • MongoDB – Flexible schema for agent states
  • Redis – Fast intermediate state storage

Monitoring:

  • Flower – Celery monitoring dashboard
  • Prometheus + Grafana – Metrics and alerting
  • CloudWatch – AWS-native monitoring

7.10 How do I handle long-running agent workflows with checkpoints?

Implement checkpointing by breaking workflows into discrete steps and persisting state after each step:

  1. Define stages – Break workflows into logical units (Think → Search → Analyze → Act → Reflect)
  2. Save intermediate state – Store results and context after each stage completion
  3. Use unique task IDs – Track workflow progress with persistent identifiers
  4. Implement resume logic – On failure, check last completed stage and continue from there
  5. Set timeouts per stage – Prevent individual steps from hanging indefinitely
  6. Store in durable storage – Use databases, not just in-memory caches

This approach means a failure at step 4 doesn’t require restarting steps 1-3, saving time and API costs.
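A compact sketch of this pattern follows; for brevity it checkpoints to Redis, whereas point 6 above recommends a durable database in production, and the per-stage function is a placeholder:

# checkpoints.py - sketch: stage-level checkpointing with resume
import json
import redis
from celery import Celery

app = Celery('checkpoint_sketch',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')
store = redis.Redis(host='localhost', port=6379, db=2)

STAGES = ['think', 'search', 'analyze', 'act', 'reflect']

@app.task(bind=True, max_retries=3)
def run_workflow(self, workflow_id: str, query: str) -> dict:
    key = f'checkpoint:{workflow_id}'
    state = json.loads(store.get(key) or '{}')   # resume from the last saved checkpoint

    for stage in STAGES:
        if stage in state:
            continue                              # completed on a previous attempt: skip
        try:
            state[stage] = run_stage(stage, query, state)
            store.set(key, json.dumps(state))     # checkpoint after every stage
        except Exception as exc:
            raise self.retry(exc=exc, countdown=30)
    return state

def run_stage(stage: str, query: str, state: dict) -> dict:
    # Placeholder for the real work (LLM call, tool invocation, ...)
    return {'stage': stage, 'output': f'result of {stage}'}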

8. Conclusion: Async + Queues = Agentic AI Superpower

Asynchronous processing and message queues are not optional in agentic systems—they are foundational.

They enable:

✔ Non-blocking agent tasks
✔ Multi-agent concurrency
✔ Reliable tool execution
✔ State persistence
✔ Event-driven autonomy
✔ Horizontal scaling
✔ Decoupled architecture

In short:

Without async and queues, autonomous AI would collapse under its own complexity. They make agentic systems resilient, scalable, and production-grade.
