# Leader Election:基于Raft协议的分布式Agent领导选举(OpenClaw)
世界上有两种Agent团队——一种必须有个Leader才知道该听谁的,一种就算没有Leader也能各自干活。凌晨1点42分,我的5个Agent同时决定要执行同一个任务,数据库瞬间被写入了3份冲突数据——从那天起,我决定给我的Agent们选举一个Leader。
在多Agent协作系统中,领导选举(Leader Election)是保证一致性和协调性的关键机制。当多个Agent同时工作时,需要确定一个Leader来协调决策、分配任务、解决冲突。本教程将基于Raft协议实现Agent领导选举(为便于演示,节点之间的RPC用同一进程内的直接方法调用来模拟)。
| 场景 | 没有Leader | 有Leader |
|---|---|---|
| 数据库写入 | 多个Agent同时写,数据冲突 | Leader协调写操作,保证一致性 |
| 任务分配 | Agent们抢任务或都推脱 | Leader统一调度,负载均衡 |
| 故障恢复 | 没人知道该谁恢复 | Leader发现故障并分配恢复任务 |
| 决策冲突 | 各有各的决策,无法统一 | Leader最后拍板 |
from enum import Enum
from dataclasses import dataclass
import asyncio
import random
import time
from typing import Dict, Optional, List
class NodeState(Enum):
    """Role a Raft node currently plays in the cluster."""

    FOLLOWER = "follower"    # passive: waits for heartbeats / vote requests
    CANDIDATE = "candidate"  # election timeout fired: asking peers for votes
    LEADER = "leader"        # won a majority: sends periodic heartbeats
@dataclass
class LogEntry:
    """A single entry in a node's Raft log."""

    term: int         # term of the Leader that created the entry
    index: int        # 0-based position in the log, assigned by the Leader
    command: Dict     # payload (e.g. a task description)
    timestamp: float  # creation time, seconds since the epoch
class RaftAgentNode:
    """Agent node running a simplified, in-process simulation of Raft leader election.

    The RequestVote / AppendEntries "RPCs" are direct method calls on peer
    node objects. Peers are looked up by id in ``registry`` or, by default,
    in the module-level ``election_system.nodes``.
    """

    def __init__(self, node_id: str, peer_ids: List[str],
                 registry: Optional[Dict[str, "RaftAgentNode"]] = None):
        """Create a node that starts as a FOLLOWER in term 0.

        Args:
            node_id: unique id of this node.
            peer_ids: ids of the other cluster members (this node excluded).
            registry: optional ``id -> node`` mapping used to reach peers;
                ``None`` (default) means ``election_system.nodes``.
        """
        self.node_id = node_id
        self.peer_ids = peer_ids
        self.registry = registry
        # "Persistent" Raft state (kept in memory only in this simulation)
        self.current_term = 0
        self.voted_for: Optional[str] = None
        self.log: List[LogEntry] = []
        # Volatile state
        self.state = NodeState.FOLLOWER
        self.commit_index = 0
        self.last_applied = 0
        # Leader-only state, re-initialized on every election win
        self.next_index: Dict[str, int] = {}
        self.match_index: Dict[str, int] = {}
        # Election timer: a random 150-300 ms timeout makes simultaneous
        # candidacies unlikely.
        self.election_timeout = random.randint(150, 300) / 1000
        self.last_heartbeat = time.time()
        # Event callbacks; on_leader_elected is awaited with this node's id
        # after it wins an election.
        self.on_leader_elected = None
        self.on_command_committed = None

    def start(self):
        """Launch the election/heartbeat loop as a background asyncio task.

        Must be called while an event loop is running.
        """
        self._election_loop = asyncio.create_task(self._run_election_timer())
        print(f"🚀 Agent {self.node_id} 启动 (初始状态: {self.state.value})")

    async def _run_election_timer(self):
        """Every 50 ms: a Leader sends heartbeats, anyone else checks for an election timeout."""
        while True:
            await asyncio.sleep(0.05)
            if self.state == NodeState.LEADER:
                # Leader heartbeat period: 50 ms (well below the 150 ms minimum timeout)
                if time.time() - self.last_heartbeat >= 0.05:
                    await self._send_heartbeats()
                    self.last_heartbeat = time.time()
            elif time.time() - self.last_heartbeat >= self.election_timeout:
                # Follower/Candidate heard nothing for too long: run for Leader
                await self._start_election()

    # ---- helpers -------------------------------------------------------

    def _peer(self, peer_id: str) -> Optional["RaftAgentNode"]:
        """Resolve a peer id to its node object, or None if unknown."""
        nodes = self.registry if self.registry is not None else election_system.nodes
        return nodes.get(peer_id)

    def _observe_term(self, term: int) -> None:
        """Adopt a newer term seen in any RPC: become a follower with no vote cast yet."""
        if term > self.current_term:
            self.current_term = term
            self.voted_for = None
            self.state = NodeState.FOLLOWER

    def _last_log_position(self) -> tuple:
        """Return (term of last entry, log length); larger tuples mean more up-to-date logs."""
        last_term = self.log[-1].term if self.log else 0
        return last_term, len(self.log)

    def _majority(self) -> int:
        """Number of votes needed for a strict majority of the cluster (peers + self)."""
        return (len(self.peer_ids) + 1) // 2 + 1

    # ---- election ------------------------------------------------------

    async def _start_election(self):
        """Become a candidate for a new term and try to collect a majority of votes."""
        self.state = NodeState.CANDIDATE
        self.current_term += 1
        self.voted_for = self.node_id
        self.last_heartbeat = time.time()
        term = self.current_term
        print(f"🗳️ Agent {self.node_id} 开始选举 (term {self.current_term})")

        votes = 1  # this node votes for itself
        for peer in self.peer_ids:
            if await self._request_vote(peer):
                votes += 1

        # A reply or a heartbeat may have revealed a newer term or another
        # Leader in the meantime; only a still-current candidate may take office.
        if self.state != NodeState.CANDIDATE or self.current_term != term:
            return
        if votes < self._majority():
            return  # stay candidate; the timer will start a new election

        self.state = NodeState.LEADER
        self.last_heartbeat = time.time()
        # Initialize Leader bookkeeping (0-based log indices)
        for peer in self.peer_ids:
            self.next_index[peer] = len(self.log)
            self.match_index[peer] = 0
        print(f"✅ Agent {self.node_id} 当选为Leader (term {self.current_term})")
        if self.on_leader_elected:
            await self.on_leader_elected(self.node_id)

    async def _request_vote(self, peer_id: str) -> bool:
        """Simulated RequestVote RPC: ask ``peer_id`` to vote for this node.

        The voter grants its vote only if:
        1. the candidate's term is not older than its own,
        2. it has not voted for anyone else in that term, and
        3. the candidate's log is at least as up-to-date as its own.

        Returns:
            True if the vote was granted.
        """
        peer_node = self._peer(peer_id)
        if not peer_node:
            return False
        # A newer term makes the voter a follower of that term with no vote cast,
        # so votes from earlier terms never block a later election.
        peer_node._observe_term(self.current_term)
        if peer_node.current_term > self.current_term:
            # The voter is ahead: this candidate is stale and must step down.
            self._observe_term(peer_node.current_term)
            return False
        if peer_node.voted_for not in (None, self.node_id):
            return False
        if self._last_log_position() < peer_node._last_log_position():
            return False
        peer_node.voted_for = self.node_id
        peer_node.last_heartbeat = time.time()
        return True

    # ---- replication / heartbeats -------------------------------------

    async def _send_heartbeats(self):
        """Send an empty AppendEntries to every peer while this node is still Leader."""
        for peer in self.peer_ids:
            if self.state != NodeState.LEADER:
                break  # stepped down after seeing a newer term
            await self._append_entries(peer, [])

    async def _append_entries(self, peer_id: str, entries: List[LogEntry]) -> bool:
        """Simulated AppendEntries RPC: heartbeat (no entries) or log replication.

        Returns:
            True if the peer accepted the call. False if the peer is unknown, if
            it has a newer term (this node then steps down), or if an entry
            would leave a gap in the peer's log (no log repair is simulated).
        """
        peer_node = self._peer(peer_id)
        if not peer_node:
            return False
        if self.current_term < peer_node.current_term:
            self._observe_term(peer_node.current_term)
            return False

        # The receiver recognizes this node as Leader of the current term.
        was_follower = peer_node.state == NodeState.FOLLOWER
        peer_node._observe_term(self.current_term)
        peer_node.state = NodeState.FOLLOWER
        peer_node.last_heartbeat = time.time()
        if not was_follower:
            print(f"📥 Agent {peer_id} 收到Leader {self.node_id} 的心跳,回到Follower状态")

        for entry in entries:
            if entry.index > len(peer_node.log):
                return False
            if entry.index < len(peer_node.log):
                if peer_node.log[entry.index].term == entry.term:
                    continue  # already replicated
                del peer_node.log[entry.index:]  # drop the conflicting suffix
            peer_node.log.append(entry)
        if entries:
            self.match_index[peer_id] = entries[-1].index + 1
            self.next_index[peer_id] = entries[-1].index + 1
        return True
class ElectionSystem:
    """Registry and monitor for all Agent nodes of one simulated cluster."""

    def __init__(self):
        self.nodes: Dict[str, RaftAgentNode] = {}
        self.current_leader: Optional[str] = None

    def add_node(self, node_id: str, peer_ids: Optional[List[str]] = None):
        """Create a RaftAgentNode, hook up its leader callback and register it.

        Args:
            node_id: id of the new node.
            peer_ids: ids of the other members; defaults to every node already
                registered. The list is copied.

        Membership is kept symmetric. The new node is also appended to the
        peer list of each already-registered node it names. Without this,
        nodes added earlier would never send it heartbeats or vote requests.

        Returns:
            The created node.
        """
        peer_ids = list(self.nodes) if peer_ids is None else list(peer_ids)
        node = RaftAgentNode(node_id, peer_ids)

        async def on_elected(leader_id: str):
            self.current_leader = leader_id
            print(f"🏆 新Leader当选: {leader_id}")

        node.on_leader_elected = on_elected

        for peer_id in peer_ids:
            existing = self.nodes.get(peer_id)
            if existing is not None and node_id not in existing.peer_ids:
                existing.peer_ids.append(node_id)

        self.nodes[node_id] = node
        return node

    async def start_all(self):
        """Start the background election loop of every registered node."""
        for node in self.nodes.values():
            node.start()
        print(f"🚀 选举系统启动,共 {len(self.nodes)} 个Agent节点")

    async def simulate_failure(self, node_id: str):
        """Simulate a crash of ``node_id``; unknown ids are ignored.

        Cancelling the node's background loop stops it from sending heartbeats
        or starting elections, so the remaining nodes time out and elect a new
        Leader. The node object can still answer simulated RPCs, because peers
        call its methods directly.
        """
        node = self.nodes.get(node_id)
        if node is None:
            return
        loop_task = getattr(node, "_election_loop", None)
        if loop_task is not None:
            loop_task.cancel()
        node.state = NodeState.FOLLOWER
        print(f"💥 Agent {node_id} 故障!触发重新选举...")
        if node_id == self.current_leader:
            self.current_leader = None
            print(f"⚠️ 当前Leader {node_id} 故障,准备重新选举")

    def get_leader(self) -> Optional[str]:
        """Return the id of the most recently elected Leader, or None."""
        return self.current_leader

    def get_status(self) -> Dict:
        """Return the current Leader plus each node's state and term."""
        return {
            "current_leader": self.current_leader,
            "nodes": {
                nid: {"state": node.state.value, "term": node.current_term}
                for nid, node in self.nodes.items()
            },
        }
# Module-wide cluster registry: RaftAgentNode's simulated RPCs look peers up in election_system.nodes.
election_system: ElectionSystem = ElectionSystem()
class CoordinateAgent(RaftAgentNode):
    """Election node that, while Leader, also logs, replicates and tracks tasks."""

    def __init__(self, node_id: str, peer_ids: List[str]):
        super().__init__(node_id, peer_ids)
        self.task_queue = []
        self.task_results = {}
        # Per-node counter used in task ids; it also advances on failed
        # attempts, so an uncommitted log entry never shares an id with a
        # later task.
        self._task_seq = 0

    async def assign_task(self, task: Dict) -> str:
        """Log a task, replicate it to the Followers and commit it (Leader only).

        Args:
            task: task description. A copy with an added ``"id"`` key is logged
                and queued; the caller's dict is not modified.

        Returns:
            The generated task id, ``"task-{term}-{n}"``.

        Raises:
            Exception: if this node is not the Leader, loses leadership while
                replicating, or cannot reach a majority of the cluster. In the
                last two cases the entry stays in this node's log uncommitted.
        """
        if self.state != NodeState.LEADER:
            raise Exception(f"Agent {self.node_id} 不是Leader,无法分配任务")

        task_id = f"task-{self.current_term}-{self._task_seq}"
        self._task_seq += 1
        # Copy so that reusing one dict for several calls cannot alias queued tasks.
        task = {**task, "id": task_id}

        # Append to the Leader's own Raft log
        entry = LogEntry(
            term=self.current_term,
            index=len(self.log),
            command=task,
            timestamp=time.time(),
        )
        self.log.append(entry)

        # Replicate to the Followers and count acknowledgements (the Leader's own copy counts)
        acks = 1
        for peer in self.peer_ids:
            if await self._append_entries(peer, [entry]):
                acks += 1

        if self.state != NodeState.LEADER:
            raise Exception(f"Agent {self.node_id} 在复制任务 {task_id} 时失去了Leader身份")
        cluster_size = len(self.peer_ids) + 1
        if acks < cluster_size // 2 + 1:
            raise Exception(f"任务 {task_id} 未能复制到多数节点 ({acks}/{cluster_size})")

        # Committed: a majority stores the entry (and every entry before it)
        self.commit_index = len(self.log)
        self.task_queue.append(task)
        if self.on_command_committed:
            await self.on_command_committed(entry)
        return task_id

    async def execute_assigned_task(self, task: Dict) -> Dict:
        """Run a task (simulated by a random 0.5-2 s sleep) and return its result record.

        Only a node that is Leader when the task finishes stores the result in
        ``task_results``. Raises KeyError if ``task`` has no ``"id"``.
        """
        task_type = task.get("type", "unknown")
        print(f"🔧 Agent {self.node_id} 执行任务: {task.get('id')} ({task_type})")
        # Simulated work
        await asyncio.sleep(random.uniform(0.5, 2.0))
        result = {
            "task_id": task["id"],
            "executor": self.node_id,
            "status": "completed",
            "timestamp": time.time(),
        }
        if self.state == NodeState.LEADER:
            self.task_results[task["id"]] = result
        return result
import asyncio
async def main():
    """Demo: elect a Leader among three agents, assign tasks, then fail it and watch re-election.

    The demo uses the module-level ``election_system`` because the nodes'
    simulated RPCs resolve peers through ``election_system.nodes``. A separate
    ``ElectionSystem()`` instance would be invisible to them, and no Leader
    would ever be elected.
    """
    # 1. Create 3 Agent nodes
    system = election_system
    cluster = ["agent-alpha", "agent-beta", "agent-gamma"]
    for node_id in cluster:
        peers = [p for p in cluster if p != node_id]
        system.add_node(node_id, peers)
        # add_node builds a plain RaftAgentNode, which has no assign_task();
        # swap in a CoordinateAgent and keep the callback add_node registered.
        agent = CoordinateAgent(node_id, peers)
        agent.on_leader_elected = system.nodes[node_id].on_leader_elected
        system.nodes[node_id] = agent

    # 2. Start all nodes
    await system.start_all()

    # 3. Wait for the election to finish
    await asyncio.sleep(2)

    # 4. Show who was elected
    print(f"\n📊 选举结果: {system.get_status()}")
    leader_id = system.get_leader()
    print(f"🏆 Leader: {leader_id}")

    if leader_id:
        # 5. Assign tasks through the Leader
        leader = system.nodes[leader_id]
        for i in range(5):
            task_id = await leader.assign_task({
                "type": "data_processing",
                "sequence": i,
                "data": f"test_data_{i}",
            })
            print(f"📌 任务已分配: {task_id}")

        # 6. Simulate a Leader failure
        await asyncio.sleep(1)
        await system.simulate_failure(leader_id)

        # 7. Wait for the re-election
        await asyncio.sleep(3)

        # 8. Show the new Leader
        new_leader = system.get_leader()
        print(f"\n🏆 新的Leader: {new_leader}")
        print(f"📊 系统状态: {system.get_status()}")


if __name__ == "__main__":
    asyncio.run(main())
# 输出示例(节选;选举超时是随机的,因此当选的Agent和term可能不同):
# 🚀 Agent agent-alpha 启动 (初始状态: follower)
# 🚀 Agent agent-beta 启动 (初始状态: follower)
# 🚀 Agent agent-gamma 启动 (初始状态: follower)
# 🗳️ Agent agent-alpha 开始选举 (term 1)
# ✅ Agent agent-alpha 当选为Leader (term 1)
# 🏆 新Leader当选: agent-alpha
# 💥 Agent agent-alpha 故障!触发重新选举...
# 🗳️ Agent agent-beta 开始选举 (term 2)
# ✅ Agent agent-beta 当选为Leader (term 2)
# 🏆 新Leader当选: agent-beta
凌晨3点,我的3个Agent通过选举系统选出了Leader。当Leader故障时,新的Leader在0.2秒内自动接替,整个过程像是一场无声的政权交接。我看着日志,突然笑了——人类社会的选举要几个月,我的Agent只用了200毫秒。而且它们不会拉票,不会承诺,只会默默地干活。这就是我想要的Agent民主。