MCP 客户端开发实战:用 TypeScript 构建 AI Agent 工具调用引擎

深入讲解 Model Context Protocol 客户端实现原理,涵盖 stdio/SSE 传输层、工具发现、安全校验、错误处理等核心机制,附完整可运行 TypeScript 代码。

前端开发 2026-06-05 15 分钟

2026 年 AI Agent 生态爆发式增长,GitHub 上 AI Agent 相关项目月均新增 Star 超过 5 万。而 Model Context Protocol(MCP)作为连接 LLM 与外部工具的事实标准,已被 Claude Desktop、Cursor、Windsurf、Continue 等主流 AI 工具全面采用。然而,大多数开发者只停留在「配置 MCP Server」的层面,对客户端实现原理一无所知。本文将从零构建一个生产级 MCP 客户端,深入讲解传输层协议、工具发现机制、安全校验策略和错误处理模式。

🔧 一、MCP 协议核心架构

MCP(Model Context Protocol)是 Anthropic 于 2024 年底发布的开放协议,目标是标准化 LLM 与外部工具/数据源之间的通信方式。你可以把它理解为「AI 世界的 USB-C」——一个统一的接口标准。

📡 传输层:stdio 与 SSE 双模式

MCP 定义了两种传输方式,各有适用场景:

特性 stdio 传输 SSE 传输
通信方式 标准输入/输出流 HTTP Server-Sent Events
适用场景 本地进程、CLI 工具 远程服务、Web 应用
延迟 极低(< 1ms) 较高(网络延迟)
安全性 进程隔离 需要认证/HTTPS
部署复杂度 低(启动子进程) 高(需要 HTTP 服务)
推荐场景 ✅ 本地开发工具 ✅ 生产环境远程服务

📌 记住: 绝大多数 MCP Server 使用 stdio 传输——Claude Desktop 启动一个子进程,通过 stdin/stdout 交换 JSON-RPC 消息。只有需要远程访问时才用 SSE。

🔌 JSON-RPC 2.0 消息格式

MCP 底层基于 JSON-RPC 2.0 协议。每条消息的结构如下:

// MCP 请求消息格式
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/list",
  "params": {}
}

// MCP 响应消息格式
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "tools": [
      {
        "name": "read_file",
        "description": "读取文件内容",
        "inputSchema": {
          "type": "object",
          "properties": {
            "path": { "type": "string", "description": "文件路径" }
          },
          "required": ["path"]
        }
      }
    ]
  }
}

关键点:每个请求有唯一 id,响应通过 id 与请求匹配。MCP 还支持通知(Notification)——没有 id 的单向消息,用于进度更新等场景。

🚀 二、从零构建 MCP 客户端

🏗️ 核心客户端类

我们先构建 MCP 客户端的骨架,实现消息收发和请求-响应匹配:

// MCP 客户端核心实现
import { ChildProcess, spawn } from 'child_process';
import { EventEmitter } from 'events';
import { randomUUID } from 'crypto';

interface JsonRpcRequest {
  jsonrpc: '2.0';
  id: string | number;
  method: string;
  params?: Record<string, unknown>;
}

interface JsonRpcResponse {
  jsonrpc: '2.0';
  id: string | number;
  result?: unknown;
  error?: { code: number; message: string; data?: unknown };
}

interface JsonRpcNotification {
  jsonrpc: '2.0';
  method: string;
  params?: Record<string, unknown>;
}

type JsonRpcMessage = JsonRpcResponse | JsonRpcNotification;

export class McpClient extends EventEmitter {
  private process: ChildProcess | null = null;
  private pendingRequests = new Map<string | number, {
    resolve: (value: unknown) => void;
    reject: (reason: Error) => void;
    timer: ReturnType<typeof setTimeout>;
  }>();
  private buffer = '';
  private requestId = 0;
  private connected = false;

  constructor(
    private readonly command: string,
    private readonly args: string[] = [],
    private readonly timeout = 30000
  ) {
    super();
  }

  // 启动 MCP Server 子进程并建立连接
  async connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      this.process = spawn(this.command, this.args, {
        stdio: ['pipe', 'pipe', 'pipe'],
      });

      this.process.stdout?.on('data', (chunk: Buffer) => {
        this.handleData(chunk.toString());
      });

      this.process.stderr?.on('data', (chunk: Buffer) => {
        this.emit('stderr', chunk.toString());
      });

      this.process.on('error', (err) => {
        this.emit('error', err);
        reject(err);
      });

      this.process.on('exit', (code) => {
        this.connected = false;
        this.emit('exit', code);
        // 拒绝所有待处理请求
        for (const [, pending] of this.pendingRequests) {
          clearTimeout(pending.timer);
          pending.reject(new Error(`Server exited with code ${code}`));
        }
        this.pendingRequests.clear();
      });

      // 发送 initialize 请求
      this.sendRequest('initialize', {
        protocolVersion: '2025-03-26',
        capabilities: {
          roots: { listChanged: true },
          sampling: {},
        },
        clientInfo: {
          name: 'custom-mcp-client',
          version: '1.0.0',
        },
      }).then((result) => {
        this.connected = true;
        this.emit('connected', result);
        // 发送 initialized 通知
        this.sendNotification('notifications/initialized', {});
        resolve();
      }).catch(reject);
    });
  }

  // 处理接收到的数据,按换行符分割 JSON-RPC 消息
  private handleData(data: string): void {
    this.buffer += data;
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop() || '';

    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed) continue;
      try {
        const message: JsonRpcMessage = JSON.parse(trimmed);
        this.dispatchMessage(message);
      } catch (err) {
        this.emit('parse_error', { raw: trimmed, error: err });
      }
    }
  }

  // 分发消息:响应走 pendingRequests,通知走 EventEmitter
  private dispatchMessage(message: JsonRpcMessage): void {
    if ('id' in message && message.id !== undefined) {
      const pending = this.pendingRequests.get(message.id);
      if (pending) {
        clearTimeout(pending.timer);
        this.pendingRequests.delete(message.id);
        if (message.error) {
          pending.reject(new Error(`[${message.error.code}] ${message.error.message}`));
        } else {
          pending.resolve(message.result);
        }
      }
    } else {
      const notif = message as JsonRpcNotification;
      this.emit('notification', notif.method, notif.params);
    }
  }

  // 发送请求并等待响应
  sendRequest(method: string, params?: Record<string, unknown>): Promise<unknown> {
    return new Promise((resolve, reject) => {
      if (!this.process?.stdin?.writable) {
        return reject(new Error('Server process not available'));
      }
      const id = ++this.requestId;
      const request: JsonRpcRequest = {
        jsonrpc: '2.0',
        id,
        method,
        params,
      };
      const timer = setTimeout(() => {
        this.pendingRequests.delete(id);
        reject(new Error(`Request ${method} timed out after ${this.timeout}ms`));
      }, this.timeout);

      this.pendingRequests.set(id, { resolve, reject, timer });
      this.process.stdin.write(JSON.stringify(request) + '\n');
    });
  }

  // 发送单向通知(无响应)
  sendNotification(method: string, params?: Record<string, unknown>): void {
    if (!this.process?.stdin?.writable) return;
    const notification: JsonRpcNotification = {
      jsonrpc: '2.0',
      method,
      params,
    };
    this.process.stdin.write(JSON.stringify(notification) + '\n');
  }

  get isConnected(): boolean {
    return this.connected;
  }

  // 优雅关闭
  async disconnect(): Promise<void> {
    if (this.process) {
      this.process.stdin?.end();
      this.process.kill();
      this.process = null;
      this.connected = false;
    }
  }
}

⚠️ 警告: 生产环境中必须实现超时机制。没有超时的请求会永远挂起,当 MCP Server 崩溃或死锁时,你的客户端会无限等待。

🔍 工具发现与调用

连接建立后,客户端需要先发现可用工具,然后才能调用:

// 工具发现与调用封装
interface McpTool {
  name: string;
  description: string;
  inputSchema: {
    type: 'object';
    properties: Record<string, {
      type: string;
      description?: string;
      enum?: string[];
      default?: unknown;
    }>;
    required?: string[];
  };
}

interface ToolCallResult {
  content: Array<{
    type: 'text' | 'image' | 'resource';
    text?: string;
    data?: string;
    mimeType?: string;
  }>;
  isError?: boolean;
}

export class McpToolManager {
  private tools: McpTool[] = [];
  private toolMap = new Map<string, McpTool>();

  constructor(private readonly client: McpClient) {}

  // 从 MCP Server 获取工具列表
  async discoverTools(): Promise<McpTool[]> {
    const result = await this.client.sendRequest('tools/list', {}) as {
      tools: McpTool[];
    };
    this.tools = result.tools;
    this.toolMap.clear();
    for (const tool of this.tools) {
      this.toolMap.set(tool.name, tool);
    }
    return this.tools;
  }

  // 校验工具调用参数
  validateParams(toolName: string, params: Record<string, unknown>): void {
    const tool = this.toolMap.get(toolName);
    if (!tool) {
      throw new Error(`Unknown tool: ${toolName}`);
    }
    // 检查必填参数
    const required = tool.inputSchema.required || [];
    for (const field of required) {
      if (params[field] === undefined || params[field] === null) {
        throw new Error(`Missing required parameter: ${field}`);
      }
    }
    // 检查未知参数
    const knownKeys = new Set(Object.keys(tool.inputSchema.properties || {}));
    for (const key of Object.keys(params)) {
      if (!knownKeys.has(key)) {
        console.warn(`⚠️ Unknown parameter "${key}" for tool "${toolName}"`);
      }
    }
  }

  // 调用工具
  async callTool(
    name: string,
    args: Record<string, unknown>
  ): Promise<ToolCallResult> {
    this.validateParams(name, args);
    const result = await this.client.sendRequest('tools/call', {
      name,
      arguments: args,
    }) as ToolCallResult;
    return result;
  }

  // 将工具列表转换为 LLM Function Calling 格式
  toFunctionDefinitions(): Array<{
    name: string;
    description: string;
    parameters: Record<string, unknown>;
  }> {
    return this.tools.map((tool) => ({
      name: tool.name,
      description: tool.description,
      parameters: tool.inputSchema,
    }));
  }

  get availableTools(): McpTool[] {
    return [...this.tools];
  }
}

💡 提示: toFunctionDefinitions() 方法非常重要——它将 MCP 工具格式转换为 OpenAI/Claude 的 Function Calling 格式,让你可以无缝将 MCP 工具注入到 LLM 对话中。

🛡️ 安全校验层

MCP 客户端的安全校验常被忽略,但它是生产环境的生命线。以下是必须实现的安全措施:

// MCP 客户端安全校验
import { resolve, normalize, isAbsolute } from 'path';

interface SecurityConfig {
  // 允许调用的工具白名单(为空则允许所有)
  allowedTools?: string[];
  // 禁止调用的工具黑名单
  blockedTools?: string[];
  // 文件操作的沙箱目录
  sandboxDir?: string;
  // 最大参数大小(字节)
  maxParamSize?: number;
  // 最大响应大小(字节)
  maxResponseSize?: number;
  // 危险参数模式检测
  dangerousPatterns?: RegExp[];
}

export class McpSecurityGuard {
  private config: Required<SecurityConfig>;

  constructor(config: SecurityConfig = {}) {
    this.config = {
      allowedTools: config.allowedTools || [],
      blockedTools: config.blockedTools || [
        'rm', 'delete', 'exec', 'eval', 'shell',
      ],
      sandboxDir: config.sandboxDir
        ? normalize(resolve(config.sandboxDir))
        : '',
      maxParamSize: config.maxParamSize || 1024 * 1024, // 1MB
      maxResponseSize: config.maxResponseSize || 10 * 1024 * 1024, // 10MB
      dangerousPatterns: config.dangerousPatterns || [
        /\.\.\//,           // 路径遍历
        /[;&|`$]/,          // Shell 注入
        /__proto__/,         // 原型链污染
        /constructor\[/,     // 原型链攻击
      ],
    };
  }

  // 校验工具调用是否安全
  check(toolName: string, args: Record<string, unknown>): void {
    // 1. 白名单检查
    if (this.config.allowedTools.length > 0 &&
        !this.config.allowedTools.includes(toolName)) {
      throw new Error(`⛔ Tool "${toolName}" is not in the allowed list`);
    }

    // 2. 黑名单检查
    if (this.config.blockedTools.includes(toolName)) {
      throw new Error(`⛔ Tool "${toolName}" is blocked`);
    }

    // 3. 参数大小检查
    const paramStr = JSON.stringify(args);
    if (paramStr.length > this.config.maxParamSize) {
      throw new Error(
        `⛔ Parameters exceed max size: ${paramStr.length} > ${this.config.maxParamSize}`
      );
    }

    // 4. 递归检查参数值
    this.deepCheck(args, toolName);
  }

  // 递归检查参数中的危险模式
  private deepCheck(obj: unknown, context: string): void {
    if (typeof obj === 'string') {
      for (const pattern of this.config.dangerousPatterns) {
        if (pattern.test(obj)) {
          throw new Error(
            `⛔ Dangerous pattern detected in "${context}": ${pattern}`
          );
        }
      }
      // 文件路径沙箱检查
      if (this.config.sandboxDir && this.looksLikePath(obj)) {
        const resolved = normalize(resolve(obj));
        if (!resolved.startsWith(this.config.sandboxDir)) {
          throw new Error(
            `⛔ Path "${obj}" escapes sandbox directory`
          );
        }
      }
    } else if (Array.isArray(obj)) {
      for (const item of obj) this.deepCheck(item, context);
    } else if (obj && typeof obj === 'object') {
      // 防止原型链污染
      if ('__proto__' in (obj as Record<string, unknown>) ||
          'constructor' in (obj as Record<string, unknown>)) {
        throw new Error(`⛔ Prototype pollution attempt detected`);
      }
      for (const [key, value] of Object.entries(obj as Record<string, unknown>)) {
        this.deepCheck(value, `${context}.${key}`);
      }
    }
  }

  // 启发式判断是否是文件路径
  private looksLikePath(str: string): boolean {
    return /^(\.{0,2}\/|~\/|[A-Z]:\\)/.test(str) || str.includes('/');
  }

  // 检查响应大小
  checkResponse(result: unknown): void {
    const size = JSON.stringify(result).length;
    if (size > this.config.maxResponseSize) {
      throw new Error(
        `⛔ Response exceeds max size: ${size} > ${this.config.maxResponseSize}`
      );
    }
  }
}

⚠️ 警告: 永远不要信任 MCP Server 返回的数据。恶意或被入侵的 MCP Server 可以返回包含提示注入(Prompt Injection)的文本,在 LLM 处理响应时劫持行为。务必对响应内容做清洗。

💡 三、生产环境实战模式

🔄 完整集成示例

将上述组件组合起来,构建一个完整的 MCP 客户端应用:

// 完整的 MCP 客户端集成示例
async function main() {
  // 1. 创建客户端(以 filesystem MCP server 为例)
  const client = new McpClient('npx', [
    '-y', '@modelcontextprotocol/server-filesystem', '/tmp/sandbox',
  ]);

  // 2. 配置安全策略
  const security = new McpSecurityGuard({
    sandboxDir: '/tmp/sandbox',
    blockedTools: ['exec', 'shell', 'eval'],
    maxParamSize: 512 * 1024,
    dangerousPatterns: [
      /\.\.\//,           // 路径遍历
      /[;&|`$]/,          // Shell 注入
      /__proto__/,         // 原型链污染
    ],
  });

  // 3. 监听事件
  client.on('connected', (info) => {
    console.log('✅ Connected to MCP Server:', info);
  });

  client.on('stderr', (data) => {
    console.error('⚠️ Server stderr:', data.trim());
  });

  client.on('notification', (method, params) => {
    console.log('📢 Notification:', method, params);
  });

  try {
    // 4. 连接并发现工具
    await client.connect();
    const toolManager = new McpToolManager(client);
    const tools = await toolManager.discoverTools();

    console.log(`🔧 Discovered ${tools.length} tools:`);
    for (const tool of tools) {
      console.log(`  - ${tool.name}: ${tool.description}`);
    }

    // 5. 安全调用工具
    security.check('read_file', { path: '/tmp/sandbox/hello.txt' });
    const result = await toolManager.callTool('read_file', {
      path: '/tmp/sandbox/hello.txt',
    });
    security.checkResponse(result);

    console.log('📄 File content:', result);

    // 6. 转换为 LLM Function Calling 格式
    const functions = toolManager.toFunctionDefinitions();
    console.log('🤖 LLM functions:', JSON.stringify(functions, null, 2));

  } finally {
    await client.disconnect();
    console.log('👋 Disconnected');
  }
}

main().catch(console.error);

⚡ 错误处理与重连策略

生产环境中,MCP Server 可能随时崩溃。以下是健壮的重连实现:

// MCP 客户端自动重连机制
export class ResilientMcpClient {
  private client: McpClient;
  private retryCount = 0;
  private maxRetries = 5;
  private baseDelay = 1000; // 1 秒

  constructor(
    private readonly command: string,
    private readonly args: string[] = [],
  ) {
    this.client = new McpClient(command, args);
    this.setupAutoReconnect();
  }

  private setupAutoReconnect(): void {
    this.client.on('exit', async (code) => {
      if (code !== 0 && this.retryCount < this.maxRetries) {
        const delay = this.baseDelay * Math.pow(2, this.retryCount);
        this.retryCount++;
        console.log(
          `🔄 Server exited (${code}), ` +
          `reconnecting in ${delay}ms ` +
          `(attempt ${this.retryCount}/${this.maxRetries})`
        );
        await new Promise((r) => setTimeout(r, delay));
        try {
          await this.client.connect();
          this.retryCount = 0;
          console.log('✅ Reconnected successfully');
        } catch (err) {
          console.error('❌ Reconnect failed:', err);
        }
      }
    });
  }

  async connect(): Promise<void> {
    await this.client.connect();
    this.retryCount = 0;
  }

  async sendRequest(method: string, params?: Record<string, unknown>) {
    if (!this.client.isConnected) {
      throw new Error('Client not connected, wait for reconnection');
    }
    return this.client.sendRequest(method, params);
  }

  async disconnect(): Promise<void> {
    this.maxRetries = 0; // 阻止自动重连
    await this.client.disconnect();
  }
}

关键结论: 指数退避(Exponential Backoff)是重连策略的黄金标准。首次重连 1 秒,第二次 2 秒,第三次 4 秒——避免在网络故障时用密集请求淹没服务器。

📊 性能优化:批量请求与并发控制

当需要调用多个工具时,串行执行效率低下。以下是并发控制模式:

// MCP 并发工具调用(带并发限制)
export async function batchCallTools(
  toolManager: McpToolManager,
  security: McpSecurityGuard,
  calls: Array<{ name: string; args: Record<string, unknown> }>,
  concurrency = 3,
): Promise<Array<{ success: boolean; result?: unknown; error?: string }>> {
  const results: Array<{ success: boolean; result?: unknown; error?: string }> =
    new Array(calls.length);
  let index = 0;

  // 工作协程
  async function worker() {
    while (index < calls.length) {
      const i = index++;
      const { name, args } = calls[i];
      try {
        security.check(name, args);
        const result = await toolManager.callTool(name, args);
        security.checkResponse(result);
        results[i] = { success: true, result };
      } catch (err) {
        results[i] = { success: false, error: (err as Error).message };
      }
    }
  }

  // 启动 N 个并发工作协程
  const workers = Array.from(
    { length: Math.min(concurrency, calls.length) },
    () => worker()
  );
  await Promise.all(workers);
  return results;
}

使用示例:

// 批量调用示例
const calls = [
  { name: 'read_file', args: { path: '/tmp/sandbox/a.txt' } },
  { name: 'read_file', args: { path: '/tmp/sandbox/b.txt' } },
  { name: 'read_file', args: { path: '/tmp/sandbox/c.txt' } },
  { name: 'list_directory', args: { path: '/tmp/sandbox' } },
];

const results = await batchCallTools(toolManager, security, calls, 3);
// 3 个并发,4 个任务,总耗时约等于最慢的 2 个任务之和

🎯 四、避坑指南与最佳实践

经过在生产环境中踩过的坑,以下是必须注意的关键点:

❌ 常见错误

  • 不做超时控制 — Server 崩溃后客户端永远挂起
  • 信任 Server 返回数据 — 可能包含 Prompt Injection 攻击
  • 不做参数校验 — 恶意参数可导致路径遍历或注入攻击
  • 直接拼接 Shell 命令 — MCP 参数中可能包含 ;rm -rf /
  • 忽略 stderr 输出 — Server 的警告和错误信息会被静默丢弃

✅ 推荐做法

  • 每个请求设置超时 — 30 秒是合理默认值
  • 实现指数退避重连 — 至少支持 3-5 次重试
  • 对所有文件路径做沙箱限制 — 防止路径遍历攻击
  • 限制响应大小 — 防止 Server 返回超大 payload 导致 OOM
  • 记录所有工具调用日志 — 便于审计和调试
  • 使用白名单而非黑名单 — 白名单更安全,黑名单容易遗漏

🔐 安全检查清单

检查项 必要性 说明
工具白名单 ✅ 必须 只允许已知安全的工具
参数大小限制 ✅ 必须 防止 DoS 攻击
路径沙箱 ✅ 必须 文件操作必须限制在指定目录
Shell 注入检测 ✅ 必须 检测 `;&
响应大小限制 ✅ 必须 防止 OOM
Prompt Injection 清洗 ⚠️ 推荐 清洗 LLM 响应中的恶意指令
调用频率限制 ⚠️ 推荐 防止工具被滥用
TLS 证书校验(SSE) ✅ 必须 远程传输必须验证证书

📝 总结

MCP 客户端开发的核心不在于协议本身——JSON-RPC 2.0 很简单——而在于工程化处理:安全校验、错误恢复、性能优化。本文实现的客户端涵盖了生产环境所需的关键能力:

  1. 传输层:stdio 模式的进程管理和消息帧解析
  2. 工具发现:自动获取 Server 能力并转换为 LLM 格式
  3. 安全层:路径沙箱、注入检测、原型链防护
  4. 韧性:指数退避重连、超时控制、批量并发

💡 提示: 如果你不想从零实现,可以直接使用官方 SDK @modelcontextprotocol/sdk 的 Client 类。但理解底层原理对排查问题至关重要——当 Claude Desktop 显示「MCP Server disconnected」时,你知道该看什么。

相关工具推荐:

📚 相关文章