📖 为什么要并行?
我怀疑串行执行是前世欠我的。一个 Agent 处理3个任务,每个等2分钟,总共6分钟。但如果3个 Agent 同时干,2分钟就完事。时间就像海绵里的水,并行一挤,还是有的。
并行适用场景:
- 多数据源聚合 - 同时从3个API拉数据
- 多文档处理 - 同时分析10份PDF
- 多Agent协作 - 不同Agent各司其职
- 批量操作 - 同时发送100条消息
⚡ 四种并行模式
模式一:Promise.all 批量并发
最简单的并行,适合独立的异步操作。
// ❌ 串行:一个一个等
const news = await fetchNews();
const weather = await fetchWeather();
const stocks = await fetchStocks();
// 总耗时 = news + weather + stocks
// ✅ 并行:同时发请求
const [news, weather, stocks] = await Promise.all([
fetchNews(),
fetchWeather(),
fetchStocks()
]);
// 总耗时 = max(news, weather, stocks)
模式二:子 Agent 并发执行
使用 OpenClaw 子 Agent 并发处理复杂任务。
// 生成3个子Agent并行工作
const results = await agent.spawnMultiple([
{
task: "分析这份财报的关键指标",
runtime: "subagent"
},
{
task: "搜索行业竞品对比数据",
runtime: "subagent"
},
{
task: "生成可视化图表",
runtime: "subagent"
}
]);
// 3个子Agent同时工作,结果自动聚合
console.log(results);
// [{analysis: "..."}, {competitors: "..."}, {charts: "..."}]
模式三:流式聚合
多个流同时进行,结果实时聚合。
// 多路流式聚合
const stream = agent.streamAggregate([
agent.chat({ message: "写一段产品介绍" }),
agent.chat({ message: "生成营销文案" }),
agent.chat({ message: "列出核心卖点" })
], {
// 任一完成就输出
emitOnFirst: true,
// 全部完成时合并
mergeResults: true
});
for await (const chunk of stream) {
console.log(chunk.source, chunk.content);
}
模式四:Map-Reduce 分治
大数据集分片处理,结果汇总。
// Map: 将1000条数据分成10批,并行处理
const batches = chunkArray(data, 100);
const partialResults = await Promise.all(
batches.map(batch =>
agent.process({ data: batch, task: "分析情感" })
)
);
// Reduce: 汇总结果
const finalResult = partialResults.reduce((acc, cur) => ({
positive: acc.positive + cur.positive,
negative: acc.negative + cur.negative,
neutral: acc.neutral + cur.neutral
}), { positive: 0, negative: 0, neutral: 0 });
🔧 OpenClaw 配置
# ~/.openclaw/config.yaml
concurrency:
# 最大并发数
maxConcurrent: 10
# 子Agent并发上限
maxSubAgents: 5
# 请求队列
queue:
enabled: true
maxSize: 100
strategy: "fifo" # fifo | priority | lifo
# 超时设置
timeout:
perTask: 30000 # 单任务30秒
total: 120000 # 总计2分钟
# 错误处理
errorHandling:
strategy: "fail-fast" # fail-fast | continue-on-error
maxRetries: 2
retryDelay: 1000
💡 最佳实践
✅ 并行设计原则
- 任务独立性 - 并行的任务之间不能有依赖关系
- 合理分片 - 粒度太细调度开销大,太粗并行度低
- 结果幂等 - 并行结果的合并顺序不应影响最终结果
- 资源限流 - 并发数不要超过 API 限额
⚠️ 常见踩坑
- 竞态条件 - 多个并行任务写同一份数据 → 数据错乱
- API 限流 - 并发过高触发 429 → 全部失败
- 内存溢出 - 1000个并行任务的结果同时加载 → OOM
- 错误吞噬 - Promise.all 一个失败全失败 → 用 allSettled
错误处理最佳实践
// ❌ 一个失败全挂
const results = await Promise.all(tasks);
// ✅ 部分失败不影响其他
const results = await Promise.allSettled(tasks);
const succeeded = results.filter(r => r.status === 'fulfilled');
const failed = results.filter(r => r.status === 'rejected');
// ✅ 带重试的并行
const results = await Promise.allSettled(
tasks.map(task => retry(task, { maxRetries: 3, delay: 1000 }))
);
📊 性能对比
# 测试场景:处理10个独立API调用,每个2秒
串行执行: 10 × 2s = 20s
5并发并行: ceil(10/5) × 2s = 4s
10并发并行: 2s
加速比 = 20s / 2s = 10x 🚀
# 实际考虑调度开销:约 8-9x
# 考虑API限流:约 5-7x(受rate limit约束)