CQRS 与 Event Sourcing 实战指南:从架构设计到生产落地

深入解析 CQRS 与 Event Sourcing 架构模式,包含完整 TypeScript 代码示例、性能对比数据、常见踩坑经验与最佳实践,帮助开发者在复杂业务场景中做出正确的架构决策。

API 设计 2026-05-28 12 分钟

🏗️ 一、为什么你需要了解 CQRS 与 Event Sourcing

在一个日活百万的电商系统中,一次「下单」操作会触发库存扣减、积分计算、物流分配、消息推送等十几个业务动作。如果把这些逻辑全部塞进一个 createOrder() 方法,代码会迅速膨胀到不可维护的地步。CQRS(Command Query Responsibility Segregation,命令查询职责分离)Event Sourcing(事件溯源) 正是为解决这类复杂业务场景而生的架构模式。

但这两个概念在社区中被严重「神话」了——有人把它当成银弹,有人觉得过度设计。本文不讲理论空话,只讲实战中真正有用的模式、代码和踩坑经验。

📌 记住: CQRS 和 Event Sourcing 是两个独立的模式,可以单独使用,也可以组合使用。不要被「必须一起用」的说法误导。

🔀 二、CQRS:把「写」和「读」彻底分开

📐 核心思想

传统 CRUD 架构中,同一个数据模型同时承担读和写职责。这在业务简单时没问题,但当读写比例严重失衡(比如 100:1)且读写的数据结构差异很大时,单一模型就成了性能瓶颈。

CQRS 的核心思想很简单:用不同的模型处理命令(写操作)和查询(读操作)

维度 命令侧(Write) 查询侧(Read)
职责 处理业务逻辑,验证规则 返回数据,优化展示
数据模型 富领域模型,包含行为 扁平 DTO,面向 UI
一致性 强一致性 最终一致性
存储 关系型数据库 可以是 Redis、ES、物化视图
优化方向 事务完整性 查询速度

🔧 TypeScript 实现:从单体到 CQRS

先看一个典型的「反模式」——把读写逻辑混在一起:

// ❌ 反模式:读写逻辑混在一起,职责不清晰
class OrderService {
  async createOrder(userId: string, items: CartItem[]): Promise<Order> {
    // 验证库存
    // 计算价格
    // 创建订单
    // 扣减库存
    // 发送消息
    const order = await this.orderRepo.save(newOrder);
    return order; // 返回完整领域对象
  }

  async getOrder(orderId: string): Promise<Order> {
    return this.orderRepo.findById(orderId); // 查询也返回领域对象
  }

  async getOrderList(userId: string, page: number): Promise<Order[]> {
    // 列表查询其实只需要几个字段,却返回了完整的 Order 对象
    return this.orderRepo.findByUser(userId, page);
  }
}

用 CQRS 改造后:

// ✅ CQRS:命令和查询彻底分离

// 命令侧 —— 处理业务逻辑
class CreateOrderCommand {
  constructor(
    public readonly userId: string,
    public readonly items: Array<{ productId: string; quantity: number; price: number }>
  ) {}
}

class CreateOrderHandler {
  constructor(
    private readonly inventoryService: InventoryService,
    private readonly orderRepository: OrderRepository,
    private readonly eventBus: EventBus
  ) {}

  async execute(command: CreateOrderCommand): Promise<string> {
    // 1. 验证库存
    for (const item of command.items) {
      const available = await this.inventoryService.check(item.productId);
      if (available < item.quantity) {
        throw new InsufficientStockError(item.productId);
      }
    }

    // 2. 创建订单(领域逻辑)
    const order = Order.create(command.userId, command.items);
    
    // 3. 持久化
    await this.orderRepository.save(order);
    
    // 4. 发布领域事件
    await this.eventBus.publish(new OrderCreatedEvent(order.id, order.totalAmount));

    return order.id; // 命令只返回 ID,不返回完整对象
  }
}

// 查询侧 —— 只负责数据展示
interface OrderListItem {
  id: string;
  status: string;
  totalAmount: number;
  createdAt: string;
  itemCount: number;
}

class OrderQueryService {
  constructor(private readonly readDb: ReadDatabase) {}

  async getOrderList(userId: string, page: number): Promise<OrderListItem[]> {
    // 直接从读库查询,SQL 针对列表展示优化
    return this.readDb.query(`
      SELECT id, status, total_amount, created_at, item_count
      FROM order_list_view
      WHERE user_id = $1
      ORDER BY created_at DESC
      LIMIT 20 OFFSET $2
    `, [userId, (page - 1) * 20]);
  }

  async getOrderDetail(orderId: string): Promise<OrderDetailDTO> {
    // 详情查询走另一个视图,包含关联数据
    return this.readDb.queryOne(`
      SELECT * FROM order_detail_view WHERE id = $1
    `, [orderId]);
  }
}

💡 提示: 命令返回 ID 而非完整对象,是一个重要的设计原则。它迫使调用方在需要数据时主动查询,避免了命令和查询的隐式耦合。

📜 三、Event Sourcing:用事件流替代状态快照

🧠 核心思想

传统方式只存储「当前状态」——订单状态从「待支付」变成「已支付」,数据库里的旧值就被覆盖了。这意味着你丢失了「谁在什么时间做了什么」的完整历史。

Event Sourcing 的做法完全不同:不存储状态,只存储导致状态变化的事件。当前状态通过对事件流的重放(Replay)来重建。

// ❌ 传统方式:只存当前状态,历史丢失
interface Order {
  id: string;
  status: 'pending' | 'paid' | 'shipped' | 'delivered' | 'cancelled';
  totalAmount: number;
  updatedAt: Date;
}

// ✅ Event Sourcing:存储完整的事件流
interface OrderEvent {
  type: string;
  orderId: string;
  timestamp: Date;
  payload: Record<string, unknown>;
}

// 事件流示例
const orderEvents: OrderEvent[] = [
  { type: 'OrderCreated', orderId: 'ORD-001', timestamp: new Date('2026-01-01'), 
    payload: { userId: 'U1', items: [...], total: 299 } },
  { type: 'OrderPaid', orderId: 'ORD-001', timestamp: new Date('2026-01-01'), 
    payload: { paymentId: 'PAY-001', method: 'alipay' } },
  { type: 'OrderShipped', orderId: 'ORD-001', timestamp: new Date('2026-01-03'), 
    payload: { trackingNo: 'SF1234567890' } },
  { type: 'OrderDelivered', orderId: 'ORD-001', timestamp: new Date('2026-01-05'), 
    payload: { signedBy: '本人签收' } },
];

🔧 完整实现:聚合根 + 事件存储

// 事件基类
abstract class DomainEvent {
  abstract readonly type: string;
  readonly occurredAt: Date = new Date();
  readonly eventId: string = crypto.randomUUID();
}

// 具体事件
class OrderCreatedEvent extends DomainEvent {
  readonly type = 'OrderCreated';
  constructor(
    public readonly orderId: string,
    public readonly userId: string,
    public readonly items: Array<{ productId: string; quantity: number; price: number }>,
    public readonly totalAmount: number
  ) { super(); }
}

class OrderPaidEvent extends DomainEvent {
  readonly type = 'OrderPaid';
  constructor(
    public readonly orderId: string,
    public readonly paymentId: string,
    public readonly amount: number
  ) { super(); }
}

class OrderShippedEvent extends DomainEvent {
  readonly type = 'OrderShipped';
  constructor(
    public readonly orderId: string,
    public readonly trackingNo: string,
    public readonly carrier: string
  ) { super(); }
}

// 聚合根:通过事件重放构建状态
class OrderAggregate {
  private uncommittedEvents: DomainEvent[] = [];

  // 当前状态(通过事件重放得到)
  public id!: string;
  public userId!: string;
  public status: string = 'init';
  public totalAmount: number = 0;
  public items: Array<{ productId: string; quantity: number; price: number }> = [];
  public trackingNo?: string;

  // 命令方法:验证业务规则后产生事件
  static create(userId: string, items: Array<{ productId: string; quantity: number; price: number }>): OrderAggregate {
    if (items.length === 0) throw new Error('订单至少需要一个商品');

    const order = new OrderAggregate();
    const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
    
    order.apply(new OrderCreatedEvent(
      crypto.randomUUID(), userId, items, totalAmount
    ));
    
    return order;
  }

  pay(paymentId: string, amount: number): void {
    if (this.status !== 'created') {
      throw new Error(`订单状态 ${this.status} 不允许支付`);
    }
    if (amount !== this.totalAmount) {
      throw new Error(`支付金额 ${amount} 与订单金额 ${this.totalAmount} 不匹配`);
    }
    this.apply(new OrderPaidEvent(this.id, paymentId, amount));
  }

  ship(trackingNo: string, carrier: string): void {
    if (this.status !== 'paid') {
      throw new Error(`订单状态 ${this.status} 不允许发货`);
    }
    this.apply(new OrderShippedEvent(this.id, trackingNo, carrier));
  }

  // 事件应用:更新状态 + 记录未提交事件
  private apply(event: DomainEvent): void {
    this.applyEvent(event);      // 更新内存状态
    this.uncommittedEvents.push(event); // 记录待持久化事件
  }

  // 事件处理器:每个事件类型对应一个状态变更逻辑
  private applyEvent(event: DomainEvent): void {
    switch (event.type) {
      case 'OrderCreated':
        const created = event as OrderCreatedEvent;
        this.id = created.orderId;
        this.userId = created.userId;
        this.items = created.items;
        this.totalAmount = created.totalAmount;
        this.status = 'created';
        break;
      case 'OrderPaid':
        this.status = 'paid';
        break;
      case 'OrderShipped':
        const shipped = event as OrderShippedEvent;
        this.trackingNo = shipped.trackingNo;
        this.status = 'shipped';
        break;
    }
  }

  // 从事件流重建聚合
  static fromEvents(events: DomainEvent[]): OrderAggregate {
    const aggregate = new OrderAggregate();
    for (const event of events) {
      aggregate.applyEvent(event);
    }
    aggregate.uncommittedEvents = []; // 历史事件不算未提交
    return aggregate;
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  markEventsCommitted(): void {
    this.uncommittedEvents = [];
  }
}

// 事件存储接口
interface EventStore {
  append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
  load(aggregateId: string): Promise<DomainEvent[]>;
}

// 基于 PostgreSQL 的事件存储实现
class PostgresEventStore implements EventStore {
  constructor(private readonly pool: Pool) {}

  async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // 乐观锁:检查当前版本
      const { rows } = await client.query(
        'SELECT MAX(version) as version FROM events WHERE aggregate_id = $1',
        [aggregateId]
      );
      const currentVersion = rows[0]?.version ?? 0;

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `预期版本 ${expectedVersion},实际版本 ${currentVersion}`
        );
      }

      // 批量插入事件
      for (let i = 0; i < events.length; i++) {
        const event = events[i];
        await client.query(
          `INSERT INTO events (aggregate_id, event_type, payload, version, occurred_at)
           VALUES ($1, $2, $3, $4, $5)`,
          [aggregateId, event.type, JSON.stringify(event), expectedVersion + i + 1, event.occurredAt]
        );
      }

      await client.query('COMMIT');
    } catch (err) {
      await client.query('ROLLBACK');
      throw err;
    } finally {
      client.release();
    }
  }

  async load(aggregateId: string): Promise<DomainEvent[]> {
    const { rows } = await this.pool.query(
      'SELECT event_type, payload FROM events WHERE aggregate_id = $1 ORDER BY version ASC',
      [aggregateId]
    );
    return rows.map(row => JSON.parse(row.payload));
  }
}

⚠️ 警告: 乐观锁(Optimistic Locking)是 Event Sourcing 的核心保障。没有版本检查,并发写入会导致事件流错乱,这种 Bug 在生产环境中极难排查。

📊 快照优化:解决事件流过长的性能问题

当一个聚合的事件数量超过几百条时,每次重放都会变慢。快照(Snapshot)机制可以解决这个问题:

class SnapshotStore {
  constructor(private readonly pool: Pool) {}

  async save(aggregateId: string, state: object, version: number): Promise<void> {
    await this.pool.query(
      `INSERT INTO snapshots (aggregate_id, state, version, created_at)
       VALUES ($1, $2, $3, NOW())
       ON CONFLICT (aggregate_id) DO UPDATE
       SET state = $2, version = $3, created_at = NOW()`,
      [aggregateId, JSON.stringify(state), version]
    );
  }

  async load(aggregateId: string): Promise<{ state: object; version: number } | null> {
    const { rows } = await this.pool.query(
      'SELECT state, version FROM snapshots WHERE aggregate_id = $1',
      [aggregateId]
    );
    return rows[0] ? { state: JSON.parse(rows[0].state), version: rows[0].version } : null;
  }
}

// 加载聚合时优先使用快照
async function loadOrderWithSnapshot(
  aggregateId: string,
  eventStore: EventStore,
  snapshotStore: SnapshotStore
): Promise<OrderAggregate> {
  // 1. 尝试加载快照
  const snapshot = await snapshotStore.load(aggregateId);
  
  let aggregate: OrderAggregate;
  let fromVersion: number;

  if (snapshot) {
    // 从快照恢复状态
    aggregate = new OrderAggregate();
    Object.assign(aggregate, snapshot.state);
    fromVersion = snapshot.version;
  } else {
    aggregate = new OrderAggregate();
    fromVersion = 0;
  }

  // 2. 加载快照之后的增量事件
  const allEvents = await eventStore.load(aggregateId);
  const incrementalEvents = allEvents.slice(fromVersion);

  for (const event of incrementalEvents) {
    aggregate.applyEvent(event);
  }

  // 3. 每 100 个事件保存一次快照
  if (allEvents.length > 0 && allEvents.length % 100 === 0) {
    await snapshotStore.save(aggregateId, aggregate, allEvents.length);
  }

  return aggregate;
}
事件数量 无快照加载时间 有快照加载时间(每 100 事件)
50 ~2ms ~3ms(快照开销)
200 ~8ms ~4ms
1000 ~45ms ~6ms
5000 ~220ms ~8ms
10000+ ~500ms+ ~10ms

关键结论: 当事件数量超过 200 时,快照优化的效果开始显著。建议在事件数量达到 100 时就启用快照机制。

🎯 四、实战避坑指南

⚠️ 坑点一:Event Sourcing 不是免费的

Event Sourcing 引入了显著的架构复杂度。以下是你在生产中必然面对的问题:

最终一致性的代价。 写入事件后,读模型(Read Model)通过异步投影更新,中间存在延迟。用户刚下单就去查看订单列表,可能会发现「订单不存在」。解决方案是写入后直接返回内存中的状态,或使用「读己之写(Read Your Own Writes)」策略。

事件版本演进。 业务需求变了,事件的结构也要变。但历史事件已经写入了,不能修改。你需要实现事件的向上转换(Upcasting):

// 事件版本转换器
class OrderCreatedUpcaster {
  // 将 V1 版本的事件转换为 V2 版本
  static upcast(event: any): OrderCreatedEvent {
    if (event.version === 1) {
      return {
        ...event,
        version: 2,
        // V1 没有 currency 字段,默认为 CNY
        currency: event.payload.currency ?? 'CNY',
        // V1 的 items 结构不同,需要转换
        items: event.payload.items.map((item: any) => ({
          productId: item.productId,
          quantity: item.qty,  // V1 叫 qty,V2 叫 quantity
          price: item.unitPrice, // V1 叫 unitPrice,V2 叫 price
        })),
      };
    }
    return event; // V2 及以上版本不需要转换
  }
}

⚠️ 坑点二:不要对所有实体使用 Event Sourcing

80% 的实体用传统 CRUD 就够了。 只有以下场景才值得引入 Event Sourcing:

  • 审计要求高:金融交易、医疗记录、法律合同

  • 需要时间旅行:任意时间点的状态回溯

  • 业务逻辑复杂:需要通过事件驱动多个下游系统

  • 需要回放重建:出 Bug 后可以通过重放修复数据

  • 简单的 CRUD 实体:用户设置、标签管理、配置信息

  • 读多写少且逻辑简单:文章、评论、通知

  • 团队没有 DDD 经验:学习曲线陡峭,初期容易写出更烂的代码

⚠️ 坑点三:投影(Projection)设计的常见错误

// ❌ 错误:投影逻辑过于复杂,耦合了多个聚合的事件
class OrderSummaryProjection {
  async handle(event: DomainEvent) {
    if (event.type === 'OrderCreated') {
      // 还要查用户信息、商品信息……
      const user = await this.userService.getUser(event.userId);
      const products = await this.productService.getProducts(event.items.map(i => i.productId));
      // 这样投影变成了一个微服务编排器
    }
  }
}

// ✅ 正确:投影只处理自己关心的事件,数据在写入时就冗余好
class OrderSummaryProjection {
  async handle(event: DomainEvent) {
    if (event.type === 'OrderCreated') {
      // 事件本身已经包含了足够的数据
      await this.readDb.insert('order_summaries', {
        id: event.orderId,
        user_name: event.userName,       // 创建时就冗余了用户名
        total_amount: event.totalAmount,
        item_count: event.items.length,
        status: 'created',
        created_at: event.occurredAt,
      });
    }
  }
}

💡 提示: 事件中应该包含足够的上下文数据,避免投影在处理事件时再去查询其他服务。这叫「事件自包含(Self-contained Event)」原则。

📊 CQRS + Event Sourcing vs 传统 CRUD 对比

维度 传统 CRUD CQRS CQRS + Event Sourcing
复杂度
读写分离
审计追踪 需额外实现 需额外实现 ✅ 天然支持
时间旅行
最终一致性 ❌ 强一致 ⚠️ 可选 ✅ 必然
调试难度
适用场景 简单 CRUD 读写比例失衡 金融、审计、复杂业务
团队要求 初级 中级 高级
基础设施成本 高(需要消息队列、事件存储)

💡 五、总结与建议

选择架构的核心原则: 复杂度必须与业务复杂度匹配。不要为了「架构先进性」而引入不必要的复杂度。

对于 jsjson.com 这类工具型网站,传统 CRUD 完全够用。但如果你在做以下系统,请认真考虑 CQRS + Event Sourcing:

  1. 金融/支付系统:每笔交易都需要完整的审计追踪
  2. 协同编辑系统:需要 OT/CRDT + 事件流
  3. 复杂的业务流程引擎:订单、工单等多状态流转
  4. 需要「撤销/回滚」能力的系统:通过反向事件实现

如果你想入门 Event Sourcing,建议从以下路径开始:

  1. 先在项目中实现 CQRS(读写分离),不引入事件溯源
  2. 对一个非核心聚合尝试 Event Sourcing,积累经验
  3. 逐步扩展到核心业务,配合快照、投影、Upcaster 等基础设施

推荐技术栈:

  • Node.js/TypeScript@event-driven-io/emmetteventstore-db-client
  • Java:Axon Framework(最成熟的 CQRS/ES 框架)
  • Golooplab/fazgoEventStore/EventStoreDB-Client-Go
  • 通用:EventStoreDB(专用事件存储数据库)、Kafka(事件流管道)

关键结论: CQRS 和 Event Sourcing 是强大的工具,但不是银弹。在引入之前,先问自己三个问题:(1) 我的业务真的需要读写分离吗?(2) 我真的需要完整的事件历史吗?(3) 我的团队有能力维护这个架构吗?三个答案都是「是」的时候,再动手。

📚 相关文章