构建主动型 AI Agent:从被动响应到自主行动的架构设计

AI Agent 正从「你问我答」进化为「主动行动」——Claude Fable 的发布标志着这一转折点。本文深入剖析主动型 Agent 的架构设计、安全约束与工程实现,附完整 TypeScript 代码示例。

开发者效率 2026-06-11 19 分钟

2026 年 6 月,Anthropic 发布了 Claude Fable——一个被描述为「relentlessly proactive」(不知疲倦地主动行动)的 AI Agent。它不只是回答问题,而是主动扫描环境、预判需求、发起行动。Simon Willison 在体验后感叹:「这不是一个聊天机器人,这是一个有主观能动性的协作者。」与此同时,一起 AI Agent 扫描 DN42 网络导致运营商破产的事件,也让开发者意识到:主动型 Agent 的架构设计,已经从学术问题变成了生产安全的核心议题。如果你正在构建任何形式的 AI Agent,理解「主动」与「被动」的架构差异,将决定你的系统是智能助手还是定时炸弹。

📌 记住: 主动型 Agent 的核心挑战不是技术实现——而是如何在「足够自主」和「足够安全」之间找到精确的平衡点。过度约束会让 Agent 退化为传统脚本,过度放任则可能引发资源失控。

🎯 一、主动型 Agent 与被动型 Agent 的本质区别

1.1 两种范式的对比

传统的 AI Agent(包括大多数 ChatGPT 应用)遵循请求-响应模式:用户发起请求,Agent 返回结果,循环结束。这种模式简单、可控、易于调试——但也极其有限。

主动型 Agent(Proactive Agent)则引入了感知-决策-行动循环:Agent 持续监控环境信号,在满足特定条件时自主发起行动,无需用户显式触发。

维度 被动型 Agent 主动型 Agent
触发方式 用户显式请求 环境信号/定时触发/条件满足
执行模式 单次请求-响应 持续感知-决策-行动循环
上下文范围 当前对话 长期记忆 + 实时环境
工具调用 按需调用(用户指示) 自主规划并调用
典型延迟 秒级 分钟到小时级(异步)
风险等级 ⚠️ 低 ❌ 高(可自主创建资源)

1.2 三个真实场景

场景一:代码质量守护者(被动型)

开发者提交 PR 后,手动触发 Agent 进行代码审查。Agent 分析代码变更,输出审查意见。这是典型的请求-响应模式。

场景二:依赖安全监控者(主动型)

Agent 持续监控项目的 package.jsonCargo.toml,当 npm/PyPI 发布新的安全公告时,自动分析受影响范围,生成修复 PR,并通知团队。没有人在任何时刻发出请求。

场景三:基础设施自愈者(高级主动型)

Agent 监控生产环境指标(CPU、内存、错误率),当检测到异常模式时,自动执行诊断脚本、分析日志、定位根因,甚至自动执行回滚或扩容操作。

⚠️ 警告: 场景三的 Agent 如果没有足够的约束,就可能像 DN42 事件中的 Agent 一样——发现一个问题后尝试修复,修复过程中创建更多资源,资源又产生新问题,形成正反馈循环直到系统崩溃。

1.3 主动型 Agent 的核心组件

一个完整的主动型 Agent 需要以下组件:

┌─────────────────────────────────────────────────┐
│              Proactive Agent Architecture         │
├─────────────────────────────────────────────────┤
│  ① Signal Collector(信号采集器)                 │
│     - Webhook / SSE / Polling / Cron             │
│  ② Context Engine(上下文引擎)                   │
│     - 长期记忆 / 工作状态 / 环境快照               │
│  ③ Decision Engine(决策引擎)                    │
│     - LLM 推理 + 规则引擎 + 优先级队列            │
│  ④ Action Planner(行动规划器)                   │
│     - 任务分解 / 资源估算 / 依赖分析               │
│  ⑤ Execution Sandbox(执行沙箱)                  │
│     - 资源配额 / 超时控制 / 权限隔离               │
│  ⑥ Feedback Loop(反馈回路)                      │
│     - 结果评估 / 经验积累 / 策略调整               │
└─────────────────────────────────────────────────┘

🏗️ 二、主动型 Agent 的工程实现

2.1 信号采集与事件驱动架构

主动型 Agent 的第一步是感知环境。不同于被动型 Agent 等待用户输入,主动型 Agent 需要一个事件驱动的信号采集系统:

// signal-collector.ts — 信号采集器核心实现
interface Signal {
  id: string
  type: 'webhook' | 'polling' | 'cron' | 'file-watch' | 'metric'
  source: string
  payload: unknown
  timestamp: number
  priority: 'low' | 'medium' | 'high' | 'critical'
}

interface SignalHandler {
  canHandle(signal: Signal): boolean
  handle(signal: Signal): Promise<AgentAction | null>
}

class SignalCollector {
  private handlers: SignalHandler[] = []
  private buffer: Signal[] = []
  private readonly maxBufferSize = 1000
  private readonly dedupWindow = 60_000 // 60 秒去重窗口
  private recentSignals = new Map<string, number>()

  register(handler: SignalHandler): void {
    this.handlers.push(handler)
  }

  async ingest(signal: Signal): Promise<void> {
    // 去重:相同 source + type 在窗口内只处理一次
    const dedupKey = `${signal.source}:${signal.type}`
    const lastSeen = this.recentSignals.get(dedupKey)
    if (lastSeen && signal.timestamp - lastSeen < this.dedupWindow) {
      return
    }
    this.recentSignals.set(dedupKey, signal.timestamp)

    // 优先级排序插入
    this.buffer.push(signal)
    this.buffer.sort((a, b) => {
      const priority = { critical: 4, high: 3, medium: 2, low: 1 }
      return priority[b.priority] - priority[a.priority]
    })

    // 缓冲区溢出保护
    if (this.buffer.length > this.maxBufferSize) {
      this.buffer = this.buffer.slice(0, this.maxBufferSize)
    }

    await this.processNext()
  }

  private async processNext(): Promise<void> {
    const signal = this.buffer.shift()
    if (!signal) return

    for (const handler of this.handlers) {
      if (handler.canHandle(signal)) {
        const action = await handler.handle(signal)
        if (action) {
          await this.actionQueue.enqueue(action)
        }
        break // 第一个匹配的 handler 处理
      }
    }
  }
}

💡 提示: 信号去重是防止 Agent 过度响应的关键机制。如果没有去重,一个重复触发的 Webhook 可能导致 Agent 对同一事件执行数十次相同操作。

2.2 决策引擎:何时行动,何时等待

决策引擎是主动型 Agent 的大脑。它需要回答一个关键问题:当前信号是否值得触发行动? 这个决策需要综合考虑:

// decision-engine.ts — 基于规则 + LLM 的混合决策引擎
interface DecisionContext {
  signal: Signal
  currentLoad: number        // 当前系统负载 0-1
  recentActions: ActionLog[] // 最近 N 次行动记录
  budgetRemaining: number    // 剩余预算(美元)
  timeOfDay: number          // 当前时间(0-23)
}

interface Decision {
  shouldAct: boolean
  confidence: number   // 0-1
  reason: string
  estimatedCost: number
  estimatedDuration: number // ms
}

class DecisionEngine {
  private readonly rules: DecisionRule[] = [
    // 规则 1:预算耗尽则不行动
    {
      name: 'budget-check',
      evaluate: (ctx) => ctx.budgetRemaining < 0.5 ? {
        shouldAct: false, confidence: 1.0,
        reason: '预算即将耗尽,暂停所有自主行动',
        estimatedCost: 0, estimatedDuration: 0
      } : null
    },
    // 规则 2:系统负载过高则延迟
    {
      name: 'load-check',
      evaluate: (ctx) => ctx.currentLoad > 0.85 ? {
        shouldAct: false, confidence: 0.9,
        reason: `系统负载 ${ctx.currentLoad * 100}%,延迟处理`,
        estimatedCost: 0, estimatedDuration: 0
      } : null
    },
    // 规则 3:同类操作冷却期
    {
      name: 'cooldown-check',
      evaluate: (ctx) => {
        const sameType = ctx.recentActions.filter(
          a => a.signalType === ctx.signal.type
        )
        const lastAction = sameType[sameType.length - 1]
        if (lastAction && Date.now() - lastAction.timestamp < 300_000) {
          return {
            shouldAct: false, confidence: 0.8,
            reason: `同类操作冷却中(5分钟内已执行过)`,
            estimatedCost: 0, estimatedDuration: 0
          }
        }
        return null
      }
    },
    // 规则 4:非工作时间降级
    {
      name: 'off-hours-check',
      evaluate: (ctx) => {
        const isOffHours = ctx.timeOfDay < 9 || ctx.timeOfDay > 18
        if (isOffHours && ctx.signal.priority !== 'critical') {
          return {
            shouldAct: false, confidence: 0.7,
            reason: '非工作时间,仅处理 critical 级别信号',
            estimatedCost: 0, estimatedDuration: 0
          }
        }
        return null
      }
    }
  ]

  async evaluate(ctx: DecisionContext): Promise<Decision> {
    // 先走规则引擎(快速、确定性、零成本)
    for (const rule of this.rules) {
      const result = rule.evaluate(ctx)
      if (result) return result
    }

    // 规则引擎无法决策时,调用 LLM(慢、非确定性、有成本)
    return await this.llmEvaluate(ctx)
  }

  private async llmEvaluate(ctx: DecisionContext): Promise<Decision> {
    const prompt = `你是一个 AI Agent 的决策引擎。评估以下信号是否值得触发行动。

信号:${JSON.stringify(ctx.signal)}
当前系统负载:${ctx.currentLoad * 100}%
剩余预算:$${ctx.budgetRemaining}
最近行动:${JSON.stringify(ctx.recentActions.slice(-3))}

判断标准:
1. 这个信号的紧急程度如何?
2. 行动的预期收益是否大于成本?
3. 是否有更合适的处理时机?

返回 JSON:{ "shouldAct": boolean, "confidence": number, "reason": string, "estimatedCost": number }`

    const response = await this.llm.complete(prompt)
    return JSON.parse(response)
  }
}

关键结论: 决策引擎应该采用「规则优先、LLM 兜底」的策略。规则引擎处理 80% 的常规决策(零成本、毫秒级),LLM 只在规则无法覆盖的模糊场景介入。这样既保证了响应速度,又控制了运行成本。

2.3 执行沙箱:约束 Agent 的行动半径

这是整个架构中最关键的安全层。执行沙箱确保 Agent 的每一次行动都在可控范围内:

// execution-sandbox.ts — 执行沙箱实现
interface SandboxConfig {
  maxTokenBudget: number       // 单次行动最大 Token
  maxToolCalls: number         // 单次行动最大工具调用次数
  maxDuration: number          // 最大执行时间(ms)
  maxConcurrentActions: number // 最大并发行动数
  allowedTools: string[]       // 允许使用的工具白名单
  costCeiling: number          // 单次行动成本上限(美元)
}

interface ActionPlan {
  id: string
  signal: Signal
  steps: ActionStep[]
  estimatedCost: number
  estimatedDuration: number
}

interface ActionResult {
  id: string
  status: 'completed' | 'timeout' | 'budget-exceeded' | 'error'
  actualCost: number
  actualDuration: number
  stepsExecuted: number
  stepsTotal: number
  output: string
  error?: string
}

class ExecutionSandbox {
  private activeActions = new Map<string, AbortController>()
  private totalCostToday = 0
  private dailyBudget: number

  constructor(
    private config: SandboxConfig,
    private costTracker: CostTracker,
    private logger: Logger
  ) {
    this.dailyBudget = config.costCeiling * 10 // 日预算 = 10 次单次上限
  }

  async execute(plan: ActionPlan): Promise<ActionResult> {
    // 预检:成本、并发、工具白名单
    const preCheck = await this.preFlightCheck(plan)
    if (!preCheck.passed) {
      return {
        id: plan.id,
        status: 'budget-exceeded',
        actualCost: 0,
        actualDuration: 0,
        stepsExecuted: 0,
        stepsTotal: plan.steps.length,
        output: '',
        error: preCheck.reason
      }
    }

    const controller = new AbortController()
    this.activeActions.set(plan.id, controller)

    // 设置超时
    const timeout = setTimeout(() => {
      controller.abort()
      this.logger.warn(`行动 ${plan.id} 超时 (${this.config.maxDuration}ms)`)
    }, this.config.maxDuration)

    let stepsExecuted = 0
    let actualCost = 0

    try {
      for (const step of plan.steps) {
        // 每步执行前检查是否被中止
        if (controller.signal.aborted) break

        // 每步执行前检查预算
        if (actualCost + step.estimatedCost > this.config.costCeiling) {
          this.logger.warn(`行动 ${plan.id} 达到成本上限 $${actualCost}`)
          break
        }

        // 执行步骤
        const stepResult = await this.executeStep(step, controller.signal)
        stepsExecuted++
        actualCost += stepResult.cost

        // 检查工具调用次数
        if (stepsExecuted >= this.config.maxToolCalls) {
          this.logger.warn(`行动 ${plan.id} 达到工具调用上限`)
          break
        }
      }

      this.totalCostToday += actualCost

      return {
        id: plan.id,
        status: controller.signal.aborted ? 'timeout' : 'completed',
        actualCost,
        actualDuration: Date.now() - plan.steps[0].startTime,
        stepsExecuted,
        stepsTotal: plan.steps.length,
        output: '行动完成'
      }
    } catch (error) {
      return {
        id: plan.id,
        status: 'error',
        actualCost,
        actualDuration: Date.now() - plan.steps[0].startTime,
        stepsExecuted,
        stepsTotal: plan.steps.length,
        output: '',
        error: String(error)
      }
    } finally {
      clearTimeout(timeout)
      this.activeActions.delete(plan.id)
    }
  }

  private async preFlightCheck(plan: ActionPlan): Promise<{ passed: boolean; reason: string }> {
    // 检查日预算
    if (this.totalCostToday + plan.estimatedCost > this.dailyBudget) {
      return { passed: false, reason: `日预算即将耗尽 (已用 $${this.totalCostToday.toFixed(2)} / $${this.dailyBudget})` }
    }

    // 检查并发数
    if (this.activeActions.size >= this.config.maxConcurrentActions) {
      return { passed: false, reason: `并发行动数已达上限 (${this.activeActions.size}/${this.config.maxConcurrentActions})` }
    }

    // 检查工具白名单
    for (const step of plan.steps) {
      if (!this.config.allowedTools.includes(step.tool)) {
        return { passed: false, reason: `工具 ${step.tool} 不在白名单中` }
      }
    }

    return { passed: true, reason: '' }
  }
}

⚠️ 警告: 永远不要让 Agent 无限期运行。DN42 事件的根本原因是 Agent 没有设置执行时间上限和成本上限。一个简单的 setTimeout 和成本检查,就能避免 90% 的资源失控事故。

2.4 Human-in-the-Loop:关键决策点的人工确认

主动型 Agent 不应该完全自主。对于高风险操作,必须引入人工确认机制:

// human-in-the-loop.ts — 人工确认中间件
interface ApprovalRequest {
  id: string
  action: ActionPlan
  riskLevel: 'low' | 'medium' | 'high' | 'critical'
  summary: string
  estimatedImpact: string
  timeout: number // 确认超时时间
}

interface ApprovalDecision {
  approved: boolean
  modifier?: Partial<ActionPlan> // 人工可以修改行动方案
  reason?: string
}

class HumanInTheLoop {
  private pendingApprovals = new Map<string, {
    resolve: (decision: ApprovalDecision) => void
    timer: ReturnType<typeof setTimeout>
  }>()

  // 需要人工确认的操作类型
  private readonly requiresApproval = new Set([
    'create-cloud-resource',
    'delete-data',
    'send-notification',
    'modify-production-config',
    'execute-shell-command',
    'make-payment'
  ])

  async requestApproval(request: ApprovalRequest): Promise<ApprovalDecision> {
    // 低风险操作自动通过
    if (request.riskLevel === 'low') {
      return { approved: true }
    }

    // 高风险操作需要人工确认
    return new Promise((resolve) => {
      const timer = setTimeout(() => {
        // 超时默认拒绝
        this.pendingApprovals.delete(request.id)
        resolve({ approved: false, reason: '确认超时,自动拒绝' })
      }, request.timeout)

      this.pendingApprovals.set(request.id, { resolve, timer })

      // 发送通知(微信/Slack/邮件)
      this.notify(request)
    })
  }

  respond(requestId: string, decision: ApprovalDecision): void {
    const pending = this.pendingApprovals.get(requestId)
    if (!pending) return

    clearTimeout(pending.timer)
    this.pendingApprovals.delete(requestId)
    pending.resolve(decision)
  }

  needsApproval(action: ActionPlan): boolean {
    return action.steps.some(step => this.requiresApproval.has(step.tool))
  }
}

💡 提示: 人工确认的超时时间应该根据操作类型设置——创建云资源给 5 分钟,删除数据给 30 分钟,发送通知给 1 分钟。超时后默认拒绝,而不是默认通过。

🛡️ 三、主动型 Agent 的安全设计模式

3.1 五种关键安全模式

经过对 Claude Fable、LangChain Agents、AutoGPT 等系统的分析,以下是主动型 Agent 必备的五种安全模式:

模式一:预算熔断器(Budget Circuit Breaker)

// budget-circuit-breaker.ts
class BudgetCircuitBreaker {
  private state: 'closed' | 'open' | 'half-open' = 'closed'
  private failures = 0
  private lastFailureTime = 0

  constructor(
    private readonly threshold: number,    // 触发熔断的连续超支次数
    private readonly resetTimeout: number  // 熔断恢复时间
  ) {}

  async checkBudget(estimatedCost: number, remainingBudget: number): Promise<boolean> {
    if (this.state === 'open') {
      if (Date.now() - this.lastFailureTime > this.resetTimeout) {
        this.state = 'half-open'
      } else {
        return false // 熔断中,拒绝所有行动
      }
    }

    if (estimatedCost > remainingBudget * 0.1) {
      // 单次行动预算超过剩余预算的 10%
      this.failures++
      this.lastFailureTime = Date.now()
      if (this.failures >= this.threshold) {
        this.state = 'open'
        console.error(`🔴 预算熔断器触发!连续 ${this.failures} 次预算超限`)
      }
      return false
    }

    // 预算正常,重置计数
    if (this.state === 'half-open') this.state = 'closed'
    this.failures = 0
    return true
  }
}

模式二:递归深度限制(Recursion Depth Limiter)

DN42 事件的核心问题就是 Agent 的递归任务分解没有深度限制。以下实现确保 Agent 不会无限展开子任务:

// recursion-limiter.ts
class RecursionLimiter {
  private depth = 0
  private readonly maxDepth: number
  private readonly maxBreadth: number // 每层最大子任务数
  private taskTree: Map<string, string[]> = new Map()

  constructor(maxDepth = 3, maxBreadth = 5) {
    this.maxDepth = maxDepth
    this.maxBreadth = maxBreadth
  }

  canSpawnSubtask(parentId: string): { allowed: boolean; reason?: string } {
    const depth = this.getDepth(parentId)
    if (depth >= this.maxDepth) {
      return { allowed: false, reason: `已达最大递归深度 ${this.maxDepth}` }
    }

    const siblings = this.taskTree.get(parentId) || []
    if (siblings.length >= this.maxBreadth) {
      return { allowed: false, reason: `同级子任务已达上限 ${this.maxBreadth}` }
    }

    return { allowed: true }
  }

  private getDepth(taskId: string): number {
    let depth = 0
    let current = taskId
    for (const [parent, children] of this.taskTree) {
      if (children.includes(current)) {
        depth++
        current = parent
      }
    }
    return depth
  }
}

模式三:操作速率限制(Action Rate Limiter)

// action-rate-limiter.ts
class ActionRateLimiter {
  private windows = new Map<string, number[]>()

  constructor(
    private readonly maxPerMinute: number = 10,
    private readonly maxPerHour: number = 100
  ) {}

  canProceed(actionType: string): { allowed: boolean; retryAfter?: number } {
    const now = Date.now()
    const timestamps = this.windows.get(actionType) || []

    // 清理过期记录
    const recent = timestamps.filter(t => now - t < 3_600_000)
    this.windows.set(actionType, recent)

    // 检查分钟级限制
    const lastMinute = recent.filter(t => now - t < 60_000)
    if (lastMinute.length >= this.maxPerMinute) {
      return { allowed: false, retryAfter: 60_000 - (now - lastMinute[0]) }
    }

    // 检查小时级限制
    if (recent.length >= this.maxPerHour) {
      return { allowed: false, retryAfter: 3_600_000 - (now - recent[0]) }
    }

    recent.push(now)
    return { allowed: true }
  }
}

模式四:行为审计日志(Behavior Audit Trail)

// audit-logger.ts
interface AuditEntry {
  timestamp: number
  agentId: string
  action: string
  signal?: Signal
  decision: Decision
  result: ActionResult
  costIncurred: number
  humanApproved: boolean
}

class AuditLogger {
  private entries: AuditEntry[] = []

  log(entry: AuditEntry): void {
    this.entries.push(entry)

    // 异常模式检测
    this.detectAnomalies(entry)
  }

  private detectAnomalies(entry: AuditEntry): void {
    const recent = this.entries.filter(
      e => e.timestamp > Date.now() - 300_000
    )

    // 检测:5 分钟内成本飙升
    const recentCost = recent.reduce((sum, e) => sum + e.costIncurred, 0)
    if (recentCost > 10) {
      console.error(`🚨 5 分钟内累计成本 $${recentCost.toFixed(2)},超出阈值`)
    }

    // 检测:同类操作频繁执行
    const sameAction = recent.filter(e => e.action === entry.action)
    if (sameAction.length > 5) {
      console.warn(`⚠️ 操作 "${entry.action}" 在 5 分钟内执行了 ${sameAction.length} 次`)
    }
  }
}

模式五:优雅降级(Graceful Degradation)

当系统资源不足或外部服务不可用时,Agent 应该自动降级到更保守的行为模式:

// graceful-degradation.ts
type AgentMode = 'full-autonomy' | 'supervised' | 'read-only' | 'suspended'

class GracefulDegradation {
  private currentMode: AgentMode = 'full-autonomy'

  evaluate(incoming: {
    errorRate: number
    budgetRemaining: number
    systemLoad: number
    recentFailures: number
  }): AgentMode {
    // 严重:暂停所有自主行动
    if (incoming.errorRate > 0.5 || incoming.budgetRemaining < 1) {
      this.currentMode = 'suspended'
    }
    // 高:降级为只读模式
    else if (incoming.errorRate > 0.3 || incoming.recentFailures > 3) {
      this.currentMode = 'read-only'
    }
    // 中:降级为监督模式(关键操作需确认)
    else if (incoming.systemLoad > 0.8 || incoming.errorRate > 0.1) {
      this.currentMode = 'supervised'
    }
    // 正常:完全自主
    else {
      this.currentMode = 'full-autonomy'
    }

    return this.currentMode
  }

  getPermissions(): string[] {
    switch (this.currentMode) {
      case 'full-autonomy': return ['read', 'write', 'execute', 'create-resource']
      case 'supervised': return ['read', 'write', 'execute'] // 无 create-resource
      case 'read-only': return ['read']
      case 'suspended': return []
    }
  }
}

3.2 安全模式组合效果对比

安全模式 防护目标 性能开销 实现复杂度 推荐优先级
预算熔断器 成本失控 ⚡ 极低 ⭐ 简单 ✅ 必须
递归深度限制 任务膨胀 ⚡ 极低 ⭐ 简单 ✅ 必须
操作速率限制 资源滥用 ⚡ 低 ⭐⭐ 中等 ✅ 推荐
行为审计日志 事后追溯 ⚡ 低 ⭐⭐ 中等 ✅ 推荐
优雅降级 系统韧性 ⚡ 极低 ⭐⭐⭐ 复杂 ⚠️ 可选

⚠️ 警告: 预算熔断器和递归深度限制是最低安全底线。没有这两个机制就部署主动型 Agent,等于在没有刹车的情况下上高速。

3.3 一个完整的主动型 Agent 示例

将以上所有组件组合起来:

// proactive-agent.ts — 完整的主动型 Agent 组装
import { SignalCollector } from './signal-collector'
import { DecisionEngine } from './decision-engine'
import { ExecutionSandbox } from './execution-sandbox'
import { HumanInTheLoop } from './human-in-the-loop'
import { BudgetCircuitBreaker } from './budget-circuit-breaker'
import { RecursionLimiter } from './recursion-limiter'
import { ActionRateLimiter } from './action-rate-limiter'
import { AuditLogger } from './audit-logger'
import { GracefulDegradation } from './graceful-degradation'

async function createProactiveAgent(config: AgentConfig) {
  const collector = new SignalCollector()
  const decision = new DecisionEngine()
  const sandbox = new ExecutionSandbox({
    maxTokenBudget: 50_000,
    maxToolCalls: 20,
    maxDuration: 300_000, // 5 分钟
    maxConcurrentActions: 3,
    allowedTools: config.allowedTools,
    costCeiling: 5.0 // 单次行动最多 $5
  })
  const hitl = new HumanInTheLoop()
  const circuitBreaker = new BudgetCircuitBreaker(3, 600_000)
  const recursionLimiter = new RecursionLimiter(3, 5)
  const rateLimiter = new ActionRateLimiter(10, 100)
  const audit = new AuditLogger()
  const degradation = new GracefulDegradation()

  // 注册信号处理器
  collector.register({
    canHandle: (signal) => signal.type === 'webhook',
    handle: async (signal) => {
      // 1. 检查降级状态
      const mode = degradation.evaluate({
        errorRate: await getErrorRate(),
        budgetRemaining: await getBudget(),
        systemLoad: await getSystemLoad(),
        recentFailures: await getRecentFailures()
      })
      if (mode === 'suspended') return null

      // 2. 速率限制
      const rateCheck = rateLimiter.canProceed(signal.type)
      if (!rateCheck.allowed) return null

      // 3. 决策
      const decision_result = await decision.evaluate({
        signal,
        currentLoad: await getSystemLoad(),
        recentActions: await getRecentActions(),
        budgetRemaining: await getBudget(),
        timeOfDay: new Date().getHours()
      })
      if (!decision_result.shouldAct) return null

      // 4. 预算熔断检查
      const budgetOk = await circuitBreaker.checkBudget(
        decision_result.estimatedCost,
        await getBudget()
      )
      if (!budgetOk) return null

      // 5. 构建行动计划
      const plan = await buildActionPlan(signal, decision_result)

      // 6. 人工确认(如果需要)
      if (hitl.needsApproval(plan)) {
        const approved = await hitl.requestApproval({
          id: plan.id,
          action: plan,
          riskLevel: assessRisk(plan),
          summary: summarize(plan),
          estimatedImpact: estimateImpact(plan),
          timeout: 300_000
        })
        if (!approved.approved) return null
      }

      // 7. 在沙箱中执行
      const result = await sandbox.execute(plan)

      // 8. 审计记录
      audit.log({
        timestamp: Date.now(),
        agentId: config.agentId,
        action: plan.id,
        signal,
        decision: decision_result,
        result,
        costIncurred: result.actualCost,
        humanApproved: hitl.needsApproval(plan)
      })

      return result
    }
  })

  return collector
}

📊 四、主动型 Agent 的成本与收益分析

在决定是否采用主动型 Agent 架构之前,需要理性评估投入产出比:

维度 被动型 Agent 主动型 Agent 差异
开发成本 1-2 周 4-8 周 +3-6 周
Token 消耗(日均) ~$2-5 ~$10-50 +5-10x
基础设施成本 最低 +$20-100/月 显著增加
安全审计需求 需要专门流程
用户满意度提升 基准 +40-60% 显著提升
人工干预频率 每次交互 每天 1-3 次 大幅减少
适用场景 明确任务 持续监控/自动化 互补

关键结论: 主动型 Agent 不是所有场景的最佳选择。如果你的需求是「用户问问题,系统给答案」,被动型 Agent 就足够了。只有当你需要「7×24 持续监控、自动响应、减少人工干预」时,主动型 Agent 的额外成本才值得投入。

🔧 五、相关工具与框架推荐

  • 🔧 LangGraph — LangChain 的 Agent 编排框架,内置状态管理和持久化
  • 🔧 CrewAI — 多 Agent 协作框架,支持角色定义和任务委派
  • 🔧 OpenAI Agents SDK — OpenAI 官方 Agent 框架,内置 Guardrails
  • 🔧 Anthropic Agent Protocol — Claude Agent 的设计指南和最佳实践
  • 🔧 Temporal — 持久化执行引擎,适合长时间运行的 Agent 工作流
  • 📊 LangSmith — Agent 可观测性平台,支持 Trace 追踪和成本分析
  • 📊 Helicone — LLM 请求监控和成本追踪

📚 相关文章