🎼 OpenClaw 工作流编排

发布于 2026-04-14 | 阅读时间 15 分钟

"单个 Agent 是独奏,多个 Agent 是协奏。而工作流编排,就是那位让所有人配合默契的指挥家。"

什么是工作流编排?

工作流编排(Workflow Orchestration)是把多个任务、多个 Agent 按照特定逻辑串联起来,形成自动化流水线的技术。

就像工厂的生产线:原材料进入,经过一道道工序,最终变成成品。在 OpenClaw 中,数据就是原材料,Agent 和 Skills 就是工序,工作流就是那条生产线。

核心概念

工作流三要素

  • Steps(步骤) - 工作流中的单个任务单元
  • Transitions(流转) - 步骤之间的执行顺序和条件
  • State(状态) - 整个工作流的执行状态和上下文数据

工作流类型

类型 特点 适用场景
顺序流 步骤按顺序执行,一步接一步 简单的流水线
并行流 多个步骤同时执行 批量处理、数据采集
条件流 根据条件选择不同分支 审批流程、智能路由
循环流 重复执行直到满足条件 轮询、批量重试

基础工作流示例

顺序工作流:内容创作流水线

// workflows/content-pipeline.yaml
name: daily-content-pipeline
version: "1.0"

triggers:
  - type: cron
    schedule: "0 6 * * *"  # 每天早上6点

context:
  # 全局变量
  date: "{{today}}"
  outputDir: "/content/{{date}}"

steps:
  # Step 1: 热点发现
  - id: discover
    name: 发现热点话题
    skill: web-search
    action: trending-topics
    params:
      category: "AI"
      count: 5
    output: trendingTopics

  # Step 2: 并行研究(针对每个话题)
  - id: research
    name: 深度研究
    type: parallel
    foreach: "{{steps.discover.output}}"
    steps:
      - skill: web-search
        action: deep-search
        params:
          query: "{{item.title}}"
          sources: ["news", "blogs", "papers"]
        output: "research_{{item.id}}"

  # Step 3: AI写作
  - id: write
    name: 生成文章
    skill: ai-writer
    action: write-article
    params:
      topic: "{{steps.discover.output[0].title}}"
      research: "{{steps.research.output}}"
      style: "professional"
      wordCount: 1500
    output: article

  # Step 4: SEO优化
  - id: optimize
    name: SEO优化
    skill: seo-optimizer
    action: optimize
    params:
      content: "{{steps.write.output}}"
      keywords: "{{steps.discover.output[0].keywords}}"
    output: optimizedArticle

  # Step 5: 保存和发布
  - id: publish
    name: 发布内容
    type: parallel
    steps:
      - skill: file-ops
        action: save
        params:
          path: "{{context.outputDir}}/article.html"
          content: "{{steps.optimize.output.html}}"
      
      - skill: discord-bot
        action: announce
        params:
          channel: "daily-content"
          message: "新文章已发布: {{steps.optimize.output.title}}"

onSuccess:
  - skill: notification
    action: send
    params:
      to: "admin@example.com"
      subject: "内容流水线完成"

onFailure:
  - skill: notification
    action: send
    params:
      to: "admin@example.com"
      subject: "内容流水线失败: {{error.message}}"

高级编排模式

条件分支工作流

// workflows/smart-router.yaml
steps:
  # 分析输入类型
  - id: analyze
    skill: ai-classifier
    action: classify
    params:
      input: "{{input.text}}"
      categories: ["question", "complaint", "praise", "spam"]
    output: classification

  # 根据类型路由到不同处理流程
  - id: route
    type: switch
    condition: "{{steps.analyze.output.category}}"
    cases:
      question:
        - skill: knowledge-base
          action: search
          params:
            query: "{{input.text}}"
        - skill: ai-writer
          action: compose-reply
          params:
            context: "{{previous.output}}"
      
      complaint:
        - skill: sentiment-analyzer
          action: analyze-urgency
        - skill: ticket-system
          action: create-ticket
          params:
            priority: "{{previous.output.urgency > 0.8 ? 'high' : 'normal'}}"
      
      praise:
        - skill: crm-system
          action: record-feedback
        - skill: ai-writer
          action: compose-thank-you
      
      spam:
        - skill: filter-system
          action: block-sender

循环重试模式

// workflows/reliable-processing.yaml
steps:
  - id: fetch-data
    skill: api-client
    action: get
    params:
      url: "{{input.endpoint}}"
    retry:
      maxAttempts: 5
      backoff: exponential
      initialDelay: 1000
      maxDelay: 30000
    timeout: 60000

  # 批量处理,失败项自动重试
  - id: batch-process
    type: foreach
    items: "{{steps.fetch-data.output.items}}"
    steps:
      - skill: processor
        action: process
        params:
          item: "{{item}}"
        retry:
          maxAttempts: 3
          onError: continue  # 单个失败继续处理其他
    output: processedItems

  # 汇总结果
  - id: summarize
    skill: data-aggregator
    action: summary
    params:
      results: "{{steps.batch-process.output}}"

代码实现工作流

程序化编排

const { WorkflowEngine } = require('openclaw/workflow');

class ContentOrchestrator {
  constructor() {
    this.engine = new WorkflowEngine();
  }

  async createArticlePipeline(topics) {
    const workflow = this.engine.create({
      name: 'article-pipeline',
      timeout: 300000,  // 5分钟总超时
    });

    // Step 1: 并行研究所有话题
    const researchPromises = topics.map(topic => 
      workflow.addStep({
        id: `research-${topic.id}`,
        skill: 'web-researcher',
        params: { query: topic.title },
      })
    );

    // Step 2: 等待所有研究完成
    workflow.addStep({
      id: 'wait-research',
      type: 'join',
      dependsOn: topics.map(t => `research-${t.id}`),
    });

    // Step 3: 选择最佳话题
    workflow.addStep({
      id: 'select-topic',
      skill: 'ai-selector',
      params: {
        criteria: 'trending + relevance',
        options: '{{wait-research.outputs}}',
      },
      dependsOn: ['wait-research'],
    });

    // Step 4: 写作
    workflow.addStep({
      id: 'write',
      skill: 'ai-writer',
      params: {
        topic: '{{select-topic.output}}',
        research: '{{wait-research.outputs}}',
      },
      dependsOn: ['select-topic'],
    });

    // Step 5: 并行审核和优化
    workflow.addStep({
      id: 'review',
      skill: 'content-reviewer',
      params: { content: '{{write.output}}' },
      dependsOn: ['write'],
    });

    workflow.addStep({
      id: 'optimize',
      skill: 'seo-optimizer',
      params: { content: '{{write.output}}' },
      dependsOn: ['write'],
    });

    // Step 6: 合并结果
    workflow.addStep({
      id: 'finalize',
      skill: 'content-merger',
      params: {
        draft: '{{write.output}}',
        review: '{{review.output}}',
        seo: '{{optimize.output}}',
      },
      dependsOn: ['review', 'optimize'],
    });

    return workflow;
  }

  async execute() {
    const topics = await this.fetchTrendingTopics();
    const workflow = await this.createArticlePipeline(topics);
    
    const result = await this.engine.run(workflow, {
      onStepStart: (step) => console.log(`开始: ${step.id}`),
      onStepComplete: (step, output) => console.log(`完成: ${step.id}`),
      onStepError: (step, error) => console.error(`失败: ${step.id}`, error),
    });

    return result;
  }
}

最佳实践

✅ 工作流设计原则

  • 单一职责 - 每个步骤只做一件事
  • 幂等性 - 支持失败重试,多次执行结果一致
  • 错误隔离 - 局部失败不影响整体流程
  • 状态透明 - 关键节点记录日志和状态
  • 超时控制 - 每个步骤都有合理的超时设置

⚠️ 常见陷阱

  • 避免步骤间数据依赖过深,难以调试
  • 不要在循环中创建过多并行任务,会耗尽资源
  • 长流程记得添加检查点,方便中断后恢复
  • 敏感操作添加人工审批节点,别全自动

监控与可视化

# 查看工作流执行状态
openclaw workflow status content-pipeline

# 输出
WORKFLOW: content-pipeline
STATUS: running
STARTED: 2026-04-14 06:00:01
DURATION: 2m 34s

STEPS:
✓ discover      completed    45s
✓ research      completed    1m 23s  (5/5 parallel)
✓ write         completed    34s
→ optimize      running      12s
○ publish       pending      -

# 查看详细日志
openclaw workflow logs content-pipeline --step write

相关链接