从零实现 Raft 共识算法:Leader 选举、日志复制与安全性全解析

深入 Raft 共识算法核心原理,用 TypeScript 从零实现 Leader 选举、日志复制与安全性机制。覆盖 etcd、CockroachDB 底层协议,附完整可运行代码与性能对比。

数据结构与算法 2026-06-10 15 分钟

你可能每天都在使用 Raft 共识算法——当你执行 etcdctl put 写入一个键值、在 CockroachDB 中提交一笔事务、或让 Consul 进行服务发现时,背后都是 Raft 在保证多个节点之间的数据一致性。Raft 共识算法是当今分布式系统中最广泛采用的一致性协议,被 etcd、Consul、CockroachDB、TiKV、RethinkDB 等数十个生产系统使用。Diego Ongaro 在 2014 年的博士论文中提出 Raft 时,核心目标就是「让人能理解的共识算法」——但真正动手实现一遍,你才能体会到「可理解」和「简单」之间的巨大鸿沟。

本文将用 TypeScript 从零实现一个功能完整的 Raft 节点,覆盖 Leader 选举、日志复制、安全性保证三大核心模块,并给出与 Paxos 的对比分析。

📌 **记住:**Raft 的核心设计哲学是「分解问题」——将共识拆解为 Leader 选举(Leader Election)、日志复制(Log Replication)、安全性(Safety)三个独立子问题,每个子问题都可以独立理解和验证。

🗳️ 一、Leader 选举:心跳、超时与 Term 机制

Raft 集群中的每个节点在任意时刻处于三种状态之一:Follower(跟随者)、Candidate(候选者)、Leader(领导者)。Leader 选举是 Raft 的第一个核心机制——没有 Leader,就无法进行日志复制。

⏱️ 选举超时与随机化

Raft 使用「选举超时」(Election Timeout)触发选举。每个 Follower 维护一个随机化的超时计时器(通常 150ms-300ms),如果在超时时间内没有收到 Leader 的心跳(AppendEntries RPC),就会转变为 Candidate 并发起选举。

⚠️ **警告:**选举超时的随机化是 Raft 正确运作的关键。如果所有节点使用相同的超时值,会出现「split vote」(平票)的活锁问题——所有节点同时发起选举,没有任何一个获得多数票。

Term 机制:逻辑时钟

Raft 引入了 Term(任期)的概念,它是一个递增的整数,充当逻辑时钟。每个 Term 最多有一个 Leader。当 Candidate 发起选举时,它将自己的 Term +1,然后向所有节点发送 RequestVote RPC。

节点收到 RequestVote 时,遵循两个规则:

  • ✅ 如果候选人的 Term ≥ 自己的 Term,且日志至少和自己一样新,就投票
  • ❌ 如果已经投过票(在同一 Term 内),就拒绝

🔧 TypeScript 实现:选举核心逻辑

// raft-选举.ts — Raft 节点状态与选举核心逻辑

type NodeState = 'follower' | 'candidate' | 'leader'

interface LogEntry {
  term: number
  index: number
  command: string
}

interface RaftNode {
  id: string
  currentTerm: number
  votedFor: string | null
  log: LogEntry[]
  state: NodeState
  commitIndex: number
  lastApplied: number
  // Leader 专用
  nextIndex: Map<string, number>
  matchIndex: Map<string, number>
  // 计时器
  electionTimer: ReturnType<typeof setTimeout> | null
  heartbeatTimer: ReturnType<typeof setInterval> | null
}

// 创建一个新的 Raft 节点
function createRaftNode(id: string): RaftNode {
  return {
    id,
    currentTerm: 0,
    votedFor: null,
    log: [],
    state: 'follower',
    commitIndex: -1,
    lastApplied: -1,
    nextIndex: new Map(),
    matchIndex: new Map(),
    electionTimer: null,
    heartbeatTimer: null,
  }
}

// 随机选举超时(150-300ms)
function randomElectionTimeout(): number {
  return 150 + Math.floor(Math.random() * 150)
}

// 重置选举计时器
function resetElectionTimer(node: RaftNode, onTimeout: () => void): void {
  if (node.electionTimer) clearTimeout(node.electionTimer)
  node.electionTimer = setTimeout(onTimeout, randomElectionTimeout())
}

// 发起选举
function startElection(node: RaftNode, peers: RaftNode[]): void {
  node.currentTerm += 1
  node.state = 'candidate'
  node.votedFor = node.id

  let votesReceived = 1 // 自己投自己
  const totalNodes = peers.length + 1
  const majority = Math.floor(totalNodes / 2) + 1

  const lastLogIndex = node.log.length - 1
  const lastLogTerm = lastLogIndex >= 0 ? node.log[lastLogIndex].term : 0

  // 向所有 Peer 发送 RequestVote
  for (const peer of peers) {
    const granted = handleRequestVote(peer, {
      term: node.currentTerm,
      candidateId: node.id,
      lastLogIndex,
      lastLogTerm,
    })

    if (granted) {
      votesReceived += 1
      if (votesReceived >= majority) {
        becomeLeader(node, peers)
        return
      }
    }
  }

  // 没有获得多数票,重置为 Follower
  if (votesReceived < majority) {
    node.state = 'follower'
    node.votedFor = null
  }
}

// 处理 RequestVote RPC
function handleRequestVote(
  node: RaftNode,
  request: {
    term: number
    candidateId: string
    lastLogIndex: number
    lastLogTerm: number
  }
): boolean {
  // 如果请求的 Term 小于当前 Term,拒绝
  if (request.term < node.currentTerm) return false

  // 如果请求的 Term 更大,更新自己的 Term
  if (request.term > node.currentTerm) {
    node.currentTerm = request.term
    node.state = 'follower'
    node.votedFor = null
  }

  // 检查是否可以投票
  const canVote = node.votedFor === null || node.votedFor === request.candidateId
  const lastLogIndex = node.log.length - 1
  const lastLogTerm = lastLogIndex >= 0 ? node.log[lastLogIndex].term : 0

  // 日志至少一样新(Raft 论文 §5.4.1)
  const logIsUpToDate =
    request.lastLogTerm > lastLogTerm ||
    (request.lastLogTerm === lastLogTerm && request.lastLogIndex >= lastLogIndex)

  if (canVote && logIsUpToDate) {
    node.votedFor = request.candidateId
    resetElectionTimer(node, () => startElection(node, []))
    return true
  }

  return false
}

// 成为 Leader
function becomeLeader(node: RaftNode, peers: RaftNode[]): void {
  node.state = 'leader'
  if (node.electionTimer) clearTimeout(node.electionTimer)

  // 初始化 Leader 的 nextIndex 和 matchIndex
  const lastLogIndex = node.log.length
  for (const peer of peers) {
    node.nextIndex.set(peer.id, lastLogIndex)
    node.matchIndex.set(peer.id, 0)
  }

  // 立即发送心跳
  sendHeartbeat(node, peers)

  // 定期发送心跳(间隔 50ms)
  node.heartbeatTimer = setInterval(() => sendHeartbeat(node, peers), 50)
}

⚡ **关键结论:**Raft 的选举安全性(Election Safety)保证在一个 Term 内最多选出一个 Leader。这是通过每个节点在同一 Term 内只能投一票来实现的。

📊 选举超时参数对比

参数 推荐值 过短的风险 过长的影响
选举超时 150-300ms 频繁选举、split vote 故障恢复慢
心跳间隔 50-100ms 网络带宽浪费 被误判为故障
选举超时/心跳比 ≥3:1

📋 二、日志复制:AppendEntries 与提交机制

Leader 选出后,所有客户端写请求都由 Leader 处理。Leader 将命令追加到自己的日志中,然后通过 AppendEntries RPC 并行地复制给所有 Follower。当大多数节点确认后,该日志条目就被「提交」(committed),Leader 将结果应用到状态机并通知客户端。

日志结构与匹配属性

Raft 日志由一系列编号的条目(Log Entry)组成,每个条目包含:

  • Term:该条目被创建时的 Leader Term
  • Index:在日志中的位置
  • Command:要应用到状态机的命令

Raft 保证以下日志匹配属性(Log Matching Property):

  1. 如果两个不同日志中的条目有相同的 Index 和 Term,那么它们存储相同的命令
  2. 如果两个不同日志中的条目有相同的 Index 和 Term,那么它们之前的所有条目也完全相同

🔧 实现日志复制

// raft-日志复制.ts — AppendEntries 处理与提交逻辑

interface AppendEntriesRequest {
  term: number
  leaderId: string
  prevLogIndex: number
  prevLogTerm: number
  entries: LogEntry[]
  leaderCommit: number
}

interface AppendEntriesResponse {
  term: number
  success: boolean
  conflictIndex?: number  // 用于快速回退优化
  conflictTerm?: number
}

// Leader 发送 AppendEntries 给一个 Follower
function sendAppendEntries(
  leader: RaftNode,
  follower: RaftNode
): AppendEntriesResponse {
  const nextIdx = leader.nextIndex.get(follower.id) ?? 0
  const prevLogIndex = nextIdx - 1
  const prevLogTerm = prevLogIndex >= 0 ? leader.log[prevLogIndex].term : 0

  // 获取需要发送的日志条目
  const entries = leader.log.slice(nextIdx)

  return handleAppendEntries(follower, {
    term: leader.currentTerm,
    leaderId: leader.id,
    prevLogIndex,
    prevLogTerm,
    entries,
    leaderCommit: leader.commitIndex,
  })
}

// Follower 处理 AppendEntries RPC
function handleAppendEntries(
  node: RaftNode,
  request: AppendEntriesRequest
): AppendEntriesResponse {
  // 1. 如果 Term < currentTerm,拒绝
  if (request.term < node.currentTerm) {
    return { term: node.currentTerm, success: false }
  }

  // 收到合法 Leader 的消息,重置选举计时器
  node.currentTerm = request.term
  node.state = 'follower'
  node.votedFor = null
  resetElectionTimer(node, () => startElection(node, []))

  // 2. 日志一致性检查(Log Matching Property)
  if (request.prevLogIndex >= 0) {
    if (request.prevLogIndex >= node.log.length) {
      // 日志太短,缺少 prevLogIndex 处的条目
      return {
        term: node.currentTerm,
        success: false,
        conflictIndex: node.log.length,
        conflictTerm: 0,
      }
    }

    const existingEntry = node.log[request.prevLogIndex]
    if (existingEntry.term !== request.prevLogTerm) {
      // Term 不匹配,需要回退
      // 找到该 Term 的第一个条目(快速回退优化)
      let conflictIndex = request.prevLogIndex
      while (conflictIndex > 0 && node.log[conflictIndex - 1]?.term === existingEntry.term) {
        conflictIndex -= 1
      }

      return {
        term: node.currentTerm,
        success: false,
        conflictIndex,
        conflictTerm: existingEntry.term,
      }
    }
  }

  // 3. 追加新条目(如果与现有条目冲突,截断后追加)
  for (const entry of request.entries) {
    const existingIndex = entry.index
    if (existingIndex < node.log.length && node.log[existingIndex].term !== entry.term) {
      // 冲突:删除此条目及之后的所有条目
      node.log = node.log.slice(0, existingIndex)
    }
    if (existingIndex >= node.log.length) {
      node.log.push({ ...entry })
    }
  }

  // 4. 更新 commitIndex
  if (request.leaderCommit > node.commitIndex) {
    node.commitIndex = Math.min(request.leaderCommit, node.log.length - 1)
    applyCommittedEntries(node)
  }

  return { term: node.currentTerm, success: true }
}

// Leader 更新 commitIndex(Raft 论文 §5.4.2)
function leaderAdvanceCommitIndex(leader: RaftNode, peers: RaftNode[]): void {
  const majority = Math.floor((peers.length + 1) / 2) + 1

  // 从最新日志往前找,找到一个被多数节点复制的条目
  for (let n = leader.log.length - 1; n > leader.commitIndex; n -= 1) {
    if (leader.log[n].term !== leader.currentTerm) continue // 只提交当前 Term 的日志

    let replicatedCount = 1 // 自己
    for (const peer of peers) {
      if ((leader.matchIndex.get(peer.id) ?? 0) >= n) {
        replicatedCount += 1
      }
    }

    if (replicatedCount >= majority) {
      leader.commitIndex = n
      applyCommittedEntries(leader)
      break
    }
  }
}

// 将已提交的日志应用到状态机
function applyCommittedEntries(node: RaftNode): void {
  while (node.lastApplied < node.commitIndex) {
    node.lastApplied += 1
    const entry = node.log[node.lastApplied]
    // 应用到状态机(此处简化为日志输出)
    console.log(`[Node ${node.id}] Applied: ${entry.command} (term=${entry.term})`)
  }
}

💡 **提示:**Raft 只提交当前 Term 的日志条目,这是安全性(Safety)的关键保证。Leader 不会直接提交之前 Term 的日志——只有当当前 Term 的条目被提交时,之前 Term 的条目才会被间接提交。

快速回退优化

标准 Raet 论文中,当 AppendEntries 失败时,Leader 会将 nextIndex 递减 1 然后重试。这在日志差异很大时效率极低。上面的实现中加入了冲突快速回退(Fast Rollback)优化:

  • Follower 返回 conflictTermconflictIndex
  • Leader 跳过整个冲突的 Term,直接将 nextIndex 设为冲突 Term 之前的位置

这将 O(n) 的回退优化为 O(T),T 是冲突的 Term 数量。

🔒 三、安全性:为什么 Raft 不会丢失已提交的日志

Raft 的安全性保证是整个算法最精妙的部分。五个关键属性确保了正确性:

🏛️ Raft 的五个核心安全属性

属性 含义 保障机制
选举安全性 每个 Term 最多一个 Leader 每节点每 Term 只投一票
Leader 只追加 Leader 永不删除或覆盖自己的日志 Leader 不接受覆写请求
日志匹配 相同 Index+Term 的条目内容相同 AppendEntries 一致性检查
Leader 完整性 已提交的条目存在于所有后续 Leader 的日志中 选举限制(§5.4)
状态机安全 如果一个节点在某 Index 应用了条目,其他节点不会在同 Index 应用不同条目 以上四条的组合

🔧 实现:状态机快照与成员变更

// raft-快照与成员变更.ts — 快照压缩与动态成员变更

interface Snapshot {
  lastIncludedIndex: number
  lastIncludedTerm: number
  data: Record<string, string> // 快照数据(简化为 KV)
}

interface RaftConfig {
  electionTimeoutMin: number
  electionTimeoutMax: number
  heartbeatInterval: number
  snapshotThreshold: number  // 日志条目数超过此值时触发快照
}

const DEFAULT_CONFIG: RaftConfig = {
  electionTimeoutMin: 150,
  electionTimeoutMax: 300,
  heartbeatInterval: 50,
  snapshotThreshold: 1000,
}

// 生成快照(日志压缩)
function takeSnapshot(node: RaftNode, stateData: Record<string, string>): Snapshot {
  const snapshot: Snapshot = {
    lastIncludedIndex: node.commitIndex,
    lastIncludedTerm: node.commitIndex >= 0 ? node.log[node.commitIndex].term : 0,
    data: { ...stateData },
  }

  // 截断已包含在快照中的日志条目
  node.log = node.log.slice(node.commitIndex + 1)

  console.log(
    `[Node ${node.id}] Snapshot taken: index=${snapshot.lastIncludedIndex}, ` +
    `term=${snapshot.lastIncludedTerm}, remaining_logs=${node.log.length}`
  )

  return snapshot
}

// 动态成员变更(单节点变更法,Raft 论文 §6)
// 注意:Raft 论文推荐一次只变更一个节点,避免出现两个不相交的多数派
function changeMembership(
  node: RaftNode,
  currentMembers: string[],
  newMember: string | null,
  removeMember: string | null
): string[] {
  let newMembers = [...currentMembers]

  if (newMember && !newMembers.includes(newMember)) {
    newMembers.push(newMember)
    console.log(`[Node ${node.id}] Adding member: ${newMember}, cluster size: ${newMembers.length}`)
  }

  if (removeMember && newMembers.includes(removeMember)) {
    newMembers = newMembers.filter(id => id !== removeMember)
    console.log(`[Node ${node.id}] Removing member: ${removeMember}, cluster size: ${newMembers.length}`)
  }

  // 验证:新旧配置的多数派必须有交集
  // 单节点变更法保证了这一点
  const oldMajority = Math.floor(currentMembers.length / 2) + 1
  const newMajority = Math.floor(newMembers.length / 2) + 1
  console.log(
    `[Node ${node.id}] Membership change: ${currentMembers.length} nodes (majority=${oldMajority}) → ` +
    `${newMembers.length} nodes (majority=${newMajority})`
  )

  return newMembers
}

// 完整的 Raft 集群初始化
function createRaftCluster(nodeIds: string[]): RaftNode[] {
  const nodes = nodeIds.map(id => createRaftNode(id))
  const allNodes = [...nodes]

  // 为每个节点设置选举超时回调
  for (const node of nodes) {
    const peers = allNodes.filter(n => n.id !== node.id)
    resetElectionTimer(node, () => startElection(node, peers))
  }

  console.log(`Raft cluster created with ${nodes.length} nodes: [${nodeIds.join(', ')}]`)
  console.log(`Majority required: ${Math.floor(nodes.length / 2) + 1}`)

  return nodes
}

// 模拟客户端写入
function clientRequest(
  leader: RaftNode,
  peers: RaftNode[],
  command: string
): boolean {
  if (leader.state !== 'leader') {
    console.log(`[Client] Node ${leader.id} is not a leader, rejecting request`)
    return false
  }

  const entryIndex = leader.log.length
  const entry: LogEntry = {
    term: leader.currentTerm,
    index: entryIndex,
    command,
  }

  leader.log.push(entry)
  console.log(`[Leader ${leader.id}] Received command: "${command}" at index ${entryIndex}`)

  // 复制到所有 Follower
  let successCount = 1 // Leader 自己
  for (const peer of peers) {
    const response = sendAppendEntries(leader, peer)
    if (response.success) {
      successCount += 1
      leader.matchIndex.set(peer.id, entryIndex)
      leader.nextIndex.set(peer.id, entryIndex + 1)
    } else {
      // 日志不一致,需要回退重试
      const currentNext = leader.nextIndex.get(peer.id) ?? entryIndex
      leader.nextIndex.set(peer.id, Math.max(0, currentNext - 1))
    }
  }

  const majority = Math.floor((peers.length + 1) / 2) + 1
  if (successCount >= majority) {
    leader.commitIndex = entryIndex
    applyCommittedEntries(leader)
    console.log(`[Leader ${leader.id}] Command committed: "${command}" (${successCount}/${peers.length + 1} nodes)`)
    return true
  }

  console.log(`[Leader ${leader.id}] Command NOT committed: "${command}" (${successCount}/${peers.length + 1} nodes)`)
  return false
}

⚠️ **警告:**成员变更是 Raft 中最容易出错的部分。一次变更多个节点可能导致集群分裂——新旧配置各自形成不相交的多数派。始终使用单节点变更法(Joint Consensus 的简化版本)。

🔍 四、Raft vs Paxos:工程实践中的取舍

很多人问:为什么不直接用 Paxos?答案在于工程复杂度

维度 Raft Multi-Paxos
可理解性 ✅ 高(分解为三个子问题) ❌ 低(论文模糊、实现差异大)
Leader 依赖 强依赖 Leader 可以无 Leader(Flexible Paxos)
成员变更 单节点变更 / Joint Consensus 需要额外机制(如 Multi-Ring Paxos)
日志空洞 ❌ 不会出现 ⚠️ 可能出现(需要填充机制)
性能 与 Multi-Paxos 相当 理论上略优(无 Leader 时可并行提案)
生产采用 etcd, Consul, CockroachDB, TiKV Google Chubby, Spanner(变体)
论文清晰度 ✅ 完整、可实现 ❌ 原始论文留有大量空白

关键结论:Raft 在工程实践中完胜 Paxos 的核心原因是可实现性。Ongaro 的论文详细到可以直接转化为代码,而 Paxos 的论文留下了太多实现细节——不同团队的 Paxos 实现之间甚至可能不兼容。

💡 五、生产环境中的 Raft:坑点与最佳实践

⚠️ 常见踩坑

1. 网络分区导致的脑裂

当网络分区发生时,少数派分区中的旧 Leader 会继续接收客户端写入,但由于无法获得多数确认,这些写入永远不会提交。当分区恢复后,旧 Leader 发现更高的 Term,自动退位并回滚未提交的日志。

✅ **正确做法:**客户端应实现重试逻辑,当写入超时后换一个节点重试。 ❌ **避免做法:**假设 Leader 永远可用,不做超时处理。

2. 日志无限增长

如果不做快照,Raft 日志会无限增长。生产环境中必须配置日志压缩(Snapshot)策略。

✅ **正确做法:**当日志条目数超过阈值(如 1000 条)时自动触发快照。 ❌ **避免做法:**不做快照,期望日志永远不会满。

3. 选举风暴

当集群网络抖动时,可能触发大规模重新选举。频繁选举会导致集群短暂不可用。

✅ **正确做法:**使用 Pre-Vote 机制(Raft 论文 §9.6),在发起正式选举前先征求投票意向。 ❌ **避免做法:**设置过短的选举超时,导致网络抖动时频繁触发选举。

🎯 生产部署建议

建议 说明
奇数节点 始终使用 3、5、7 个节点,避免偶数节点的平票风险
跨机架部署 将节点分布在不同机架/可用区,避免单点故障
监控 Term 变化 Term 频繁变化意味着选举风暴,需要排查网络问题
Pre-Vote 启用 生产环境必须开启 Pre-Vote,防止被网络分区的节点干扰集群
合理设置超时 根据实际网络延迟调整,跨机房部署需要更大的超时值

📝 总结

Raft 共识算法之所以成为分布式系统的基础协议,不仅因为它「可理解」,更因为它将复杂的共识问题分解为三个可以独立验证的子问题。通过本文的 TypeScript 实现,你应该对以下内容有了深入理解:

  1. Leader 选举:通过随机超时和 Term 机制保证每个 Term 最多一个 Leader
  2. 日志复制:通过 AppendEntries 一致性检查保证日志匹配属性
  3. 安全性:通过选举限制保证已提交日志不会丢失
  4. 日志压缩:通过快照机制防止日志无限增长
  5. 成员变更:通过单节点变更法安全地扩缩集群

💡 **提示:**如果你想在生产中使用 Raft,推荐直接使用成熟的实现而非自己造轮子——etcd 的 Raft 库(Go)、hashicorp/raft(Go)、openraft(Rust)都经过了大规模生产验证。

🔧 相关工具推荐:

  • etcd — Kubernetes 的基石,最成熟的 Raft 实现
  • hashicorp/raft — HashiCorp 出品的 Go 语言 Raft 库,API 友好
  • openraft — Rust 语言的异步 Raft 实现,性能优异
  • Jepsen — Kyle Kingsbury 的分布式系统一致性测试框架,用它来验证你的实现
  • jsjson.com JSON 格式化工具 — 在调试 Raft 节点间的消息传递时,格式化 JSON 日志能大幅提升排查效率

📚 相关文章