🗳️ OpenClaw 多Agent领导选举

Leader Election 分布式Agent Raft协议 OpenClaw

世界上有两种Agent团队——一种必须有个Leader才知道该听谁的,一种就算没有Leader也能各自干活。凌晨1点42分,我的5个Agent同时决定要执行同一个任务,数据库瞬间被写入了3份冲突数据——从那天起,我决定给我的Agent们选举一个Leader。

在多Agent协作系统中,领导选举(Leader Election)是保证一致性和协调性的关键机制。当多个Agent同时工作时,需要确定一个Leader来协调决策、分配任务、解决冲突。本教程将基于Raft协议实现Agent领导选举。

🎯 为什么需要领导选举?

| 场景 | 没有Leader | 有Leader |
| --- | --- | --- |
| 数据库写入 | 多个Agent同时写,数据冲突 | Leader协调写操作,保证一致性 |
| 任务分配 | Agent们抢任务或都推脱 | Leader统一调度,负载均衡 |
| 故障恢复 | 没人知道该谁恢复 | Leader发现故障并分配恢复任务 |
| 决策冲突 | 各有各的决策,无法统一 | Leader最后拍板 |

📦 Agent Raft 实现

1. Agent节点状态

from enum import Enum
from dataclasses import dataclass
import asyncio
import random
import time
from typing import Dict, Optional, List

class NodeState(Enum):
    """Role a node can hold in the Raft election state machine."""
    FOLLOWER = "follower"    # passive role; the state a node is created in
    CANDIDATE = "candidate"  # role taken while the node is running an election
    LEADER = "leader"        # role taken after winning an election

@dataclass
class LogEntry:
    """A single Raft log entry."""
    term: int         # term in which the entry was created
    index: int        # position of the entry in the creator's log
    command: Dict     # payload carried by the entry (a task dict in the visible caller)
    timestamp: float  # creation time; the visible caller passes time.time()

class RaftAgentNode:
    """Agent node taking part in a simplified, in-process Raft leader election.

    RPCs are simulated: instead of talking over a network, a node looks its
    peers up in the module-level ``election_system`` registry and reads and
    writes their attributes directly. Only leader election and heartbeats are
    modelled; log entries handed to ``_append_entries`` are not stored on the
    receiving node, and votes are granted without comparing logs.
    """

    # How often the background loop wakes up, and the minimum gap between
    # two heartbeat rounds sent by a leader (both in seconds).
    TICK_INTERVAL = 0.05
    HEARTBEAT_INTERVAL = 0.05

    def __init__(self, node_id: str, peer_ids: List[str]):
        """Create a node that starts as a follower in term 0.

        Args:
            node_id: Unique identifier of this node.
            peer_ids: Identifiers of the other cluster members (this node
                itself is not expected to be listed).
        """
        self.node_id = node_id
        self.peer_ids = peer_ids

        # Persistent state (held in memory only in this simulation).
        self.current_term = 0
        self.voted_for: Optional[str] = None
        self.log: List[LogEntry] = []

        # Volatile state.
        self.state = NodeState.FOLLOWER
        self.commit_index = 0
        self.last_applied = 0

        # Leader bookkeeping, re-initialised every time this node wins.
        self.next_index: Dict[str, int] = {}
        self.match_index: Dict[str, int] = {}

        # Election timer: a random 150-300 ms timeout per node.
        self.election_timeout = random.randint(150, 300) / 1000
        self.last_heartbeat = time.time()

        # Event callbacks. ``on_leader_elected`` is awaited with the node id
        # when this node wins an election, so it must be an async callable.
        self.on_leader_elected = None
        self.on_command_committed = None

        # Background timer task; created by start().
        self._election_loop = None

    def start(self):
        """Start the background election/heartbeat loop.

        Must be called while an asyncio event loop is running, because the
        loop is scheduled with ``asyncio.create_task``.
        """
        self._election_loop = asyncio.create_task(self._run_election_timer())
        print(f"🚀 Agent {self.node_id} 启动 (初始状态: {self.state.value})")

    async def _run_election_timer(self):
        """Periodically send heartbeats (leader) or check the election timeout."""
        while True:
            await asyncio.sleep(self.TICK_INTERVAL)

            if self.state == NodeState.LEADER:
                if time.time() - self.last_heartbeat >= self.HEARTBEAT_INTERVAL:
                    await self._send_heartbeats()
                    self.last_heartbeat = time.time()
            else:
                # Follower or candidate: no heartbeat/vote seen for too long.
                if time.time() - self.last_heartbeat >= self.election_timeout:
                    await self._start_election()

    def _step_down(self, term: int) -> None:
        """Adopt a newer ``term`` and fall back to follower with no vote cast."""
        self.current_term = term
        self.state = NodeState.FOLLOWER
        self.voted_for = None

    async def _start_election(self):
        """Become a candidate for the next term and try to win a majority."""
        self.state = NodeState.CANDIDATE
        self.current_term += 1
        self.voted_for = self.node_id
        self.last_heartbeat = time.time()
        election_term = self.current_term

        print(f"🗳️ Agent {self.node_id} 开始选举 (term {self.current_term})")

        votes = 1  # the candidate's own vote
        for peer in self.peer_ids:
            if await self._request_vote(peer):
                votes += 1
            # A peer with a newer term demotes us inside _request_vote;
            # the candidacy is over in that case.
            if (self.state != NodeState.CANDIDATE
                    or self.current_term != election_term):
                return

        # A majority is counted over the whole cluster (peers + this node).
        # Using len(peer_ids) alone lets 2 of 4 nodes "win" an election.
        cluster_size = len(self.peer_ids) + 1
        if votes > cluster_size // 2:
            self.state = NodeState.LEADER
            self.last_heartbeat = time.time()

            for peer in self.peer_ids:
                self.next_index[peer] = len(self.log)
                self.match_index[peer] = 0

            print(f"✅ Agent {self.node_id} 当选为Leader (term {self.current_term})")
            if self.on_leader_elected:
                await self.on_leader_elected(self.node_id)

    async def _request_vote(self, peer_id: str) -> bool:
        """Simulate a RequestVote RPC to ``peer_id``.

        The vote is granted when the peer has not yet voted for a different
        candidate in the candidate's term. A peer that is behind first adopts
        the candidate's term, which clears the vote it cast in an older term;
        without that reset no election after the first could ever succeed.
        Log up-to-dateness is not compared in this simulation.

        Returns:
            True if the peer granted its vote, False otherwise (including
            when the peer is not registered in ``election_system``).
        """
        peer_node = election_system.nodes.get(peer_id)
        if not peer_node:
            return False

        if peer_node.current_term > self.current_term:
            # The peer has seen a newer term: this candidacy is stale.
            self._step_down(peer_node.current_term)
            return False

        if peer_node.current_term < self.current_term:
            # New term for the peer: forget the old vote and become follower.
            peer_node.current_term = self.current_term
            peer_node.voted_for = None
            peer_node.state = NodeState.FOLLOWER

        if peer_node.voted_for is None or peer_node.voted_for == self.node_id:
            peer_node.voted_for = self.node_id
            # Granting a vote resets the voter's election timer.
            peer_node.last_heartbeat = time.time()
            return True
        return False

    async def _send_heartbeats(self):
        """Send an empty AppendEntries call to every peer as a heartbeat."""
        for peer in self.peer_ids:
            await self._append_entries(peer, [])
            if self.state != NodeState.LEADER:
                # Demoted by a peer with a newer term; stop acting as leader.
                break

    async def _append_entries(self, peer_id: str, entries: List[LogEntry]) -> bool:
        """Simulate an AppendEntries RPC to ``peer_id``.

        Acts as a heartbeat: it brings the peer up to the sender's term,
        resets the peer's election timer and forces it into the follower
        role. ``entries`` is accepted for interface compatibility but is not
        stored on the peer.

        Returns:
            True if the peer accepted the call; False if the peer is unknown
            or has a newer term (in which case the sender steps down).
        """
        peer_node = election_system.nodes.get(peer_id)
        if not peer_node:
            return False

        if self.current_term < peer_node.current_term:
            self._step_down(peer_node.current_term)
            return False

        if peer_node.current_term < self.current_term:
            # Propagate the leader's term so followers do not lag behind.
            peer_node.current_term = self.current_term
            peer_node.voted_for = None

        peer_node.last_heartbeat = time.time()
        if peer_node.state != NodeState.FOLLOWER:
            peer_node.state = NodeState.FOLLOWER
            print(f"📥 Agent {peer_id} 收到Leader {self.node_id} 的心跳,回到Follower状态")

        return True

2. 选举系统

class ElectionSystem:
    """Registry of agent nodes plus helpers to start them and inspect state."""

    def __init__(self):
        self.nodes: Dict[str, RaftAgentNode] = {}
        self.current_leader: Optional[str] = None

    def add_node(self, node_id: str, peer_ids: Optional[List[str]] = None):
        """Create a node and register it under ``node_id``.

        Args:
            node_id: Identifier of the new node.
            peer_ids: Identifiers of the node's peers. When omitted, every
                node registered so far (other than ``node_id`` itself)
                becomes a peer.
        """
        if peer_ids is None:
            # Skip node_id so that re-adding an id never makes a node its
            # own peer.
            peer_ids = [known for known in self.nodes if known != node_id]

        node = RaftAgentNode(node_id, peer_ids)

        # Record the winner on this system whenever the node is elected.
        async def on_elected(leader_id: str):
            self.current_leader = leader_id
            print(f"🏆 新Leader当选: {leader_id}")

        node.on_leader_elected = on_elected
        self.nodes[node_id] = node

    async def start_all(self):
        """Start the background loop of every registered node."""
        for node in self.nodes.values():
            node.start()

        print(f"🚀 选举系统启动,共 {len(self.nodes)} 个Agent节点")

    async def simulate_failure(self, node_id: str):
        """Simulate a crash of ``node_id``; unknown ids are ignored.

        The node's background timer task is cancelled so it no longer sends
        heartbeats or starts elections. Merely switching the state to
        follower would leave the "failed" node free to campaign again. The
        node stays in ``self.nodes``, so it still shows up in ``get_status``.
        """
        if node_id in self.nodes:
            node = self.nodes[node_id]

            # The task only exists once the node has been started.
            timer_task = getattr(node, "_election_loop", None)
            if timer_task is not None:
                timer_task.cancel()

            node.state = NodeState.FOLLOWER  # no longer acts as leader
            print(f"💥 Agent {node_id} 故障!触发重新选举...")

            if node_id == self.current_leader:
                self.current_leader = None
                print(f"⚠️ 当前Leader {node_id} 故障,准备重新选举")

    def get_leader(self) -> Optional[str]:
        """Return the id of the last elected leader, or None if there is none."""
        return self.current_leader

    def get_status(self) -> Dict:
        """Return the current leader and each node's state and term."""
        return {
            "current_leader": self.current_leader,
            "nodes": {
                nid: {"state": node.state.value, "term": node.current_term}
                for nid, node in self.nodes.items()
            }
        }

# Module-level election system instance. The simulated RPCs in RaftAgentNode
# (_request_vote / _append_entries) look peers up in election_system.nodes,
# so nodes can only reach each other when they are registered here.
election_system = ElectionSystem()

3. Agent任务协调

class CoordinateAgent(RaftAgentNode):
    """Election node that can additionally assign and execute tasks."""

    def __init__(self, node_id: str, peer_ids: List[str]):
        """Set up the election node plus empty task bookkeeping."""
        super().__init__(node_id, peer_ids)
        self.task_queue = []
        self.task_results = {}

    async def assign_task(self, task: Dict) -> str:
        """Record ``task`` in the log, push it to peers and queue it.

        Only a leader may call this. The generated id is written into
        ``task`` under the ``"id"`` key and also returned.

        Raises:
            Exception: If this node is not currently the leader.
        """
        is_leader = self.state == NodeState.LEADER
        if not is_leader:
            raise Exception(f"Agent {self.node_id} 不是Leader,无法分配任务")

        sequence = len(self.task_queue)
        new_id = f"task-{self.current_term}-{sequence}"
        task["id"] = new_id

        # Append the task to the local log first.
        log_entry = LogEntry(
            term=self.current_term,
            index=len(self.log),
            command=task,
            timestamp=time.time(),
        )
        self.log.append(log_entry)

        # Hand the entry to each peer, one after another.
        for peer_id in self.peer_ids:
            await self._append_entries(peer_id, [log_entry])

        # Simplified commit: the entry counts as committed right away.
        self.commit_index = self.commit_index + 1
        self.task_queue.append(task)

        return new_id

    async def execute_assigned_task(self, task: Dict) -> Dict:
        """Simulate running ``task`` and return a completion record.

        The record is also stored in ``task_results`` when this node is the
        leader at the time the simulated work finishes.
        """
        task_type = task.get("type", "unknown")
        print(f"🔧 Agent {self.node_id} 执行任务: {task.get('id')} ({task_type})")

        # Stand-in for real work: pause for a random 0.5-2.0 seconds.
        delay = random.uniform(0.5, 2.0)
        await asyncio.sleep(delay)

        outcome = {
            "task_id": task["id"],
            "executor": self.node_id,
            "status": "completed",
            "timestamp": time.time(),
        }

        if self.state != NodeState.LEADER:
            return outcome

        self.task_results[task["id"]] = outcome
        return outcome

⚙️ 使用示例

import asyncio

async def main():
    """Run the demo: elect a leader, assign tasks, fail the leader, re-elect."""
    # 1. Create 3 agent nodes.
    # Nodes resolve their peers through the module-level ``election_system``,
    # so the demo has to register its nodes there. A separate
    # ElectionSystem() would leave every peer lookup empty and no node could
    # ever collect a vote.
    system = election_system
    cluster = ["agent-alpha", "agent-beta", "agent-gamma"]
    for node_id in cluster:
        peers = [peer for peer in cluster if peer != node_id]
        system.add_node(node_id, peers)
        # add_node registers a plain RaftAgentNode, which has no
        # assign_task(). Swap in a CoordinateAgent and keep the election
        # callback that add_node installed.
        agent = CoordinateAgent(node_id, peers)
        agent.on_leader_elected = system.nodes[node_id].on_leader_elected
        system.nodes[node_id] = agent

    # 2. Start all nodes.
    await system.start_all()

    # 3. Wait for the election to finish.
    await asyncio.sleep(2)

    # 4. See which node was elected.
    print(f"\n📊 选举结果: {system.get_status()}")
    leader_id = system.get_leader()
    print(f"🏆 Leader: {leader_id}")

    # 5. Assign tasks through the leader.
    if leader_id:
        leader = system.nodes[leader_id]
        for i in range(5):
            task_id = await leader.assign_task({
                "type": "data_processing",
                "sequence": i,
                "data": f"test_data_{i}"
            })
            print(f"📌 任务已分配: {task_id}")

    # 6. Simulate a leader failure (only meaningful if a leader exists).
    await asyncio.sleep(1)
    if leader_id is not None:
        await system.simulate_failure(leader_id)

    # 7. Wait for the re-election.
    await asyncio.sleep(3)

    # 8. Show the new leader.
    new_leader = system.get_leader()
    print(f"\n🏆 新的Leader: {new_leader}")
    print(f"📊 系统状态: {system.get_status()}")

# Script entry point: run the demo coroutine to completion.
asyncio.run(main())

# 输出示例:
# 🚀 Agent agent-alpha 启动 (初始状态: follower)
# 🚀 Agent agent-beta 启动 (初始状态: follower)
# 🚀 Agent agent-gamma 启动 (初始状态: follower)
# 🗳️ Agent agent-alpha 开始选举 (term 1)
# ✅ Agent agent-alpha 当选为Leader (term 1)
# 🏆 新Leader当选: agent-alpha
# 💥 Agent agent-alpha 故障!触发重新选举...
# 🗳️ Agent agent-beta 开始选举 (term 2)
# ✅ Agent agent-beta 当选为Leader (term 2)
# 🏆 新Leader当选: agent-beta

🏆 最佳实践

✅ 最佳实践1:选举参数调优

✅ 最佳实践2:故障处理

⚠️ 常见陷阱

🔗 相关链接

凌晨3点,我的3个Agent通过选举系统选出了Leader。当Leader故障时,新的Leader在0.2秒内自动接替,整个过程像是一场无声的政权交接。我看着日志,突然笑了——人类社会的选举要几个月,我的Agent只用了200毫秒。而且它们不会拉票,不会承诺,只会默默地干活。这就是我想要的Agent民主。