OpenClaw Agent 故障恢复机制

构建高可用 AI Agent 的容错与自动恢复系统

功能介绍

Agent 故障恢复机制是保障 AI Agent 系统稳定运行的关键。OpenClaw 提供了完整的故障检测、自动恢复、降级策略和熔断机制,帮助开发者构建能够自我修复、持续可用的高可靠 Agent 系统。

生产环境的现实
AI Agent 在生产环境中会面临各种故障:API 超时、模型服务不可用、工具执行失败、资源耗尽等。没有完善的恢复机制,这些故障会导致整个系统瘫痪。

故障类型与恢复策略

1. 瞬态故障

短暂的网络波动、API 限流等故障通常会自行消失,可以通过带指数退避的重试来解决。

2. 持久故障

模型服务长时间不可用、资源耗尽等故障无法靠重试恢复,需要切换策略或降级处理。

3. 级联故障

某个服务的故障可能沿调用链扩散到其他组件,需要通过熔断机制隔离故障、防止扩散。

使用方法

实现故障检测器

// OpenClaw failure detector
/**
 * Classifies thrown errors into failure types and keeps an in-memory record
 * of the failures passed to `record`.
 */
const failureDetector = {
  // Failure type enumeration
  FailureTypes: {
    TIMEOUT: 'timeout',
    RATE_LIMIT: 'rate_limit',
    SERVICE_UNAVAILABLE: 'service_unavailable',
    RESOURCE_EXHAUSTED: 'resource_exhausted',
    INVALID_RESPONSE: 'invalid_response',
    TOOL_FAILURE: 'tool_failure'
  },

  // In-memory failure log that `record` appends to. It lives on the detector
  // so that `record` does not depend on an undeclared outer variable.
  failureLog: [],

  /**
   * Classify an error by its `code` / `status` properties.
   *
   * @param {*} error - The thrown value; may be null/undefined or lack both properties.
   * @returns {{type: string, retryable: boolean}} The detected failure type and
   *   whether retrying is worthwhile. Anything unrecognised is reported as
   *   `{ type: 'unknown', retryable: false }`.
   */
  detect: (error) => {
    const types = failureDetector.FailureTypes;
    // Optional chaining: a thrown null/undefined must classify as unknown, not crash.
    const code = error?.code;
    const status = error?.status;

    if (code === 'ETIMEDOUT' || code === 'ECONNRESET') {
      return { type: types.TIMEOUT, retryable: true };
    }
    if (status === 429) {
      return { type: types.RATE_LIMIT, retryable: true };
    }
    if (status >= 500) {
      return { type: types.SERVICE_UNAVAILABLE, retryable: true };
    }
    if (code === 'RESOURCE_EXHAUSTED') {
      return { type: types.RESOURCE_EXHAUSTED, retryable: false };
    }
    return { type: 'unknown', retryable: false };
  },

  /**
   * Append a failure record to `failureDetector.failureLog`.
   *
   * @param {{type: string}} failure - A result of `detect`.
   * @param {object} [context] - Caller-supplied context; `context.agentId` is
   *   copied onto the record when present.
   * @returns {{timestamp: number, type: string, context: *, agentId: *}} The stored record.
   */
  record: (failure, context) => {
    const record = {
      timestamp: Date.now(),
      type: failure.type,
      context: context,
      agentId: context?.agentId
    };
    // Store in the failure log
    failureDetector.failureLog.push(record);
    return record;
  }
};

实现自动恢复策略

// OpenClaw automatic recovery strategy
/**
 * Retry-with-exponential-backoff and fallback helpers for async operations.
 */
class RecoveryStrategy {
  /**
   * @param {object} [config]
   * @param {number} [config.maxRetries=3] - Maximum number of attempts, including
   *   the first one. Values below 1 are treated as 1.
   * @param {number} [config.baseDelay=1000] - Delay before the second attempt, in ms.
   * @param {number} [config.maxDelay=30000] - Upper bound for any single delay, in ms.
   * @param {number} [config.backoffFactor=2] - Multiplier applied per attempt.
   * @param {number} [config.jitter=0] - Fraction in [0, 1] of each delay that may be
   *   randomly removed. 0 disables jitter, so delays are deterministic.
   */
  constructor(config = {}) {
    // `??` rather than `||`: an explicit 0 (e.g. baseDelay: 0) is a valid
    // setting and must not be replaced by the default.
    this.maxRetries = config.maxRetries ?? 3;
    this.baseDelay = config.baseDelay ?? 1000;
    this.maxDelay = config.maxDelay ?? 30000;
    this.backoffFactor = config.backoffFactor ?? 2;
    this.jitter = config.jitter ?? 0;
  }

  /**
   * Compute the wait before the attempt that follows `attempt`.
   *
   * @param {number} attempt - Zero-based index of the attempt that just failed.
   * @returns {number} Delay in ms: baseDelay * backoffFactor^attempt, capped at
   *   maxDelay, then reduced by up to `jitter` of its value at random.
   */
  computeDelay(attempt) {
    const capped = Math.min(
      this.baseDelay * Math.pow(this.backoffFactor, attempt),
      this.maxDelay
    );
    const jitter = Math.min(Math.max(this.jitter, 0), 1);
    return capped * (1 - jitter * Math.random());
  }

  /**
   * Run `operation`, retrying with exponential backoff while the failure is
   * classified as retryable by `failureDetector.detect`.
   *
   * @param {() => Promise<*>} operation - The async operation to run.
   * @param {object} [context] - Accepted for interface compatibility; not read here.
   * @returns {Promise<*>} The first successful result of `operation`.
   * @throws The error from `operation`: immediately when it is not retryable,
   *   otherwise the last error once all attempts are used up.
   */
  async retryWithBackoff(operation, context = {}) {
    // Always make at least one attempt; otherwise the loop would not run and
    // `undefined` would be thrown.
    const attempts = Math.max(1, Number(this.maxRetries) || 1);
    let lastError;

    for (let attempt = 0; attempt < attempts; attempt++) {
      try {
        return await operation();
      } catch (error) {
        lastError = error;

        if (!failureDetector.detect(error).retryable) {
          throw error;
        }

        // No point sleeping after the final attempt.
        if (attempt < attempts - 1) {
          await this.sleep(this.computeDelay(attempt));
        }
      }
    }

    throw lastError;
  }

  /**
   * Run `primary`; if it throws, run `fallback` with the error instead.
   *
   * @param {() => Promise<*>} primary - Preferred operation.
   * @param {(error: *) => Promise<*>} fallback - Degraded operation, given the primary's error.
   * @param {object} [context] - Accepted for interface compatibility; not read here.
   * @returns {Promise<*>} Result of `primary`, or of `fallback` when `primary` failed.
   * @throws The primary's error when `fallback` is not a function, or whatever `fallback` throws.
   */
  async executeWithFallback(primary, fallback, context) {
    try {
      return await primary();
    } catch (error) {
      // Without a usable fallback, surface the real failure rather than a
      // "fallback is not a function" TypeError.
      if (typeof fallback !== 'function') {
        throw error;
      }
      console.warn('Primary operation failed, using fallback:', error?.message);
      return await fallback(error);
    }
  }

  /**
   * @param {number} ms - Time to wait in milliseconds.
   * @returns {Promise<void>} Resolves after `ms` via setTimeout.
   */
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

实现熔断器

// OpenClaw circuit breaker
/**
 * Three-state circuit breaker (CLOSED, OPEN, HALF_OPEN) around an async operation.
 *
 * CLOSED: calls run; `failureThreshold` consecutive failures open the circuit.
 * OPEN: calls are rejected until `timeout` ms have passed since the last failure.
 * HALF_OPEN: calls run; `successThreshold` successes close the circuit, and any
 * failure re-opens it.
 */
class CircuitBreaker {
  /**
   * @param {object} [config]
   * @param {number} [config.failureThreshold=5] - Consecutive failures that open the circuit.
   * @param {number} [config.successThreshold=3] - Successes in HALF_OPEN that close it again.
   * @param {number} [config.timeout=60000] - Time in ms the circuit stays OPEN before a trial call is allowed.
   */
  constructor(config = {}) {
    // `??` rather than `||`: an explicit 0 (e.g. timeout: 0) must be respected.
    this.failureThreshold = config.failureThreshold ?? 5;
    this.successThreshold = config.successThreshold ?? 3;
    this.timeout = config.timeout ?? 60000; // how long the circuit stays open

    this.failures = 0;
    this.successes = 0;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.lastFailureTime = null;
  }

  /**
   * Run `operation` through the breaker.
   *
   * @param {() => Promise<*>} operation - The async operation to protect.
   * @returns {Promise<*>} The operation's result.
   * @throws {Error} With message 'Circuit breaker is OPEN' and `code === 'CIRCUIT_OPEN'`
   *   when the call is rejected without running; otherwise whatever `operation` throws.
   */
  async execute(operation) {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime <= this.timeout) {
        // The code lets callers tell a breaker rejection from an operation failure.
        const rejection = new Error('Circuit breaker is OPEN');
        rejection.code = 'CIRCUIT_OPEN';
        throw rejection;
      }
      // The open period has elapsed: let trial calls through.
      this.state = 'HALF_OPEN';
    }

    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Record a success: clears the failure count and may close a HALF_OPEN circuit. */
  onSuccess() {
    this.failures = 0;
    if (this.state !== 'HALF_OPEN') {
      return;
    }
    this.successes++;
    if (this.successes >= this.successThreshold) {
      this.state = 'CLOSED';
      this.successes = 0;
    }
  }

  /** Record a failure: re-opens a HALF_OPEN circuit, or opens it once the threshold is reached. */
  onFailure() {
    this.failures++;
    this.lastFailureTime = Date.now();

    if (this.state === 'HALF_OPEN') {
      this.state = 'OPEN';
      this.successes = 0;
    } else if (this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
    }
  }

  /**
   * @returns {{state: string, failures: number, successes: number}} Snapshot of the breaker's counters.
   */
  getState() {
    return {
      state: this.state,
      failures: this.failures,
      successes: this.successes
    };
  }
}

最佳实践

故障恢复决策树

故障发生
    ├─ 检测故障类型
    │   ├─ 瞬态故障 → 重试(指数退避)
    │   ├─ 持久故障 → 降级/切换
    │   └─ 级联故障 → 熔断保护
    │
    ├─ 记录故障信息
    │   ├─ 时间戳
    │   ├─ 故障类型
    │   └─ 上下文信息
    │
    └─ 触发恢复策略
        ├─ 成功 → 重置计数器
        └─ 失败 → 升级策略
                
  1. 快速失败原则:对于不可恢复的故障,快速失败比长时间等待更好
  2. 优雅降级:核心功能优先,非核心功能可以降级或跳过
  3. 故障隔离:一个组件的故障不应影响其他组件
  4. 可观测性:记录所有故障和恢复事件,便于分析
  5. 预设恢复策略:为每种故障类型预先定义恢复策略
避免恢复风暴
多个 Agent 在同一时刻重试可能引发恢复风暴,使刚恢复的服务再次过载。可以在退避延迟上加入随机抖动(jitter),并配合全局协调机制(例如统一限流)来避免。

完整示例:故障恢复管理器

// OpenClaw failure recovery manager
/**
 * Combines retries, per-service circuit breakers, fallbacks, failure logging
 * and alerting behind a single `executeWithRecovery` call.
 */
class FailureRecoveryManager {
  /**
   * @param {*} agent - Stored on the instance; not read by this class.
   * @param {object} [config]
   * @param {object} [config.recovery] - Passed to `RecoveryStrategy`.
   * @param {(alert: {type: string, service: string, error: *, timestamp: number}) => *} [config.alertHandler]
   *   Called when an operation fails and no fallback was supplied.
   */
  constructor(agent, config = {}) {
    this.agent = agent;
    // Kept so a per-call `maxRetries` can build a strategy with the same settings.
    this.recoveryConfig = config.recovery;
    this.recoveryStrategy = new RecoveryStrategy(config.recovery);
    this.circuitBreakers = new Map();
    this.failureLog = [];
    this.alertHandler = config.alertHandler;
  }

  /**
   * Get the circuit breaker for a service, creating it on first use.
   *
   * @param {string} serviceName - Key identifying the service.
   * @param {object} [config] - Breaker config; only used when the breaker is created.
   * @returns {CircuitBreaker} The breaker registered under `serviceName`.
   */
  getCircuitBreaker(serviceName, config = {}) {
    let breaker = this.circuitBreakers.get(serviceName);
    if (!breaker) {
      breaker = new CircuitBreaker(config);
      this.circuitBreakers.set(serviceName, breaker);
    }
    return breaker;
  }

  /**
   * Run `operation` with retries inside the service's circuit breaker. On
   * failure the error is logged, then either `fallback` is used or the alert
   * handler is notified and the error rethrown.
   *
   * @param {() => Promise<*>} operation - The async operation to run.
   * @param {object} [options]
   * @param {string} [options.serviceName='default'] - Selects the circuit breaker.
   * @param {(error: *) => Promise<*>} [options.fallback] - Degraded path, given the error.
   * @param {number} [options.maxRetries] - Per-call attempt limit. When omitted,
   *   the manager's configured strategy is used as-is.
   * @returns {Promise<*>} The operation's result, or the fallback's result.
   * @throws The operation's (or breaker's) error when there is no fallback, or
   *   whatever `fallback` throws.
   *
   * NOTE(review): `options.onRecovery` was destructured by earlier code but never
   * invoked; it is still ignored here. Confirm the intended contract before wiring it up.
   */
  async executeWithRecovery(operation, options = {}) {
    const { serviceName = 'default', fallback, maxRetries } = options;

    const circuitBreaker = this.getCircuitBreaker(serviceName);

    // retryWithBackoff takes its attempt limit from the strategy instance, so a
    // per-call limit needs its own strategy built from the same config.
    const strategy = maxRetries == null
      ? this.recoveryStrategy
      : new RecoveryStrategy({ ...this.recoveryConfig, maxRetries });

    try {
      // Retries run inside the breaker, so one exhausted retry sequence counts
      // as a single breaker failure.
      return await circuitBreaker.execute(
        () => strategy.retryWithBackoff(operation, { maxRetries })
      );
    } catch (error) {
      // Record the failure
      this.recordFailure(failureDetector.detect(error), error);

      // Try the degraded path
      if (fallback) {
        console.warn(`Using fallback for ${serviceName}`);
        return await fallback(error);
      }

      // Raise an alert
      this.notifyAlert(serviceName, error);

      throw error;
    }
  }

  /**
   * Call the alert handler, if any, without letting its own failure replace
   * the error being reported.
   *
   * @param {string} serviceName - Service the failure belongs to.
   * @param {*} error - The failure being reported.
   */
  notifyAlert(serviceName, error) {
    if (!this.alertHandler) {
      return;
    }
    const logAlertFailure = (alertError) => {
      console.error('Alert handler failed:', alertError);
    };
    try {
      const outcome = this.alertHandler({
        type: 'FAILURE',
        service: serviceName,
        error: error?.message,
        timestamp: Date.now()
      });
      // An async handler must not leave an unhandled rejection behind.
      if (outcome && typeof outcome.catch === 'function') {
        outcome.catch(logAlertFailure);
      }
    } catch (alertError) {
      logAlertFailure(alertError);
    }
  }

  /**
   * Append a failure to the in-memory log.
   *
   * @param {{type: string}} failure - Classification from `failureDetector.detect`.
   * @param {*} error - The thrown value; `message` and `stack` are copied when present.
   */
  recordFailure(failure, error) {
    this.failureLog.push({
      timestamp: Date.now(),
      type: failure.type,
      message: error?.message,
      stack: error?.stack
    });

    // Bound the log: once it exceeds 1000 entries keep only the newest 500.
    if (this.failureLog.length > 1000) {
      this.failureLog = this.failureLog.slice(-500);
    }
  }

  /**
   * @returns {{circuitBreakers: object, recentFailures: object[], failureRate: number}}
   *   State of every breaker keyed by service name, the 10 newest failure
   *   records, and the result of `calculateFailureRate()`.
   */
  getHealthStatus() {
    const circuitBreakerStatus = {};
    for (const [name, cb] of this.circuitBreakers) {
      circuitBreakerStatus[name] = cb.getState();
    }

    return {
      circuitBreakers: circuitBreakerStatus,
      recentFailures: this.failureLog.slice(-10),
      failureRate: this.calculateFailureRate()
    };
  }

  /**
   * Count logged failures within the trailing window.
   *
   * @param {number} [windowMs=60000] - Window length in ms.
   * @returns {number} Number of failures in the window. Despite the name this
   *   is a count, not a ratio.
   */
  calculateFailureRate(windowMs = 60000) {
    const now = Date.now();
    return this.failureLog.filter(f => now - f.timestamp < windowMs).length;
  }
}

相关链接