2026 年,AI Agent 已经从「单轮对话机器人」进化为能执行多步骤复杂任务的自主系统。但当你的产品需要同时处理 100 个用户请求、每个请求触发一个 Agent 执行 5-10 步工具调用时,串行执行模式立刻成为瓶颈。根据 LangChain 2026 Q2 的生产数据,采用并发 Agent 架构的系统,其吞吐量是串行架构的 8-15 倍,而 P99 延迟降低了 60%。 问题是,并发不只是「多开几个 Promise」那么简单——你需要处理任务调度、错误隔离、成本控制和分布式协调等一系列工程挑战。本文将从零构建一个生产级的 Agent 并发执行引擎。
🏗️ 一、为什么串行 Agent 架构撑不住生产流量
1.1 串行执行的三重瓶颈
一个典型的 Agent 执行流程是:接收用户输入 → LLM 推理 → 调用工具 → LLM 再推理 → 返回结果。单次执行耗时 3-15 秒,如果 100 个用户同时发起请求,串行处理意味着最后一个用户要等 5-25 分钟。
更隐蔽的问题是资源争用:当多个 Agent 共享同一个 LLM API Key 时,串行请求无法充分利用 API 的并发配额;当 Agent 调用外部工具(数据库、搜索引擎)时,I/O 等待时间白白浪费。
// ❌ 串行执行:每个 Agent 依次等待
async function runAgentsSerial(tasks: AgentTask[]): Promise<AgentResult[]> {
const results: AgentResult[] = [];
for (const task of tasks) {
const result = await runAgent(task); // 每次等 3-15 秒
results.push(result);
}
return results; // 100 个任务 = 300-1500 秒
}
// ✅ 并发执行:利用 Promise.allSettled 并行处理
async function runAgentsConcurrent(tasks: AgentTask[]): Promise<AgentResult[]> {
const results = await Promise.allSettled(
tasks.map(task => runAgent(task))
);
return results.map(r => r.status === 'fulfilled' ? r.value : { error: r.reason });
// 100 个任务 ≈ 3-15 秒(取决于最慢的那个)
}
⚠️ 警告:裸用
Promise.allSettled处理 100+ 并发 Agent 会导致 LLM API 限流(Rate Limit)、内存暴涨和无法追踪的错误。你需要一个有并发控制、队列管理和错误隔离的执行引擎。
1.2 并发 Agent 的五大工程挑战
| 挑战 | 串行架构 | 并发架构需求 |
|---|---|---|
| 并发控制 | 无(天然串行) | 信号量/Semaphore 限制并发数 |
| 错误隔离 | 一个失败全部停 | 单个失败不影响其他任务 |
| 成本控制 | 简单累加 | 实时 Token 预算 + 熔断 |
| 状态管理 | 线性状态 | 每个 Agent 独立状态空间 |
| 可观测性 | 简单日志 | 分布式 Trace + 聚合指标 |
🔧 二、从零构建 Agent Worker Pool 执行引擎
2.1 核心架构:Semaphore + Queue + Isolation
一个生产级的 Agent 并发引擎需要三个核心组件:
- 并发控制器(Semaphore):限制同时运行的 Agent 数量,防止 LLM API 限流
- 任务队列(Queue):缓冲待执行的任务,支持优先级和重试
- 错误隔离(Isolation):每个 Agent 在独立的 try-catch 中运行,互不影响
// agent-worker-pool.ts — Agent 并发执行引擎核心实现
import { EventEmitter } from 'events';
interface AgentTask {
id: string;
input: string;
priority: number; // 1-10,数字越大优先级越高
maxRetries: number;
tokenBudget: number; // 单任务 Token 上限
timeout: number; // 单任务超时(ms)
}
interface AgentResult {
taskId: string;
output: string;
tokensUsed: number;
duration: number;
retries: number;
error?: string;
}
interface PoolConfig {
concurrency: number; // 最大并发数
globalTokenBudget: number; // 全局 Token 预算
rateLimitRPM: number; // 每分钟最大请求数
taskTimeout: number; // 默认任务超时
}
class AgentWorkerPool extends EventEmitter {
private queue: AgentTask[] = [];
private running = new Map<string, Promise<AgentResult>>();
private semaphore: number;
private config: PoolConfig;
private totalTokensUsed = 0;
private requestTimestamps: number[] = [];
private aborted = false;
constructor(config: PoolConfig) {
super();
this.config = config;
this.semaphore = config.concurrency;
}
// 提交任务到队列
submit(task: AgentTask): void {
if (this.aborted) throw new Error('Pool has been shut down');
this.queue.push(task);
this.queue.sort((a, b) => b.priority - a.priority); // 高优先级排前面
this.emit('task:queued', task);
this.processQueue();
}
// 批量提交并等待所有完成
async submitAll(tasks: AgentTask[]): Promise<AgentResult[]> {
const promises = tasks.map(task => {
return new Promise<AgentResult>((resolve) => {
this.submit(task);
this.once(`task:done:${task.id}`, resolve);
});
});
return Promise.all(promises);
}
// 核心调度逻辑
private async processQueue(): Promise<void> {
while (this.semaphore > 0 && this.queue.length > 0 && !this.aborted) {
const task = this.queue.shift()!;
this.semaphore--;
const promise = this.executeWithIsolation(task);
this.running.set(task.id, promise);
promise.finally(() => {
this.running.delete(task.id);
this.semaphore++;
this.emit('task:done', task.id);
this.processQueue(); // 递归处理下一个
});
}
}
// 带错误隔离的任务执行
private async executeWithIsolation(task: AgentTask): Promise<AgentResult> {
const startTime = Date.now();
let retries = 0;
while (retries <= task.maxRetries) {
try {
// 检查全局 Token 预算
if (this.totalTokensUsed >= this.config.globalTokenBudget) {
throw new Error('Global token budget exhausted');
}
// 检查速率限制
await this.enforceRateLimit();
// 带超时的 Agent 执行
const result = await this.executeWithTimeout(task);
this.totalTokensUsed += result.tokensUsed;
this.emit('task:completed', result);
this.emit(`task:done:${task.id}`, result);
return result;
} catch (error) {
retries++;
if (retries > task.maxRetries) {
const result: AgentResult = {
taskId: task.id,
output: '',
tokensUsed: 0,
duration: Date.now() - startTime,
retries: retries - 1,
error: (error as Error).message,
};
this.emit('task:failed', result);
this.emit(`task:done:${task.id}`, result);
return result;
}
// 指数退避重试
await this.sleep(Math.min(1000 * Math.pow(2, retries), 10000));
this.emit('task:retry', { taskId: task.id, attempt: retries });
}
}
throw new Error('Unreachable');
}
// 超时控制
private async executeWithTimeout(task: AgentTask): Promise<AgentResult> {
return Promise.race([
this.runAgent(task),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error(`Task ${task.id} timed out after ${task.timeout}ms`)), task.timeout)
),
]);
}
// 速率限制(令牌桶)
private async enforceRateLimit(): Promise<void> {
const now = Date.now();
this.requestTimestamps = this.requestTimestamps.filter(t => now - t < 60000);
if (this.requestTimestamps.length >= this.config.rateLimitRPM) {
const oldestInWindow = this.requestTimestamps[0];
const waitTime = 60000 - (now - oldestInWindow) + 100; // +100ms buffer
await this.sleep(waitTime);
}
this.requestTimestamps.push(Date.now());
}
// 实际的 Agent 执行逻辑(接入你的 LLM + Tools)
private async runAgent(task: AgentTask): Promise<AgentResult> {
const startTime = Date.now();
// 这里接入你的 Agent 逻辑,例如:
// const agent = new Agent({ model: 'claude-sonnet-4-20250514', tools: [...] });
// const output = await agent.run(task.input);
// 模拟执行
const output = `Agent completed: ${task.input}`;
const tokensUsed = Math.floor(Math.random() * 1000) + 200;
return {
taskId: task.id,
output,
tokensUsed,
duration: Date.now() - startTime,
retries: 0,
};
}
// 优雅关闭
async shutdown(): Promise<void> {
this.aborted = true;
// 等待所有正在运行的任务完成
await Promise.allSettled(Array.from(this.running.values()));
this.emit('pool:shutdown');
}
// 获取运行状态
getStats() {
return {
queued: this.queue.length,
running: this.running.size,
availableSlots: this.semaphore,
totalTokensUsed: this.totalTokensUsed,
rpm: this.requestTimestamps.filter(t => Date.now() - t < 60000).length,
};
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
💡 **提示:**上述实现中,Semaphore 用简单的数字递减/递增实现,比
p-limit等第三方库更透明,且支持动态调整并发数。生产环境中建议增加 Prometheus metrics 导出。
2.2 使用示例:批量处理用户请求
// usage.ts — 批量 Agent 任务处理示例
const pool = new AgentWorkerPool({
concurrency: 10, // 最多 10 个 Agent 同时运行
globalTokenBudget: 500000, // 全局 50 万 Token 预算
rateLimitRPM: 200, // 每分钟最多 200 次 API 调用
taskTimeout: 30000, // 单任务 30 秒超时
});
// 监听事件
pool.on('task:completed', (result) => {
console.log(`✅ Task ${result.taskId} completed in ${result.duration}ms, ${result.tokensUsed} tokens`);
});
pool.on('task:failed', (result) => {
console.error(`❌ Task ${result.taskId} failed after ${result.retries} retries: ${result.error}`);
});
// 提交 100 个任务
const tasks: AgentTask[] = Array.from({ length: 100 }, (_, i) => ({
id: `task-${i}`,
input: `分析用户 #${i} 的行为数据并生成推荐`,
priority: i < 10 ? 10 : 5, // 前 10 个高优先级
maxRetries: 2,
tokenBudget: 5000,
timeout: 15000,
}));
const results = await pool.submitAll(tasks);
console.log(`\n📊 完成统计:`, pool.getStats());
🌐 三、分布式 Agent 调度:跨节点的并发编排
3.1 从单机到分布式的架构演进
当单机 Worker Pool 的并发能力触顶(通常受限于内存和单进程 CPU),你需要将 Agent 任务分发到多个节点。核心方案是消息队列 + 分布式 Worker。
| 方案 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|
| BullMQ + Redis | 中等规模,10-100 并发 | 成熟稳定,支持优先级/延迟/重试 | 依赖 Redis |
| RabbitMQ | 大规模,100-1000+ 并发 | 企业级消息路由,支持复杂拓扑 | 运维复杂度高 |
| Redis Streams | 轻量级,5-50 并发 | 无额外依赖,原生 Redis 功能 | 功能较简单 |
| Temporal | 需要持久化工作流 | 强一致性,自动重试,可视化 | 学习曲线陡峭 |
3.2 BullMQ 集成:生产级分布式 Agent 调度
// agent-bullmq-worker.ts — BullMQ 分布式 Agent Worker
import { Worker, Queue, Job } from 'bullmq';
import Redis from 'ioredis';
const connection = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null });
// 定义 Agent 任务队列
const agentQueue = new Queue('agent-tasks', { connection });
// Agent Worker:消费队列中的任务
const agentWorker = new Worker(
'agent-tasks',
async (job: Job) => {
const { input, priority, tokenBudget, timeout } = job.data;
// 更新进度
await job.updateProgress({ stage: 'reasoning', step: 1 });
// 执行 Agent(带超时控制)
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeout);
try {
const result = await executeAgent(input, {
tokenBudget,
signal: controller.signal,
onProgress: (progress) => job.updateProgress(progress),
});
clearTimeout(timer);
return result;
} catch (error) {
clearTimeout(timer);
throw error; // BullMQ 会根据 retry 配置自动重试
}
},
{
connection,
concurrency: 5, // 每个 Worker 进程 5 并发
limiter: { // 速率限制
max: 50,
duration: 60000, // 每分钟最多 50 个任务
},
settings: {
backoffStrategy: (attemptsMade: number) => {
return Math.min(1000 * Math.pow(2, attemptsMade), 30000); // 指数退避
},
},
}
);
// 添加任务示例
await agentQueue.add('analyze', {
input: '分析销售数据',
priority: 1,
tokenBudget: 5000,
timeout: 15000,
}, {
priority: 1, // BullMQ 优先级(数字越小越优先)
attempts: 3, // 最多重试 3 次
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000, // 保留最近 1000 个完成的任务
removeOnFail: 5000, // 保留最近 5000 个失败的任务
});
// 事件监听
agentWorker.on('completed', (job) => {
console.log(`✅ Job ${job.id} completed`);
});
agentWorker.on('failed', (job, err) => {
console.error(`❌ Job ${job?.id} failed: ${err.message}`);
});
📌 **记住:**分布式 Agent 系统中,幂等性是第一优先级。由于网络抖动和 Worker 重启,同一个任务可能被执行多次。确保你的 Agent 工具调用是幂等的——例如,写入数据库时使用
INSERT ... ON CONFLICT DO UPDATE而非裸INSERT。
3.3 动态扩缩容:根据队列深度自动调整
// auto-scaler.ts — 根据队列深度动态调整 Worker 并发数
async function autoScaleWorker(worker: Worker, queue: Queue, config: {
minConcurrency: number;
maxConcurrency: number;
scaleUpThreshold: number; // 队列深度超过此值时扩容
scaleDownThreshold: number; // 队列深度低于此值时缩容
checkInterval: number; // 检查间隔(ms)
}) {
let currentConcurrency = config.minConcurrency;
setInterval(async () => {
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
if (waiting > config.scaleUpThreshold && currentConcurrency < config.maxConcurrency) {
currentConcurrency = Math.min(currentConcurrency + 2, config.maxConcurrency);
await worker.concurrency(currentConcurrency);
console.log(`📈 Scaled UP to ${currentConcurrency} (queue: ${waiting} waiting)`);
} else if (waiting < config.scaleDownThreshold && currentConcurrency > config.minConcurrency) {
currentConcurrency = Math.max(currentConcurrency - 1, config.minConcurrency);
await worker.concurrency(currentConcurrency);
console.log(`📉 Scaled DOWN to ${currentConcurrency} (queue: ${waiting} waiting)`);
}
}, config.checkInterval);
}
💰 四、并发成本控制:防止 Token 预算爆炸
4.1 并发场景下的成本失控风险
串行执行时,成本是可预测的——每个任务消耗 500-2000 Token,100 个任务就是 5-20 万 Token。但并发执行时,重试风暴可能让成本翻 3-5 倍:当 LLM API 返回 429 (Rate Limit) 时,所有并发任务同时触发重试,瞬间消耗大量 Token。
// cost-guard.ts — 并发 Agent 成本控制中间件
class CostGuard {
private totalTokens = 0;
private budget: number;
private circuitOpen = false;
private failureCount = 0;
private readonly failureThreshold = 5;
private readonly resetTimeout = 30000;
constructor(budget: number) {
this.budget = budget;
}
// 检查是否允许执行(熔断器模式)
canExecute(estimatedTokens: number): boolean {
if (this.circuitOpen) return false;
if (this.totalTokens + estimatedTokens > this.budget) return false;
return true;
}
// 记录 Token 消耗
recordUsage(tokens: number): void {
this.totalTokens += tokens;
}
// 记录失败(用于熔断器)
recordFailure(): void {
this.failureCount++;
if (this.failureCount >= this.failureThreshold) {
this.circuitOpen = true;
console.warn('🔴 Circuit breaker OPEN — stopping all agent execution');
setTimeout(() => {
this.circuitOpen = false;
this.failureCount = 0;
console.log('🟢 Circuit breaker RESET — resuming execution');
}, this.resetTimeout);
}
}
recordSuccess(): void {
this.failureCount = Math.max(0, this.failureCount - 1);
}
getStats() {
return {
totalTokens: this.totalTokens,
budgetRemaining: this.budget - this.totalTokens,
budgetUsedPercent: ((this.totalTokens / this.budget) * 100).toFixed(1) + '%',
circuitOpen: this.circuitOpen,
};
}
}
⚠️ 警告:并发 Agent 的最大成本风险不是单次调用,而是重试风暴。当 LLM API 出现间歇性故障时,N 个并发 Agent × M 次重试 = N×M 倍的 Token 消耗。务必在 Worker 层面设置全局熔断器,而非仅在单个 Agent 内部处理。
4.2 Token 预算分配策略
| 策略 | 适用场景 | 实现方式 |
|---|---|---|
| 固定预算 | 任务复杂度已知 | 每个任务分配固定 Token 额度 |
| 动态预算 | 任务复杂度未知 | 根据前几步消耗动态调整后续预算 |
| 优先级预算 | 有高低优先级混合 | 高优先级任务可「借用」低优先级额度 |
| 全局熔断 | 成本敏感场景 | 全局 Token 耗尽后停止所有新任务 |
🛡️ 五、生产避坑指南与最佳实践
5.1 六个常见的并发 Agent 陷阱
- ❌ 裸用
Promise.all不限制并发数 — 当任务量达到 1000+ 时,会同时创建 1000 个 LLM 连接,导致内存暴涨和 API 限流 - ❌ 错误处理放在 Agent 内部 — Agent 内部的 try-catch 无法防止外部的队列堆积和 Worker 崩溃
- ❌ 共享可变状态 — 多个 Agent 并发修改同一个数据库记录会导致数据竞争
- ❌ 不做超时控制 — 一个卡住的 Agent 会永久占用 Worker 槽位
- ❌ 忽略指数退避 — 所有 Agent 同时重试同一个失败的 API 会产生「惊群效应」
- ❌ 不监控队列深度 — 队列积压到 10000+ 才发现,已造成大量超时
5.2 生产检查清单
- ✅ 设置合理的 Worker 并发数(建议:LLM API RPM / 平均单任务耗时 × 60)
- ✅ 每个任务设置独立的超时(建议:P95 耗时 × 2)
- ✅ 实现全局 Token 预算 + 熔断器
- ✅ 使用指数退避 + 随机抖动(Jitter)避免重试风暴
- ✅ Agent 工具调用实现幂等性
- ✅ 监控队列深度、Worker 利用率、Token 消耗速率
- ✅ 设置死信队列(Dead Letter Queue)收集持续失败的任务
- ✅ 优雅关闭:收到 SIGTERM 时等待正在运行的任务完成
📝 总结
AI Agent 并发执行不是「多开几个 Promise」的简单问题,而是一个涉及调度、隔离、成本控制和分布式协调的系统工程。核心要点:
- Semaphore 控制并发:用信号量限制同时运行的 Agent 数量,避免 API 限流
- 错误隔离:每个 Agent 在独立的执行上下文中运行,单个失败不影响整体
- 成本熔断:全局 Token 预算 + 熔断器模式,防止重试风暴导致成本失控
- 分布式队列:超过单机能力时,用 BullMQ/RabbitMQ 将任务分发到多个 Worker 节点
- 可观测性:实时监控队列深度、Worker 利用率、Token 消耗和错误率
⚡ 关键结论:并发 Agent 系统的成功不取决于并发数有多高,而取决于错误隔离和成本控制有多完善。一个能优雅处理失败、精确控制成本的 10 并发系统,远比一个失控的 100 并发系统更有价值。
🔗 相关工具推荐
- BullMQ:基于 Redis 的高性能任务队列,支持优先级、延迟、重试和死信队列
- Temporal:持久化工作流引擎,适合需要强一致性的长时 Agent 任务
- p-limit:轻量级 Promise 并发控制库(适合简单场景)
- Langfuse:LLM 应用可观测性平台,支持 Token 追踪和 Agent Trace
- Inngest:无服务器任务编排平台,内置 Agent 执行支持