Nanobot —— Agent Loop 核心与 Hook 系统
本文档深入拆解 Nanobot 的 Agent Loop 核心引擎、AgentRunner 迭代机制与 Hook 生命周期系统。
1. Agent Loop 概述
AgentLoop 是 Nanobot 的核心处理引擎,负责接收消息、组装上下文、驱动 LLM 迭代并回复结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| ┌─────────────────────────────────────────────────────────────┐ │ AgentLoop │ │ │ │ InboundMessage │ │ │ │ │ ▼ │ │ [1] 加载会话历史(SessionManager) │ │ │ │ │ ▼ │ │ [2] 构建上下文(ContextBuilder) │ │ │ SOUL.md + USER.md + MEMORY.md │ │ │ + Skills 摘要 + 历史消息 │ │ │ │ │ ▼ │ │ [3] 触发 Hook: before_iteration() │ │ │ │ │ ▼ │ │ [4] AgentRunner.run() ←─────────────────────┐ │ │ │ │ │ │ ▼ │ │ │ LLM 调用 → 工具调用? → 执行工具 → 结果注回 ─┘ │ │ │ (stop_reason == end_turn) │ │ ▼ │ │ [5] 触发 Hook: after_iteration() │ │ │ │ │ ▼ │ │ [6] 保存会话 + 触发 Memory 更新 │ │ │ │ │ ▼ │ │ OutboundMessage → Channel │ └─────────────────────────────────────────────────────────────┘
|
2. AgentRunner 迭代机制
AgentRunner 是可复用的 LLM 迭代引擎,与渠道解耦,可被主 Agent、子 Agent 共同调用。
2.1 核心数据结构
1 2 3 4 5 6 7 8 9 10
| @dataclass class AgentRunSpec: """单次运行配置""" messages: list[dict] system: str model: str tools: list[dict] max_iterations: int = 50 max_tokens: int = 8000 hook: AgentHook | None = None
|
2.2 迭代流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| while iterations < max_iterations: │ ├─ 检查 Token 预算(context window 管理) │ ↓ ├─ 调用 LLM Provider.call_model(messages, tools) │ ↓ ├─ 流式处理(on_stream hook 逐 delta 触发) │ ↓ ├─ stop_reason == "end_turn" ? │ └─ 是 → 返回最终内容 │ ↓ ├─ stop_reason == "tool_use" ? │ └─ 是 → 并发执行所有工具调用 │ ↓ │ ToolRegistry.execute(name, input) │ ↓ │ tool_results 注回 messages │ ↓ └─ iterations += 1,继续循环
|
2.3 上下文窗口管理
AgentRunner 在每次迭代前估算 token 消耗,防止超出 context limit:
1 2 3 4 5
| 总 token 预算 = context_limit - max_output_tokens - buffer
if estimated_tokens > 总预算: 触发 AutoCompact(自动压缩历史消息) └─ 保留系统提示 + 未完成任务 + 最近 K 条消息
|
2.4 错误处理与重试
| 错误类型 |
处理策略 |
| Rate Limit (429) |
指数退避重试(1s → 2s → 4s) |
| 工具执行异常 |
返回结构化错误给 LLM,继续决策 |
| 超出最大迭代 |
返回当前最佳结果 + 提示未完成 |
| LLM 返回空内容 |
重试一次,仍空则 fallback |
3. Hook 系统
Hook 是 Nanobot 的核心扩展机制,在 Agent Loop 生命周期的关键节点注入自定义逻辑,无需修改核心代码。
3.1 Hook 生命周期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| 消息进入 │ ▼ before_iteration(context) # 每次迭代前,可修改上下文 │ ▼ LLM 流式输出 ├─ on_stream(delta: str) # 每个 token delta 触发 └─ on_stream_end() # 流式完成 │ ▼ before_execute_tools(tool_calls) # 工具执行前,可拦截或记录 │ ▼ [工具执行] │ ▼ after_iteration(result) # 每次迭代完成后 │ ▼ finalize_content(content) → str # 最终内容后处理(如剥离思维链) │ ▼ 消息输出
|
3.2 AgentHook 基类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class AgentHook: wants_streaming: bool = False
async def before_iteration(self, context: dict) -> None: """每次 LLM 调用前触发"""
async def on_stream(self, delta: str) -> None: """流式输出每个 token 触发(需 wants_streaming=True)"""
async def on_stream_end(self) -> None: """流式输出完成"""
async def before_execute_tools( self, tool_calls: list[ToolCall] ) -> None: """工具执行前触发,可用于权限检查"""
async def after_iteration(self, result: AgentResult) -> None: """每次迭代完成后触发,可用于统计"""
async def finalize_content(self, content: str) -> str: """最终内容处理,返回处理后的内容""" return content
|
3.3 CompositeHook:多 Hook 组合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| class CompositeHook(AgentHook): """组合多个 Hook,错误隔离"""
def __init__(self, hooks: list[AgentHook]): self.hooks = hooks self.wants_streaming = any(h.wants_streaming for h in hooks)
async def before_iteration(self, context): for hook in self.hooks: try: await hook.before_iteration(context) except Exception as e: logger.error(f"Hook {hook} error: {e}")
|
3.4 内置 Hook 示例
日志 Hook(记录工具调用):
1 2 3 4 5 6 7
| class LoggingHook(AgentHook): async def before_execute_tools(self, tool_calls): for call in tool_calls: logger.info(f"Tool: {call.name} | Input: {call.input}")
async def after_iteration(self, result): logger.info(f"Tokens used: {result.usage}")
|
成本追踪 Hook:
1 2 3 4 5 6 7 8
| class CostTrackingHook(AgentHook): def __init__(self): self.total_tokens = 0 self.total_cost = 0.0
async def after_iteration(self, result): self.total_tokens += result.usage.total_tokens self.total_cost += calculate_cost(result.usage, result.model)
|
3.5 Hook 设计原则
| 原则 |
说明 |
| 可选性 |
无 Hook 时系统正常运行 |
| 错误隔离 |
CompositeHook 中单个失败不影响其他 |
| 不阻塞主流程 |
Hook 异常只记录日志,不中断 Agent |
| 顺序确定 |
CompositeHook 按注册顺序依次触发 |
4. AutoCompact 上下文压缩
当会话历史接近 context limit 时,AutoCompact 自动触发压缩,防止超长报错。
4.1 压缩策略
1 2 3 4 5 6 7 8 9 10 11
| 检测: estimated_tokens > threshold (默认 context_limit × 0.8) │ ▼ 压缩方案选择: ├─ 微型压缩(对话进行中) │ 保留: 系统提示 + 最近 20 条消息 + 任务摘要 │ 压缩: 早期历史 → LLM 生成摘要段落 │ └─ 完整压缩(会话结束后) 保留: 系统提示 + 关键决策摘要 归档: 完整历史 → history.jsonl
|
4.2 压缩触发点
| 触发时机 |
压缩类型 |
触发条件 |
| 每次 LLM 调用前 |
微型压缩 |
token 超过 80% |
| 会话结束后 |
Consolidator |
消息数 > max_messages |
| 定期调度(cron) |
Dream 处理 |
配置的时间间隔 |
5. 子 Agent(Subagent)
Nanobot 支持主 Agent 启动隔离的子 Agent 处理子任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class SpawnTool(BaseTool): name = "spawn_agent"
async def run(self, task: str, context: str = "") -> str: sub_spec = AgentRunSpec( messages=[{"role": "user", "content": task}], system=build_subagent_system(context), model=self.model, tools=self.subagent_tools, ) runner = AgentRunner(sub_spec) result = await runner.run() return result.final_content
|
子 Agent 隔离的价值:
| 场景 |
说明 |
| 复杂子任务 |
子任务的中间步骤不污染主 Agent context |
| 并行执行 |
多个子 Agent 同时运行,各自独立 |
| 工具隔离 |
子 Agent 可使用不同的工具集(最小权限) |
| 错误隔离 |
子 Agent 失败不直接中断主 Agent |