SubAgent 工厂模式 Agent池 OpenClaw
世界上有两种Agent管理方式——一种是一个个手动创建,一种是像工厂流水线一样批量生产。凌晨1点42分,我的第100个SubAgent上线时,我终于明白:工厂模式不是设计模式,是救命稻草。
当你的应用需要同时运行几十个甚至上百个SubAgent时,手动管理每个实例会成为噩梦。SubAgent工厂模式提供了一套标准化的Agent实例创建、管理和回收机制,让你可以像管理线程池一样管理Agent池。
| 痛点 | 工厂模式解决方案 |
|---|---|
| 手动创建Agent实例繁琐 | 标准化创建流程,一行代码生成 |
| Agent配置不一致 | 统一配置模板,保证一致性 |
| 资源泄漏(忘记释放Agent) | 自动生命周期管理,超时回收 |
| 无法动态扩缩容 | 根据负载自动调整Agent池大小 |
| 缺乏监控和统计 | 内置指标采集,可视化Agent状态 |
from openclaw import Agent, sessions_spawn
from typing import Dict, List, Optional
from dataclasses import dataclass
import asyncio
import time
@dataclass
class AgentSpec:
"""Agent规格定义"""
name: str
agent_id: str
model: str = "gpt-4"
skills: List[str] = None
max_tokens: int = 4096
timeout: int = 300 # 秒
priority: int = 1 # 1-5,5最高
class SubAgentFactory:
"""SubAgent工厂 - 批量生产和管理Agent实例"""
def __init__(self, max_pool_size: int = 50):
self.max_pool_size = max_pool_size
self.pool: Dict[str, Agent] = {} # agent_id -> Agent实例
self.specs: Dict[str, AgentSpec] = {} # agent_id -> 规格
self.stats = {
"created": 0,
"destroyed": 0,
"active": 0,
"errors": 0
}
async def create_agent(self, spec: AgentSpec) -> Agent:
"""创建单个Agent实例"""
if len(self.pool) >= self.max_pool_size:
# 池已满,尝试回收空闲Agent
await self._recycle_idle_agents()
if len(self.pool) >= self.max_pool_size:
raise Exception("Agent池已满,无法创建新实例")
# 使用sessions_spawn创建SubAgent
agent = await sessions_spawn(
task=f"Agent {spec.name} ready",
agentId=spec.agent_id,
mode="session", # 持久化session
runtime="subagent"
)
self.pool[spec.name] = agent
self.specs[spec.name] = spec
self.stats["created"] += 1
self.stats["active"] += 1
print(f"✅ Agent {spec.name} 创建成功 (池大小: {len(self.pool)})")
return agent
async def create_batch(self, specs: List[AgentSpec]) -> List[Agent]:
"""批量创建Agent实例"""
print(f"🏭 开始批量创建 {len(specs)} 个Agent...")
tasks = [self.create_agent(spec) for spec in specs]
agents = await asyncio.gather(*tasks, return_exceptions=True)
# 统计成功/失败
success = sum(1 for a in agents if not isinstance(a, Exception))
errors = sum(1 for a in agents if isinstance(a, Exception))
print(f"✅ 批量创建完成: 成功 {success}, 失败 {errors}")
return [a for a in agents if not isinstance(a, Exception)]
async def destroy_agent(self, agent_name: str):
"""销毁Agent实例"""
if agent_name in self.pool:
agent = self.pool.pop(agent_name)
self.specs.pop(agent_name, None)
# 触发Agent清理(sessions_yield或类似机制)
await self._cleanup_agent(agent)
self.stats["destroyed"] += 1
self.stats["active"] -= 1
print(f"🗑️ Agent {agent_name} 已销毁")
class DynamicAgentPool:
"""动态Agent池 - 根据负载自动扩缩容"""
def __init__(self, factory: SubAgentFactory, min_size: int = 5, max_size: int = 50):
self.factory = factory
self.min_size = min_size
self.max_size = max_size
self.task_queue = asyncio.Queue()
self.monitor_task = None
async def start(self):
"""启动Agent池(预创建最小数量的Agent)"""
print(f"🚀 启动Agent池,预创建 {self.min_size} 个Agent...")
initial_specs = [
AgentSpec(
name=f"worker-{i}",
agent_id=f"worker_agent_{i}",
skills=["web_search", "code_execution"]
)
for i in range(self.min_size)
]
await self.factory.create_batch(initial_specs)
# 启动监控任务
self.monitor_task = asyncio.create_task(self._monitor_pool())
async def submit_task(self, task: dict) -> str:
"""提交任务到队列"""
await self.task_queue.put(task)
# 检查是否需要扩容
if self.task_queue.qsize() > len(self.factory.pool) * 2:
await self._scale_up()
return task.get("id", "unknown")
async def _scale_up(self):
"""扩容:增加Agent实例"""
if len(self.factory.pool) >= self.max_size:
print("⚠️ 已达最大池大小,无法扩容")
return
new_count = min(5, self.max_size - len(self.factory.pool))
print(f"📈 扩容:新增 {new_count} 个Agent")
new_specs = [
AgentSpec(
name=f"worker-dynamic-{int(time.time())}-{i}",
agent_id=f"dynamic_agent_{int(time.time())}_{i}"
)
for i in range(new_count)
]
await self.factory.create_batch(new_specs)
async def _scale_down(self):
"""缩容:回收空闲Agent(保留最小数量)"""
if len(self.factory.pool) <= self.min_size:
return
# 找出空闲时间最长的Agent
idle_agents = self._get_idle_agents()
for agent_name in idle_agents[:5]: # 每次回收最多5个
await self.factory.destroy_agent(agent_name)
async def _monitor_pool(self):
"""监控池状态,自动扩缩容"""
while True:
await asyncio.sleep(60) # 每分钟检查一次
queue_size = self.task_queue.qsize()
pool_size = len(self.factory.pool)
print(f"📊 池状态: 队列={queue_size}, 池={pool_size}, 统计={self.factory.stats}")
# 自动扩缩容决策
if queue_size > pool_size * 3:
await self._scale_up()
elif queue_size == 0 and pool_size > self.min_size:
await self._scale_down()
class AgentLoadBalancer:
"""Agent负载均衡器 - 智能分配任务"""
def __init__(self, pool: DynamicAgentPool):
self.pool = pool
self.agent_load = {} # agent_name -> 当前负载
async def assign_task(self, task: dict) -> Optional[Agent]:
"""分配任务到最空闲的Agent"""
agents = list(self.pool.factory.pool.values())
if not agents:
print("⚠️ 没有可用的Agent")
return None
# 选择负载最低的Agent
agent_loads = [
(agent, self.agent_load.get(agent.name, 0))
for agent in agents
]
agent_loads.sort(key=lambda x: x[1])
selected_agent = agent_loads[0][0]
self.agent_load[selected_agent.name] = self.agent_load.get(selected_agent.name, 0) + 1
print(f"📌 任务分配给 {selected_agent.name} (负载: {agent_loads[0][1]})")
return selected_agent
def release_task(self, agent_name: str):
"""释放Agent负载"""
self.agent_load[agent_name] = max(0, self.agent_load.get(agent_name, 0) - 1)
import asyncio
from openclaw import Agent
async def main():
# 1. 创建工厂
factory = SubAgentFactory(max_pool_size=20)
# 2. 定义Agent规格
specs = [
AgentSpec(
name="coder-1",
agent_id="coding_agent",
skills=["code_execution", "github"],
model="gpt-4"
),
AgentSpec(
name="searcher-1",
agent_id="search_agent",
skills=["web_search", "browser"],
model="gpt-3.5-turbo"
),
AgentSpec(
name="writer-1",
agent_id="writing_agent",
skills=["content_humanizer", "seo_optimizer"],
model="claude-3"
)
]
# 3. 批量创建
agents = await factory.create_batch(specs)
# 4. 使用Agent
for agent in agents:
result = await agent.run("帮我分析这段代码...")
print(result)
# 5. 查看统计
print(f"工厂统计: {factory.stats}")
asyncio.run(main())
async def advanced_usage():
factory = SubAgentFactory(max_pool_size=50)
pool = DynamicAgentPool(factory, min_size=5, max_size=30)
balancer = AgentLoadBalancer(pool)
# 启动池
await pool.start()
# 提交100个任务
for i in range(100):
task = {"id": f"task-{i}", "type": "code_review", "code": f"..."}
await pool.submit_task(task)
# 等待任务完成(实际应配合任务结果回调)
await asyncio.sleep(60)
# 查看最终状态
print(f"最终统计: {factory.stats}")
| 方案 | 100个任务耗时 | 资源占用 | 复杂度 |
|---|---|---|---|
| 手动创建Agent | 50分钟 | 高(无复用) | 高 |
| 简单工厂 | 15分钟 | 中(基础复用) | 中 |
| 工厂+动态池 | 8分钟 | 低(智能复用) | 低 |
| 工厂+池+负载均衡 | 5分钟 | 最低(最优调度) | 最低 |
凌晨3点,我的Agent池自动扩容到30个实例,处理完最后一批任务后又悄悄缩回5个。我看着监控面板,突然觉得:这不就是我梦寐以求的「自动化」吗?不用盯,不用管,它自己知道该怎么做。然后我关了电脑,去睡了。