MCP Server 自定义开发 Mojo协议 OpenClaw
世界上有两种MCP Server——一种是现成的,一种是你自己写的。凌晨1点42分,当我发现官方仓库里没有我需要的那个API集成时,我决定自己造一个轮子——一个有MCP协议的轮子。
MCP (Model Context Protocol) 是OpenClaw Agent与外部服务通信的标准协议。本教程将从零教你如何构建自定义MCP Server,支持多种协议实现和工具注册。
┌──────────────────────────────────────────────────────────┐
│                     MCP Server 架构                      │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐          │
│  │ Transport  │  │  Protocol  │  │   Tools    │          │
│  │   Layer    │──│   Layer    │──│   Layer    │          │
│  ├────────────┤  ├────────────┤  ├────────────┤          │
│  │ · HTTP     │  │ · Mojo     │  │ · Tool A   │          │
│  │ · SSE      │  │ · gRPC     │  │ · Tool B   │          │
│  │ · WebSocket│  │            │  │ · Tool C   │          │
│  └────────────┘  └────────────┘  └────────────┘          │
│                                                          │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐          │
│  │    Auth    │  │  Monitor   │  │  Registry  │          │
│  │ Middleware │  │  Metrics   │  │  Service   │          │
│  └────────────┘  └────────────┘  └────────────┘          │
└──────────────────────────────────────────────────────────┘
import json
import asyncio
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass
from enum import Enum
class TransportType(Enum):
    """Transport kinds an MCP server can be configured with."""

    HTTP = "http"
    SSE = "sse"
    WEBSOCKET = "websocket"
    MOJO = "mojo"
class ToolStatus(Enum):
    """Lifecycle state of a registered tool."""

    ACTIVE = "active"
    DISABLED = "disabled"
    DEPRECATED = "deprecated"
@dataclass
class ToolSpec:
    """Specification of one tool that can be registered on a server.

    Bundles the tool's name and description, the JSON Schema describing its
    input, and the callable that implements it.
    """

    name: str  # key under which the tool is stored in the server registry
    description: str  # summary included in tool listings
    input_schema: Dict  # JSON Schema
    handler: Callable  # called with the request params as keyword arguments
    category: str = "general"
    status: ToolStatus = ToolStatus.ACTIVE
    timeout: int = 30  # seconds
class MCPServer:
    """Base MCP server: a named tool registry with a middleware chain.

    Tools are kept in ``self.tools`` keyed by name. Middleware callables are
    awaited, in registration order, before every tool execution.
    """

    def __init__(self, name: str, version: str = "1.0.0", transport: TransportType = TransportType.MOJO):
        """Create a server with no tools and no middleware.

        Args:
            name: Server name.
            version: Server version string.
            transport: A ``TransportType`` member or its string value
                (for example ``"http"``).

        Raises:
            ValueError: If ``transport`` is not a valid ``TransportType`` value.
        """
        self.name = name
        self.version = version
        # Enum lookup by value returns members unchanged and converts plain
        # strings such as "http", so callers may pass either form.
        self.transport = TransportType(transport)
        self.tools: Dict[str, ToolSpec] = {}
        self.middleware = []
        self.start_time = None  # never assigned by this class

    def register_tool(self, tool: ToolSpec):
        """Register a tool, replacing any existing tool with the same name."""
        if tool.name in self.tools:
            print(f"⚠️ 工具 {tool.name} 已存在,覆盖注册")
        self.tools[tool.name] = tool
        print(f"✅ 工具 {tool.name} 注册成功")

    def register_middleware(self, fn: Callable):
        """Append a middleware to the chain.

        ``fn`` is awaited with a context dict (keys ``"tool"`` and
        ``"params"``) and may return a replacement context.
        """
        self.middleware.append(fn)

    def get_tools_list(self) -> list:
        """Return descriptions of all tools whose status is ACTIVE."""
        return [
            {
                "name": spec.name,
                "description": spec.description,
                "input_schema": spec.input_schema,
                "category": spec.category,
                "status": spec.status.value,
            }
            for spec in self.tools.values()
            if spec.status == ToolStatus.ACTIVE
        ]

    async def execute_tool(self, tool_name: str, params: Dict) -> Dict:
        """Run the middleware chain, then the named tool.

        Args:
            tool_name: Name of a registered tool.
            params: Keyword arguments for the tool handler; ``None`` is
                treated as no arguments.

        Returns:
            A dict whose ``"status"`` is ``"success"`` (with ``"result"`` and
            ``"tool"``), or ``"error"``, ``"blocked"`` or ``"timeout"`` (with
            ``"error"``).
        """
        import inspect

        spec = self.tools.get(tool_name)
        if spec is None:
            return {"error": f"工具 {tool_name} 不存在", "status": "error"}
        if spec.status != ToolStatus.ACTIVE:
            return {"error": f"工具 {tool_name} 状态: {spec.status.value}", "status": "error"}

        if params is None:
            params = {}

        # Run the middleware chain.
        context = {"tool": tool_name, "params": params}
        for mw in self.middleware:
            updated = await mw(context)
            # A middleware that returns nothing leaves the context unchanged
            # instead of crashing the request.
            if updated is not None:
                context = updated
            if context.get("abort"):
                return {"error": context.get("reason", "被中间件拒绝"), "status": "blocked"}

        # Use the params as left by the middleware chain so that middleware
        # can actually rewrite them.
        call_params = context.get("params", params)

        # Execute the tool.
        try:
            outcome = spec.handler(**call_params)
            # Handlers may be plain functions; only awaitables are awaited,
            # and the timeout applies only to them.
            if inspect.isawaitable(outcome):
                outcome = await asyncio.wait_for(outcome, timeout=spec.timeout)
            return {"result": outcome, "status": "success", "tool": tool_name}
        except asyncio.TimeoutError:
            return {"error": f"工具 {tool_name} 执行超时({spec.timeout}s)", "status": "timeout"}
        except Exception as e:
            return {"error": str(e), "status": "error"}
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
class HTTPTransport:
    """HTTP transport layer built on FastAPI.

    Exposes ``GET /health``, ``GET /tools`` and ``POST /execute`` for the
    wrapped server.
    """

    def __init__(self, server: MCPServer, host: str = "0.0.0.0", port: int = 8080):
        """Create the FastAPI app for ``server`` and register its routes."""
        self.server = server
        self.host = host
        self.port = port
        self.app = FastAPI(title=f"MCP: {server.name}", version=server.version)
        self._setup_routes()

    def _setup_routes(self):
        """Attach the health, tool-listing and execute endpoints to the app."""
        import time

        app = self.app
        server = self.server

        @app.get("/health")
        async def health():
            # Report elapsed seconds once a numeric start time has been
            # recorded (see start()); otherwise keep the placeholder.
            started = server.start_time
            if isinstance(started, (int, float)):
                uptime = f"{time.time() - started:.1f}s"
            else:
                uptime = "..."
            return {
                "status": "ok",
                "server": server.name,
                "version": server.version,
                "tools_count": len(server.tools),
                "uptime": uptime,
            }

        @app.get("/tools")
        async def list_tools():
            return {"tools": server.get_tools_list()}

        @app.post("/execute")
        async def execute_tool(payload: dict):
            tool_name = payload.get("tool")
            if not tool_name:
                raise HTTPException(status_code=400, detail="缺少 tool 参数")
            # An explicit JSON null for params means "no arguments".
            params = payload.get("params")
            if params is None:
                params = {}
            result = await server.execute_tool(tool_name, params)
            # Only "error" results become HTTP 400; other statuses are
            # returned in the response body.
            if result["status"] == "error":
                raise HTTPException(status_code=400, detail=result)
            return result

    def start(self):
        """Record the start time if unset, then serve with uvicorn."""
        import time

        if self.server.start_time is None:
            self.server.start_time = time.time()
        print(f"🚀 MCP Server '{self.server.name}' 启动于 http://{self.host}:{self.port}")
        uvicorn.run(self.app, host=self.host, port=self.port)
class MojoTransport:
    """Mojo transport layer: dispatches message dicts to the wrapped server."""

    def __init__(self, server: "MCPServer", socket_path: str = "/tmp/mcp_mojo.sock"):
        """Store the server and the socket path (not opened by this class)."""
        self.server = server
        self.socket_path = socket_path

    async def handle_message(self, message: dict) -> dict:
        """Dispatch one message by its ``"type"`` field.

        Supported types are ``discover``, ``execute``, ``status`` and
        ``schema``; any other type yields an error dict.
        """
        msg_type = message.get("type")
        # A missing or null payload is treated as an empty one so handlers
        # can always call .get() on it.
        payload = message.get("payload") or {}
        handlers = {
            "discover": self._handle_discover,
            "execute": self._handle_execute,
            "status": self._handle_status,
            "schema": self._handle_schema,
        }
        handler = handlers.get(msg_type)
        if not handler:
            return {"error": f"未知消息类型: {msg_type}", "status": "error"}
        return await handler(payload)

    async def _handle_discover(self, _):
        """Return the server name, version and active tool list."""
        return {
            "server": self.server.name,
            "version": self.server.version,
            "tools": self.server.get_tools_list(),
        }

    async def _handle_execute(self, payload: dict):
        """Execute the tool named in the payload with its params."""
        return await self.server.execute_tool(
            payload.get("tool"),
            payload.get("params") or {},
        )

    async def _handle_status(self, _):
        """Return a running marker and the number of ACTIVE tools."""
        active = sum(
            1 for t in self.server.tools.values()
            if t.status == ToolStatus.ACTIVE
        )
        return {"status": "running", "tools_active": active}

    async def _handle_schema(self, payload: dict):
        """Return the input schema of the tool named in the payload."""
        tool_name = payload.get("tool")
        if not tool_name or tool_name not in self.server.tools:
            # Carry "status" like every other error response in this class.
            return {"error": "工具不存在", "status": "error"}
        return {"schema": self.server.tools[tool_name].input_schema}
# Example: build a file-processing MCP server
from openclaw.mcp import MCPServer, ToolSpec, HTTPTransport
# 1. Create the server instance.
# MCPServer declares `transport` as a TransportType, so pass the enum member
# rather than the raw string "http". TransportType is the enum defined
# earlier in this file; import it too if this example moves to its own module.
server = MCPServer(
    name="file-processor",
    version="1.0.0",
    transport=TransportType.HTTP,
)
# 2. 定义工具处理函数
async def read_file(path: str, encoding: str = "utf-8") -> dict:
    """Read a text file and return its content, truncated to 10000 characters.

    Uses ``aiofiles`` when it is installed; otherwise the file is read with
    the standard library in a worker thread.

    Args:
        path: Path of the file to read.
        encoding: Text encoding used to decode the file.

    Returns:
        A dict with ``filename``, ``size`` (length of the decoded text),
        ``content`` (at most 10000 characters) and ``truncated``. On any
        failure, a dict with a single ``error`` message instead.
    """
    import asyncio
    import os

    max_chars = 10000  # cap on the returned content length

    def _read_blocking() -> str:
        with open(path, mode="r", encoding=encoding) as f:
            return f.read()

    try:
        try:
            import aiofiles
        except ImportError:
            # aiofiles is optional: fall back to a thread so the event loop
            # is not blocked by the read.
            loop = asyncio.get_running_loop()
            content = await loop.run_in_executor(None, _read_blocking)
        else:
            async with aiofiles.open(path, mode='r', encoding=encoding) as f:
                content = await f.read()
        return {
            "filename": os.path.basename(path),
            "size": len(content),
            "content": content[:max_chars],  # limit the returned length
            "truncated": len(content) > max_chars,
        }
    except Exception as e:
        # Failures are reported in the returned dict rather than raised.
        return {"error": str(e)}
async def search_files(directory: str, pattern: str = "*") -> dict:
    """List paths under ``directory`` that match a glob ``pattern``.

    Args:
        directory: Directory to search; taken literally, so glob
            metacharacters in its name are not interpreted.
        pattern: Glob pattern relative to ``directory``; ``**`` matches
            recursively.

    Returns:
        A dict with ``directory``, ``pattern``, ``total`` (number of matches)
        and ``files`` (the first 100 matches in sorted order).
    """
    import glob

    # Escape the directory so names containing [, * or ? match themselves,
    # and sort so the 100-item cut is deterministic.
    files = sorted(glob.glob(f"{glob.escape(directory)}/{pattern}", recursive=True))
    return {
        "directory": directory,
        "pattern": pattern,
        "total": len(files),
        "files": files[:100],  # limit the number of returned paths
    }
async def calculate_checksum(path: str, algorithm: str = "sha256") -> dict:
    """Compute the hex digest of a file.

    Args:
        path: Path of the file to hash.
        algorithm: Any algorithm name accepted by ``hashlib.new``.

    Returns:
        A dict with ``file``, ``algorithm`` and ``checksum`` (hex digest).

    Raises:
        ValueError: If ``algorithm`` is not supported by ``hashlib``.
        OSError: If the file cannot be opened or read.
    """
    import hashlib

    # Build the hasher first so an unsupported algorithm fails before any I/O.
    h = hashlib.new(algorithm)
    chunk_size = 1024 * 1024
    with open(path, "rb") as f:
        # Stream the file so large inputs are never held fully in memory.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return {
        "file": path,
        "algorithm": algorithm,
        "checksum": h.hexdigest(),
    }
# 3. Register the tools
server.register_tool(ToolSpec(
    name="read_file",
    description="读取指定文件的内容",
    input_schema={
        "type": "object",
        "properties": {
            "path": {"type": "string", "description": "文件路径"},
            "encoding": {"type": "string", "default": "utf-8"}
        },
        "required": ["path"]
    },
    handler=read_file
))
server.register_tool(ToolSpec(
    name="search_files",
    description="搜索目录中的文件",
    input_schema={
        "type": "object",
        "properties": {
            "directory": {"type": "string"},
            "pattern": {"type": "string", "default": "*"}
        },
        "required": ["directory"]
    },
    handler=search_files
))
server.register_tool(ToolSpec(
    name="calculate_checksum",
    description="计算文件校验和",
    input_schema={
        "type": "object",
        "properties": {
            "path": {"type": "string"},
            # Declare the handler's default, as the other optional
            # parameters in these schemas do.
            "algorithm": {
                "type": "string",
                "enum": ["md5", "sha1", "sha256"],
                "default": "sha256"
            }
        },
        "required": ["path"]
    },
    handler=calculate_checksum
))
# 4. Start the HTTP service
# NOTE(review): 0.0.0.0 listens on every network interface, and the tools
# registered above read arbitrary paths. Confirm this exposure is intended,
# or bind to 127.0.0.1 for local-only use.
transport = HTTPTransport(server, host="0.0.0.0", port=9090)
transport.start()
# 5. 在Agent中连接使用
# config.yaml
# mcp_servers:
# file_processor:
# url: "http://localhost:9090/mcp"
# transport: http
凌晨3点,我的自定义MCP Server终于跑起来了。当我用Agent调用它时,它像听话的小弟一样准确完成每个命令。我突然明白:MCP不是协议,是Agent和世界之间的信使。每一个注册的工具,都是Agent伸向世界的一只手。