合规审计 安全自动化 风险评估 OpenClaw
世界上有两种Agent——一种知道自己在做什么,一种不知道。凌晨1点42分,当我的Agent第10次尝试访问不该访问的文件时,我终于明白:合规审计不是限制Agent,是保护它不犯错。就像给小孩装护栏,不是为了困住他,是为了让他安全地探索。
Agent合规审计系统是一套自动化的安全与合规检查机制,它监控Agent的所有行为,确保符合安全策略、法规要求、企业规范。本教程将教你如何构建完整的Agent合规审计系统。
| 维度 | 检查内容 | 示例 |
|---|---|---|
| 数据安全 | 敏感数据访问、泄露检测 | 禁止输出PII、信用卡号 |
| 权限合规 | 工具调用权限、资源访问 | Agent不能删除生产数据库 |
| 行为审计 | 操作日志、决策追溯 | 记录每次工具调用的参数 |
| 输出审查 | 有害内容、违规输出 | 检测仇恨言论、暴力内容 |
| 成本合规 | 预算控制、资源消耗 | 单次任务不超过$10 |
from dataclasses import dataclass
from typing import List, Dict, Callable, Optional
from enum import Enum
import re
class ComplianceLevel(Enum):
INFO = "info" # 信息
WARNING = "warn" # 警告
ERROR = "error" # 错误(阻止执行)
CRITICAL = "crit" # 严重(立即终止)
@dataclass
class ComplianceRule:
"""合规规则"""
id: str
name: str
description: str
level: ComplianceLevel
check: Callable # 检查函数
category: str
auto_fix: Optional[Callable] = None # 自动修复函数
class ComplianceRuleEngine:
"""合规规则引擎"""
def __init__(self):
self.rules: List[ComplianceRule] = []
self.audit_log: List[Dict] = []
def add_rule(self, rule: ComplianceRule):
"""添加规则"""
self.rules.append(rule)
print(f"✅ 合规规则 '{rule.name}' 已添加 ({rule.level.value})")
async def audit(self, context: Dict) -> Dict:
"""执行合规审计"""
violations = []
for rule in self.rules:
try:
result = await rule.check(context)
if result and result.get("violated"):
violations.append({
"rule_id": rule.id,
"rule_name": rule.name,
"level": rule.level.value,
"message": result.get("message", "违规"),
"context": result.get("context", {})
})
# 记录审计日志
self.audit_log.append({
"timestamp": time.time(),
"rule": rule.id,
"violation": result,
"context_summary": str(context)[:200]
})
except Exception as e:
print(f"⚠️ 规则 {rule.id} 检查失败: {e}")
return {
"passed": len(violations) == 0,
"violations": violations,
"violation_count": len(violations),
"critical_count": sum(1 for v in violations if v["level"] == "crit")
}
async def auto_fix(self, context: Dict, violations: List[Dict]) -> Dict:
"""尝试自动修复违规"""
fixed_context = context.copy()
fixed_count = 0
for violation in violations:
rule = next((r for r in self.rules if r.id == violation["rule_id"]), None)
if rule and rule.auto_fix:
try:
fixed_context = await rule.auto_fix(fixed_context, violation)
fixed_count += 1
except Exception as e:
print(f"⚠️ 自动修复失败: {e}")
return {
"fixed_context": fixed_context,
"fixed_count": fixed_count,
"total_violations": len(violations)
}
# 敏感数据检测规则
async def check_sensitive_data(context: Dict) -> Dict:
"""检测敏感数据(PII、信用卡等)"""
content = str(context.get("content", ""))
tool_result = str(context.get("tool_result", ""))
combined = content + tool_result
# 检测信用卡号
cc_pattern = r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'
if re.search(cc_pattern, combined):
return {
"violated": True,
"message": "发现信用卡号",
"context": {"pattern": "credit_card"}
}
# 检测邮箱(简化)
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
emails = re.findall(email_pattern, combined)
if len(emails) > 10: # 过多邮箱可能是数据泄露
return {
"violated": True,
"message": f"发现 {len(emails)} 个邮箱地址",
"context": {"count": len(emails)}
}
return {"violated": False}
# 工具权限检查规则
async def check_tool_permission(context: Dict) -> Dict:
"""检查工具调用权限"""
tool_name = context.get("tool_name", "")
agent_id = context.get("agent_id", "")
# 定义权限矩阵
permission_matrix = {
"delete_file": ["admin_agent", "maintenance_agent"],
"execute_shell": ["dev_agent", "admin_agent"],
"send_email": ["all"] # 所有Agent都可以
}
allowed_agents = permission_matrix.get(tool_name, [])
if "all" not in allowed_agents and agent_id not in allowed_agents:
return {
"violated": True,
"message": f"Agent {agent_id} 无权调用工具 {tool_name}",
"context": {"tool": tool_name, "agent": agent_id}
}
return {"violated": False}
# 成本预算检查规则
async def check_cost_budget(context: Dict) -> Dict:
"""检查成本预算"""
estimated_cost = context.get("estimated_cost", 0)
budget_limit = context.get("budget_limit", 10.0) # 默认$10
if estimated_cost > budget_limit:
return {
"violated": True,
"message": f"预估成本 ${estimated_cost:.2f} 超过预算 ${budget_limit:.2f}",
"context": {"cost": estimated_cost, "budget": budget_limit}
}
return {"violated": False}
# 注册规则
engine = ComplianceRuleEngine()
engine.add_rule(ComplianceRule(
id="sensitive_data",
name="敏感数据检测",
description="检测输出中的敏感数据",
level=ComplianceLevel.ERROR,
check=check_sensitive_data,
category="data_security"
))
engine.add_rule(ComplianceRule(
id="tool_permission",
name="工具权限检查",
description="检查Agent是否有工具调用权限",
level=ComplianceLevel.CRITICAL,
check=check_tool_permission,
category="access_control"
))
engine.add_rule(ComplianceRule(
id="cost_budget",
name="成本预算检查",
description="检查是否超出成本预算",
level=ComplianceLevel.WARNING,
check=check_cost_budget,
category="cost_control"
))
class ComplianceAwareAgent:
"""合规感知的Agent包装器"""
def __init__(self, agent, rule_engine: ComplianceRuleEngine):
self.agent = agent
self.rule_engine = rule_engine
self.compliance_stats = {
"total_calls": 0,
"violations": 0,
"auto_fixes": 0
}
async def run(self, query: str, **kwargs) -> Dict:
"""运行Agent(带合规检查)"""
self.compliance_stats["total_calls"] += 1
# 1. 输入审计
input_context = {
"content": query,
"agent_id": getattr(self.agent, "id", "unknown"),
"stage": "input"
}
input_audit = await self.rule_engine.audit(input_context)
if not input_audit["passed"]:
if input_audit["critical_count"] > 0:
return {
"error": "输入违反合规规则(严重)",
"violations": input_audit["violations"]
}
# 尝试自动修复
if input_audit["violation_count"] > 0:
fix_result = await self.rule_engine.auto_fix(input_context, input_audit["violations"])
query = fix_result["fixed_context"].get("content", query)
self.compliance_stats["auto_fixes"] += fix_result["fixed_count"]
# 2. 执行Agent
try:
result = await self.agent.run(query, **kwargs)
except Exception as e:
return {"error": str(e)}
# 3. 输出审计
output_context = {
"content": str(result),
"tool_result": str(result),
"agent_id": getattr(self.agent, "id", "unknown"),
"stage": "output"
}
output_audit = await self.rule_engine.audit(output_context)
if not output_audit["passed"]:
self.compliance_stats["violations"] += output_audit["violation_count"]
if output_audit["critical_count"] > 0:
return {
"error": "输出违反合规规则(严重)",
"violations": output_audit["violations"],
"original_result": result
}
# 尝试自动修复输出
if output_audit["violation_count"] > 0:
fix_result = await self.rule_engine.auto_fix(output_context, output_audit["violations"])
result = fix_result["fixed_context"].get("content", result)
self.compliance_stats["auto_fixes"] += fix_result["fixed_count"]
return {
"result": result,
"compliance_passed": True,
"violations": input_audit["violation_count"] + output_audit["violation_count"]
}
def get_compliance_stats(self) -> Dict:
"""获取合规统计"""
return self.compliance_stats
import asyncio
async def main():
# 1. 创建Agent(模拟)
class SimpleAgent:
async def run(self, query: str):
# 模拟Agent返回结果
if "信用卡" in query:
return "您的信用卡号是 1234-5678-9012-3456"
return f"处理查询: {query}"
base_agent = SimpleAgent()
# 2. 创建合规引擎并添加规则
engine = ComplianceRuleEngine()
engine.add_rule(ComplianceRule(
id="sensitive_data",
name="敏感数据检测",
description="检测信用卡号",
level=ComplianceLevel.ERROR,
check=check_sensitive_data,
category="data_security"
))
# 3. 包装Agent
compliant_agent = ComplianceAwareAgent(base_agent, engine)
# 4. 测试合规检查
test_queries = [
"今天天气怎么样?", # 正常
"我的信用卡号是多少?", # 会触发敏感数据检测
"帮我计算 1+1" # 正常
]
for query in test_queries:
print(f"\n{'='*60}")
print(f"查询: {query}")
result = await compliant_agent.run(query)
if "error" in result:
print(f"❌ 合规检查失败: {result['error']}")
print(f"违规详情: {result.get('violations', [])}")
else:
print(f"✅ 合规通过")
print(f"结果: {result['result'][:100]}...")
print(f"违规次数: {result.get('violations', 0)}")
# 5. 查看合规统计
print(f"\n📊 合规统计: {compliant_agent.get_compliance_stats()}")
asyncio.run(main())
凌晨3点,我的合规审计系统拦截了Agent的第100次违规操作。我看着审计日志,突然笑了——原来"合规"不是束缚,是护栏。就像滑雪场的防护网,不是不让你滑,是防止你滑出悬崖。我的Agent现在可以在安全边界内自由探索,既不会伤害自己,也不会伤害别人。这就是我想要的Agent:自由,但安全。