pg_durable:微软开源的 PostgreSQL 持久执行扩展,告别任务队列和调度器

微软开源 pg_durable 扩展,将持久执行直接嵌入 PostgreSQL。本文深度解析其架构、DSL 语法、实战用例,对比 Temporal/Airflow 等方案,帮你判断是否值得采用。

数据库 2026-06-04 12 分钟

2026 年 6 月,微软在 GitHub 上开源了 pg_durable——一个将**持久执行(Durable Execution)**直接嵌入 PostgreSQL 的扩展。消息在 Hacker News 上迅速获得 220+ 点赞,原因很简单:无数团队都在用 pg_cron + 状态表 + 轮询 Worker 的方式拼凑后台任务系统,而 pg_durable 声称可以用纯 SQL 替代这一切。

这个扩展到底是噱头还是真材实料?它能替代 Temporal、Airflow 这些重量级编排引擎吗?本文将从架构原理、DSL 语法、实战场景和方案对比四个维度深度解析。

🔧 一、pg_durable 的核心架构与设计哲学

它到底解决了什么问题?

每个后端开发者都经历过这种痛苦:一个后台任务需要执行多个步骤(清理旧数据 → 调用外部 API → 更新状态 → 发送通知),而你不得不自己搭建一套「状态机 + 重试 + 检查点」系统。常见的拼凑方案如下:

-- ❌ 传统做法:手动管理状态的存储过程
CREATE PROCEDURE process_order(order_id INT)
LANGUAGE plpgsql AS $$
BEGIN
    UPDATE orders SET status = 'processing' WHERE id = order_id;
    -- 如果这里崩溃了怎么办?重跑会重复执行
    PERFORM call_payment_api(order_id);
    UPDATE orders SET status = 'paid' WHERE id = order_id;
    -- 这里崩溃了呢?状态不一致
    PERFORM send_notification(order_id);
    UPDATE orders SET status = 'completed' WHERE id = order_id;
END;
$$;

⚠️ 警告: 上面的存储过程在数据库崩溃、连接断开或长时间运行超时时,要么丢失进度,要么重复执行。这在生产环境中是不可接受的。

pg_durable 的核心理念是:让 PostgreSQL 自己管理工作流的检查点、重试和恢复,不需要外部服务。

架构分层

pg_durable 是一个 PostgreSQL 扩展(使用 Rust + pgrx 构建),架构分为三层:

层级 组件 职责
SQL DSL 层 df.* 函数和操作符 提供声明式的工作流定义语法
编排运行时 duroxide 确定性重放、检查点、子编排、定时器
状态持久化 duroxide-pg 将运行时状态存入 duroxide.* schema
-- ✅ pg_durable 做法:声明式持久执行
SELECT df.start(
    'SELECT id FROM orders WHERE status = ''pending'' LIMIT 1' |=> 'order_id'
    ~> 'UPDATE orders SET status = ''processing'' WHERE id = $order_id'
    ~> df.sleep(2)  -- 模拟处理时间
    ~> 'UPDATE orders SET status = ''completed'' WHERE id = $order_id',
    'process-order'
);

💡 提示: 每个步骤之间自动创建检查点。如果数据库在第 3 步崩溃,重启后会从第 3 步恢复,而不会重复执行前两步。

适用场景 vs 不适用场景

✅ 适合使用 pg_durable 的场景:

  • 数据 ETL 管道(清洗 → 转换 → 加载)
  • 向量嵌入管道(分块 → 调用嵌入 API → 写入 pgvector)
  • 定时维护任务(检测膨胀 → 通知 → 等待审批 → 执行)
  • 外部 API 工作流(富化 → 分类 → Webhook 回调)
  • 扇出聚合(并行查询 → 合并结果)

❌ 不适合使用 pg_durable 的场景:

  • 单条 INSERT ... SELECT 就能完成的简单操作
  • 需要亚毫秒级同步响应的请求处理
  • 无法安装扩展或运行后台 Worker 的托管 PostgreSQL
  • 工作流跨越多个异构系统且主要逻辑不在 SQL 中
  • 需要任意应用层代码(非 HTTP、非 SQL)的复杂逻辑

🚀 二、DSL 语法详解与实战示例

核心操作符一览

pg_durable 提供了一套 SQL 原生的操作符和函数,让工作流定义读起来像声明式配置:

操作符 名称 说明 示例
~> 顺序执行 左边完成后执行右边 'SELECT 1' ~> 'SELECT 2'
|=> 命名结果 将结果存入变量 'SELECT 1' |=> 'myvar'
& 并行执行 同时执行,等待全部完成 'SELECT 1' & 'SELECT 2'
| 竞争执行 同时执行,第一个完成的胜出 fast | slow
?> / !> 条件分支 if-then-else cond ?> then !> else
@> 循环 永久循环 @> body

实战示例 1:带重试的订单处理流程

-- 完整的订单处理工作流:查询 → 更新 → 等待 → 完成
SELECT df.start(
    'SELECT id FROM orders WHERE status = ''pending'' 
     ORDER BY created_at LIMIT 1' |=> 'order'
    ~> 'UPDATE orders SET status = ''processing'', 
        started_at = now() WHERE id = $order.id'
    ~> df.http(
        'https://payment.example.com/charge',
        'POST',
        '{"order_id": "$order.id", "amount": "$order.total"}'
    ) |=> 'payment'
    ~> df.if(
        'SELECT ($payment::jsonb->>''ok'')::boolean',
        'UPDATE orders SET status = ''paid'' WHERE id = $order.id',
        'UPDATE orders SET status = ''failed'', 
         error = $payment::jsonb->>''body'' WHERE id = $order.id'
    ),
    'process-payment'
);

实战示例 2:并行数据聚合

-- 并行查询多个维度,然后合并写入报表
SELECT df.start(
    df.join(
        'SELECT count(*) as user_count FROM users WHERE active',
        'SELECT count(*) as order_count FROM orders WHERE created_at > now() - interval ''1 day''',
        'SELECT sum(amount) as revenue FROM payments WHERE status = ''completed'''
    ) |=> 'stats'
    ~> 'INSERT INTO daily_reports (date, users, orders, revenue)
        VALUES (current_date, $stats.user_count, $stats.order_count, $stats.revenue)',
    'daily-report'
);

实战示例 3:带 Cron 调度的循环任务

-- 每小时清理过期日志,保留审计记录
SELECT df.start(
    @> (
        df.wait_for_schedule('0 * * * *')
        ~> 'DELETE FROM application_logs 
            WHERE created_at < now() - interval ''7 days''
            RETURNING count(*)' |=> 'deleted'
        ~> df.if(
            'SELECT $deleted > 0',
            'INSERT INTO audit_log (action, details) 
             VALUES (''cleanup'', jsonb_build_object(''deleted'', $deleted))',
            'SELECT ''nothing to clean'''
        )
    ),
    'hourly-log-cleanup'
);

📌 记住: @> 创建的是「永久循环」函数,它会一直运行直到你手动 df.cancel()。每个循环迭代都有独立的检查点。

可视化与调试

pg_durable 提供了 df.explain() 来可视化工作流结构,无需执行即可预览:

-- 预览工作流结构(不执行)
SELECT df.explain(
    'SELECT 1' |=> 'step1'
    ~> 'SELECT 2' |=> 'step2'
    ~> df.if(
        'SELECT $step1 > 0',
        'SELECT ''yes''',
        'SELECT ''no'''
    )
);

输出:

SQL |=> 'step1': SELECT 1
→ SQL |=> 'step2': SELECT 2
→ IF
    ✓ then:
      SQL: SELECT 'yes'
    ✗ else:
      SQL: SELECT 'no'

📊 三、方案对比:pg_durable vs 传统方案

全方位对比表

维度 pg_cron + 状态表 Temporal Airflow pg_durable
基础设施 仅 PostgreSQL 需要部署 Temporal Server + Worker 需要 Scheduler + Worker + DB 仅 PostgreSQL
学习成本 低(纯 SQL) 高(Go/Java/Python SDK) 中(Python DAG) 低(SQL DSL)
故障恢复 手动实现 自动 自动 自动
检查点 手动管理 内置 内置 内置
并行执行 手动协调 支持 支持 原生支持
可视化 Web UI Web UI df.explain()
外部 API 调用 需要应用层 原生支持 原生支持 df.http()
适合规模 小型项目 大型分布式 大型数据管道 中小型项目
运维复杂度 极低

性能与可靠性考量

pg_durable 的优势:

  • ✅ 零额外基础设施——不需要 Redis、不需要消息队列、不需要独立的 Worker 进程
  • ✅ 与 PostgreSQL 共享备份、监控、权限体系
  • ✅ 状态查询是普通 SQL——可以用任何 SQL 客户端查看
  • ✅ 多用户隔离——通过 RLS(行级安全)实现租户隔离

pg_durable 的局限:

  • ⚠️ 目前处于 Preview 阶段,生产环境使用需谨慎
  • ⚠️ 只支持 PostgreSQL 17 和 18
  • ⚠️ 步骤逻辑必须是 SQL 或 HTTP 调用——不能直接运行 Python/Node.js 代码
  • ⚠️ 长时间运行的工作流会占用 PostgreSQL 后台 Worker 资源
  • ⚠️ 不适合跨多个异构系统的复杂编排

实际场景决策树

你的后台任务需要持久执行吗?
├── 不需要 → 用 pg_cron + 简单存储过程
├── 需要,但逻辑主要在 SQL 中
│   ├── 单数据库 → ✅ pg_durable 是最佳选择
│   └── 多数据库/跨系统 → 考虑 Temporal
└── 需要,但逻辑主要在应用层代码中
    ├── 已有 Temporal/Airflow → 继续使用
    └── 没有 → 评估 pg_durable 的 df.http() 是否够用

⚠️ 四、避坑指南与最佳实践

安装与配置

-- 1. 在 postgresql.conf 中添加扩展
-- shared_preload_libraries = 'pg_durable'

-- 2. 重启 PostgreSQL 后创建扩展
CREATE EXTENSION pg_durable;

-- 3. 授权给应用角色
SELECT df.grant_usage('app_role');

⚠️ 警告: 如果 shared_preload_libraries 中添加了 pg_durable 但没有执行 CREATE EXTENSION,后台 Worker 会保持空闲状态,持久函数无法执行。

常见陷阱

❌ 陷阱 1:在循环中忘记退出条件

-- ❌ 错误:死循环,永远不会停止
SELECT df.start(
    @> 'INSERT INTO heartbeat (ts) VALUES (now())',
    'heartbeat'
);

-- ✅ 正确:带条件退出或手动取消
SELECT df.start(
    @> (
        'INSERT INTO heartbeat (ts) VALUES (now())'
        ~> df.sleep(60)
    ),
    'heartbeat'
);
-- 需要停止时:SELECT df.cancel('<instance_id>', 'Done');

❌ 陷阱 2:忽略 NULL 值处理

-- ❌ 错误:如果用户不存在,$user 为 NULL 会导致实例失败
SELECT df.start(
    'SELECT name FROM users WHERE id = 999' |=> 'user'
    ~> 'SELECT $user'
);

-- ✅ 正确:使用 ? 后缀做 NULL 安全访问
SELECT df.start(
    'SELECT name FROM users WHERE id = 999' |=> 'user'
    ~> 'SELECT COALESCE($user?, ''unknown'')'
);

❌ 陷阱 3:升级后忘记重新授权

-- 升级 pg_durable 后,新函数不会自动授权
-- ❌ 错误:升级后直接使用
ALTER EXTENSION pg_durable UPDATE;

-- ✅ 正确:升级后重新授权
ALTER EXTENSION pg_durable UPDATE;
SELECT df.grant_usage('app_role');  -- 重新授权

多用户环境最佳实践

-- 创建共享角色并授权
CREATE ROLE pg_durable_user NOLOGIN;
SELECT df.grant_usage('pg_durable_user');

-- 授予应用角色
GRANT pg_durable_user TO app_backend, etl_service;

-- 每个用户的变量空间是隔离的(通过 RLS)
-- 避免在变量中存储明文密码
SELECT df.setvar('api_endpoint', 'https://api.example.com');

监控与运维

-- 查看所有实例状态
SELECT * FROM df.list_instances();

-- 查看特定实例结果
SELECT df.result('a1b2c3d4');

-- 取消运行中的实例
SELECT df.cancel('a1b2c3d4', '业务原因');

-- 查看执行历史
SELECT * FROM df.nodes WHERE instance_id = 'a1b2c3d4';

💡 总结与展望

pg_durable 代表了一种有趣的技术趋势:将计算推向数据(bring compute close to data)。对于那些已经把状态存在 PostgreSQL 中的团队来说,它消除了搭建外部编排系统的需求。

我的判断:

  • 值得尝试的场景: 中小型项目的后台任务、数据 ETL 管道、定时维护任务、简单的 API 工作流
  • 不建议使用的场景: 大型分布式系统、跨多数据库/多服务的复杂编排、需要任意应用层逻辑的场景
  • ⚠️ 观望期建议: 目前是 Preview 阶段,建议在非关键路径上试用,不建议直接上生产

与其他工具的关系:

  • 如果你已经在用 Temporal/Airflow 且运行良好——不需要迁移
  • 如果你在用 pg_cron + 状态表拼凑方案——pg_durable 值得认真评估
  • 如果你刚开始搭建后台任务系统——pg_durable 是一个低门槛的起点

相关资源:

关键结论: pg_durable 不是要取代 Temporal 或 Airflow,而是为那些「数据已经在 PostgreSQL 里、工作流逻辑可以用 SQL 表达」的场景提供了一个零基础设施的持久执行方案。它的价值在于简化,而非替代

📚 相关文章