Long-running agents need progress indication.
That sounds obvious, but the implementation is easy to get wrong. A graph can stream tokens, but a lot of useful agent work is not token generation. It might be searching, loading records, calling APIs, running subgraphs, or fanning out to several workers. If the UI only updates when the final assistant message arrives, the user is left staring at a chat box and guessing whether anything is happening.
There is a small repo with a runnable example: langgraph-copilotkit-progress-demo. This article stands on its own, but the repo is useful if you want to run the full LangGraph and CopilotKit handshake without copying pieces out of a larger app.
The example is intentionally deterministic. It does not require an LLM provider key. The backend simulates planning, fans out to three worker nodes, returns fake structured outputs, and emits progress events. That keeps attention on the progress mechanism instead of model behavior.
The shape of the problem
The backend graph cannot render React.
It can return messages. It can return tool calls. It can dispatch custom events. But the actual progress card is a frontend component, so the graph needs a way to create that component before it starts sending updates to it.
That leads to a two-step flow:
- The graph emits a tool call that renders a progress card in chat.
- The frontend sends the tool response back, and the graph continues into the real work.
Once the card exists, the backend can send custom events that target that card.
The core pattern
The demo graph starts with a small routing node:
```python
async def route_progress_handshake(state: State) -> Command[RouterDestination]:
    goto: RouterDestination = (
        "plan_worker_tasks"
        if is_progress_card_response(state)
        else "emit_progress_card_tool_call"
    )
    return Command(goto=goto)
```
On the first user-authored run, the latest message is not a progress-card tool response, so the graph routes to emit_progress_card_tool_call.
That node returns an assistant message with a frontend tool call:
```python
AIMessage(
    content="",
    tool_calls=[
        {
            "id": tool_call_id,
            "name": "showProgressInChat",
            "args": {
                "title": "Progress",
                "tool_call_id": tool_call_id,
            },
        }
    ],
)
```
The important value is tool_call_id. The backend stores it in graph state. The frontend receives it as a prop when rendering the card. Later progress events include the same ID so the card can ignore events meant for some other run.
In the repo, the helper that creates this initial message also seeds partial progress state:
```python
return {
    "messages": [ai_message],
    "progress": {
        "title": PROGRESS_TITLE,
        "tool_call_id": tool_call_id,
    },
}
```
That first graph pass ends immediately after emitting the card-rendering tool call. The long-running work has not started yet.
Continue after the card is mounted
CopilotKit renders the frontend tool and sends the tool response back to the graph. On that continuation run, is_progress_card_response returns true:
```python
def is_progress_card_response(state: HasOptionalProgressStateWithMessages) -> bool:
    messages = state.get("messages", [])
    if not messages:
        return False
    latest_message = messages[-1]
    if not isinstance(latest_message, ToolMessage):
        return False
    if state.get("progress") is None:
        return False
    progress_tool_call_id = _get_tool_call_id(state)
    if latest_message.tool_call_id != progress_tool_call_id:
        return False
    return True
```
Now the graph can do the real work. In the repo, the path is:
```
plan_worker_tasks
  -> initialize_progress_tracking
  -> fan_out_to_workers
  -> researcher / quotes / outline
  -> finalize_progress_run
```
The planner is fake on purpose. In a production agent, this might be an LLM planner, a rules engine, or normal application code. The progress mechanism does not care where the worker assignments came from.
Initialize the task list
The progress card first appears in a generic processing state: the frontend knows the card exists but does not yet know the task list.
After planning, the graph builds a task dictionary from the selected workers:
```python
worker_tasks = build_task_dict_from_worker_inputs(
    get_worker_inputs(state),
    worker_task_descriptions,
)
aggregator_task = build_progress_task(
    "aggregator",
    AGGREGATE_RESULTS_TASK_NAME,
    "pending",
)
tasks = worker_tasks | build_progress_task_dict(aggregator_task)
```
With the current demo config, that produces tasks for:
```
researcher -> Researching facts
quotes     -> Selecting examples
outline    -> Drafting outline
aggregator -> Create response
```
Then the graph dispatches a custom event:
```python
await adispatch_custom_event(
    "progress_initialize_tasks",
    {
        "tool_call_id": _get_tool_call_id(state),
        "task_dict": _require_progress(state)["task_dict"],
        "task_order": _require_progress(state)["task_order"],
    },
)
```
The frontend reducer uses this event to replace the generic processing row with concrete tasks.
Update tasks from worker nodes
Each worker node follows the same shape:
```python
async def researcher(state: State) -> dict:
    await update_task_status_to_running(state, "researcher")
    prompt = get_worker_prompt(state, "researcher")
    await demo_delay(2)
    output = create_worker_output(
        "researcher",
        prompt,
        "Collected three deterministic facts for the current request.",
        {
            "fact_count": 3,
            "source": "fake in-memory service",
        },
    )
    await update_task_status_to_completed(state, "researcher")
    return {"workers": {"worker_outputs": {"researcher": output}}}
```
The status helpers dispatch another custom event:
```python
await adispatch_custom_event(
    "progress_task_update",
    {
        "tool_call_id": _get_tool_call_id(state),
        "task_id": task_id,
        "status": status,
    },
)
```
Because the workers run after fan_out_to_workers, multiple tasks can be active in the same graph run. The UI does not need to know how the graph is wired. It only reduces the event stream.
Complete the card
The final node runs the aggregator task, formats the worker outputs, completes the progress card, and returns the assistant message:
```python
async def finalize_progress_run(state: State):
    await update_task_status_to_running(state, "aggregator")
    await demo_delay(1)
    formatted_worker_outputs = format_worker_outputs(state)
    await update_task_status_to_completed(state, "aggregator")
    await complete_progress(state)
    ai_message = AIMessage(content=formatted_worker_outputs)
    return {
        "messages": [ai_message],
        "workers": None,
        "progress": None,
    }
```
The completion event includes the same tool_call_id:
```python
await adispatch_custom_event(
    "progress_complete",
    {
        "tool_call_id": _get_tool_call_id(state),
        "completed_at": utc_now_iso(),
    },
)
```
Clearing workers and progress at the end matters. It keeps the next prompt in the same thread from accidentally reusing state from the previous card.
The frontend bridge
The React side has three responsibilities.
First, it registers the tool renderer:
```tsx
useComponent(
  {
    name: "showProgressInChat",
    description: "Render a progress card in chat.",
    parameters: z.object({
      title: z.string().optional(),
      tool_call_id: z.string(),
    }),
    render: ({ title, tool_call_id }) => (
      <ProgressCard toolCallId={tool_call_id ?? ""} title={title} />
    ),
  },
  [agent],
);
```
Second, it subscribes to CopilotKit custom events:
```typescript
const { unsubscribe } = agent.subscribe({
  onCustomEvent: ({ event }) => {
    appendProgressEvent(event);
  },
});
```
Third, each card reduces the event log into display state. The event log is intentionally append-only during a run. That makes it possible for a card rendered in chat history to reconstruct its state after React remounts it.
The reducer recognizes three event names:
```
progress_initialize_tasks
progress_task_update
progress_complete
```
Everything else is ignored.
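As a rough sketch of what that per-card reducer can look like — the field names (tool_call_id, task_dict, task_order, task_id, status, completed_at) mirror the backend dispatch payloads shown earlier, while the type and function names here are illustrative, not the repo's actual code:

```typescript
// Hypothetical sketch of a per-card progress reducer. Field names mirror
// the backend event payloads above; everything else is illustrative.
type TaskStatus = "pending" | "running" | "completed";

interface ProgressTask {
  name: string;
  status: TaskStatus;
}

interface RawCustomEventPayload {
  name: string;
  value: Record<string, any>;
}

interface CardDisplayState {
  tasks: Record<string, ProgressTask>;
  taskOrder: string[];
  completedAt: string | null;
}

function reduceProgressEvents(
  events: RawCustomEventPayload[],
  toolCallId: string,
): CardDisplayState {
  const state: CardDisplayState = { tasks: {}, taskOrder: [], completedAt: null };
  for (const { name, value } of events) {
    // Ignore events aimed at a different card in the same thread.
    if (!value || value.tool_call_id !== toolCallId) continue;
    if (name === "progress_initialize_tasks") {
      state.tasks = { ...value.task_dict };
      state.taskOrder = [...value.task_order];
    } else if (name === "progress_task_update") {
      const task = state.tasks[value.task_id];
      if (task) task.status = value.status;
    } else if (name === "progress_complete") {
      state.completedAt = value.completed_at;
    }
    // Any other event name falls through and is ignored.
  }
  return state;
}
```

Because the reducer re-runs over the whole append-only log, a remounted card ends up in the same display state as a card that watched the run live.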
Why not just store current progress in React state?
Normal page-level React state is not enough for this pattern.
The progress card is rendered inside CopilotKit chat history through useComponent. That rendered component may not share ordinary local state with the page that registered it. A small external store gives the card a stable event-log snapshot and a subscribe function.
This does not have to be complicated. In the demo it is just useSyncExternalStore, an array, and a set of listeners:
```typescript
let eventLogSnapshot: RawCustomEventPayload[] = [];
const listeners = new Set<() => void>();
```
Cards filter events by tool_call_id, which allows multiple completed cards to remain visible in the same chat thread.
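Filled out, such a store could look like the following — a hedged sketch, where appendProgressEvent matches the name used in the subscription snippet above and the remaining function names are illustrative:

```typescript
// Minimal external store sketch for useSyncExternalStore.
// appendProgressEvent matches the subscription snippet; the other
// names are illustrative. In a real app these would be exported
// from the store module.
type RawCustomEventPayload = { name: string; value: Record<string, unknown> };

let eventLogSnapshot: RawCustomEventPayload[] = [];
const listeners = new Set<() => void>();

function appendProgressEvent(event: RawCustomEventPayload): void {
  // Replace the array instead of mutating it so useSyncExternalStore
  // sees a new snapshot identity and re-renders subscribed cards.
  eventLogSnapshot = [...eventLogSnapshot, event];
  listeners.forEach((listener) => listener());
}

function subscribeToProgressEvents(listener: () => void): () => void {
  listeners.add(listener);
  return () => listeners.delete(listener);
}

function getProgressEventSnapshot(): RawCustomEventPayload[] {
  return eventLogSnapshot;
}
```

A card then reads the log with useSyncExternalStore(subscribeToProgressEvents, getProgressEventSnapshot) and reduces only the events carrying its own tool_call_id.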
Why keep state helpers outside state.py?
The top-level graph state file is deliberately small:
```python
class State(MessagesState):
    workers: WorkerState[WorkerNodes]
    progress: ProgressState | None
```
The reducer-sensitive worker state lives in helpers/workers/worker_state.py, and the progress payload types live in helpers/progress/progress_state.py.
That separation keeps state.py focused on the graph contract. It also keeps the lower-level helper modules responsible for payload shape, merge behavior, and event helpers. This matters most for worker state because LangGraph reducers control how parallel branch updates merge back together.
What to copy into a real app
I would copy the lifecycle before copying the exact files:
- emit card tool call
- receive frontend tool response
- initialize task list
- emit task updates
- emit completion
- clear per-run state
The repo keeps the workers deterministic, but the same structure works when workers are real:
- SQL search
- vector search
- file processing
- browser automation
- child LangGraph runs
- multi-step API workflows
The task IDs should be stable, the labels should be user-facing, and the event payloads should stay serializable.
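Written down as types, the three payloads look roughly like this. These interfaces are hypothetical, derived from the backend dispatch calls shown earlier (only the field names come from the article), and the serializability guard is an illustrative helper, not part of the repo:

```typescript
// Hypothetical TypeScript shapes for the three event payloads, derived
// from the dispatched dictionaries above. Interface names are illustrative.
type TaskStatus = "pending" | "running" | "completed";

interface ProgressInitializeTasksPayload {
  tool_call_id: string;
  task_dict: Record<string, { name: string; status: TaskStatus }>;
  task_order: string[];
}

interface ProgressTaskUpdatePayload {
  tool_call_id: string;
  task_id: string;
  status: TaskStatus;
}

interface ProgressCompletePayload {
  tool_call_id: string;
  completed_at: string; // ISO-8601 timestamp
}

// Illustrative guard: event payloads cross a process boundary, so anything
// JSON.stringify cannot represent (a bare function or undefined yields
// undefined; circular references throw) should fail loudly.
function assertSerializablePayload<T>(payload: T): T {
  if (JSON.stringify(payload) === undefined) {
    throw new Error("progress event payload is not JSON-serializable");
  }
  return payload;
}
```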
Tradeoffs
This pattern adds one extra graph turn before long-running work starts. That is usually worth it: the card is mounted and listening before the work emits its first event, so nothing is dropped.
It also means progress UI depends on the frontend tool lifecycle. If you call the graph outside CopilotKit, the graph shape still works, but the progress-card behavior is a UI integration feature.
For production, I would also add cancellation, failed task events, support IDs, telemetry, persistence, and stricter CORS. The demo keeps those out so the main mechanism is easy to see.
Running the demo
From the repo root:
```shell
uv sync
cp .env.example .env
uv run python backend/main.py
```
In another terminal:
```shell
cd ui
cp .env.example .env.local
pnpm install
pnpm dev
```
Open the Next.js URL printed by the dev server, usually:
http://localhost:3000
Send a message in the chat. The first graph pass renders the progress card. The continuation pass initializes tasks, fans out to deterministic workers, streams task updates, aggregates the outputs, and completes the card.
To test the important same-thread case, send a second prompt after the first response completes. The second card should initialize and advance just like the first.
You can also inspect the graph in LangGraph Studio:
```shell
uv run langgraph dev --config backend/langgraph.json
```
The graph ID is progress_agent.
Closing thought
Progress indication is not just polish. It is part of making long-running agent workflows understandable.
The backend should own the truth about what is running. The frontend should own the rendering. The tool-call handshake gives both sides a small contract they can rely on.