OpenClaw 批量任务队列与后台处理

更新时间:2026-04-24 | 预计阅读:12分钟

凌晨1点,cron任务触发:批量生成9个SEO页面。如果让Agent顺序执行,每个页面生成需要30秒,总共4.5分钟。但后台队列并行处理,45秒就搞定了。世界上有一种效率叫做"让AI学会排队",但不是傻傻地排——是聪明地并行。

为什么需要任务队列

当Agent需要处理大量任务时,直接在主线程执行会导致:

  • 超时:LLM API调用耗时长,超过HTTP超时限制
  • 资源争抢:多个Agent同时访问相同资源
  • 失败丢失:进程崩溃时未完成的任务丢失
  • 优先级倒置:紧急任务被长任务阻塞

任务队列解决这些问题:异步执行、优先级调度、失败重试、进度追踪。

OpenClaw 任务队列架构

基本概念

// 任务定义
interface Task {
  id: string;
  type: 'seo_generation' | 'content_creation' | 'data_processing';
  priority: 'critical' | 'high' | 'normal' | 'low';
  payload: any;
  status: 'pending' | 'running' | 'completed' | 'failed';
  retries: number;
  maxRetries: number;
  createdAt: Date;
  startedAt?: Date;
  completedAt?: Date;
  result?: any;
  error?: string;
}

队列配置

# openclaw.config.yaml
taskQueue:
  enabled: true
  backend: redis         # redis | memory | sqlite
  redis:
    host: localhost
    port: 6379
    db: 1

  concurrency: 5         # 最大并行数
  defaultTimeout: 300    # 默认超时(秒)
  maxRetries: 3          # 最大重试次数

  # 优先级队列
  queues:
    critical:
      concurrency: 2
      timeout: 60
    high:
      concurrency: 3
      timeout: 180
    normal:
      concurrency: 5
      timeout: 300
    low:
      concurrency: 2
      timeout: 600

创建批量任务

方式一:spawn子Agent

// 批量生成SEO页面
const tasks = [
  { title: 'OpenClaw A2A通信', keywords: ['OpenClaw', 'A2A', 'Agent通信'] },
  { title: 'OpenClaw权限审批', keywords: ['OpenClaw', '权限', 'HITL'] },
  { title: 'OpenClaw知识图谱', keywords: ['OpenClaw', '知识图谱', 'RAG'] },
  // ... 更多任务
];

// 并行启动子Agent
const results = await Promise.all(
  tasks.map(task =>
    sessions_spawn({
      task: `生成SEO页面:${task.title}`,
      runtime: 'subagent',
      lightContext: true  // 轻量启动
    })
  )
);

// 等待所有完成
const outputs = await Promise.all(
  results.map(r => waitForCompletion(r.sessionId))
);

方式二:后台任务队列

// 添加任务到队列
for (const task of tasks) {
  await addTask({
    type: 'seo_generation',
    priority: 'normal',
    payload: task
  });
}

// Worker处理任务
async function worker() {
  while (true) {
    const task = await getNextTask();
    if (!task) {
      await sleep(1000);
      continue;
    }

    try {
      // 更新状态
      await updateTaskStatus(task.id, 'running');

      // 执行任务
      const result = await generateSEOPage(task.payload);

      // 标记完成
      await updateTaskStatus(task.id, 'completed', result);
    } catch (err) {
      if (task.retries < task.maxRetries) {
        // 重试
        await retryTask(task.id);
      } else {
        // 失败
        await updateTaskStatus(task.id, 'failed', null, err.message);
      }
    }
  }
}

进度追踪

任务进度API

// 获取任务进度
GET /api/tasks/{taskId}/progress

// 响应
{
  "taskId": "task_123",
  "status": "running",
  "progress": {
    "current": 5,
    "total": 10,
    "percentage": 50,
    "eta": "2 minutes"
  },
  "items": [
    { "item": "page1.html", "status": "completed" },
    { "item": "page2.html", "status": "completed" },
    { "item": "page3.html", "status": "running" },
    // ...
  ]
}

WebSocket实时推送

// 客户端订阅任务进度
const ws = new WebSocket('wss://api.openclaw.com/tasks/subscribe');

ws.send(JSON.stringify({
  action: 'subscribe',
  taskIds: ['task_123', 'task_456']
}));

ws.onmessage = (event) => {
  const update = JSON.parse(event.data);
  // 实时更新UI
  updateProgressUI(update.taskId, update.progress);
};

任务优先级调度

优先级定义

// 优先级规则
const priorityRules = {
  // 生产环境任务最高优先级
  'production_*': 'critical',

  // 用户触发的任务高优先级
  'user_request': 'high',

  // 定时任务普通优先级
  'cron_*': 'normal',

  // 批量后台任务低优先级
  'batch_*': 'low'
};

// 动态调整优先级
function adjustPriority(task) {
  // 如果任务长时间pending,提升优先级
  if (task.waitingTime > 300) {
    return Math.max(task.priority - 1, 'normal');
  }

  // 如果系统负载高,降低低优先级任务
  if (systemLoad > 0.8 && task.priority === 'low') {
    return 'deferred';
  }

  return task.priority;
}

失败处理与重试

重试策略

// 分级重试策略
const retryStrategies = {
  api_timeout: {
    maxRetries: 3,
    backoff: 'exponential',
    baseDelay: 1000
  },
  rate_limit: {
    maxRetries: 5,
    backoff: 'linear',
    baseDelay: 60000  // 等待1分钟
  },
  model_error: {
    maxRetries: 2,
    backoff: 'fixed',
    baseDelay: 5000,
    fallbackModel: 'gpt-4'  // 切换模型
  },
  network_error: {
    maxRetries: 5,
    backoff: 'exponential',
    baseDelay: 2000,
    jitter: true  // 添加随机抖动
  }
};

死信队列

// 超过重试次数的任务进入死信队列
async function handleDeadLetter(task) {
  // 1. 记录失败详情
  await logFailure({
    taskId: task.id,
    type: task.type,
    payload: task.payload,
    error: task.lastError,
    retryCount: task.retries
  });

  // 2. 发送告警
  await notify({
    channel: 'feishu',
    message: `任务失败:${task.type} (${task.id})`
  });

  // 3. 人工介入入口
  await createManualIntervention(task);
}

监控与运维

队列健康检查

// 队列健康指标
const queueMetrics = {
  pending: 23,           // 等待中任务数
  running: 5,            // 运行中任务数
  completed: 1024,       // 已完成任务数
  failed: 3,             // 失败任务数
  avgWaitTime: 45,       // 平均等待时间(秒)
  avgProcessTime: 120,   // 平均处理时间(秒)
  throughput: 2.5        // 吞吐量(任务/秒)
};

// 告警规则
if (queueMetrics.pending > 100) {
  alert('队列积压严重');
}
if (queueMetrics.avgWaitTime > 300) {
  alert('任务等待时间过长');
}

最佳实践

  1. 合理设置并发数——太高会触发API限流,太低浪费资源
  2. 任务粒度适中——太大会阻塞队列,太小增加调度开销
  3. 幂等设计——重复执行不会产生副作用
  4. 超时合理——根据任务类型设置不同超时
  5. 监控先行——部署前先配置监控告警

相关资源