🎼 OpenClaw 工作流编排
"单个 Agent 是独奏,多个 Agent 是协奏。而工作流编排,就是那位让所有人配合默契的指挥家。"
什么是工作流编排?
工作流编排(Workflow Orchestration)是把多个任务、多个 Agent 按照特定逻辑串联起来,形成自动化流水线的技术。
就像工厂的生产线:原材料进入,经过一道道工序,最终变成成品。在 OpenClaw 中,数据就是原材料,Agent 和 Skills 就是工序,工作流就是那条生产线。
核心概念
工作流三要素
- Steps(步骤) - 工作流中的单个任务单元
- Transitions(流转) - 步骤之间的执行顺序和条件
- State(状态) - 整个工作流的执行状态和上下文数据
工作流类型
| 类型 | 特点 | 适用场景 |
|---|---|---|
| 顺序流 | 步骤按顺序执行,一步接一步 | 简单的流水线 |
| 并行流 | 多个步骤同时执行 | 批量处理、数据采集 |
| 条件流 | 根据条件选择不同分支 | 审批流程、智能路由 |
| 循环流 | 重复执行直到满足条件 | 轮询、批量重试 |
基础工作流示例
顺序工作流:内容创作流水线
// workflows/content-pipeline.yaml
name: daily-content-pipeline
version: "1.0"
triggers:
- type: cron
schedule: "0 6 * * *" # 每天早上6点
context:
# 全局变量
date: "{{today}}"
outputDir: "/content/{{date}}"
steps:
# Step 1: 热点发现
- id: discover
name: 发现热点话题
skill: web-search
action: trending-topics
params:
category: "AI"
count: 5
output: trendingTopics
# Step 2: 并行研究(针对每个话题)
- id: research
name: 深度研究
type: parallel
foreach: "{{steps.discover.output}}"
steps:
- skill: web-search
action: deep-search
params:
query: "{{item.title}}"
sources: ["news", "blogs", "papers"]
output: "research_{{item.id}}"
# Step 3: AI写作
- id: write
name: 生成文章
skill: ai-writer
action: write-article
params:
topic: "{{steps.discover.output[0].title}}"
research: "{{steps.research.output}}"
style: "professional"
wordCount: 1500
output: article
# Step 4: SEO优化
- id: optimize
name: SEO优化
skill: seo-optimizer
action: optimize
params:
content: "{{steps.write.output}}"
keywords: "{{steps.discover.output[0].keywords}}"
output: optimizedArticle
# Step 5: 保存和发布
- id: publish
name: 发布内容
type: parallel
steps:
- skill: file-ops
action: save
params:
path: "{{context.outputDir}}/article.html"
content: "{{steps.optimize.output.html}}"
- skill: discord-bot
action: announce
params:
channel: "daily-content"
message: "新文章已发布: {{steps.optimize.output.title}}"
onSuccess:
- skill: notification
action: send
params:
to: "admin@example.com"
subject: "内容流水线完成"
onFailure:
- skill: notification
action: send
params:
to: "admin@example.com"
subject: "内容流水线失败: {{error.message}}"
高级编排模式
条件分支工作流
// workflows/smart-router.yaml
steps:
# 分析输入类型
- id: analyze
skill: ai-classifier
action: classify
params:
input: "{{input.text}}"
categories: ["question", "complaint", "praise", "spam"]
output: classification
# 根据类型路由到不同处理流程
- id: route
type: switch
condition: "{{steps.analyze.output.category}}"
cases:
question:
- skill: knowledge-base
action: search
params:
query: "{{input.text}}"
- skill: ai-writer
action: compose-reply
params:
context: "{{previous.output}}"
complaint:
- skill: sentiment-analyzer
action: analyze-urgency
- skill: ticket-system
action: create-ticket
params:
priority: "{{previous.output.urgency > 0.8 ? 'high' : 'normal'}}"
praise:
- skill: crm-system
action: record-feedback
- skill: ai-writer
action: compose-thank-you
spam:
- skill: filter-system
action: block-sender
循环重试模式
// workflows/reliable-processing.yaml
steps:
- id: fetch-data
skill: api-client
action: get
params:
url: "{{input.endpoint}}"
retry:
maxAttempts: 5
backoff: exponential
initialDelay: 1000
maxDelay: 30000
timeout: 60000
# 批量处理,失败项自动重试
- id: batch-process
type: foreach
items: "{{steps.fetch-data.output.items}}"
steps:
- skill: processor
action: process
params:
item: "{{item}}"
retry:
maxAttempts: 3
onError: continue # 单个失败继续处理其他
output: processedItems
# 汇总结果
- id: summarize
skill: data-aggregator
action: summary
params:
results: "{{steps.batch-process.output}}"
代码实现工作流
程序化编排
const { WorkflowEngine } = require('openclaw/workflow');
class ContentOrchestrator {
constructor() {
this.engine = new WorkflowEngine();
}
async createArticlePipeline(topics) {
const workflow = this.engine.create({
name: 'article-pipeline',
timeout: 300000, // 5分钟总超时
});
// Step 1: 并行研究所有话题
const researchPromises = topics.map(topic =>
workflow.addStep({
id: `research-${topic.id}`,
skill: 'web-researcher',
params: { query: topic.title },
})
);
// Step 2: 等待所有研究完成
workflow.addStep({
id: 'wait-research',
type: 'join',
dependsOn: topics.map(t => `research-${t.id}`),
});
// Step 3: 选择最佳话题
workflow.addStep({
id: 'select-topic',
skill: 'ai-selector',
params: {
criteria: 'trending + relevance',
options: '{{wait-research.outputs}}',
},
dependsOn: ['wait-research'],
});
// Step 4: 写作
workflow.addStep({
id: 'write',
skill: 'ai-writer',
params: {
topic: '{{select-topic.output}}',
research: '{{wait-research.outputs}}',
},
dependsOn: ['select-topic'],
});
// Step 5: 并行审核和优化
workflow.addStep({
id: 'review',
skill: 'content-reviewer',
params: { content: '{{write.output}}' },
dependsOn: ['write'],
});
workflow.addStep({
id: 'optimize',
skill: 'seo-optimizer',
params: { content: '{{write.output}}' },
dependsOn: ['write'],
});
// Step 6: 合并结果
workflow.addStep({
id: 'finalize',
skill: 'content-merger',
params: {
draft: '{{write.output}}',
review: '{{review.output}}',
seo: '{{optimize.output}}',
},
dependsOn: ['review', 'optimize'],
});
return workflow;
}
async execute() {
const topics = await this.fetchTrendingTopics();
const workflow = await this.createArticlePipeline(topics);
const result = await this.engine.run(workflow, {
onStepStart: (step) => console.log(`开始: ${step.id}`),
onStepComplete: (step, output) => console.log(`完成: ${step.id}`),
onStepError: (step, error) => console.error(`失败: ${step.id}`, error),
});
return result;
}
}
最佳实践
✅ 工作流设计原则
- 单一职责 - 每个步骤只做一件事
- 幂等性 - 支持失败重试,多次执行结果一致
- 错误隔离 - 局部失败不影响整体流程
- 状态透明 - 关键节点记录日志和状态
- 超时控制 - 每个步骤都有合理的超时设置
⚠️ 常见陷阱
- 避免步骤间数据依赖过深,难以调试
- 不要在循环中创建过多并行任务,会耗尽资源
- 长流程记得添加检查点,方便中断后恢复
- 敏感操作添加人工审批节点,别全自动
监控与可视化
# 查看工作流执行状态
openclaw workflow status content-pipeline
# 输出
WORKFLOW: content-pipeline
STATUS: running
STARTED: 2026-04-14 06:00:01
DURATION: 2m 34s
STEPS:
✓ discover completed 45s
✓ research completed 1m 23s (5/5 parallel)
✓ write completed 34s
→ optimize running 12s
○ publish pending -
# 查看详细日志
openclaw workflow logs content-pipeline --step write