Model Context Protocol(MCP)在 2025 年底正式将 OAuth 2.1 纳入核心规范,这意味着每一个对外暴露的 MCP Server 都必须认真对待授权问题。根据 MCP 官方安全审计报告,超过 60% 的公开 MCP Server 仍在使用硬编码 API Key,这相当于把家门钥匙贴在了门框上。如果你正在构建或集成 MCP Server,理解 OAuth 2.1 授权流程不是可选项——它是生产环境的基本门槛。
🔐 一、为什么 MCP 需要 OAuth 2.1
从 API Key 到 OAuth 2.1 的演进
MCP 协议最初的设计假设是本地运行——Host 应用(如 Claude Desktop)和 MCP Server 在同一台机器上通信,安全威胁模型相对简单。但随着远程 MCP Server 的普及,这个假设被彻底打破。
API Key 方案存在三个致命缺陷:
- ❌ 无法撤销细粒度权限——一个 Key 能访问所有工具,没有最小权限原则
- ❌ 无法区分调用者身份——所有使用者共享同一个 Key,审计日志形同虚设
- ❌ 无法自动轮换——Key 泄露后需要手动替换每一个集成方
⚠️ **警告:**如果你的 MCP Server 在 2026 年仍只接受 API Key 认证,你实际上是在给每一个 AI Agent 一张无限额度的信用卡。
OAuth 2.1 通过 Access Token(访问令牌)和 Refresh Token(刷新令牌)的组合,解决了上述所有问题。MCP 规范采用了 OAuth 2.1 的 Authorization Code + PKCE 流程,这是目前公认的 Web 应用最佳实践。
MCP OAuth 2.1 授权流程全景
MCP 的授权流程涉及三个角色:Client(发起请求的 AI 应用)、Authorization Server(签发 Token 的服务)、Resource Server(MCP Server 本身)。
完整的授权流程如下:
Client Authorization Server Resource Server (MCP)
| | |
|-- 1. Discovery -------->| |
|<-- AS Metadata ---------| |
| | |
|-- 2. Authorization Req->| |
| (PKCE code_verifier) | |
|<-- Authorization Code ---| |
| | |
|-- 3. Token Request ---->| |
| (code + code_verifier)| |
|<-- Access Token ---------| |
| | |
|-- 4. Tool Call (Bearer Token) ------------------------->|
|<-- Tool Response ----------------------------------------|
🚀 二、从零实现 MCP OAuth 2.1 Server
实现 Authorization Server
下面是一个完整的 MCP OAuth 2.1 授权服务器实现,使用 TypeScript + Express:
// MCP OAuth 2.1 Authorization Server 完整实现
import express from 'express'
import crypto from 'crypto'
const app = express()
app.use(express.json())
app.use(express.urlencoded({ extended: true }))
// 存储(生产环境应使用 Redis)
const authorizationCodes = new Map<string, {
clientId: string
redirectUri: string
codeChallenge: string
codeChallengeMethod: string
scope: string
userId: string
expiresAt: number
}>()
const accessTokens = new Map<string, {
clientId: string
userId: string
scope: string
expiresAt: number
}>()
// 已注册的 MCP Clients(生产环境应持久化)
const registeredClients = new Map<string, {
clientId: string
clientSecret?: string
redirectUris: string[]
grantTypes: string[]
}>()
// 1. MCP Discovery 端点 — Client 通过此端点发现授权服务器配置
app.get('/.well-known/oauth-authorization-server', (req, res) => {
const baseUrl = `https://${req.headers.host}`
res.json({
issuer: baseUrl,
authorization_endpoint: `${baseUrl}/authorize`,
token_endpoint: `${baseUrl}/token`,
registration_endpoint: `${baseUrl}/register`,
response_types_supported: ['code'],
grant_types_supported: ['authorization_code', 'refresh_token'],
code_challenge_methods_supported: ['S256'],
scopes_supported: ['mcp:tools:read', 'mcp:tools:execute', 'mcp:resources:read'],
token_endpoint_auth_methods_supported: ['client_secret_post', 'none']
})
})
// 2. Dynamic Client Registration(动态客户端注册)
app.post('/register', (req, res) => {
const { client_name, redirect_uris, grant_types } = req.body
if (!redirect_uris || redirect_uris.length === 0) {
return res.status(400).json({ error: 'invalid_request', error_description: 'redirect_uris required' })
}
const clientId = crypto.randomUUID()
const clientSecret = crypto.randomBytes(32).toString('base64url')
registeredClients.set(clientId, {
clientId,
clientSecret,
redirectUris: redirect_uris,
grantTypes: grant_types || ['authorization_code']
})
res.status(201).json({
client_id: clientId,
client_secret: clientSecret,
client_name: client_name || 'MCP Client',
redirect_uris,
grant_types: grant_types || ['authorization_code'],
client_id_issued_at: Math.floor(Date.now() / 1000)
})
})
// 3. Authorization Endpoint — 用户在此授权
app.get('/authorize', (req, res) => {
const { client_id, redirect_uri, response_type, code_challenge, code_challenge_method, scope, state } = req.query
// 验证参数
if (response_type !== 'code') {
return res.status(400).json({ error: 'unsupported_response_type' })
}
if (!code_challenge || code_challenge_method !== 'S256') {
return res.status(400).json({ error: 'invalid_request', error_description: 'PKCE required (S256)' })
}
const client = registeredClients.get(client_id as string)
if (!client || !client.redirectUris.includes(redirect_uri as string)) {
return res.status(400).json({ error: 'invalid_client' })
}
// 生产环境:渲染授权页面让用户确认
// 这里简化为自动授权
const code = crypto.randomBytes(32).toString('base64url')
const userId = 'user-001' // 实际应从登录 session 获取
authorizationCodes.set(code, {
clientId: client_id as string,
redirectUri: redirect_uri as string,
codeChallenge: code_challenge as string,
codeChallengeMethod: 'S256',
scope: (scope as string) || 'mcp:tools:execute',
userId,
expiresAt: Date.now() + 60_000 // 1 分钟过期
})
const redirectUrl = new URL(redirect_uri as string)
redirectUrl.searchParams.set('code', code)
if (state) redirectUrl.searchParams.set('state', state as string)
res.redirect(redirectUrl.toString())
})
// 4. Token Endpoint — 用 Authorization Code 换取 Access Token
app.post('/token', (req, res) => {
const { grant_type, code, redirect_uri, client_id, code_verifier } = req.body
if (grant_type !== 'authorization_code') {
return res.status(400).json({ error: 'unsupported_grant_type' })
}
const authCode = authorizationCodes.get(code)
if (!authCode) {
return res.status(400).json({ error: 'invalid_grant', error_description: 'Code not found or expired' })
}
// 验证 PKCE: code_verifier 必须匹配 code_challenge
const computedChallenge = crypto
.createHash('sha256')
.update(code_verifier)
.digest('base64url')
if (computedChallenge !== authCode.codeChallenge) {
return res.status(400).json({ error: 'invalid_grant', error_description: 'PKCE verification failed' })
}
// 验证其他参数
if (authCode.clientId !== client_id || authCode.redirectUri !== redirect_uri) {
return res.status(400).json({ error: 'invalid_grant' })
}
if (Date.now() > authCode.expiresAt) {
authorizationCodes.delete(code)
return res.status(400).json({ error: 'invalid_grant', error_description: 'Code expired' })
}
// 生成 Token
const accessToken = crypto.randomBytes(32).toString('base64url')
const refreshToken = crypto.randomBytes(32).toString('base64url')
accessTokens.set(accessToken, {
clientId: authCode.clientId,
userId: authCode.userId,
scope: authCode.scope,
expiresAt: Date.now() + 3600_000 // 1 小时
})
// 一次性使用,删除 authorization code
authorizationCodes.delete(code)
res.json({
access_token: accessToken,
token_type: 'Bearer',
expires_in: 3600,
refresh_token: refreshToken,
scope: authCode.scope
})
})
app.listen(3001, () => console.log('OAuth Server running on :3001'))
💡 **提示:**上面的代码展示了完整的 OAuth 2.1 流程骨架。生产环境需要将
Map替换为 Redis 或数据库,并添加真实的用户登录页面。
MCP Client 端实现 PKCE 流程
Client 端需要实现 PKCE(Proof Key for Code Exchange)来防止授权码拦截攻击:
// MCP Client: 完整的 PKCE 授权流程实现
import crypto from 'crypto'
interface MCPOAuthConfig {
serverUrl: string
clientId: string
redirectUri: string
scopes: string[]
}
class MCPOAuthClient {
private config: MCPOAuthConfig
private accessToken: string | null = null
private refreshToken: string | null = null
private tokenExpiry: number = 0
constructor(config: MCPOAuthConfig) {
this.config = config
}
// 生成 PKCE code_verifier 和 code_challenge
private generatePKCE(): { codeVerifier: string; codeChallenge: string } {
const codeVerifier = crypto.randomBytes(32).toString('base64url')
const codeChallenge = crypto
.createHash('sha256')
.update(codeVerifier)
.digest('base64url')
return { codeVerifier, codeChallenge }
}
// 发起授权请求
async startAuthorization(): Promise<{ authUrl: string; codeVerifier: string }> {
const { codeVerifier, codeChallenge } = this.generatePKCE()
const state = crypto.randomBytes(16).toString('base64url')
const params = new URLSearchParams({
response_type: 'code',
client_id: this.config.clientId,
redirect_uri: this.config.redirectUri,
code_challenge: codeChallenge,
code_challenge_method: 'S256',
scope: this.config.scopes.join(' '),
state
})
const authUrl = `${this.config.serverUrl}/authorize?${params.toString()}`
return { authUrl, codeVerifier }
}
// 用 Authorization Code 换取 Token
async exchangeCode(code: string, codeVerifier: string): Promise<void> {
const response = await fetch(`${this.config.serverUrl}/token`, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'authorization_code',
code,
redirect_uri: this.config.redirectUri,
client_id: this.config.clientId,
code_verifier: codeVerifier
}).toString()
})
if (!response.ok) {
const error = await response.json()
throw new Error(`Token exchange failed: ${error.error_description}`)
}
const data = await response.json()
this.accessToken = data.access_token
this.refreshToken = data.refresh_token
this.tokenExpiry = Date.now() + data.expires_in * 1000
}
// 自动刷新过期的 Token
async ensureValidToken(): Promise<string> {
if (this.accessToken && Date.now() < this.tokenExpiry - 30_000) {
return this.accessToken
}
if (!this.refreshToken) {
throw new Error('No refresh token. Re-authorization required.')
}
const response = await fetch(`${this.config.serverUrl}/token`, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'refresh_token',
refresh_token: this.refreshToken,
client_id: this.config.clientId
}).toString()
})
if (!response.ok) throw new Error('Token refresh failed')
const data = await response.json()
this.accessToken = data.access_token
this.tokenExpiry = Date.now() + data.expires_in * 1000
return this.accessToken!
}
// 带 Authorization 的 MCP 工具调用
async callTool(toolName: string, args: Record<string, unknown>): Promise<unknown> {
const token = await this.ensureValidToken()
const response = await fetch(`${this.config.serverUrl}/mcp`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
jsonrpc: '2.0',
method: 'tools/call',
params: { name: toolName, arguments: args },
id: crypto.randomUUID()
})
})
return response.json()
}
}
Resource Server 端 Token 验证
MCP Server(Resource Server)需要验证 Bearer Token 并检查权限范围:
// MCP Resource Server: Token 验证中间件
import express from 'express'
import crypto from 'crypto'
// Token 存储(实际应从 Authorization Server 查询或验证 JWT)
const tokenStore = new Map<string, {
userId: string
scope: string
expiresAt: number
}>()
// Token 验证中间件
function authenticateToken(requiredScope?: string) {
return async (req: express.Request, res: express.Response, next: express.NextFunction) => {
const authHeader = req.headers.authorization
if (!authHeader?.startsWith('Bearer ')) {
return res.status(401).json({
jsonrpc: '2.0',
error: { code: -32000, message: 'Missing or invalid Authorization header' },
id: null
})
}
const token = authHeader.slice(7)
const tokenData = tokenStore.get(token)
if (!tokenData) {
return res.status(401).json({
jsonrpc: '2.0',
error: { code: -32000, message: 'Invalid or revoked token' },
id: null
})
}
if (Date.now() > tokenData.expiresAt) {
tokenStore.delete(token)
return res.status(401).json({
jsonrpc: '2.0',
error: { code: -32000, message: 'Token expired' },
id: null
})
}
// 检查 scope 权限
if (requiredScope) {
const grantedScopes = tokenData.scope.split(' ')
if (!grantedScopes.includes(requiredScope)) {
return res.status(403).json({
jsonrpc: '2.0',
error: { code: -32000, message: `Insufficient scope. Required: ${requiredScope}` },
id: null
})
}
}
// 将用户信息注入请求
;(req as any).userId = tokenData.userId
;(req as any).scope = tokenData.scope
next()
}
}
// MCP Server 端点
const mcpRouter = express.Router()
// 工具列表 — 需要 read 权限
mcpRouter.post('/tools/list', authenticateToken('mcp:tools:read'), (req, res) => {
const userId = (req as any).userId
// 根据用户权限返回可用工具
res.json({
jsonrpc: '2.0',
result: {
tools: [
{ name: 'json-format', description: 'JSON 格式化工具' },
{ name: 'json-validate', description: 'JSON Schema 校验' }
].filter(tool => canUserAccessTool(userId, tool.name))
},
id: (req.body as any).id
})
})
// 工具调用 — 需要 execute 权限
mcpRouter.post('/tools/call', authenticateToken('mcp:tools:execute'), async (req, res) => {
const { name, arguments: args } = (req.body as any).params
const userId = (req as any).userId
// 记录审计日志
console.log(JSON.stringify({
timestamp: new Date().toISOString(),
userId,
action: 'tool_call',
tool: name,
args: JSON.stringify(args).slice(0, 200)
}))
// 执行工具
const result = await executeTool(name, args)
res.json({ jsonrpc: '2.0', result, id: (req.body as any).id })
})
function canUserAccessTool(userId: string, toolName: string): boolean {
// 实际应查询用户的工具权限列表
return true
}
async function executeTool(name: string, args: Record<string, unknown>): Promise<unknown> {
// 工具执行逻辑
return { content: [{ type: 'text', text: 'Result' }] }
}
📌 **记住:**Token 验证必须在每次请求时执行,不要缓存验证结果。Token 随时可能被撤销(Token Revocation),缓存会导致已撤销的 Token 仍能访问资源。
📊 三、方案对比与最佳实践
授权方案对比
下面对比三种常见的 MCP 授权方案,帮助你根据场景做出选择:
| 维度 | API Key | OAuth 2.1 + PKCE | mTLS(双向 TLS) |
|---|---|---|---|
| 实现复杂度 | ⭐ 极低 | ⭐⭐⭐ 中等 | ⭐⭐⭐⭐⭐ 高 |
| 细粒度权限 | ❌ 无 | ✅ 基于 Scope | ✅ 基于证书 |
| Token 自动轮换 | ❌ 不支持 | ✅ Refresh Token | ✅ 证书续期 |
| 用户级审计 | ❌ 无法区分用户 | ✅ 完整审计链 | ✅ 证书绑定身份 |
| 多租户支持 | ❌ 需要每租户一个 Key | ✅ 原生支持 | ✅ 证书隔离 |
| 适合场景 | 内部开发/测试 | 生产环境首选 | 企业级高安全要求 |
| 推荐指数 | ⚠️ 仅限开发环境 | ✅ 强烈推荐 | ✅ 金融/政务场景 |
授权范围(Scope)设计最佳实践
Scope 是 OAuth 2.1 实现最小权限原则的核心机制。对于 MCP Server,推荐以下 Scope 设计:
# MCP 推荐的 Scope 设计
# 一级:操作类型
mcp:tools:read # 查看工具列表和描述
mcp:tools:execute # 调用工具
mcp:resources:read # 读取资源文件
# 二级:按工具细分(可选,用于高安全场景)
mcp:tools:json-format:execute
mcp:tools:database-query:execute
mcp:tools:file-write:execute
# 三级:按数据范围细分
mcp:resources:project-a:read
mcp:resources:project-b:read
⚠️ **警告:**不要为了方便而给所有客户端
mcp:tools:execute的全量权限。一旦某个客户端被攻破,攻击者可以调用你的所有工具,包括文件写入、数据库操作等危险工具。
生产环境 Checklist
在将 MCP OAuth 2.1 部署到生产环境之前,逐项检查以下关键点:
- ✅ 强制 PKCE——必须使用
S256方法,拒绝plain - ✅ Token 有效期——Access Token 不超过 1 小时,Refresh Token 不超过 30 天
- ✅ Token 撤销端点——实现 RFC 7009 Token Revocation,支持即时吊销
- ✅ HTTPS 强制——所有端点必须使用 TLS,拒绝 HTTP 明文传输
- ✅ Authorization Code 一次性使用——使用后立即删除,防止重放攻击
- ✅ Redirect URI 严格匹配——使用精确匹配,不要使用通配符或前缀匹配
- ✅ 审计日志——记录每一次 Token 签发、刷新、撤销和工具调用
- ✅ Rate Limiting——对 Token 端点实施频率限制,防止暴力破解
- ❌ 不要在 URL 参数中传递 Token——始终使用 Authorization Header
- ❌ 不要将 Client Secret 嵌入前端代码——公开客户端使用 PKCE 无密钥模式
常见坑点与避坑指南
坑点 1:PKCE code_verifier 存储丢失
在浏览器环境中,如果用户在授权回调前刷新页面,code_verifier 会丢失。解决方案是将其存入 sessionStorage:
// 安全存储 code_verifier
sessionStorage.setItem('pkce_code_verifier', codeVerifier)
// 回调页面中读取
const verifier = sessionStorage.getItem('pkce_code_verifier')
sessionStorage.removeItem('pkce_code_verifier') // 一次性使用
坑点 2:Refresh Token 泄露
Refresh Token 生命周期长,泄露后攻击者可以持续获取新的 Access Token。防范措施:
- 绑定 Refresh Token 到客户端指纹(Client ID + IP 段)
- 实现 Refresh Token Rotation:每次刷新后旧的 Refresh Token 立即失效
- 检测异常刷新行为(如短时间内从不同 IP 刷新),自动吊销所有 Token
坑点 3:跨域 MCP Server 的 CORS 问题
当 MCP Client(浏览器)和 MCP Server 不同源时,/.well-known/oauth-authorization-server 端点需要正确的 CORS 头:
Access-Control-Allow-Origin: *
Access-Control-Allow-Methods: GET, POST
Access-Control-Allow-Headers: Content-Type, Authorization
💡 总结
MCP OAuth 2.1 授权不是可选的安全加固,而是生产环境的基本要求。核心要点:
- 选择 Authorization Code + PKCE 流程——这是 MCP 规范推荐且安全性最高的方案
- 实现细粒度 Scope——按工具类型和操作类型划分权限,遵循最小权限原则
- Token 生命周期管理——短期 Access Token + Refresh Token Rotation + 即时撤销能力
- 完整的审计链——从 Token 签发到工具调用,每一步都要有日志
对于刚接触 MCP 授权的开发者,推荐的落地路径是:先用本文的代码搭建一个本地 OAuth Server 验证流程,再集成 Auth0 或 Keycloak 等成熟的授权服务,最后根据业务需求定制 Scope 和审计策略。
相关工具推荐:
- 🔧 Auth0 — 托管式 OAuth 2.1 服务,适合快速上线
- 🔧 Keycloak — 开源身份认证平台,适合自托管
- 🔧 oauth4webapi — 符合规范的 TypeScript OAuth 客户端库
- 🔧 jose — JWT/JWE/JWS 处理库,用于 Token 自签发