Cloudflare Workflows 实战:用边缘原生工作流引擎构建可靠的分布式任务

深入解析 Cloudflare Workflows 的架构原理、Step API、重试机制与状态持久化,对比 Temporal 和 AWS Step Functions,附完整可运行代码示例,教你构建订单处理、数据管道等生产级工作流。

DevOps 与部署 2026-06-04 18 分钟

Cloudflare 在 2025 年底将 Workflows 推向正式可用(GA),这是继 Durable Objects 之后又一个边缘原生的有状态计算原语。与传统的 Cron Job 或消息队列不同,Workflows 提供了一种声明式的、自动持久化的、自带重试的多步骤任务执行模型——每个 Step 独立执行、独立重试、独立计费,整个工作流的状态由 Cloudflare 平台自动管理。如果你正在为「如何在边缘运行一个可靠的多步骤任务」而头疼,这篇文章会给你一个完整的答案。

📌 记住: Cloudflare Workflows 不是简单的任务编排工具,它是基于 Durable Objects 之上的一个完整的 Durable Execution 引擎。理解这一点,才能真正用好它。

🏗️ 一、Workflows 架构原理与核心概念

1.1 什么是 Durable Execution

Durable Execution(持久化执行)是一种编程范式:你写的代码在执行过程中,每一步的状态都会被自动持久化到存储层。如果执行中断(机器重启、网络抖动、代码部署),恢复后可以从上次成功的步骤继续执行,而不是从头开始。

这个概念最早由微软的 Durable Functions(2017)引入,后来 Temporal 将其推广到通用场景。Cloudflare Workflows 则把它带到了边缘:

特性 Cloudflare Workflows Temporal AWS Step Functions
运行环境 边缘(300+ 节点) 自托管/Cloud 单区域
状态持久化 自动(Durable Objects) 需配置数据库 自动(DynamoDB)
单步重试 ✅ 内置 ✅ 内置 ✅ 内置
最大执行时长 无限制(按步计费) 无限制 1 年
冷启动 ~50ms 取决于部署 ~200ms
定价模型 按 Step 执行次数 按操作数 按状态转换次数
编程语言 TypeScript/JavaScript Go/Java/Python/TS JSON/YAML 定义
适合场景 边缘任务、Webhook 处理 复杂企业工作流 AWS 生态集成

⚠️ 警告: 不要把 Workflows 当成「便宜的 Temporal」。它们的设计目标不同:Temporal 面向复杂的企业级编排,Workflows 面向边缘场景的轻量级可靠执行。

1.2 Workflows 的执行模型

一个 Workflow 由多个 Step 组成。每个 Step 是一个独立的执行单元:

// 工作流执行流程示意
// Step 1: 验证订单 → 成功 → 持久化状态
// Step 2: 扣减库存 → 失败 → 自动重试 3 次
// Step 2: 扣减库存 → 重试成功 → 持久化状态
// Step 3: 发送通知 → 成功 → 工作流完成

关键特性:

  • Step 间自动持久化:每一步完成后,状态立即写入持久存储
  • 单步独立重试:某一步失败不会影响已成功的步骤
  • 幂等保证:同一步骤不会因为重试而执行多次(需要你自己保证幂等性)
  • 事件驱动触发:可以通过 HTTP、Queue、Cron 等方式触发

1.3 Workflows 与 Durable Objects 的关系

Workflows 底层就是 Durable Objects。每个 Workflow 实例对应一个 Durable Object 实例,Step 的状态持久化依赖 Durable Objects 的事务性存储 API。

// 本质上,Workflows 是 Durable Objects 的高级封装
// 你不需要直接操作 DurableObjectStorage
// Workflows 帮你处理了状态序列化、重试调度、事件通知

💡 提示: 如果你的场景需要更细粒度的状态控制(比如实时 WebSocket、自定义存储结构),直接用 Durable Objects 更合适。Workflows 适合「步骤明确、需要可靠执行」的场景。

🚀 二、从零构建生产级 Workflow

2.1 项目初始化与基础结构

# 创建 Cloudflare Workers 项目
npm create cloudflare@latest my-workflow -- --type=hello-world
cd my-workflow
# Wrangler 3.x 已内置 Workflows 支持,无需额外依赖

项目结构:

my-workflow/
├── src/
│   ├── index.ts          # Worker 入口(HTTP 触发器)
│   └── workflows/
│       └── order.ts      # Workflow 定义
├── wrangler.toml         # 配置文件
└── package.json

wrangler.toml 配置:

# wrangler.toml
name = "my-workflow"
main = "src/index.ts"
compatibility_date = "2026-01-01"

# 声明 Workflow 绑定
[[workflows]]
name = "order-workflow"
binding = "ORDER_WORKFLOW"
class_name = "OrderWorkflow"

2.2 完整的订单处理 Workflow

下面是一个真实的电商订单处理场景,包含库存检查、支付处理、通知发送三个核心步骤:

// src/workflows/order.ts
// 完整的订单处理工作流:库存 → 支付 → 通知
import {
  WorkflowEntrypoint,
  WorkflowStep,
  WorkflowEvent,
} from "cloudflare:workers";

// 订单数据接口
interface OrderPayload {
  orderId: string;
  userId: string;
  items: Array<{ sku: string; quantity: number; price: number }>;
  totalAmount: number;
  paymentMethod: "alipay" | "wechat" | "card";
}

// 步骤结果接口
interface StepResult {
  success: boolean;
  data?: unknown;
  error?: string;
}

export class OrderWorkflow extends WorkflowEntrypoint {
  async run(event: WorkflowEvent<OrderPayload>, step: WorkflowStep) {
    const order = event.payload;

    // Step 1: 验证并扣减库存
    // 自动重试 3 次,每次间隔指数退避(1s, 2s, 4s)
    const inventoryResult = await step.do(
      "check-and-reserve-inventory",
      {
        retries: { limit: 3, delay: "1 second", backoff: "exponential" },
        timeout: "30 seconds",
      },
      async (): Promise<StepResult> => {
        const response = await fetch(
          `https://api.inventory.example.com/reserve`,
          {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({
              orderId: order.orderId,
              items: order.items,
            }),
          }
        );

        if (!response.ok) {
          throw new Error(`库存服务返回 ${response.status}`);
        }

        return { success: true, data: await response.json() };
      }
    );

    if (!inventoryResult.success) {
      // 库存不足,触发补偿流程
      await step.do("notify-inventory-failure", async () => {
        await this.notifyUser(order.userId, "订单失败:库存不足");
      });
      return { status: "failed", reason: "inventory_insufficient" };
    }

    // Step 2: 处理支付
    // 支付是关键步骤,重试 5 次,但需要更长的超时
    const paymentResult = await step.do(
      "process-payment",
      {
        retries: { limit: 5, delay: "2 seconds", backoff: "exponential" },
        timeout: "60 seconds",
      },
      async (): Promise<StepResult> => {
        const response = await fetch(
          `https://api.payment.example.com/charge`,
          {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({
              orderId: order.orderId,
              amount: order.totalAmount,
              method: order.paymentMethod,
              idempotencyKey: `pay-${order.orderId}`,
            }),
          }
        );

        if (!response.ok) {
          const body = await response.text();
          throw new Error(`支付失败: ${response.status} - ${body}`);
        }

        return { success: true, data: await response.json() };
      }
    );

    if (!paymentResult.success) {
      // 支付失败,需要释放库存
      await step.do("release-inventory", async () => {
        await fetch(`https://api.inventory.example.com/release`, {
          method: "POST",
          body: JSON.stringify({ orderId: order.orderId }),
        });
      });

      await step.do("notify-payment-failure", async () => {
        await this.notifyUser(order.userId, "订单失败:支付异常");
      });

      return { status: "failed", reason: "payment_failed" };
    }

    // Step 3: 并行执行通知(使用 step.sleep 等待异步确认)
    await step.do("send-notifications", async () => {
      // 并行发送多种通知
      await Promise.all([
        this.sendEmail(order.userId, order.orderId),
        this.sendSMS(order.userId, order.orderId),
        this.updateOrderStatus(order.orderId, "paid"),
      ]);
    });

    // Step 4: 等待 24 小时后检查支付回调
    // step.sleep 是持久化的等待,不消耗 Worker 执行时间
    await step.sleep("wait-for-callback", "24 hours");

    // Step 5: 最终确认
    await step.do("finalize-order", async () => {
      await fetch(`https://api.orders.example.com/finalize`, {
        method: "POST",
        body: JSON.stringify({ orderId: order.orderId }),
      });
    });

    return { status: "completed", orderId: order.orderId };
  }

  private async notifyUser(userId: string, message: string) {
    // 通知逻辑
  }

  private async sendEmail(userId: string, orderId: string) {
    // 邮件发送逻辑
  }

  private async sendSMS(userId: string, orderId: string) {
    // 短信发送逻辑
  }

  private async updateOrderStatus(orderId: string, status: string) {
    // 状态更新逻辑
  }
}

📌 记住: step.sleep() 是 Workflows 最强大的特性之一。它实现了一个持久化的等待——Workflow 实例在等待期间不会消耗任何计算资源,24 小时后自动唤醒继续执行。这在传统 Worker 中需要复杂的 Cron + 数据库方案才能实现。

2.3 HTTP 触发器与状态查询

// src/index.ts
// Worker 入口:HTTP 触发 Workflow 并查询状态
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // POST /api/orders —— 创建订单并触发 Workflow
    if (request.method === "POST" && url.pathname === "/api/orders") {
      const order: OrderPayload = await request.json();

      // 触发 Workflow 实例
      const instance = await env.ORDER_WORKFLOW.create({
        params: order,
        id: order.orderId, // 使用订单号作为 Workflow ID,保证幂等
      });

      return Response.json({
        workflowId: instance.id,
        status: "started",
      });
    }

    // GET /api/orders/:id/status —— 查询 Workflow 状态
    if (request.method === "GET" && url.pathname.startsWith("/api/orders/")) {
      const orderId = url.pathname.split("/").pop();
      const instance = await env.ORDER_WORKFLOW.get(orderId);
      const status = await instance.status();

      return Response.json({
        orderId,
        workflowStatus: status.status,     // "running" | "paused" | "errored" | "complete" | "terminated"
        steps: status.steps,               // 每一步的执行详情
        createdAt: status.createdAt,
        modifiedAt: status.modifiedAt,
      });
    }

    // POST /api/orders/:id/terminate —— 终止 Workflow
    if (request.method === "POST" && url.pathname.includes("/terminate")) {
      const orderId = url.pathname.split("/").at(-2);
      const instance = await env.ORDER_WORKFLOW.get(orderId!);
      await instance.terminate();

      return Response.json({ status: "terminated" });
    }

    return new Response("Not Found", { status: 404 });
  },
};

💡 三、高级模式与最佳实践

3.1 事件驱动的 Workflow:等待外部信号

有些场景需要 Workflow 等待外部事件(比如人工审批、Webhook 回调)。Workflows 提供了 step.waitForEvent 来实现:

// 等待人工审批的 Workflow 模式
export class ApprovalWorkflow extends WorkflowEntrypoint {
  async run(event: WorkflowEvent<{ requestId: string }>, step: WorkflowStep) {
    // Step 1: 提交审批请求
    await step.do("submit-approval", async () => {
      await fetch("https://api.approval.example.com/submit", {
        method: "POST",
        body: JSON.stringify({ requestId: event.payload.requestId }),
      });
    });

    // Step 2: 等待审批事件(最多等 7 天)
    const approvalEvent = await step.waitForEvent(
      "wait-for-approval",
      { type: "approval-result", timeout: "7 days" }
    );

    if (approvalEvent.payload.decision === "approved") {
      await step.do("execute-approved-action", async () => {
        // 执行审批通过后的逻辑
      });
      return { status: "approved" };
    } else {
      await step.do("handle-rejection", async () => {
        // 处理拒绝逻辑
      });
      return { status: "rejected" };
    }
  }
}

外部系统可以通过 API 向 Workflow 发送事件:

// 外部审批系统回调时,向 Workflow 发送事件
const instance = await env.APPROVAL_WORKFLOW.get(requestId);
await instance.sendEvent({
  type: "approval-result",
  payload: { decision: "approved", approvedBy: "manager@example.com" },
});

3.2 数据管道模式:批处理与分页

// 数据管道工作流:分批处理大量数据
export class DataPipelineWorkflow extends WorkflowEntrypoint {
  async run(
    event: WorkflowEvent<{ sourceId: string; totalRecords: number }>,
    step: WorkflowStep
  ) {
    const { sourceId, totalRecords } = event.payload;
    const batchSize = 1000;
    const totalBatches = Math.ceil(totalRecords / batchSize);
    const results: Array<{ batch: number; processed: number; errors: number }> = [];

    for (let batch = 0; batch < totalBatches; batch++) {
      // 每个批次独立执行、独立重试
      const result = await step.do(
        `process-batch-${batch}`,
        {
          retries: { limit: 3, delay: "5 seconds" },
          timeout: "5 minutes",
        },
        async () => {
          const offset = batch * batchSize;
          const response = await fetch(
            `https://api.data.example.com/records?source=${sourceId}&offset=${offset}&limit=${batchSize}`
          );
          const records = await response.json();

          let processed = 0;
          let errors = 0;

          for (const record of records) {
            try {
              await this.processRecord(record);
              processed++;
            } catch (e) {
              errors++;
            }
          }

          return { batch, processed, errors };
        }
      );

      results.push(result);

      // 每处理 10 个批次暂停 30 秒,避免压垮下游服务
      if ((batch + 1) % 10 === 0 && batch < totalBatches - 1) {
        await step.sleep("rate-limit-pause", "30 seconds");
      }
    }

    // 最终汇总
    await step.do("generate-report", async () => {
      const totalProcessed = results.reduce((s, r) => s + r.processed, 0);
      const totalErrors = results.reduce((s, r) => s + r.errors, 0);
      await this.saveReport({ sourceId, totalProcessed, totalErrors });
    });

    return { status: "completed", batches: results.length };
  }

  private async processRecord(record: unknown) {
    // 处理单条记录
  }

  private async saveReport(report: unknown) {
    // 保存报告
  }
}

3.3 常见坑点与避坑指南

坑点 1:Step 函数必须是幂等的

// ❌ 错误写法:非幂等操作
await step.do("charge-payment", async () => {
  await paymentApi.charge(orderId, amount); // 重试可能导致重复扣款
});

// ✅ 正确写法:使用幂等键
await step.do("charge-payment", async () => {
  await paymentApi.charge(orderId, amount, {
    idempotencyKey: `charge-${orderId}-${step.name}`,
  });
});

坑点 2:不要在 Step 中存储大量数据

// ❌ 错误写法:Step 返回大量数据(会被序列化到持久存储)
const bigData = await step.do("fetch-data", async () => {
  return await fetchHugeDataset(); // 10MB 的数据
});

// ✅ 正确写法:Step 只返回必要的引用或摘要
const dataRef = await step.do("fetch-data", async () => {
  const data = await fetchHugeDataset();
  const ref = await saveToR2(data); // 存到 R2
  return { ref: ref.key, count: data.length }; // 只返回引用
});

坑点 3:注意 Step 超时配置

// ❌ 默认超时可能不够
await step.do("heavy-computation", async () => {
  // 复杂计算可能需要更长时间
});

// ✅ 显式设置超时
await step.do("heavy-computation", {
  timeout: "5 minutes", // 根据实际需要设置
  retries: { limit: 2, delay: "10 seconds" },
}, async () => {
  // 复杂计算
});

⚠️ 警告: Step 函数中的 fetch 调用默认超时是 30 秒。如果你的下游服务响应慢,必须在 Step 级别设置更长的超时,否则 Step 会因超时而重试,导致下游收到重复请求。

3.4 监控与调试

// 使用 Workflow status API 获取详细的执行日志
async function getWorkflowDetails(env: Env, workflowId: string) {
  const instance = await env.ORDER_WORKFLOW.get(workflowId);
  const status = await instance.status();

  // status 包含每个 Step 的详细信息
  console.log(JSON.stringify(status, null, 2));
  // {
  //   "status": "running",
  //   "steps": [
  //     {
  //       "name": "check-and-reserve-inventory",
  //       "status": "complete",
  //       "start": "2026-06-05T10:00:00Z",
  //       "end": "2026-06-05T10:00:02Z",
  //       "output": { "success": true, "data": { ... } }
  //     },
  //     {
  //       "name": "process-payment",
  //       "status": "running",
  //       "start": "2026-06-05T10:00:02Z",
  //       "attempt": 2,
  //       "previousErrors": ["支付超时"]
  //     }
  //   ]
  // }
}

📊 四、成本分析与选型建议

4.1 定价模型

Cloudflare Workflows 的定价基于 Step 执行次数:

指标 免费额度 超出部分
Workflow 实例创建 100,000 次/月 $0.000025/次
Step 执行 1,000,000 次/月 $0.000001/次
持久化存储 1GB $0.20/GB/月

举个例子:一个每天处理 10,000 个订单的电商系统,每个订单平均 5 个 Step:

  • 月 Step 执行量:10,000 × 5 × 30 = 1,500,000 次
  • 免费额度内:1,000,000 次
  • 超出部分:500,000 × $0.000001 = $0.50
  • 月总成本约 $0.50

💡 提示: 与 AWS Step Functions 的 $0.025/千次状态转换相比,Workflows 在中小规模场景下有明显的成本优势。但在大规模场景(日均百万级),需要仔细对比。

4.2 选型决策树

需要可靠的多步骤执行?
├── 是 → 需要边缘运行?
│   ├── 是 → 需要复杂编排(子工作流、信号、查询)?
│   │   ├── 是 → 考虑 Temporal Cloud
│   │   └── 否 → Cloudflare Workflows ✅
│   └── 否 → 深度 AWS 集成?
│       ├── 是 → AWS Step Functions
│       └── 否 → Temporal 自托管
└── 否 → 简单 Cron + 数据库

✅ 总结

Cloudflare Workflows 是 2026 年边缘计算领域最重要的新原语之一。它让「可靠的多步骤任务执行」变得像写普通的 async 函数一样简单——你不需要管理消息队列、不需要自己实现重试逻辑、不需要操心状态持久化。

关键结论: 如果你的应用已经运行在 Cloudflare Workers 上,或者你需要在边缘处理 Webhook、定时任务、数据管道等多步骤场景,Workflows 是目前最优雅的解决方案。它的学习成本极低(就是写 async 函数),但解决的问题非常实际。

适用场景:

  • ✅ 电商订单处理(库存 → 支付 → 通知)
  • ✅ 数据 ETL 管道(抽取 → 转换 → 加载)
  • ✅ AI Agent 多步骤任务(搜索 → 推理 → 验证)
  • ✅ Webhook 处理与重试(接收 → 验证 → 转发)
  • ✅ 审批流程(提交 → 等待 → 执行)

不适用场景:

  • ❌ 需要复杂子工作流和信号传递的企业级编排(用 Temporal)
  • ❌ 纯无状态的简单请求处理(用普通 Worker)
  • ❌ 需要实时 WebSocket 的交互式应用(用 Durable Objects)

相关资源:

📚 相关文章