🏗️ 一、为什么你需要了解 CQRS 与 Event Sourcing
在一个日活百万的电商系统中,一次「下单」操作会触发库存扣减、积分计算、物流分配、消息推送等十几个业务动作。如果把这些逻辑全部塞进一个 createOrder() 方法,代码会迅速膨胀到不可维护的地步。CQRS(Command Query Responsibility Segregation,命令查询职责分离) 和 Event Sourcing(事件溯源) 正是为解决这类复杂业务场景而生的架构模式。
但这两个概念在社区中被严重「神话」了——有人把它当成银弹,有人觉得过度设计。本文不讲理论空话,只讲实战中真正有用的模式、代码和踩坑经验。
📌 记住: CQRS 和 Event Sourcing 是两个独立的模式,可以单独使用,也可以组合使用。不要被「必须一起用」的说法误导。
🔀 二、CQRS:把「写」和「读」彻底分开
📐 核心思想
传统 CRUD 架构中,同一个数据模型同时承担读和写职责。这在业务简单时没问题,但当读写比例严重失衡(比如 100:1)且读写的数据结构差异很大时,单一模型就成了性能瓶颈。
CQRS 的核心思想很简单:用不同的模型处理命令(写操作)和查询(读操作)。
| 维度 | 命令侧(Write) | 查询侧(Read) |
|---|---|---|
| 职责 | 处理业务逻辑,验证规则 | 返回数据,优化展示 |
| 数据模型 | 富领域模型,包含行为 | 扁平 DTO,面向 UI |
| 一致性 | 强一致性 | 最终一致性 |
| 存储 | 关系型数据库 | 可以是 Redis、ES、物化视图 |
| 优化方向 | 事务完整性 | 查询速度 |
🔧 TypeScript 实现:从单体到 CQRS
先看一个典型的「反模式」——把读写逻辑混在一起:
// ❌ 反模式:读写逻辑混在一起,职责不清晰
class OrderService {
async createOrder(userId: string, items: CartItem[]): Promise<Order> {
// 验证库存
// 计算价格
// 创建订单
// 扣减库存
// 发送消息
const order = await this.orderRepo.save(newOrder);
return order; // 返回完整领域对象
}
async getOrder(orderId: string): Promise<Order> {
return this.orderRepo.findById(orderId); // 查询也返回领域对象
}
async getOrderList(userId: string, page: number): Promise<Order[]> {
// 列表查询其实只需要几个字段,却返回了完整的 Order 对象
return this.orderRepo.findByUser(userId, page);
}
}
用 CQRS 改造后:
// ✅ CQRS:命令和查询彻底分离
// 命令侧 —— 处理业务逻辑
class CreateOrderCommand {
constructor(
public readonly userId: string,
public readonly items: Array<{ productId: string; quantity: number; price: number }>
) {}
}
class CreateOrderHandler {
constructor(
private readonly inventoryService: InventoryService,
private readonly orderRepository: OrderRepository,
private readonly eventBus: EventBus
) {}
async execute(command: CreateOrderCommand): Promise<string> {
// 1. 验证库存
for (const item of command.items) {
const available = await this.inventoryService.check(item.productId);
if (available < item.quantity) {
throw new InsufficientStockError(item.productId);
}
}
// 2. 创建订单(领域逻辑)
const order = Order.create(command.userId, command.items);
// 3. 持久化
await this.orderRepository.save(order);
// 4. 发布领域事件
await this.eventBus.publish(new OrderCreatedEvent(order.id, order.totalAmount));
return order.id; // 命令只返回 ID,不返回完整对象
}
}
// 查询侧 —— 只负责数据展示
interface OrderListItem {
id: string;
status: string;
totalAmount: number;
createdAt: string;
itemCount: number;
}
class OrderQueryService {
constructor(private readonly readDb: ReadDatabase) {}
async getOrderList(userId: string, page: number): Promise<OrderListItem[]> {
// 直接从读库查询,SQL 针对列表展示优化
return this.readDb.query(`
SELECT id, status, total_amount, created_at, item_count
FROM order_list_view
WHERE user_id = $1
ORDER BY created_at DESC
LIMIT 20 OFFSET $2
`, [userId, (page - 1) * 20]);
}
async getOrderDetail(orderId: string): Promise<OrderDetailDTO> {
// 详情查询走另一个视图,包含关联数据
return this.readDb.queryOne(`
SELECT * FROM order_detail_view WHERE id = $1
`, [orderId]);
}
}
💡 提示: 命令返回 ID 而非完整对象,是一个重要的设计原则。它迫使调用方在需要数据时主动查询,避免了命令和查询的隐式耦合。
📜 三、Event Sourcing:用事件流替代状态快照
🧠 核心思想
传统方式只存储「当前状态」——订单状态从「待支付」变成「已支付」,数据库里的旧值就被覆盖了。这意味着你丢失了「谁在什么时间做了什么」的完整历史。
Event Sourcing 的做法完全不同:不存储状态,只存储导致状态变化的事件。当前状态通过对事件流的重放(Replay)来重建。
// ❌ 传统方式:只存当前状态,历史丢失
interface Order {
id: string;
status: 'pending' | 'paid' | 'shipped' | 'delivered' | 'cancelled';
totalAmount: number;
updatedAt: Date;
}
// ✅ Event Sourcing:存储完整的事件流
interface OrderEvent {
type: string;
orderId: string;
timestamp: Date;
payload: Record<string, unknown>;
}
// 事件流示例
const orderEvents: OrderEvent[] = [
{ type: 'OrderCreated', orderId: 'ORD-001', timestamp: new Date('2026-01-01'),
payload: { userId: 'U1', items: [...], total: 299 } },
{ type: 'OrderPaid', orderId: 'ORD-001', timestamp: new Date('2026-01-01'),
payload: { paymentId: 'PAY-001', method: 'alipay' } },
{ type: 'OrderShipped', orderId: 'ORD-001', timestamp: new Date('2026-01-03'),
payload: { trackingNo: 'SF1234567890' } },
{ type: 'OrderDelivered', orderId: 'ORD-001', timestamp: new Date('2026-01-05'),
payload: { signedBy: '本人签收' } },
];
🔧 完整实现:聚合根 + 事件存储
// 事件基类
abstract class DomainEvent {
abstract readonly type: string;
readonly occurredAt: Date = new Date();
readonly eventId: string = crypto.randomUUID();
}
// 具体事件
class OrderCreatedEvent extends DomainEvent {
readonly type = 'OrderCreated';
constructor(
public readonly orderId: string,
public readonly userId: string,
public readonly items: Array<{ productId: string; quantity: number; price: number }>,
public readonly totalAmount: number
) { super(); }
}
class OrderPaidEvent extends DomainEvent {
readonly type = 'OrderPaid';
constructor(
public readonly orderId: string,
public readonly paymentId: string,
public readonly amount: number
) { super(); }
}
class OrderShippedEvent extends DomainEvent {
readonly type = 'OrderShipped';
constructor(
public readonly orderId: string,
public readonly trackingNo: string,
public readonly carrier: string
) { super(); }
}
// 聚合根:通过事件重放构建状态
class OrderAggregate {
private uncommittedEvents: DomainEvent[] = [];
// 当前状态(通过事件重放得到)
public id!: string;
public userId!: string;
public status: string = 'init';
public totalAmount: number = 0;
public items: Array<{ productId: string; quantity: number; price: number }> = [];
public trackingNo?: string;
// 命令方法:验证业务规则后产生事件
static create(userId: string, items: Array<{ productId: string; quantity: number; price: number }>): OrderAggregate {
if (items.length === 0) throw new Error('订单至少需要一个商品');
const order = new OrderAggregate();
const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
order.apply(new OrderCreatedEvent(
crypto.randomUUID(), userId, items, totalAmount
));
return order;
}
pay(paymentId: string, amount: number): void {
if (this.status !== 'created') {
throw new Error(`订单状态 ${this.status} 不允许支付`);
}
if (amount !== this.totalAmount) {
throw new Error(`支付金额 ${amount} 与订单金额 ${this.totalAmount} 不匹配`);
}
this.apply(new OrderPaidEvent(this.id, paymentId, amount));
}
ship(trackingNo: string, carrier: string): void {
if (this.status !== 'paid') {
throw new Error(`订单状态 ${this.status} 不允许发货`);
}
this.apply(new OrderShippedEvent(this.id, trackingNo, carrier));
}
// 事件应用:更新状态 + 记录未提交事件
private apply(event: DomainEvent): void {
this.applyEvent(event); // 更新内存状态
this.uncommittedEvents.push(event); // 记录待持久化事件
}
// 事件处理器:每个事件类型对应一个状态变更逻辑
private applyEvent(event: DomainEvent): void {
switch (event.type) {
case 'OrderCreated':
const created = event as OrderCreatedEvent;
this.id = created.orderId;
this.userId = created.userId;
this.items = created.items;
this.totalAmount = created.totalAmount;
this.status = 'created';
break;
case 'OrderPaid':
this.status = 'paid';
break;
case 'OrderShipped':
const shipped = event as OrderShippedEvent;
this.trackingNo = shipped.trackingNo;
this.status = 'shipped';
break;
}
}
// 从事件流重建聚合
static fromEvents(events: DomainEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
for (const event of events) {
aggregate.applyEvent(event);
}
aggregate.uncommittedEvents = []; // 历史事件不算未提交
return aggregate;
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents];
}
markEventsCommitted(): void {
this.uncommittedEvents = [];
}
}
// 事件存储接口
interface EventStore {
append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
load(aggregateId: string): Promise<DomainEvent[]>;
}
// 基于 PostgreSQL 的事件存储实现
class PostgresEventStore implements EventStore {
constructor(private readonly pool: Pool) {}
async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// 乐观锁:检查当前版本
const { rows } = await client.query(
'SELECT MAX(version) as version FROM events WHERE aggregate_id = $1',
[aggregateId]
);
const currentVersion = rows[0]?.version ?? 0;
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`预期版本 ${expectedVersion},实际版本 ${currentVersion}`
);
}
// 批量插入事件
for (let i = 0; i < events.length; i++) {
const event = events[i];
await client.query(
`INSERT INTO events (aggregate_id, event_type, payload, version, occurred_at)
VALUES ($1, $2, $3, $4, $5)`,
[aggregateId, event.type, JSON.stringify(event), expectedVersion + i + 1, event.occurredAt]
);
}
await client.query('COMMIT');
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
async load(aggregateId: string): Promise<DomainEvent[]> {
const { rows } = await this.pool.query(
'SELECT event_type, payload FROM events WHERE aggregate_id = $1 ORDER BY version ASC',
[aggregateId]
);
return rows.map(row => JSON.parse(row.payload));
}
}
⚠️ 警告: 乐观锁(Optimistic Locking)是 Event Sourcing 的核心保障。没有版本检查,并发写入会导致事件流错乱,这种 Bug 在生产环境中极难排查。
📊 快照优化:解决事件流过长的性能问题
当一个聚合的事件数量超过几百条时,每次重放都会变慢。快照(Snapshot)机制可以解决这个问题:
class SnapshotStore {
constructor(private readonly pool: Pool) {}
async save(aggregateId: string, state: object, version: number): Promise<void> {
await this.pool.query(
`INSERT INTO snapshots (aggregate_id, state, version, created_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (aggregate_id) DO UPDATE
SET state = $2, version = $3, created_at = NOW()`,
[aggregateId, JSON.stringify(state), version]
);
}
async load(aggregateId: string): Promise<{ state: object; version: number } | null> {
const { rows } = await this.pool.query(
'SELECT state, version FROM snapshots WHERE aggregate_id = $1',
[aggregateId]
);
return rows[0] ? { state: JSON.parse(rows[0].state), version: rows[0].version } : null;
}
}
// 加载聚合时优先使用快照
async function loadOrderWithSnapshot(
aggregateId: string,
eventStore: EventStore,
snapshotStore: SnapshotStore
): Promise<OrderAggregate> {
// 1. 尝试加载快照
const snapshot = await snapshotStore.load(aggregateId);
let aggregate: OrderAggregate;
let fromVersion: number;
if (snapshot) {
// 从快照恢复状态
aggregate = new OrderAggregate();
Object.assign(aggregate, snapshot.state);
fromVersion = snapshot.version;
} else {
aggregate = new OrderAggregate();
fromVersion = 0;
}
// 2. 加载快照之后的增量事件
const allEvents = await eventStore.load(aggregateId);
const incrementalEvents = allEvents.slice(fromVersion);
for (const event of incrementalEvents) {
aggregate.applyEvent(event);
}
// 3. 每 100 个事件保存一次快照
if (allEvents.length > 0 && allEvents.length % 100 === 0) {
await snapshotStore.save(aggregateId, aggregate, allEvents.length);
}
return aggregate;
}
| 事件数量 | 无快照加载时间 | 有快照加载时间(每 100 事件) |
|---|---|---|
| 50 | ~2ms | ~3ms(快照开销) |
| 200 | ~8ms | ~4ms |
| 1000 | ~45ms | ~6ms |
| 5000 | ~220ms | ~8ms |
| 10000+ | ~500ms+ | ~10ms |
⚡ 关键结论: 当事件数量超过 200 时,快照优化的效果开始显著。建议在事件数量达到 100 时就启用快照机制。
🎯 四、实战避坑指南
⚠️ 坑点一:Event Sourcing 不是免费的
Event Sourcing 引入了显著的架构复杂度。以下是你在生产中必然面对的问题:
最终一致性的代价。 写入事件后,读模型(Read Model)通过异步投影更新,中间存在延迟。用户刚下单就去查看订单列表,可能会发现「订单不存在」。解决方案是写入后直接返回内存中的状态,或使用「读己之写(Read Your Own Writes)」策略。
事件版本演进。 业务需求变了,事件的结构也要变。但历史事件已经写入了,不能修改。你需要实现事件的向上转换(Upcasting):
// 事件版本转换器
class OrderCreatedUpcaster {
// 将 V1 版本的事件转换为 V2 版本
static upcast(event: any): OrderCreatedEvent {
if (event.version === 1) {
return {
...event,
version: 2,
// V1 没有 currency 字段,默认为 CNY
currency: event.payload.currency ?? 'CNY',
// V1 的 items 结构不同,需要转换
items: event.payload.items.map((item: any) => ({
productId: item.productId,
quantity: item.qty, // V1 叫 qty,V2 叫 quantity
price: item.unitPrice, // V1 叫 unitPrice,V2 叫 price
})),
};
}
return event; // V2 及以上版本不需要转换
}
}
⚠️ 坑点二:不要对所有实体使用 Event Sourcing
80% 的实体用传统 CRUD 就够了。 只有以下场景才值得引入 Event Sourcing:
-
✅ 审计要求高:金融交易、医疗记录、法律合同
-
✅ 需要时间旅行:任意时间点的状态回溯
-
✅ 业务逻辑复杂:需要通过事件驱动多个下游系统
-
✅ 需要回放重建:出 Bug 后可以通过重放修复数据
-
❌ 简单的 CRUD 实体:用户设置、标签管理、配置信息
-
❌ 读多写少且逻辑简单:文章、评论、通知
-
❌ 团队没有 DDD 经验:学习曲线陡峭,初期容易写出更烂的代码
⚠️ 坑点三:投影(Projection)设计的常见错误
// ❌ 错误:投影逻辑过于复杂,耦合了多个聚合的事件
class OrderSummaryProjection {
async handle(event: DomainEvent) {
if (event.type === 'OrderCreated') {
// 还要查用户信息、商品信息……
const user = await this.userService.getUser(event.userId);
const products = await this.productService.getProducts(event.items.map(i => i.productId));
// 这样投影变成了一个微服务编排器
}
}
}
// ✅ 正确:投影只处理自己关心的事件,数据在写入时就冗余好
class OrderSummaryProjection {
async handle(event: DomainEvent) {
if (event.type === 'OrderCreated') {
// 事件本身已经包含了足够的数据
await this.readDb.insert('order_summaries', {
id: event.orderId,
user_name: event.userName, // 创建时就冗余了用户名
total_amount: event.totalAmount,
item_count: event.items.length,
status: 'created',
created_at: event.occurredAt,
});
}
}
}
💡 提示: 事件中应该包含足够的上下文数据,避免投影在处理事件时再去查询其他服务。这叫「事件自包含(Self-contained Event)」原则。
📊 CQRS + Event Sourcing vs 传统 CRUD 对比
| 维度 | 传统 CRUD | CQRS | CQRS + Event Sourcing |
|---|---|---|---|
| 复杂度 | 低 | 中 | 高 |
| 读写分离 | ❌ | ✅ | ✅ |
| 审计追踪 | 需额外实现 | 需额外实现 | ✅ 天然支持 |
| 时间旅行 | ❌ | ❌ | ✅ |
| 最终一致性 | ❌ 强一致 | ⚠️ 可选 | ✅ 必然 |
| 调试难度 | 低 | 中 | 高 |
| 适用场景 | 简单 CRUD | 读写比例失衡 | 金融、审计、复杂业务 |
| 团队要求 | 初级 | 中级 | 高级 |
| 基础设施成本 | 低 | 中 | 高(需要消息队列、事件存储) |
💡 五、总结与建议
选择架构的核心原则: 复杂度必须与业务复杂度匹配。不要为了「架构先进性」而引入不必要的复杂度。
对于 jsjson.com 这类工具型网站,传统 CRUD 完全够用。但如果你在做以下系统,请认真考虑 CQRS + Event Sourcing:
- 金融/支付系统:每笔交易都需要完整的审计追踪
- 协同编辑系统:需要 OT/CRDT + 事件流
- 复杂的业务流程引擎:订单、工单等多状态流转
- 需要「撤销/回滚」能力的系统:通过反向事件实现
如果你想入门 Event Sourcing,建议从以下路径开始:
- 先在项目中实现 CQRS(读写分离),不引入事件溯源
- 对一个非核心聚合尝试 Event Sourcing,积累经验
- 逐步扩展到核心业务,配合快照、投影、Upcaster 等基础设施
推荐技术栈:
- ✅ Node.js/TypeScript:
@event-driven-io/emmett、eventstore-db-client - ✅ Java:Axon Framework(最成熟的 CQRS/ES 框架)
- ✅ Go:
looplab/fazgo、EventStore/EventStoreDB-Client-Go - ✅ 通用:EventStoreDB(专用事件存储数据库)、Kafka(事件流管道)
⚡ 关键结论: CQRS 和 Event Sourcing 是强大的工具,但不是银弹。在引入之前,先问自己三个问题:(1) 我的业务真的需要读写分离吗?(2) 我真的需要完整的事件历史吗?(3) 我的团队有能力维护这个架构吗?三个答案都是「是」的时候,再动手。