📰 来源: 博客园
企业微信机器人以前通常采用 Webhook 回调方式接收消息,但这种方式存在延迟较高、需要公网服务器等局限性。随着OpenClaw爆火,企业微信机器人也支持 WebSocket 长连接方式。本文介绍一种基于 WebSocket 长连接的企业微信机器人实现方案,并集成 DeepAgents 框架实现智能对话能力。
wecom-aibot-python-sdk - 官方提供的 WebSocket 连接库qywx-bot/
├── main.py # FastAPI 主入口
├── pyproject.toml # 项目依赖配置
├── conf/
│ └── config.toml # 应用配置
├── pkg/
│ ├── config/ # 配置管理模块
│ ├── log/ # 日志模块
│ └── qywx/ # 企业微信客户端
└── ai_agent/
├── ai_agent.py # DeepAgents 集成
└── mcp_servers/ # MCP 工具服务器
uv add fastapi deepagents langchain-openai langchain-mcp-adapters wecom-aibot-python-sdk uvicorn
使用 TOML 格式管理配置,支持多环境切换:
[service]
host = "127.0.0.1"
port = 8000
env = "dev"
[qywx.v2]
bot_id = "your-bot-id"
secret = "your-bot-secret"
bot_name = "智能助手"
2. 企业微信 WebSocket 客户端
通过 WebSocket 长连接接收企业微信消息,实现低延迟实时交互:
class QywxClient:
async def start(self):
self.ws_client = WSClient(
WSClientOptions(
bot_id=cfg.qywx_bot_id,
secret=cfg.qywx_secret,
logger=self.logger,
)
)
# 注册事件处理器
self.ws_client.on("authenticated", self._on_authenticated)
self.ws_client.on("event.enter_chat", self._on_event_enter_chat)
self.ws_client.on("message.text", self._on_message_text)
await self.ws_client.connect()
3. DeepAgents 集成
构建具备工具调用能力的智能体,通过 MCP 协议加载工具:
class AIAgent:
async def _create_root_agent(self, session: ClientSession):
tools = await load_mcp_tools(session)
return create_deep_agent(
model=self.model,
tools=tools,
system_prompt=f"你是一个企业微信机器人,名字叫{cfg.qywx_bot_name}",
)
实现企业微信流式消息回复,提升用户体验:
async def _on_message_text(self, frame: WsFrameHeaders):
stream_id = generate_req_id('stream')
await self.ws_client.reply_stream(frame, stream_id, "思考中...", False)
async for chunk in aiops.invoke(content):
await self.ws_client.reply_stream(frame, stream_id, str(chunk), False)
await self.ws_client.reply_stream(frame, stream_id, "", True)
5. FastAPI Lifespan 管理
正确管理应用生命周期,包括 WebSocket 连接、MCP 服务器和 AI 代理:
@asynccontextmanager
async def lifespan(app: FastAPI):
await aiops.start()
await qywx_client.start()
# 挂载 MCP 服务器
mcp_app = datetime_mcp.streamable_http_app()
async with datetime_mcp.session_manager.run():
app.mount("/mcp", mcp_app)
yield
await aiops.shutdown()
await qywx_client.shutdown()
将 FastMCP 服务器挂载到 FastAPI 时,需注意正确初始化 session manager:
# 错误方式:直接挂载会导致 task group 未初始化
app.mount("/mcp", datetime_mcp.streamable_http_app())
# 正确方式:在 lifespan 中启动 session manager
async with datetime_mcp.session_manager.run():
app.mount("/mcp", mcp_app)
yield
DeepAgents 的 astream() 返回的 chunk 是嵌套字典结构,需正确提取内容:
async for chunk in root_agent.astream(input={"messages": [HumanMessage(content=input)]}):
if isinstance(chunk, dict):
messages = chunk.get("model", {}).get("messages", [])
for msg in messages:
if hasattr(msg, "content") and m
🔗 原文链接: 点击阅读原文
文章评论