Skip to content

Commit 23ef9a1

Browse files
lmjhhcoderhrhtuhahaha
authored
supports calling mcp tools. (QwenLM#443)
* Qwen-agent supports calling mcp server tools. I have implemented basic MCP (Model-Context-Protocol) support for Qwen-agent, enabling it to invoke tools from an MCP server. Additionally, I have provided an example to demonstrate the functionality of this new feature. * Update assistant_mcp_sqlite_bot.py Modify file description. --------- Co-authored-by: coderhrh <8843271+coderhrh@user.noreply.gitee.com> Co-authored-by: Jianhong Tu <37433392+tuhahaha@users.noreply.github.com>
1 parent 8dd8054 commit 23ef9a1

File tree

5 files changed

+297
-1
lines changed

5 files changed

+297
-1
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
"""A sqlite database assistant implemented by assistant"""
2+
3+
import os
4+
import asyncio
5+
from typing import Optional
6+
7+
from qwen_agent.agents import Assistant
8+
from qwen_agent.gui import WebUI
9+
10+
ROOT_RESOURCE = os.path.join(os.path.dirname(__file__), 'resource')
11+
12+
13+
def init_agent_service():
14+
llm_cfg = {'model': 'qwen-max'}
15+
system = ('你扮演一个数据库助手,你具有查询数据库的能力')
16+
tools = [{
17+
"mcpServers": {
18+
"sqlite" : {
19+
"command": "uvx",
20+
"args": [
21+
"mcp-server-sqlite",
22+
"--db-path",
23+
"test.db"
24+
]
25+
}
26+
}
27+
}]
28+
bot = Assistant(
29+
llm=llm_cfg,
30+
name='数据库助手',
31+
description='数据库查询',
32+
system_message=system,
33+
function_list=tools,
34+
)
35+
36+
return bot
37+
38+
39+
def test(query='数据库里有几张表', file: Optional[str] = os.path.join(ROOT_RESOURCE, 'poem.pdf')):
40+
# Define the agent
41+
bot = init_agent_service()
42+
43+
# Chat
44+
messages = []
45+
46+
if not file:
47+
messages.append({'role': 'user', 'content': query})
48+
else:
49+
messages.append({'role': 'user', 'content': [{'text': query}, {'file': file}]})
50+
51+
for response in bot.run(messages):
52+
print('bot response:', response)
53+
54+
55+
def app_tui():
56+
# Define the agent
57+
bot = init_agent_service()
58+
59+
# Chat
60+
messages = []
61+
while True:
62+
# Query example: 数据库里有几张表
63+
query = input('user question: ')
64+
# File example: resource/poem.pdf
65+
file = input('file url (press enter if no file): ').strip()
66+
if not query:
67+
print('user question cannot be empty!')
68+
continue
69+
if not file:
70+
messages.append({'role': 'user', 'content': query})
71+
else:
72+
messages.append({'role': 'user', 'content': [{'text': query}, {'file': file}]})
73+
74+
response = []
75+
for response in bot.run(messages):
76+
print('bot response:', response)
77+
messages.extend(response)
78+
79+
80+
def app_gui():
81+
# Define the agent
82+
bot = init_agent_service()
83+
chatbot_config = {
84+
'prompt.suggestions': [
85+
'数据库里有几张表',
86+
'创建一个学生表包括学生的姓名、年龄',
87+
'增加一个学生名字叫韩梅梅,今年6岁',
88+
]
89+
}
90+
WebUI(
91+
bot,
92+
chatbot_config=chatbot_config,
93+
).run()
94+
95+
96+
if __name__ == '__main__':
97+
# test()
98+
# app_tui()
99+
app_gui()

‎qwen_agent/agent.py‎

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from qwen_agent.llm.base import BaseChatModel
99
from qwen_agent.llm.schema import CONTENT, DEFAULT_SYSTEM_MESSAGE, ROLE, SYSTEM, ContentItem, Message
1010
from qwen_agent.log import logger
11-
from qwen_agent.tools import TOOL_REGISTRY, BaseTool
11+
from qwen_agent.tools import TOOL_REGISTRY, BaseTool, MCPManager
1212
from qwen_agent.tools.base import ToolServiceError
1313
from qwen_agent.tools.simple_doc_parser import DocParserError
1414
from qwen_agent.utils.utils import has_chinese_messages, merge_generate_cfgs
@@ -205,6 +205,13 @@ def _init_tool(self, tool: Union[str, Dict, BaseTool]):
205205
if tool_name in self.function_map:
206206
logger.warning(f'Repeatedly adding tool {tool_name}, will use the newest tool in function list')
207207
self.function_map[tool_name] = tool
208+
elif isinstance(tool, dict) and 'mcpServers' in tool:
209+
tools = MCPManager().initConfig(tool)
210+
for tool in tools:
211+
tool_name = tool.name
212+
if tool_name in self.function_map:
213+
logger.warning(f'Repeatedly adding tool {tool_name}, will use the newest tool in function list')
214+
self.function_map[tool_name] = tool
208215
else:
209216
if isinstance(tool, dict):
210217
tool_name = tool['name']

‎qwen_agent/tools/__init__.py‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from .simple_doc_parser import SimpleDocParser
1111
from .storage import Storage
1212
from .web_extractor import WebExtractor
13+
from .mcp_manager import MCPManager
1314
from .web_search import WebSearch
1415

1516
__all__ = [
@@ -29,5 +30,6 @@
2930
'FrontPageSearch',
3031
'ExtractDocVocabulary',
3132
'PythonExecutor',
33+
'MCPManager',
3234
'WebSearch',
3335
]

‎qwen_agent/tools/mcp_manager.py‎

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
import json
2+
import urllib.parse
3+
import asyncio
4+
import threading
5+
from typing import Optional, Union, List, Dict
6+
from contextlib import AsyncExitStack
7+
8+
from mcp import ClientSession, StdioServerParameters
9+
from mcp.client.stdio import stdio_client
10+
11+
from qwen_agent.log import logger
12+
from qwen_agent.tools.base import BaseTool, register_tool
13+
14+
from dotenv import load_dotenv
15+
16+
class MCPManager:
17+
_instance = None # Private class variable to store the unique instance
18+
19+
def __new__(cls, *args, **kwargs):
20+
if cls._instance is None:
21+
cls._instance = super(MCPManager, cls).__new__(cls, *args, **kwargs)
22+
cls._instance.__init__()
23+
return cls._instance
24+
25+
def __init__(self):
26+
if not hasattr(self, 'clients'):
27+
"""Set a new event loop in a separate thread"""
28+
load_dotenv() # Load environment variables from .env file
29+
self.clients: dict = {}
30+
self.exit_stack = AsyncExitStack()
31+
self.loop = asyncio.new_event_loop()
32+
self.loop_thread = threading.Thread(target=self.start_loop, daemon=True)
33+
self.loop_thread.start()
34+
35+
def start_loop(self):
36+
asyncio.set_event_loop(self.loop)
37+
self.loop.run_forever()
38+
39+
def is_valid_mcp_servers(self, config: dict):
40+
"""Example of mcp servers configuration:
41+
{
42+
"mcpServers": {
43+
"memory": {
44+
"command": "npx",
45+
"args": ["-y", "@modelcontextprotocol/server-memory"]
46+
},
47+
"filesystem": {
48+
"command": "npx",
49+
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/allowed/files"]
50+
},
51+
"github": {
52+
"command": "npx",
53+
"args": ["-y", "@modelcontextprotocol/server-github"],
54+
"env": {
55+
"GITHUB_PERSONAL_ACCESS_TOKEN": "<YOUR_TOKEN>"
56+
}
57+
}
58+
}
59+
}
60+
"""
61+
62+
# Check if the top-level key "mcpServers" exists and its value is a dictionary
63+
if not isinstance(config, dict) or 'mcpServers' not in config or not isinstance(config['mcpServers'], dict):
64+
return False
65+
mcp_servers = config['mcpServers']
66+
# Check each sub-item under "mcpServers"
67+
for key in mcp_servers:
68+
server = mcp_servers[key]
69+
# Each sub-item must be a dictionary and contain the keys "command" and "args"
70+
if not isinstance(server, dict) or 'command' not in server or 'args' not in server:
71+
return False
72+
# "command" must be a string
73+
if not isinstance(server['command'], str):
74+
return False
75+
# "args" must be a list
76+
if not isinstance(server['args'], list):
77+
return False
78+
# If the "env" key exists, it must be a dictionary
79+
if 'env' in server and not isinstance(server['env'], dict):
80+
return False
81+
return True
82+
83+
def initConfig(self, config: Dict):
84+
logger.info(f'Initialize from config {config}. ')
85+
if not self.is_valid_mcp_servers(config):
86+
raise ValueError('Config format error')
87+
# Submit coroutine to the event loop and wait for the result
88+
future = asyncio.run_coroutine_threadsafe(self.init_config_async(config), self.loop)
89+
try:
90+
result = future.result() # You can specify a timeout if desired
91+
return result
92+
except Exception as e:
93+
logger.info(f"Error executing function: {e}")
94+
return None
95+
96+
async def init_config_async(self, config: Dict):
97+
tools : list = []
98+
mcp_servers = config['mcpServers']
99+
for server_name in mcp_servers:
100+
client = MCPClient()
101+
server = mcp_servers[server_name]
102+
await client.connection_server(self.exit_stack, server) # Attempt to connect to the server
103+
self.clients[server_name] = client # Add to clients dict after successful connection
104+
for tool in client.tools:
105+
"""MCP tool example:
106+
{
107+
"name": "read_query",
108+
"description": "Execute a SELECT query on the SQLite database",
109+
"inputSchema": {
110+
"type": "object",
111+
"properties": {
112+
"query": {
113+
"type": "string",
114+
"description": "SELECT SQL query to execute"
115+
}
116+
},
117+
"required": ["query"]
118+
}
119+
"""
120+
parameters = tool.inputSchema
121+
# The required field in inputSchema may be empty and needs to be initialized.
122+
if 'required' not in parameters:
123+
parameters['required'] = []
124+
register_name = server_name + "-" + tool.name
125+
agent_tool = self.create_tool_class(register_name, server_name, tool.name, tool.description, parameters)
126+
tools.append(agent_tool)
127+
return tools
128+
129+
def create_tool_class(self, register_name, server_name, tool_name, tool_desc, tool_parameters):
130+
@register_tool(register_name)
131+
class ToolClass(BaseTool):
132+
description = tool_desc
133+
parameters = tool_parameters
134+
135+
def call(self, params: Union[str, dict], **kwargs) -> str:
136+
tool_args = json.loads(params)
137+
# Submit coroutine to the event loop and wait for the result
138+
manager = MCPManager()
139+
client = manager.clients[server_name]
140+
future = asyncio.run_coroutine_threadsafe(client.execute_function(tool_name, tool_args), manager.loop)
141+
try:
142+
result = future.result()
143+
return result
144+
except Exception as e:
145+
logger.info(f"Error executing function: {e}")
146+
return None
147+
return "Function executed"
148+
149+
ToolClass.__name__ = f"{register_name}_Class"
150+
return ToolClass()
151+
152+
async def clearup(self):
153+
await self.exit_stack.aclose()
154+
155+
156+
class MCPClient:
157+
def __init__(self):
158+
# Initialize session and client objects
159+
self.session: Optional[ClientSession] = None
160+
self.tools : list = None
161+
162+
async def connection_server(self, exit_stack, mcp_server):
163+
"""Connect to an MCP server and retrieve the available tools."""
164+
try:
165+
server_params = StdioServerParameters(
166+
command = mcp_server["command"],
167+
args = mcp_server["args"],
168+
env = mcp_server.get("env", None)
169+
)
170+
stdio_transport = await exit_stack.enter_async_context(stdio_client(server_params))
171+
self.stdio, self.write = stdio_transport
172+
self.session = await exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
173+
174+
await self.session.initialize()
175+
176+
list_tools = await self.session.list_tools()
177+
self.tools = list_tools.tools
178+
except Exception as e:
179+
logger.info(f"Failed to connect to server: {e}")
180+
181+
async def execute_function(self, tool_name, tool_args: dict):
182+
response = await self.session.call_tool(tool_name, tool_args)
183+
for content in response.content:
184+
if content.type == 'text':
185+
return content.text
186+
else:
187+
return "execute error"

‎setup.py‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def read_description() -> str:
5252
'pydantic>=2.3.0',
5353
'requests',
5454
'tiktoken',
55+
'mcp',
5556
],
5657
extras_require={
5758
# Extra dependencies for RAG:

0 commit comments

Comments
 (0)