🌊 OpenClaw 流式输出与 WebSocket 实时通信

让 Agent 不再"憋大招",一个字一个字地吐

流式 WebSocket SSE 实时

📖 为什么要流式?

我怀疑"等AI回复"这件事是世上最漫长的等待。一个问题甩出去,然后盯着光标闪烁5秒、10秒、30秒……终于一大坨文字冒出来。用户体验?不存在。

流式输出让 Agent 像真人一样边想边说。不等全部生成完,一个 token 一个 token 地推送。用户看到文字逐渐浮现,就像有人在跟你实时对话。

❌ 非流式

请求 → 等待30秒 → 一次性返回全部

感知延迟:30秒

用户体验:痛苦

✅ 流式

请求 → 0.3秒后开始 → 逐token推送

感知延迟:0.3秒

用户体验:丝滑

⚡ 流式输出

1. 基础流式对话

// OpenClaw 流式对话
const stream = agent.chat({
  message: "解释一下什么是 RAG",
  stream: true  // 启用流式
});

for await (const chunk of stream) {
  process.stdout.write(chunk.content);
  // 输出:R...A...G...是...检...索...增...强...生...成...
}

// 流式事件类型
for await (const event of stream) {
  switch (event.type) {
    case 'content':
      // 文字内容
      process.stdout.write(event.content);
      break;
    case 'tool_call':
      // 工具调用
      console.log(`\n🔧 调用工具: ${event.tool}`);
      break;
    case 'tool_result':
      // 工具返回
      console.log(`✅ 工具结果: ${event.result}`);
      break;
    case 'done':
      // 完成
      console.log('\n✨ 完成');
      break;
  }
}

2. SSE 服务端推送

// OpenClaw SSE 端点配置
// 前端使用 EventSource 接收
const eventSource = new EventSource('/api/agent/stream');

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  switch (data.type) {
    case 'token':
      appendToChat(data.content);
      break;
    case 'tool_start':
      showToolIndicator(data.tool);
      break;
    case 'tool_end':
      hideToolIndicator(data.tool);
      break;
    case 'complete':
      eventSource.close();
      break;
  }
};

// OpenClaw 后端配置
agent.serve({
  port: 3000,
  streaming: {
    enabled: true,
    transport: "sse",  // sse | websocket
    heartbeat: 15000,  // 15秒心跳
    retryInterval: 3000
  }
});

🔌 WebSocket 实时通信

1. WebSocket 配置

# ~/.openclaw/config.yaml
websocket:
  enabled: true
  port: 3001
  path: "/ws/agent"
  
  # 连接管理
  maxConnections: 1000
  heartbeat: 30000       # 30秒心跳
  idleTimeout: 300000    # 5分钟无活动断开
  
  # 消息格式
  messageFormat: "json"  # json | msgpack
  
  # 安全
  auth:
    type: "token"        # token | jwt | apikey
    tokenHeader: "Authorization"
    
  # 事件订阅
  events:
    - agent.message      # Agent消息
    - agent.tool_call    # 工具调用
    - agent.status       # Agent状态变化
    - agent.error        # 错误事件

2. 双向通信示例

// 服务端:OpenClaw WebSocket
const ws = agent.websocket();

// 监听客户端消息
ws.on('message', async (client, message) => {
  // 实时流式响应
  const stream = agent.chat({
    message: message.text,
    stream: true
  });
  
  for await (const chunk of stream) {
    // 逐个token推送给客户端
    client.send({
      type: 'token',
      content: chunk.content,
      index: chunk.index
    });
  }
  
  // 完成通知
  client.send({ type: 'complete' });
});

// 主动推送:Agent 事件通知
ws.broadcast({
  type: 'notification',
  message: '定时任务执行完成',
  data: { taskId: 'cron-001', status: 'success' }
});

3. 前端集成

// 前端 WebSocket 连接
const socket = new WebSocket('ws://localhost:3001/ws/agent');

// 发送消息
function sendMessage(text) {
  socket.send(JSON.stringify({
    type: 'message',
    text: text,
    conversationId: currentConversation
  }));
}

// 接收流式响应
socket.onmessage = (event) => {
  const data = JSON.parse(event.data);
  
  switch (data.type) {
    case 'token':
      // 逐字显示
      appendToken(data.content);
      break;
    case 'complete':
      // 对话完成
      finalizeMessage();
      break;
    case 'notification':
      // 主动推送的通知
      showNotification(data.message);
      break;
  }
};

// 断线重连
socket.onclose = () => {
  setTimeout(() => {
    console.log('重连中...');
    connectWebSocket();
  }, 3000);
};

🔧 进阶模式

多路流式聚合

// 多个Agent同时输出,合并到一个流
const streams = await Promise.all([
  agent.chat({ message: "写标题", stream: true }),
  agent.chat({ message: "写正文", stream: true }),
  agent.chat({ message: "写结尾", stream: true })
]);

// 交织输出
for await (const chunk of mergeStreams(streams)) {
  ws.send({
    type: 'multi_stream',
    source: chunk.streamId,
    content: chunk.content
  });
}

进度实时推送

// 批量任务进度实时推送
await agent.batch({
  items: largeDataset,
  processor: processItem,
  onProgress: (progress) => {
    // 实时推送给前端
    ws.send({
      type: 'batch_progress',
      completed: progress.completed,
      total: progress.total,
      percentage: Math.round(progress.completed / progress.total * 100),
      current: progress.currentItem
    });
  }
});

💡 最佳实践

✅ 流式 vs WebSocket 选择

场景 推荐方案 理由
网页聊天界面 SSE 简单、HTTP兼容、单向推送够用
实时协作 WebSocket 双向通信、低延迟
CLI 工具 流式 stdout 无需网络、直接输出
移动端 App WebSocket 长连接省电、推送即时
事件通知 SSE 轻量级、自动重连

⚠️ 踩坑提醒

  1. 背压问题 - 生产速度 > 消费速度 → 加缓冲区
  2. 连接管理 - 断线重连 + 心跳必须实现
  3. 消息顺序 - 并行流的消息可能乱序 → 加序号
  4. 内存泄漏 - 未关闭的连接占内存 → 设超时
  5. 编码问题 - 流式输出可能截断 UTF-8 → 用行分割

🔗 相关资源