📖 为什么要流式?
我怀疑"等AI回复"这件事是世上最漫长的等待。一个问题甩出去,然后盯着光标闪烁5秒、10秒、30秒……终于一大坨文字冒出来。用户体验?不存在。
流式输出让 Agent 像真人一样边想边说。不等全部生成完,一个 token 一个 token 地推送。用户看到文字逐渐浮现,就像有人在跟你实时对话。
❌ 非流式
请求 → 等待30秒 → 一次性返回全部
感知延迟:30秒
用户体验:痛苦
✅ 流式
请求 → 0.3秒后开始 → 逐token推送
感知延迟:0.3秒
用户体验:丝滑
⚡ 流式输出
1. 基础流式对话
// OpenClaw 流式对话
const stream = agent.chat({
message: "解释一下什么是 RAG",
stream: true // 启用流式
});
for await (const chunk of stream) {
process.stdout.write(chunk.content);
// 输出:R...A...G...是...检...索...增...强...生...成...
}
// 流式事件类型
for await (const event of stream) {
switch (event.type) {
case 'content':
// 文字内容
process.stdout.write(event.content);
break;
case 'tool_call':
// 工具调用
console.log(`\n🔧 调用工具: ${event.tool}`);
break;
case 'tool_result':
// 工具返回
console.log(`✅ 工具结果: ${event.result}`);
break;
case 'done':
// 完成
console.log('\n✨ 完成');
break;
}
}
2. SSE 服务端推送
// OpenClaw SSE 端点配置
// 前端使用 EventSource 接收
const eventSource = new EventSource('/api/agent/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'token':
appendToChat(data.content);
break;
case 'tool_start':
showToolIndicator(data.tool);
break;
case 'tool_end':
hideToolIndicator(data.tool);
break;
case 'complete':
eventSource.close();
break;
}
};
// OpenClaw 后端配置
agent.serve({
port: 3000,
streaming: {
enabled: true,
transport: "sse", // sse | websocket
heartbeat: 15000, // 15秒心跳
retryInterval: 3000
}
});
🔌 WebSocket 实时通信
1. WebSocket 配置
# ~/.openclaw/config.yaml
websocket:
enabled: true
port: 3001
path: "/ws/agent"
# 连接管理
maxConnections: 1000
heartbeat: 30000 # 30秒心跳
idleTimeout: 300000 # 5分钟无活动断开
# 消息格式
messageFormat: "json" # json | msgpack
# 安全
auth:
type: "token" # token | jwt | apikey
tokenHeader: "Authorization"
# 事件订阅
events:
- agent.message # Agent消息
- agent.tool_call # 工具调用
- agent.status # Agent状态变化
- agent.error # 错误事件
2. 双向通信示例
// 服务端:OpenClaw WebSocket
const ws = agent.websocket();
// 监听客户端消息
ws.on('message', async (client, message) => {
// 实时流式响应
const stream = agent.chat({
message: message.text,
stream: true
});
for await (const chunk of stream) {
// 逐个token推送给客户端
client.send({
type: 'token',
content: chunk.content,
index: chunk.index
});
}
// 完成通知
client.send({ type: 'complete' });
});
// 主动推送:Agent 事件通知
ws.broadcast({
type: 'notification',
message: '定时任务执行完成',
data: { taskId: 'cron-001', status: 'success' }
});
3. 前端集成
// 前端 WebSocket 连接
const socket = new WebSocket('ws://localhost:3001/ws/agent');
// 发送消息
function sendMessage(text) {
socket.send(JSON.stringify({
type: 'message',
text: text,
conversationId: currentConversation
}));
}
// 接收流式响应
socket.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'token':
// 逐字显示
appendToken(data.content);
break;
case 'complete':
// 对话完成
finalizeMessage();
break;
case 'notification':
// 主动推送的通知
showNotification(data.message);
break;
}
};
// 断线重连
socket.onclose = () => {
setTimeout(() => {
console.log('重连中...');
connectWebSocket();
}, 3000);
};
🔧 进阶模式
多路流式聚合
// 多个Agent同时输出,合并到一个流
const streams = await Promise.all([
agent.chat({ message: "写标题", stream: true }),
agent.chat({ message: "写正文", stream: true }),
agent.chat({ message: "写结尾", stream: true })
]);
// 交织输出
for await (const chunk of mergeStreams(streams)) {
ws.send({
type: 'multi_stream',
source: chunk.streamId,
content: chunk.content
});
}
进度实时推送
// 批量任务进度实时推送
await agent.batch({
items: largeDataset,
processor: processItem,
onProgress: (progress) => {
// 实时推送给前端
ws.send({
type: 'batch_progress',
completed: progress.completed,
total: progress.total,
percentage: Math.round(progress.completed / progress.total * 100),
current: progress.currentItem
});
}
});
💡 最佳实践
✅ 流式 vs WebSocket 选择
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 网页聊天界面 | SSE | 简单、HTTP兼容、单向推送够用 |
| 实时协作 | WebSocket | 双向通信、低延迟 |
| CLI 工具 | 流式 stdout | 无需网络、直接输出 |
| 移动端 App | WebSocket | 长连接省电、推送即时 |
| 事件通知 | SSE | 轻量级、自动重连 |
⚠️ 踩坑提醒
- 背压问题 - 生产速度 > 消费速度 → 加缓冲区
- 连接管理 - 断线重连 + 心跳必须实现
- 消息顺序 - 并行流的消息可能乱序 → 加序号
- 内存泄漏 - 未关闭的连接占内存 → 设超时
- 编码问题 - 流式输出可能截断 UTF-8 → 用行分割