Most AI agent tutorials give you a Jupyter notebook that works once. Then you try to deploy it, and everything breaks. The LLM times out. State vanishes between requests. Concurrent users step on each other's conversations. Your monitoring is a print statement. Your error handling is crossing your fingers.
This isn't about missing best practices. It's about the fundamental mismatch between stateful, long-running agent workflows and stateless HTTP request-response cycles. Agents need memory across turns, resumable execution after failures, and observability into their decision chains. HTTP APIs expect millisecond responses, fail-fast semantics, and clean separation between requests.
The fastapi-langgraph-agent-production-ready-template by Wassim EL BAKKOURI solves this by implementing actual production patterns: PostgreSQL-backed state persistence, structured observability with Langfuse, automatic retry logic with exponential backoff, rate limiting per endpoint, proper async I/O with uvloop, and environment-specific configuration. It's not just example code; it's the infrastructure you need before writing your first agent node.
The Mental Model: Agents Are Not Request Handlers
Think about a typical REST API. A request comes in, you process it synchronously, return a response, and forget everything. State lives in a database, queried per request. This works because each request is independent.
Agents break this model in three ways:
State accumulation across turns. An agent doesn't just respond to the current input. It maintains conversation history, tracks tool use, accumulates context. Each turn builds on the last. You can't treat turns as independent requests without losing the thread.
Variable execution time. Some agent turns complete in 200ms. Others take 30 seconds because the LLM is reasoning, calling APIs, or waiting for rate limits. You can't block the main thread. You can't hold HTTP connections open that long. You need async execution with resumable state.
Failure recovery semantics. When a tool call fails mid-turn, you don't want to restart from the beginning. You want to resume from the last successful checkpoint. When the LLM times out, you want automatic retry with exponential backoff. When concurrent requests hit the same thread, you want conflict resolution, not race conditions.
The correct abstraction is checkpointed state machines with async execution. Each agent is a LangGraph state machine. Each node transformation is a checkpoint. State persists in PostgreSQL between nodes. HTTP handlers become thin wrappers that start execution, return immediately, and let clients poll or stream results.
This template implements this pattern correctly. The LangGraph agent owns state. PostgreSQL stores checkpoints. FastAPI provides the async HTTP layer. Langfuse gives you observability into the state transitions. Everything else—auth, rate limiting, retry logic—layers on top without breaking the core model.
Architecture: Where State and Control Flow Actually Live
The system has five layers, each with clear responsibilities:
HTTP Layer (FastAPI + Middleware)
FastAPI endpoints in app/api/v1/ handle HTTP concerns: request validation with Pydantic, authentication via JWT, rate limiting with SlowAPI, CORS headers, and error responses. They don't own business logic. They transform HTTP into Python function calls and Python results into HTTP.
The middleware stack adds cross-cutting concerns: logging context (request_id, user_id, session_id), Prometheus metrics (request count, latency, status codes), and rate limit enforcement. Each middleware is a layer that wraps the core handler, executing before and after the actual logic.
Critical detail: endpoints are async. They use async def and await because the LLM service and database are I/O-bound. Blocking would serialize requests. Async lets the event loop handle thousands of concurrent connections on a single thread.
Agent Layer (LangGraph StateGraph)
The agent is a LangGraph StateGraph defined in app/core/langgraph/graph.py. It's a directed graph where nodes are Python functions that transform state and edges define transitions. The graph compiles into an executable that processes inputs, updates state, calls tools, invokes LLMs, and produces outputs.
State is a Pydantic model defined in app/schemas/graph.py. Each node gets the current state, returns updates, and the graph merges them. State flows through the graph, accumulating information. The final state is the agent's output.
The checkpointer is where persistence happens. When compiling the graph, you pass checkpointer=PostgresSaver(...). After each node execution, LangGraph writes a checkpoint to PostgreSQL. If execution fails, you can resume from the last checkpoint. If a user returns hours later, you load their thread and continue.
```python
from typing import Any, Dict, List

from pydantic import BaseModel
from langgraph.graph import StateGraph
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

# Define state schema
class AgentState(BaseModel):
    messages: List[Message]
    context: Dict[str, Any]
    tool_results: List[ToolResult]

# Build graph
builder = StateGraph(AgentState)
builder.add_node("reasoning", reasoning_node)
builder.add_node("tool_execution", tool_node)
builder.add_edge("reasoning", "tool_execution")

# Compile with checkpointer
checkpointer = AsyncPostgresSaver.from_conn_string(DATABASE_URL)
await checkpointer.setup()  # Creates tables
graph = builder.compile(checkpointer=checkpointer)
```
LLM Service Layer
The LLM service in app/services/llm.py wraps OpenAI API calls with production patterns:
Automatic retry with tenacity. LLM APIs fail. They timeout. They rate limit. They have transient errors. The service retries with exponential backoff: 3 attempts, starting at 1 second, doubling each time. It only retries on specific errors (timeouts, 429s, 5xxs), not on validation errors or auth failures.
Model configuration management. Different models have different parameters. GPT-4o supports standard chat. GPT-5 supports reasoning effort levels (low, medium, high). The service abstracts this, exposing a clean interface while handling model-specific quirks.
Structured logging. Every LLM call logs the model, token count, latency, and result. When debugging production issues, you need to know which model was used, how long it took, and what it returned. This is built in.
```python
import time
from typing import Dict, List

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import APITimeoutError, RateLimitError

class LLMService:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((APITimeoutError, RateLimitError))
    )
    async def generate(self, messages: List[Dict], model: str = "gpt-4o"):
        logger.info("llm_request", model=model, message_count=len(messages))
        start = time.time()
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens
        )
        latency = time.time() - start
        logger.info("llm_response", tokens=response.usage.total_tokens, latency=latency)
        return response.choices[0].message.content
```
Persistence Layer (PostgreSQL + pgvector)
State persists in three tables:
Checkpoints table (managed by LangGraph): Stores graph state snapshots. Each row is a checkpoint: thread_id, checkpoint_id, state blob (JSONB), timestamp. When you invoke a graph with a thread_id, LangGraph loads the latest checkpoint for that thread and resumes from there.
Users and sessions tables (managed by SQLModel): Store authentication state. Users have hashed passwords (bcrypt). Sessions have JWT tokens with expiration. These tables are separate from agent state because they're different concerns.
Memory vectors table (managed by mem0.ai): Stores long-term semantic memory. Each memory is embedded with OpenAI's text-embedding-3-small model and stored as a pgvector. When an agent needs context, it does a vector similarity search, retrieves relevant memories, and includes them in the prompt.
The key pattern: separate tables for separate concerns. Don't try to store everything in checkpoints. Checkpoints are for resumable execution. User data is for auth. Memory is for semantic context. Each has different query patterns, different retention policies, different scaling requirements.
Observability Layer (Langfuse + Prometheus)
Langfuse traces every agent execution. When you invoke a graph, the template automatically creates a Langfuse trace. Each node execution is a span. Each LLM call is annotated with prompt, completion, tokens, latency. Each tool call logs input and output.
This gives you a UI where you can see: What path did the agent take through the graph? Which tools were called? What were the LLM responses? How long did each step take? Where did it fail?
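The template wires this up automatically. A minimal sketch of the underlying pattern, assuming the Langfuse v2-style LangChain CallbackHandler; the template's exact wiring and metadata may differ:

```python
# Sketch: attaching Langfuse tracing to a graph invocation.
# Assumes the langfuse v2 LangChain integration (CallbackHandler);
# settings comes from the Pydantic Settings class shown later.
from langfuse.callback import CallbackHandler

langfuse_handler = CallbackHandler(
    public_key=settings.langfuse_public_key,
    secret_key=settings.langfuse_secret_key,
)

config = {
    "configurable": {"thread_id": thread_id},
    "callbacks": [langfuse_handler],  # every node, LLM call, and tool call becomes a span
}
result = await graph.ainvoke(input_data, config)
```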
Prometheus exposes metrics at /metrics: request count by endpoint, latency histograms, error rates, rate limit hits, database connection pool size. Grafana dashboards (in grafana/dashboards/) visualize these in production.
The combination matters. Langfuse answers "what is this agent doing?" Prometheus answers "is the system healthy?" You need both.
Implementation: Production Patterns in Practice
State Persistence with PostgreSQL Checkpointer
The naive approach is to serialize state to JSON and store it in a session. This breaks when state is large (multi-turn conversations), when you need resumability (failures mid-turn), or when you want to inspect historical state (debugging).
LangGraph's PostgreSQL checkpointer solves this correctly:
```python
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg import AsyncConnection
from psycopg.rows import dict_row

# Create connection with required settings
async def get_checkpointer():
    conn = await AsyncConnection.connect(
        DATABASE_URL,
        autocommit=True,        # Required for .setup() to persist tables
        row_factory=dict_row    # Required for dictionary access
    )
    checkpointer = AsyncPostgresSaver(conn)
    await checkpointer.setup()  # Creates checkpoint tables
    return checkpointer

# Compile graph with checkpointer
graph = builder.compile(checkpointer=await get_checkpointer())

# Invoke with thread_id for state persistence
config = {"configurable": {"thread_id": f"user_{user_id}_session_{session_id}"}}
result = await graph.ainvoke(input_data, config)
```
Critical details:
autocommit=True is required. Without it, .setup() won't actually create the tables. The transaction won't commit. Your checkpoints will silently fail to persist.
row_factory=dict_row is required. The checkpointer accesses database rows like dictionaries (row["column_name"]). Without this, you get TypeError: tuple indices must be integers.
Thread IDs should be namespaced by user and session: user_{user_id}_session_{session_id}. This prevents different users from accessing each other's state and allows multiple concurrent sessions per user.
Streaming Responses for Long-Running Agents
Agents can take 10-30 seconds to complete. Blocking that long is bad UX. The template implements streaming:
@router.post("/chat/stream")async def stream_chat( request: ChatRequest, current_user: User = Depends(get_current_user)): async def generate(): config = { "configurable": { "thread_id": f"user_{current_user.id}_session_{request.session_id}" } } async for chunk in graph.astream(request.message, config): if "messages" in chunk: # Extract latest message message = chunk["messages"][-1] yield f"data: {json.dumps(message.dict())}\n\n" return StreamingResponse(generate(), media_type="text/event-stream")
This uses Server-Sent Events (SSE). The client opens a persistent connection. The server sends events as they happen. The client receives incremental updates.
Why this matters: users see the agent thinking in real-time. If it's stuck waiting for an API, they see that. If it's calling multiple tools, they see each result. The feedback loop is tighter.
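On the client side, consuming the stream takes a few lines. A sketch using httpx, which is not part of the template; the URL, auth header, and payload fields are assumptions based on the endpoint above:

```python
# Sketch: consuming the SSE stream with httpx.
# The endpoint URL, bearer token, and payload shape are assumptions.
import json
import httpx

async def consume_chat_stream(token: str, session_id: str, message: str):
    headers = {"Authorization": f"Bearer {token}"}
    payload = {"session_id": session_id, "message": message}

    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/api/v1/chat/stream",
            json=payload, headers=headers
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[len("data: "):])
                    print(event)  # render the incremental update
```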
Automatic Retry Logic for LLM Reliability
LLM APIs fail in production. OpenAI has rate limits. Anthropic has timeouts. Azure has regional outages. The template handles this with tenacity:
```python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
from openai import RateLimitError, APITimeoutError, APIError

def log_retry_attempt(retry_state):
    # Called by tenacity before each sleep; defined before the decorator references it
    logger.warning(
        "llm_retry",
        attempt=retry_state.attempt_number,
        wait_time=retry_state.next_action.sleep
    )

class LLMService:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((RateLimitError, APITimeoutError, APIError)),
        before_sleep=log_retry_attempt
    )
    async def generate(self, messages: List[Dict], model: str):
        # Actual LLM call
        pass
```
The retry logic is:
- Stop after 3 attempts (failures after that are real)
- Wait 1s, then 2s, then 4s (exponential backoff)
- Only retry on transient errors (rate limits, timeouts, API errors)
- Log every retry with attempt number and wait time
This makes LLM calls resilient without hiding real errors. If your API key is invalid, it fails immediately. If OpenAI is overloaded, it retries and succeeds.
Memory Management with mem0.ai and pgvector
Agents need long-term memory. Not just conversation history, but semantic memory: "user prefers Python", "user is building a chatbot", "user has budget constraints". This template uses mem0.ai:
```python
from datetime import datetime

from mem0 import MemoryClient

class MemoryService:
    def __init__(self):
        self.client = MemoryClient()
        self.config = {
            "collection_name": "agent_memories",
            "llm_model": "gpt-4o-mini",
            "embedder_model": "text-embedding-3-small"
        }

    async def add_memory(self, user_id: str, content: str):
        """Extract and store memories from conversation content."""
        await self.client.add(
            content,
            user_id=user_id,
            metadata={"timestamp": datetime.now().isoformat()}
        )

    async def search_memory(self, user_id: str, query: str, limit: int = 5):
        """Retrieve relevant memories using semantic search."""
        results = await self.client.search(
            query,
            user_id=user_id,
            limit=limit
        )
        return [r["memory"] for r in results]
```
mem0.ai handles the complexity: extracting facts from text, embedding them, storing in pgvector, and searching by similarity. The agent just calls add_memory() after each turn and search_memory() before generating responses.
This scales better than stuffing everything in the prompt. You can have thousands of memories. Search returns only the relevant 3-5. The prompt stays small. The context stays focused.
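Inside the graph, that pattern looks roughly like the sketch below. Only MemoryService and LLMService come from the snippets above; the node body, prompt layout, and Message fields are illustrative assumptions:

```python
# Sketch: a reasoning node that injects retrieved memories as system context.
# memory_service and llm_service are the services shown earlier; the prompt
# layout and the Message(role, content) shape are assumptions.
async def reasoning_node(state: AgentState) -> dict:
    user_message = state.messages[-1].content

    memories = await memory_service.search_memory(
        user_id=state.context["user_id"],
        query=user_message,
        limit=5,
    )

    system_prompt = "Relevant facts about this user:\n" + "\n".join(f"- {m}" for m in memories)
    llm_messages = [{"role": "system", "content": system_prompt}] + [
        {"role": m.role, "content": m.content} for m in state.messages
    ]

    reply = await llm_service.generate(llm_messages)
    # Return the full updated list so the merge replaces messages cleanly
    return {"messages": state.messages + [Message(role="assistant", content=reply)]}
```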
Environment-Specific Configuration
The template supports three environments: development, staging, production. Each has its own .env file:
```
.env.development   # Local development with debug logging
.env.staging       # Staging with production-like settings
.env.production    # Production with strict security
```
Configuration loads automatically based on APP_ENV:
```python
import os
from functools import lru_cache

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    app_env: str = "development"
    project_name: str
    debug: bool = False

    # Database
    postgres_host: str
    postgres_port: int = 5432
    postgres_db: str
    postgres_user: str
    postgres_password: str

    # LLM
    openai_api_key: str
    default_llm_model: str = "gpt-4o"
    max_tokens: int = 4096

    # Observability
    langfuse_public_key: str
    langfuse_secret_key: str

    class Config:
        env_file = f".env.{os.getenv('APP_ENV', 'development')}"
        env_file_encoding = "utf-8"

@lru_cache()
def get_settings() -> Settings:
    return Settings()
```
The pattern: define all config as Pydantic models with types and defaults. Load from environment-specific .env files. Cache the instance with lru_cache() so it's a singleton. Inject via FastAPI dependencies.
This gives you: type safety, validation, environment isolation, easy testing. You can run tests with .env.test, staging with .env.staging, production with .env.production, all from the same codebase.
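Injecting the cached settings into a handler is one parameter. A minimal sketch using the get_settings dependency defined above; the health route itself is illustrative:

```python
# Sketch: inject the cached Settings instance via FastAPI's dependency system.
from fastapi import Depends

@router.get("/health")
async def health(settings: Settings = Depends(get_settings)):
    return {"environment": settings.app_env, "debug": settings.debug}
```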
Rate Limiting Per Endpoint
AI agents are expensive. One user spamming requests can burn through your OpenAI credits. The template implements per-endpoint rate limiting:
```python
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@router.post("/chat")
@limiter.limit("10/minute")
async def chat(
    request: ChatRequest,
    current_user: User = Depends(get_current_user)
):
    # Rate limited to 10 requests per minute per IP
    pass

@router.post("/chat/stream")
@limiter.limit("5/minute")  # Stricter limit for streaming
async def stream_chat(...):
    pass
```
The rate limiter uses Redis (or in-memory storage for development) to track request counts. When a client exceeds the limit, it returns HTTP 429. The client should back off and retry.
You can configure limits globally or per-endpoint. Streaming endpoints get stricter limits because they hold connections longer. Public endpoints get IP-based limiting. Authenticated endpoints get user-based limiting.
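For authenticated endpoints, switching the key function from IP to user ID is a small change. A sketch, assuming your auth middleware attaches the current user to request.state; the Redis URL is an example value:

```python
# Sketch: rate-limit authenticated traffic per user instead of per IP.
# Assumes auth middleware stores the authenticated user on request.state.
from fastapi import Request
from slowapi import Limiter
from slowapi.util import get_remote_address

def user_or_ip_key(request: Request) -> str:
    user = getattr(request.state, "user", None)
    return f"user:{user.id}" if user else get_remote_address(request)

limiter = Limiter(key_func=user_or_ip_key, storage_uri="redis://localhost:6379")
```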
Structured Logging with Request Context
The template uses structlog for structured logging with automatic context binding:
```python
import time
import uuid
from contextvars import ContextVar

import structlog
from fastapi import Request

# Context variables for request tracking
request_id_var: ContextVar[str] = ContextVar("request_id", default="")
user_id_var: ContextVar[str] = ContextVar("user_id", default="")

# Configure structlog
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)
logger = structlog.get_logger()

# Middleware binds context
@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    start_time = time.time()
    request_id = str(uuid.uuid4())
    request_id_var.set(request_id)
    structlog.contextvars.bind_contextvars(
        request_id=request_id,
        path=request.url.path,
        method=request.method
    )
    response = await call_next(request)
    logger.info(
        "request_completed",
        status_code=response.status_code,
        duration=time.time() - start_time
    )
    structlog.contextvars.clear_contextvars()
    return response
```
Every log line includes: request_id, user_id (if authenticated), timestamp, log_level, event name, and any additional fields. Logs are JSON in production, colored console in development.
This makes debugging production issues tractable. You can grep for a request_id and see everything that happened during that request. You can filter by user_id to see a specific user's activity. You can parse JSON logs and aggregate in your logging system.
Model Evaluation Framework
The template includes an evaluation framework in evals/ that integrates with Langfuse:
```python
from statistics import mean

from evals.evaluator import AgentEvaluator

# Initialize evaluator
evaluator = AgentEvaluator(
    langfuse_client=langfuse,
    metrics_dir="evals/metrics/prompts"
)

# Run evaluation on recent traces
metrics = ["accuracy", "safety", "coherence"]
results = await evaluator.evaluate_recent_traces(
    hours=24,
    metrics=metrics
)

# Generate report
threshold = 0.7  # example passing score per metric
report = {
    "total_traces": len(results),
    "success_rate": sum(r["passed"] for r in results) / len(results),
    "metrics": {
        metric: {
            "avg_score": mean(r["scores"][metric] for r in results),
            "failures": [r for r in results if r["scores"][metric] < threshold]
        }
        for metric in metrics
    }
}
```
The evaluator:
- Fetches traces from Langfuse for the specified time period
- Applies evaluation metrics defined in markdown files
- Scores each trace on multiple dimensions
- Generates detailed JSON reports with success/failure analysis
Metrics are defined as markdown prompts in evals/metrics/prompts/. Each metric gets an LLM call that scores the trace. This lets you track quality over time and catch regressions.
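The scoring step is a plain LLM-as-judge call. A sketch of the idea; the prompt file layout, the JSON response contract, and the score_trace helper are assumptions, not the template's evaluator API:

```python
# Sketch: score one trace against a metric defined as a markdown prompt.
# File layout and response format are assumptions for illustration.
import json
from pathlib import Path

async def score_trace(trace_text: str, metric_name: str) -> float:
    metric_prompt = Path(f"evals/metrics/prompts/{metric_name}.md").read_text()

    messages = [
        {"role": "system", "content": metric_prompt},
        {"role": "user", "content": f"Trace:\n{trace_text}\n\nReturn JSON: {{\"score\": 0.0-1.0}}"},
    ]
    raw = await llm_service.generate(messages, model="gpt-4o-mini")
    return float(json.loads(raw)["score"])
```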
Pitfalls and Failure Modes
Checkpoint Table Creation Race Condition
What happens: You deploy to production. Multiple pods start simultaneously. Each calls checkpointer.setup(). Some fail with "table already exists" errors. Others are interrupted mid-setup, leaving the checkpoint schema half-created. State writes start failing or silently disappearing.
Why: checkpointer.setup() is not idempotent by default. If run concurrently, it races to create tables. There's no locking.
How to detect: Logs show ERROR: relation "checkpoints" already exists or state mysteriously disappears between requests.
How to fix: Run .setup() as a one-time migration, not in application startup. Add it to your deployment pipeline:
```python
# migrations/001_create_checkpoint_tables.py
async def migrate():
    checkpointer = await get_checkpointer()
    await checkpointer.setup()  # Only run once, manually or in CI/CD
```
In production, handle table creation separately from application deployment. Use database migration tools. Don't rely on application code to create schema.
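If you must run .setup() at startup anyway (for example, a single-command deploy), serialize it with a Postgres advisory lock so only one process creates the schema. A sketch, not the template's code; the lock key is arbitrary:

```python
# Sketch: guard checkpointer.setup() with a Postgres advisory lock so that
# concurrently starting pods can't race to create the tables.
CHECKPOINT_SETUP_LOCK = 741853  # any app-wide constant

async def safe_setup(conn, checkpointer):
    await conn.execute("SELECT pg_advisory_lock(%s)", (CHECKPOINT_SETUP_LOCK,))
    try:
        await checkpointer.setup()  # only one process runs this at a time
    finally:
        await conn.execute("SELECT pg_advisory_unlock(%s)", (CHECKPOINT_SETUP_LOCK,))
```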
Connection Pool Exhaustion with Long Conversations
What happens: Agents with 50+ turn conversations start timing out. Database connection pool is exhausted. New requests hang waiting for connections.
Why: The default PostgreSQL checkpointer holds a connection for the entire graph execution. Long conversations hold connections for minutes. With 10 connections in the pool and 20 concurrent long conversations, you deadlock.
How to detect: Logs show psycopg.pool.PoolTimeout: timeout waiting for connection. Prometheus shows connection pool at max capacity.
How to fix: Increase pool size or reduce connection duration:
```python
from psycopg_pool import AsyncConnectionPool

# Increase pool size
conn_pool = AsyncConnectionPool(
    DATABASE_URL,
    min_size=5,
    max_size=50   # Up from default 10
)

# Or: use connection-per-checkpoint instead of connection-per-run.
# This closes the connection after each checkpoint write.
checkpointer = AsyncPostgresSaver(conn_pool, connection_per_operation=True)
```
Better: for very long conversations, store large data (file uploads, large tool results) outside checkpoints. Keep checkpoints small and fast.
Memory Vector Search Returns Irrelevant Results
What happens: Agent retrieves memories that don't match the conversation context. Responses are confused or reference wrong user data.
Why: Vector similarity is semantic, not exact. "How do I deploy to AWS?" might retrieve memories about "Azure deployment" if the embedding models consider them similar. Also, you're searching across all memories without filtering by recency or importance.
How to detect: Users report the agent "forgetting" recent information or mixing up context from old conversations.
How to fix: Add metadata filtering and recency weighting:
```python
async def search_memory(
    self,
    user_id: str,
    query: str,
    limit: int = 5,
    recency_weight: float = 0.3
):
    # Search with metadata filters
    results = await self.client.search(
        query,
        user_id=user_id,
        limit=limit * 2,   # Retrieve more for reranking
        filters={
            "timestamp": {"$gte": datetime.now() - timedelta(days=30)}
        }
    )

    # Re-rank by combining similarity and recency
    for r in results:
        age_days = (datetime.now() - r["timestamp"]).days
        recency_score = 1.0 / (1.0 + age_days)
        r["score"] = (
            (1 - recency_weight) * r["similarity"]
            + recency_weight * recency_score
        )

    # Return top results after reranking
    return sorted(results, key=lambda r: r["score"], reverse=True)[:limit]
```
Also: periodically prune old, low-importance memories. Store importance scores. Weight recent + important higher than old + trivial.
Rate Limit Bypass with Thread ID Manipulation
What happens: A malicious user discovers they can bypass rate limits by changing thread IDs. They spam different thread IDs to avoid per-thread limiting.
Why: Rate limiting is based on IP + authenticated user, but not tied to thread creation. An attacker can create unlimited threads, each with its own rate limit counter.
How to detect: Prometheus shows high thread creation rate from single users. Database grows rapidly with orphaned threads.
How to fix: Limit thread creation per user:
```python
# In database service
async def create_thread(self, user_id: str, session_id: str):
    # Check user's active thread count
    active_count = await self.db.count_active_threads(user_id)
    if active_count > MAX_THREADS_PER_USER:
        raise HTTPException(
            status_code=429,
            detail=f"Thread limit exceeded: {active_count}/{MAX_THREADS_PER_USER}"
        )

    # Create thread only if under limit
    thread_id = f"user_{user_id}_session_{session_id}"
    await self.db.create_thread(thread_id, user_id)
    return thread_id
```
Also: implement thread expiration. After 24 hours of inactivity, mark threads as archived. Clean up old threads periodically.
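An archival job can be a simple scheduled query. A sketch with a hypothetical threads table and column names; adapt it to the template's actual schema:

```python
# Sketch: periodic job that archives threads idle for more than 24 hours.
# Table and column names are hypothetical.
from datetime import datetime, timedelta
from sqlalchemy import text

async def archive_stale_threads(session, max_idle_hours: int = 24):
    cutoff = datetime.utcnow() - timedelta(hours=max_idle_hours)
    await session.execute(
        text("UPDATE threads SET archived = TRUE "
             "WHERE last_activity_at < :cutoff AND NOT archived"),
        {"cutoff": cutoff},
    )
    await session.commit()
```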
Silent Failures in Async Tool Calls
What happens: Agent calls multiple tools in parallel. Some succeed, some fail. The agent continues with partial results without realizing failures occurred. Responses are incomplete or incorrect.
Why: Async error handling is tricky. If you await asyncio.gather(*tool_calls, return_exceptions=True), exceptions become return values. If you don't check for exceptions in results, you treat them as successful responses.
How to detect: Agent responses reference "the result" when no result exists. Logs show exceptions but execution continues normally.
How to fix: Explicitly handle exceptions in gathered results:
```python
async def call_tools_parallel(self, tools: List[Tool], args: List[Dict]):
    tasks = [tool.execute(arg) for tool, arg in zip(tools, args)]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Check for exceptions
    successful = []
    failed = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            logger.error("tool_execution_failed", tool=tools[i].name, error=str(result))
            failed.append(tools[i].name)
        else:
            successful.append(result)

    # Decide how to handle failures
    if failed:
        if len(failed) == len(tools):
            # All failed - abort
            raise ToolExecutionError(f"All tools failed: {failed}")
        else:
            # Partial failure - log and continue with successful results
            logger.warning("partial_tool_failure", failed=failed, successful=len(successful))

    return successful
```
Better: make tools idempotent and retryable. If a tool fails, retry it individually. If it fails consistently, exclude it from results and inform the user.
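Retrying a single tool can reuse the same tenacity machinery as the LLM service. A sketch; call_tool_with_retry is a hypothetical helper, and tool.execute follows the interface used above:

```python
# Sketch: retry one tool call individually instead of re-running the whole turn.
from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential, RetryError

async def call_tool_with_retry(tool, args: dict, attempts: int = 3):
    try:
        async for attempt in AsyncRetrying(
            stop=stop_after_attempt(attempts),
            wait=wait_exponential(multiplier=1, min=1, max=10),
        ):
            with attempt:
                return await tool.execute(args)
    except RetryError as exc:
        # Exhausted retries: surface the failure so the agent can exclude this tool
        logger.error("tool_retry_exhausted", tool=tool.name, error=str(exc))
        raise
```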
LLM Cost Explosion from Retry Loops
What happens: Production costs spike. Logs show the agent making 50+ LLM calls for a single user request. No infinite loop in code, but retry logic creates exponential blowup.
Why: Each agent turn calls the LLM. If tools return errors, the agent retries the turn. If the LLM hallucinates tool names, the turn fails and retries. The danger is nested retries: when a graph-level retry wraps a turn whose own LLM call also retries, the attempts multiply. Three attempts at each of five nested layers is 3^5 = 243 LLM calls for a single request.
How to detect: Langfuse traces show deep retry chains. OpenAI bills are 10x expected. Response times are 30+ seconds.
How to fix: Limit total LLM calls per request:
```python
class LLMService:
    def __init__(self):
        self.call_count_per_request: Dict[str, int] = {}
        self.max_calls_per_request = 20

    async def generate(self, messages: List[Dict], request_id: str):
        # Track calls per request
        count = self.call_count_per_request.get(request_id, 0) + 1
        self.call_count_per_request[request_id] = count

        if count > self.max_calls_per_request:
            logger.error(
                "llm_call_limit_exceeded",
                request_id=request_id,
                count=count
            )
            raise HTTPException(
                status_code=500,
                detail="Request exceeded LLM call limit"
            )

        # Make actual call
        return await self._generate_internal(messages)
```
Also: add circuit breakers. If a user's requests consistently hit retry limits, temporarily block them. If a tool consistently fails, disable it automatically.
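A circuit breaker can start as a counter with a cooldown. A minimal in-process sketch; a production version would keep the counters in Redis so all instances share state:

```python
# Sketch: per-user circuit breaker. In-memory only; production would store
# these counters in Redis so every instance sees the same state.
import time
from fastapi import HTTPException

class UserCircuitBreaker:
    def __init__(self, max_failures: int = 3, cooldown_seconds: int = 300):
        self.max_failures = max_failures
        self.cooldown = cooldown_seconds
        self._failures: dict[str, int] = {}
        self._opened_at: dict[str, float] = {}

    def check(self, user_id: str) -> None:
        opened = self._opened_at.get(user_id)
        if opened and time.time() - opened < self.cooldown:
            raise HTTPException(status_code=429, detail="Temporarily blocked; try again later")
        if opened:  # cooldown elapsed, close the circuit
            self._opened_at.pop(user_id, None)
            self._failures[user_id] = 0

    def record_failure(self, user_id: str) -> None:
        self._failures[user_id] = self._failures.get(user_id, 0) + 1
        if self._failures[user_id] >= self.max_failures:
            self._opened_at[user_id] = time.time()
```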
Summary and Next Steps
This template solves the production gap between AI agent demos and deployable services. The core patterns are:
State persistence with PostgreSQL checkpointers: Agents maintain state across requests, failures, and deployments. LangGraph's checkpoint system makes this automatic and correct.
Async I/O with uvloop: Agents spend most time waiting for LLM APIs and external tools. Async execution handles thousands of concurrent requests on a single thread.
Structured observability: Langfuse traces give you visibility into agent decision chains. Prometheus metrics give you system health. Structured logs tie it together.
Retry and rate limiting: LLM APIs fail. Users abuse endpoints. Automatic retries with exponential backoff handle the first. Per-endpoint rate limits handle the second.
Environment-specific configuration: Development, staging, and production have different requirements. Separate config files with Pydantic validation handle this cleanly.
To build on this foundation:
- Add custom tools in app/core/langgraph/tools.py. Each tool is a function that takes state and returns results. Register it with the LangGraph graph.
- Implement custom agent logic in app/core/langgraph/graph.py. Define nodes for reasoning, planning, execution. Connect them with edges based on state conditions.
- Extend evaluation metrics in evals/metrics/prompts/. Define new metrics as markdown files. The evaluator automatically discovers and applies them.
- Scale horizontally by deploying multiple instances behind a load balancer. State persists in PostgreSQL. Stateless instances can scale independently.
- Add background jobs for long-running tasks. Use Celery or similar to offload expensive operations. Store results in the database for later retrieval.
The template gives you infrastructure, not business logic. Your agent's capabilities come from the tools you integrate and the graph you define. But the production concerns (state management, error handling, observability, scaling) are handled.