Nanobot —— 渠道系统与 Provider 抽象层

本文档拆解 Nanobot 的渠道系统(Channel)、消息总线(MessageBus)与 LLM Provider 抽象层的设计实现。

1. 消息总线设计

MessageBus 是渠道与 Agent 之间的解耦层,所有消息通过事件队列异步传递。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌─────────────────────────────────────────────────────┐
│ MessageBus │
│ │
│ 渠道 A (Telegram) ──┐ │
│ 渠道 B (Discord) ──┤──▶ InboundMessage Queue │
│ 渠道 C (CLI) ──┘ │ │
│ ▼ │
│ AgentLoop.process() │
│ │ │
│ ▼ │
│ 渠道 A ◀────────────── OutboundMessage Queue │
│ 渠道 B ◀────────────── │ │
│ │ │
└─────────────────────────────────────────────────────┘

1.1 消息数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@dataclass
class InboundMessage:
channel_id: str # 渠道标识(如 "telegram")
chat_id: str # 会话 ID(如用户 ID 或群组 ID)
user_id: str # 发送者 ID
content: str # 文本内容
media: list | None # 附件(图片、文件、音频等)
reply_to: str | None # 回复的消息 ID

@dataclass
class OutboundMessage:
channel_id: str # 目标渠道
chat_id: str # 目标会话
content: str # 回复文本
media: list | None # 附件
metadata: dict # 渠道特定元数据(按钮、格式等)

1.2 解耦的优势

场景 说明
渠道独立 单个渠道崩溃不影响其他渠道和 Agent 处理
消息缓冲 高频消息自然排队,不会冲垮 Agent
多渠道并发 多个渠道同时接入,Agent 顺序处理
渠道热插拔 启停渠道不影响 Agent 核心运行

2. BaseChannel 抽象

所有渠道实现继承 BaseChannel,提供统一的消息收发接口。

2.1 BaseChannel 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class BaseChannel(ABC):
"""渠道基类,定义统一的消息接口"""

channel_id: str # 渠道唯一标识
bus: MessageBus # 消息总线引用

@abstractmethod
async def start(self) -> None:
"""启动渠道监听(轮询或 Webhook)"""

@abstractmethod
async def stop(self) -> None:
"""停止渠道"""

@abstractmethod
async def send_message(self, msg: OutboundMessage) -> None:
"""发送消息到渠道"""

async def transcribe_audio(self, audio: bytes) -> str:
"""音频转录(Whisper),子类按需调用"""

2.2 渠道实现示例(Telegram)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class TelegramChannel(BaseChannel):
channel_id = "telegram"

async def start(self):
app = Application.builder().token(self.config.token).build()
app.add_handler(MessageHandler(filters.ALL, self._on_message))
await app.run_polling()

async def _on_message(self, update: Update, ctx: Context):
# 构造统一的 InboundMessage
msg = InboundMessage(
channel_id="telegram",
chat_id=str(update.effective_chat.id),
user_id=str(update.effective_user.id),
content=update.message.text or "",
media=self._extract_media(update.message),
)
# 推入消息总线
await self.bus.put(msg)

async def send_message(self, msg: OutboundMessage):
# Telegram 特定格式化(Markdown、按钮等)
await self.app.bot.send_message(
chat_id=msg.chat_id,
text=msg.content,
parse_mode="Markdown",
)

3. 渠道支持矩阵

Nanobot 内置 15+ 渠道适配,覆盖主流即时通讯和协作平台:

分类 渠道 特性支持
即时通讯 Telegram 文本 / 图片 / 文件 / 音频转录 / 按钮
Discord 文本 / 附件 / Slash Command
Slack 文本 / 附件 / Block Kit
WhatsApp 文本 / 图片 / 音频
Matrix 端对端加密消息
国内平台 飞书(Lark) 文本 / 卡片消息 / At 消息
钉钉 文本 / Markdown / 卡片
企业微信 文本 / 文件 / 应用消息
微信(WeCom) 公众号 / 企业号
QQ 文本 / 图片(Bot API)
开发工具 CLI 终端交互,本地调试
WebSocket 自定义前端接入
其他 Email 邮件收发(IMAP/SMTP)
MSTeams Microsoft Teams 集成
MoChat 模拟 Chat(测试用)

3.1 音频转录支持

支持音频消息的渠道(Telegram、WhatsApp)内置 Whisper 转录:

1
2
3
4
5
6
7
8
9
10
11
12
async def transcribe_audio(self, audio: bytes) -> str:
"""使用 OpenAI Whisper 或 Groq 转录"""
provider = self.config.transcription_provider # "openai" | "groq"
if provider == "openai":
result = await openai_client.audio.transcriptions.create(
model="whisper-1", file=audio, language="zh"
)
elif provider == "groq":
result = await groq_client.audio.transcriptions.create(
model="whisper-large-v3", file=audio
)
return result.text

4. LLM Provider 抽象层

Provider 层将所有 LLM 的调用接口统一,Agent 核心无需感知底层模型差异。

4.1 LLMProvider 基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class LLMProvider(ABC):
"""统一 LLM 接口"""

@abstractmethod
async def call_model(
self,
messages: list[dict],
system: str,
tools: list[dict],
model: str,
max_tokens: int,
stream: bool = False,
) -> LLMResponse:
"""调用 LLM,返回统一响应格式"""

@abstractmethod
def extract_tool_calls(
self, response: LLMResponse
) -> list[ToolCallRequest]:
"""从响应中提取工具调用"""

@abstractmethod
def extract_text(self, response: LLMResponse) -> str:
"""从响应中提取文本内容"""

4.2 统一响应格式

1
2
3
4
5
6
7
8
9
10
11
12
@dataclass
class LLMResponse:
content: list # 原始响应内容块
stop_reason: str # "end_turn" | "tool_use" | "max_tokens"
usage: TokenUsage # 输入/输出 token 统计
model: str # 实际使用的模型名

@dataclass
class ToolCallRequest:
id: str # 工具调用 ID
name: str # 工具名称
input: dict # 工具参数(已解析为 dict)

4.3 各 Provider 实现对比

Provider 工具调用格式 系统提示位置 消息格式
Anthropic tool_use block system 参数 标准 role/content
OpenAI function / tool_calls system role 消息 标准 role/content
Azure OpenAI 同 OpenAI 同 OpenAI 同 OpenAI
GitHub Copilot 同 OpenAI 同 OpenAI 同 OpenAI
OpenAI Compat 同 OpenAI 同 OpenAI 各厂商可能有差异

4.4 Anthropic Provider 适配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class AnthropicProvider(LLMProvider):

async def call_model(self, messages, system, tools, model, max_tokens, stream):
response = await self.client.messages.create(
model=model,
system=system,
messages=messages,
tools=tools, # Anthropic 工具格式
max_tokens=max_tokens,
stream=stream,
)
return LLMResponse(
content=response.content,
stop_reason=response.stop_reason,
usage=TokenUsage(
input=response.usage.input_tokens,
output=response.usage.output_tokens,
),
model=response.model,
)

def extract_tool_calls(self, response) -> list[ToolCallRequest]:
return [
ToolCallRequest(id=block.id, name=block.name, input=block.input)
for block in response.content
if block.type == "tool_use"
]

4.5 OpenAI 兼容 Provider 适配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class OpenAICompatProvider(LLMProvider):

async def call_model(self, messages, system, tools, model, ...):
# 将 system 转为 system role 消息
full_messages = [{"role": "system", "content": system}] + messages

# 将 Anthropic 工具格式转为 OpenAI 工具格式
openai_tools = self._convert_tools(tools)

response = await self.client.chat.completions.create(
model=model,
messages=full_messages,
tools=openai_tools,
...
)
return self._to_llm_response(response)

5. Provider 工厂与注册

5.1 工厂模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class ProviderFactory:
"""根据配置创建对应 Provider 实例"""

_registry: dict[str, type[LLMProvider]] = {}

@classmethod
def register(cls, name: str, provider_cls: type):
cls._registry[name] = provider_cls

@classmethod
def create(cls, config: ProvidersConfig) -> LLMProvider:
# 根据配置中的主 provider 名称实例化
primary = config.primary # "anthropic" | "openai" | ...
provider_cls = cls._registry[primary]
return provider_cls(config)

# 注册所有内置 Provider
ProviderFactory.register("anthropic", AnthropicProvider)
ProviderFactory.register("openai", OpenAICompatProvider)
ProviderFactory.register("azure_openai", AzureOpenAIProvider)
ProviderFactory.register("github_copilot", GitHubCopilotProvider)

5.2 Provider 切换

1
2
3
4
5
6
7
8
9
10
11
// config.json 中切换 Provider,无需修改代码
{
"providers": {
"primary": "openai",
"openai": {
"base_url": "https://api.deepseek.com",
"api_key": "sk-...",
"default_model": "deepseek-chat"
}
}
}

5.3 支持的 Provider

Provider base_url 配置 代表模型
Anthropic 官方 API claude-sonnet-4-6
OpenAI 官方 API gpt-4o
Azure OpenAI Azure 端点 gpt-4o(Azure 部署)
GitHub Copilot GitHub API copilot-chat
DeepSeek api.deepseek.com deepseek-chat
本地(Ollama) localhost:11434 llama3 / qwen
30+ 其他 OpenAI 兼容 各厂商模型