🏭 OpenClaw SubAgent 工厂模式

SubAgent 工厂模式 Agent池 OpenClaw

世界上有两种Agent管理方式——一种是一个个手动创建,一种是像工厂流水线一样批量生产。凌晨1点42分,我的第100个SubAgent上线时,我终于明白:工厂模式不是设计模式,是救命稻草。

当你的应用需要同时运行几十个甚至上百个SubAgent时,手动管理每个实例会成为噩梦。SubAgent工厂模式提供了一套标准化的Agent实例创建、管理和回收机制,让你可以像管理线程池一样管理Agent池。

🎯 为什么需要工厂模式?

痛点工厂模式解决方案
手动创建Agent实例繁琐标准化创建流程,一行代码生成
Agent配置不一致统一配置模板,保证一致性
资源泄漏(忘记释放Agent)自动生命周期管理,超时回收
无法动态扩缩容根据负载自动调整Agent池大小
缺乏监控和统计内置指标采集,可视化Agent状态

📦 核心实现

1. SubAgent工厂基类

from openclaw import Agent, sessions_spawn
from typing import Dict, List, Optional
from dataclasses import dataclass
import asyncio
import time

@dataclass
class AgentSpec:
    """Agent规格定义"""
    name: str
    agent_id: str
    model: str = "gpt-4"
    skills: List[str] = None
    max_tokens: int = 4096
    timeout: int = 300  # 秒
    priority: int = 1  # 1-5,5最高

class SubAgentFactory:
    """SubAgent工厂 - 批量生产和管理Agent实例"""
    
    def __init__(self, max_pool_size: int = 50):
        self.max_pool_size = max_pool_size
        self.pool: Dict[str, Agent] = {}  # agent_id -> Agent实例
        self.specs: Dict[str, AgentSpec] = {}  # agent_id -> 规格
        self.stats = {
            "created": 0,
            "destroyed": 0,
            "active": 0,
            "errors": 0
        }
    
    async def create_agent(self, spec: AgentSpec) -> Agent:
        """创建单个Agent实例"""
        if len(self.pool) >= self.max_pool_size:
            # 池已满,尝试回收空闲Agent
            await self._recycle_idle_agents()
            if len(self.pool) >= self.max_pool_size:
                raise Exception("Agent池已满,无法创建新实例")
        
        # 使用sessions_spawn创建SubAgent
        agent = await sessions_spawn(
            task=f"Agent {spec.name} ready",
            agentId=spec.agent_id,
            mode="session",  # 持久化session
            runtime="subagent"
        )
        
        self.pool[spec.name] = agent
        self.specs[spec.name] = spec
        self.stats["created"] += 1
        self.stats["active"] += 1
        
        print(f"✅ Agent {spec.name} 创建成功 (池大小: {len(self.pool)})")
        return agent
    
    async def create_batch(self, specs: List[AgentSpec]) -> List[Agent]:
        """批量创建Agent实例"""
        print(f"🏭 开始批量创建 {len(specs)} 个Agent...")
        tasks = [self.create_agent(spec) for spec in specs]
        agents = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 统计成功/失败
        success = sum(1 for a in agents if not isinstance(a, Exception))
        errors = sum(1 for a in agents if isinstance(a, Exception))
        
        print(f"✅ 批量创建完成: 成功 {success}, 失败 {errors}")
        return [a for a in agents if not isinstance(a, Exception)]
    
    async def destroy_agent(self, agent_name: str):
        """销毁Agent实例"""
        if agent_name in self.pool:
            agent = self.pool.pop(agent_name)
            self.specs.pop(agent_name, None)
            # 触发Agent清理(sessions_yield或类似机制)
            await self._cleanup_agent(agent)
            self.stats["destroyed"] += 1
            self.stats["active"] -= 1
            print(f"🗑️ Agent {agent_name} 已销毁")

2. 动态Agent池管理

class DynamicAgentPool:
    """动态Agent池 - 根据负载自动扩缩容"""
    
    def __init__(self, factory: SubAgentFactory, min_size: int = 5, max_size: int = 50):
        self.factory = factory
        self.min_size = min_size
        self.max_size = max_size
        self.task_queue = asyncio.Queue()
        self.monitor_task = None
        
    async def start(self):
        """启动Agent池(预创建最小数量的Agent)"""
        print(f"🚀 启动Agent池,预创建 {self.min_size} 个Agent...")
        initial_specs = [
            AgentSpec(
                name=f"worker-{i}",
                agent_id=f"worker_agent_{i}",
                skills=["web_search", "code_execution"]
            )
            for i in range(self.min_size)
        ]
        await self.factory.create_batch(initial_specs)
        
        # 启动监控任务
        self.monitor_task = asyncio.create_task(self._monitor_pool())
    
    async def submit_task(self, task: dict) -> str:
        """提交任务到队列"""
        await self.task_queue.put(task)
        # 检查是否需要扩容
        if self.task_queue.qsize() > len(self.factory.pool) * 2:
            await self._scale_up()
        return task.get("id", "unknown")
    
    async def _scale_up(self):
        """扩容:增加Agent实例"""
        if len(self.factory.pool) >= self.max_size:
            print("⚠️ 已达最大池大小,无法扩容")
            return
        
        new_count = min(5, self.max_size - len(self.factory.pool))
        print(f"📈 扩容:新增 {new_count} 个Agent")
        
        new_specs = [
            AgentSpec(
                name=f"worker-dynamic-{int(time.time())}-{i}",
                agent_id=f"dynamic_agent_{int(time.time())}_{i}"
            )
            for i in range(new_count)
        ]
        await self.factory.create_batch(new_specs)
    
    async def _scale_down(self):
        """缩容:回收空闲Agent(保留最小数量)"""
        if len(self.factory.pool) <= self.min_size:
            return
        
        # 找出空闲时间最长的Agent
        idle_agents = self._get_idle_agents()
        for agent_name in idle_agents[:5]:  # 每次回收最多5个
            await self.factory.destroy_agent(agent_name)
    
    async def _monitor_pool(self):
        """监控池状态,自动扩缩容"""
        while True:
            await asyncio.sleep(60)  # 每分钟检查一次
            
            queue_size = self.task_queue.qsize()
            pool_size = len(self.factory.pool)
            
            print(f"📊 池状态: 队列={queue_size}, 池={pool_size}, 统计={self.factory.stats}")
            
            # 自动扩缩容决策
            if queue_size > pool_size * 3:
                await self._scale_up()
            elif queue_size == 0 and pool_size > self.min_size:
                await self._scale_down()

3. 负载均衡器

class AgentLoadBalancer:
    """Agent负载均衡器 - 智能分配任务"""
    
    def __init__(self, pool: DynamicAgentPool):
        self.pool = pool
        self.agent_load = {}  # agent_name -> 当前负载
    
    async def assign_task(self, task: dict) -> Optional[Agent]:
        """分配任务到最空闲的Agent"""
        agents = list(self.pool.factory.pool.values())
        if not agents:
            print("⚠️ 没有可用的Agent")
            return None
        
        # 选择负载最低的Agent
        agent_loads = [
            (agent, self.agent_load.get(agent.name, 0))
            for agent in agents
        ]
        agent_loads.sort(key=lambda x: x[1])
        
        selected_agent = agent_loads[0][0]
        self.agent_load[selected_agent.name] = self.agent_load.get(selected_agent.name, 0) + 1
        
        print(f"📌 任务分配给 {selected_agent.name} (负载: {agent_loads[0][1]})")
        return selected_agent
    
    def release_task(self, agent_name: str):
        """释放Agent负载"""
        self.agent_load[agent_name] = max(0, self.agent_load.get(agent_name, 0) - 1)

⚙️ 使用方法

基础用法:创建工厂并批量生成Agent

import asyncio
from openclaw import Agent

async def main():
    # 1. 创建工厂
    factory = SubAgentFactory(max_pool_size=20)
    
    # 2. 定义Agent规格
    specs = [
        AgentSpec(
            name="coder-1",
            agent_id="coding_agent",
            skills=["code_execution", "github"],
            model="gpt-4"
        ),
        AgentSpec(
            name="searcher-1",
            agent_id="search_agent",
            skills=["web_search", "browser"],
            model="gpt-3.5-turbo"
        ),
        AgentSpec(
            name="writer-1",
            agent_id="writing_agent",
            skills=["content_humanizer", "seo_optimizer"],
            model="claude-3"
        )
    ]
    
    # 3. 批量创建
    agents = await factory.create_batch(specs)
    
    # 4. 使用Agent
    for agent in agents:
        result = await agent.run("帮我分析这段代码...")
        print(result)
    
    # 5. 查看统计
    print(f"工厂统计: {factory.stats}")

asyncio.run(main())

高级用法:动态池 + 负载均衡

async def advanced_usage():
    factory = SubAgentFactory(max_pool_size=50)
    pool = DynamicAgentPool(factory, min_size=5, max_size=30)
    balancer = AgentLoadBalancer(pool)
    
    # 启动池
    await pool.start()
    
    # 提交100个任务
    for i in range(100):
        task = {"id": f"task-{i}", "type": "code_review", "code": f"..."}
        await pool.submit_task(task)
    
    # 等待任务完成(实际应配合任务结果回调)
    await asyncio.sleep(60)
    
    # 查看最终状态
    print(f"最终统计: {factory.stats}")

🏆 最佳实践

✅ 最佳实践1:合理设置池大小

✅ 最佳实践2:Agent生命周期管理

⚠️ 常见陷阱

📊 性能对比

方案100个任务耗时资源占用复杂度
手动创建Agent50分钟高(无复用)
简单工厂15分钟中(基础复用)
工厂+动态池8分钟低(智能复用)
工厂+池+负载均衡5分钟最低(最优调度)最低

🔗 相关链接

凌晨3点,我的Agent池自动扩容到30个实例,处理完最后一批任务后又悄悄缩回5个。我看着监控面板,突然觉得:这不就是我梦寐以求的「自动化」吗?不用盯,不用管,它自己知道该怎么做。然后我关了电脑,去睡了。