WHY LANGGRAPH FOR A SHOPPING FLOW
The shopping flow has several properties that make it a good fit for LangGraph:
- Stateful across turns — the cart, checkout session, and product list need to persist between user messages
- Conditional routing — the same user message might trigger a search, a cart update, or a checkout depending on context
- Independent protocol nodes — MCP, A2A, and UCP calls can be isolated into separate nodes with their own error handling
- Streaming — LangGraph's event system plugs directly into AG-UI's SSE streaming
An alternative would be a custom state machine or a FastAPI endpoint with manual state management. Both are possible. Neither gives you checkpointing, streaming hooks, and conditional branching for free.
THE STATE SCHEMA
Every LangGraph graph runs on a typed state. ShopAgent's state captures everything needed across a session:
class AgentState(TypedDict, total=False):
    messages: Annotated[list[AnyMessage], add_messages]
    cart: Annotated[list[CartItem], replace_value]
    product_candidates: Annotated[list[ProductCandidate], replace_value]
    checkout_session: Annotated[CheckoutSession | None, replace_value]
    user_preferences: Annotated[UserPreferences, replace_value]
    a2ui_payload: Annotated[A2UIPayload | None, replace_value]
    errors: Annotated[list[ProtocolError], append_errors]
    next_action: Annotated[str, replace_value]
    recommender_available: Annotated[bool, replace_value]
    inventory_available: Annotated[bool, replace_value]
    _pending_product_id: Annotated[str, replace_value]
    _pending_quantity: Annotated[int, replace_value]
    _pending_intents: Annotated[list[dict], replace_value]
    _actions_taken: Annotated[int, replace_value]
    user_id: Annotated[str, replace_value]
    order_history: Annotated[list[dict], replace_value]
The Annotated wrappers define the reducer for each field. replace_value is last-write-wins. add_messages appends new messages to the list. append_errors accumulates errors from all nodes. Each node only returns the fields it modifies — LangGraph merges the partial update into the full state using the reducers.
This means a node that only updates the cart does not need to know about the checkout session or the product list. It returns {"cart": [...], "a2ui_payload": {...}} and the graph handles the rest.
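The article names the custom reducers but does not show their bodies. As an assumption about their behaviour, they are likely no more than this (hypothetical implementations, not ShopAgent's actual code):

```python
# Hypothetical reducer bodies for the state schema above; the real
# implementations are not shown in the article, so treat these as assumptions.

def replace_value(current, new):
    """Last-write-wins: a node's partial update overwrites the old value."""
    return new

def append_errors(current, new):
    """Accumulate: errors from every node are concatenated, never dropped."""
    return (current or []) + (new or [])

# A node returning {"errors": [...]} grows the error list; a node returning
# {"cart": [...]} replaces the cart and leaves every other field untouched.
assert replace_value(["old cart"], ["new cart"]) == ["new cart"]
assert append_errors(["e1"], ["e2"]) == ["e1", "e2"]
```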
THE GRAPH STRUCTURE
The graph is assembled in agent.py:
def build_graph():
    graph = StateGraph(AgentState)

    graph.add_node("classify_intent", classify_intent)
    graph.add_node("search_products", search_products)
    graph.add_node("compare_products", compare_products)
    graph.add_node("add_to_cart", add_to_cart)
    graph.add_node("update_cart", update_cart)
    graph.add_node("initiate_checkout", initiate_checkout)
    graph.add_node("generate_checkout_form", generate_checkout_form)
    graph.add_node("collect_shipping", collect_shipping)
    graph.add_node("complete_checkout", complete_checkout)
    graph.add_node("process_next_intent", process_next_intent)
    graph.add_node("respond", respond)

    graph.set_entry_point("classify_intent")
    # ... conditional edges ...
    graph.add_edge("respond", END)

    return graph.compile(checkpointer=MemorySaver())
Every user message enters at classify_intent. From there, the graph routes to the appropriate action node. Every action node eventually routes to respond, which streams the final message and terminates the run.
ROUTING WITH CONDITIONAL EDGES
classify_intent classifies the user's message and writes next_action to state. The route_intent function reads this and returns the target node:
def route_intent(state):
    action = state.get("next_action", "respond")
    valid = {
        "search_products", "compare_products", "add_to_cart",
        "update_cart", "initiate_checkout", "provide_shipping",
        "complete_checkout", "get_order_history", "respond",
    }
    if action not in valid:
        return "respond"
    return action

graph.add_conditional_edges(
    "classify_intent",
    route_intent,
    {
        "search_products": "search_products",
        "compare_products": "compare_products",
        "add_to_cart": "add_to_cart",
        # ...
        "respond": "respond",
    },
)
This is clean routing with no if-else chain in the node itself. The node writes state. A pure function reads state and returns the next node name. LangGraph handles the dispatch.
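Because the router is a pure function of state, it can be unit-tested without compiling the graph at all. A small sketch, restating route_intent from above so the snippet runs standalone:

```python
def route_intent(state: dict) -> str:
    # Restated from the article so this snippet is self-contained.
    valid = {
        "search_products", "compare_products", "add_to_cart",
        "update_cart", "initiate_checkout", "provide_shipping",
        "complete_checkout", "get_order_history", "respond",
    }
    action = state.get("next_action", "respond")
    return action if action in valid else "respond"

# Known intents pass through; anything unexpected falls back to respond.
assert route_intent({"next_action": "add_to_cart"}) == "add_to_cart"
assert route_intent({"next_action": "delete_everything"}) == "respond"
assert route_intent({}) == "respond"  # no classification yet
```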
After each action node, a second routing function checks whether there are queued intents to process:
def route_after_action(state):
    pending = state.get("_pending_intents", [])
    actions_taken = state.get("_actions_taken", 0)
    if pending and actions_taken < 3:
        return "process_next_intent"
    return "respond"
This is how multi-step user messages work — more on that below.
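The cap behaviour is also easy to verify in isolation. Restating route_after_action so the checks run standalone:

```python
def route_after_action(state: dict) -> str:
    # Restated from the article so this snippet is self-contained.
    pending = state.get("_pending_intents", [])
    actions_taken = state.get("_actions_taken", 0)
    if pending and actions_taken < 3:
        return "process_next_intent"
    return "respond"

queued = {"_pending_intents": [{"intent": "add_to_cart"}]}
assert route_after_action({**queued, "_actions_taken": 1}) == "process_next_intent"
assert route_after_action({**queued, "_actions_taken": 3}) == "respond"  # cap reached
assert route_after_action({"_pending_intents": []}) == "respond"         # queue drained
```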
THE CHECKOUT GATE — BLOCKING PROGRESS UNTIL READY
The complete_checkout node finalises the payment. But it should only run when the user has provided a shipping address. If the routing function allowed complete_checkout to run before the address is collected, the UCP service would reject the request with a validation error.
The gate is in the routing function, not in the node:
def route_intent(state):
    # ...
    if action == "complete_checkout":
        session_raw = state.get("checkout_session")
        if session_raw:
            session = (
                CheckoutSession(**session_raw)
                if isinstance(session_raw, dict)
                else session_raw
            )
            if session and session.shipping_address:
                return "complete_checkout"
        # No address yet — route to respond, which will prompt for it
        return "respond"
The node never runs without an address. The orchestrator redirects to respond, which detects the missing address and instructs the user to fill in the form. When the form is submitted and collect_shipping runs, the address enters the state. The next "confirm order" message passes the gate and reaches complete_checkout.
This pattern — routing gates before expensive nodes — keeps the nodes clean and focused. Each node can assume its preconditions have been met.
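The pattern generalizes beyond checkout. As an illustrative sketch (the names here are hypothetical, not from the ShopAgent codebase, and the session is modelled as a plain dict), a gate is just a router built from a precondition:

```python
# Illustrative sketch of the "gate in the router" pattern; names and the
# dict-based session shape are assumptions for this example only.

def gated(action: str, precondition, fallback: str = "respond"):
    """Build a router that only allows `action` once its precondition holds."""
    def route(state: dict) -> str:
        return action if precondition(state) else fallback
    return route

def has_shipping_address(state: dict) -> bool:
    session = state.get("checkout_session") or {}
    return bool(session.get("shipping_address"))

route_checkout = gated("complete_checkout", has_shipping_address)

assert route_checkout({}) == "respond"  # gate closed: no session yet
assert route_checkout(
    {"checkout_session": {"shipping_address": {"city": "Utrecht"}}}
) == "complete_checkout"  # gate open
```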
MULTI-INTENT QUEUING
Users sometimes send multi-part requests: "search for trail shoes and add the cheapest one to my cart". This is two intents — search_products then add_to_cart — in one message.
The classify_intent node extracts all intents and queues them:
# classify_intent returns
{
    "next_action": "search_products",  # first action executes now
    "_pending_intents": [
        {"intent": "add_to_cart", ...},  # second action queued
    ],
    "_actions_taken": 0,
}
After search_products finishes, route_after_action sees pending intents and routes to process_next_intent. That node pops the next intent, sets next_action, and the graph routes to add_to_cart. After add_to_cart, no more pending intents, so the graph routes to respond.
The cap is three total actions per message. This prevents runaway chains from adversarial or confused user input.
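The article does not show process_next_intent itself. A plausible sketch, under the assumption that this node both pops the queue and increments the counter:

```python
# Hypothetical body for process_next_intent, inferred from the description
# above; the real node may split these responsibilities differently.

def process_next_intent(state: dict) -> dict:
    """Pop the oldest queued intent and hand it to the router as next_action."""
    pending = list(state.get("_pending_intents", []))
    nxt = pending.pop(0)
    return {
        "next_action": nxt["intent"],
        "_pending_intents": pending,  # replace_value: the queue shrinks by one
        "_actions_taken": state.get("_actions_taken", 0) + 1,
    }

# Walking the example: after search_products runs, the queued add_to_cart
# becomes the next action and the queue is drained.
state = {
    "next_action": "search_products",
    "_pending_intents": [{"intent": "add_to_cart"}],
    "_actions_taken": 1,  # search_products already counted
}
update = process_next_intent(state)
assert update["next_action"] == "add_to_cart"
assert update["_pending_intents"] == []
assert update["_actions_taken"] == 2
```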
GRACEFUL DEGRADATION IN NODES
Each protocol node wraps its external calls in try-except and falls back gracefully:
async def search_products(state):
    errors: list[ProtocolError] = []  # merged into state via the append_errors reducer
    raw_products = await mcp_search_products(...)

    # A2A recommender — optional enrichment
    try:
        recommendations = await a2a_get_recommendations(...)
    except Exception as exc:
        errors.append(ProtocolError(
            protocol="a2a", service="recommender",
            message=str(exc), recoverable=True,
        ))
        # Proceed with empty recommendations — products shown in MCP order

    # A2A inventory — optional enrichment
    try:
        inventory = await a2a_get_inventory(...)
    except Exception as exc:
        errors.append(ProtocolError(
            protocol="a2a", service="inventory",
            message=str(exc), recoverable=True,
        ))
        # Proceed with unknown stock — shows "Availability unknown"
The node does not raise. It records the error in state (as a ProtocolError with recoverable=True) and continues with what it has. The respond node picks up the errors and includes them in its context so the LLM can acknowledge degraded service to the user.
Fatal errors — product not found in the catalogue, UCP session creation failed — are recorded with recoverable=False and the frontend surfaces them as error banners rather than dismissible warnings.
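The ProtocolError model itself is not shown in this section. A minimal sketch consistent with the fields used in search_products (the real class may be a Pydantic model rather than a dataclass):

```python
from dataclasses import dataclass

# Assumed shape of the error model; field names follow the usage in
# search_products above, everything else is an assumption.
@dataclass
class ProtocolError:
    protocol: str      # "mcp", "a2a", or "ucp"
    service: str       # e.g. "recommender", "inventory"
    message: str
    recoverable: bool  # True -> dismissible warning, False -> error banner

# Degraded-but-usable vs. fatal, as described above:
warning = ProtocolError("a2a", "recommender", "connection refused", recoverable=True)
fatal = ProtocolError("ucp", "checkout", "session creation failed", recoverable=False)
```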
THE CHECKPOINTER AND THREAD IDS
Every run needs a thread ID:
config = {"configurable": {"thread_id": thread_id}}

async for event in graph.astream_events({"messages": [...]}, config=config):
    ...
The MemorySaver checkpointer stores the full state after each node run. When the next user message arrives on the same thread, the graph resumes from the last checkpoint. This is how the cart persists between messages — the state is stored in memory, keyed by thread ID.
ShopAgent generates a fresh thread ID for each browser session, so every session starts with a clean state. That matters for a demo where many users run concurrently: one user's cart must never appear in another's session.
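A fresh thread per browser session can be as simple as minting a UUID when the session starts. A sketch; ShopAgent's actual ID scheme is not shown:

```python
import uuid

# Hypothetical helper: mint a fresh thread ID per browser session so each
# session gets isolated checkpointed state.
def new_session_config() -> dict:
    return {"configurable": {"thread_id": str(uuid.uuid4())}}

# Two sessions never share a thread, so their checkpoints stay separate.
a, b = new_session_config(), new_session_config()
assert a["configurable"]["thread_id"] != b["configurable"]["thread_id"]
```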
WHAT WE'D DO DIFFERENTLY
Persistent checkpointer — MemorySaver stores state in process memory. A server restart wipes all sessions. For production, replace it with a PostgreSQL or Redis checkpointer. LangGraph has first-party adapters for both.
Parallel A2A calls — The search_products node calls the recommender and inventory agents sequentially. asyncio.gather would run them concurrently, cutting the node latency roughly in half.
Node timeouts — External protocol calls (A2A, UCP) have no timeout. A slow external service blocks the entire node. Wrapping each call in asyncio.wait_for with a timeout ensures the node always returns within a reasonable window, even if it falls back to degraded mode.
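The last two suggestions combine naturally: run both optional A2A calls concurrently, give each its own timeout, and let each degrade independently. A sketch with stub clients standing in for the real A2A calls (their actual signatures are not shown in the article):

```python
import asyncio

# Illustrative stubs for the two A2A clients (assumptions, not real clients).
async def a2a_get_recommendations(products):
    await asyncio.sleep(0.01)
    return ["rec-1"]

async def a2a_get_inventory(products):
    await asyncio.sleep(10)  # simulate a hung inventory service
    return {"sku": 3}

async def call_with_fallback(coro, fallback, timeout=0.5):
    # wait_for cancels the call when it exceeds the timeout; any failure
    # degrades to the fallback instead of blocking the node.
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except Exception:
        return fallback

async def enrich(products):
    # gather runs both calls concurrently, so node latency is the slower of
    # the two (capped by the timeout) rather than their sum.
    return await asyncio.gather(
        call_with_fallback(a2a_get_recommendations(products), fallback=[]),
        call_with_fallback(a2a_get_inventory(products), fallback=None),
    )

recs, inventory = asyncio.run(enrich([]))
# recommendations arrive; the hung inventory call degrades to "unknown stock"
```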
THE TAKEAWAY
LangGraph turns a multi-step commerce flow from an ad-hoc control loop into a typed, declarative graph. State is always consistent. Routing is explicit and testable. Individual nodes are isolated functions that do one thing. Adding a new protocol step — a fraud check, a personalisation layer, an inventory reservation — means adding a node and two routing edges. The rest of the graph does not change.
The ShopAgent demo is live at https://shop-agent.agilecreativeminds.nl. See the demo showcase or follow the demo walkthrough. Built by Agile Creative Minds.