🌊 OpenClaw Agent流式输出指南

让AI像水一样流淌——实时、流畅、优雅的Streaming实现

为什么需要流式输出?

想象一下,你问AI一个复杂问题,然后盯着空白屏幕等了30秒——直到完整答案一次性蹦出来。这体验糟透了。流式输出(Streaming)就是为了解决这个问题:让答案一个字一个字地"流"出来,而不是等完整答案才显示。

流式输出带来的好处:

💡 关键洞察:流式输出不仅改善用户体验,还能降低首字延迟(Time to First Token, TTFT),这是AI应用的核心体验指标。

OpenClaw Streaming架构

三种流式协议

OpenClaw支持三种主流流式协议:

协议 特点 适用场景
SSE 单向推送,基于HTTP 聊天应用、通知推送
WebSocket 双向通信,持久连接 实时协作、多人交互
Chunked HTTP分块传输 简单场景、兼容性要求高

实现SSE流式输出

Server-Sent Events (SSE) 是最常用的流式方案:

服务端实现

import { Agent } from 'openclaw';
import { ServerSentEvent } from 'openclaw/streaming';

const agent = new Agent({
  model: 'gpt-4',
  streaming: true
});

// SSE流式响应
app.get('/chat/stream', async (req, res) => {
  const { message } = req.query;
  
  // 设置SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  
  // 创建SSE发射器
  const sse = new ServerSentEvent(res);
  
  try {
    // 流式调用Agent
    const stream = await agent.stream(message);
    
    for await (const chunk of stream) {
      // 发送不同类型的事件
      if (chunk.type === 'content') {
        sse.send('message', { content: chunk.text });
      } else if (chunk.type === 'tool_call') {
        sse.send('tool', { name: chunk.toolName, args: chunk.args });
      } else if (chunk.type === 'done') {
        sse.send('complete', { usage: chunk.usage });
      }
    }
  } catch (error) {
    sse.send('error', { message: error.message });
  } finally {
    sse.close();
  }
});

客户端消费

// 前端消费SSE流
const eventSource = new EventSource('/chat/stream?message=' + encodeURIComponent(message));

eventSource.addEventListener('message', (e) => {
  const data = JSON.parse(e.data);
  appendToChat(data.content); // 逐步追加内容
});

eventSource.addEventListener('tool', (e) => {
  const data = JSON.parse(e.data);
  showToolCall(data.name, data.args); // 显示工具调用
});

eventSource.addEventListener('complete', (e) => {
  const data = JSON.parse(e.data);
  showUsage(data.usage); // 显示token用量
  eventSource.close();
});

eventSource.addEventListener('error', (e) => {
  console.error('SSE Error:', e);
  eventSource.close();
});

WebSocket双向流式

当需要双向通信(如用户中途打断、动态修改prompt)时,WebSocket更合适:

import { WebSocket } from 'ws';
import { Agent } from 'openclaw';

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  const agent = new Agent({ streaming: true });
  
  ws.on('message', async (data) => {
    const { type, payload } = JSON.parse(data.toString());
    
    if (type === 'chat') {
      const stream = await agent.stream(payload.message);
      
      for await (const chunk of stream) {
        ws.send(JSON.stringify({
          type: 'chunk',
          content: chunk.text
        }));
      }
      
      ws.send(JSON.stringify({ type: 'done' }));
    }
    
    if (type === 'abort') {
      agent.abort(); // 支持中途取消
      ws.send(JSON.stringify({ type: 'aborted' }));
    }
  });
});

高级流式特性

工具调用的流式反馈

OpenClaw支持在工具执行时发送实时状态:

for await (const event of agent.streamWithTools(message)) {
  switch (event.type) {
    case 'thinking':
      // LLM正在思考
      sendEvent('thinking', { status: 'analyzing' });
      break;
    
    case 'tool_start':
      // 工具开始执行
      sendEvent('tool_start', { 
        name: event.toolName,
        args: event.args 
      });
      break;
    
    case 'tool_progress':
      // 工具执行进度(如文件下载)
      sendEvent('tool_progress', { 
        progress: event.progress,
        message: event.status 
      });
      break;
    
    case 'tool_complete':
      // 工具执行完成
      sendEvent('tool_complete', { 
        result: event.result 
      });
      break;
    
    case 'content':
      // 文本内容输出
      sendEvent('content', { text: event.text });
      break;
  }
}

多Agent协作流式

当多个Agent协作时,流式输出可以展示每个Agent的工作:

const multiAgentStream = orchestrator.streamWithAgents(task);

for await (const event of multiAgentStream) {
  if (event.type === 'agent_output') {
    sendEvent('agent_update', {
      agent: event.agentName,
      action: event.action,
      content: event.content
    });
  }
}

性能优化

减少chunk频率

⚠️ 不要每个token都发送:过于频繁的chunk会增加网络开销。建议每10-50ms或每5-10个token发送一次。
// 使用缓冲区减少发送频率
let buffer = '';
let lastSend = Date.now();

for await (const chunk of stream) {
  buffer += chunk.text;
  
  const now = Date.now();
  if (buffer.length >= 10 || now - lastSend >= 50) {
    sse.send('message', { content: buffer });
    buffer = '';
    lastSend = now;
  }
}

// 发送剩余内容
if (buffer) {
  sse.send('message', { content: buffer });
}

背压处理

当客户端处理速度跟不上生成速度时:

const stream = agent.stream(message);
const reader = stream.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  
  // 检查客户端是否准备好
  if (ws.bufferedAmount > MAX_BUFFER) {
    await waitForDrain(ws);
  }
  
  ws.send(JSON.stringify(value));
}

最佳实践

  1. 首字节优化 - 尽快发送第一个chunk,减少TTFT
  2. 心跳保活 - 每15-30秒发送空事件防止连接超时
  3. 错误恢复 - 客户端自动重连,服务端支持从断点续传
  4. 资源清理 - 连接断开时及时释放Agent资源
💡 生产环境建议:使用OpenClaw内置的StreamManager,它自动处理心跳、背压、错误恢复。

常见问题

Q: SSE和WebSocket怎么选?

如果只需要服务端推送(如聊天),用SSE更简单。如果需要双向通信(如实时协作),用WebSocket。

Q: 流式输出会影响Token计费吗?

不会。Token计费基于实际用量,与输出方式无关。

Q: 如何处理流式中的中断?

OpenClaw Agent支持abort()方法,会取消正在进行的请求并清理资源。

📅 最后更新:2026年5月2日 | 作者:妙趣AI