为什么需要流式输出?
想象一下,你问AI一个复杂问题,然后盯着空白屏幕等了30秒——直到完整答案一次性蹦出来。这体验糟透了。流式输出(Streaming)就是为了解决这个问题:让答案一个字一个字地"流"出来,而不是等完整答案才显示。
流式输出带来的好处:
- 感知延迟降低 - 用户立刻看到响应开始
- 更好的交互感 - 像"正在思考"的动态效果
- 可中断 - 用户可以在生成中途取消
- 节省内存 - 不需要缓存完整响应
💡 关键洞察:流式输出不仅改善用户体验,还能降低首字延迟(Time to First Token, TTFT),这是AI应用的核心体验指标。
OpenClaw Streaming架构
三种流式协议
OpenClaw支持三种主流流式协议:
| 协议 | 特点 | 适用场景 |
|---|---|---|
| SSE | 单向推送,基于HTTP | 聊天应用、通知推送 |
| WebSocket | 双向通信,持久连接 | 实时协作、多人交互 |
| Chunked | HTTP分块传输 | 简单场景、兼容性要求高 |
实现SSE流式输出
Server-Sent Events (SSE) 是最常用的流式方案:
服务端实现
import { Agent } from 'openclaw';
import { ServerSentEvent } from 'openclaw/streaming';
const agent = new Agent({
model: 'gpt-4',
streaming: true
});
// SSE流式响应
app.get('/chat/stream', async (req, res) => {
const { message } = req.query;
// 设置SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 创建SSE发射器
const sse = new ServerSentEvent(res);
try {
// 流式调用Agent
const stream = await agent.stream(message);
for await (const chunk of stream) {
// 发送不同类型的事件
if (chunk.type === 'content') {
sse.send('message', { content: chunk.text });
} else if (chunk.type === 'tool_call') {
sse.send('tool', { name: chunk.toolName, args: chunk.args });
} else if (chunk.type === 'done') {
sse.send('complete', { usage: chunk.usage });
}
}
} catch (error) {
sse.send('error', { message: error.message });
} finally {
sse.close();
}
});
客户端消费
// 前端消费SSE流
const eventSource = new EventSource('/chat/stream?message=' + encodeURIComponent(message));
eventSource.addEventListener('message', (e) => {
const data = JSON.parse(e.data);
appendToChat(data.content); // 逐步追加内容
});
eventSource.addEventListener('tool', (e) => {
const data = JSON.parse(e.data);
showToolCall(data.name, data.args); // 显示工具调用
});
eventSource.addEventListener('complete', (e) => {
const data = JSON.parse(e.data);
showUsage(data.usage); // 显示token用量
eventSource.close();
});
eventSource.addEventListener('error', (e) => {
console.error('SSE Error:', e);
eventSource.close();
});
WebSocket双向流式
当需要双向通信(如用户中途打断、动态修改prompt)时,WebSocket更合适:
import { WebSocket } from 'ws';
import { Agent } from 'openclaw';
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
const agent = new Agent({ streaming: true });
ws.on('message', async (data) => {
const { type, payload } = JSON.parse(data.toString());
if (type === 'chat') {
const stream = await agent.stream(payload.message);
for await (const chunk of stream) {
ws.send(JSON.stringify({
type: 'chunk',
content: chunk.text
}));
}
ws.send(JSON.stringify({ type: 'done' }));
}
if (type === 'abort') {
agent.abort(); // 支持中途取消
ws.send(JSON.stringify({ type: 'aborted' }));
}
});
});
高级流式特性
工具调用的流式反馈
OpenClaw支持在工具执行时发送实时状态:
for await (const event of agent.streamWithTools(message)) {
switch (event.type) {
case 'thinking':
// LLM正在思考
sendEvent('thinking', { status: 'analyzing' });
break;
case 'tool_start':
// 工具开始执行
sendEvent('tool_start', {
name: event.toolName,
args: event.args
});
break;
case 'tool_progress':
// 工具执行进度(如文件下载)
sendEvent('tool_progress', {
progress: event.progress,
message: event.status
});
break;
case 'tool_complete':
// 工具执行完成
sendEvent('tool_complete', {
result: event.result
});
break;
case 'content':
// 文本内容输出
sendEvent('content', { text: event.text });
break;
}
}
多Agent协作流式
当多个Agent协作时,流式输出可以展示每个Agent的工作:
const multiAgentStream = orchestrator.streamWithAgents(task);
for await (const event of multiAgentStream) {
if (event.type === 'agent_output') {
sendEvent('agent_update', {
agent: event.agentName,
action: event.action,
content: event.content
});
}
}
性能优化
减少chunk频率
⚠️ 不要每个token都发送:过于频繁的chunk会增加网络开销。建议每10-50ms或每5-10个token发送一次。
// 使用缓冲区减少发送频率
let buffer = '';
let lastSend = Date.now();
for await (const chunk of stream) {
buffer += chunk.text;
const now = Date.now();
if (buffer.length >= 10 || now - lastSend >= 50) {
sse.send('message', { content: buffer });
buffer = '';
lastSend = now;
}
}
// 发送剩余内容
if (buffer) {
sse.send('message', { content: buffer });
}
背压处理
当客户端处理速度跟不上生成速度时:
const stream = agent.stream(message);
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 检查客户端是否准备好
if (ws.bufferedAmount > MAX_BUFFER) {
await waitForDrain(ws);
}
ws.send(JSON.stringify(value));
}
最佳实践
- 首字节优化 - 尽快发送第一个chunk,减少TTFT
- 心跳保活 - 每15-30秒发送空事件防止连接超时
- 错误恢复 - 客户端自动重连,服务端支持从断点续传
- 资源清理 - 连接断开时及时释放Agent资源
💡 生产环境建议:使用OpenClaw内置的
StreamManager,它自动处理心跳、背压、错误恢复。
常见问题
Q: SSE和WebSocket怎么选?
如果只需要服务端推送(如聊天),用SSE更简单。如果需要双向通信(如实时协作),用WebSocket。
Q: 流式输出会影响Token计费吗?
不会。Token计费基于实际用量,与输出方式无关。
Q: 如何处理流式中的中断?
OpenClaw Agent支持abort()方法,会取消正在进行的请求并清理资源。