🚀 OpenClaw 并行执行模式

一个人干三份活?不如三个人各干一份

并行 并发 子Agent 性能

📖 为什么要并行?

我怀疑串行执行是前世欠我的。一个 Agent 处理3个任务,每个等2分钟,总共6分钟。但如果3个 Agent 同时干,2分钟就完事。时间就像海绵里的水,并行一挤,还是有的。

并行适用场景:

  • 多数据源聚合 - 同时从3个API拉数据
  • 多文档处理 - 同时分析10份PDF
  • 多Agent协作 - 不同Agent各司其职
  • 批量操作 - 同时发送100条消息

⚡ 四种并行模式

模式一:Promise.all 批量并发

最简单的并行,适合独立的异步操作。

// ❌ 串行:一个一个等
const news = await fetchNews();
const weather = await fetchWeather();
const stocks = await fetchStocks();
// 总耗时 = news + weather + stocks

// ✅ 并行:同时发请求
const [news, weather, stocks] = await Promise.all([
  fetchNews(),
  fetchWeather(),
  fetchStocks()
]);
// 总耗时 = max(news, weather, stocks)

模式二:子 Agent 并发执行

使用 OpenClaw 子 Agent 并发处理复杂任务。

// 生成3个子Agent并行工作
const results = await agent.spawnMultiple([
  {
    task: "分析这份财报的关键指标",
    runtime: "subagent"
  },
  {
    task: "搜索行业竞品对比数据",
    runtime: "subagent"
  },
  {
    task: "生成可视化图表",
    runtime: "subagent"
  }
]);

// 3个子Agent同时工作,结果自动聚合
console.log(results);
// [{analysis: "..."}, {competitors: "..."}, {charts: "..."}]

模式三:流式聚合

多个流同时进行,结果实时聚合。

// 多路流式聚合
const stream = agent.streamAggregate([
  agent.chat({ message: "写一段产品介绍" }),
  agent.chat({ message: "生成营销文案" }),
  agent.chat({ message: "列出核心卖点" })
], {
  // 任一完成就输出
  emitOnFirst: true,
  // 全部完成时合并
  mergeResults: true
});

for await (const chunk of stream) {
  console.log(chunk.source, chunk.content);
}

模式四:Map-Reduce 分治

大数据集分片处理,结果汇总。

// Map: 将1000条数据分成10批,并行处理
const batches = chunkArray(data, 100);
const partialResults = await Promise.all(
  batches.map(batch => 
    agent.process({ data: batch, task: "分析情感" })
  )
);

// Reduce: 汇总结果
const finalResult = partialResults.reduce((acc, cur) => ({
  positive: acc.positive + cur.positive,
  negative: acc.negative + cur.negative,
  neutral: acc.neutral + cur.neutral
}), { positive: 0, negative: 0, neutral: 0 });

🔧 OpenClaw 配置

# ~/.openclaw/config.yaml
concurrency:
  # 最大并发数
  maxConcurrent: 10
  
  # 子Agent并发上限
  maxSubAgents: 5
  
  # 请求队列
  queue:
    enabled: true
    maxSize: 100
    strategy: "fifo"  # fifo | priority | lifo
  
  # 超时设置
  timeout:
    perTask: 30000     # 单任务30秒
    total: 120000      # 总计2分钟
  
  # 错误处理
  errorHandling:
    strategy: "fail-fast"  # fail-fast | continue-on-error
    maxRetries: 2
    retryDelay: 1000

💡 最佳实践

✅ 并行设计原则

  1. 任务独立性 - 并行的任务之间不能有依赖关系
  2. 合理分片 - 粒度太细调度开销大,太粗并行度低
  3. 结果幂等 - 并行结果的合并顺序不应影响最终结果
  4. 资源限流 - 并发数不要超过 API 限额

⚠️ 常见踩坑

  1. 竞态条件 - 多个并行任务写同一份数据 → 数据错乱
  2. API 限流 - 并发过高触发 429 → 全部失败
  3. 内存溢出 - 1000个并行任务的结果同时加载 → OOM
  4. 错误吞噬 - Promise.all 一个失败全失败 → 用 allSettled

错误处理最佳实践

// ❌ 一个失败全挂
const results = await Promise.all(tasks);

// ✅ 部分失败不影响其他
const results = await Promise.allSettled(tasks);
const succeeded = results.filter(r => r.status === 'fulfilled');
const failed = results.filter(r => r.status === 'rejected');

// ✅ 带重试的并行
const results = await Promise.allSettled(
  tasks.map(task => retry(task, { maxRetries: 3, delay: 1000 }))
);

📊 性能对比

# 测试场景:处理10个独立API调用,每个2秒

串行执行:   10 × 2s = 20s
5并发并行:  ceil(10/5) × 2s = 4s
10并发并行: 2s

加速比 = 20s / 2s = 10x 🚀

# 实际考虑调度开销:约 8-9x
# 考虑API限流:约 5-7x(受rate limit约束)

🔗 相关资源