🌊 Streaming Responses(流式响应)详解

"凌晨4点03分,用户在等待。不是等结果,而是等第一个字。那一秒钟的沉默,比一分钟的结果更漫长..."

Streaming Responses(流式响应)是一种让 AI Agent 逐字逐句实时输出 的技术。不同于传统「等待完整响应」模式,流式响应让用户立即看到输出,大幅改善交互体验。

❌ 非流式响应

用户: 写一篇AI新闻日报

[等待 15 秒...]

Agent: 好的,这是今天的AI新闻日报:
2026年4月21日,OpenAI发布...

(用户等待期间什么都没看到)
        

✅ 流式响应

用户: 写一篇AI新闻日报

Agent: 好的,
Agent: 好的,这是今天
Agent: 好的,这是今天的AI新闻
Agent: 好的,这是今天的AI新闻日报:
2026年4月21日
...

(用户立即看到输出)
        

为什么 Streaming 如此重要?

1. 降低感知延迟

用户对等待的感知是非线性的:

等待时间 vs 用户焦虑程度

    焦虑
      │
   高 │                              ████████████
      │                         ████
      │                    ████
      │               ████
   低 │          ████
      │     ████
      │ ████
      └──────────────────────────────────────────
        0s   1s   2s   5s   10s   15s   30s  等待时间

关键点:0-2秒是"舒适区",超过5秒焦虑急剧上升
Streaming 把感知等待从 15s 降到 <1s

2. 提升透明度

流式输出让用户看到 Agent 的工作过程:

3. 支持长任务

对于长内容生成,流式输出避免超时:

// 非 Streaming:必须等完整响应
// 10000字文章可能需要60秒,容易超时

// Streaming:持续输出,永不超时
// 用户看到持续的内容流,体验更好

Streaming 技术原理

SSE (Server-Sent Events)

最常用的流式传输协议:

// HTTP Response Headers
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

// 数据格式
data: {"text": "好的"}

data: {"text": ",这是"}

data: {"text": "今天的"}

data: [DONE]

OpenAI API Streaming

const stream = await openai.chat.completions.create({
  model: "gpt-4",
  messages: [{ role: "user", content: "写一篇AI新闻" }],
  stream: true  // 开启流式
});

for await (const chunk of stream) {
  const delta = chunk.choices[0]?.delta?.content || "";
  process.stdout.write(delta);
  // 实时输出:好的,这是今天的AI新闻...
}

Claude API Streaming

const stream = await anthropic.messages.stream({
  model: "claude-3-opus",
  max_tokens: 4096,
  messages: [{ role: "user", content: "写一篇AI新闻" }]
});

stream.on("text", (text) => {
  process.stdout.write(text);
  // 实时输出
});

const finalMessage = await stream.finalMessage();
// 最终完整消息

Streaming 的时间线

📊 非 Streaming 时间线
用户发送 ──────────────────────────────────────────────────────▶ 完整响应
   │                                                          │
   └────── 等待 15 秒,什么都没显示 ──────────────────────────┘
📊 Streaming 时间线
用户发送 ──▶ 首字 0.5s ──▶ 持续输出 ──▶ 完成 15s
   │           │              │            │
   └── [TTFB]──└── 逐字显示 ──┴────────────┘
   
TTFB (Time to First Byte): 0.5 秒
用户感知延迟: 0.5 秒 (vs 15 秒)

OpenClaw Streaming 实战

1. sessions_spawn 流式输出

// 启动子Agent,流式输出到父会话
const spawn = await sessions_spawn({
  runtime: "subagent",
  task: "生成一篇详细的AI新闻日报",
  streamTo: "parent"  // 流式输出到父会话
});

// 父会话会实时收到:
// "好的,"
// "这是今天的"
// "AI新闻日报..."
// 而不是等待完整响应

2. 浏览器端的 Streaming

// 前端接收流式响应
const response = await fetch("/api/agent/stream", {
  method: "POST",
  body: JSON.stringify({ message: "写一篇新闻" })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  
  const chunk = decoder.decode(value);
  const data = JSON.parse(chunk);
  
  // 实时追加到页面
  outputElement.textContent += data.text;
}

3. 处理 Streaming 中断

// 用户中断流式输出
let abortController = new AbortController();

// 开始流式请求
const streamPromise = fetchAgentStream({
  signal: abortController.signal
});

// 用户点击"停止"
stopButton.onclick = () => {
  abortController.abort();
  // Agent 会收到中断信号并停止
};

// Agent 端处理
async function* streamResponse(signal) {
  for (const chunk of generateChunks()) {
    if (signal.aborted) {
      yield { type: "aborted", partialContent: accumulated };
      return;
    }
    yield chunk;
  }
}

Streaming 与 Tool Calling

挑战:工具调用中断流

// 用户看到流式输出
Agent: "让我搜索一下最新的..." [停止]

// Agent 调用工具
[Tool: web_search("AI news")]

// 等待工具返回...(用户看到空白)

// 工具返回后继续
Agent: "根据搜索结果..."

// 问题:工具调用打破了流式体验

解决方案:工具状态提示

// 流式输出工具状态
Agent: "让我搜索一下最新的..."
[🔧 正在调用 web_search...]
[⏳ 等待结果...]
[✅ 获取到 10 条结果]
Agent: "根据搜索结果..."

// 用户始终知道 Agent 在做什么

性能优化

1. 缓冲策略

// 智能缓冲:平衡性能和流畅度
const bufferConfig = {
  minChunkSize: 5,      // 最少5个字符才发送
  maxBufferSize: 50,    // 最多缓冲50个字符
  flushInterval: 100    // 100ms 强制刷新
};

// 效果:
// - 减少网络请求次数
// - 保持流畅输出感
// - 避免单字跳动

2. 断点续传

// 网络中断时保存状态
const streamState = {
  sessionId: "sess-123",
  lastTokenIndex: 234,
  partialContent: "已生成的内容..."
};

// 重连后恢复
function resumeStream(state) {
  return fetch("/api/stream/resume", {
    body: JSON.stringify({
      sessionId: state.sessionId,
      from: state.lastTokenIndex
    })
  });
}

3. 压缩传输

// 对流式数据压缩
// 原始:每次发送完整JSON
data: {"text": "好的,", "index": 0}
data: {"text": "这是", "index": 1}

// 压缩:只发送增量
data: 好的,
data: 这是
data: 今天的

// 压缩后带宽节省 60%+

常见问题

Q: Streaming 会增加服务器负担吗?

A: 会增加连接数,但减少超时问题。总体上是正面的。

Q: 所有场景都适合 Streaming 吗?

A: 不适合需要完整结果才能展示的场景,如数据分析图表。

Q: 如何处理 Streaming 错误?

// 流式错误处理
stream.on("error", (error) => {
  if (error.code === "NETWORK_ERROR") {
    // 尝试重连
    reconnectStream();
  } else if (error.code === "RATE_LIMIT") {
    // 提示用户等待
    showRateLimitWarning();
  } else {
    // 显示部分结果 + 错误信息
    showPartialResult();
    showError(error.message);
  }
});
"Streaming 不是技术优化,而是用户体验优化。它把「等待」变成了「观看」,把「黑盒」变成了「玻璃盒」,把「焦虑」变成了「期待」——这才是真正的交互设计..."