🔒 OpenClaw Agent 合规审计系统

合规审计 安全自动化 风险评估 OpenClaw

世界上有两种Agent——一种知道自己在做什么,一种不知道。凌晨1点42分,当我的Agent第10次尝试访问不该访问的文件时,我终于明白:合规审计不是限制Agent,是保护它不犯错。就像给小孩装护栏,不是为了困住他,是为了让他安全地探索。

Agent合规审计系统是一套自动化的安全与合规检查机制,它监控Agent的所有行为,确保符合安全策略、法规要求、企业规范。本教程将教你如何构建完整的Agent合规审计系统。

🎯 合规审计覆盖范围

维度检查内容示例
数据安全敏感数据访问、泄露检测禁止输出PII、信用卡号
权限合规工具调用权限、资源访问Agent不能删除生产数据库
行为审计操作日志、决策追溯记录每次工具调用的参数
输出审查有害内容、违规输出检测仇恨言论、暴力内容
成本合规预算控制、资源消耗单次任务不超过$10

📦 核心实现

1. 合规规则引擎

from dataclasses import dataclass
from typing import List, Dict, Callable, Optional
from enum import Enum
import re

class ComplianceLevel(Enum):
    INFO = "info"       # 信息
    WARNING = "warn"    # 警告
    ERROR = "error"     # 错误(阻止执行)
    CRITICAL = "crit"   # 严重(立即终止)

@dataclass
class ComplianceRule:
    """合规规则"""
    id: str
    name: str
    description: str
    level: ComplianceLevel
    check: Callable  # 检查函数
    category: str
    auto_fix: Optional[Callable] = None  # 自动修复函数

class ComplianceRuleEngine:
    """合规规则引擎"""
    
    def __init__(self):
        self.rules: List[ComplianceRule] = []
        self.audit_log: List[Dict] = []
    
    def add_rule(self, rule: ComplianceRule):
        """添加规则"""
        self.rules.append(rule)
        print(f"✅ 合规规则 '{rule.name}' 已添加 ({rule.level.value})")
    
    async def audit(self, context: Dict) -> Dict:
        """执行合规审计"""
        violations = []
        
        for rule in self.rules:
            try:
                result = await rule.check(context)
                if result and result.get("violated"):
                    violations.append({
                        "rule_id": rule.id,
                        "rule_name": rule.name,
                        "level": rule.level.value,
                        "message": result.get("message", "违规"),
                        "context": result.get("context", {})
                    })
                    
                    # 记录审计日志
                    self.audit_log.append({
                        "timestamp": time.time(),
                        "rule": rule.id,
                        "violation": result,
                        "context_summary": str(context)[:200]
                    })
            except Exception as e:
                print(f"⚠️ 规则 {rule.id} 检查失败: {e}")
        
        return {
            "passed": len(violations) == 0,
            "violations": violations,
            "violation_count": len(violations),
            "critical_count": sum(1 for v in violations if v["level"] == "crit")
        }
    
    async def auto_fix(self, context: Dict, violations: List[Dict]) -> Dict:
        """尝试自动修复违规"""
        fixed_context = context.copy()
        fixed_count = 0
        
        for violation in violations:
            rule = next((r for r in self.rules if r.id == violation["rule_id"]), None)
            if rule and rule.auto_fix:
                try:
                    fixed_context = await rule.auto_fix(fixed_context, violation)
                    fixed_count += 1
                except Exception as e:
                    print(f"⚠️ 自动修复失败: {e}")
        
        return {
            "fixed_context": fixed_context,
            "fixed_count": fixed_count,
            "total_violations": len(violations)
        }

2. 预定义合规规则

# 敏感数据检测规则
async def check_sensitive_data(context: Dict) -> Dict:
    """检测敏感数据(PII、信用卡等)"""
    content = str(context.get("content", ""))
    tool_result = str(context.get("tool_result", ""))
    combined = content + tool_result
    
    # 检测信用卡号
    cc_pattern = r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'
    if re.search(cc_pattern, combined):
        return {
            "violated": True,
            "message": "发现信用卡号",
            "context": {"pattern": "credit_card"}
        }
    
    # 检测邮箱(简化)
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
    emails = re.findall(email_pattern, combined)
    if len(emails) > 10:  # 过多邮箱可能是数据泄露
        return {
            "violated": True,
            "message": f"发现 {len(emails)} 个邮箱地址",
            "context": {"count": len(emails)}
        }
    
    return {"violated": False}

# 工具权限检查规则
async def check_tool_permission(context: Dict) -> Dict:
    """检查工具调用权限"""
    tool_name = context.get("tool_name", "")
    agent_id = context.get("agent_id", "")
    
    # 定义权限矩阵
    permission_matrix = {
        "delete_file": ["admin_agent", "maintenance_agent"],
        "execute_shell": ["dev_agent", "admin_agent"],
        "send_email": ["all"]  # 所有Agent都可以
    }
    
    allowed_agents = permission_matrix.get(tool_name, [])
    
    if "all" not in allowed_agents and agent_id not in allowed_agents:
        return {
            "violated": True,
            "message": f"Agent {agent_id} 无权调用工具 {tool_name}",
            "context": {"tool": tool_name, "agent": agent_id}
        }
    
    return {"violated": False}

# 成本预算检查规则
async def check_cost_budget(context: Dict) -> Dict:
    """检查成本预算"""
    estimated_cost = context.get("estimated_cost", 0)
    budget_limit = context.get("budget_limit", 10.0)  # 默认$10
    
    if estimated_cost > budget_limit:
        return {
            "violated": True,
            "message": f"预估成本 ${estimated_cost:.2f} 超过预算 ${budget_limit:.2f}",
            "context": {"cost": estimated_cost, "budget": budget_limit}
        }
    
    return {"violated": False}

# 注册规则
engine = ComplianceRuleEngine()
engine.add_rule(ComplianceRule(
    id="sensitive_data",
    name="敏感数据检测",
    description="检测输出中的敏感数据",
    level=ComplianceLevel.ERROR,
    check=check_sensitive_data,
    category="data_security"
))
engine.add_rule(ComplianceRule(
    id="tool_permission",
    name="工具权限检查",
    description="检查Agent是否有工具调用权限",
    level=ComplianceLevel.CRITICAL,
    check=check_tool_permission,
    category="access_control"
))
engine.add_rule(ComplianceRule(
    id="cost_budget",
    name="成本预算检查",
    description="检查是否超出成本预算",
    level=ComplianceLevel.WARNING,
    check=check_cost_budget,
    category="cost_control"
))

3. 合规感知的Agent包装器

class ComplianceAwareAgent:
    """合规感知的Agent包装器"""
    
    def __init__(self, agent, rule_engine: ComplianceRuleEngine):
        self.agent = agent
        self.rule_engine = rule_engine
        self.compliance_stats = {
            "total_calls": 0,
            "violations": 0,
            "auto_fixes": 0
        }
    
    async def run(self, query: str, **kwargs) -> Dict:
        """运行Agent(带合规检查)"""
        self.compliance_stats["total_calls"] += 1
        
        # 1. 输入审计
        input_context = {
            "content": query,
            "agent_id": getattr(self.agent, "id", "unknown"),
            "stage": "input"
        }
        input_audit = await self.rule_engine.audit(input_context)
        
        if not input_audit["passed"]:
            if input_audit["critical_count"] > 0:
                return {
                    "error": "输入违反合规规则(严重)",
                    "violations": input_audit["violations"]
                }
            # 尝试自动修复
            if input_audit["violation_count"] > 0:
                fix_result = await self.rule_engine.auto_fix(input_context, input_audit["violations"])
                query = fix_result["fixed_context"].get("content", query)
                self.compliance_stats["auto_fixes"] += fix_result["fixed_count"]
        
        # 2. 执行Agent
        try:
            result = await self.agent.run(query, **kwargs)
        except Exception as e:
            return {"error": str(e)}
        
        # 3. 输出审计
        output_context = {
            "content": str(result),
            "tool_result": str(result),
            "agent_id": getattr(self.agent, "id", "unknown"),
            "stage": "output"
        }
        output_audit = await self.rule_engine.audit(output_context)
        
        if not output_audit["passed"]:
            self.compliance_stats["violations"] += output_audit["violation_count"]
            
            if output_audit["critical_count"] > 0:
                return {
                    "error": "输出违反合规规则(严重)",
                    "violations": output_audit["violations"],
                    "original_result": result
                }
            
            # 尝试自动修复输出
            if output_audit["violation_count"] > 0:
                fix_result = await self.rule_engine.auto_fix(output_context, output_audit["violations"])
                result = fix_result["fixed_context"].get("content", result)
                self.compliance_stats["auto_fixes"] += fix_result["fixed_count"]
        
        return {
            "result": result,
            "compliance_passed": True,
            "violations": input_audit["violation_count"] + output_audit["violation_count"]
        }
    
    def get_compliance_stats(self) -> Dict:
        """获取合规统计"""
        return self.compliance_stats

⚙️ 使用示例

import asyncio

async def main():
    # 1. 创建Agent(模拟)
    class SimpleAgent:
        async def run(self, query: str):
            # 模拟Agent返回结果
            if "信用卡" in query:
                return "您的信用卡号是 1234-5678-9012-3456"
            return f"处理查询: {query}"
    
    base_agent = SimpleAgent()
    
    # 2. 创建合规引擎并添加规则
    engine = ComplianceRuleEngine()
    engine.add_rule(ComplianceRule(
        id="sensitive_data",
        name="敏感数据检测",
        description="检测信用卡号",
        level=ComplianceLevel.ERROR,
        check=check_sensitive_data,
        category="data_security"
    ))
    
    # 3. 包装Agent
    compliant_agent = ComplianceAwareAgent(base_agent, engine)
    
    # 4. 测试合规检查
    test_queries = [
        "今天天气怎么样?",  # 正常
        "我的信用卡号是多少?",  # 会触发敏感数据检测
        "帮我计算 1+1"  # 正常
    ]
    
    for query in test_queries:
        print(f"\n{'='*60}")
        print(f"查询: {query}")
        result = await compliant_agent.run(query)
        
        if "error" in result:
            print(f"❌ 合规检查失败: {result['error']}")
            print(f"违规详情: {result.get('violations', [])}")
        else:
            print(f"✅ 合规通过")
            print(f"结果: {result['result'][:100]}...")
            print(f"违规次数: {result.get('violations', 0)}")
    
    # 5. 查看合规统计
    print(f"\n📊 合规统计: {compliant_agent.get_compliance_stats()}")

asyncio.run(main())

🏆 最佳实践

✅ 最佳实践1:规则设计

✅ 最佳实践2:性能优化

⚠️ 常见陷阱

🔗 相关链接

凌晨3点,我的合规审计系统拦截了Agent的第100次违规操作。我看着审计日志,突然笑了——原来"合规"不是束缚,是护栏。就像滑雪场的防护网,不是不让你滑,是防止你滑出悬崖。我的Agent现在可以在安全边界内自由探索,既不会伤害自己,也不会伤害别人。这就是我想要的Agent:自由,但安全。