🔌 OpenClaw MCP 服务端开发指南

MCP Server 自定义开发 Mojo协议 OpenClaw

世界上有两种MCP Server——一种是现成的,一种是你自己写的。凌晨1点42分,当我发现官方仓库里没有我需要的那个API集成时,我决定自己造一个轮子——一个有MCP协议的轮子。

MCP (Model-Controller-Protocol) 是OpenClaw Agent与外部服务通信的标准协议。本教程将从零教你如何构建自定义MCP Server,支持多种协议实现和工具注册。

🎯 MCP Server架构概览

┌──────────────────────────────────────────────────────────┐
│                   MCP Server 架构                        │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐         │
│  │  Transport │  │   Protocol  │  │   Tools    │         │
│  │    Layer   │──│    Layer    │──│   Layer    │         │
│  ├────────────┤  ├────────────┤  ├────────────┤         │
│  │  · HTTP    │  │  · Mojo    │  │  · Tool A  │         │
│  │  · SSE     │  │  · gRPC    │  │  · Tool B  │         │
│  │  · WebSocket│  │            │  │  · Tool C  │         │
│  └────────────┘  └────────────┘  └────────────┘         │
│                                                          │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐         │
│  │   Auth     │  │  Monitor   │  │  Registry  │         │
│  │  Middleware│  │  Metrics   │  │   Service   │         │
│  └────────────┘  └────────────┘  └────────────┘         │
└──────────────────────────────────────────────────────────┘

📦 核心实现

1. 基础MCP Server结构

import json
import asyncio
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass
from enum import Enum

class TransportType(Enum):
    HTTP = "http"
    SSE = "sse"
    WEBSOCKET = "websocket"
    MOJO = "mojo"

class ToolStatus(Enum):
    ACTIVE = "active"
    DISABLED = "disabled"
    DEPRECATED = "deprecated"

@dataclass
class ToolSpec:
    """工具规格定义"""
    name: str
    description: str
    input_schema: Dict  # JSON Schema
    handler: Callable
    category: str = "general"
    status: ToolStatus = ToolStatus.ACTIVE
    timeout: int = 30  # 秒

class MCPServer:
    """MCP Server基类"""
    
    def __init__(self, name: str, version: str = "1.0.0", transport: TransportType = TransportType.MOJO):
        self.name = name
        self.version = version
        self.transport = transport
        self.tools: Dict[str, ToolSpec] = {}
        self.middleware = []
        self.start_time = None
    
    def register_tool(self, tool: ToolSpec):
        """注册工具"""
        if tool.name in self.tools:
            print(f"⚠️ 工具 {tool.name} 已存在,覆盖注册")
        self.tools[tool.name] = tool
        print(f"✅ 工具 {tool.name} 注册成功")
    
    def register_middleware(self, fn: Callable):
        """注册中间件"""
        self.middleware.append(fn)
    
    def get_tools_list(self) -> list:
        """获取可用工具列表"""
        return [
            {
                "name": spec.name,
                "description": spec.description,
                "input_schema": spec.input_schema,
                "category": spec.category,
                "status": spec.status.value
            }
            for spec in self.tools.values()
            if spec.status == ToolStatus.ACTIVE
        ]

    async def execute_tool(self, tool_name: str, params: Dict) -> Dict:
        """执行工具"""
        if tool_name not in self.tools:
            return {"error": f"工具 {tool_name} 不存在", "status": "error"}
        
        spec = self.tools[tool_name]
        if spec.status != ToolStatus.ACTIVE:
            return {"error": f"工具 {tool_name} 状态: {spec.status.value}", "status": "error"}
        
        # 运行中间件链
        context = {"tool": tool_name, "params": params}
        for mw in self.middleware:
            context = await mw(context)
            if context.get("abort"):
                return {"error": context.get("reason", "被中间件拒绝"), "status": "blocked"}
        
        # 执行工具
        try:
            result = await asyncio.wait_for(
                spec.handler(**params),
                timeout=spec.timeout
            )
            return {"result": result, "status": "success", "tool": tool_name}
        except asyncio.TimeoutError:
            return {"error": f"工具 {tool_name} 执行超时({spec.timeout}s)", "status": "timeout"}
        except Exception as e:
            return {"error": str(e), "status": "error"}

2. HTTP Transport实现

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

class HTTPTransport:
    """HTTP传输层 - 基于FastAPI"""
    
    def __init__(self, server: MCPServer, host: str = "0.0.0.0", port: int = 8080):
        self.server = server
        self.host = host
        self.port = port
        self.app = FastAPI(title=f"MCP: {server.name}", version=server.version)
        self._setup_routes()
    
    def _setup_routes(self):
        app = self.app
        server = self.server
        
        @app.get("/health")
        async def health():
            return {
                "status": "ok",
                "server": server.name,
                "version": server.version,
                "tools_count": len(server.tools),
                "uptime": "..." 
            }
        
        @app.get("/tools")
        async def list_tools():
            return {"tools": server.get_tools_list()}
        
        @app.post("/execute")
        async def execute_tool(payload: dict):
            tool_name = payload.get("tool")
            params = payload.get("params", {})
            
            if not tool_name:
                raise HTTPException(status_code=400, detail="缺少 tool 参数")
            
            result = await server.execute_tool(tool_name, params)
            if result["status"] == "error":
                raise HTTPException(status_code=400, detail=result)
            return result
    
    def start(self):
        """启动HTTP服务"""
        print(f"🚀 MCP Server '{self.server.name}' 启动于 http://{self.host}:{self.port}")
        uvicorn.run(self.app, host=self.host, port=self.port)

3. Mojo协议实现(OpenClaw原生)

class MojoTransport:
    """Mojo传输层 - OpenClaw原生协议"""
    
    def __init__(self, server: MCPServer, socket_path: str = "/tmp/mcp_mojo.sock"):
        self.server = server
        self.socket_path = socket_path
    
    async def handle_message(self, message: dict) -> dict:
        """处理Mojo协议消息"""
        msg_type = message.get("type")
        payload = message.get("payload", {})
        
        handlers = {
            "discover": self._handle_discover,
            "execute": self._handle_execute,
            "status": self._handle_status,
            "schema": self._handle_schema
        }
        
        handler = handlers.get(msg_type)
        if not handler:
            return {"error": f"未知消息类型: {msg_type}", "status": "error"}
        
        return await handler(payload)
    
    async def _handle_discover(self, _):
        """工具发现"""
        return {
            "server": self.server.name,
            "version": self.server.version,
            "tools": self.server.get_tools_list()
        }
    
    async def _handle_execute(self, payload: dict):
        """执行工具"""
        return await self.server.execute_tool(
            payload.get("tool"),
            payload.get("params", {})
        )
    
    async def _handle_status(self, _):
        """健康检查"""
        return {
            "status": "running",
            "tools_active": len([
                t for t in self.server.tools.values()
                if t.status == ToolStatus.ACTIVE
            ])
        }
    
    async def _handle_schema(self, payload: dict):
        """获取工具Schema"""
        tool_name = payload.get("tool")
        if not tool_name or tool_name not in self.server.tools:
            return {"error": "工具不存在"}
        return {"schema": self.server.tools[tool_name].input_schema}

⚙️ 实战:构建自定义 MCP Server

# 构建一个文件处理 MCP Server 示例
from openclaw.mcp import MCPServer, ToolSpec, HTTPTransport

# 1. 创建Server实例
server = MCPServer(
    name="file-processor",
    version="1.0.0",
    transport="http"
)

# 2. 定义工具处理函数
async def read_file(path: str, encoding: str = "utf-8") -> dict:
    """读取文件内容"""
    try:
        import aiofiles
        async with aiofiles.open(path, mode='r', encoding=encoding) as f:
            content = await f.read()
        return {
            "filename": path.split("/")[-1],
            "size": len(content),
            "content": content[:10000],  # 限制返回长度
            "truncated": len(content) > 10000
        }
    except Exception as e:
        return {"error": str(e)}

async def search_files(directory: str, pattern: str = "*") -> dict:
    """搜索文件"""
    import glob
    files = glob.glob(f"{directory}/{pattern}", recursive=True)
    return {
        "directory": directory,
        "pattern": pattern,
        "total": len(files),
        "files": files[:100]  # 限制返回数量
    }

async def calculate_checksum(path: str, algorithm: str = "sha256") -> dict:
    """计算文件校验和"""
    import hashlib
    with open(path, "rb") as f:
        content = f.read()
    h = hashlib.new(algorithm)
    h.update(content)
    return {
        "file": path,
        "algorithm": algorithm,
        "checksum": h.hexdigest()
    }

# 3. 注册工具
server.register_tool(ToolSpec(
    name="read_file",
    description="读取指定文件的内容",
    input_schema={
        "type": "object",
        "properties": {
            "path": {"type": "string", "description": "文件路径"},
            "encoding": {"type": "string", "default": "utf-8"}
        },
        "required": ["path"]
    },
    handler=read_file
))

server.register_tool(ToolSpec(
    name="search_files",
    description="搜索目录中的文件",
    input_schema={
        "type": "object",
        "properties": {
            "directory": {"type": "string"},
            "pattern": {"type": "string", "default": "*"}
        },
        "required": ["directory"]
    },
    handler=search_files
))

server.register_tool(ToolSpec(
    name="calculate_checksum",
    description="计算文件校验和",
    input_schema={
        "type": "object",
        "properties": {
            "path": {"type": "string"},
            "algorithm": {"type": "string", "enum": ["md5", "sha1", "sha256"]}
        },
        "required": ["path"]
    },
    handler=calculate_checksum
))

# 4. 启动HTTP服务
transport = HTTPTransport(server, host="0.0.0.0", port=9090)
transport.start()

# 5. 在Agent中连接使用
# config.yaml
# mcp_servers:
#   file_processor:
#     url: "http://localhost:9090/mcp"
#     transport: http

🏆 最佳实践

✅ 最佳实践1:工具设计

✅ 最佳实践2:安全防护

⚠️ 常见陷阱

🔗 相关链接

凌晨3点,我的自定义MCP Server终于跑起来了。当我用Agent调用它时,它像听话的小弟一样准确完成每个命令。我突然明白:MCP不是协议,是Agent和世界之间的信使。每一个注册的工具,都是Agent伸向世界的一只手。