2026 年 AI Agent 生态爆发式增长,GitHub 上 AI Agent 相关项目月均新增 Star 超过 5 万。而 Model Context Protocol(MCP)作为连接 LLM 与外部工具的事实标准,已被 Claude Desktop、Cursor、Windsurf、Continue 等主流 AI 工具全面采用。然而,大多数开发者只停留在「配置 MCP Server」的层面,对客户端实现原理一无所知。本文将从零构建一个生产级 MCP 客户端,深入讲解传输层协议、工具发现机制、安全校验策略和错误处理模式。
🔧 一、MCP 协议核心架构
MCP(Model Context Protocol)是 Anthropic 于 2024 年底发布的开放协议,目标是标准化 LLM 与外部工具/数据源之间的通信方式。你可以把它理解为「AI 世界的 USB-C」——一个统一的接口标准。
📡 传输层:stdio 与 SSE 双模式
MCP 定义了两种传输方式,各有适用场景:
| 特性 | stdio 传输 | SSE 传输 |
|---|---|---|
| 通信方式 | 标准输入/输出流 | HTTP Server-Sent Events |
| 适用场景 | 本地进程、CLI 工具 | 远程服务、Web 应用 |
| 延迟 | 极低(< 1ms) | 较高(网络延迟) |
| 安全性 | 进程隔离 | 需要认证/HTTPS |
| 部署复杂度 | 低(启动子进程) | 高(需要 HTTP 服务) |
| 推荐场景 | ✅ 本地开发工具 | ✅ 生产环境远程服务 |
📌 记住: 绝大多数 MCP Server 使用 stdio 传输——Claude Desktop 启动一个子进程,通过 stdin/stdout 交换 JSON-RPC 消息。只有需要远程访问时才用 SSE。
🔌 JSON-RPC 2.0 消息格式
MCP 底层基于 JSON-RPC 2.0 协议。每条消息的结构如下:
// MCP 请求消息格式
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}
// MCP 响应消息格式
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"tools": [
{
"name": "read_file",
"description": "读取文件内容",
"inputSchema": {
"type": "object",
"properties": {
"path": { "type": "string", "description": "文件路径" }
},
"required": ["path"]
}
}
]
}
}
关键点:每个请求有唯一 id,响应通过 id 与请求匹配。MCP 还支持通知(Notification)——没有 id 的单向消息,用于进度更新等场景。
🚀 二、从零构建 MCP 客户端
🏗️ 核心客户端类
我们先构建 MCP 客户端的骨架,实现消息收发和请求-响应匹配:
// MCP 客户端核心实现
import { ChildProcess, spawn } from 'child_process';
import { EventEmitter } from 'events';
import { randomUUID } from 'crypto';
interface JsonRpcRequest {
jsonrpc: '2.0';
id: string | number;
method: string;
params?: Record<string, unknown>;
}
interface JsonRpcResponse {
jsonrpc: '2.0';
id: string | number;
result?: unknown;
error?: { code: number; message: string; data?: unknown };
}
interface JsonRpcNotification {
jsonrpc: '2.0';
method: string;
params?: Record<string, unknown>;
}
type JsonRpcMessage = JsonRpcResponse | JsonRpcNotification;
export class McpClient extends EventEmitter {
private process: ChildProcess | null = null;
private pendingRequests = new Map<string | number, {
resolve: (value: unknown) => void;
reject: (reason: Error) => void;
timer: ReturnType<typeof setTimeout>;
}>();
private buffer = '';
private requestId = 0;
private connected = false;
constructor(
private readonly command: string,
private readonly args: string[] = [],
private readonly timeout = 30000
) {
super();
}
// 启动 MCP Server 子进程并建立连接
async connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.process = spawn(this.command, this.args, {
stdio: ['pipe', 'pipe', 'pipe'],
});
this.process.stdout?.on('data', (chunk: Buffer) => {
this.handleData(chunk.toString());
});
this.process.stderr?.on('data', (chunk: Buffer) => {
this.emit('stderr', chunk.toString());
});
this.process.on('error', (err) => {
this.emit('error', err);
reject(err);
});
this.process.on('exit', (code) => {
this.connected = false;
this.emit('exit', code);
// 拒绝所有待处理请求
for (const [, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error(`Server exited with code ${code}`));
}
this.pendingRequests.clear();
});
// 发送 initialize 请求
this.sendRequest('initialize', {
protocolVersion: '2025-03-26',
capabilities: {
roots: { listChanged: true },
sampling: {},
},
clientInfo: {
name: 'custom-mcp-client',
version: '1.0.0',
},
}).then((result) => {
this.connected = true;
this.emit('connected', result);
// 发送 initialized 通知
this.sendNotification('notifications/initialized', {});
resolve();
}).catch(reject);
});
}
// 处理接收到的数据,按换行符分割 JSON-RPC 消息
private handleData(data: string): void {
this.buffer += data;
const lines = this.buffer.split('\n');
this.buffer = lines.pop() || '';
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const message: JsonRpcMessage = JSON.parse(trimmed);
this.dispatchMessage(message);
} catch (err) {
this.emit('parse_error', { raw: trimmed, error: err });
}
}
}
// 分发消息:响应走 pendingRequests,通知走 EventEmitter
private dispatchMessage(message: JsonRpcMessage): void {
if ('id' in message && message.id !== undefined) {
const pending = this.pendingRequests.get(message.id);
if (pending) {
clearTimeout(pending.timer);
this.pendingRequests.delete(message.id);
if (message.error) {
pending.reject(new Error(`[${message.error.code}] ${message.error.message}`));
} else {
pending.resolve(message.result);
}
}
} else {
const notif = message as JsonRpcNotification;
this.emit('notification', notif.method, notif.params);
}
}
// 发送请求并等待响应
sendRequest(method: string, params?: Record<string, unknown>): Promise<unknown> {
return new Promise((resolve, reject) => {
if (!this.process?.stdin?.writable) {
return reject(new Error('Server process not available'));
}
const id = ++this.requestId;
const request: JsonRpcRequest = {
jsonrpc: '2.0',
id,
method,
params,
};
const timer = setTimeout(() => {
this.pendingRequests.delete(id);
reject(new Error(`Request ${method} timed out after ${this.timeout}ms`));
}, this.timeout);
this.pendingRequests.set(id, { resolve, reject, timer });
this.process.stdin.write(JSON.stringify(request) + '\n');
});
}
// 发送单向通知(无响应)
sendNotification(method: string, params?: Record<string, unknown>): void {
if (!this.process?.stdin?.writable) return;
const notification: JsonRpcNotification = {
jsonrpc: '2.0',
method,
params,
};
this.process.stdin.write(JSON.stringify(notification) + '\n');
}
get isConnected(): boolean {
return this.connected;
}
// 优雅关闭
async disconnect(): Promise<void> {
if (this.process) {
this.process.stdin?.end();
this.process.kill();
this.process = null;
this.connected = false;
}
}
}
⚠️ 警告: 生产环境中必须实现超时机制。没有超时的请求会永远挂起,当 MCP Server 崩溃或死锁时,你的客户端会无限等待。
🔍 工具发现与调用
连接建立后,客户端需要先发现可用工具,然后才能调用:
// 工具发现与调用封装
interface McpTool {
name: string;
description: string;
inputSchema: {
type: 'object';
properties: Record<string, {
type: string;
description?: string;
enum?: string[];
default?: unknown;
}>;
required?: string[];
};
}
interface ToolCallResult {
content: Array<{
type: 'text' | 'image' | 'resource';
text?: string;
data?: string;
mimeType?: string;
}>;
isError?: boolean;
}
export class McpToolManager {
private tools: McpTool[] = [];
private toolMap = new Map<string, McpTool>();
constructor(private readonly client: McpClient) {}
// 从 MCP Server 获取工具列表
async discoverTools(): Promise<McpTool[]> {
const result = await this.client.sendRequest('tools/list', {}) as {
tools: McpTool[];
};
this.tools = result.tools;
this.toolMap.clear();
for (const tool of this.tools) {
this.toolMap.set(tool.name, tool);
}
return this.tools;
}
// 校验工具调用参数
validateParams(toolName: string, params: Record<string, unknown>): void {
const tool = this.toolMap.get(toolName);
if (!tool) {
throw new Error(`Unknown tool: ${toolName}`);
}
// 检查必填参数
const required = tool.inputSchema.required || [];
for (const field of required) {
if (params[field] === undefined || params[field] === null) {
throw new Error(`Missing required parameter: ${field}`);
}
}
// 检查未知参数
const knownKeys = new Set(Object.keys(tool.inputSchema.properties || {}));
for (const key of Object.keys(params)) {
if (!knownKeys.has(key)) {
console.warn(`⚠️ Unknown parameter "${key}" for tool "${toolName}"`);
}
}
}
// 调用工具
async callTool(
name: string,
args: Record<string, unknown>
): Promise<ToolCallResult> {
this.validateParams(name, args);
const result = await this.client.sendRequest('tools/call', {
name,
arguments: args,
}) as ToolCallResult;
return result;
}
// 将工具列表转换为 LLM Function Calling 格式
toFunctionDefinitions(): Array<{
name: string;
description: string;
parameters: Record<string, unknown>;
}> {
return this.tools.map((tool) => ({
name: tool.name,
description: tool.description,
parameters: tool.inputSchema,
}));
}
get availableTools(): McpTool[] {
return [...this.tools];
}
}
💡 提示:
toFunctionDefinitions()方法非常重要——它将 MCP 工具格式转换为 OpenAI/Claude 的 Function Calling 格式,让你可以无缝将 MCP 工具注入到 LLM 对话中。
🛡️ 安全校验层
MCP 客户端的安全校验常被忽略,但它是生产环境的生命线。以下是必须实现的安全措施:
// MCP 客户端安全校验
import { resolve, normalize, isAbsolute } from 'path';
interface SecurityConfig {
// 允许调用的工具白名单(为空则允许所有)
allowedTools?: string[];
// 禁止调用的工具黑名单
blockedTools?: string[];
// 文件操作的沙箱目录
sandboxDir?: string;
// 最大参数大小(字节)
maxParamSize?: number;
// 最大响应大小(字节)
maxResponseSize?: number;
// 危险参数模式检测
dangerousPatterns?: RegExp[];
}
export class McpSecurityGuard {
private config: Required<SecurityConfig>;
constructor(config: SecurityConfig = {}) {
this.config = {
allowedTools: config.allowedTools || [],
blockedTools: config.blockedTools || [
'rm', 'delete', 'exec', 'eval', 'shell',
],
sandboxDir: config.sandboxDir
? normalize(resolve(config.sandboxDir))
: '',
maxParamSize: config.maxParamSize || 1024 * 1024, // 1MB
maxResponseSize: config.maxResponseSize || 10 * 1024 * 1024, // 10MB
dangerousPatterns: config.dangerousPatterns || [
/\.\.\//, // 路径遍历
/[;&|`$]/, // Shell 注入
/__proto__/, // 原型链污染
/constructor\[/, // 原型链攻击
],
};
}
// 校验工具调用是否安全
check(toolName: string, args: Record<string, unknown>): void {
// 1. 白名单检查
if (this.config.allowedTools.length > 0 &&
!this.config.allowedTools.includes(toolName)) {
throw new Error(`⛔ Tool "${toolName}" is not in the allowed list`);
}
// 2. 黑名单检查
if (this.config.blockedTools.includes(toolName)) {
throw new Error(`⛔ Tool "${toolName}" is blocked`);
}
// 3. 参数大小检查
const paramStr = JSON.stringify(args);
if (paramStr.length > this.config.maxParamSize) {
throw new Error(
`⛔ Parameters exceed max size: ${paramStr.length} > ${this.config.maxParamSize}`
);
}
// 4. 递归检查参数值
this.deepCheck(args, toolName);
}
// 递归检查参数中的危险模式
private deepCheck(obj: unknown, context: string): void {
if (typeof obj === 'string') {
for (const pattern of this.config.dangerousPatterns) {
if (pattern.test(obj)) {
throw new Error(
`⛔ Dangerous pattern detected in "${context}": ${pattern}`
);
}
}
// 文件路径沙箱检查
if (this.config.sandboxDir && this.looksLikePath(obj)) {
const resolved = normalize(resolve(obj));
if (!resolved.startsWith(this.config.sandboxDir)) {
throw new Error(
`⛔ Path "${obj}" escapes sandbox directory`
);
}
}
} else if (Array.isArray(obj)) {
for (const item of obj) this.deepCheck(item, context);
} else if (obj && typeof obj === 'object') {
// 防止原型链污染
if ('__proto__' in (obj as Record<string, unknown>) ||
'constructor' in (obj as Record<string, unknown>)) {
throw new Error(`⛔ Prototype pollution attempt detected`);
}
for (const [key, value] of Object.entries(obj as Record<string, unknown>)) {
this.deepCheck(value, `${context}.${key}`);
}
}
}
// 启发式判断是否是文件路径
private looksLikePath(str: string): boolean {
return /^(\.{0,2}\/|~\/|[A-Z]:\\)/.test(str) || str.includes('/');
}
// 检查响应大小
checkResponse(result: unknown): void {
const size = JSON.stringify(result).length;
if (size > this.config.maxResponseSize) {
throw new Error(
`⛔ Response exceeds max size: ${size} > ${this.config.maxResponseSize}`
);
}
}
}
⚠️ 警告: 永远不要信任 MCP Server 返回的数据。恶意或被入侵的 MCP Server 可以返回包含提示注入(Prompt Injection)的文本,在 LLM 处理响应时劫持行为。务必对响应内容做清洗。
💡 三、生产环境实战模式
🔄 完整集成示例
将上述组件组合起来,构建一个完整的 MCP 客户端应用:
// 完整的 MCP 客户端集成示例
async function main() {
// 1. 创建客户端(以 filesystem MCP server 为例)
const client = new McpClient('npx', [
'-y', '@modelcontextprotocol/server-filesystem', '/tmp/sandbox',
]);
// 2. 配置安全策略
const security = new McpSecurityGuard({
sandboxDir: '/tmp/sandbox',
blockedTools: ['exec', 'shell', 'eval'],
maxParamSize: 512 * 1024,
dangerousPatterns: [
/\.\.\//, // 路径遍历
/[;&|`$]/, // Shell 注入
/__proto__/, // 原型链污染
],
});
// 3. 监听事件
client.on('connected', (info) => {
console.log('✅ Connected to MCP Server:', info);
});
client.on('stderr', (data) => {
console.error('⚠️ Server stderr:', data.trim());
});
client.on('notification', (method, params) => {
console.log('📢 Notification:', method, params);
});
try {
// 4. 连接并发现工具
await client.connect();
const toolManager = new McpToolManager(client);
const tools = await toolManager.discoverTools();
console.log(`🔧 Discovered ${tools.length} tools:`);
for (const tool of tools) {
console.log(` - ${tool.name}: ${tool.description}`);
}
// 5. 安全调用工具
security.check('read_file', { path: '/tmp/sandbox/hello.txt' });
const result = await toolManager.callTool('read_file', {
path: '/tmp/sandbox/hello.txt',
});
security.checkResponse(result);
console.log('📄 File content:', result);
// 6. 转换为 LLM Function Calling 格式
const functions = toolManager.toFunctionDefinitions();
console.log('🤖 LLM functions:', JSON.stringify(functions, null, 2));
} finally {
await client.disconnect();
console.log('👋 Disconnected');
}
}
main().catch(console.error);
⚡ 错误处理与重连策略
生产环境中,MCP Server 可能随时崩溃。以下是健壮的重连实现:
// MCP 客户端自动重连机制
export class ResilientMcpClient {
private client: McpClient;
private retryCount = 0;
private maxRetries = 5;
private baseDelay = 1000; // 1 秒
constructor(
private readonly command: string,
private readonly args: string[] = [],
) {
this.client = new McpClient(command, args);
this.setupAutoReconnect();
}
private setupAutoReconnect(): void {
this.client.on('exit', async (code) => {
if (code !== 0 && this.retryCount < this.maxRetries) {
const delay = this.baseDelay * Math.pow(2, this.retryCount);
this.retryCount++;
console.log(
`🔄 Server exited (${code}), ` +
`reconnecting in ${delay}ms ` +
`(attempt ${this.retryCount}/${this.maxRetries})`
);
await new Promise((r) => setTimeout(r, delay));
try {
await this.client.connect();
this.retryCount = 0;
console.log('✅ Reconnected successfully');
} catch (err) {
console.error('❌ Reconnect failed:', err);
}
}
});
}
async connect(): Promise<void> {
await this.client.connect();
this.retryCount = 0;
}
async sendRequest(method: string, params?: Record<string, unknown>) {
if (!this.client.isConnected) {
throw new Error('Client not connected, wait for reconnection');
}
return this.client.sendRequest(method, params);
}
async disconnect(): Promise<void> {
this.maxRetries = 0; // 阻止自动重连
await this.client.disconnect();
}
}
⚡ 关键结论: 指数退避(Exponential Backoff)是重连策略的黄金标准。首次重连 1 秒,第二次 2 秒,第三次 4 秒——避免在网络故障时用密集请求淹没服务器。
📊 性能优化:批量请求与并发控制
当需要调用多个工具时,串行执行效率低下。以下是并发控制模式:
// MCP 并发工具调用(带并发限制)
export async function batchCallTools(
toolManager: McpToolManager,
security: McpSecurityGuard,
calls: Array<{ name: string; args: Record<string, unknown> }>,
concurrency = 3,
): Promise<Array<{ success: boolean; result?: unknown; error?: string }>> {
const results: Array<{ success: boolean; result?: unknown; error?: string }> =
new Array(calls.length);
let index = 0;
// 工作协程
async function worker() {
while (index < calls.length) {
const i = index++;
const { name, args } = calls[i];
try {
security.check(name, args);
const result = await toolManager.callTool(name, args);
security.checkResponse(result);
results[i] = { success: true, result };
} catch (err) {
results[i] = { success: false, error: (err as Error).message };
}
}
}
// 启动 N 个并发工作协程
const workers = Array.from(
{ length: Math.min(concurrency, calls.length) },
() => worker()
);
await Promise.all(workers);
return results;
}
使用示例:
// 批量调用示例
const calls = [
{ name: 'read_file', args: { path: '/tmp/sandbox/a.txt' } },
{ name: 'read_file', args: { path: '/tmp/sandbox/b.txt' } },
{ name: 'read_file', args: { path: '/tmp/sandbox/c.txt' } },
{ name: 'list_directory', args: { path: '/tmp/sandbox' } },
];
const results = await batchCallTools(toolManager, security, calls, 3);
// 3 个并发,4 个任务,总耗时约等于最慢的 2 个任务之和
🎯 四、避坑指南与最佳实践
经过在生产环境中踩过的坑,以下是必须注意的关键点:
❌ 常见错误
- ❌ 不做超时控制 — Server 崩溃后客户端永远挂起
- ❌ 信任 Server 返回数据 — 可能包含 Prompt Injection 攻击
- ❌ 不做参数校验 — 恶意参数可导致路径遍历或注入攻击
- ❌ 直接拼接 Shell 命令 — MCP 参数中可能包含
;rm -rf / - ❌ 忽略 stderr 输出 — Server 的警告和错误信息会被静默丢弃
✅ 推荐做法
- ✅ 每个请求设置超时 — 30 秒是合理默认值
- ✅ 实现指数退避重连 — 至少支持 3-5 次重试
- ✅ 对所有文件路径做沙箱限制 — 防止路径遍历攻击
- ✅ 限制响应大小 — 防止 Server 返回超大 payload 导致 OOM
- ✅ 记录所有工具调用日志 — 便于审计和调试
- ✅ 使用白名单而非黑名单 — 白名单更安全,黑名单容易遗漏
🔐 安全检查清单
| 检查项 | 必要性 | 说明 |
|---|---|---|
| 工具白名单 | ✅ 必须 | 只允许已知安全的工具 |
| 参数大小限制 | ✅ 必须 | 防止 DoS 攻击 |
| 路径沙箱 | ✅ 必须 | 文件操作必须限制在指定目录 |
| Shell 注入检测 | ✅ 必须 | 检测 `;& |
| 响应大小限制 | ✅ 必须 | 防止 OOM |
| Prompt Injection 清洗 | ⚠️ 推荐 | 清洗 LLM 响应中的恶意指令 |
| 调用频率限制 | ⚠️ 推荐 | 防止工具被滥用 |
| TLS 证书校验(SSE) | ✅ 必须 | 远程传输必须验证证书 |
📝 总结
MCP 客户端开发的核心不在于协议本身——JSON-RPC 2.0 很简单——而在于工程化处理:安全校验、错误恢复、性能优化。本文实现的客户端涵盖了生产环境所需的关键能力:
- 传输层:stdio 模式的进程管理和消息帧解析
- 工具发现:自动获取 Server 能力并转换为 LLM 格式
- 安全层:路径沙箱、注入检测、原型链防护
- 韧性:指数退避重连、超时控制、批量并发
💡 提示: 如果你不想从零实现,可以直接使用官方 SDK
@modelcontextprotocol/sdk的 Client 类。但理解底层原理对排查问题至关重要——当 Claude Desktop 显示「MCP Server disconnected」时,你知道该看什么。
相关工具推荐:
- 🔧 MCP Inspector — 官方 MCP 调试工具
- 📚 MCP 规范 — 协议完整规范
- 🛠️ mcp-cli — 命令行 MCP 客户端
- 🔐 mcp-proxy — MCP SSE 代理,支持认证