MCP Server OAuth 2.1 授权完全指南:从零实现安全的工具授权体系

深入解析 Model Context Protocol 的 OAuth 2.1 授权规范,手把手实现 PKCE 流程、Token 管理、多租户隔离,包含完整 TypeScript 代码与方案对比。

安全与密码 2026-06-01 15 分钟

Model Context Protocol(MCP)在 2025 年底正式将 OAuth 2.1 纳入核心规范,这意味着每一个对外暴露的 MCP Server 都必须认真对待授权问题。根据 MCP 官方安全审计报告,超过 60% 的公开 MCP Server 仍在使用硬编码 API Key,这相当于把家门钥匙贴在了门框上。如果你正在构建或集成 MCP Server,理解 OAuth 2.1 授权流程不是可选项——它是生产环境的基本门槛。

🔐 一、为什么 MCP 需要 OAuth 2.1

从 API Key 到 OAuth 2.1 的演进

MCP 协议最初的设计假设是本地运行——Host 应用(如 Claude Desktop)和 MCP Server 在同一台机器上通信,安全威胁模型相对简单。但随着远程 MCP Server 的普及,这个假设被彻底打破。

API Key 方案存在三个致命缺陷:

  • 无法撤销细粒度权限——一个 Key 能访问所有工具,没有最小权限原则
  • 无法区分调用者身份——所有使用者共享同一个 Key,审计日志形同虚设
  • 无法自动轮换——Key 泄露后需要手动替换每一个集成方

⚠️ **警告:**如果你的 MCP Server 在 2026 年仍只接受 API Key 认证,你实际上是在给每一个 AI Agent 一张无限额度的信用卡。

OAuth 2.1 通过 Access Token(访问令牌)和 Refresh Token(刷新令牌)的组合,解决了上述所有问题。MCP 规范采用了 OAuth 2.1 的 Authorization Code + PKCE 流程,这是目前公认的 Web 应用最佳实践。

MCP OAuth 2.1 授权流程全景

MCP 的授权流程涉及三个角色:Client(发起请求的 AI 应用)、Authorization Server(签发 Token 的服务)、Resource Server(MCP Server 本身)。

完整的授权流程如下:

Client                Authorization Server           Resource Server (MCP)
  |                         |                              |
  |-- 1. Discovery -------->|                              |
  |<-- AS Metadata ---------|                              |
  |                         |                              |
  |-- 2. Authorization Req->|                              |
  |   (PKCE code_verifier)  |                              |
  |<-- Authorization Code ---|                              |
  |                         |                              |
  |-- 3. Token Request ---->|                              |
  |   (code + code_verifier)|                              |
  |<-- Access Token ---------|                              |
  |                         |                              |
  |-- 4. Tool Call (Bearer Token) ------------------------->|
  |<-- Tool Response ----------------------------------------|

🚀 二、从零实现 MCP OAuth 2.1 Server

实现 Authorization Server

下面是一个完整的 MCP OAuth 2.1 授权服务器实现,使用 TypeScript + Express:

// MCP OAuth 2.1 Authorization Server 完整实现
import express from 'express'
import crypto from 'crypto'

const app = express()
app.use(express.json())
app.use(express.urlencoded({ extended: true }))

// 存储(生产环境应使用 Redis)
const authorizationCodes = new Map<string, {
  clientId: string
  redirectUri: string
  codeChallenge: string
  codeChallengeMethod: string
  scope: string
  userId: string
  expiresAt: number
}>()

const accessTokens = new Map<string, {
  clientId: string
  userId: string
  scope: string
  expiresAt: number
}>()

// 已注册的 MCP Clients(生产环境应持久化)
const registeredClients = new Map<string, {
  clientId: string
  clientSecret?: string
  redirectUris: string[]
  grantTypes: string[]
}>()

// 1. MCP Discovery 端点 — Client 通过此端点发现授权服务器配置
app.get('/.well-known/oauth-authorization-server', (req, res) => {
  const baseUrl = `https://${req.headers.host}`
  res.json({
    issuer: baseUrl,
    authorization_endpoint: `${baseUrl}/authorize`,
    token_endpoint: `${baseUrl}/token`,
    registration_endpoint: `${baseUrl}/register`,
    response_types_supported: ['code'],
    grant_types_supported: ['authorization_code', 'refresh_token'],
    code_challenge_methods_supported: ['S256'],
    scopes_supported: ['mcp:tools:read', 'mcp:tools:execute', 'mcp:resources:read'],
    token_endpoint_auth_methods_supported: ['client_secret_post', 'none']
  })
})

// 2. Dynamic Client Registration(动态客户端注册)
app.post('/register', (req, res) => {
  const { client_name, redirect_uris, grant_types } = req.body

  if (!redirect_uris || redirect_uris.length === 0) {
    return res.status(400).json({ error: 'invalid_request', error_description: 'redirect_uris required' })
  }

  const clientId = crypto.randomUUID()
  const clientSecret = crypto.randomBytes(32).toString('base64url')

  registeredClients.set(clientId, {
    clientId,
    clientSecret,
    redirectUris: redirect_uris,
    grantTypes: grant_types || ['authorization_code']
  })

  res.status(201).json({
    client_id: clientId,
    client_secret: clientSecret,
    client_name: client_name || 'MCP Client',
    redirect_uris,
    grant_types: grant_types || ['authorization_code'],
    client_id_issued_at: Math.floor(Date.now() / 1000)
  })
})

// 3. Authorization Endpoint — 用户在此授权
app.get('/authorize', (req, res) => {
  const { client_id, redirect_uri, response_type, code_challenge, code_challenge_method, scope, state } = req.query

  // 验证参数
  if (response_type !== 'code') {
    return res.status(400).json({ error: 'unsupported_response_type' })
  }
  if (!code_challenge || code_challenge_method !== 'S256') {
    return res.status(400).json({ error: 'invalid_request', error_description: 'PKCE required (S256)' })
  }

  const client = registeredClients.get(client_id as string)
  if (!client || !client.redirectUris.includes(redirect_uri as string)) {
    return res.status(400).json({ error: 'invalid_client' })
  }

  // 生产环境:渲染授权页面让用户确认
  // 这里简化为自动授权
  const code = crypto.randomBytes(32).toString('base64url')
  const userId = 'user-001' // 实际应从登录 session 获取

  authorizationCodes.set(code, {
    clientId: client_id as string,
    redirectUri: redirect_uri as string,
    codeChallenge: code_challenge as string,
    codeChallengeMethod: 'S256',
    scope: (scope as string) || 'mcp:tools:execute',
    userId,
    expiresAt: Date.now() + 60_000 // 1 分钟过期
  })

  const redirectUrl = new URL(redirect_uri as string)
  redirectUrl.searchParams.set('code', code)
  if (state) redirectUrl.searchParams.set('state', state as string)

  res.redirect(redirectUrl.toString())
})

// 4. Token Endpoint — 用 Authorization Code 换取 Access Token
app.post('/token', (req, res) => {
  const { grant_type, code, redirect_uri, client_id, code_verifier } = req.body

  if (grant_type !== 'authorization_code') {
    return res.status(400).json({ error: 'unsupported_grant_type' })
  }

  const authCode = authorizationCodes.get(code)
  if (!authCode) {
    return res.status(400).json({ error: 'invalid_grant', error_description: 'Code not found or expired' })
  }

  // 验证 PKCE: code_verifier 必须匹配 code_challenge
  const computedChallenge = crypto
    .createHash('sha256')
    .update(code_verifier)
    .digest('base64url')

  if (computedChallenge !== authCode.codeChallenge) {
    return res.status(400).json({ error: 'invalid_grant', error_description: 'PKCE verification failed' })
  }

  // 验证其他参数
  if (authCode.clientId !== client_id || authCode.redirectUri !== redirect_uri) {
    return res.status(400).json({ error: 'invalid_grant' })
  }

  if (Date.now() > authCode.expiresAt) {
    authorizationCodes.delete(code)
    return res.status(400).json({ error: 'invalid_grant', error_description: 'Code expired' })
  }

  // 生成 Token
  const accessToken = crypto.randomBytes(32).toString('base64url')
  const refreshToken = crypto.randomBytes(32).toString('base64url')

  accessTokens.set(accessToken, {
    clientId: authCode.clientId,
    userId: authCode.userId,
    scope: authCode.scope,
    expiresAt: Date.now() + 3600_000 // 1 小时
  })

  // 一次性使用,删除 authorization code
  authorizationCodes.delete(code)

  res.json({
    access_token: accessToken,
    token_type: 'Bearer',
    expires_in: 3600,
    refresh_token: refreshToken,
    scope: authCode.scope
  })
})

app.listen(3001, () => console.log('OAuth Server running on :3001'))

💡 **提示:**上面的代码展示了完整的 OAuth 2.1 流程骨架。生产环境需要将 Map 替换为 Redis 或数据库,并添加真实的用户登录页面。

MCP Client 端实现 PKCE 流程

Client 端需要实现 PKCE(Proof Key for Code Exchange)来防止授权码拦截攻击:

// MCP Client: 完整的 PKCE 授权流程实现
import crypto from 'crypto'

interface MCPOAuthConfig {
  serverUrl: string
  clientId: string
  redirectUri: string
  scopes: string[]
}

class MCPOAuthClient {
  private config: MCPOAuthConfig
  private accessToken: string | null = null
  private refreshToken: string | null = null
  private tokenExpiry: number = 0

  constructor(config: MCPOAuthConfig) {
    this.config = config
  }

  // 生成 PKCE code_verifier 和 code_challenge
  private generatePKCE(): { codeVerifier: string; codeChallenge: string } {
    const codeVerifier = crypto.randomBytes(32).toString('base64url')
    const codeChallenge = crypto
      .createHash('sha256')
      .update(codeVerifier)
      .digest('base64url')
    return { codeVerifier, codeChallenge }
  }

  // 发起授权请求
  async startAuthorization(): Promise<{ authUrl: string; codeVerifier: string }> {
    const { codeVerifier, codeChallenge } = this.generatePKCE()
    const state = crypto.randomBytes(16).toString('base64url')

    const params = new URLSearchParams({
      response_type: 'code',
      client_id: this.config.clientId,
      redirect_uri: this.config.redirectUri,
      code_challenge: codeChallenge,
      code_challenge_method: 'S256',
      scope: this.config.scopes.join(' '),
      state
    })

    const authUrl = `${this.config.serverUrl}/authorize?${params.toString()}`
    return { authUrl, codeVerifier }
  }

  // 用 Authorization Code 换取 Token
  async exchangeCode(code: string, codeVerifier: string): Promise<void> {
    const response = await fetch(`${this.config.serverUrl}/token`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: new URLSearchParams({
        grant_type: 'authorization_code',
        code,
        redirect_uri: this.config.redirectUri,
        client_id: this.config.clientId,
        code_verifier: codeVerifier
      }).toString()
    })

    if (!response.ok) {
      const error = await response.json()
      throw new Error(`Token exchange failed: ${error.error_description}`)
    }

    const data = await response.json()
    this.accessToken = data.access_token
    this.refreshToken = data.refresh_token
    this.tokenExpiry = Date.now() + data.expires_in * 1000
  }

  // 自动刷新过期的 Token
  async ensureValidToken(): Promise<string> {
    if (this.accessToken && Date.now() < this.tokenExpiry - 30_000) {
      return this.accessToken
    }

    if (!this.refreshToken) {
      throw new Error('No refresh token. Re-authorization required.')
    }

    const response = await fetch(`${this.config.serverUrl}/token`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: new URLSearchParams({
        grant_type: 'refresh_token',
        refresh_token: this.refreshToken,
        client_id: this.config.clientId
      }).toString()
    })

    if (!response.ok) throw new Error('Token refresh failed')

    const data = await response.json()
    this.accessToken = data.access_token
    this.tokenExpiry = Date.now() + data.expires_in * 1000

    return this.accessToken!
  }

  // 带 Authorization 的 MCP 工具调用
  async callTool(toolName: string, args: Record<string, unknown>): Promise<unknown> {
    const token = await this.ensureValidToken()

    const response = await fetch(`${this.config.serverUrl}/mcp`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${token}`
      },
      body: JSON.stringify({
        jsonrpc: '2.0',
        method: 'tools/call',
        params: { name: toolName, arguments: args },
        id: crypto.randomUUID()
      })
    })

    return response.json()
  }
}

Resource Server 端 Token 验证

MCP Server(Resource Server)需要验证 Bearer Token 并检查权限范围:

// MCP Resource Server: Token 验证中间件
import express from 'express'
import crypto from 'crypto'

// Token 存储(实际应从 Authorization Server 查询或验证 JWT)
const tokenStore = new Map<string, {
  userId: string
  scope: string
  expiresAt: number
}>()

// Token 验证中间件
function authenticateToken(requiredScope?: string) {
  return async (req: express.Request, res: express.Response, next: express.NextFunction) => {
    const authHeader = req.headers.authorization
    if (!authHeader?.startsWith('Bearer ')) {
      return res.status(401).json({
        jsonrpc: '2.0',
        error: { code: -32000, message: 'Missing or invalid Authorization header' },
        id: null
      })
    }

    const token = authHeader.slice(7)
    const tokenData = tokenStore.get(token)

    if (!tokenData) {
      return res.status(401).json({
        jsonrpc: '2.0',
        error: { code: -32000, message: 'Invalid or revoked token' },
        id: null
      })
    }

    if (Date.now() > tokenData.expiresAt) {
      tokenStore.delete(token)
      return res.status(401).json({
        jsonrpc: '2.0',
        error: { code: -32000, message: 'Token expired' },
        id: null
      })
    }

    // 检查 scope 权限
    if (requiredScope) {
      const grantedScopes = tokenData.scope.split(' ')
      if (!grantedScopes.includes(requiredScope)) {
        return res.status(403).json({
          jsonrpc: '2.0',
          error: { code: -32000, message: `Insufficient scope. Required: ${requiredScope}` },
          id: null
        })
      }
    }

    // 将用户信息注入请求
    ;(req as any).userId = tokenData.userId
    ;(req as any).scope = tokenData.scope
    next()
  }
}

// MCP Server 端点
const mcpRouter = express.Router()

// 工具列表 — 需要 read 权限
mcpRouter.post('/tools/list', authenticateToken('mcp:tools:read'), (req, res) => {
  const userId = (req as any).userId
  // 根据用户权限返回可用工具
  res.json({
    jsonrpc: '2.0',
    result: {
      tools: [
        { name: 'json-format', description: 'JSON 格式化工具' },
        { name: 'json-validate', description: 'JSON Schema 校验' }
      ].filter(tool => canUserAccessTool(userId, tool.name))
    },
    id: (req.body as any).id
  })
})

// 工具调用 — 需要 execute 权限
mcpRouter.post('/tools/call', authenticateToken('mcp:tools:execute'), async (req, res) => {
  const { name, arguments: args } = (req.body as any).params
  const userId = (req as any).userId

  // 记录审计日志
  console.log(JSON.stringify({
    timestamp: new Date().toISOString(),
    userId,
    action: 'tool_call',
    tool: name,
    args: JSON.stringify(args).slice(0, 200)
  }))

  // 执行工具
  const result = await executeTool(name, args)
  res.json({ jsonrpc: '2.0', result, id: (req.body as any).id })
})

function canUserAccessTool(userId: string, toolName: string): boolean {
  // 实际应查询用户的工具权限列表
  return true
}

async function executeTool(name: string, args: Record<string, unknown>): Promise<unknown> {
  // 工具执行逻辑
  return { content: [{ type: 'text', text: 'Result' }] }
}

📌 **记住:**Token 验证必须在每次请求时执行,不要缓存验证结果。Token 随时可能被撤销(Token Revocation),缓存会导致已撤销的 Token 仍能访问资源。

📊 三、方案对比与最佳实践

授权方案对比

下面对比三种常见的 MCP 授权方案,帮助你根据场景做出选择:

维度 API Key OAuth 2.1 + PKCE mTLS(双向 TLS)
实现复杂度 ⭐ 极低 ⭐⭐⭐ 中等 ⭐⭐⭐⭐⭐ 高
细粒度权限 ❌ 无 ✅ 基于 Scope ✅ 基于证书
Token 自动轮换 ❌ 不支持 ✅ Refresh Token ✅ 证书续期
用户级审计 ❌ 无法区分用户 ✅ 完整审计链 ✅ 证书绑定身份
多租户支持 ❌ 需要每租户一个 Key ✅ 原生支持 ✅ 证书隔离
适合场景 内部开发/测试 生产环境首选 企业级高安全要求
推荐指数 ⚠️ 仅限开发环境 强烈推荐 ✅ 金融/政务场景

授权范围(Scope)设计最佳实践

Scope 是 OAuth 2.1 实现最小权限原则的核心机制。对于 MCP Server,推荐以下 Scope 设计:

# MCP 推荐的 Scope 设计

# 一级:操作类型
mcp:tools:read      # 查看工具列表和描述
mcp:tools:execute   # 调用工具
mcp:resources:read  # 读取资源文件

# 二级:按工具细分(可选,用于高安全场景)
mcp:tools:json-format:execute
mcp:tools:database-query:execute
mcp:tools:file-write:execute

# 三级:按数据范围细分
mcp:resources:project-a:read
mcp:resources:project-b:read

⚠️ **警告:**不要为了方便而给所有客户端 mcp:tools:execute 的全量权限。一旦某个客户端被攻破,攻击者可以调用你的所有工具,包括文件写入、数据库操作等危险工具。

生产环境 Checklist

在将 MCP OAuth 2.1 部署到生产环境之前,逐项检查以下关键点:

  • 强制 PKCE——必须使用 S256 方法,拒绝 plain
  • Token 有效期——Access Token 不超过 1 小时,Refresh Token 不超过 30 天
  • Token 撤销端点——实现 RFC 7009 Token Revocation,支持即时吊销
  • HTTPS 强制——所有端点必须使用 TLS,拒绝 HTTP 明文传输
  • Authorization Code 一次性使用——使用后立即删除,防止重放攻击
  • Redirect URI 严格匹配——使用精确匹配,不要使用通配符或前缀匹配
  • 审计日志——记录每一次 Token 签发、刷新、撤销和工具调用
  • Rate Limiting——对 Token 端点实施频率限制,防止暴力破解
  • 不要在 URL 参数中传递 Token——始终使用 Authorization Header
  • 不要将 Client Secret 嵌入前端代码——公开客户端使用 PKCE 无密钥模式

常见坑点与避坑指南

坑点 1:PKCE code_verifier 存储丢失

在浏览器环境中,如果用户在授权回调前刷新页面,code_verifier 会丢失。解决方案是将其存入 sessionStorage

// 安全存储 code_verifier
sessionStorage.setItem('pkce_code_verifier', codeVerifier)
// 回调页面中读取
const verifier = sessionStorage.getItem('pkce_code_verifier')
sessionStorage.removeItem('pkce_code_verifier') // 一次性使用

坑点 2:Refresh Token 泄露

Refresh Token 生命周期长,泄露后攻击者可以持续获取新的 Access Token。防范措施:

  • 绑定 Refresh Token 到客户端指纹(Client ID + IP 段)
  • 实现 Refresh Token Rotation:每次刷新后旧的 Refresh Token 立即失效
  • 检测异常刷新行为(如短时间内从不同 IP 刷新),自动吊销所有 Token

坑点 3:跨域 MCP Server 的 CORS 问题

当 MCP Client(浏览器)和 MCP Server 不同源时,/.well-known/oauth-authorization-server 端点需要正确的 CORS 头:

Access-Control-Allow-Origin: *
Access-Control-Allow-Methods: GET, POST
Access-Control-Allow-Headers: Content-Type, Authorization

💡 总结

MCP OAuth 2.1 授权不是可选的安全加固,而是生产环境的基本要求。核心要点:

  1. 选择 Authorization Code + PKCE 流程——这是 MCP 规范推荐且安全性最高的方案
  2. 实现细粒度 Scope——按工具类型和操作类型划分权限,遵循最小权限原则
  3. Token 生命周期管理——短期 Access Token + Refresh Token Rotation + 即时撤销能力
  4. 完整的审计链——从 Token 签发到工具调用,每一步都要有日志

对于刚接触 MCP 授权的开发者,推荐的落地路径是:先用本文的代码搭建一个本地 OAuth Server 验证流程,再集成 Auth0 或 Keycloak 等成熟的授权服务,最后根据业务需求定制 Scope 和审计策略。

相关工具推荐:

  • 🔧 Auth0 — 托管式 OAuth 2.1 服务,适合快速上线
  • 🔧 Keycloak — 开源身份认证平台,适合自托管
  • 🔧 oauth4webapi — 符合规范的 TypeScript OAuth 客户端库
  • 🔧 jose — JWT/JWE/JWS 处理库,用于 Token 自签发

📚 相关文章