Stop building agents like prompts. Build them like state machines.

Durable execution (Temporal, Restate, Inngest), idempotency keys for tool calls, and human-in-the-loop as an interrupt primitive.

Published · 11 min read

GitHub repo: https://github.com/SubhanshuMG/agents-as-state-machines

The thesis in one paragraph

Stop calling them agents. They are state machines that invoke LLMs at certain transitions. The multi-agent hype (autonomous agents, swarms, orchestration) is cargo-cult software engineering. One well-designed agent with excellent tools and explicit state transitions beats five agents role-playing their way through a problem. The production-grade architecture: durable execution (Temporal, Restate, or Inngest), not LangGraph in-memory; explicit state definitions, not implicit chains; tool calls are idempotent and idempotency-keyed; human-in-the-loop is an interrupt primitive, not a callback; failures are replayed deterministically, not retried randomly. This isn't sexy. It's the architecture that doesn't fail at 2 AM.

Why this matters right now

The multi-agent narrative peaked in mid-2025. Gartner's 2026 AI Ops report shows that 89% of multi-agent deployments that started with 3+ agents had converged to a single agent with more tools by the time they reached production. The teams didn't plan this; they just kept tearing out agents and simplifying. Temporal shipped its LLM-focused workflow tooling in January 2026 (the core runtime hit 1.0 in December 2024). Restate (a Temporal alternative) launched commercially in March 2026. Inngest (event-driven durable execution) shipped Temporal-compatible workflows in February 2026. All three are seeing uptake from companies moving off LangGraph. The signal is clear: production agents need durability, not prompts.

Mainstream belief vs. what production shows

Mainstream belief: "Build autonomous multi-agent systems. Agents collaborate, specialize, and solve problems together."

Production reality: Agents don't collaborate. They hallucinate sub-agents that don't exist. When you have 5 agents, debugging a failure means reading 5 traces. When one agent fails, the others don't know what to do. The simplest fix: one agent, clear state machine, excellent tools, explicit checkpoints. That's it. Every multi-agent system I've debugged in production would have been cheaper and more reliable as a state machine.

A short timeline

| Date | Event | Impact |
|---|---|---|
| Jun 2024 | LangGraph 0.1, agent chains | Prompt-based agent composition |
| Dec 2024 | Temporal 1.0 | Production-grade durable execution |
| Jan 2025 | AutoGen 0.2, multi-agent swarms | Hype peaks; engineering failures begin |
| Mar 2025 | First enterprise "multi-agent failure" case studies | Teams realize they need state machines |
| Jan 2026 | Temporal for LLMs launch | Explicit language for durable agent workflows |
| Feb 2026 | Inngest event-driven workflows | Event-sourced agents |
| Apr 2026 | LangGraph 0.2 adds checkpoints | Recognition that durability is essential |

The decision tree

The reference architecture

Agent as durable state machine

The layers:

  1. Temporal or Restate workflow. Defines states and transitions. Durable: survives worker crashes. Replays deterministically from the last checkpoint.

  2. Agent executor. Implements each state's logic. Can invoke LLMs, tools, or other agents. Idempotent: tool calls are tagged with idempotency keys.

  3. Tool layer. All side effects (API calls, database writes) are here. Idempotent, observable, rate-limited.

  4. Human-in-the-loop gate. Certain state transitions require human approval. Blocks execution; the operator reviews and approves or rejects.

  5. Event log. Every state transition is logged to an immutable event store. Enables replay, audit, and forensics.

Implementation reference: code/state-machine-agent/. Stack: Temporal Python SDK, LLM, tool wrappers.

Step-by-step implementation

Phase 1: Define your state machine (week 1)

Map out the explicit states your agent will traverse.

# code/state_machine/define_states.py
from enum import Enum
from dataclasses import dataclass

class AgentState(Enum):
    INITIAL = "initial"
    FETCH_CONTEXT = "fetch_context"
    ANALYZE = "analyze"
    PLAN = "plan"
    EXECUTE = "execute"
    VERIFY = "verify"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class AgentExecutionContext:
    user_id: str
    task: str
    context: dict
    plan: str
    execution_result: dict
    error: str | None = None
    
    def to_dict(self):
        return {
            "user_id": self.user_id,
            "task": self.task,
            "context": self.context,
            "plan": self.plan,
            "execution_result": self.execution_result,
            "error": self.error
        }

State diagram:

INITIAL --> FETCH_CONTEXT --> ANALYZE --> PLAN --> EXECUTE --> VERIFY --> COMPLETE
                                                       |
                                                       v
                                                     FAILED
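The diagram can also be encoded as an explicit transition table, so an illegal move fails fast instead of silently drifting. A minimal sketch (the `VALID_TRANSITIONS` map and `transition` helper are illustrative, not part of the repo; `AgentState` is repeated from above so the snippet is self-contained):

```python
# Hypothetical sketch: the state diagram as an explicit transition table.
from enum import Enum

class AgentState(Enum):
    INITIAL = "initial"
    FETCH_CONTEXT = "fetch_context"
    ANALYZE = "analyze"
    PLAN = "plan"
    EXECUTE = "execute"
    VERIFY = "verify"
    COMPLETE = "complete"
    FAILED = "failed"

# Every non-terminal state may also fall through to FAILED.
VALID_TRANSITIONS = {
    AgentState.INITIAL: {AgentState.FETCH_CONTEXT, AgentState.FAILED},
    AgentState.FETCH_CONTEXT: {AgentState.ANALYZE, AgentState.FAILED},
    AgentState.ANALYZE: {AgentState.PLAN, AgentState.FAILED},
    AgentState.PLAN: {AgentState.EXECUTE, AgentState.FAILED},
    AgentState.EXECUTE: {AgentState.VERIFY, AgentState.FAILED},
    AgentState.VERIFY: {AgentState.COMPLETE, AgentState.FAILED},
    AgentState.COMPLETE: set(),
    AgentState.FAILED: set(),
}

def transition(current: AgentState, target: AgentState) -> AgentState:
    """Return the target state, or raise if the move is not in the table."""
    if target not in VALID_TRANSITIONS[current]:
        raise ValueError(f"Illegal transition: {current.value} -> {target.value}")
    return target
```

The payoff is that a bug in orchestration logic surfaces as a raised `ValueError` at the transition, not as a silently wrong downstream state.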

Phase 2: Implement with Temporal (week 1-2)

Use Temporal's Python SDK to define the workflow and activities.

# code/temporal/agent_workflow.py
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta
from state_machine.define_states import AgentState, AgentExecutionContext
import activities

@workflow.defn
class AgentWorkflow:
    @workflow.run
    async def run(self, user_id: str, task: str) -> dict:
        """Main agent workflow."""
        ctx = AgentExecutionContext(
            user_id=user_id,
            task=task,
            context={},
            plan="",
            execution_result={}
        )
        
        try:
            # State: FETCH_CONTEXT
            # Note: the Temporal Python SDK requires multiple activity
            # arguments to be passed via the `args` keyword.
            ctx.context = await workflow.execute_activity(
                activities.fetch_context,
                args=[user_id, task],
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=RetryPolicy(maximum_attempts=3)
            )
            
            # State: ANALYZE
            analysis = await workflow.execute_activity(
                activities.analyze_with_llm,
                args=[task, ctx.context],
                start_to_close_timeout=timedelta(seconds=60)
            )
            
            # State: PLAN
            ctx.plan = await workflow.execute_activity(
                activities.plan_with_llm,
                args=[task, analysis, ctx.context],
                start_to_close_timeout=timedelta(seconds=60)
            )
            
            # State: EXECUTE (with checkpoint)
            ctx.execution_result = await workflow.execute_activity(
                activities.execute_plan,
                args=[ctx.plan, ctx.context],
                start_to_close_timeout=timedelta(minutes=5)
            )
            
            # State: VERIFY
            verification = await workflow.execute_activity(
                activities.verify_result,
                args=[ctx.execution_result, task],
                start_to_close_timeout=timedelta(seconds=30)
            )
            
            if not verification["success"]:
                ctx.error = verification.get("reason", "Verification failed")
                return {"state": AgentState.FAILED.value, "context": ctx.to_dict()}
            
            # State: COMPLETE
            return {"state": AgentState.COMPLETE.value, "context": ctx.to_dict()}
        
        except Exception as e:
            ctx.error = str(e)
            return {"state": AgentState.FAILED.value, "context": ctx.to_dict()}

Phase 3: Implement activities (week 2)

Activities are the side-effects (tool calls, LLM invocations).

# code/temporal/activities.py
from datetime import datetime, timezone
import json

from temporalio import activity
from anthropic import Anthropic

client = Anthropic()

@activity.defn
async def fetch_context(user_id: str, task: str) -> dict:
    """Fetch user context from the database."""
    # Idempotent: read-only, safe to retry. `db` is the app's async DB client.
    return {
        "user_history": await db.query("SELECT * FROM user_history WHERE user_id = %s", user_id),
        "task_description": task,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }

@activity.defn
async def analyze_with_llm(task: str, context: dict) -> str:
    """Use LLM to analyze the task."""
    prompt = f"""
    Task: {task}
    Context: {json.dumps(context, indent=2)}
    
    Analyze this task. What information is needed? What are the constraints?
    """
    
    message = client.messages.create(
        model="claude-opus",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    
    return message.content[0].text

@activity.defn
async def plan_with_llm(task: str, analysis: str, context: dict) -> str:
    """LLM generates an execution plan."""
    prompt = f"""
    Task: {task}
    Analysis: {analysis}
    
    Generate a step-by-step plan to accomplish this task. Be specific about tool calls.
    """
    
    message = client.messages.create(
        model="claude-opus",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    )
    
    return message.content[0].text

@activity.defn
async def execute_plan(plan: str, context: dict) -> dict:
    """Execute the plan by invoking tools."""
    # Parse the plan and invoke tools. Each tool call gets its own
    # idempotency key, so a retried activity deduplicates per step
    # rather than collapsing distinct calls under one shared key.
    key_prefix = f"exec_{context['timestamp']}"
    
    results = []
    for i, step in enumerate(plan.split("\n")):
        if step.startswith("TOOL:"):
            tool_name, tool_args = parse_tool_call(step)  # plan-line parser
            result = await invoke_tool(                   # see Phase 5
                tool_name,
                tool_args,
                idempotency_key=f"{key_prefix}_{i}"
            )
            results.append(result)
    
    return {"steps": len(results), "results": results}

@activity.defn
async def verify_result(execution_result: dict, task: str) -> dict:
    """Verify the execution result."""
    prompt = f"""
    Task: {task}
    Execution result: {json.dumps(execution_result)}
    
    Does the result satisfy the task? Yes or no, with explanation.
    """
    
    message = client.messages.create(
        model="claude-opus",
        max_tokens=256,
        messages=[{"role": "user", "content": prompt}]
    )
    
    response_text = message.content[0].text
    # Fragile keyword check ("yes" can appear mid-sentence in a refusal);
    # prefer structured output in production.
    success = response_text.strip().lower().startswith("yes")
    
    return {"success": success, "reason": response_text}
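`execute_plan` above relies on a `parse_tool_call` helper that the snippet leaves undefined. A minimal parser might look like the following; the `TOOL: name({json-args})` line format is an assumption for illustration, not a spec from the repo:

```python
# Hypothetical helper: parse a plan line such as
#   TOOL: send_email({"to": "finance@example.com"})
# into (tool_name, tool_args). The line format is assumed.
import json

def parse_tool_call(step: str) -> tuple[str, dict]:
    body = step.removeprefix("TOOL:").strip()
    name, _, raw_args = body.partition("(")
    raw_args = raw_args.rstrip(")")
    args = json.loads(raw_args) if raw_args else {}
    return name.strip(), args
```

In practice you would let the LLM emit structured tool calls (JSON mode or native tool use) rather than parsing free text, but the parsing step has to exist somewhere.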

Phase 4: Human-in-the-loop gate (week 2-3)

Add approval gates for certain transitions.

# code/temporal/human_approval.py
import asyncio
from datetime import timedelta
from temporalio import workflow
from state_machine.define_states import AgentExecutionContext
import activities

@workflow.defn
class ApprovalGatedAgentWorkflow:
    def __init__(self) -> None:
        self.approval_received = False
        self.approval_result: bool | None = None

    @workflow.signal
    async def approve_execution(self, approved: bool):
        """Signal to approve or reject the execution plan."""
        self.approval_result = approved
        self.approval_received = True

    @workflow.run
    async def run(self, user_id: str, task: str) -> dict:
        """Main workflow with approval gate."""
        ctx = AgentExecutionContext(...)
        
        # ... FETCH_CONTEXT, ANALYZE, PLAN ...
        
        # APPROVAL GATE: block until the signal arrives or 1 hour passes.
        # wait_condition raises TimeoutError when the timeout elapses.
        try:
            await workflow.wait_condition(
                lambda: self.approval_received,
                timeout=timedelta(hours=1)
            )
        except asyncio.TimeoutError:
            ctx.error = "Approval timeout"
            return {"state": "FAILED", "context": ctx.to_dict()}
        
        if not self.approval_result:
            ctx.error = "Execution rejected by operator"
            return {"state": "FAILED", "context": ctx.to_dict()}
        
        # State: EXECUTE
        ctx.execution_result = await workflow.execute_activity(
            activities.execute_plan,
            args=[ctx.plan, ctx.context],
            start_to_close_timeout=timedelta(minutes=5)
        )
        
        # ... VERIFY, COMPLETE ...
Phase 5: Idempotency for tool calls (week 3)

Tag all tool calls with idempotency keys so retries don't duplicate side effects.

# code/tools/idempotent_tool.py
import httpx

async def invoke_tool(tool_name: str, args: dict, idempotency_key: str) -> dict:
    """Invoke a tool with an idempotency guarantee."""
    # All tool calls include an idempotency key in headers
    headers = {
        "Idempotency-Key": idempotency_key,
        "X-Tool-Name": tool_name
    }
    
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"http://tool-service/{tool_name}",
            json=args,
            headers=headers
        )
        response.raise_for_status()
    
    return response.json()

# Tool service should implement idempotency
# Example: Stripe, GitHub, most modern APIs support Idempotency-Key header
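On the service side, idempotency usually means caching the first response per key and replaying it on retries. A minimal in-memory sketch of that dedupe logic (a real service would back the store with Postgres or Redis and expire keys; `IdempotencyStore` is a hypothetical name):

```python
# Hypothetical server-side idempotency: the first call with a given key
# executes the side effect; retries with the same key replay the stored
# response instead of re-executing.
from typing import Callable

class IdempotencyStore:
    def __init__(self):
        self._responses: dict[str, dict] = {}

    def execute(self, idempotency_key: str, side_effect: Callable[[], dict]) -> dict:
        if idempotency_key in self._responses:
            return self._responses[idempotency_key]  # replay, no side effect
        result = side_effect()
        self._responses[idempotency_key] = result
        return result
```

This is why the client-side keys in `invoke_tool` matter: a Temporal retry of the activity re-sends the same key, the service replays the cached response, and the side effect happens exactly once.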

Phase 6: Event sourcing (week 3-4)

Log all state transitions to an immutable event log.

# code/event_sourcing/event_log.py
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class WorkflowEvent:
    workflow_id: str
    state: str
    activity: str
    timestamp: str
    result: dict
    error: str | None = None

async def log_state_transition(
    workflow_id: str,
    from_state: str,
    to_state: str,
    activity_result: dict
):
    """Log a state transition to the event store."""
    event = WorkflowEvent(
        workflow_id=workflow_id,
        state=to_state,
        activity=from_state,
        timestamp=datetime.now(timezone.utc).isoformat(),
        result=activity_result
    )
    
    # Append to an immutable log (Postgres, DynamoDB, Kafka);
    # `event_store` is the app's append-only client.
    await event_store.append(event.workflow_id, event)

Phase 7: Deployment (week 4)

Deploy Temporal workers and the workflow.

# code/k8s/temporal-worker.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-workflow-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-worker
  template:
    metadata:
      labels:
        app: agent-worker
    spec:
      containers:
      - name: worker
        image: agent-worker:v1.0.0
        env:
        - name: TEMPORAL_HOST
          value: "temporal-server:7233"
        - name: TEMPORAL_NAMESPACE
          value: "agent-workflows"
        resources:
          requests:
            cpu: "1"
            memory: "2Gi"
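The Deployment assumes a worker entrypoint that connects to the Temporal server and registers the workflow and activities. A sketch, reading the same env vars as the manifest above (the file path and the `agent-workflows` task-queue name are assumptions):

```python
# Hypothetical worker entrypoint. Reads TEMPORAL_HOST / TEMPORAL_NAMESPACE
# from the Deployment's env; task-queue name is illustrative.
import asyncio
import os

def worker_config_from_env() -> dict:
    """Read connection settings, with local-dev defaults."""
    return {
        "host": os.environ.get("TEMPORAL_HOST", "localhost:7233"),
        "namespace": os.environ.get("TEMPORAL_NAMESPACE", "default"),
        "task_queue": os.environ.get("TASK_QUEUE", "agent-workflows"),
    }

async def main() -> None:
    from temporalio.client import Client
    from temporalio.worker import Worker
    import activities
    from agent_workflow import AgentWorkflow

    cfg = worker_config_from_env()
    client = await Client.connect(cfg["host"], namespace=cfg["namespace"])
    worker = Worker(
        client,
        task_queue=cfg["task_queue"],
        workflows=[AgentWorkflow],
        activities=[
            activities.fetch_context,
            activities.analyze_with_llm,
            activities.plan_with_llm,
            activities.execute_plan,
            activities.verify_result,
        ],
    )
    await worker.run()  # blocks until shutdown

if __name__ == "__main__":
    asyncio.run(main())
```

With 3 replicas, any worker can pick up any task from the queue; a crashed worker's in-flight activities are retried elsewhere, which is exactly why the tools must be idempotent.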

Real-world example: Expense report approval

An enterprise finance team runs an agent to review and approve expense reports. Old approach (multi-agent): one agent classifies the expense, another verifies the receipt, a third approves. Hallucination: agents invent sub-agents that don't exist.

New approach (state machine): one agent with clear states:

  1. FETCH: Get the expense record.

  2. VERIFY: Check the receipt (call OCR tool).

  3. CLASSIFY: Run the LLM to categorize spend.

  4. ROUTE: Send to the right approver (tool call).

  5. WAIT_APPROVAL: Human gate.

  6. RECORD: Log to accounting system (idempotent tool).

If the WAIT_APPROVAL step fails (timeout), the workflow restarts from that exact point. No re-processing of receipt, no re-classification. Durable, auditable, simple.
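The six states map directly onto the Phase 1 pattern. A sketch (enum and helper names are assumed, not from the finance team's code):

```python
# Hypothetical state definition for the expense-report agent, mirroring
# the Phase 1 pattern: explicit states, explicit linear transition order.
from enum import Enum

class ExpenseState(Enum):
    FETCH = "fetch"
    VERIFY = "verify"
    CLASSIFY = "classify"
    ROUTE = "route"
    WAIT_APPROVAL = "wait_approval"
    RECORD = "record"
    COMPLETE = "complete"
    FAILED = "failed"

# Linear happy path; a durable runtime resumes from the last completed
# state, so a WAIT_APPROVAL timeout never re-runs VERIFY or CLASSIFY.
HAPPY_PATH = [
    ExpenseState.FETCH,
    ExpenseState.VERIFY,
    ExpenseState.CLASSIFY,
    ExpenseState.ROUTE,
    ExpenseState.WAIT_APPROVAL,
    ExpenseState.RECORD,
    ExpenseState.COMPLETE,
]

def next_state(current: ExpenseState) -> ExpenseState:
    """Next state on the happy path; terminal states raise."""
    idx = HAPPY_PATH.index(current)
    if idx == len(HAPPY_PATH) - 1:
        raise ValueError(f"{current.value} is terminal")
    return HAPPY_PATH[idx + 1]
```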

Testing: Deterministic replay

Validate that workflows replay identically:

# code/test/test_replay.py
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Replayer, Worker

import activities
from agent_workflow import AgentWorkflow

@pytest.mark.asyncio
async def test_workflow_replay():
    """Verify the workflow replays deterministically."""
    async with await WorkflowEnvironment.start_local() as env:
        async with Worker(
            env.client,
            task_queue="replay-test",
            workflows=[AgentWorkflow],
            activities=[
                activities.fetch_context,
                activities.analyze_with_llm,
                activities.plan_with_llm,
                activities.execute_plan,
                activities.verify_result,
            ],
        ):
            handle = await env.client.start_workflow(
                AgentWorkflow.run,
                args=["user_123", "process_document.pdf"],
                id="workflow_replay_test",
                task_queue="replay-test",
            )
            await handle.result()
        
        # Replaying the recorded history through the current workflow code
        # raises a nondeterminism error if the code has diverged.
        history = await handle.fetch_history()
        replayer = Replayer(workflows=[AgentWorkflow])
        await replayer.replay_workflow(history)

Failure modes

  1. Non-determinism in workflow code. You call random.randint() in the workflow function. The workflow replays; the random value is different. The history diverges, and Temporal raises a nondeterminism error. Fix: keep workflow code deterministic (no random, no datetime.now(), no direct I/O); push anything non-deterministic into activities, or use the SDK's deterministic helpers such as workflow.now() and workflow.uuid4().

  2. Activity timeout during long-running operation. An activity has a 5-minute timeout. The tool call takes 6 minutes. Activity fails. Temporal retries. The tool runs again (unless idempotent). Duplicate side effect. Fix: set activity timeouts to 2x the expected duration; use heartbeats for long operations to show progress.

  3. Human approval timeout creates dangling workflows. A workflow is waiting for human approval. The operator never responds. After 1 hour, the workflow times out and transitions to FAILED. But the approval task is still in Jira, waiting. Inconsistent state. Fix: send a notification before the timeout; implement a callback pattern where the approval tool signals the workflow directly.

  4. Event log explosion on high-frequency state machines. A workflow with 1,000 transitions per run, running 1,000 times per second, logs roughly 86 billion events a day. Storage and replay become too slow. Fix: snapshot the workflow state every N events (e.g., every 100); compress old events.

  5. Worker crash during activity execution. A worker is executing a long-running activity. The worker crashes. Temporal retries from the activity's start (at-least-once semantics). If the tool wasn't idempotent, side effects are duplicated. Fix: always implement idempotency in tools; use idempotency keys.

When NOT to do this

Do not use Temporal/durable execution if:

  • The workflow is simple (< 3 steps). Use LangGraph in-memory; operational overhead isn't worth it.

  • Failures are acceptable and cheap to retry. If losing a $0.50 request is fine, skip durability.

  • Your org has no infrastructure team. Temporal requires operator knowledge. If you're a solo AI engineer, stick with LangGraph.

What to ship this quarter

  • Map your agent logic as an explicit state machine (diagram) by end of week 1.

  • Implement as Temporal workflow with 5-7 states and activities by week 2.

  • Add human-in-the-loop approval gate for high-stakes transitions by week 3.

  • Tag all tool calls with idempotency keys by the end of week 3.

  • Deploy to production with 3+ worker replicas by the end of the quarter.

  • Validate replay and determinism with 100 test cases.

Further reading

Top references:

  1. Temporal Python SDK Documentation. Workflows, activities, determinism.

  2. Restate Runtime. Event-driven, durable execution alternative.

  3. Inngest Workflows. Event-sourced agent execution.

  4. NIST Software Supply Chain: Incident Response. Traces and replay for forensics.

  5. Idempotency Keys RFC 9110. HTTP header standard for idempotent requests.

S03: The Agent Production Stack

Part 1 of 1

The industry spent 2024–2025 building agents like prompts: role-play a "researcher," role-play a "planner," role-play a "critic," wire them together with a prompt, ship to production, debug in Slack. It didn't work. Research claims that >75% of multi-agent systems become unmanageable past five agents. Production agent observability is a black box: your APM shows green while the agent has burned $4,200 on a failure mode nobody has a dashboard for. Prompt-injection defences live inside each app, get re-implemented wrong, and drift. The winning 2026 pattern is not "more agents." It is fewer agents, better tools, durable execution, OpenTelemetry gen_ai traces, platform-level guardrails, and SPIFFE-based identity. That is an infrastructure problem, not a modelling problem, which is exactly why this is a Playbook series. The through-line: every article in this series refuses the "agents are just prompts" framing. The unit of architecture is the platform primitive (traces, state machines, policy, identity), not the agent itself.