Cloudflare 在 2025 年底将 Workflows 推向正式可用(GA),这是继 Durable Objects 之后又一个边缘原生的有状态计算原语。与传统的 Cron Job 或消息队列不同,Workflows 提供了一种声明式的、自动持久化的、自带重试的多步骤任务执行模型——每个 Step 独立执行、独立重试、独立计费,整个工作流的状态由 Cloudflare 平台自动管理。如果你正在为「如何在边缘运行一个可靠的多步骤任务」而头疼,这篇文章会给你一个完整的答案。
📌 记住: Cloudflare Workflows 不是简单的任务编排工具,它是基于 Durable Objects 之上的一个完整的 Durable Execution 引擎。理解这一点,才能真正用好它。
🏗️ 一、Workflows 架构原理与核心概念
1.1 什么是 Durable Execution
Durable Execution(持久化执行)是一种编程范式:你写的代码在执行过程中,每一步的状态都会被自动持久化到存储层。如果执行中断(机器重启、网络抖动、代码部署),恢复后可以从上次成功的步骤继续执行,而不是从头开始。
这个概念最早由微软的 Durable Functions(2017)引入,后来 Temporal 将其推广到通用场景。Cloudflare Workflows 则把它带到了边缘:
| 特性 | Cloudflare Workflows | Temporal | AWS Step Functions |
|---|---|---|---|
| 运行环境 | 边缘(300+ 节点) | 自托管/Cloud | 单区域 |
| 状态持久化 | 自动(Durable Objects) | 需配置数据库 | 自动(DynamoDB) |
| 单步重试 | ✅ 内置 | ✅ 内置 | ✅ 内置 |
| 最大执行时长 | 无限制(按步计费) | 无限制 | 1 年 |
| 冷启动 | ~50ms | 取决于部署 | ~200ms |
| 定价模型 | 按 Step 执行次数 | 按操作数 | 按状态转换次数 |
| 编程语言 | TypeScript/JavaScript | Go/Java/Python/TS | JSON/YAML 定义 |
| 适合场景 | 边缘任务、Webhook 处理 | 复杂企业工作流 | AWS 生态集成 |
⚠️ 警告: 不要把 Workflows 当成「便宜的 Temporal」。它们的设计目标不同:Temporal 面向复杂的企业级编排,Workflows 面向边缘场景的轻量级可靠执行。
1.2 Workflows 的执行模型
一个 Workflow 由多个 Step 组成。每个 Step 是一个独立的执行单元:
// 工作流执行流程示意
// Step 1: 验证订单 → 成功 → 持久化状态
// Step 2: 扣减库存 → 失败 → 自动重试 3 次
// Step 2: 扣减库存 → 重试成功 → 持久化状态
// Step 3: 发送通知 → 成功 → 工作流完成
关键特性:
- ✅ Step 间自动持久化:每一步完成后,状态立即写入持久存储
- ✅ 单步独立重试:某一步失败不会影响已成功的步骤
- ✅ 幂等保证:同一步骤不会因为重试而执行多次(需要你自己保证幂等性)
- ✅ 事件驱动触发:可以通过 HTTP、Queue、Cron 等方式触发
1.3 Workflows 与 Durable Objects 的关系
Workflows 底层就是 Durable Objects。每个 Workflow 实例对应一个 Durable Object 实例,Step 的状态持久化依赖 Durable Objects 的事务性存储 API。
// 本质上,Workflows 是 Durable Objects 的高级封装
// 你不需要直接操作 DurableObjectStorage
// Workflows 帮你处理了状态序列化、重试调度、事件通知
💡 提示: 如果你的场景需要更细粒度的状态控制(比如实时 WebSocket、自定义存储结构),直接用 Durable Objects 更合适。Workflows 适合「步骤明确、需要可靠执行」的场景。
🚀 二、从零构建生产级 Workflow
2.1 项目初始化与基础结构
# 创建 Cloudflare Workers 项目
npm create cloudflare@latest my-workflow -- --type=hello-world
cd my-workflow
# Wrangler 3.x 已内置 Workflows 支持,无需额外依赖
项目结构:
my-workflow/
├── src/
│ ├── index.ts # Worker 入口(HTTP 触发器)
│ └── workflows/
│ └── order.ts # Workflow 定义
├── wrangler.toml # 配置文件
└── package.json
wrangler.toml 配置:
# wrangler.toml
name = "my-workflow"
main = "src/index.ts"
compatibility_date = "2026-01-01"
# 声明 Workflow 绑定
[[workflows]]
name = "order-workflow"
binding = "ORDER_WORKFLOW"
class_name = "OrderWorkflow"
2.2 完整的订单处理 Workflow
下面是一个真实的电商订单处理场景,包含库存检查、支付处理、通知发送三个核心步骤:
// src/workflows/order.ts
// 完整的订单处理工作流:库存 → 支付 → 通知
import {
WorkflowEntrypoint,
WorkflowStep,
WorkflowEvent,
} from "cloudflare:workers";
// 订单数据接口
interface OrderPayload {
orderId: string;
userId: string;
items: Array<{ sku: string; quantity: number; price: number }>;
totalAmount: number;
paymentMethod: "alipay" | "wechat" | "card";
}
// 步骤结果接口
interface StepResult {
success: boolean;
data?: unknown;
error?: string;
}
export class OrderWorkflow extends WorkflowEntrypoint {
async run(event: WorkflowEvent<OrderPayload>, step: WorkflowStep) {
const order = event.payload;
// Step 1: 验证并扣减库存
// 自动重试 3 次,每次间隔指数退避(1s, 2s, 4s)
const inventoryResult = await step.do(
"check-and-reserve-inventory",
{
retries: { limit: 3, delay: "1 second", backoff: "exponential" },
timeout: "30 seconds",
},
async (): Promise<StepResult> => {
const response = await fetch(
`https://api.inventory.example.com/reserve`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
orderId: order.orderId,
items: order.items,
}),
}
);
if (!response.ok) {
throw new Error(`库存服务返回 ${response.status}`);
}
return { success: true, data: await response.json() };
}
);
if (!inventoryResult.success) {
// 库存不足,触发补偿流程
await step.do("notify-inventory-failure", async () => {
await this.notifyUser(order.userId, "订单失败:库存不足");
});
return { status: "failed", reason: "inventory_insufficient" };
}
// Step 2: 处理支付
// 支付是关键步骤,重试 5 次,但需要更长的超时
const paymentResult = await step.do(
"process-payment",
{
retries: { limit: 5, delay: "2 seconds", backoff: "exponential" },
timeout: "60 seconds",
},
async (): Promise<StepResult> => {
const response = await fetch(
`https://api.payment.example.com/charge`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
orderId: order.orderId,
amount: order.totalAmount,
method: order.paymentMethod,
idempotencyKey: `pay-${order.orderId}`,
}),
}
);
if (!response.ok) {
const body = await response.text();
throw new Error(`支付失败: ${response.status} - ${body}`);
}
return { success: true, data: await response.json() };
}
);
if (!paymentResult.success) {
// 支付失败,需要释放库存
await step.do("release-inventory", async () => {
await fetch(`https://api.inventory.example.com/release`, {
method: "POST",
body: JSON.stringify({ orderId: order.orderId }),
});
});
await step.do("notify-payment-failure", async () => {
await this.notifyUser(order.userId, "订单失败:支付异常");
});
return { status: "failed", reason: "payment_failed" };
}
// Step 3: 并行执行通知(使用 step.sleep 等待异步确认)
await step.do("send-notifications", async () => {
// 并行发送多种通知
await Promise.all([
this.sendEmail(order.userId, order.orderId),
this.sendSMS(order.userId, order.orderId),
this.updateOrderStatus(order.orderId, "paid"),
]);
});
// Step 4: 等待 24 小时后检查支付回调
// step.sleep 是持久化的等待,不消耗 Worker 执行时间
await step.sleep("wait-for-callback", "24 hours");
// Step 5: 最终确认
await step.do("finalize-order", async () => {
await fetch(`https://api.orders.example.com/finalize`, {
method: "POST",
body: JSON.stringify({ orderId: order.orderId }),
});
});
return { status: "completed", orderId: order.orderId };
}
private async notifyUser(userId: string, message: string) {
// 通知逻辑
}
private async sendEmail(userId: string, orderId: string) {
// 邮件发送逻辑
}
private async sendSMS(userId: string, orderId: string) {
// 短信发送逻辑
}
private async updateOrderStatus(orderId: string, status: string) {
// 状态更新逻辑
}
}
📌 记住:
step.sleep()是 Workflows 最强大的特性之一。它实现了一个持久化的等待——Workflow 实例在等待期间不会消耗任何计算资源,24 小时后自动唤醒继续执行。这在传统 Worker 中需要复杂的 Cron + 数据库方案才能实现。
2.3 HTTP 触发器与状态查询
// src/index.ts
// Worker 入口:HTTP 触发 Workflow 并查询状态
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// POST /api/orders —— 创建订单并触发 Workflow
if (request.method === "POST" && url.pathname === "/api/orders") {
const order: OrderPayload = await request.json();
// 触发 Workflow 实例
const instance = await env.ORDER_WORKFLOW.create({
params: order,
id: order.orderId, // 使用订单号作为 Workflow ID,保证幂等
});
return Response.json({
workflowId: instance.id,
status: "started",
});
}
// GET /api/orders/:id/status —— 查询 Workflow 状态
if (request.method === "GET" && url.pathname.startsWith("/api/orders/")) {
const orderId = url.pathname.split("/").pop();
const instance = await env.ORDER_WORKFLOW.get(orderId);
const status = await instance.status();
return Response.json({
orderId,
workflowStatus: status.status, // "running" | "paused" | "errored" | "complete" | "terminated"
steps: status.steps, // 每一步的执行详情
createdAt: status.createdAt,
modifiedAt: status.modifiedAt,
});
}
// POST /api/orders/:id/terminate —— 终止 Workflow
if (request.method === "POST" && url.pathname.includes("/terminate")) {
const orderId = url.pathname.split("/").at(-2);
const instance = await env.ORDER_WORKFLOW.get(orderId!);
await instance.terminate();
return Response.json({ status: "terminated" });
}
return new Response("Not Found", { status: 404 });
},
};
💡 三、高级模式与最佳实践
3.1 事件驱动的 Workflow:等待外部信号
有些场景需要 Workflow 等待外部事件(比如人工审批、Webhook 回调)。Workflows 提供了 step.waitForEvent 来实现:
// 等待人工审批的 Workflow 模式
export class ApprovalWorkflow extends WorkflowEntrypoint {
async run(event: WorkflowEvent<{ requestId: string }>, step: WorkflowStep) {
// Step 1: 提交审批请求
await step.do("submit-approval", async () => {
await fetch("https://api.approval.example.com/submit", {
method: "POST",
body: JSON.stringify({ requestId: event.payload.requestId }),
});
});
// Step 2: 等待审批事件(最多等 7 天)
const approvalEvent = await step.waitForEvent(
"wait-for-approval",
{ type: "approval-result", timeout: "7 days" }
);
if (approvalEvent.payload.decision === "approved") {
await step.do("execute-approved-action", async () => {
// 执行审批通过后的逻辑
});
return { status: "approved" };
} else {
await step.do("handle-rejection", async () => {
// 处理拒绝逻辑
});
return { status: "rejected" };
}
}
}
外部系统可以通过 API 向 Workflow 发送事件:
// 外部审批系统回调时,向 Workflow 发送事件
const instance = await env.APPROVAL_WORKFLOW.get(requestId);
await instance.sendEvent({
type: "approval-result",
payload: { decision: "approved", approvedBy: "manager@example.com" },
});
3.2 数据管道模式:批处理与分页
// 数据管道工作流:分批处理大量数据
export class DataPipelineWorkflow extends WorkflowEntrypoint {
async run(
event: WorkflowEvent<{ sourceId: string; totalRecords: number }>,
step: WorkflowStep
) {
const { sourceId, totalRecords } = event.payload;
const batchSize = 1000;
const totalBatches = Math.ceil(totalRecords / batchSize);
const results: Array<{ batch: number; processed: number; errors: number }> = [];
for (let batch = 0; batch < totalBatches; batch++) {
// 每个批次独立执行、独立重试
const result = await step.do(
`process-batch-${batch}`,
{
retries: { limit: 3, delay: "5 seconds" },
timeout: "5 minutes",
},
async () => {
const offset = batch * batchSize;
const response = await fetch(
`https://api.data.example.com/records?source=${sourceId}&offset=${offset}&limit=${batchSize}`
);
const records = await response.json();
let processed = 0;
let errors = 0;
for (const record of records) {
try {
await this.processRecord(record);
processed++;
} catch (e) {
errors++;
}
}
return { batch, processed, errors };
}
);
results.push(result);
// 每处理 10 个批次暂停 30 秒,避免压垮下游服务
if ((batch + 1) % 10 === 0 && batch < totalBatches - 1) {
await step.sleep("rate-limit-pause", "30 seconds");
}
}
// 最终汇总
await step.do("generate-report", async () => {
const totalProcessed = results.reduce((s, r) => s + r.processed, 0);
const totalErrors = results.reduce((s, r) => s + r.errors, 0);
await this.saveReport({ sourceId, totalProcessed, totalErrors });
});
return { status: "completed", batches: results.length };
}
private async processRecord(record: unknown) {
// 处理单条记录
}
private async saveReport(report: unknown) {
// 保存报告
}
}
3.3 常见坑点与避坑指南
坑点 1:Step 函数必须是幂等的
// ❌ 错误写法:非幂等操作
await step.do("charge-payment", async () => {
await paymentApi.charge(orderId, amount); // 重试可能导致重复扣款
});
// ✅ 正确写法:使用幂等键
await step.do("charge-payment", async () => {
await paymentApi.charge(orderId, amount, {
idempotencyKey: `charge-${orderId}-${step.name}`,
});
});
坑点 2:不要在 Step 中存储大量数据
// ❌ 错误写法:Step 返回大量数据(会被序列化到持久存储)
const bigData = await step.do("fetch-data", async () => {
return await fetchHugeDataset(); // 10MB 的数据
});
// ✅ 正确写法:Step 只返回必要的引用或摘要
const dataRef = await step.do("fetch-data", async () => {
const data = await fetchHugeDataset();
const ref = await saveToR2(data); // 存到 R2
return { ref: ref.key, count: data.length }; // 只返回引用
});
坑点 3:注意 Step 超时配置
// ❌ 默认超时可能不够
await step.do("heavy-computation", async () => {
// 复杂计算可能需要更长时间
});
// ✅ 显式设置超时
await step.do("heavy-computation", {
timeout: "5 minutes", // 根据实际需要设置
retries: { limit: 2, delay: "10 seconds" },
}, async () => {
// 复杂计算
});
⚠️ 警告: Step 函数中的
fetch调用默认超时是 30 秒。如果你的下游服务响应慢,必须在 Step 级别设置更长的超时,否则 Step 会因超时而重试,导致下游收到重复请求。
3.4 监控与调试
// 使用 Workflow status API 获取详细的执行日志
async function getWorkflowDetails(env: Env, workflowId: string) {
const instance = await env.ORDER_WORKFLOW.get(workflowId);
const status = await instance.status();
// status 包含每个 Step 的详细信息
console.log(JSON.stringify(status, null, 2));
// {
// "status": "running",
// "steps": [
// {
// "name": "check-and-reserve-inventory",
// "status": "complete",
// "start": "2026-06-05T10:00:00Z",
// "end": "2026-06-05T10:00:02Z",
// "output": { "success": true, "data": { ... } }
// },
// {
// "name": "process-payment",
// "status": "running",
// "start": "2026-06-05T10:00:02Z",
// "attempt": 2,
// "previousErrors": ["支付超时"]
// }
// ]
// }
}
📊 四、成本分析与选型建议
4.1 定价模型
Cloudflare Workflows 的定价基于 Step 执行次数:
| 指标 | 免费额度 | 超出部分 |
|---|---|---|
| Workflow 实例创建 | 100,000 次/月 | $0.000025/次 |
| Step 执行 | 1,000,000 次/月 | $0.000001/次 |
| 持久化存储 | 1GB | $0.20/GB/月 |
举个例子:一个每天处理 10,000 个订单的电商系统,每个订单平均 5 个 Step:
- 月 Step 执行量:10,000 × 5 × 30 = 1,500,000 次
- 免费额度内:1,000,000 次
- 超出部分:500,000 × $0.000001 = $0.50
- 月总成本约 $0.50
💡 提示: 与 AWS Step Functions 的 $0.025/千次状态转换相比,Workflows 在中小规模场景下有明显的成本优势。但在大规模场景(日均百万级),需要仔细对比。
4.2 选型决策树
需要可靠的多步骤执行?
├── 是 → 需要边缘运行?
│ ├── 是 → 需要复杂编排(子工作流、信号、查询)?
│ │ ├── 是 → 考虑 Temporal Cloud
│ │ └── 否 → Cloudflare Workflows ✅
│ └── 否 → 深度 AWS 集成?
│ ├── 是 → AWS Step Functions
│ └── 否 → Temporal 自托管
└── 否 → 简单 Cron + 数据库
✅ 总结
Cloudflare Workflows 是 2026 年边缘计算领域最重要的新原语之一。它让「可靠的多步骤任务执行」变得像写普通的 async 函数一样简单——你不需要管理消息队列、不需要自己实现重试逻辑、不需要操心状态持久化。
⚡ 关键结论: 如果你的应用已经运行在 Cloudflare Workers 上,或者你需要在边缘处理 Webhook、定时任务、数据管道等多步骤场景,Workflows 是目前最优雅的解决方案。它的学习成本极低(就是写 async 函数),但解决的问题非常实际。
适用场景:
- ✅ 电商订单处理(库存 → 支付 → 通知)
- ✅ 数据 ETL 管道(抽取 → 转换 → 加载)
- ✅ AI Agent 多步骤任务(搜索 → 推理 → 验证)
- ✅ Webhook 处理与重试(接收 → 验证 → 转发)
- ✅ 审批流程(提交 → 等待 → 执行)
不适用场景:
- ❌ 需要复杂子工作流和信号传递的企业级编排(用 Temporal)
- ❌ 纯无状态的简单请求处理(用普通 Worker)
- ❌ 需要实时 WebSocket 的交互式应用(用 Durable Objects)
相关资源: