Chapter 10: Streaming
In Chapter 6 you built OpenRouterProvider::chat(), which waits for the
entire response before returning. That works, but the user stares at a blank
screen until every token has been generated. Real coding agents print tokens as
they arrive – that is streaming.
This chapter adds streaming support and a StreamingAgent, the streaming
counterpart to SimpleAgent. You will:
- Define a StreamEvent enum that represents real-time deltas.
- Build a StreamAccumulator that collects deltas into a complete AssistantTurn.
- Write a parse_sse_line() function that converts raw Server-Sent Events into StreamEvents.
- Define a StreamProvider trait, the streaming counterpart to Provider.
- Implement StreamProvider for OpenRouterProvider.
- Build a MockStreamProvider for testing without HTTP.
- Build StreamingAgent<P: StreamProvider>, a full agent loop with real-time text streaming.
None of this touches the Provider trait or SimpleAgent. Streaming is
layered on top of the existing architecture.
Why streaming?
Without streaming, a long response (say 500 tokens) makes the CLI feel frozen. Streaming fixes three things:
- Immediate feedback – the user sees the first word within milliseconds instead of waiting seconds for the full response.
- Early cancellation – if the agent is heading in the wrong direction, the user can Ctrl-C without waiting for the full response.
- Progress visibility – watching tokens arrive confirms the agent is working, not stuck.
How SSE works
The OpenAI-compatible API supports streaming via
Server-Sent Events (SSE).
You set "stream": true in the request, and instead of one big JSON response,
the server sends a series of events, each one a data: line followed by a
blank line:
```text
data: {"choices":[{"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"choices":[{"delta":{"content":" world"},"finish_reason":null}]}

data: {"choices":[{"delta":{},"finish_reason":"stop"}]}

data: [DONE]
```
Each line starts with data: followed by a JSON object (or the sentinel
[DONE]). The key difference from the non-streaming response: instead of a
message field with the complete text, each chunk has a delta field with
just the new part. Your code reads these deltas one by one, prints them
immediately, and accumulates them into the final result.
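As a rough illustration of that wire format, here is a std-only sketch that pulls the payload out of every data: line in a raw SSE body. The helper name data_payloads is hypothetical and not part of mini-claw-code; it assumes, as the SSE specification describes, that blank lines separate events and that one optional space may follow the colon.

```rust
/// Hypothetical helper (not part of mini-claw-code): returns the payload of
/// every `data:` line in a raw SSE body, in order.
fn data_payloads(body: &str) -> Vec<&str> {
    body.lines() // splits on "\n" and also strips a trailing "\r"
        // Blank separator lines and non-data lines (e.g. ": comment") drop out here.
        .filter_map(|line| line.strip_prefix("data:"))
        // The SSE spec removes a single optional space after the colon.
        .map(|rest| rest.strip_prefix(' ').unwrap_or(rest))
        .collect()
}
```

The chapter's own parser works one line at a time instead, because over HTTP the body arrives incrementally rather than as one string.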
Here is the flow:
```mermaid
sequenceDiagram
    participant A as Agent
    participant L as LLM (SSE)
    participant U as User
    A->>L: POST /chat/completions (stream: true)
    L-->>A: data: {"delta":{"content":"Hello"}}
    A->>U: print "Hello"
    L-->>A: data: {"delta":{"content":" world"}}
    A->>U: print " world"
    L-->>A: data: [DONE]
    A->>U: (done)
```
Tool calls stream the same way, but with tool_calls deltas instead of
content deltas. The tool call’s name and arguments arrive in pieces that you
concatenate.
StreamEvent
Open mini-claw-code/src/streaming.rs. The StreamEvent enum is our domain type
for streaming deltas:
```rust
#[derive(Debug, Clone, PartialEq)]
pub enum StreamEvent {
    /// A chunk of assistant text.
    TextDelta(String),
    /// A new tool call has started.
    ToolCallStart { index: usize, id: String, name: String },
    /// More argument JSON for a tool call in progress.
    ToolCallDelta { index: usize, arguments: String },
    /// The stream is complete.
    Done,
}
```
This is the interface between the SSE parser and the rest of the application.
The parser produces StreamEvents; the UI consumes them for display; the
accumulator collects them into an AssistantTurn.
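To make the consumer side concrete, here is a hypothetical render() function (not from mini-claw-code) that turns a sequence of StreamEvents into what a CLI might show. It repeats the enum so the snippet stands alone and, as a choice of this sketch, skips argument fragments because they only mean something once joined.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum StreamEvent {
    TextDelta(String),
    ToolCallStart { index: usize, id: String, name: String },
    ToolCallDelta { index: usize, arguments: String },
    Done,
}

/// Hypothetical display consumer: builds the text a CLI would print.
fn render(events: &[StreamEvent]) -> String {
    let mut out = String::new();
    for event in events {
        match event {
            // Text is shown exactly as it arrives.
            StreamEvent::TextDelta(text) => out.push_str(text),
            // A new tool call gets a one-line marker with its name.
            StreamEvent::ToolCallStart { name, .. } => {
                out.push_str(&format!("\n[tool: {name}]"));
            }
            // Partial argument JSON is not useful on screen.
            StreamEvent::ToolCallDelta { .. } => {}
            StreamEvent::Done => out.push('\n'),
        }
    }
    out
}
```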
StreamAccumulator
The accumulator is a simple state machine. It keeps a running text buffer
and a list of partial tool calls. Each feed() call appends to the
appropriate place:
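```rust
pub struct StreamAccumulator {
    text: String,
    tool_calls: Vec<PartialToolCall>,
}

// One tool call under construction (shape inferred from how finish() uses it).
#[derive(Debug, Default, Clone)]
struct PartialToolCall {
    id: String,
    name: String,
    arguments: String, // raw JSON text, parsed only in finish()
}

impl StreamAccumulator {
    pub fn new() -> Self { /* ... */ }
    pub fn feed(&mut self, event: &StreamEvent) { /* ... */ }
    pub fn finish(self) -> AssistantTurn { /* ... */ }
}
```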
The implementation is straightforward:
- TextDelta → append to self.text.
- ToolCallStart → grow the tool_calls vec if needed, then set the id and name at the given index.
- ToolCallDelta → append to the arguments string at the given index.
- Done → no-op (completion is handled in finish()).
finish() consumes the accumulator and builds an AssistantTurn:
```rust
use serde_json::Value;

impl StreamAccumulator {
    pub fn finish(self) -> AssistantTurn {
        let text = if self.text.is_empty() { None } else { Some(self.text) };
        let tool_calls: Vec<ToolCall> = self.tool_calls
            .into_iter()
            // A slot that never received a ToolCallStart has no name; skip it.
            .filter(|tc| !tc.name.is_empty())
            .map(|tc| ToolCall {
                id: tc.id,
                name: tc.name,
                // Parse only now that all fragments are joined; malformed
                // JSON degrades to Value::Null instead of failing the turn.
                arguments: serde_json::from_str(&tc.arguments)
                    .unwrap_or(Value::Null),
            })
            .collect();
        let stop_reason = if tool_calls.is_empty() {
            StopReason::Stop
        } else {
            StopReason::ToolUse
        };
        AssistantTurn { text, tool_calls, stop_reason }
    }
}
```
Notice that arguments is accumulated as a raw string and only parsed as JSON
at the very end. The API sends argument fragments such as `{"pa` followed by
`th": "f.txt"}`; neither is valid JSON on its own, and only the concatenation
`{"path": "f.txt"}` can be parsed.
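Putting the rules for feed() and finish() together, a dependency-free sketch of the accumulator might look like this. It is a simplified stand-in, not the mini-claw-code implementation: it uses its own hypothetical Accumulator type and returns the raw argument strings instead of parsing them with serde_json, so it needs only std. As an extra assumption, it also grows the vec when a ToolCallDelta arrives for an index it has not seen yet.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum StreamEvent {
    TextDelta(String),
    ToolCallStart { index: usize, id: String, name: String },
    ToolCallDelta { index: usize, arguments: String },
    Done,
}

/// A tool call under construction; `arguments` stays raw text until the end.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct PartialToolCall {
    pub id: String,
    pub name: String,
    pub arguments: String,
}

/// Std-only stand-in for StreamAccumulator (hypothetical; no serde_json).
#[derive(Debug, Default)]
pub struct Accumulator {
    text: String,
    tool_calls: Vec<PartialToolCall>,
}

impl Accumulator {
    /// Returns the slot for `index`, growing the vec if the index is new.
    fn slot(&mut self, index: usize) -> &mut PartialToolCall {
        if self.tool_calls.len() <= index {
            self.tool_calls.resize(index + 1, PartialToolCall::default());
        }
        &mut self.tool_calls[index]
    }

    pub fn feed(&mut self, event: &StreamEvent) {
        match event {
            StreamEvent::TextDelta(s) => self.text.push_str(s),
            StreamEvent::ToolCallStart { index, id, name } => {
                let tc = self.slot(*index);
                tc.id = id.clone();
                tc.name = name.clone();
            }
            // Fragments are appended verbatim; they need not be valid JSON yet.
            StreamEvent::ToolCallDelta { index, arguments } => {
                self.slot(*index).arguments.push_str(arguments);
            }
            StreamEvent::Done => {} // completion is handled in finish()
        }
    }

    /// Returns the text (None if empty) and every call that got a name.
    /// A real implementation would parse `arguments` as JSON at this point.
    pub fn finish(self) -> (Option<String>, Vec<PartialToolCall>) {
        let text = if self.text.is_empty() { None } else { Some(self.text) };
        let calls: Vec<PartialToolCall> = self
            .tool_calls
            .into_iter()
            .filter(|tc| !tc.name.is_empty())
            .collect();
        (text, calls)
    }
}
```

Tracking calls by index is what lets parallel tool calls interleave: a fragment for call 1 can arrive between two fragments of call 0 and still land in the right slot.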
Parsing SSE lines
The parse_sse_line() function takes a single line from the SSE stream and
returns zero or more StreamEvents:
```rust
pub fn parse_sse_line(line: &str) -> Option<Vec<StreamEvent>> {
    // Lines without the "data: " prefix (comments, other fields) yield None.
    let data = line.strip_prefix("data: ")?;
    if data == "[DONE]" {
        return Some(vec![StreamEvent::Done]);
    }
    // Invalid JSON also yields None rather than an error.
    let chunk: ChunkResponse = serde_json::from_str(data).ok()?;
    // ... extract events from chunk.choices[0].delta
}
```
The SSE chunk types mirror the OpenAI delta format:
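```rust
use serde::Deserialize;
#[derive(Deserialize)]
struct ChunkResponse { choices: Vec<ChunkChoice> }
#[derive(Deserialize)]
struct ChunkChoice { delta: Delta, finish_reason: Option<String> }
#[derive(Deserialize)]
struct Delta {
    content: Option<String>,
    tool_calls: Option<Vec<DeltaToolCall>>,
}
// DeltaToolCall sketched from the OpenAI delta format (one tool_calls entry).
#[derive(Deserialize)]
struct DeltaToolCall {
    index: usize,
    id: Option<String>,              // present only on the first chunk of a call
    function: Option<DeltaFunction>,
}
#[derive(Deserialize)]
struct DeltaFunction {
    name: Option<String>,      // first chunk only
    arguments: Option<String>, // a fragment of the argument JSON
}
```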
For tool calls, the first chunk includes id and function.name (indicating
a new tool call). Subsequent chunks only have function.arguments fragments.
The parser emits ToolCallStart when id is present, and ToolCallDelta for
non-empty argument strings.
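The start/delta rule above can be sketched without serde. The DeltaToolCall struct here is a hypothetical, already-deserialized stand-in with flattened name and arguments fields (the OpenAI format nests them under function), and the enum keeps only the two tool-call variants this step needs.

```rust
/// Only the tool-call variants of StreamEvent are needed for this sketch.
#[derive(Debug, PartialEq)]
enum StreamEvent {
    ToolCallStart { index: usize, id: String, name: String },
    ToolCallDelta { index: usize, arguments: String },
}

/// Hypothetical, already-deserialized entry of `delta.tool_calls`.
struct DeltaToolCall {
    index: usize,
    id: Option<String>,
    name: Option<String>,
    arguments: Option<String>,
}

fn tool_call_events(tc: &DeltaToolCall) -> Vec<StreamEvent> {
    let mut events = Vec::new();
    // An `id` marks the first chunk of a new tool call.
    if let Some(id) = &tc.id {
        events.push(StreamEvent::ToolCallStart {
            index: tc.index,
            id: id.clone(),
            name: tc.name.clone().unwrap_or_default(),
        });
    }
    // Empty argument strings carry no information, so they are not forwarded.
    if let Some(args) = tc.arguments.as_deref().filter(|a| !a.is_empty()) {
        events.push(StreamEvent::ToolCallDelta {
            index: tc.index,
            arguments: args.to_string(),
        });
    }
    events
}
```

A first chunk that carries both an id and a non-empty fragment produces a start followed by a delta, so no argument text is lost.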
StreamProvider trait
Just as Provider defines the non-streaming interface, StreamProvider
defines the streaming one:
```rust
use std::future::Future;
use tokio::sync::mpsc;

pub trait StreamProvider: Send + Sync {
    fn stream_chat<'a>(
        &'a self,
        messages: &'a [Message],
        tools: &'a [&'a ToolDefinition],
        tx: mpsc::UnboundedSender<StreamEvent>,
    ) -> impl Future<Output = anyhow::Result<AssistantTurn>> + Send + 'a;
}
```
The key difference from Provider::chat() is the tx parameter – an mpsc
channel sender. The implementation sends StreamEvents through this channel
as they arrive and returns the final accumulated AssistantTurn. This gives
callers both real-time events and the complete result.
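The send-and-return pattern can be shown in miniature with std::sync::mpsc standing in for tokio's unbounded channel. stream_pieces is a hypothetical name; the point is that the callee pushes each piece into the channel as soon as it has it and still returns the complete result.

```rust
use std::sync::mpsc;

/// Hypothetical stand-in for stream_chat(): forwards each piece immediately
/// and also returns everything it sent, joined.
fn stream_pieces(pieces: &[&str], tx: mpsc::Sender<String>) -> String {
    let mut full = String::new();
    for piece in pieces {
        full.push_str(piece);
        // A gone receiver should not abort the work, so send errors are ignored.
        let _ = tx.send(piece.to_string());
    }
    full // `tx` is dropped here, which ends the receiver's iteration
}
```

Because the sender is moved into the call, the receiver side knows the stream is over as soon as the function returns.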
We keep StreamProvider separate from Provider rather than adding a method
to the existing trait. This means SimpleAgent and all existing code are
completely unaffected.
Implementing StreamProvider for OpenRouterProvider
The implementation ties together SSE parsing, the accumulator, and the channel:
```rust
impl StreamProvider for OpenRouterProvider {
    async fn stream_chat(
        &self,
        messages: &[Message],
        tools: &[&ToolDefinition],
        tx: mpsc::UnboundedSender<StreamEvent>,
    ) -> anyhow::Result<AssistantTurn> {
        // 1. Build request with stream: true
        // 2. Send HTTP request
        // 3. Read response chunks in a loop:
        //    - Buffer incoming bytes
        //    - Split on newlines
        //    - parse_sse_line() each complete line
        //    - feed() each event into the accumulator
        //    - send each event through tx
        // 4. Return acc.finish()
    }
}
```
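The buffering detail is important. HTTP responses may arrive in arbitrary byte
chunks that do not align with SSE line boundaries, and a chunk can even end in
the middle of a multi-byte UTF-8 character. So we keep a byte buffer, append
each chunk, and decode and process only complete lines (splitting on \n).
Because \n is a single ASCII byte that never appears inside a multi-byte
character, a complete line never cuts a character in half:
```rust
// Raw bytes, not String: decoding each chunk on its own with from_utf8_lossy
// would turn a character split across two chunks into U+FFFD.
let mut buffer: Vec<u8> = Vec::new();
while let Some(chunk) = resp.chunk().await? {
    buffer.extend_from_slice(&chunk);
    while let Some(newline_pos) = buffer.iter().position(|&b| b == b'\n') {
        // Remove the line and its '\n' from the buffer in one step.
        let line_bytes: Vec<u8> = buffer.drain(..=newline_pos).collect();
        let line = String::from_utf8_lossy(&line_bytes[..newline_pos]);
        let line = line.trim_end_matches('\r');
        if line.is_empty() { continue; } // blank lines separate SSE events
        if let Some(events) = parse_sse_line(line) {
            for event in events {
                acc.feed(&event);
                let _ = tx.send(event);
            }
        }
    }
}
```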
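A line buffer can be exercised without HTTP. This hypothetical LineBuffer (not part of mini-claw-code) buffers raw bytes and decodes only complete lines, so a chunk boundary that falls inside a multi-byte UTF-8 character, such as the two bytes of é, cannot corrupt the text. Decoding each chunk separately with String::from_utf8_lossy would replace the split character with U+FFFD.

```rust
/// Hypothetical helper: turns arbitrary byte chunks into complete text lines.
#[derive(Default)]
struct LineBuffer {
    pending: Vec<u8>, // bytes received but not yet part of a complete line
}

impl LineBuffer {
    /// Appends `chunk` and returns every line it completed, without "\n" or "\r\n".
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Take the line plus its '\n' out of the buffer.
            let raw: Vec<u8> = self.pending.drain(..=pos).collect();
            // Decode only now: '\n' is ASCII and never occurs inside a
            // multi-byte character, so a complete line holds whole characters.
            let text = String::from_utf8_lossy(&raw[..pos]);
            lines.push(text.trim_end_matches('\r').to_string());
        }
        lines
    }
}
```

Blank lines come back as empty strings; the chapter's loop skips them, since they only mark the end of an SSE event.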
MockStreamProvider
For testing, we need a streaming provider that does not make HTTP calls.
MockStreamProvider wraps the existing MockProvider and synthesizes
StreamEvents from each canned AssistantTurn:
```rust
pub struct MockStreamProvider {
    inner: MockProvider,
}

impl StreamProvider for MockStreamProvider {
    async fn stream_chat(
        &self,
        messages: &[Message],
        tools: &[&ToolDefinition],
        tx: mpsc::UnboundedSender<StreamEvent>,
    ) -> anyhow::Result<AssistantTurn> {
        let turn = self.inner.chat(messages, tools).await?;
        // Synthesize stream events from the complete turn
        if let Some(ref text) = turn.text {
            for ch in text.chars() {
                let _ = tx.send(StreamEvent::TextDelta(ch.to_string()));
            }
        }
        for (i, call) in turn.tool_calls.iter().enumerate() {
            let _ = tx.send(StreamEvent::ToolCallStart {
                index: i, id: call.id.clone(), name: call.name.clone(),
            });
            let _ = tx.send(StreamEvent::ToolCallDelta {
                index: i, arguments: call.arguments.to_string(),
            });
        }
        let _ = tx.send(StreamEvent::Done);
        Ok(turn)
    }
}
```
It sends text one character at a time (simulating token-by-token streaming)
and each tool call as a start + delta pair. This lets us test StreamingAgent
without any network calls.
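Because the mock iterates with text.chars(), each delta is a whole Unicode scalar value, so multi-byte characters such as é are never split. If you wanted deltas that look more like multi-character tokens, a hypothetical variant (not in mini-claw-code) could group several chars per delta:

```rust
/// Hypothetical variant of the mock's text synthesis: groups up to
/// `chars_per_delta` chars per delta instead of exactly one.
fn chunk_deltas(text: &str, chars_per_delta: usize) -> Vec<String> {
    // slice::chunks panics on 0, so reject it with a clearer message.
    assert!(chars_per_delta > 0, "chars_per_delta must be at least 1");
    let chars: Vec<char> = text.chars().collect();
    chars
        .chunks(chars_per_delta)
        // Each group is a slice of whole chars, so no UTF-8 sequence is cut.
        .map(|group| group.iter().collect::<String>())
        .collect()
}
```

Whatever the grouping, concatenating the deltas reproduces the original text, which is the property the accumulator relies on.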
StreamingAgent
Now for the main event. StreamingAgent is the streaming counterpart to
SimpleAgent. It has the same structure – a provider, a tool set, and an
agent loop – but it uses StreamProvider and emits AgentEvent::TextDelta
events in real time:
```rust
pub struct StreamingAgent<P: StreamProvider> {
    provider: P,
    tools: ToolSet,
}

impl<P: StreamProvider> StreamingAgent<P> {
    pub fn new(provider: P) -> Self { /* ... */ }
    pub fn tool(mut self, t: impl Tool + 'static) -> Self { /* ... */ }
    pub async fn run(
        &self,
        prompt: &str,
        events: mpsc::UnboundedSender<AgentEvent>,
    ) -> anyhow::Result<String> { /* ... */ }
    pub async fn chat(
        &self,
        messages: &mut Vec<Message>,
        events: mpsc::UnboundedSender<AgentEvent>,
    ) -> anyhow::Result<String> { /* ... */ }
}
```
The chat() method is the heart of the streaming agent. Let us walk through
it:
```rust
pub async fn chat(
    &self,
    messages: &mut Vec<Message>,
    events: mpsc::UnboundedSender<AgentEvent>,
) -> anyhow::Result<String> {
    let defs = self.tools.definitions();
    loop {
        // 1. Set up a stream channel
        let (stream_tx, mut stream_rx) = mpsc::unbounded_channel();
        // 2. Spawn a forwarder that converts StreamEvent::TextDelta
        //    into AgentEvent::TextDelta for the UI
        let events_clone = events.clone();
        let forwarder = tokio::spawn(async move {
            while let Some(event) = stream_rx.recv().await {
                if let StreamEvent::TextDelta(text) = event {
                    let _ = events_clone.send(AgentEvent::TextDelta(text));
                }
            }
        });
        // 3. Call stream_chat: this streams AND returns the turn. stream_tx is
        //    dropped when the call completes, which ends the forwarder's loop.
        let turn = self.provider.stream_chat(messages, &defs, stream_tx).await?;
        // Wait for the forwarder so every TextDelta reaches the UI before Done.
        let _ = forwarder.await;
        // 4. Same stop_reason logic as SimpleAgent
        match turn.stop_reason {
            StopReason::Stop => {
                let text = turn.text.clone().unwrap_or_default();
                let _ = events.send(AgentEvent::Done(text.clone()));
                messages.push(Message::Assistant(turn));
                return Ok(text);
            }
            StopReason::ToolUse => {
                // Execute tools, push results, continue loop
                // (same pattern as SimpleAgent)
            }
        }
    }
}
```
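The ordering guarantee behind `let _ = forwarder.await;` can be demonstrated with std threads in place of tokio tasks. This is a simplified, hypothetical sketch (run_turn and the trimmed-down enums are not mini-claw-code's types): because the stream sender is dropped and the forwarder joined before AgentEvent::Done is sent, every text delta reaches the UI channel first.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug)]
enum StreamEvent { TextDelta(String), Done }

#[derive(Debug, PartialEq)]
enum AgentEvent { TextDelta(String), Done(String) }

/// Thread-based sketch of one streaming turn with a forwarder.
fn run_turn(deltas: &[&str], events: mpsc::Sender<AgentEvent>) -> String {
    let (stream_tx, stream_rx) = mpsc::channel::<StreamEvent>();
    let events_clone = events.clone();
    // Forwarder: only text deltas cross over to the UI channel.
    let forwarder = thread::spawn(move || {
        for event in stream_rx {
            if let StreamEvent::TextDelta(text) = event {
                let _ = events_clone.send(AgentEvent::TextDelta(text));
            }
        }
    });
    // Stand-in for stream_chat(): send every delta and accumulate the text.
    let mut full = String::new();
    for d in deltas {
        full.push_str(d);
        let _ = stream_tx.send(StreamEvent::TextDelta(d.to_string()));
    }
    let _ = stream_tx.send(StreamEvent::Done);
    drop(stream_tx); // closes the channel so the forwarder's loop ends
    forwarder.join().expect("forwarder thread panicked");
    // Only now is it safe to announce completion.
    let _ = events.send(AgentEvent::Done(full.clone()));
    full
}
```

Without the join, the Done event could overtake the last few deltas still sitting in the forwarder's channel.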
The architecture has two channels flowing simultaneously:
```mermaid
flowchart LR
    SC["stream_chat()"] -- "StreamEvent" --> CH["mpsc channel"]
    CH --> FW["forwarder task"]
    FW -- "AgentEvent::TextDelta" --> UI["UI / events channel"]
    SC -- "feeds" --> ACC["StreamAccumulator"]
    ACC -- "finish()" --> TURN["AssistantTurn"]
    TURN --> LOOP["Agent loop"]
```
The forwarder task is a bridge: it receives raw StreamEvents from the
provider and converts TextDelta events into AgentEvent::TextDelta for the
UI. This keeps the provider’s streaming protocol separate from the agent’s
event protocol.
Notice that AgentEvent now has a TextDelta variant:
```rust
pub enum AgentEvent {
    TextDelta(String), // NEW: streaming text chunks
    ToolCall { name: String, summary: String },
    Done(String),
    Error(String),
}
```
Using StreamingAgent in the TUI
The TUI example (examples/tui.rs) uses StreamingAgent for the full
experience:
```rust
let provider = OpenRouterProvider::from_env()?;
let agent = Arc::new(
    StreamingAgent::new(provider)
        .tool(BashTool::new())
        .tool(ReadTool::new())
        .tool(WriteTool::new())
        .tool(EditTool::new()),
);
```
The agent is wrapped in Arc so it can be shared with spawned tasks. Each
turn spawns a task that runs the agent, while the UI loop processes its
events and animates a spinner:
```rust
use std::io::Write; // brings flush() into scope

let (tx, mut rx) = mpsc::unbounded_channel();
let agent = agent.clone();
let mut msgs = std::mem::take(&mut history);
// The task hands the updated history back through `handle`.
let handle = tokio::spawn(async move {
    let _ = agent.chat(&mut msgs, tx).await;
    msgs
});
// UI event loop: print TextDeltas, show a spinner for tool calls.
// `tick` is the spinner's interval timer (its setup is not shown here).
loop {
    tokio::select! {
        event = rx.recv() => {
            match event {
                Some(AgentEvent::TextDelta(text)) => {
                    print!("{text}");
                    // stdout is usually line-buffered; flush so partial lines appear now.
                    let _ = std::io::stdout().flush();
                }
                Some(AgentEvent::ToolCall { summary, .. }) => { /* spinner */ },
                Some(AgentEvent::Done(_)) => break,
                // ...
            }
        }
        _ = tick.tick() => { /* animate spinner */ }
    }
}
```
Compare this to the SimpleAgent version from Chapter 9: the structure is
almost identical. The only difference is that TextDelta events let us print
tokens as they arrive instead of waiting for the full Done event.
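One practical detail when printing deltas: Rust's stdout is frequently line-buffered, so print! output that lacks a newline can sit in the buffer instead of appearing on screen. A hypothetical helper like write_delta (not part of mini-claw-code) writes and flushes each delta; being generic over Write lets a test pass a Vec<u8> instead of stdout.

```rust
use std::io::{self, Write};

/// Hypothetical helper: writes one delta and flushes it immediately.
fn write_delta<W: Write>(out: &mut W, text: &str) -> io::Result<()> {
    out.write_all(text.as_bytes())?;
    // Without a flush, text lacking a trailing '\n' may stay buffered.
    out.flush()
}
```

In the UI loop, a call such as write_delta(&mut std::io::stdout(), &text) would take the place of the bare print!.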
Running the tests
```shell
cargo test -p mini-claw-code ch10
```
The tests verify:
- Accumulator: text assembly, tool call assembly, mixed events, empty input, multiple parallel tool calls.
- SSE parsing: text deltas, tool call start/delta, [DONE], non-data lines, empty deltas, invalid JSON, full multi-line sequences.
- MockStreamProvider: text responses synthesize char-by-char events; tool call responses synthesize start + delta events.
- StreamingAgent: text-only responses, tool call loops, and multi-turn chat history, all using MockStreamProvider for deterministic testing.
- Integration: mock TCP servers that send real SSE responses to stream_chat() and verify both the returned AssistantTurn and the events sent through the channel.
Recap
- StreamEvent represents real-time deltas: text chunks, tool call starts, argument fragments, and completion.
- StreamAccumulator collects deltas into a complete AssistantTurn.
- parse_sse_line() converts raw SSE data: lines into StreamEvents.
- StreamProvider is the streaming counterpart to Provider; it adds an mpsc channel parameter for real-time events.
- MockStreamProvider wraps MockProvider to synthesize streaming events for testing.
- StreamingAgent is the streaming counterpart to SimpleAgent: the same tool loop, but with real-time TextDelta events forwarded to the UI.
- The Provider trait and SimpleAgent are unchanged. Streaming is an additive feature layered on top.