在 MCP Server 开发实战:无缝对接 LLM 和 Elasticsearch 一文中,我们详细介绍了如何利用 MCP Python SDK 编写一个 Elasticsearch MCP 服务器,并通过 Claude Desktop 作为 MCP 客户端进行交互。本文将进一步介绍如何使用 MCP Python SDK 编写一个 MCP 客户端,以便更加灵活地与 MCP 服务器进行通信和集成。本文的完整代码可以在 Github 上找到:https://github.com/cr7258/hands-on-lab/tree/main/ai/claude/mcp/client/elasticsearch-mcp-client-example
MCP 系列文章:
MCP 客户端的作用
MCP 客户端充当 LLM 和 MCP 服务器之间的桥梁,MCP 客户端的工作流程如下:
MCP 客户端首先从 MCP 服务器获取可用的工具列表。
将用户的查询连同工具描述通过 function calling 一起发送给 LLM。
LLM 决定是否需要使用工具以及使用哪些工具。
如果需要使用工具,MCP 客户端会通过 MCP 服务器执行相应的工具调用。
工具调用的结果会被发送回 LLM。
LLM 基于所有信息生成自然语言响应。
最后将响应展示给用户。
MCP 通信方式
MCP 支持两种通信方式:
标准输入输出(Standard Input/Output, stdio):客户端通过启动服务器子进程并使用标准输入(stdin)和标准输出(stdout)建立双向通信,一个服务器进程只能与启动它的客户端通信(1:1 关系)。stdio 适用于本地快速集成的场景,在本文中,我们将使用这种方式来编写 MCP 客户端。
服务器发送事件(Server-Sent Events, SSE):服务器作为独立进程运行,客户端和服务器代码完全解耦,支持多个客户端随时连接和断开。这种方式将在后续的系列文章中单独进行介绍。
简单的示例
在开始构建复杂的应用之前,让我们先创建一个最简单的 MCP 客户端。这个基础示例将展示如何连接到 MCP 服务器并使用其提供的工具。
MCP 服务器的代码以及 Elasticsearch 集群的配置文件请参考 MCP Server 开发实战:无缝对接 LLM 和 Elasticsearch 一文。
添加依赖
在本教程中,我们将使用 MCP Python SDK 来编写项目,使用 uv 来管理 Python 项目依赖。需要添加如下依赖:
uv add mcp elasticsearch openai
复制代码
设置服务器连接参数
在使用 stdio 方式进行通信时,MCP 服务器的进程由 MCP 客户端程序负责启动。因此,我们通过 StdioServerParameters 来配置服务器进程的启动参数,包括运行 MCP 服务器的命令及其对应的参数。其中,sys.argv[1] 代表客户端程序运行时传入的第一个参数,用于指定服务器脚本的路径,从而确保 MCP 客户端能够正确启动并连接到 MCP 服务器。
server_params = StdioServerParameters( command="python", # 运行命令 args=[sys.argv[1]], # 服务器脚本路径 env=None # 可选的环境变量)
复制代码
建立服务器连接
stdio_client 负责启动服务器进程并建立双向通信通道,它返回用于读写数据的流对象。ClientSession 则在这些流的基础上提供高层的会话管理,包括初始化连接、维护会话状态等。
async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize()
复制代码
调用工具
在 MCP 客户端中,我们使用了以下两个函数来与 MCP 服务器进行交互。
list_tools():获取 MCP 服务器提供的所有可用工具。
call_tool(name, args):调用指定的工具并获取结果,这里调用 list_indices 来获取 Elasticsearch 集群中的索引信息。
from mcp import ClientSession, StdioServerParametersfrom mcp.client.stdio import stdio_client
async def run(): # 建立连接 async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # 初始化连接 await session.initialize()
# 列出可用工具 tools = await session.list_tools() print("Tools:", tools)
# 调用工具 indices = await session.call_tool("list_indices") print("Indices:", indices)
if __name__ == "__main__": import asyncio asyncio.run(run())
复制代码
运行程序
使用以下命令运行 MCP 客户端:
uv run simple.py <MCP 服务器的代码路径>
复制代码
程序首先会列出所有可用的工具,可以看到总共有 3 个工具:list_indices、get_index 和 write_documents。
Tools: meta=None nextCursor=None tools=[ Tool( name='list_indices', description='List all Elasticsearch indices', inputSchema={ 'properties': {}, 'title': 'list_indicesArguments', 'type': 'object' } ), Tool( name='get_index', description='Get detailed information about a specific Elasticsearch index', inputSchema={ 'properties': { 'index': { 'title': 'Index', 'type': 'string' } }, 'required': ['index'], 'title': 'get_indexArguments', 'type': 'object' } ), Tool( name='write_documents', description='Write multiple documents to an Elasticsearch index using bulk API', inputSchema={ 'properties': { 'index': { 'title': 'Index', 'type': 'string' }, 'documents': { 'items': { 'type': 'object' }, 'title': 'Documents', 'type': 'array' } }, 'required': ['index', 'documents'], 'title': 'write_documentsArguments', 'type': 'object' } )]
复制代码
然后,程序会调用 list_indices 工具,获取 Elasticsearch 集群中的索引信息,并打印出来。
Indices: meta=None content=[ TextContent( type='text', text="[ {'health': 'green', 'status': 'open', 'index': 'student', 'uuid': 'iKl3j9ujSJ2i0GF5v8NUDw', 'pri': '1', 'rep': '1', 'docs.count': '3', 'docs.deleted': '0', 'store.size': '12kb', 'pri.store.size': '6kb'},
{'health': 'green', 'status': 'open', 'index': 'teacher', 'uuid': 'vfbwvH7yQGWqRGEG-t4FnA', 'pri': '1', 'rep': '1', 'docs.count': '3', 'docs.deleted': '0', 'store.size': '22.6kb', 'pri.store.size': '11.3kb'}, {'health': 'green', 'status': 'open', 'index': 'movies', 'uuid': 'JYhacHmXREWmkvwXBaFPmg', 'pri': '1', 'rep': '1', 'docs.count': '3', 'docs.deleted': '0', 'store.size': '13.2kb', 'pri.store.size': '6.6kb'} ]" )]isError=False
复制代码
进阶示例:集成 LLM
在实际应用中,我们通常希望让 LLM(如 OpenAI、Claude、通义千问等)自主决定调用哪些工具。下面的代码将以通义千问为示例进行演示,并使用 OpenAI SDK 与其交互。为了简化这一过程,我们将借助 OpenRouter -- 一个统一的 LLM 网关,它提供了 OpenAI 兼容的接口,使我们能够通过相同的 OpenAI API 访问包括通义千问在内的多种 LLM。
OpenRouter 的使用方式非常简单。我们只需在创建 OpenAI 客户端时指定 OpenRouter 的 base_url 和 api_key,并在调用模型时以 <provider>/<model> 的格式(例如 qwen/qwen-plus)指定目标模型,OpenRouter 就会根据模型名称自动将请求路由到对应的 LLM 上。除此之外,其他代码与标准的 OpenAI SDK 保持一致。关于 OpenRouter 的使用方法可以参考这里。
接下来介绍一下 MCP 客户端的主要代码。
初始化客户端类
MCPClient 类的初始化包含以下 3 个组件:
self.session:用于存储与 MCP 服务器的会话对象,初始设为 None,将在连接服务器时被赋值。
self.exit_stack:使用 AsyncExitStack 来管理异步资源,确保所有资源(如服务器连接、会话等)在程序结束时能够正确关闭。
self.client:创建 OpenAI 异步客户端,通过 OpenRouter 来访问 LLM。这里我们:
设置 base_url 为 OpenRouter 的 API 端点。
从环境变量获取 API Key(请确保设置了 OPENROUTER_API_KEY 环境变量)。可以参考该文档创建 OpenRouter 的 API Key。
class MCPClient: def __init__(self): self.session: Optional[ClientSession] = None self.exit_stack = AsyncExitStack() self.client = AsyncOpenAI( base_url="https://openrouter.ai/api/v1", api_key=os.getenv("OPENROUTER_API_KEY"), )
复制代码
服务器连接
connect_to_server 方法负责建立与 MCP 服务器的连接。它首先配置服务器进程的启动参数,然后通过 stdio_client 建立双向通信通道,最后创建并初始化会话。所有的资源管理都通过 AsyncExitStack 来处理,确保资源能够正确释放。连接成功后,它会打印出 MCP 服务器提供的所有可用工具。
async def connect_to_server(self, server_script_path: str): server_params = StdioServerParameters( command="python", args=[server_script_path], env=None ) stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params)) self.stdio, self.write = stdio_transport self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write)) await self.session.initialize() # 列出可用工具 response = await self.session.list_tools() tools = response.tools print("\nConnected to server with tools:", [tool.name for tool in tools])
复制代码
处理请求
process_query 方法中定义了处理请求的流程:
首先,将用户的查询作为初始消息发送给 LLM,同时提供 MCP 服务器上所有可用工具的描述信息。
LLM 分析用户查询,决定是直接回答还是需要调用工具。如果需要工具,它会指定要调用的工具名称和参数。
对于每个工具调用,MCP 客户端执行调用并收集结果。
将工具调用的结果返回给 LLM,让它基于这些新信息生成或更新回答。
如果 LLM 认为还需要更多信息,它会继续请求调用其他工具。这个过程会一直重复,直到 LLM 收集了足够的信息来完整回答用户的查询。
async def process_query(self, query: str) -> str: """使用 LLM 和 MCP 服务器提供的工具处理查询""" messages = [ { "role": "user", "content": query } ]
response = await self.session.list_tools() available_tools = [{ "type": "function", "function": { "name": tool.name, "description": tool.description, "parameters": tool.inputSchema } } for tool in response.tools]
# 初始化 LLM API 调用 response = await self.client.chat.completions.create( model="qwen/qwen-plus", messages=messages, tools=available_tools )
final_text = [] message = response.choices[0].message final_text.append(message.content or "")
# 处理响应并处理工具调用 while message.tool_calls: # 处理每个工具调用 for tool_call in message.tool_calls: tool_name = tool_call.function.name tool_args = json.loads(tool_call.function.arguments) # 执行工具调用 result = await self.session.call_tool(tool_name, tool_args) final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
# 将工具调用和结果添加到消息历史 messages.append({ "role": "assistant", "tool_calls": [ { "id": tool_call.id, "type": "function", "function": { "name": tool_name, "arguments": json.dumps(tool_args) } } ] }) messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": str(result.content) })
# 将工具调用的结果交给 LLM response = await self.client.chat.completions.create( model="qwen/qwen-plus", messages=messages, tools=available_tools ) message = response.choices[0].message if message.content: final_text.append(message.content)
return "\n".join(final_text)
复制代码
交互式界面
chat_loop 方法提供了一个简单的命令行交互界面:
async def chat_loop(self): """运行交互式聊天循环""" print("\nMCP Client Started!") print("Type your queries or 'quit' to exit.") while True: try: query = input("\nQuery: ").strip() if query.lower() == 'quit': break response = await self.process_query(query) print("\n" + response) except Exception as e: print(f"\nError: {str(e)}")
复制代码
程序入口
程序入口代码完成以下工作:
async def main(): if len(sys.argv) < 2: print("Usage: uv run client.py <path_to_server_script>") sys.exit(1) client = MCPClient() try: await client.connect_to_server(sys.argv[1]) await client.chat_loop() finally: await client.cleanup()
if __name__ == "__main__": asyncio.run(main())
复制代码
运行程序
使用以下命令运行 MCP 客户端:
uv run client.py <MCP 服务器的代码路径>
复制代码
启动 MCP 客户端后,我们输入问题 Elasticsearch 集群中有哪些索引?,可以看到 MCP 客户端成功调用了 MCP 服务器提供的 list_indices 从 Elasticsearch 集群中获取到了索引信息。
2025-02-03 17:13:56,518 - mcp.server.lowlevel.server - INFO - Processing request of type ListToolsRequest
Connected to server with tools: ['list_indices', 'get_index', 'write_documents']
MCP Client Started!Type your queries or 'quit' to exit.
Query: Elasticsearch 集群中有哪些索引?2025-02-03 17:14:04,537 - mcp.server.lowlevel.server - INFO - Processing request of type ListToolsRequest2025-02-03 17:14:06,185 - mcp.server.lowlevel.server - INFO - Processing request of type CallToolRequest2025-02-03 17:14:06,248 - elastic_transport.transport - INFO - GET https://localhost:9200/_cat/indices?format=json [status:200 duration:0.062s]2025-02-03 17:14:06,248 - mcp.server.lowlevel.server - INFO - Warning: InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings
[Calling tool list_indices with args {}]在Elasticsearch集群中有以下索引:
1. 索引名称: student - 健康状态: green - 状态: open - UUID: iKl3j9ujSJ2i0GF5v8NUDw - 主分片数: 1 - 副本分片数: 1 - 文档数量: 3 - 删除的文档数量: 0 - 存储大小: 12kb - 主分片存储大小: 6kb
2. 索引名称: teacher - 健康状态: green - 状态: open - UUID: vfbwvH7yQGWqRGEG-t4FnA - 主分片数: 1 - 副本分片数: 1 - 文档数量: 3 - 删除的文档数量: 0 - 存储大小: 22.6kb - 主分片存储大小: 11.3kb
3. 索引名称: movies - 健康状态: green - 状态: open - UUID: JYhacHmXREWmkvwXBaFPmg - 主分片数: 1 - 副本分片数: 1 - 文档数量: 3 - 删除的文档数量: 0 - 存储大小: 13.2kb - 主分片存储大小: 6.6kb
复制代码
总结
本文介绍了如何使用 MCP Python SDK 编写一个 MCP 客户端,并集成 LLM 来实现灵活的工具调用和数据处理。通过简单的示例和进阶示例,展示了如何通过标准输入输出(stdio)方式与 MCP 服务器建立连接,并集成 LLM(如通义千问)来实现更复杂的应用场景。
参考资料
欢迎关注
评论