🔄 OpenClaw 工作流编排完全指南

世界上有一种自动化叫OpenClaw工作流——你设定规则,它替你跑完马拉松...

📚 什么是工作流编排?

工作流编排是将多个任务按照特定逻辑组合执行的能力。OpenClaw支持:

  • 顺序执行:任务A完成后执行任务B
  • 并行执行:同时启动多个子Agent
  • 条件分支:根据结果选择不同路径
  • 循环迭代:批量处理列表数据
  • 错误恢复:失败时自动重试或回退

⚙️ 子Agent调度

并行启动多个子Agent

# 同时启动多个子Agent处理不同任务
agents = [
    sessions_spawn({
        "task": "分析竞品A",
        "runtime": "subagent"
    }),
    sessions_spawn({
        "task": "分析竞品B",
        "runtime": "subagent"
    }),
    sessions_spawn({
        "task": "分析竞品C",
        "runtime": "subagent"
    })
]

等待所有Agent完成

# 使用yield等待结果
sessions_yield({
    "message": "等待所有分析完成..."
})

# 收集结果
results = []
for agent in agents:
    result = sessions_history(agent.sessionKey)
    results.append(result)

⚙️ 条件分支

基于结果的路由

result = web_search({"query": "OpenClaw最新动态"})

if result.count > 10:
    # 内容多,生成详细报告
    sessions_spawn({
        "task": "生成详细报告",
        "runtime": "subagent"
    })
elif result.count > 0:
    # 内容适中,生成简报
    sessions_spawn({
        "task": "生成简报",
        "runtime": "subagent"
    })
else:
    # 无内容,发送通知
    message({"action": "send", "message": "今日无新动态"})

错误处理分支

try:
    result = web_fetch({"url": "https://example.com"})
except Exception as e:
    # 失败时切换到备用方案
    result = web_search({"query": "example.com 内容"})
    log_error(e)

⚙️ 循环与批处理

批量处理URL列表

urls = [
    "https://site1.com",
    "https://site2.com",
    "https://site3.com"
]

results = []
for url in urls:
    try:
        content = web_fetch({"url": url})
        results.append({"url": url, "content": content})
    except Exception as e:
        results.append({"url": url, "error": str(e)})
    
    # 避免请求过快
    time.sleep(1)

分页处理

page = 1
all_items = []

while True:
    result = fetch_page(page)
    all_items.extend(result.items)
    
    if not result.has_more:
        break
    
    page += 1
    
    # 限制最大页数
    if page > 100:
        break

💡 最佳实践

  • 任务拆分:复杂任务拆分为可并行的小任务
  • 超时控制:设置子Agent超时避免无限等待
  • 结果聚合:设计统一的结果格式便于合并
  • 错误隔离:子Agent失败不影响主流程
  • 日志追踪:记录每个步骤的执行状态
  • 幂等设计:支持重试而不产生副作用

🚀 实战案例:SEO内容工厂

# 完整的SEO内容生成工作流

def seo_content_pipeline(keywords):
    results = []
    
    for keyword in keywords:
        # 1. 搜索相关资料
        search_result = web_search({"query": keyword})
        
        # 2. 并行获取多个网页内容
        contents = []
        for url in search_result[:3]:
            try:
                content = web_fetch({"url": url})
                contents.append(content)
            except:
                continue
        
        # 3. 生成文章
        article = generate_article(contents, keyword)
        
        # 4. 保存文件
        filename = f"{keyword.replace(' ', '-')}.html"
        write(f"/var/www/miaoquai/content/{filename}", article)
        
        results.append({"keyword": keyword, "file": filename})
    
    # 5. 更新sitemap
    update_sitemap(results)
    
    # 6. 发送通知
    notify_completion(results)
    
    return results

🔗 相关资源

🎯 高级模式

管道模式

# 数据流:A → B → C
data = step_a()
data = step_b(data)
data = step_c(data)

扇出-扇入模式

# 扇出:一个任务拆分为多个并行任务
tasks = [spawn_task(item) for item in items]

# 扇入:收集所有结果
results = [wait(task) for task in tasks]
combined = merge_results(results)