2024 年,Shopify 披露其核心订单系统每天处理超过 10 亿条事件,全部基于事件溯源(Event Sourcing)架构。这并非偶然——当你需要完整的审计追踪、时间旅行调试、以及高并发下的数据一致性时,传统的 CRUD 快照模式就会捉襟见肘。事件溯源不是银弹,但对于电商订单、金融交易、协作编辑等场景,它带来的可审计性和可扩展性远超传统方案。
本文将从零实现一个完整的 Event Sourcing 引擎,涵盖事件存储、聚合重建、快照优化、CQRS 读写分离等核心机制,并提供可直接运行的代码。
🔐 一、Event Sourcing 核心概念与传统 CRUD 的根本差异
1.1 为什么 CRUD 快照模式会出问题?
传统 CRUD 只存储「当前状态」。当用户把订单金额从 100 改为 150 再改为 120,数据库里只有最终值 120。你无法回答:谁改的?什么时候改的?为什么改?改之前是多少?
// 传统 CRUD:只保留最终状态
{ orderId: "ORD-001", amount: 120, status: "confirmed" }
// 问题:中间经历了什么?无从得知
Event Sourcing 的核心思想是:不存储状态,只存储状态变化的事件(Event)。当前状态通过「重放」所有历史事件得出。
// Event Sourcing:存储所有事件
[
{ type: "OrderCreated", data: { orderId: "ORD-001", amount: 100 } },
{ type: "OrderAmountUpdated", data: { orderId: "ORD-001", newAmount: 150 } },
{ type: "OrderConfirmed", data: { orderId: "ORD-001", amount: 120 } }
]
// 完整的变更历史,一目了然
💡 **提示:**Event Sourcing 不是数据库替代方案,而是数据建模范式的转变。它通常与 CQRS(命令查询职责分离)配合使用。
1.2 事件溯源 vs 快照存储对比
| 维度 | CRUD 快照模式 | Event Sourcing 事件溯源 |
|---|---|---|
| 存储内容 | 当前状态 | 状态变更事件序列 |
| 审计追踪 | 需要额外审计表 | 天然内建,零成本 |
| 时间旅行 | ❌ 不支持 | ✅ 可重建任意历史状态 |
| 调试能力 | 只看最终值 | 完整变更链路 |
| 写入性能 | 单次 UPDATE | 追加写入(append-only),更快 |
| 读取性能 | 直接查询 | 需要重放事件或投影 |
| 存储空间 | 较小 | 较大(需配合快照优化) |
| 适用场景 | 简单 CRUD 应用 | 金融、电商、协作、审计密集型 |
⚠️ **警告:**不要为了「酷」而引入 Event Sourcing。如果你的应用不需要审计追踪、时间旅行、或复杂的状态重建,传统 CRUD 是更简单的选择。
🚀 二、从零实现 Event Sourcing 引擎
2.1 事件定义与聚合根(Aggregate Root)
首先定义事件的基础结构和聚合根。聚合根是 DDD(领域驱动设计)中的核心概念,它是事件的生产者和消费者。
// 定义事件基础接口
interface DomainEvent {
type: string // 事件类型
aggregateId: string // 聚合 ID
timestamp: number // 事件时间戳
version: number // 事件版本号
data: Record<string, unknown> // 事件载荷
}
// 聚合根基类:所有领域对象的父类
abstract class AggregateRoot {
public id: string
public version: number = 0
private uncommittedEvents: DomainEvent[] = []
constructor(id: string) {
this.id = id
}
// 应用事件:更新状态 + 记录未提交事件
protected apply(event: Omit<DomainEvent, 'aggregateId' | 'timestamp' | 'version'>): void {
const fullEvent: DomainEvent = {
...event,
aggregateId: this.id,
timestamp: Date.now(),
version: this.version + 1
}
this.applyEvent(fullEvent) // 更新聚合状态
this.uncommittedEvents.push(fullEvent) // 记录待持久化
this.version++
}
// 子类实现:根据事件类型更新自身状态
protected abstract applyEvent(event: DomainEvent): void
// 获取并清空未提交事件
pullUncommittedEvents(): DomainEvent[] {
const events = [...this.uncommittedEvents]
this.uncommittedEvents = []
return events
}
// 从历史事件重建聚合状态
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.applyEvent(event)
this.version = event.version
}
}
}
2.2 实现一个订单聚合
用一个电商订单场景演示如何使用聚合根:
// 订单状态枚举
type OrderStatus = 'draft' | 'confirmed' | 'shipped' | 'cancelled'
// 订单聚合:继承 AggregateRoot
class OrderAggregate extends AggregateRoot {
public amount: number = 0
public status: OrderStatus = 'draft'
public items: { name: string; qty: number; price: number }[] = []
public confirmedAt?: number
// 命令方法:创建订单
createOrder(items: { name: string; qty: number; price: number }[]): void {
if (this.status !== 'draft') {
throw new Error('只有草稿状态的订单才能修改')
}
const totalAmount = items.reduce((sum, i) => sum + i.qty * i.price, 0)
this.apply({
type: 'OrderCreated',
data: { items, amount: totalAmount }
})
}
// 命令方法:确认订单
confirmOrder(): void {
if (this.status !== 'draft') {
throw new Error('只有草稿状态的订单才能确认')
}
if (this.items.length === 0) {
throw new Error('空订单不能确认')
}
this.apply({
type: 'OrderConfirmed',
data: { confirmedAt: Date.now() }
})
}
// 命令方法:取消订单
cancelOrder(reason: string): void {
if (this.status === 'shipped') {
throw new Error('已发货订单不能取消')
}
this.apply({
type: 'OrderCancelled',
data: { reason }
})
}
// 事件处理器:根据事件类型更新状态
protected applyEvent(event: DomainEvent): void {
switch (event.type) {
case 'OrderCreated':
this.items = event.data.items as typeof this.items
this.amount = event.data.amount as number
this.status = 'draft'
break
case 'OrderConfirmed':
this.status = 'confirmed'
this.confirmedAt = event.data.confirmedAt as number
break
case 'OrderCancelled':
this.status = 'cancelled'
break
}
}
}
2.3 事件存储(Event Store)
事件存储是整个系统的基石,负责持久化事件流。我们用内存实现一个,生产环境可替换为 PostgreSQL、EventStoreDB 等。
// 事件存储:管理所有事件的持久化
class EventStore {
// 内存存储:aggregateId -> 事件列表
private events: Map<string, DomainEvent[]> = new Map()
// 追加事件到指定聚合的事件流
append(aggregateId: string, events: DomainEvent[], expectedVersion: number): void {
const existing = this.events.get(aggregateId) || []
// 乐观并发控制:检查版本号,防止并发写入冲突
if (existing.length !== expectedVersion) {
throw new Error(
`并发冲突:期望版本 ${expectedVersion},实际版本 ${existing.length}`
)
}
this.events.set(aggregateId, [...existing, ...events])
}
// 加载指定聚合的全部事件
loadEvents(aggregateId: string): DomainEvent[] {
return this.events.get(aggregateId) || []
}
// 查询所有事件(用于全局投影)
getAllEvents(): DomainEvent[] {
const all: DomainEvent[] = []
for (const events of this.events.values()) {
all.push(...events)
}
return all.sort((a, b) => a.timestamp - b.timestamp)
}
// 按事件类型查询(用于按类型投影)
getEventsByType(eventType: string): DomainEvent[] {
return this.getAllEvents().filter(e => e.type === eventType)
}
}
📌 **记住:**乐观并发控制(Optimistic Concurrency Control)是 Event Sourcing 的关键机制。
expectedVersion参数确保两个并发操作不会互相覆盖。
💡 三、CQRS 读写分离与快照优化
3.1 CQRS 模式:写模型与读模型分离
Event Sourcing 天然适合 CQRS(Command Query Responsibility Segregation)。写端产生事件,读端消费事件构建专用的查询模型。
// 读模型:订单摘要(为列表查询优化)
interface OrderReadModel {
orderId: string
amount: number
status: OrderStatus
itemCount: number
createdAt: number
confirmedAt?: number
}
// 投影器(Projector):监听事件,更新读模型
class OrderProjector {
private readStore: Map<string, OrderReadModel> = new Map()
// 处理单个事件,更新读模型
handleEvent(event: DomainEvent): void {
switch (event.type) {
case 'OrderCreated': {
const model: OrderReadModel = {
orderId: event.aggregateId,
amount: event.data.amount as number,
status: 'draft',
itemCount: (event.data.items as unknown[]).length,
createdAt: event.timestamp
}
this.readStore.set(event.aggregateId, model)
break
}
case 'OrderConfirmed': {
const model = this.readStore.get(event.aggregateId)
if (model) {
model.status = 'confirmed'
model.confirmedAt = event.data.confirmedAt as number
}
break
}
case 'OrderCancelled': {
const model = this.readStore.get(event.aggregateId)
if (model) {
model.status = 'cancelled'
}
break
}
}
}
// 查询读模型(高性能,无需重放事件)
getOrder(orderId: string): OrderReadModel | undefined {
return this.readStore.get(orderId)
}
// 列表查询(投影后的扁平数据,可建索引)
listOrders(status?: OrderStatus): OrderReadModel[] {
const orders = Array.from(this.readStore.values())
return status ? orders.filter(o => o.status === status) : orders
}
}
3.2 快照优化:解决事件重放性能问题
当一个聚合积累了成千上万条事件时,每次重建都要重放全部事件,性能会急剧下降。快照(Snapshot)机制解决这个问题。
// 快照存储
interface Snapshot {
aggregateId: string
version: number
state: Record<string, unknown>
timestamp: number
}
class SnapshotStore {
private snapshots: Map<string, Snapshot> = new Map()
save(snapshot: Snapshot): void {
this.snapshots.set(snapshot.aggregateId, snapshot)
}
load(aggregateId: string): Snapshot | undefined {
return this.snapshots.get(aggregateId)
}
}
// 带快照的事件仓储
class AggregateRepository {
constructor(
private eventStore: EventStore,
private snapshotStore: SnapshotStore,
private snapshotInterval: number = 50 // 每 50 个事件生成一次快照
) {}
// 加载聚合:先加载快照,再重放后续事件
async load<T extends AggregateRoot>(
aggregateId: string,
factory: (id: string) => T
): Promise<T> {
const aggregate = factory(aggregateId)
const snapshot = this.snapshotStore.load(aggregateId)
let events: DomainEvent[]
if (snapshot) {
// 从快照恢复状态
aggregate.loadFromHistory([{
type: 'SnapshotRestored',
aggregateId,
timestamp: snapshot.timestamp,
version: snapshot.version,
data: snapshot.state
}] as DomainEvent[])
// 只加载快照之后的事件
const allEvents = this.eventStore.loadEvents(aggregateId)
events = allEvents.filter(e => e.version > snapshot.version)
} else {
events = this.eventStore.loadEvents(aggregateId)
}
aggregate.loadFromHistory(events)
return aggregate
}
// 保存聚合:持久化事件 + 条件生成快照
async save<T extends AggregateRoot>(aggregate: T): Promise<void> {
const events = aggregate.pullUncommittedEvents()
if (events.length === 0) return
this.eventStore.append(aggregate.id, events, aggregate.version - events.length)
// 每 N 个事件生成一次快照
if (aggregate.version % this.snapshotInterval === 0) {
this.snapshotStore.save({
aggregateId: aggregate.id,
version: aggregate.version,
state: this.extractState(aggregate),
timestamp: Date.now()
})
}
}
private extractState(aggregate: AggregateRoot): Record<string, unknown> {
// 提取聚合的可序列化状态
const { id, version, ...state } = aggregate as any
return { id, version, ...state }
}
}
3.3 完整运行示例
把所有组件组装起来,演示完整的 Event Sourcing 流程:
// 组装所有组件
const eventStore = new EventStore()
const snapshotStore = new SnapshotStore()
const projector = new OrderProjector()
const repo = new AggregateRepository(eventStore, snapshotStore)
// === 写端:创建并处理订单 ===
const order = new OrderAggregate('ORD-2026-001')
order.createOrder([
{ name: 'MacBook Pro', qty: 1, price: 19999 },
{ name: 'AirPods Pro', qty: 2, price: 1899 }
])
await repo.save(order)
// 确认订单
const loadedOrder = await repo.load('ORD-2026-001', id => new OrderAggregate(id))
loadedOrder.confirmOrder()
await repo.save(loadedOrder)
// === 读端:查询投影数据 ===
// 将事件投射到读模型
const allEvents = eventStore.getAllEvents()
allEvents.forEach(event => projector.handleEvent(event))
// 高性能查询
console.log(projector.listOrders())
// [{ orderId: 'ORD-2026-001', amount: 23797, status: 'confirmed', ... }]
// === 时间旅行:查看历史状态 ===
const historyEvents = eventStore.loadEvents('ORD-2026-001')
const historicalOrder = new OrderAggregate('ORD-2026-001')
historicalOrder.loadFromHistory(historyEvents.slice(0, 1)) // 只加载第一条事件
console.log(historicalOrder.status) // 'draft' — 回到创建时的状态
⚡ 四、生产环境避坑指南
4.1 事件版本演进(Event Versioning)
这是 Event Sourcing 在生产中最大的痛点。当你需要给已有事件添加字段时,旧事件和新事件的 schema 不一致会带来严重问题。
// 事件升级器:将旧版本事件升级到最新 schema
class EventUpgrader {
private upgraders: Map<string, Map<number, (data: any) => any>> = new Map()
// 注册升级路径
register(eventType: string, fromVersion: number, upgrader: (data: any) => any): void {
if (!this.upgraders.has(eventType)) {
this.upgraders.set(eventType, new Map())
}
this.upgraders.get(eventType)!.set(fromVersion, upgrader)
}
// 执行升级
upgrade(event: DomainEvent, targetVersion: number): DomainEvent {
const typeUpgraders = this.upgraders.get(event.type)
if (!typeUpgraders) return event
let current = { ...event }
// 按版本号依次升级
for (let v = 1; v <= targetVersion; v++) {
const upgrader = typeUpgraders.get(v)
if (upgrader) {
current = { ...current, data: upgrader(current.data) }
}
}
return current
}
}
// 示例:OrderCreated v1 -> v2,添加 currency 字段
const upgrader = new EventUpgrader()
upgrader.register('OrderCreated', 2, (data) => ({
...data,
currency: data.currency || 'CNY' // 旧事件默认人民币
}))
⚠️ **警告:**永远不要修改已持久化的事件(immutability)。新字段只能通过 upgrader 添加默认值,不能改变历史事件本身。
4.2 快照策略选择
| 策略 | 适用场景 | 实现复杂度 | 效果 |
|---|---|---|---|
| 固定间隔(每 N 个事件) | 通用场景 | 低 | 稳定,但不感知事件大小 |
| 时间间隔(每 N 小时) | 事件频率波动大 | 低 | 按时间均匀分布 |
| 事件大小阈值 | 事件 payload 差异大 | 中 | 按存储开销优化 |
| 自适应(事件数 × 复杂度) | 高性能要求 | 高 | 最优但实现复杂 |
✅ **推荐做法:**大多数场景下,固定间隔 50-100 个事件生成一次快照是性价比最高的选择。
4.3 常见陷阱总结
- ❌ **避免:**在事件中存储可变引用(如外键 ID 可能变化)
- ❌ **避免:**让事件处理产生副作用(如发邮件)——用异步 Saga 模式
- ❌ **避免:**在一个聚合中产生过多事件——拆分为子聚合
- ✅ **推荐:**事件命名用过去式(OrderCreated 而非 CreateOrder)
- ✅ **推荐:**事件载荷包含足够的上下文,减少重建时的外部查询
- ✅ **推荐:**为事件存储建立全局序列号,支持有序重放
🎯 总结
Event Sourcing 是一种强大的架构模式,但它不是万能钥匙。它的核心价值在于完整的历史记录和灵活的读写分离。如果你的系统需要审计追踪、复杂的状态回溯、或者多种不同的查询视角,Event Sourcing 值得投入。
对于 JavaScript/TypeScript 开发者,推荐以下工具链快速上手:
- 🔧 EventStoreDB — 专业事件存储数据库,原生支持 Event Sourcing
- 🔧 Axon Framework(Java)/ Marten(.NET)— 成熟的 CQRS+ES 框架
- 🔧 Eventuous(.NET)— 轻量级 Event Sourcing 库
- 🔧 PostgreSQL + JSONB — 用关系数据库做事件存储的务实选择
⚡ **关键结论:**从简单开始——先用内存实现理解核心概念,再逐步替换为生产级存储。Event Sourcing 的复杂度不在实现,而在事件设计和版本管理。