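In May 2026, an open-source project called Obelisk took off on Hacker News, drawing 664 points and 365 comments. Its core idea fits in one sentence: SQLite is all you need for durable workflows. This is not another clickbait "SQLite can do everything" project. It is a production-proven workflow engine that replaces the classic Redis + PostgreSQL + Kafka stack with a single SQLite file, cutting infrastructure complexity by roughly an order of magnitude. If you are a developer worn down by microservice architectures, this article may change how you think about "durable workflows".
🏗️ I. The Core Problem of Durable Workflows
1.1 What is a durable workflow?
A durable workflow is one whose execution state survives process crashes and server restarts, so that it can be restored and resumed. It is the foundation of every business scenario that needs "reliable execution":
- E-commerce orders: place order → reserve inventory → send notification → ship; a failure at any step requires a retry or a compensating action
- Data pipelines: multi-stage ETL jobs, where each stage must be confirmed complete before the next one starts
- Approval flows: multi-level approvals, where each node may wait for days
- AI agent orchestration: multi-step agent call chains that need to resume from a checkpoint
💡 **Tip:** The core of a durable workflow is not "process orchestration" but "state persistence": after a crash at any moment, execution can resume from the last checkpoint.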
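1.2 The complexity trap of traditional solutions
For most teams, the durable workflow stack looks like this:
Redis (task queue + distributed locks)
+ PostgreSQL (workflow state storage)
+ Kafka/RabbitMQ (event bus)
+ Scheduler service (Cron/Quartz)
The operational cost of this combination is substantial: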
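| Component | Minimum spec | Monthly cost (cloud) | Operational complexity |
|---|---|---|---|
| Redis | 2 GB RAM | ¥200-500 | Medium |
| PostgreSQL | 2 vCPU, 4 GB RAM | ¥300-800 | Medium |
| Kafka | 3-node cluster | ¥1500-3000 | High |
| Scheduler service | Standalone process | ¥100-200 | Low |
| Total | - | ¥2100-4500 | High |
⚠️ **Warning:** For 90% of small and mid-sized projects, the operational complexity of this infrastructure far exceeds the complexity of the business itself. You may end up spending 70% of your time tuning Kafka partitions, Redis persistence policies, and PostgreSQL connection pools.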
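1.3 The core idea of the SQLite approach
The SQLite architecture is drastically simpler:
SQLite (WAL mode) = task queue + state store + event log
+ in-process scheduler
Core principle: rely on SQLite's WAL (Write-Ahead Logging) mode and transactional atomicity to record every step of a workflow's execution in SQLite tables. On crash recovery, scan for unfinished steps and re-execute them.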
-- Workflow instance table: the global state of each workflow
CREATE TABLE workflow_instances (
    id TEXT PRIMARY KEY,
    workflow_type TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'running', -- running / completed / failed / paused
    current_step INTEGER NOT NULL DEFAULT 0,
    input_data TEXT, -- input parameters as JSON
    context_data TEXT, -- context as JSON (data shared between steps)
    created_at TEXT NOT NULL DEFAULT (datetime('now')),
    updated_at TEXT NOT NULL DEFAULT (datetime('now')),
    retry_count INTEGER NOT NULL DEFAULT 0
);
-- Workflow step table: the execution state of each step
CREATE TABLE workflow_steps (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    instance_id TEXT NOT NULL REFERENCES workflow_instances(id),
    step_index INTEGER NOT NULL,
    step_type TEXT NOT NULL, -- action / delay / condition / parallel
    step_config TEXT NOT NULL, -- step configuration as JSON
    status TEXT NOT NULL DEFAULT 'pending', -- pending / running / completed / failed / skipped
    result_data TEXT, -- execution result as JSON
    error_message TEXT,
    started_at TEXT,
    completed_at TEXT,
    retry_count INTEGER NOT NULL DEFAULT 0,
    UNIQUE(instance_id, step_index)
);
-- Event log table: every state change, for auditing and replay
CREATE TABLE workflow_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    instance_id TEXT NOT NULL,
    event_type TEXT NOT NULL, -- step_started / step_completed / step_failed / workflow_paused
    event_data TEXT, -- event payload as JSON
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Indexes for the hot queries (partial indexes cover only unfinished rows)
CREATE INDEX idx_steps_pending ON workflow_steps(status, instance_id)
    WHERE status IN ('pending', 'running');
CREATE INDEX idx_instances_running ON workflow_instances(status)
    WHERE status = 'running';
CREATE INDEX idx_events_instance ON workflow_events(instance_id, id);
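The principle of recording each step transactionally and rescanning after a crash can be sketched in a few lines. This is only a minimal illustration under simplifying assumptions: a single hypothetical `steps` table instead of the three tables above, an in-memory database, and made-up helper names (`claim_next`, `complete`, `recover`) that do not come from Obelisk.

```python
import sqlite3

# Simplified, hypothetical schema for illustration only.
SCHEMA = """
CREATE TABLE steps (
    id INTEGER PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'pending',  -- pending / running / completed
    started_at TEXT
);
"""

def claim_next(conn: sqlite3.Connection):
    """Mark the first pending step as running and return its id (None if nothing is pending)."""
    with conn:  # one transaction: commit on success, roll back on error
        row = conn.execute(
            "SELECT id FROM steps WHERE status = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        conn.execute(
            "UPDATE steps SET status = 'running', started_at = datetime('now') WHERE id = ?",
            (row[0],),
        )
        return row[0]

def complete(conn: sqlite3.Connection, step_id: int) -> None:
    """Record that a step finished."""
    with conn:
        conn.execute("UPDATE steps SET status = 'completed' WHERE id = ?", (step_id,))

def recover(conn: sqlite3.Connection, timeout_seconds: int = 300) -> int:
    """Reset steps stuck in 'running' longer than the timeout; return how many were reset."""
    with conn:
        cur = conn.execute(
            "UPDATE steps SET status = 'pending', started_at = NULL "
            "WHERE status = 'running' AND started_at < datetime('now', ?)",
            (f"-{timeout_seconds} seconds",),
        )
        return cur.rowcount

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany("INSERT INTO steps (id) VALUES (?)", [(1,), (2,)])
conn.commit()

first = claim_next(conn)    # step 1 becomes 'running'
complete(conn, first)       # step 1 finishes normally
second = claim_next(conn)   # step 2 becomes 'running'; pretend the process dies here
# Simulate that the crash happened ten minutes ago by backdating the start time.
conn.execute("UPDATE steps SET started_at = datetime('now', '-10 minutes') WHERE id = ?", (second,))
conn.commit()
recovered = recover(conn)   # step 2 is handed back to the queue
```

After `recover` runs, the next `claim_next` call picks up step 2 again, which is the whole recovery story in miniature.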
⚙️ II. The SQLite Workflow Engine in Practice
2.1 Basic engine implementation
Below is the core of a SQLite workflow engine implemented in Python. It can be run as-is:
# SQLite durable workflow engine - core implementation
import json
import sqlite3
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone


def utc_now() -> str:
    """UTC timestamp in the same 'YYYY-MM-DD HH:MM:SS' format as SQLite's datetime('now')."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


class SQLiteWorkflowEngine:
    MAX_RETRIES = 3  # a step is marked failed after this many failed attempts

    def __init__(self, db_path: str = "workflows.db"):
        self.db_path = db_path
        self._handlers = {}
        self._init_db()

    def _init_db(self):
        """Create the tables and enable WAL mode (the journal mode persists in the file)."""
        with self._get_conn() as conn:
            conn.execute("PRAGMA journal_mode=WAL")
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS workflow_instances (
                    id TEXT PRIMARY KEY,
                    workflow_type TEXT NOT NULL,
                    status TEXT NOT NULL DEFAULT 'running',
                    current_step INTEGER NOT NULL DEFAULT 0,
                    input_data TEXT,
                    context_data TEXT DEFAULT '{}',
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    retry_count INTEGER NOT NULL DEFAULT 0
                );
                CREATE TABLE IF NOT EXISTS workflow_steps (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    instance_id TEXT NOT NULL,
                    step_index INTEGER NOT NULL,
                    step_type TEXT NOT NULL,
                    step_config TEXT NOT NULL,
                    status TEXT NOT NULL DEFAULT 'pending',
                    result_data TEXT,
                    error_message TEXT,
                    started_at TEXT,
                    completed_at TEXT,
                    retry_count INTEGER NOT NULL DEFAULT 0,
                    UNIQUE(instance_id, step_index)
                );
                CREATE TABLE IF NOT EXISTS workflow_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    instance_id TEXT NOT NULL,
                    event_type TEXT NOT NULL,
                    event_data TEXT,
                    created_at TEXT NOT NULL
                );
                CREATE INDEX IF NOT EXISTS idx_steps_pending
                    ON workflow_steps(status, instance_id)
                    WHERE status IN ('pending', 'running');
                CREATE INDEX IF NOT EXISTS idx_instances_running
                    ON workflow_instances(status)
                    WHERE status = 'running';
            """)

    @contextmanager
    def _get_conn(self):
        """Open a connection; commit on success, roll back on error, always close."""
        # timeout=10 makes SQLite wait up to 10 s for a lock instead of failing immediately.
        conn = sqlite3.connect(self.db_path, timeout=10)
        conn.row_factory = sqlite3.Row
        # synchronous is a per-connection setting, so it is applied to every new connection.
        conn.execute("PRAGMA synchronous=NORMAL")
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def register_handler(self, step_type: str, handler):
        """Register the handler for a step type."""
        self._handlers[step_type] = handler

    def start_workflow(self, workflow_type: str, steps: list, input_data: dict = None) -> str:
        """Start a new workflow instance and return its id."""
        instance_id = str(uuid.uuid4())
        now = utc_now()
        with self._get_conn() as conn:
            conn.execute(
                "INSERT INTO workflow_instances (id, workflow_type, input_data, created_at, updated_at) VALUES (?,?,?,?,?)",
                (instance_id, workflow_type, json.dumps(input_data or {}), now, now)
            )
            for i, step in enumerate(steps):
                conn.execute(
                    "INSERT INTO workflow_steps (instance_id, step_index, step_type, step_config) VALUES (?,?,?,?)",
                    (instance_id, i, step["type"], json.dumps(step.get("config", {})))
                )
            conn.execute(
                "INSERT INTO workflow_events (instance_id, event_type, event_data, created_at) VALUES (?,?,?,?)",
                (instance_id, "workflow_started", json.dumps({"input": input_data}), now)
            )
        return instance_id

    def execute_next_step(self, instance_id: str) -> bool:
        """Execute the next step; return True while the workflow may have more work to do."""
        with self._get_conn() as conn:
            instance = conn.execute(
                "SELECT * FROM workflow_instances WHERE id=? AND status='running'", (instance_id,)
            ).fetchone()
            if not instance:
                return False
            # Look at the first unfinished step, so that a later step never overtakes
            # one that is still running (or waiting to be recovered).
            step = conn.execute(
                "SELECT * FROM workflow_steps WHERE instance_id=? AND status IN ('pending', 'running') "
                "ORDER BY step_index LIMIT 1",
                (instance_id,)
            ).fetchone()
            now = utc_now()
            if not step:
                conn.execute(
                    "UPDATE workflow_instances SET status='completed', updated_at=? WHERE id=?",
                    (now, instance_id)
                )
                conn.execute(
                    "INSERT INTO workflow_events (instance_id, event_type, created_at) VALUES (?,?,?)",
                    (instance_id, "workflow_completed", now)
                )
                return False
            if step["status"] == "running":
                return False  # in flight elsewhere, or stale until recover_stale_workflows() resets it
            # Claim the step
            conn.execute(
                "UPDATE workflow_steps SET status='running', started_at=? WHERE id=?",
                (now, step["id"])
            )
            conn.execute(
                "UPDATE workflow_instances SET current_step=?, updated_at=? WHERE id=?",
                (step["step_index"], now, instance_id)
            )
        # Run the handler outside the transaction (avoids holding the write lock for a long time)
        handler = self._handlers.get(step["step_type"])
        if not handler:
            self._mark_step_failed(instance_id, step["id"], f"No handler for step type: {step['step_type']}")
            return False
        try:
            context = json.loads(instance["context_data"] or "{}")
            config = json.loads(step["step_config"])
            result = handler(context, config)
            self._mark_step_completed(instance_id, step["id"], result or {})
            return True
        except Exception as e:
            retry_count = step["retry_count"] + 1
            if retry_count >= self.MAX_RETRIES:
                self._mark_step_failed(instance_id, step["id"], str(e))
                return False
            # Put the step back in the queue for another attempt
            with self._get_conn() as conn:
                conn.execute(
                    "UPDATE workflow_steps SET status='pending', retry_count=?, error_message=? WHERE id=?",
                    (retry_count, str(e), step["id"])
                )
            return True

    def _mark_step_completed(self, instance_id: str, step_id: int, result: dict):
        now = utc_now()
        with self._get_conn() as conn:
            conn.execute(
                "UPDATE workflow_steps SET status='completed', result_data=?, completed_at=? WHERE id=?",
                (json.dumps(result), now, step_id)
            )
            # Merge the result into the workflow context shared by later steps
            instance = conn.execute("SELECT context_data FROM workflow_instances WHERE id=?", (instance_id,)).fetchone()
            context = json.loads(instance["context_data"] or "{}")
            context.update(result)
            conn.execute(
                "UPDATE workflow_instances SET context_data=?, updated_at=? WHERE id=?",
                (json.dumps(context), now, instance_id)
            )
            conn.execute(
                "INSERT INTO workflow_events (instance_id, event_type, event_data, created_at) VALUES (?,?,?,?)",
                (instance_id, "step_completed", json.dumps({"step_id": step_id, "result": result}), now)
            )

    def _mark_step_failed(self, instance_id: str, step_id: int, error: str):
        now = utc_now()
        with self._get_conn() as conn:
            conn.execute(
                "UPDATE workflow_steps SET status='failed', error_message=?, completed_at=? WHERE id=?",
                (error, now, step_id)
            )
            conn.execute(
                "UPDATE workflow_instances SET status='failed', updated_at=? WHERE id=?",
                (now, instance_id)
            )
            conn.execute(
                "INSERT INTO workflow_events (instance_id, event_type, event_data, created_at) VALUES (?,?,?,?)",
                (instance_id, "step_failed", json.dumps({"step_id": step_id, "error": error}), now)
            )

    def recover_stale_workflows(self, timeout_seconds: int = 300) -> int:
        """Recover stuck workflows (the core of crash recovery); return the number of steps reset."""
        now = utc_now()
        with self._get_conn() as conn:
            # Reset steps that have been 'running' longer than the timeout back to 'pending'.
            # datetime(ws.started_at) normalizes the stored value, so ISO-8601 strings with a
            # 'T' separator compare correctly against datetime('now', ...).
            stale_steps = conn.execute("""
                SELECT ws.id, ws.instance_id FROM workflow_steps ws
                JOIN workflow_instances wi ON ws.instance_id = wi.id
                WHERE ws.status = 'running'
                  AND wi.status = 'running'
                  AND datetime(ws.started_at) < datetime('now', ?)
            """, (f"-{int(timeout_seconds)} seconds",)).fetchall()
            for step in stale_steps:
                conn.execute(
                    "UPDATE workflow_steps SET status='pending', started_at=NULL WHERE id=?",
                    (step["id"],)
                )
                conn.execute(
                    "INSERT INTO workflow_events (instance_id, event_type, event_data, created_at) VALUES (?,?,?,?)",
                    (step["instance_id"], "step_recovered", json.dumps({"step_id": step["id"]}), now)
                )
        return len(stale_steps)
2.2 Usage example: an order processing workflow
Below is a complete order processing workflow with three steps: inventory reservation, payment processing, and notification delivery.
# Order processing workflow - complete example
# (assumes the SQLiteWorkflowEngine class from section 2.1 is defined or imported)
import time

# Initialize the engine
engine = SQLiteWorkflowEngine("order_workflows.db")

# Simulated external services
def check_inventory(context, config):
    """Check and reserve inventory."""
    product_id = config["product_id"]
    quantity = config.get("quantity", 1)
    # A real project would call the inventory service here
    print(f" ✅ Inventory reserved: product={product_id}, qty={quantity}")
    return {"inventory_reserved": True, "reservation_id": f"R-{int(time.time())}"}

def process_payment(context, config):
    """Process the payment."""
    amount = config["amount"]
    reservation_id = context.get("reservation_id")  # produced by the previous step
    print(f" ✅ Payment processed: amount={amount}, reservation={reservation_id}")
    return {"payment_id": f"PAY-{int(time.time())}", "payment_status": "success"}

def send_notification(context, config):
    """Send the notification."""
    notification_type = config.get("type", "order_confirmed")
    payment_id = context.get("payment_id")
    print(f" ✅ Notification sent: type={notification_type}, payment={payment_id}")
    return {"notification_sent": True}

# Register the handlers
engine.register_handler("check_inventory", check_inventory)
engine.register_handler("process_payment", process_payment)
engine.register_handler("send_notification", send_notification)

# Define the workflow steps
order_steps = [
    {"type": "check_inventory", "config": {"product_id": "SKU-001", "quantity": 2}},
    {"type": "process_payment", "config": {"amount": 199.00}},
    {"type": "send_notification", "config": {"type": "order_confirmed"}},
]

# Start the workflow and run it to the end
instance_id = engine.start_workflow("order_processing", order_steps, {"user_id": "U-1001"})
print(f"🚀 Order workflow started: {instance_id}")
while engine.execute_next_step(instance_id):
    print(" → Moving on to the next step...")
print("\n✅ Workflow finished!")
2.3 Crash recovery: SQLite's killer feature
The real value of the SQLite approach lies in crash recovery. Traditional setups need ZooKeeper leader election and Redis Sentinel failover, whereas SQLite only needs a simple scan:
# Crash recovery daemon - runs every 30 seconds
# (assumes the SQLiteWorkflowEngine class from section 2.1 is defined or imported)
import time

def recovery_loop(engine: SQLiteWorkflowEngine, interval: int = 30):
    """Main crash recovery loop."""
    print("🔄 Starting the crash recovery daemon...")
    while True:
        try:
            # 1. Recover stuck steps (running for more than 5 minutes)
            recovered = engine.recover_stale_workflows(timeout_seconds=300)
            if recovered > 0:
                print(f" ⚠️ Recovered {recovered} stuck step(s)")
            # 2. Advance every workflow that is still in the 'running' state by one step
            with engine._get_conn() as conn:
                running = conn.execute(
                    "SELECT id FROM workflow_instances WHERE status='running'"
                ).fetchall()
            for wf in running:
                engine.execute_next_step(wf["id"])
            # 3. Optional: purge workflows that finished more than 7 days ago.
            #    Child rows go first; all three deletes share one transaction.
            #    datetime(updated_at) normalizes ISO-8601 values before comparing.
            with engine._get_conn() as conn:
                conn.execute("""
                    DELETE FROM workflow_events WHERE instance_id IN (
                        SELECT id FROM workflow_instances
                        WHERE status IN ('completed', 'failed')
                          AND datetime(updated_at) < datetime('now', '-7 days')
                    )
                """)
                conn.execute("""
                    DELETE FROM workflow_steps WHERE instance_id IN (
                        SELECT id FROM workflow_instances
                        WHERE status IN ('completed', 'failed')
                          AND datetime(updated_at) < datetime('now', '-7 days')
                    )
                """)
                conn.execute("""
                    DELETE FROM workflow_instances
                    WHERE status IN ('completed', 'failed')
                      AND datetime(updated_at) < datetime('now', '-7 days')
                """)
        except Exception as e:
            print(f" ❌ Recovery loop error: {e}")
        time.sleep(interval)

# How to run it: start it in a separate process
# recovery_loop(engine, interval=30)
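A loop built on `while True` and `time.sleep` can only be stopped by killing the process. If the recovery task needs to shut down cleanly, one option is to wait on a `threading.Event` instead of sleeping. The sketch below is an assumption-laden illustration: `run_periodically` and `fake_recovery` are hypothetical names, and the stand-in task replaces the real calls into the engine.

```python
import threading

def run_periodically(task, interval: float, stop: threading.Event) -> int:
    """Call task() every `interval` seconds until `stop` is set; return the number of runs."""
    runs = 0
    while not stop.is_set():
        try:
            task()
        except Exception as exc:  # keep the loop alive after a failed run
            print(f"recovery task failed: {exc}")
        runs += 1
        stop.wait(interval)  # sleeps, but returns immediately once stop is set
    return runs

# Demo with a stand-in task; a real task would call the engine's recovery methods.
calls = []
stop_event = threading.Event()

def fake_recovery():
    calls.append(len(calls))
    if len(calls) == 3:
        stop_event.set()  # request shutdown after three runs

total_runs = run_periodically(fake_recovery, interval=0.01, stop=stop_event)
```

In a real service, a signal handler or shutdown hook would call `stop_event.set()` instead of the task itself.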
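📌 **Remember:** Crash recovery with SQLite is simple because a committed transaction is not lost even if the process is killed with kill -9; WAL mode keeps the database file consistent across application crashes. One caveat: with synchronous=NORMAL, a power failure or OS crash can still roll back the most recent commits (without corrupting the database), so use synchronous=FULL if that matters. By comparison, Redis with RDB snapshots can lose everything since the last snapshot, and AOF with the everysec fsync policy can lose about a second of writes.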
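Recovery re-runs a step that may already have performed its side effect before the crash, so steps effectively run at least once. A common way to cope is to make handlers idempotent. The following is a minimal sketch, assuming a hypothetical `side_effects` table and a hypothetical `run_once` helper; neither is part of the engine above.

```python
import sqlite3

def run_once(conn: sqlite3.Connection, key: str, action) -> bool:
    """Run action() only if `key` has not been recorded yet; return True if it ran."""
    with conn:  # the key is committed only if action() succeeds
        cur = conn.execute(
            "INSERT OR IGNORE INTO side_effects (idempotency_key) VALUES (?)", (key,)
        )
        if cur.rowcount == 0:
            return False  # an earlier attempt already did the work
        action()
        return True

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE side_effects (idempotency_key TEXT PRIMARY KEY)")

sent = []
key = "instance-42:step-2"  # hypothetical key: workflow instance id plus step index
first_attempt = run_once(conn, key, lambda: sent.append("email"))
second_attempt = run_once(conn, key, lambda: sent.append("email"))  # simulated re-execution
```

This only protects side effects that live in the same database. For calls to an external service (payments, email providers), the key would need to be passed along so that the remote side can deduplicate as well.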
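📊 III. Comparison and Use Cases
3.1 Performance and reliability comparison
| Dimension | SQLite WAL | Redis + PG | Kafka + PG |
|---|---|---|---|
| Write throughput | 5,000-10,000 TPS | 50,000+ TPS | 100,000+ TPS |
| Read latency | < 1 ms | < 1 ms | 5-10 ms |
| Crash recovery | Automatic (WAL) | Manual configuration | Manual configuration |
| Data reliability | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| Operational complexity | ⭐ (very low) | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Monthly cost | ¥0 (local) | ¥2000+ | ¥5000+ |
| Concurrency model | Multiple processes on one machine | Distributed cluster | Large-scale stream processing |
| Horizontal scaling | ❌ Not supported | ✅ Supported | ✅ Supported |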
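3.2 Decision tree
What is your daily workflow execution volume?
│
├─ < 10,000 / day ──→ SQLite (just use it, don't hesitate)
│
├─ 10,000 - 100,000 / day ──→ SQLite + sharding (one DB file per business line)
│
└─ > 100,000 / day ──→ Do you need a distributed solution?
                        │
                        ├─ Single-machine performance is enough ──→ SQLite + queue buffering
                        │
                        └─ Must be distributed ──→ PostgreSQL + message queue
⚡ **Key takeaway:** For 90% of small and mid-sized business applications, SQLite's 5,000-10,000 TPS is more than enough. 10,000 workflows a day averages out to less than 0.12 per second (10,000 / 86,400 ≈ 0.116), so your bottleneck is nowhere near the database.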
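The arithmetic behind that estimate is easy to reproduce and to adapt to your own numbers. In the sketch below, the figure of 10 database writes per workflow is purely an assumption for illustration (each step start, completion, and event row counts as a write), and averages hide peak load.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400

def average_rate(workflows_per_day: int, writes_per_workflow: int = 1) -> float:
    """Average number of database writes per second for a given daily volume."""
    return workflows_per_day * writes_per_workflow / SECONDS_PER_DAY

workflows_only = average_rate(10_000)       # roughly 0.116 workflows per second
with_ten_writes = average_rate(10_000, 10)  # roughly 1.16 writes per second (assumed 10 writes each)
```

Even with a generous peak-to-average factor, both figures stay far below the write throughput quoted for SQLite in the table above.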
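3.3 Limitations of the SQLite approach
SQLite should not be oversold as a universal solution. It has clear boundaries:
- ❌ No cross-machine access: SQLite is a single-machine database and cannot be shared between servers
- ❌ Limited write concurrency: WAL mode allows many readers alongside one writer, but writes are still serialized
- ❌ Not suited to real-time stream processing: if you need Kafka's streaming semantics (consumer groups, offsets), SQLite cannot provide them
- ⚠️ Unreliable on network file systems: do not run SQLite on NFS/SMB, because file locking there is unreliable and the database can be corrupted
✅ Good fit: monoliths, embedded systems, edge computing, small SaaS products, internal tools, prototypes
❌ Poor fit: high-concurrency distributed systems, microservice architectures, real-time stream processing, multi-datacenter deployments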
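🔧 IV. Production Best Practices
4.1 Tuning the SQLite workflow engine
# Production SQLite configuration - key PRAGMA settings
import sqlite3

def get_production_pragmas() -> dict:
    return {
        "journal_mode": "WAL",         # WAL mode: readers do not block the writer
        "synchronous": "NORMAL",       # sufficient in WAL mode; fewer fsyncs than FULL
        "busy_timeout": "10000",       # wait up to 10 s for the write lock before failing with SQLITE_BUSY
        "cache_size": "-64000",        # about 64 MB of page cache (a negative value is in KiB)
        "temp_store": "MEMORY",        # keep temporary tables in memory
        "mmap_size": "268435456",      # 256 MB of memory-mapped I/O to speed up reads
        "wal_autocheckpoint": "1000",  # checkpoint automatically every 1000 pages
    }

def apply_pragmas(conn: sqlite3.Connection):
    # Apart from journal_mode, these settings are per-connection: apply them to every new connection.
    # The values come from the fixed dict above, never from user input.
    for pragma, value in get_production_pragmas().items():
        conn.execute(f"PRAGMA {pragma}={value}")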
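It is worth verifying that the settings actually took effect, since a mistyped PRAGMA is silently ignored. The sketch below reads the values back on a throwaway database file (WAL is not available for in-memory databases). The `read_back` helper is a hypothetical name, and the second connection illustrates that only the journal mode persists in the file.

```python
import os
import sqlite3
import tempfile

def read_back(conn: sqlite3.Connection, names: list) -> dict:
    """Return the current value of each named PRAGMA."""
    return {name: conn.execute(f"PRAGMA {name}").fetchone()[0] for name in names}

with tempfile.TemporaryDirectory() as tmp_dir:
    db_path = os.path.join(tmp_dir, "check.db")

    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.execute("PRAGMA busy_timeout=10000")
    conn.execute("CREATE TABLE t (x INTEGER)")  # make sure the file has real content
    conn.commit()
    settings = read_back(conn, ["journal_mode", "synchronous", "busy_timeout"])
    conn.close()

    # A fresh connection: journal_mode is stored in the file, the other
    # settings fall back to their defaults until they are applied again.
    conn2 = sqlite3.connect(db_path)
    reopened = read_back(conn2, ["journal_mode", "synchronous", "busy_timeout"])
    conn2.close()
```

`synchronous` is reported as an integer (1 means NORMAL), and the defaults seen in `reopened` depend on how the SQLite library was compiled.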
4.2 Integrating with Python asyncio
# asyncio wrapper for the SQLite workflow engine - uses a thread pool to avoid blocking the event loop
# (assumes the SQLiteWorkflowEngine class from section 2.1 is defined or imported)
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncWorkflowEngine:
    def __init__(self, db_path: str = "workflows.db", max_workers: int = 4):
        self.engine = SQLiteWorkflowEngine(db_path)
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    async def start_workflow(self, workflow_type: str, steps: list, input_data: dict = None) -> str:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            self.engine.start_workflow,
            workflow_type, steps, input_data
        )

    async def execute_next_step(self, instance_id: str) -> bool:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            self.engine.execute_next_step,
            instance_id
        )

    async def run_workflow(self, workflow_type: str, steps: list, input_data: dict = None):
        """Run a workflow from start to finish."""
        instance_id = await self.start_workflow(workflow_type, steps, input_data)
        while await self.execute_next_step(instance_id):
            await asyncio.sleep(0.01)  # yield to the event loop
        return instance_id

# FastAPI integration example
# from fastapi import FastAPI
# app = FastAPI()
# async_engine = AsyncWorkflowEngine()
#
# @app.post("/orders")
# async def create_order(order: dict):
#     # run_workflow returns only after the last step has run
#     instance_id = await async_engine.run_workflow(
#         "order_processing", order_steps, order
#     )
#     return {"workflow_id": instance_id, "status": "finished"}
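On Python 3.9 and later, `asyncio.to_thread` offers a shorter route to the same effect when a dedicated executor is not needed. A minimal sketch follows, with a hypothetical blocking function `count_rows_blocking` standing in for an engine method.

```python
import asyncio
import sqlite3

def count_rows_blocking(db_path: str) -> int:
    """A blocking SQLite call, standing in for an engine method."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS t (x INTEGER)")
        conn.execute("INSERT INTO t VALUES (1)")
        conn.commit()
        return conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
    finally:
        conn.close()

async def main() -> list:
    # Each call runs in a worker thread, so the event loop stays responsive.
    return await asyncio.gather(
        asyncio.to_thread(count_rows_blocking, ":memory:"),
        asyncio.to_thread(count_rows_blocking, ":memory:"),
    )

results = asyncio.run(main())
```

Each call opens its own connection inside the worker thread, which sidesteps the restriction that a `sqlite3` connection should by default be used only from the thread that created it.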
4.3 Monitoring and observability
-- Workflow monitoring queries: a live view of engine health
-- (datetime(col) normalizes stored ISO-8601 timestamps before comparing)
-- 1. Number of workflows per status
SELECT status, COUNT(*) AS count
FROM workflow_instances
WHERE datetime(created_at) > datetime('now', '-24 hours')
GROUP BY status;
-- 2. Average execution time (completed workflows)
SELECT workflow_type,
       AVG(julianday(wi.updated_at) - julianday(wi.created_at)) * 86400 AS avg_seconds,
       COUNT(*) AS total_count
FROM workflow_instances wi
WHERE wi.status = 'completed'
  AND datetime(wi.created_at) > datetime('now', '-7 days')
GROUP BY workflow_type;
-- 3. Step types with the highest failure rate (FILTER requires SQLite 3.30+)
SELECT ws.step_type,
       COUNT(*) FILTER (WHERE ws.status = 'failed') AS failures,
       COUNT(*) AS total,
       ROUND(100.0 * COUNT(*) FILTER (WHERE ws.status = 'failed') / COUNT(*), 2) AS failure_rate
FROM workflow_steps ws
JOIN workflow_instances wi ON ws.instance_id = wi.id
WHERE datetime(wi.created_at) > datetime('now', '-24 hours')
GROUP BY ws.step_type
ORDER BY failure_rate DESC;
-- 4. Stuck workflows (not updated for more than 10 minutes)
SELECT id, workflow_type, current_step, updated_at,
       CAST((julianday('now') - julianday(updated_at)) * 86400 AS INTEGER) AS stuck_seconds
FROM workflow_instances
WHERE status = 'running'
  AND datetime(updated_at) < datetime('now', '-10 minutes');
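These queries can also feed a health endpoint or a periodic log line. The sketch below runs the first query from Python and returns a plain dict. It uses a reduced, hypothetical version of the `workflow_instances` table with made-up sample rows, and `status_counts` is an illustrative name.

```python
import sqlite3

def status_counts(conn: sqlite3.Connection) -> dict:
    """Number of workflows per status among those created in the last 24 hours."""
    rows = conn.execute(
        "SELECT status, COUNT(*) FROM workflow_instances "
        "WHERE datetime(created_at) > datetime('now', '-24 hours') "
        "GROUP BY status"
    ).fetchall()
    return dict(rows)

# Reduced table and sample data, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE workflow_instances ("
    "id TEXT PRIMARY KEY, status TEXT NOT NULL, created_at TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO workflow_instances VALUES (?, ?, datetime('now', ?))",
    [
        ("a", "running", "-1 hours"),
        ("b", "running", "-2 hours"),
        ("c", "failed", "-3 hours"),
        ("d", "completed", "-2 days"),  # outside the 24-hour window
    ],
)
conn.commit()

counts = status_counts(conn)
```

A status that has no rows in the window is simply absent from the result, so callers should read it with `counts.get("failed", 0)` rather than indexing.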
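💡 Summary
SQLite-backed durable workflows are not a silver bullet, but for the vast majority of small and mid-sized applications they offer the best return on investment. The core advantages are:
- Zero infrastructure cost: no Redis, Kafka, or ZooKeeper required
- Minimal crash recovery: WAL mode + a single file = no manual intervention
- High development velocity: one file is a complete workflow engine
- Easy debugging: any SQLite client can inspect workflow state
If your project runs fewer than 10,000 workflows a day, is deployed on a single machine, and has a team of 10 people or fewer, start with SQLite and migrate only when you actually hit a bottleneck. Prematurely optimizing the architecture is the most common form of engineering waste in small teams.
Recommended tools:
- 🔧 db-browser-for-sqlite: a visual management tool for SQLite
- 🔧 LiteFS: distributed replication for SQLite (from Fly.io)
- 🔧 Litestream: real-time streaming backup of SQLite to S3
- 🔧 Turso: a SQLite cloud service (based on the libSQL fork)
- 🔧 jsjson.com JSON formatter: online JSON formatting and validation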