Postgres 也能做工作流引擎?用数据库实现可靠任务编排

深入讲解如何用 PostgreSQL 实现持久化工作流,包括事务性发件箱模式、Advisory Lock、LISTEN/NOTIFY 等核心机制,替代 Temporal/Airflow 等重型工具。

数据库 2026-05-28 12 分钟

2025 年末,DBOS 团队发表了一篇引发广泛讨论的文章:Postgres is all you need for durable execution。他们的核心观点是——大多数工作流引擎(Temporal、Prefect、Apache Airflow)本质上都在重新发明数据库已经实现的功能。如果你的应用已经在用 PostgreSQL,那你可能根本不需要额外引入一个工作流系统。

根据 DB2DB 的统计,全球超过 48% 的开发者在使用 PostgreSQL,而其中绝大多数团队在需要"可靠任务编排"时,第一反应是引入 Temporal 或 RabbitMQ + Celery 这样的外部依赖。但实际上,Postgres 本身提供的事务保证、通知机制和锁原语,完全足以构建一套轻量级的持久化工作流引擎。

🔧 一、Postgres 的三大工作流原语

要理解"Postgres 做工作流"的可行性,首先要搞清楚它提供的三个核心能力。

📦 事务性发件箱(Transactional Outbox)模式

工作流的核心需求是**“状态变更和消息发送的原子性”**。比如你在订单服务中扣减库存后,需要通知物流服务发货——如果这两个操作不是原子的,就可能出现"库存扣了但通知没发出去"的脏状态。

事务性发件箱模式的做法是:在同一个数据库事务中,既写业务数据,也写一条"待发送"的消息到 outbox 表。然后由一个独立的轮询进程读取并发送。

-- 创建发件箱表
CREATE TABLE outbox (
    id          BIGSERIAL PRIMARY KEY,
    aggregate   VARCHAR(100) NOT NULL,   -- 业务实体类型,如 'order'
    aggregate_id VARCHAR(100) NOT NULL,  -- 业务实体 ID
    event_type  VARCHAR(100) NOT NULL,   -- 事件类型,如 'OrderCreated'
    payload     JSONB NOT NULL,          -- 事件数据
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    processed   BOOLEAN DEFAULT FALSE,
    retry_count INTEGER DEFAULT 0
);

-- 创建索引,轮询查询用
CREATE INDEX idx_outbox_unprocessed 
ON outbox (created_at) 
WHERE processed = FALSE;
// Node.js 示例:在一个事务中完成业务操作 + 写入发件箱
const { Pool } = require('pg');
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

async function createOrder(orderData) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');

    // 1. 写入业务数据
    const orderResult = await client.query(
      `INSERT INTO orders (user_id, product_id, quantity, status)
       VALUES ($1, $2, $3, 'pending')
       RETURNING id`,
      [orderData.userId, orderData.productId, orderData.quantity]
    );
    const orderId = orderResult.rows[0].id;

    // 2. 在同一个事务中写入发件箱
    await client.query(
      `INSERT INTO outbox (aggregate, aggregate_id, event_type, payload)
       VALUES ('order', $1, 'OrderCreated', $2)`,
      [orderId.toString(), JSON.stringify({ orderId, ...orderData })]
    );

    await client.query('COMMIT');
    return orderId;
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}

⚠️ **警告:**千万不要先写业务表,再异步写 outbox 表——这会破坏原子性,导致消息丢失。必须在同一个数据库事务中完成。

🔒 Advisory Lock 实现任务调度

Postgres 的 Advisory Lock 是一种应用级别的锁机制,不绑定任何表或行,非常适合做分布式任务调度中的互斥控制。

比如你有 3 个 worker 进程同时在轮询待处理任务,你需要确保同一个任务不会被多个 worker 抢走:

-- 用 pg_try_advisory_lock 实现任务认领
-- 先查找待处理任务,然后用 advisory lock 防止并发抢夺
WITH candidate AS (
    SELECT id
    FROM outbox
    WHERE processed = FALSE
      AND retry_count < 3
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED  -- 跳过已被锁定的行
)
UPDATE outbox
SET retry_count = retry_count + 1
WHERE id = (SELECT id FROM candidate)
RETURNING id, aggregate, aggregate_id, event_type, payload;

FOR UPDATE SKIP LOCKED 是 Postgres 9.5+ 的特性,它让多个 worker 并发查询时自动跳过已被其他事务锁定的行,无需额外的锁管理。

📡 LISTEN/NOTIFY 实现实时通知

传统轮询模式的缺点是延迟高(取决于轮询间隔)。Postgres 的 LISTEN/NOTIFY 机制可以实现准实时的事件推送:

-- 在 outbox 写入完成后触发通知
CREATE OR REPLACE FUNCTION notify_outbox()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('outbox_events', 
        json_build_object(
            'id', NEW.id,
            'event_type', NEW.event_type,
            'aggregate', NEW.aggregate
        )::text
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_notify
AFTER INSERT ON outbox
FOR EACH ROW EXECUTE FUNCTION notify_outbox();
// Node.js 监听通知
const { Client } = require('pg');
const listener = new Client({ connectionString: process.env.DATABASE_URL });

await listener.connect();
await listener.query('LISTEN outbox_events');

listener.on('notification', (msg) => {
    const payload = JSON.parse(msg.payload);
    console.log(`[Worker] 收到事件: ${payload.event_type}, ID: ${payload.id}`);
    // 立即处理该事件,无需等待下一次轮询
    processEvent(payload);
});

💡 **提示:**LISTEN/NOTIFY 的消息不能跨数据库,且消息不持久化——如果监听方不在线,消息会丢失。所以它只适合作为轮询的"加速器",不能替代轮询。

🚀 二、完整工作流引擎实现

掌握了三个原语之后,我们可以组合出一个轻量级的工作流引擎。核心思路是用一张 workflows 表存储工作流定义和状态,用 steps 表存储每个步骤的执行记录。

📊 数据模型设计

-- 工作流实例表
CREATE TABLE workflows (
    id             BIGSERIAL PRIMARY KEY,
    workflow_type  VARCHAR(100) NOT NULL,  -- 工作流类型,如 'OrderFulfillment'
    status         VARCHAR(20) NOT NULL DEFAULT 'running',  -- running/completed/failed
    input          JSONB NOT NULL,         -- 工作流入参
    output         JSONB,                  -- 工作流出参
    created_at     TIMESTAMPTZ DEFAULT NOW(),
    updated_at     TIMESTAMPTZ DEFAULT NOW()
);

-- 工作流步骤表
CREATE TABLE workflow_steps (
    id             BIGSERIAL PRIMARY KEY,
    workflow_id    BIGINT REFERENCES workflows(id),
    step_name      VARCHAR(100) NOT NULL,
    step_index     INTEGER NOT NULL,       -- 步骤顺序
    status         VARCHAR(20) NOT NULL DEFAULT 'pending',  -- pending/running/completed/failed
    input          JSONB,
    output         JSONB,
    error          TEXT,
    started_at     TIMESTAMPTZ,
    completed_at   TIMESTAMPTZ,
    retry_count    INTEGER DEFAULT 0,
    max_retries    INTEGER DEFAULT 3,
    UNIQUE(workflow_id, step_index)
);

CREATE INDEX idx_steps_pending ON workflow_steps (workflow_id, step_index) 
WHERE status = 'pending';

⚡ 工作流执行器

// workflow-engine.js — 轻量级 Postgres 工作流引擎
const { Pool } = require('pg');

class WorkflowEngine {
  constructor(connectionString) {
    this.pool = new Pool({ connectionString });
    this.handlers = new Map(); // 步骤处理器注册表
  }

  // 注册步骤处理器
  register(stepName, handler) {
    this.handlers.set(stepName, handler);
  }

  // 启动一个新工作流
  async start(workflowType, input, steps) {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // 创建工作流实例
      const wf = await client.query(
        `INSERT INTO workflows (workflow_type, input)
         VALUES ($1, $2) RETURNING id`,
        [workflowType, JSON.stringify(input)]
      );
      const workflowId = wf.rows[0].id;

      // 批量创建步骤
      for (let i = 0; i < steps.length; i++) {
        await client.query(
          `INSERT INTO workflow_steps (workflow_id, step_name, step_index, input)
           VALUES ($1, $2, $3, $4)`,
          [workflowId, steps[i].name, i, JSON.stringify(steps[i].input || {})]
        );
      }

      await client.query('COMMIT');

      // 异步执行(不阻塞调用方)
      this.executeNextStep(workflowId);
      return workflowId;
    } catch (err) {
      await client.query('ROLLBACK');
      throw err;
    } finally {
      client.release();
    }
  }

  // 执行下一个待处理步骤
  async executeNextStep(workflowId) {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // 用 FOR UPDATE SKIP LOCKED 防止并发执行
      const result = await client.query(
        `SELECT id, step_name, step_index, input, retry_count, max_retries
         FROM workflow_steps
         WHERE workflow_id = $1 AND status = 'pending'
         ORDER BY step_index
         LIMIT 1
         FOR UPDATE SKIP LOCKED`,
        [workflowId]
      );

      if (result.rows.length === 0) {
        // 没有更多步骤,标记工作流完成
        await client.query(
          `UPDATE workflows SET status = 'completed', 
           output = (SELECT json_agg(output) FROM workflow_steps 
                     WHERE workflow_id = $1 AND status = 'completed'),
           updated_at = NOW()
           WHERE id = $1`,
          [workflowId]
        );
        await client.query('COMMIT');
        return;
      }

      const step = result.rows[0];

      // 标记步骤为运行中
      await client.query(
        `UPDATE workflow_steps SET status = 'running', started_at = NOW()
         WHERE id = $1`,
        [step.id]
      );
      await client.query('COMMIT');

      // 执行步骤处理器
      const handler = this.handlers.get(step.step_name);
      if (!handler) {
        throw new Error(`未注册的步骤处理器: ${step.step_name}`);
      }

      const stepInput = JSON.parse(step.input);
      const output = await handler(stepInput);

      // 标记步骤完成,继续下一步
      await this.pool.query(
        `UPDATE workflow_steps 
         SET status = 'completed', output = $1, completed_at = NOW()
         WHERE id = $2`,
        [JSON.stringify(output), step.id]
      );

      this.executeNextStep(workflowId);
    } catch (err) {
      // 处理失败:重试或标记失败
      const step = result?.rows?.[0];
      if (step && step.retry_count < step.max_retries - 1) {
        await this.pool.query(
          `UPDATE workflow_steps 
           SET status = 'pending', retry_count = retry_count + 1,
               error = $1
           WHERE id = $2`,
          [err.message, step.id]
        );
        // 延迟后重试
        setTimeout(() => this.executeNextStep(workflowId), 
                   Math.pow(2, step.retry_count) * 1000);
      } else {
        await this.pool.query(
          `UPDATE workflow_steps SET status = 'failed', error = $1
           WHERE id = $2`,
          [err.message, step?.id]
        );
        await this.pool.query(
          `UPDATE workflows SET status = 'failed', updated_at = NOW()
           WHERE id = $1`,
          [workflowId]
        );
      }
      if (client) await client.query('ROLLBACK').catch(() => {});
    } finally {
      client?.release();
    }
  }
}

module.exports = { WorkflowEngine };

🎯 使用示例:订单履约流程

// 使用引擎编排订单履约
const { WorkflowEngine } = require('./workflow-engine');

const engine = new WorkflowEngine(process.env.DATABASE_URL);

// 注册各步骤的处理器
engine.register('validate_order', async (input) => {
  console.log(`验证订单 ${input.orderId}`);
  // 调用订单验证逻辑
  return { valid: true, checkedAt: new Date().toISOString() };
});

engine.register('reserve_inventory', async (input) => {
  console.log(`预留库存: 商品 ${input.productId}, 数量 ${input.quantity}`);
  // 调用库存服务
  return { reservationId: `R-${Date.now()}`, reserved: true };
});

engine.register('charge_payment', async (input) => {
  console.log(`扣款: 金额 ${input.amount} 元`);
  // 调用支付服务
  return { transactionId: `TXN-${Date.now()}`, success: true };
});

engine.register('notify_shipping', async (input) => {
  console.log(`通知物流发货: 订单 ${input.orderId}`);
  // 调用物流服务
  return { trackingNumber: `SF${Date.now()}`, carrier: 'SF' };
});

// 启动工作流
const workflowId = await engine.start('OrderFulfillment', 
  { orderId: 'ORD-20260529-001', productId: 'SKU-123', quantity: 2, amount: 299.00 },
  [
    { name: 'validate_order',   input: { orderId: 'ORD-20260529-001' } },
    { name: 'reserve_inventory', input: { productId: 'SKU-123', quantity: 2 } },
    { name: 'charge_payment',    input: { amount: 299.00 } },
    { name: 'notify_shipping',   input: { orderId: 'ORD-20260529-001' } }
  ]
);

console.log(`工作流已启动,ID: ${workflowId}`);

📌 **记住:**每个步骤处理器必须是幂等的。因为重试机制会导致同一个步骤被多次执行,幂等性保证不会产生副作用(比如重复扣款)。

📊 三、方案对比与选型建议

Postgres 工作流 vs 专业工作流引擎

特性 Postgres 原生 Temporal Apache Airflow Redis + BullMQ
部署复杂度 ⭐ 零额外依赖 ⭐⭐⭐ 需部署 Server + DB ⭐⭐⭐ 需部署 Scheduler + Worker ⭐⭐ 需部署 Redis
持久化保证 ✅ 强一致(ACID) ✅ 强一致 ✅ 元数据 DB ⚠️ 依赖 Redis AOF
水平扩展 ⚠️ 单主写入 ✅ 多节点 ✅ 多 Worker ✅ 多 Consumer
可视化监控 ❌ 需自建 ✅ 自带 Web UI ✅ 自带 Web UI ⚠️ 需第三方
学习成本 ⭐ 低(只需 SQL) ⭐⭐⭐ 高 ⭐⭐ 中 ⭐⭐ 中
适用规模 中小规模(<100万/天) 大规模 大规模批处理 中大规模
运维成本 ⭐ 极低 ⭐⭐⭐ 高 ⭐⭐ 中 ⭐⭐ 中

⚠️ 什么时候不该用 Postgres 做工作流

虽然 Postgres 工作流方案简洁优雅,但它有明确的适用边界:

  • 高吞吐场景:当日处理量超过百万级别时,单库写入会成为瓶颈
  • 需要复杂可视化:Temporal 自带的 Web UI 能让你看到每个工作流的执行历史、重放状态
  • 跨语言/跨团队:Temporal 支持 Go/Java/Python/TypeScript SDK,Postgres 方案需要每个团队自己封装
  • 长时间运行(数天/数周):Temporal 的 Continue-as-New 模式天然支持,Postgres 方案需要额外设计

✅ 什么时候该用 Postgres 做工作流

  • 中小团队(<50人):不想引入额外基础设施
  • 事件量适中:日处理量 < 100 万条
  • 已有 Postgres:不想增加运维复杂度
  • 简单线性流程:步骤不多,无复杂分支/并行逻辑
  • 对延迟要求不高:轮询间隔 1-5 秒可以接受

💡 **提示:**对于 90% 的中小项目,Postgres 原生方案完全够用。当你确实需要 Temporal 时,说明你的业务规模已经值得那额外的运维成本了。

💡 四、实战避坑指南

经过多个项目的实践,以下是使用 Postgres 做工作流时最常见的坑点:

坑点 1:忘记清理历史数据

outbox 表和 workflow_steps 表会持续增长,如果不定期清理,几个月后查询性能会急剧下降。

-- 定期清理已完成的历史记录(建议每天执行一次)
DELETE FROM workflow_steps 
WHERE workflow_id IN (
    SELECT id FROM workflows 
    WHERE status IN ('completed', 'failed') 
    AND updated_at < NOW() - INTERVAL '30 days'
);

DELETE FROM outbox 
WHERE processed = TRUE 
AND created_at < NOW() - INTERVAL '7 days';

-- 或者用分区表自动管理
CREATE TABLE outbox_partitioned (
    LIKE outbox INCLUDING ALL
) PARTITION BY RANGE (created_at);

CREATE TABLE outbox_2026_05 PARTITION OF outbox_partitioned
    FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');

坑点 2:长事务阻塞

工作流执行器如果在事务中执行耗时操作(比如调用外部 API),会导致行锁长时间不释放。

// ❌ 错误写法:在事务中调用外部服务
async function badExecute(workflowId) {
  await client.query('BEGIN');
  const task = await client.query('SELECT ... FOR UPDATE');
  await callExternalAPI();  // 可能耗时数秒!锁被持有
  await client.query('UPDATE ...');
  await client.query('COMMIT');
}

// ✅ 正确写法:先锁后放,执行在事务外
async function goodExecute(workflowId) {
  // 短事务:仅认领任务
  await client.query('BEGIN');
  const task = await client.query(
    `UPDATE workflow_steps SET status = 'running' 
     WHERE id = $1 RETURNING *`, [stepId]
  );
  await client.query('COMMIT');
  client.release();  // 立即释放连接

  // 在事务外执行耗时操作
  const result = await callExternalAPI();

  // 短事务:写回结果
  await pool.query(
    `UPDATE workflow_steps SET status = 'completed', output = $1 
     WHERE id = $2`, [JSON.stringify(result), stepId]
  );
}

坑点 3:并发控制不当

多 worker 模式下最容易出现的问题是"重复执行"。除了 FOR UPDATE SKIP LOCKED,还需要注意连接池配置:

// 生产环境推荐配置
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  max: 20,                    // 最大连接数
  idleTimeoutMillis: 30000,   // 空闲连接超时
  connectionTimeoutMillis: 5000,
  // 关键:确保每个 worker 使用独立的连接
  application_name: `worker-${process.pid}`
});

⚠️ **警告:**连接池大小不要设置过大。每个 Postgres 连接大约消耗 10MB 内存,100 个连接就是 1GB。建议用 pgbouncer 做连接池管理。

✅ 总结

Postgres 不是银弹,但对于大多数中小型项目来说,它完全有能力承担工作流引擎的角色。三个核心原语——事务性发件箱、Advisory Lock、LISTEN/NOTIFY——组合起来就能覆盖 80% 的任务编排需求。

关键决策逻辑很简单:如果你的团队不到 50 人,日事件量不到 100 万,且已经使用了 Postgres,那就别急着上 Temporal。先用原生方案跑起来,等业务规模真的撑不住了再迁移——到时候你的工作流逻辑已经经过了充分验证,迁移成本反而更低。

🔧 相关工具推荐:

📚 相关文章