2026 年 6 月,微软在 GitHub 上开源了 pg_durable——一个将**持久执行(Durable Execution)**直接嵌入 PostgreSQL 的扩展。消息在 Hacker News 上迅速获得 220+ 点赞,原因很简单:无数团队都在用 pg_cron + 状态表 + 轮询 Worker 的方式拼凑后台任务系统,而 pg_durable 声称可以用纯 SQL 替代这一切。
这个扩展到底是噱头还是真材实料?它能替代 Temporal、Airflow 这些重量级编排引擎吗?本文将从架构原理、DSL 语法、实战场景和方案对比四个维度深度解析。
🔧 一、pg_durable 的核心架构与设计哲学
它到底解决了什么问题?
每个后端开发者都经历过这种痛苦:一个后台任务需要执行多个步骤(清理旧数据 → 调用外部 API → 更新状态 → 发送通知),而你不得不自己搭建一套「状态机 + 重试 + 检查点」系统。常见的拼凑方案如下:
-- ❌ 传统做法:手动管理状态的存储过程
CREATE PROCEDURE process_order(order_id INT)
LANGUAGE plpgsql AS $$
BEGIN
UPDATE orders SET status = 'processing' WHERE id = order_id;
-- 如果这里崩溃了怎么办?重跑会重复执行
PERFORM call_payment_api(order_id);
UPDATE orders SET status = 'paid' WHERE id = order_id;
-- 这里崩溃了呢?状态不一致
PERFORM send_notification(order_id);
UPDATE orders SET status = 'completed' WHERE id = order_id;
END;
$$;
⚠️ 警告: 上面的存储过程在数据库崩溃、连接断开或长时间运行超时时,要么丢失进度,要么重复执行。这在生产环境中是不可接受的。
pg_durable 的核心理念是:让 PostgreSQL 自己管理工作流的检查点、重试和恢复,不需要外部服务。
架构分层
pg_durable 是一个 PostgreSQL 扩展(使用 Rust + pgrx 构建),架构分为三层:
| 层级 | 组件 | 职责 |
|---|---|---|
| SQL DSL 层 | df.* 函数和操作符 |
提供声明式的工作流定义语法 |
| 编排运行时 | duroxide | 确定性重放、检查点、子编排、定时器 |
| 状态持久化 | duroxide-pg | 将运行时状态存入 duroxide.* schema |
-- ✅ pg_durable 做法:声明式持久执行
SELECT df.start(
'SELECT id FROM orders WHERE status = ''pending'' LIMIT 1' |=> 'order_id'
~> 'UPDATE orders SET status = ''processing'' WHERE id = $order_id'
~> df.sleep(2) -- 模拟处理时间
~> 'UPDATE orders SET status = ''completed'' WHERE id = $order_id',
'process-order'
);
💡 提示: 每个步骤之间自动创建检查点。如果数据库在第 3 步崩溃,重启后会从第 3 步恢复,而不会重复执行前两步。
适用场景 vs 不适用场景
✅ 适合使用 pg_durable 的场景:
- 数据 ETL 管道(清洗 → 转换 → 加载)
- 向量嵌入管道(分块 → 调用嵌入 API → 写入 pgvector)
- 定时维护任务(检测膨胀 → 通知 → 等待审批 → 执行)
- 外部 API 工作流(富化 → 分类 → Webhook 回调)
- 扇出聚合(并行查询 → 合并结果)
❌ 不适合使用 pg_durable 的场景:
- 单条
INSERT ... SELECT就能完成的简单操作 - 需要亚毫秒级同步响应的请求处理
- 无法安装扩展或运行后台 Worker 的托管 PostgreSQL
- 工作流跨越多个异构系统且主要逻辑不在 SQL 中
- 需要任意应用层代码(非 HTTP、非 SQL)的复杂逻辑
🚀 二、DSL 语法详解与实战示例
核心操作符一览
pg_durable 提供了一套 SQL 原生的操作符和函数,让工作流定义读起来像声明式配置:
| 操作符 | 名称 | 说明 | 示例 |
|---|---|---|---|
~> |
顺序执行 | 左边完成后执行右边 | 'SELECT 1' ~> 'SELECT 2' |
|=> |
命名结果 | 将结果存入变量 | 'SELECT 1' |=> 'myvar' |
& |
并行执行 | 同时执行,等待全部完成 | 'SELECT 1' & 'SELECT 2' |
| |
竞争执行 | 同时执行,第一个完成的胜出 | fast | slow |
?> / !> |
条件分支 | if-then-else | cond ?> then !> else |
@> |
循环 | 永久循环 | @> body |
实战示例 1:带重试的订单处理流程
-- 完整的订单处理工作流:查询 → 更新 → 等待 → 完成
SELECT df.start(
'SELECT id FROM orders WHERE status = ''pending''
ORDER BY created_at LIMIT 1' |=> 'order'
~> 'UPDATE orders SET status = ''processing'',
started_at = now() WHERE id = $order.id'
~> df.http(
'https://payment.example.com/charge',
'POST',
'{"order_id": "$order.id", "amount": "$order.total"}'
) |=> 'payment'
~> df.if(
'SELECT ($payment::jsonb->>''ok'')::boolean',
'UPDATE orders SET status = ''paid'' WHERE id = $order.id',
'UPDATE orders SET status = ''failed'',
error = $payment::jsonb->>''body'' WHERE id = $order.id'
),
'process-payment'
);
实战示例 2:并行数据聚合
-- 并行查询多个维度,然后合并写入报表
SELECT df.start(
df.join(
'SELECT count(*) as user_count FROM users WHERE active',
'SELECT count(*) as order_count FROM orders WHERE created_at > now() - interval ''1 day''',
'SELECT sum(amount) as revenue FROM payments WHERE status = ''completed'''
) |=> 'stats'
~> 'INSERT INTO daily_reports (date, users, orders, revenue)
VALUES (current_date, $stats.user_count, $stats.order_count, $stats.revenue)',
'daily-report'
);
实战示例 3:带 Cron 调度的循环任务
-- 每小时清理过期日志,保留审计记录
SELECT df.start(
@> (
df.wait_for_schedule('0 * * * *')
~> 'DELETE FROM application_logs
WHERE created_at < now() - interval ''7 days''
RETURNING count(*)' |=> 'deleted'
~> df.if(
'SELECT $deleted > 0',
'INSERT INTO audit_log (action, details)
VALUES (''cleanup'', jsonb_build_object(''deleted'', $deleted))',
'SELECT ''nothing to clean'''
)
),
'hourly-log-cleanup'
);
📌 记住:
@>创建的是「永久循环」函数,它会一直运行直到你手动df.cancel()。每个循环迭代都有独立的检查点。
可视化与调试
pg_durable 提供了 df.explain() 来可视化工作流结构,无需执行即可预览:
-- 预览工作流结构(不执行)
SELECT df.explain(
'SELECT 1' |=> 'step1'
~> 'SELECT 2' |=> 'step2'
~> df.if(
'SELECT $step1 > 0',
'SELECT ''yes''',
'SELECT ''no'''
)
);
输出:
SQL |=> 'step1': SELECT 1
→ SQL |=> 'step2': SELECT 2
→ IF
✓ then:
SQL: SELECT 'yes'
✗ else:
SQL: SELECT 'no'
📊 三、方案对比:pg_durable vs 传统方案
全方位对比表
| 维度 | pg_cron + 状态表 | Temporal | Airflow | pg_durable |
|---|---|---|---|---|
| 基础设施 | 仅 PostgreSQL | 需要部署 Temporal Server + Worker | 需要 Scheduler + Worker + DB | 仅 PostgreSQL |
| 学习成本 | 低(纯 SQL) | 高(Go/Java/Python SDK) | 中(Python DAG) | 低(SQL DSL) |
| 故障恢复 | 手动实现 | 自动 | 自动 | 自动 |
| 检查点 | 手动管理 | 内置 | 内置 | 内置 |
| 并行执行 | 手动协调 | 支持 | 支持 | 原生支持 |
| 可视化 | 无 | Web UI | Web UI | df.explain() |
| 外部 API 调用 | 需要应用层 | 原生支持 | 原生支持 | df.http() |
| 适合规模 | 小型项目 | 大型分布式 | 大型数据管道 | 中小型项目 |
| 运维复杂度 | 低 | 高 | 高 | 极低 |
性能与可靠性考量
pg_durable 的优势:
- ✅ 零额外基础设施——不需要 Redis、不需要消息队列、不需要独立的 Worker 进程
- ✅ 与 PostgreSQL 共享备份、监控、权限体系
- ✅ 状态查询是普通 SQL——可以用任何 SQL 客户端查看
- ✅ 多用户隔离——通过 RLS(行级安全)实现租户隔离
pg_durable 的局限:
- ⚠️ 目前处于 Preview 阶段,生产环境使用需谨慎
- ⚠️ 只支持 PostgreSQL 17 和 18
- ⚠️ 步骤逻辑必须是 SQL 或 HTTP 调用——不能直接运行 Python/Node.js 代码
- ⚠️ 长时间运行的工作流会占用 PostgreSQL 后台 Worker 资源
- ⚠️ 不适合跨多个异构系统的复杂编排
实际场景决策树
你的后台任务需要持久执行吗?
├── 不需要 → 用 pg_cron + 简单存储过程
├── 需要,但逻辑主要在 SQL 中
│ ├── 单数据库 → ✅ pg_durable 是最佳选择
│ └── 多数据库/跨系统 → 考虑 Temporal
└── 需要,但逻辑主要在应用层代码中
├── 已有 Temporal/Airflow → 继续使用
└── 没有 → 评估 pg_durable 的 df.http() 是否够用
⚠️ 四、避坑指南与最佳实践
安装与配置
-- 1. 在 postgresql.conf 中添加扩展
-- shared_preload_libraries = 'pg_durable'
-- 2. 重启 PostgreSQL 后创建扩展
CREATE EXTENSION pg_durable;
-- 3. 授权给应用角色
SELECT df.grant_usage('app_role');
⚠️ 警告: 如果
shared_preload_libraries中添加了pg_durable但没有执行CREATE EXTENSION,后台 Worker 会保持空闲状态,持久函数无法执行。
常见陷阱
❌ 陷阱 1:在循环中忘记退出条件
-- ❌ 错误:死循环,永远不会停止
SELECT df.start(
@> 'INSERT INTO heartbeat (ts) VALUES (now())',
'heartbeat'
);
-- ✅ 正确:带条件退出或手动取消
SELECT df.start(
@> (
'INSERT INTO heartbeat (ts) VALUES (now())'
~> df.sleep(60)
),
'heartbeat'
);
-- 需要停止时:SELECT df.cancel('<instance_id>', 'Done');
❌ 陷阱 2:忽略 NULL 值处理
-- ❌ 错误:如果用户不存在,$user 为 NULL 会导致实例失败
SELECT df.start(
'SELECT name FROM users WHERE id = 999' |=> 'user'
~> 'SELECT $user'
);
-- ✅ 正确:使用 ? 后缀做 NULL 安全访问
SELECT df.start(
'SELECT name FROM users WHERE id = 999' |=> 'user'
~> 'SELECT COALESCE($user?, ''unknown'')'
);
❌ 陷阱 3:升级后忘记重新授权
-- 升级 pg_durable 后,新函数不会自动授权
-- ❌ 错误:升级后直接使用
ALTER EXTENSION pg_durable UPDATE;
-- ✅ 正确:升级后重新授权
ALTER EXTENSION pg_durable UPDATE;
SELECT df.grant_usage('app_role'); -- 重新授权
多用户环境最佳实践
-- 创建共享角色并授权
CREATE ROLE pg_durable_user NOLOGIN;
SELECT df.grant_usage('pg_durable_user');
-- 授予应用角色
GRANT pg_durable_user TO app_backend, etl_service;
-- 每个用户的变量空间是隔离的(通过 RLS)
-- 避免在变量中存储明文密码
SELECT df.setvar('api_endpoint', 'https://api.example.com');
监控与运维
-- 查看所有实例状态
SELECT * FROM df.list_instances();
-- 查看特定实例结果
SELECT df.result('a1b2c3d4');
-- 取消运行中的实例
SELECT df.cancel('a1b2c3d4', '业务原因');
-- 查看执行历史
SELECT * FROM df.nodes WHERE instance_id = 'a1b2c3d4';
💡 总结与展望
pg_durable 代表了一种有趣的技术趋势:将计算推向数据(bring compute close to data)。对于那些已经把状态存在 PostgreSQL 中的团队来说,它消除了搭建外部编排系统的需求。
我的判断:
- ✅ 值得尝试的场景: 中小型项目的后台任务、数据 ETL 管道、定时维护任务、简单的 API 工作流
- ❌ 不建议使用的场景: 大型分布式系统、跨多数据库/多服务的复杂编排、需要任意应用层逻辑的场景
- ⚠️ 观望期建议: 目前是 Preview 阶段,建议在非关键路径上试用,不建议直接上生产
与其他工具的关系:
- 如果你已经在用 Temporal/Airflow 且运行良好——不需要迁移
- 如果你在用
pg_cron+ 状态表拼凑方案——pg_durable 值得认真评估 - 如果你刚开始搭建后台任务系统——pg_durable 是一个低门槛的起点
相关资源:
- 📦 pg_durable GitHub 仓库
- 📖 User Guide
- 🏗️ duroxide — 持久任务框架
- 🗄️ Azure HorizonDB — 微软的 PostgreSQL 云服务,已内置 pg_durable
⚡ 关键结论: pg_durable 不是要取代 Temporal 或 Airflow,而是为那些「数据已经在 PostgreSQL 里、工作流逻辑可以用 SQL 表达」的场景提供了一个零基础设施的持久执行方案。它的价值在于简化,而非替代。