2025 年末,DBOS 团队发表了一篇引发广泛讨论的文章:Postgres is all you need for durable execution。他们的核心观点是——大多数工作流引擎(Temporal、Prefect、Apache Airflow)本质上都在重新发明数据库已经实现的功能。如果你的应用已经在用 PostgreSQL,那你可能根本不需要额外引入一个工作流系统。
根据 DB2DB 的统计,全球超过 48% 的开发者在使用 PostgreSQL,而其中绝大多数团队在需要"可靠任务编排"时,第一反应是引入 Temporal 或 RabbitMQ + Celery 这样的外部依赖。但实际上,Postgres 本身提供的事务保证、通知机制和锁原语,完全足以构建一套轻量级的持久化工作流引擎。
🔧 一、Postgres 的三大工作流原语
要理解"Postgres 做工作流"的可行性,首先要搞清楚它提供的三个核心能力。
📦 事务性发件箱(Transactional Outbox)模式
工作流的核心需求是**“状态变更和消息发送的原子性”**。比如你在订单服务中扣减库存后,需要通知物流服务发货——如果这两个操作不是原子的,就可能出现"库存扣了但通知没发出去"的脏状态。
事务性发件箱模式的做法是:在同一个数据库事务中,既写业务数据,也写一条"待发送"的消息到 outbox 表。然后由一个独立的轮询进程读取并发送。
-- 创建发件箱表
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate VARCHAR(100) NOT NULL, -- 业务实体类型,如 'order'
aggregate_id VARCHAR(100) NOT NULL, -- 业务实体 ID
event_type VARCHAR(100) NOT NULL, -- 事件类型,如 'OrderCreated'
payload JSONB NOT NULL, -- 事件数据
created_at TIMESTAMPTZ DEFAULT NOW(),
processed BOOLEAN DEFAULT FALSE,
retry_count INTEGER DEFAULT 0
);
-- 创建索引,轮询查询用
CREATE INDEX idx_outbox_unprocessed
ON outbox (created_at)
WHERE processed = FALSE;
// Node.js 示例:在一个事务中完成业务操作 + 写入发件箱
const { Pool } = require('pg');
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
async function createOrder(orderData) {
const client = await pool.connect();
try {
await client.query('BEGIN');
// 1. 写入业务数据
const orderResult = await client.query(
`INSERT INTO orders (user_id, product_id, quantity, status)
VALUES ($1, $2, $3, 'pending')
RETURNING id`,
[orderData.userId, orderData.productId, orderData.quantity]
);
const orderId = orderResult.rows[0].id;
// 2. 在同一个事务中写入发件箱
await client.query(
`INSERT INTO outbox (aggregate, aggregate_id, event_type, payload)
VALUES ('order', $1, 'OrderCreated', $2)`,
[orderId.toString(), JSON.stringify({ orderId, ...orderData })]
);
await client.query('COMMIT');
return orderId;
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
⚠️ **警告:**千万不要先写业务表,再异步写 outbox 表——这会破坏原子性,导致消息丢失。必须在同一个数据库事务中完成。
🔒 Advisory Lock 实现任务调度
Postgres 的 Advisory Lock 是一种应用级别的锁机制,不绑定任何表或行,非常适合做分布式任务调度中的互斥控制。
比如你有 3 个 worker 进程同时在轮询待处理任务,你需要确保同一个任务不会被多个 worker 抢走:
-- 用 pg_try_advisory_lock 实现任务认领
-- 先查找待处理任务,然后用 advisory lock 防止并发抢夺
WITH candidate AS (
SELECT id
FROM outbox
WHERE processed = FALSE
AND retry_count < 3
ORDER BY created_at
LIMIT 1
FOR UPDATE SKIP LOCKED -- 跳过已被锁定的行
)
UPDATE outbox
SET retry_count = retry_count + 1
WHERE id = (SELECT id FROM candidate)
RETURNING id, aggregate, aggregate_id, event_type, payload;
FOR UPDATE SKIP LOCKED 是 Postgres 9.5+ 的特性,它让多个 worker 并发查询时自动跳过已被其他事务锁定的行,无需额外的锁管理。
📡 LISTEN/NOTIFY 实现实时通知
传统轮询模式的缺点是延迟高(取决于轮询间隔)。Postgres 的 LISTEN/NOTIFY 机制可以实现准实时的事件推送:
-- 在 outbox 写入完成后触发通知
CREATE OR REPLACE FUNCTION notify_outbox()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('outbox_events',
json_build_object(
'id', NEW.id,
'event_type', NEW.event_type,
'aggregate', NEW.aggregate
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER outbox_notify
AFTER INSERT ON outbox
FOR EACH ROW EXECUTE FUNCTION notify_outbox();
// Node.js 监听通知
const { Client } = require('pg');
const listener = new Client({ connectionString: process.env.DATABASE_URL });
await listener.connect();
await listener.query('LISTEN outbox_events');
listener.on('notification', (msg) => {
const payload = JSON.parse(msg.payload);
console.log(`[Worker] 收到事件: ${payload.event_type}, ID: ${payload.id}`);
// 立即处理该事件,无需等待下一次轮询
processEvent(payload);
});
💡 **提示:**LISTEN/NOTIFY 的消息不能跨数据库,且消息不持久化——如果监听方不在线,消息会丢失。所以它只适合作为轮询的"加速器",不能替代轮询。
🚀 二、完整工作流引擎实现
掌握了三个原语之后,我们可以组合出一个轻量级的工作流引擎。核心思路是用一张 workflows 表存储工作流定义和状态,用 steps 表存储每个步骤的执行记录。
📊 数据模型设计
-- 工作流实例表
CREATE TABLE workflows (
id BIGSERIAL PRIMARY KEY,
workflow_type VARCHAR(100) NOT NULL, -- 工作流类型,如 'OrderFulfillment'
status VARCHAR(20) NOT NULL DEFAULT 'running', -- running/completed/failed
input JSONB NOT NULL, -- 工作流入参
output JSONB, -- 工作流出参
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- 工作流步骤表
CREATE TABLE workflow_steps (
id BIGSERIAL PRIMARY KEY,
workflow_id BIGINT REFERENCES workflows(id),
step_name VARCHAR(100) NOT NULL,
step_index INTEGER NOT NULL, -- 步骤顺序
status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending/running/completed/failed
input JSONB,
output JSONB,
error TEXT,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
retry_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3,
UNIQUE(workflow_id, step_index)
);
CREATE INDEX idx_steps_pending ON workflow_steps (workflow_id, step_index)
WHERE status = 'pending';
⚡ 工作流执行器
// workflow-engine.js — 轻量级 Postgres 工作流引擎
const { Pool } = require('pg');
class WorkflowEngine {
constructor(connectionString) {
this.pool = new Pool({ connectionString });
this.handlers = new Map(); // 步骤处理器注册表
}
// 注册步骤处理器
register(stepName, handler) {
this.handlers.set(stepName, handler);
}
// 启动一个新工作流
async start(workflowType, input, steps) {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// 创建工作流实例
const wf = await client.query(
`INSERT INTO workflows (workflow_type, input)
VALUES ($1, $2) RETURNING id`,
[workflowType, JSON.stringify(input)]
);
const workflowId = wf.rows[0].id;
// 批量创建步骤
for (let i = 0; i < steps.length; i++) {
await client.query(
`INSERT INTO workflow_steps (workflow_id, step_name, step_index, input)
VALUES ($1, $2, $3, $4)`,
[workflowId, steps[i].name, i, JSON.stringify(steps[i].input || {})]
);
}
await client.query('COMMIT');
// 异步执行(不阻塞调用方)
this.executeNextStep(workflowId);
return workflowId;
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
// 执行下一个待处理步骤
async executeNextStep(workflowId) {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// 用 FOR UPDATE SKIP LOCKED 防止并发执行
const result = await client.query(
`SELECT id, step_name, step_index, input, retry_count, max_retries
FROM workflow_steps
WHERE workflow_id = $1 AND status = 'pending'
ORDER BY step_index
LIMIT 1
FOR UPDATE SKIP LOCKED`,
[workflowId]
);
if (result.rows.length === 0) {
// 没有更多步骤,标记工作流完成
await client.query(
`UPDATE workflows SET status = 'completed',
output = (SELECT json_agg(output) FROM workflow_steps
WHERE workflow_id = $1 AND status = 'completed'),
updated_at = NOW()
WHERE id = $1`,
[workflowId]
);
await client.query('COMMIT');
return;
}
const step = result.rows[0];
// 标记步骤为运行中
await client.query(
`UPDATE workflow_steps SET status = 'running', started_at = NOW()
WHERE id = $1`,
[step.id]
);
await client.query('COMMIT');
// 执行步骤处理器
const handler = this.handlers.get(step.step_name);
if (!handler) {
throw new Error(`未注册的步骤处理器: ${step.step_name}`);
}
const stepInput = JSON.parse(step.input);
const output = await handler(stepInput);
// 标记步骤完成,继续下一步
await this.pool.query(
`UPDATE workflow_steps
SET status = 'completed', output = $1, completed_at = NOW()
WHERE id = $2`,
[JSON.stringify(output), step.id]
);
this.executeNextStep(workflowId);
} catch (err) {
// 处理失败:重试或标记失败
const step = result?.rows?.[0];
if (step && step.retry_count < step.max_retries - 1) {
await this.pool.query(
`UPDATE workflow_steps
SET status = 'pending', retry_count = retry_count + 1,
error = $1
WHERE id = $2`,
[err.message, step.id]
);
// 延迟后重试
setTimeout(() => this.executeNextStep(workflowId),
Math.pow(2, step.retry_count) * 1000);
} else {
await this.pool.query(
`UPDATE workflow_steps SET status = 'failed', error = $1
WHERE id = $2`,
[err.message, step?.id]
);
await this.pool.query(
`UPDATE workflows SET status = 'failed', updated_at = NOW()
WHERE id = $1`,
[workflowId]
);
}
if (client) await client.query('ROLLBACK').catch(() => {});
} finally {
client?.release();
}
}
}
module.exports = { WorkflowEngine };
🎯 使用示例:订单履约流程
// 使用引擎编排订单履约
const { WorkflowEngine } = require('./workflow-engine');
const engine = new WorkflowEngine(process.env.DATABASE_URL);
// 注册各步骤的处理器
engine.register('validate_order', async (input) => {
console.log(`验证订单 ${input.orderId}`);
// 调用订单验证逻辑
return { valid: true, checkedAt: new Date().toISOString() };
});
engine.register('reserve_inventory', async (input) => {
console.log(`预留库存: 商品 ${input.productId}, 数量 ${input.quantity}`);
// 调用库存服务
return { reservationId: `R-${Date.now()}`, reserved: true };
});
engine.register('charge_payment', async (input) => {
console.log(`扣款: 金额 ${input.amount} 元`);
// 调用支付服务
return { transactionId: `TXN-${Date.now()}`, success: true };
});
engine.register('notify_shipping', async (input) => {
console.log(`通知物流发货: 订单 ${input.orderId}`);
// 调用物流服务
return { trackingNumber: `SF${Date.now()}`, carrier: 'SF' };
});
// 启动工作流
const workflowId = await engine.start('OrderFulfillment',
{ orderId: 'ORD-20260529-001', productId: 'SKU-123', quantity: 2, amount: 299.00 },
[
{ name: 'validate_order', input: { orderId: 'ORD-20260529-001' } },
{ name: 'reserve_inventory', input: { productId: 'SKU-123', quantity: 2 } },
{ name: 'charge_payment', input: { amount: 299.00 } },
{ name: 'notify_shipping', input: { orderId: 'ORD-20260529-001' } }
]
);
console.log(`工作流已启动,ID: ${workflowId}`);
📌 **记住:**每个步骤处理器必须是幂等的。因为重试机制会导致同一个步骤被多次执行,幂等性保证不会产生副作用(比如重复扣款)。
📊 三、方案对比与选型建议
Postgres 工作流 vs 专业工作流引擎
| 特性 | Postgres 原生 | Temporal | Apache Airflow | Redis + BullMQ |
|---|---|---|---|---|
| 部署复杂度 | ⭐ 零额外依赖 | ⭐⭐⭐ 需部署 Server + DB | ⭐⭐⭐ 需部署 Scheduler + Worker | ⭐⭐ 需部署 Redis |
| 持久化保证 | ✅ 强一致(ACID) | ✅ 强一致 | ✅ 元数据 DB | ⚠️ 依赖 Redis AOF |
| 水平扩展 | ⚠️ 单主写入 | ✅ 多节点 | ✅ 多 Worker | ✅ 多 Consumer |
| 可视化监控 | ❌ 需自建 | ✅ 自带 Web UI | ✅ 自带 Web UI | ⚠️ 需第三方 |
| 学习成本 | ⭐ 低(只需 SQL) | ⭐⭐⭐ 高 | ⭐⭐ 中 | ⭐⭐ 中 |
| 适用规模 | 中小规模(<100万/天) | 大规模 | 大规模批处理 | 中大规模 |
| 运维成本 | ⭐ 极低 | ⭐⭐⭐ 高 | ⭐⭐ 中 | ⭐⭐ 中 |
⚠️ 什么时候不该用 Postgres 做工作流
虽然 Postgres 工作流方案简洁优雅,但它有明确的适用边界:
- ❌ 高吞吐场景:当日处理量超过百万级别时,单库写入会成为瓶颈
- ❌ 需要复杂可视化:Temporal 自带的 Web UI 能让你看到每个工作流的执行历史、重放状态
- ❌ 跨语言/跨团队:Temporal 支持 Go/Java/Python/TypeScript SDK,Postgres 方案需要每个团队自己封装
- ❌ 长时间运行(数天/数周):Temporal 的 Continue-as-New 模式天然支持,Postgres 方案需要额外设计
✅ 什么时候该用 Postgres 做工作流
- ✅ 中小团队(<50人):不想引入额外基础设施
- ✅ 事件量适中:日处理量 < 100 万条
- ✅ 已有 Postgres:不想增加运维复杂度
- ✅ 简单线性流程:步骤不多,无复杂分支/并行逻辑
- ✅ 对延迟要求不高:轮询间隔 1-5 秒可以接受
💡 **提示:**对于 90% 的中小项目,Postgres 原生方案完全够用。当你确实需要 Temporal 时,说明你的业务规模已经值得那额外的运维成本了。
💡 四、实战避坑指南
经过多个项目的实践,以下是使用 Postgres 做工作流时最常见的坑点:
坑点 1:忘记清理历史数据
outbox 表和 workflow_steps 表会持续增长,如果不定期清理,几个月后查询性能会急剧下降。
-- 定期清理已完成的历史记录(建议每天执行一次)
DELETE FROM workflow_steps
WHERE workflow_id IN (
SELECT id FROM workflows
WHERE status IN ('completed', 'failed')
AND updated_at < NOW() - INTERVAL '30 days'
);
DELETE FROM outbox
WHERE processed = TRUE
AND created_at < NOW() - INTERVAL '7 days';
-- 或者用分区表自动管理
CREATE TABLE outbox_partitioned (
LIKE outbox INCLUDING ALL
) PARTITION BY RANGE (created_at);
CREATE TABLE outbox_2026_05 PARTITION OF outbox_partitioned
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
坑点 2:长事务阻塞
工作流执行器如果在事务中执行耗时操作(比如调用外部 API),会导致行锁长时间不释放。
// ❌ 错误写法:在事务中调用外部服务
async function badExecute(workflowId) {
await client.query('BEGIN');
const task = await client.query('SELECT ... FOR UPDATE');
await callExternalAPI(); // 可能耗时数秒!锁被持有
await client.query('UPDATE ...');
await client.query('COMMIT');
}
// ✅ 正确写法:先锁后放,执行在事务外
async function goodExecute(workflowId) {
// 短事务:仅认领任务
await client.query('BEGIN');
const task = await client.query(
`UPDATE workflow_steps SET status = 'running'
WHERE id = $1 RETURNING *`, [stepId]
);
await client.query('COMMIT');
client.release(); // 立即释放连接
// 在事务外执行耗时操作
const result = await callExternalAPI();
// 短事务:写回结果
await pool.query(
`UPDATE workflow_steps SET status = 'completed', output = $1
WHERE id = $2`, [JSON.stringify(result), stepId]
);
}
坑点 3:并发控制不当
多 worker 模式下最容易出现的问题是"重复执行"。除了 FOR UPDATE SKIP LOCKED,还需要注意连接池配置:
// 生产环境推荐配置
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 20, // 最大连接数
idleTimeoutMillis: 30000, // 空闲连接超时
connectionTimeoutMillis: 5000,
// 关键:确保每个 worker 使用独立的连接
application_name: `worker-${process.pid}`
});
⚠️ **警告:**连接池大小不要设置过大。每个 Postgres 连接大约消耗 10MB 内存,100 个连接就是 1GB。建议用
pgbouncer做连接池管理。
✅ 总结
Postgres 不是银弹,但对于大多数中小型项目来说,它完全有能力承担工作流引擎的角色。三个核心原语——事务性发件箱、Advisory Lock、LISTEN/NOTIFY——组合起来就能覆盖 80% 的任务编排需求。
关键决策逻辑很简单:如果你的团队不到 50 人,日事件量不到 100 万,且已经使用了 Postgres,那就别急着上 Temporal。先用原生方案跑起来,等业务规模真的撑不住了再迁移——到时候你的工作流逻辑已经经过了充分验证,迁移成本反而更低。
🔧 相关工具推荐:
- DBOS — 基于 Postgres 的轻量级工作流框架(TypeScript/Python)
- graphile-worker — Postgres 原生的任务队列库
- pg-boss — 基于 Postgres 的 Node.js 任务队列
- River — Go 语言的 Postgres 任务队列
- jsjson.com JSON 格式化工具 — 配置文件和 Payload 的格式化校验