📖 为什么要批量处理?
凌晨1点03分,我盯着屏幕上503个待处理的文件叹了口气。一个一个来?不如让机器去排队,我在旁边喝茶。
批量处理是 Agent 的工业化能力——把重复性工作标准化、自动化、规模化。
典型应用场景:
- 内容生成 - 批量生成500个SEO页面
- 数据处理 - 批量分析10000条用户反馈
- 文件转换 - 批量转PDF/HTML/Markdown
- API 调用 - 批量从多个源拉取数据
- 社区运营 - 批量发布到多个平台
⚡ 核心配置
# ~/.openclaw/config.yaml
batch:
# 分片策略
chunking:
strategy: "fixed" # fixed | dynamic | time-based
size: 50 # 每批50个
# 并发控制
concurrency:
maxWorkers: 5 # 最多5个并发
perWorkerQueue: 10 # 每个Worker队列长度
# 进度跟踪
progress:
enabled: true
persistInterval: 10 # 每10条保存进度
store: "file" # file | redis | db
filePath: "./batch-progress.json"
# 断点续传
resume:
enabled: true
autoDetect: true # 自动检测未完成任务
# 错误处理
errorHandling:
maxRetries: 3
retryDelay: 2000 # 2秒
backoffMultiplier: 2 # 指数退避
failedItemsFile: "./batch-failed.json"
🔧 代码示例
1. 基础批量处理
// 定义批量任务
const items = [
{ id: 1, title: "OpenClaw 教程", type: "tutorial" },
{ id: 2, title: "AI Agent 指南", type: "guide" },
// ... 500 more items
];
const results = await agent.batch({
items,
concurrency: 5,
processor: async (item) => {
// 每个item的处理逻辑
const content = await agent.chat({
message: `为"${item.title}"生成SEO页面`
});
return { id: item.id, content };
},
// 进度回调
onProgress: (progress) => {
console.log(`进度: ${progress.completed}/${progress.total}`);
console.log(`成功: ${progress.succeeded}`);
console.log(`失败: ${progress.failed}`);
console.log(`耗时: ${progress.elapsed}ms`);
}
});
2. 断点续传
// 自动从上次中断处继续
const results = await agent.batch({
items,
processor: myProcessor,
resume: {
enabled: true,
// 自动读取上次进度
checkpointFile: "./batch-checkpoint.json"
}
});
// 即使中途中断,下次运行会自动跳过已完成的项
3. 分片处理大数据
// 10000条数据分批处理
const allData = loadCSV("big-dataset.csv"); // 10000行
const results = await agent.batchChunked({
data: allData,
chunkSize: 100, // 每批100条
concurrency: 3, // 3批并行
processor: async (chunk) => {
// 处理一个批次
return chunk.map(row => ({
...row,
sentiment: await analyze(row.text)
}));
},
// 结果自动合并
merger: (allChunks) => allChunks.flat()
});
4. 批量内容生成实战
// 批量生成SEO页面(妙趣AI实际使用的方式)
const topics = [
"openclaw-mcp-server-setup",
"openclaw-telegram-bot",
"openclaw-discord-integration",
// ... 更多话题
];
await agent.batch({
items: topics,
concurrency: 3,
processor: async (topic) => {
const html = await generateSEOPage(topic);
writeFileSync(`/var/www/miaoquai/tools/${topic}.html`, html);
return { topic, status: "done" };
},
onProgress: (p) => console.log(`${p.completed}/${p.total}`)
});
💡 最佳实践
✅ 批量处理黄金法则
| 法则 | 说明 | 反面教材 |
|---|---|---|
| 先小后大 | 先用10条测试,再上1000条 | 直接跑全量,出错全废 |
| 结果持久化 | 每处理完一批立即保存 | 全部跑完才保存,崩溃全丢 |
| 错误隔离 | 单条失败不影响其他 | 一个报错全部中断 |
| 进度可见 | 实时输出进度信息 | 跑了一小时不知道到哪了 |
| 速率控制 | 不超过API限额 | 并发太高被429封了 |
⚠️ 踩坑实录
- 内存爆炸 - 一次性加载10000条到内存 → 用流式读取
- API 封禁 - 1秒内发100个请求 → 加上速率控制
- 结果重复 - 断点续传判断不准 → 用幂等ID
- 编码问题 - 批量处理文件编码不一致 → 统一 UTF-8
📊 性能参考
# 妙趣AI实际数据:批量生成SEO页面
# 任务:生成8个教程页面,每个约10KB
串行执行(1并发): 8 × 30s = 240s (4分钟)
并行执行(3并发): ceil(8/3) × 30s = 90s (1.5分钟)
并行执行(5并发): ceil(8/5) × 30s = 60s (1分钟)
实际提升: 4x
API 调用节省: 0(相同调用量)
但时间节省 = 3分钟 × 每次运行 = 长期收益巨大