🔄 OpenClaw 工作流编排完全指南
世界上有一种自动化叫OpenClaw工作流——你设定规则,它替你跑完马拉松...
📚 什么是工作流编排?
工作流编排是将多个任务按照特定逻辑组合执行的能力。OpenClaw支持:
- 顺序执行:任务A完成后执行任务B
- 并行执行:同时启动多个子Agent
- 条件分支:根据结果选择不同路径
- 循环迭代:批量处理列表数据
- 错误恢复:失败时自动重试或回退
⚙️ 子Agent调度
并行启动多个子Agent
# 同时启动多个子Agent处理不同任务
agents = [
sessions_spawn({
"task": "分析竞品A",
"runtime": "subagent"
}),
sessions_spawn({
"task": "分析竞品B",
"runtime": "subagent"
}),
sessions_spawn({
"task": "分析竞品C",
"runtime": "subagent"
})
]
等待所有Agent完成
# 使用yield等待结果
sessions_yield({
"message": "等待所有分析完成..."
})
# 收集结果
results = []
for agent in agents:
result = sessions_history(agent.sessionKey)
results.append(result)
⚙️ 条件分支
基于结果的路由
result = web_search({"query": "OpenClaw最新动态"})
if result.count > 10:
# 内容多,生成详细报告
sessions_spawn({
"task": "生成详细报告",
"runtime": "subagent"
})
elif result.count > 0:
# 内容适中,生成简报
sessions_spawn({
"task": "生成简报",
"runtime": "subagent"
})
else:
# 无内容,发送通知
message({"action": "send", "message": "今日无新动态"})
错误处理分支
try:
result = web_fetch({"url": "https://example.com"})
except Exception as e:
# 失败时切换到备用方案
result = web_search({"query": "example.com 内容"})
log_error(e)
⚙️ 循环与批处理
批量处理URL列表
urls = [
"https://site1.com",
"https://site2.com",
"https://site3.com"
]
results = []
for url in urls:
try:
content = web_fetch({"url": url})
results.append({"url": url, "content": content})
except Exception as e:
results.append({"url": url, "error": str(e)})
# 避免请求过快
time.sleep(1)
分页处理
page = 1
all_items = []
while True:
result = fetch_page(page)
all_items.extend(result.items)
if not result.has_more:
break
page += 1
# 限制最大页数
if page > 100:
break
💡 最佳实践
- 任务拆分:复杂任务拆分为可并行的小任务
- 超时控制:设置子Agent超时避免无限等待
- 结果聚合:设计统一的结果格式便于合并
- 错误隔离:子Agent失败不影响主流程
- 日志追踪:记录每个步骤的执行状态
- 幂等设计:支持重试而不产生副作用
🚀 实战案例:SEO内容工厂
# 完整的SEO内容生成工作流
def seo_content_pipeline(keywords):
results = []
for keyword in keywords:
# 1. 搜索相关资料
search_result = web_search({"query": keyword})
# 2. 并行获取多个网页内容
contents = []
for url in search_result[:3]:
try:
content = web_fetch({"url": url})
contents.append(content)
except:
continue
# 3. 生成文章
article = generate_article(contents, keyword)
# 4. 保存文件
filename = f"{keyword.replace(' ', '-')}.html"
write(f"/var/www/miaoquai/content/{filename}", article)
results.append({"keyword": keyword, "file": filename})
# 5. 更新sitemap
update_sitemap(results)
# 6. 发送通知
notify_completion(results)
return results
🔗 相关资源
🎯 高级模式
管道模式
# 数据流:A → B → C
data = step_a()
data = step_b(data)
data = step_c(data)
扇出-扇入模式
# 扇出:一个任务拆分为多个并行任务
tasks = [spawn_task(item) for item in items]
# 扇入:收集所有结果
results = [wait(task) for task in tasks]
combined = merge_results(results)