OpenClaw 批量任务队列与后台处理
凌晨1点,cron任务触发:批量生成9个SEO页面。如果让Agent顺序执行,每个页面生成需要30秒,总共4.5分钟。但后台队列并行处理,45秒就搞定了。世界上有一种效率叫做"让AI学会排队",但不是傻傻地排——是聪明地并行。
为什么需要任务队列
当Agent需要处理大量任务时,直接在主线程执行会导致:
- 超时:LLM API调用耗时长,超过HTTP超时限制
- 资源争抢:多个Agent同时访问相同资源
- 失败丢失:进程崩溃时未完成的任务丢失
- 优先级倒置:紧急任务被长任务阻塞
任务队列解决这些问题:异步执行、优先级调度、失败重试、进度追踪。
OpenClaw 任务队列架构
基本概念
// 任务定义
interface Task {
id: string;
type: 'seo_generation' | 'content_creation' | 'data_processing';
priority: 'critical' | 'high' | 'normal' | 'low';
payload: any;
status: 'pending' | 'running' | 'completed' | 'failed';
retries: number;
maxRetries: number;
createdAt: Date;
startedAt?: Date;
completedAt?: Date;
result?: any;
error?: string;
}
队列配置
# openclaw.config.yaml
taskQueue:
enabled: true
backend: redis # redis | memory | sqlite
redis:
host: localhost
port: 6379
db: 1
concurrency: 5 # 最大并行数
defaultTimeout: 300 # 默认超时(秒)
maxRetries: 3 # 最大重试次数
# 优先级队列
queues:
critical:
concurrency: 2
timeout: 60
high:
concurrency: 3
timeout: 180
normal:
concurrency: 5
timeout: 300
low:
concurrency: 2
timeout: 600
创建批量任务
方式一:spawn子Agent
// 批量生成SEO页面
const tasks = [
{ title: 'OpenClaw A2A通信', keywords: ['OpenClaw', 'A2A', 'Agent通信'] },
{ title: 'OpenClaw权限审批', keywords: ['OpenClaw', '权限', 'HITL'] },
{ title: 'OpenClaw知识图谱', keywords: ['OpenClaw', '知识图谱', 'RAG'] },
// ... 更多任务
];
// 并行启动子Agent
const results = await Promise.all(
tasks.map(task =>
sessions_spawn({
task: `生成SEO页面:${task.title}`,
runtime: 'subagent',
lightContext: true // 轻量启动
})
)
);
// 等待所有完成
const outputs = await Promise.all(
results.map(r => waitForCompletion(r.sessionId))
);
方式二:后台任务队列
// 添加任务到队列
for (const task of tasks) {
await addTask({
type: 'seo_generation',
priority: 'normal',
payload: task
});
}
// Worker处理任务
async function worker() {
while (true) {
const task = await getNextTask();
if (!task) {
await sleep(1000);
continue;
}
try {
// 更新状态
await updateTaskStatus(task.id, 'running');
// 执行任务
const result = await generateSEOPage(task.payload);
// 标记完成
await updateTaskStatus(task.id, 'completed', result);
} catch (err) {
if (task.retries < task.maxRetries) {
// 重试
await retryTask(task.id);
} else {
// 失败
await updateTaskStatus(task.id, 'failed', null, err.message);
}
}
}
}
进度追踪
任务进度API
// 获取任务进度
GET /api/tasks/{taskId}/progress
// 响应
{
"taskId": "task_123",
"status": "running",
"progress": {
"current": 5,
"total": 10,
"percentage": 50,
"eta": "2 minutes"
},
"items": [
{ "item": "page1.html", "status": "completed" },
{ "item": "page2.html", "status": "completed" },
{ "item": "page3.html", "status": "running" },
// ...
]
}
WebSocket实时推送
// 客户端订阅任务进度
const ws = new WebSocket('wss://api.openclaw.com/tasks/subscribe');
ws.send(JSON.stringify({
action: 'subscribe',
taskIds: ['task_123', 'task_456']
}));
ws.onmessage = (event) => {
const update = JSON.parse(event.data);
// 实时更新UI
updateProgressUI(update.taskId, update.progress);
};
任务优先级调度
优先级定义
// 优先级规则
const priorityRules = {
// 生产环境任务最高优先级
'production_*': 'critical',
// 用户触发的任务高优先级
'user_request': 'high',
// 定时任务普通优先级
'cron_*': 'normal',
// 批量后台任务低优先级
'batch_*': 'low'
};
// 动态调整优先级
function adjustPriority(task) {
// 如果任务长时间pending,提升优先级
if (task.waitingTime > 300) {
return Math.max(task.priority - 1, 'normal');
}
// 如果系统负载高,降低低优先级任务
if (systemLoad > 0.8 && task.priority === 'low') {
return 'deferred';
}
return task.priority;
}
失败处理与重试
重试策略
// 分级重试策略
const retryStrategies = {
api_timeout: {
maxRetries: 3,
backoff: 'exponential',
baseDelay: 1000
},
rate_limit: {
maxRetries: 5,
backoff: 'linear',
baseDelay: 60000 // 等待1分钟
},
model_error: {
maxRetries: 2,
backoff: 'fixed',
baseDelay: 5000,
fallbackModel: 'gpt-4' // 切换模型
},
network_error: {
maxRetries: 5,
backoff: 'exponential',
baseDelay: 2000,
jitter: true // 添加随机抖动
}
};
死信队列
// 超过重试次数的任务进入死信队列
async function handleDeadLetter(task) {
// 1. 记录失败详情
await logFailure({
taskId: task.id,
type: task.type,
payload: task.payload,
error: task.lastError,
retryCount: task.retries
});
// 2. 发送告警
await notify({
channel: 'feishu',
message: `任务失败:${task.type} (${task.id})`
});
// 3. 人工介入入口
await createManualIntervention(task);
}
监控与运维
队列健康检查
// 队列健康指标
const queueMetrics = {
pending: 23, // 等待中任务数
running: 5, // 运行中任务数
completed: 1024, // 已完成任务数
failed: 3, // 失败任务数
avgWaitTime: 45, // 平均等待时间(秒)
avgProcessTime: 120, // 平均处理时间(秒)
throughput: 2.5 // 吞吐量(任务/秒)
};
// 告警规则
if (queueMetrics.pending > 100) {
alert('队列积压严重');
}
if (queueMetrics.avgWaitTime > 300) {
alert('任务等待时间过长');
}
最佳实践
- 合理设置并发数——太高会触发API限流,太低浪费资源
- 任务粒度适中——太大会阻塞队列,太小增加调度开销
- 幂等设计——重复执行不会产生副作用
- 超时合理——根据任务类型设置不同超时
- 监控先行——部署前先配置监控告警