你可能每天都在使用 Raft 共识算法——当你执行 etcdctl put 写入一个键值、在 CockroachDB 中提交一笔事务、或让 Consul 进行服务发现时,背后都是 Raft 在保证多个节点之间的数据一致性。Raft 共识算法是当今分布式系统中最广泛采用的一致性协议,被 etcd、Consul、CockroachDB、TiKV、RethinkDB 等数十个生产系统使用。Diego Ongaro 在 2014 年的博士论文中提出 Raft 时,核心目标就是「让人能理解的共识算法」——但真正动手实现一遍,你才能体会到「可理解」和「简单」之间的巨大鸿沟。
本文将用 TypeScript 从零实现一个功能完整的 Raft 节点,覆盖 Leader 选举、日志复制、安全性保证三大核心模块,并给出与 Paxos 的对比分析。
📌 **记住:**Raft 的核心设计哲学是「分解问题」——将共识拆解为 Leader 选举(Leader Election)、日志复制(Log Replication)、安全性(Safety)三个独立子问题,每个子问题都可以独立理解和验证。
🗳️ 一、Leader 选举:心跳、超时与 Term 机制
Raft 集群中的每个节点在任意时刻处于三种状态之一:Follower(跟随者)、Candidate(候选者)、Leader(领导者)。Leader 选举是 Raft 的第一个核心机制——没有 Leader,就无法进行日志复制。
⏱️ 选举超时与随机化
Raft 使用「选举超时」(Election Timeout)触发选举。每个 Follower 维护一个随机化的超时计时器(通常 150ms-300ms),如果在超时时间内没有收到 Leader 的心跳(AppendEntries RPC),就会转变为 Candidate 并发起选举。
⚠️ **警告:**选举超时的随机化是 Raft 正确运作的关键。如果所有节点使用相同的超时值,会出现「split vote」(平票)的活锁问题——所有节点同时发起选举,没有任何一个获得多数票。
Term 机制:逻辑时钟
Raft 引入了 Term(任期)的概念,它是一个递增的整数,充当逻辑时钟。每个 Term 最多有一个 Leader。当 Candidate 发起选举时,它将自己的 Term +1,然后向所有节点发送 RequestVote RPC。
节点收到 RequestVote 时,遵循两个规则:
- ✅ 如果候选人的 Term ≥ 自己的 Term,且日志至少和自己一样新,就投票
- ❌ 如果已经投过票(在同一 Term 内),就拒绝
🔧 TypeScript 实现:选举核心逻辑
// raft-选举.ts — Raft 节点状态与选举核心逻辑
type NodeState = 'follower' | 'candidate' | 'leader'
interface LogEntry {
term: number
index: number
command: string
}
interface RaftNode {
id: string
currentTerm: number
votedFor: string | null
log: LogEntry[]
state: NodeState
commitIndex: number
lastApplied: number
// Leader 专用
nextIndex: Map<string, number>
matchIndex: Map<string, number>
// 计时器
electionTimer: ReturnType<typeof setTimeout> | null
heartbeatTimer: ReturnType<typeof setInterval> | null
}
// 创建一个新的 Raft 节点
function createRaftNode(id: string): RaftNode {
return {
id,
currentTerm: 0,
votedFor: null,
log: [],
state: 'follower',
commitIndex: -1,
lastApplied: -1,
nextIndex: new Map(),
matchIndex: new Map(),
electionTimer: null,
heartbeatTimer: null,
}
}
// 随机选举超时(150-300ms)
function randomElectionTimeout(): number {
return 150 + Math.floor(Math.random() * 150)
}
// 重置选举计时器
function resetElectionTimer(node: RaftNode, onTimeout: () => void): void {
if (node.electionTimer) clearTimeout(node.electionTimer)
node.electionTimer = setTimeout(onTimeout, randomElectionTimeout())
}
// 发起选举
function startElection(node: RaftNode, peers: RaftNode[]): void {
node.currentTerm += 1
node.state = 'candidate'
node.votedFor = node.id
let votesReceived = 1 // 自己投自己
const totalNodes = peers.length + 1
const majority = Math.floor(totalNodes / 2) + 1
const lastLogIndex = node.log.length - 1
const lastLogTerm = lastLogIndex >= 0 ? node.log[lastLogIndex].term : 0
// 向所有 Peer 发送 RequestVote
for (const peer of peers) {
const granted = handleRequestVote(peer, {
term: node.currentTerm,
candidateId: node.id,
lastLogIndex,
lastLogTerm,
})
if (granted) {
votesReceived += 1
if (votesReceived >= majority) {
becomeLeader(node, peers)
return
}
}
}
// 没有获得多数票,重置为 Follower
if (votesReceived < majority) {
node.state = 'follower'
node.votedFor = null
}
}
// 处理 RequestVote RPC
function handleRequestVote(
node: RaftNode,
request: {
term: number
candidateId: string
lastLogIndex: number
lastLogTerm: number
}
): boolean {
// 如果请求的 Term 小于当前 Term,拒绝
if (request.term < node.currentTerm) return false
// 如果请求的 Term 更大,更新自己的 Term
if (request.term > node.currentTerm) {
node.currentTerm = request.term
node.state = 'follower'
node.votedFor = null
}
// 检查是否可以投票
const canVote = node.votedFor === null || node.votedFor === request.candidateId
const lastLogIndex = node.log.length - 1
const lastLogTerm = lastLogIndex >= 0 ? node.log[lastLogIndex].term : 0
// 日志至少一样新(Raft 论文 §5.4.1)
const logIsUpToDate =
request.lastLogTerm > lastLogTerm ||
(request.lastLogTerm === lastLogTerm && request.lastLogIndex >= lastLogIndex)
if (canVote && logIsUpToDate) {
node.votedFor = request.candidateId
resetElectionTimer(node, () => startElection(node, []))
return true
}
return false
}
// 成为 Leader
function becomeLeader(node: RaftNode, peers: RaftNode[]): void {
node.state = 'leader'
if (node.electionTimer) clearTimeout(node.electionTimer)
// 初始化 Leader 的 nextIndex 和 matchIndex
const lastLogIndex = node.log.length
for (const peer of peers) {
node.nextIndex.set(peer.id, lastLogIndex)
node.matchIndex.set(peer.id, 0)
}
// 立即发送心跳
sendHeartbeat(node, peers)
// 定期发送心跳(间隔 50ms)
node.heartbeatTimer = setInterval(() => sendHeartbeat(node, peers), 50)
}
⚡ **关键结论:**Raft 的选举安全性(Election Safety)保证在一个 Term 内最多选出一个 Leader。这是通过每个节点在同一 Term 内只能投一票来实现的。
📊 选举超时参数对比
| 参数 | 推荐值 | 过短的风险 | 过长的影响 |
|---|---|---|---|
| 选举超时 | 150-300ms | 频繁选举、split vote | 故障恢复慢 |
| 心跳间隔 | 50-100ms | 网络带宽浪费 | 被误判为故障 |
| 选举超时/心跳比 | ≥3:1 | — | — |
📋 二、日志复制:AppendEntries 与提交机制
Leader 选出后,所有客户端写请求都由 Leader 处理。Leader 将命令追加到自己的日志中,然后通过 AppendEntries RPC 并行地复制给所有 Follower。当大多数节点确认后,该日志条目就被「提交」(committed),Leader 将结果应用到状态机并通知客户端。
日志结构与匹配属性
Raft 日志由一系列编号的条目(Log Entry)组成,每个条目包含:
- Term:该条目被创建时的 Leader Term
- Index:在日志中的位置
- Command:要应用到状态机的命令
Raft 保证以下日志匹配属性(Log Matching Property):
- 如果两个不同日志中的条目有相同的 Index 和 Term,那么它们存储相同的命令
- 如果两个不同日志中的条目有相同的 Index 和 Term,那么它们之前的所有条目也完全相同
🔧 实现日志复制
// raft-日志复制.ts — AppendEntries 处理与提交逻辑
interface AppendEntriesRequest {
term: number
leaderId: string
prevLogIndex: number
prevLogTerm: number
entries: LogEntry[]
leaderCommit: number
}
interface AppendEntriesResponse {
term: number
success: boolean
conflictIndex?: number // 用于快速回退优化
conflictTerm?: number
}
// Leader 发送 AppendEntries 给一个 Follower
function sendAppendEntries(
leader: RaftNode,
follower: RaftNode
): AppendEntriesResponse {
const nextIdx = leader.nextIndex.get(follower.id) ?? 0
const prevLogIndex = nextIdx - 1
const prevLogTerm = prevLogIndex >= 0 ? leader.log[prevLogIndex].term : 0
// 获取需要发送的日志条目
const entries = leader.log.slice(nextIdx)
return handleAppendEntries(follower, {
term: leader.currentTerm,
leaderId: leader.id,
prevLogIndex,
prevLogTerm,
entries,
leaderCommit: leader.commitIndex,
})
}
// Follower 处理 AppendEntries RPC
function handleAppendEntries(
node: RaftNode,
request: AppendEntriesRequest
): AppendEntriesResponse {
// 1. 如果 Term < currentTerm,拒绝
if (request.term < node.currentTerm) {
return { term: node.currentTerm, success: false }
}
// 收到合法 Leader 的消息,重置选举计时器
node.currentTerm = request.term
node.state = 'follower'
node.votedFor = null
resetElectionTimer(node, () => startElection(node, []))
// 2. 日志一致性检查(Log Matching Property)
if (request.prevLogIndex >= 0) {
if (request.prevLogIndex >= node.log.length) {
// 日志太短,缺少 prevLogIndex 处的条目
return {
term: node.currentTerm,
success: false,
conflictIndex: node.log.length,
conflictTerm: 0,
}
}
const existingEntry = node.log[request.prevLogIndex]
if (existingEntry.term !== request.prevLogTerm) {
// Term 不匹配,需要回退
// 找到该 Term 的第一个条目(快速回退优化)
let conflictIndex = request.prevLogIndex
while (conflictIndex > 0 && node.log[conflictIndex - 1]?.term === existingEntry.term) {
conflictIndex -= 1
}
return {
term: node.currentTerm,
success: false,
conflictIndex,
conflictTerm: existingEntry.term,
}
}
}
// 3. 追加新条目(如果与现有条目冲突,截断后追加)
for (const entry of request.entries) {
const existingIndex = entry.index
if (existingIndex < node.log.length && node.log[existingIndex].term !== entry.term) {
// 冲突:删除此条目及之后的所有条目
node.log = node.log.slice(0, existingIndex)
}
if (existingIndex >= node.log.length) {
node.log.push({ ...entry })
}
}
// 4. 更新 commitIndex
if (request.leaderCommit > node.commitIndex) {
node.commitIndex = Math.min(request.leaderCommit, node.log.length - 1)
applyCommittedEntries(node)
}
return { term: node.currentTerm, success: true }
}
// Leader 更新 commitIndex(Raft 论文 §5.4.2)
function leaderAdvanceCommitIndex(leader: RaftNode, peers: RaftNode[]): void {
const majority = Math.floor((peers.length + 1) / 2) + 1
// 从最新日志往前找,找到一个被多数节点复制的条目
for (let n = leader.log.length - 1; n > leader.commitIndex; n -= 1) {
if (leader.log[n].term !== leader.currentTerm) continue // 只提交当前 Term 的日志
let replicatedCount = 1 // 自己
for (const peer of peers) {
if ((leader.matchIndex.get(peer.id) ?? 0) >= n) {
replicatedCount += 1
}
}
if (replicatedCount >= majority) {
leader.commitIndex = n
applyCommittedEntries(leader)
break
}
}
}
// 将已提交的日志应用到状态机
function applyCommittedEntries(node: RaftNode): void {
while (node.lastApplied < node.commitIndex) {
node.lastApplied += 1
const entry = node.log[node.lastApplied]
// 应用到状态机(此处简化为日志输出)
console.log(`[Node ${node.id}] Applied: ${entry.command} (term=${entry.term})`)
}
}
💡 **提示:**Raft 只提交当前 Term 的日志条目,这是安全性(Safety)的关键保证。Leader 不会直接提交之前 Term 的日志——只有当当前 Term 的条目被提交时,之前 Term 的条目才会被间接提交。
快速回退优化
标准 Raet 论文中,当 AppendEntries 失败时,Leader 会将 nextIndex 递减 1 然后重试。这在日志差异很大时效率极低。上面的实现中加入了冲突快速回退(Fast Rollback)优化:
- Follower 返回
conflictTerm和conflictIndex - Leader 跳过整个冲突的 Term,直接将 nextIndex 设为冲突 Term 之前的位置
这将 O(n) 的回退优化为 O(T),T 是冲突的 Term 数量。
🔒 三、安全性:为什么 Raft 不会丢失已提交的日志
Raft 的安全性保证是整个算法最精妙的部分。五个关键属性确保了正确性:
🏛️ Raft 的五个核心安全属性
| 属性 | 含义 | 保障机制 |
|---|---|---|
| 选举安全性 | 每个 Term 最多一个 Leader | 每节点每 Term 只投一票 |
| Leader 只追加 | Leader 永不删除或覆盖自己的日志 | Leader 不接受覆写请求 |
| 日志匹配 | 相同 Index+Term 的条目内容相同 | AppendEntries 一致性检查 |
| Leader 完整性 | 已提交的条目存在于所有后续 Leader 的日志中 | 选举限制(§5.4) |
| 状态机安全 | 如果一个节点在某 Index 应用了条目,其他节点不会在同 Index 应用不同条目 | 以上四条的组合 |
🔧 实现:状态机快照与成员变更
// raft-快照与成员变更.ts — 快照压缩与动态成员变更
interface Snapshot {
lastIncludedIndex: number
lastIncludedTerm: number
data: Record<string, string> // 快照数据(简化为 KV)
}
interface RaftConfig {
electionTimeoutMin: number
electionTimeoutMax: number
heartbeatInterval: number
snapshotThreshold: number // 日志条目数超过此值时触发快照
}
const DEFAULT_CONFIG: RaftConfig = {
electionTimeoutMin: 150,
electionTimeoutMax: 300,
heartbeatInterval: 50,
snapshotThreshold: 1000,
}
// 生成快照(日志压缩)
function takeSnapshot(node: RaftNode, stateData: Record<string, string>): Snapshot {
const snapshot: Snapshot = {
lastIncludedIndex: node.commitIndex,
lastIncludedTerm: node.commitIndex >= 0 ? node.log[node.commitIndex].term : 0,
data: { ...stateData },
}
// 截断已包含在快照中的日志条目
node.log = node.log.slice(node.commitIndex + 1)
console.log(
`[Node ${node.id}] Snapshot taken: index=${snapshot.lastIncludedIndex}, ` +
`term=${snapshot.lastIncludedTerm}, remaining_logs=${node.log.length}`
)
return snapshot
}
// 动态成员变更(单节点变更法,Raft 论文 §6)
// 注意:Raft 论文推荐一次只变更一个节点,避免出现两个不相交的多数派
function changeMembership(
node: RaftNode,
currentMembers: string[],
newMember: string | null,
removeMember: string | null
): string[] {
let newMembers = [...currentMembers]
if (newMember && !newMembers.includes(newMember)) {
newMembers.push(newMember)
console.log(`[Node ${node.id}] Adding member: ${newMember}, cluster size: ${newMembers.length}`)
}
if (removeMember && newMembers.includes(removeMember)) {
newMembers = newMembers.filter(id => id !== removeMember)
console.log(`[Node ${node.id}] Removing member: ${removeMember}, cluster size: ${newMembers.length}`)
}
// 验证:新旧配置的多数派必须有交集
// 单节点变更法保证了这一点
const oldMajority = Math.floor(currentMembers.length / 2) + 1
const newMajority = Math.floor(newMembers.length / 2) + 1
console.log(
`[Node ${node.id}] Membership change: ${currentMembers.length} nodes (majority=${oldMajority}) → ` +
`${newMembers.length} nodes (majority=${newMajority})`
)
return newMembers
}
// 完整的 Raft 集群初始化
function createRaftCluster(nodeIds: string[]): RaftNode[] {
const nodes = nodeIds.map(id => createRaftNode(id))
const allNodes = [...nodes]
// 为每个节点设置选举超时回调
for (const node of nodes) {
const peers = allNodes.filter(n => n.id !== node.id)
resetElectionTimer(node, () => startElection(node, peers))
}
console.log(`Raft cluster created with ${nodes.length} nodes: [${nodeIds.join(', ')}]`)
console.log(`Majority required: ${Math.floor(nodes.length / 2) + 1}`)
return nodes
}
// 模拟客户端写入
function clientRequest(
leader: RaftNode,
peers: RaftNode[],
command: string
): boolean {
if (leader.state !== 'leader') {
console.log(`[Client] Node ${leader.id} is not a leader, rejecting request`)
return false
}
const entryIndex = leader.log.length
const entry: LogEntry = {
term: leader.currentTerm,
index: entryIndex,
command,
}
leader.log.push(entry)
console.log(`[Leader ${leader.id}] Received command: "${command}" at index ${entryIndex}`)
// 复制到所有 Follower
let successCount = 1 // Leader 自己
for (const peer of peers) {
const response = sendAppendEntries(leader, peer)
if (response.success) {
successCount += 1
leader.matchIndex.set(peer.id, entryIndex)
leader.nextIndex.set(peer.id, entryIndex + 1)
} else {
// 日志不一致,需要回退重试
const currentNext = leader.nextIndex.get(peer.id) ?? entryIndex
leader.nextIndex.set(peer.id, Math.max(0, currentNext - 1))
}
}
const majority = Math.floor((peers.length + 1) / 2) + 1
if (successCount >= majority) {
leader.commitIndex = entryIndex
applyCommittedEntries(leader)
console.log(`[Leader ${leader.id}] Command committed: "${command}" (${successCount}/${peers.length + 1} nodes)`)
return true
}
console.log(`[Leader ${leader.id}] Command NOT committed: "${command}" (${successCount}/${peers.length + 1} nodes)`)
return false
}
⚠️ **警告:**成员变更是 Raft 中最容易出错的部分。一次变更多个节点可能导致集群分裂——新旧配置各自形成不相交的多数派。始终使用单节点变更法(Joint Consensus 的简化版本)。
🔍 四、Raft vs Paxos:工程实践中的取舍
很多人问:为什么不直接用 Paxos?答案在于工程复杂度。
| 维度 | Raft | Multi-Paxos |
|---|---|---|
| 可理解性 | ✅ 高(分解为三个子问题) | ❌ 低(论文模糊、实现差异大) |
| Leader 依赖 | 强依赖 Leader | 可以无 Leader(Flexible Paxos) |
| 成员变更 | 单节点变更 / Joint Consensus | 需要额外机制(如 Multi-Ring Paxos) |
| 日志空洞 | ❌ 不会出现 | ⚠️ 可能出现(需要填充机制) |
| 性能 | 与 Multi-Paxos 相当 | 理论上略优(无 Leader 时可并行提案) |
| 生产采用 | etcd, Consul, CockroachDB, TiKV | Google Chubby, Spanner(变体) |
| 论文清晰度 | ✅ 完整、可实现 | ❌ 原始论文留有大量空白 |
⚡ 关键结论:Raft 在工程实践中完胜 Paxos 的核心原因是可实现性。Ongaro 的论文详细到可以直接转化为代码,而 Paxos 的论文留下了太多实现细节——不同团队的 Paxos 实现之间甚至可能不兼容。
💡 五、生产环境中的 Raft:坑点与最佳实践
⚠️ 常见踩坑
1. 网络分区导致的脑裂
当网络分区发生时,少数派分区中的旧 Leader 会继续接收客户端写入,但由于无法获得多数确认,这些写入永远不会提交。当分区恢复后,旧 Leader 发现更高的 Term,自动退位并回滚未提交的日志。
✅ **正确做法:**客户端应实现重试逻辑,当写入超时后换一个节点重试。 ❌ **避免做法:**假设 Leader 永远可用,不做超时处理。
2. 日志无限增长
如果不做快照,Raft 日志会无限增长。生产环境中必须配置日志压缩(Snapshot)策略。
✅ **正确做法:**当日志条目数超过阈值(如 1000 条)时自动触发快照。 ❌ **避免做法:**不做快照,期望日志永远不会满。
3. 选举风暴
当集群网络抖动时,可能触发大规模重新选举。频繁选举会导致集群短暂不可用。
✅ **正确做法:**使用 Pre-Vote 机制(Raft 论文 §9.6),在发起正式选举前先征求投票意向。 ❌ **避免做法:**设置过短的选举超时,导致网络抖动时频繁触发选举。
🎯 生产部署建议
| 建议 | 说明 |
|---|---|
| 奇数节点 | 始终使用 3、5、7 个节点,避免偶数节点的平票风险 |
| 跨机架部署 | 将节点分布在不同机架/可用区,避免单点故障 |
| 监控 Term 变化 | Term 频繁变化意味着选举风暴,需要排查网络问题 |
| Pre-Vote 启用 | 生产环境必须开启 Pre-Vote,防止被网络分区的节点干扰集群 |
| 合理设置超时 | 根据实际网络延迟调整,跨机房部署需要更大的超时值 |
📝 总结
Raft 共识算法之所以成为分布式系统的基础协议,不仅因为它「可理解」,更因为它将复杂的共识问题分解为三个可以独立验证的子问题。通过本文的 TypeScript 实现,你应该对以下内容有了深入理解:
- ✅ Leader 选举:通过随机超时和 Term 机制保证每个 Term 最多一个 Leader
- ✅ 日志复制:通过 AppendEntries 一致性检查保证日志匹配属性
- ✅ 安全性:通过选举限制保证已提交日志不会丢失
- ✅ 日志压缩:通过快照机制防止日志无限增长
- ✅ 成员变更:通过单节点变更法安全地扩缩集群
💡 **提示:**如果你想在生产中使用 Raft,推荐直接使用成熟的实现而非自己造轮子——etcd 的 Raft 库(Go)、hashicorp/raft(Go)、openraft(Rust)都经过了大规模生产验证。
🔧 相关工具推荐:
- etcd — Kubernetes 的基石,最成熟的 Raft 实现
- hashicorp/raft — HashiCorp 出品的 Go 语言 Raft 库,API 友好
- openraft — Rust 语言的异步 Raft 实现,性能优异
- Jepsen — Kyle Kingsbury 的分布式系统一致性测试框架,用它来验证你的实现
- jsjson.com JSON 格式化工具 — 在调试 Raft 节点间的消息传递时,格式化 JSON 日志能大幅提升排查效率