写点什么

快速上手:实现你的第一个 MCP Client

作者:Se7en
  • 2025-02-03
    福建
  • 本文字数:7328 字

    阅读完需:约 24 分钟

快速上手:实现你的第一个 MCP Client

MCP Server 开发实战:无缝对接 LLM 和 Elasticsearch 一文中,我们详细介绍了如何利用 MCP Python SDK 编写一个 Elasticsearch MCP 服务器,并通过 Claude Desktop 作为 MCP 客户端进行交互。本文将进一步介绍如何使用 MCP Python SDK 编写一个 MCP 客户端,以便更加灵活地与 MCP 服务器进行通信和集成。本文的完整代码可以在 Github 上找到:https://github.com/cr7258/hands-on-lab/tree/main/ai/claude/mcp/client/elasticsearch-mcp-client-example


MCP 系列文章:


MCP 客户端的作用

MCP 客户端充当 LLM 和 MCP 服务器之间的桥梁,MCP 客户端的工作流程如下:


  • MCP 客户端首先从 MCP 服务器获取可用的工具列表。

  • 将用户的查询连同工具描述通过 function calling 一起发送给 LLM。

  • LLM 决定是否需要使用工具以及使用哪些工具。

  • 如果需要使用工具,MCP 客户端会通过 MCP 服务器执行相应的工具调用。

  • 工具调用的结果会被发送回 LLM。

  • LLM 基于所有信息生成自然语言响应。

  • 最后将响应展示给用户。

MCP 通信方式

MCP 支持两种通信方式:


  • 标准输入输出(Standard Input/Output, stdio):客户端通过启动服务器子进程并使用标准输入(stdin)和标准输出(stdout)建立双向通信,一个服务器进程只能与启动它的客户端通信(1:1 关系)。stdio 适用于本地快速集成的场景,在本文中,我们将使用这种方式来编写 MCP 客户端。

  • 服务器发送事件(Server-Sent Events, SSE):服务器作为独立进程运行,客户端和服务器代码完全解耦,支持多个客户端随时连接和断开。这种方式将在后续的系列文章中单独进行介绍。

简单的示例

在开始构建复杂的应用之前,让我们先创建一个最简单的 MCP 客户端。这个基础示例将展示如何连接到 MCP 服务器并使用其提供的工具。


MCP 服务器的代码以及 Elasticsearch 集群的配置文件请参考 MCP Server 开发实战:无缝对接 LLM 和 Elasticsearch 一文。

添加依赖

在本教程中,我们将使用 MCP Python SDK 来编写项目,使用 uv 来管理 Python 项目依赖。需要添加如下依赖:


uv add mcp elasticsearch openai
复制代码

设置服务器连接参数

在使用 stdio 方式进行通信时,MCP 服务器的进程由 MCP 客户端程序负责启动。因此,我们通过 StdioServerParameters 来配置服务器进程的启动参数,包括运行 MCP 服务器的命令及其对应的参数。其中,sys.argv[1] 代表客户端程序运行时传入的第一个参数,用于指定服务器脚本的路径,从而确保 MCP 客户端能够正确启动并连接到 MCP 服务器。


server_params = StdioServerParameters(    command="python",           # 运行命令    args=[sys.argv[1]],        # 服务器脚本路径    env=None                   # 可选的环境变量)
复制代码

建立服务器连接

stdio_client 负责启动服务器进程并建立双向通信通道,它返回用于读写数据的流对象。ClientSession 则在这些流的基础上提供高层的会话管理,包括初始化连接、维护会话状态等。


async with stdio_client(server_params) as (read, write):    async with ClientSession(read, write) as session:         await session.initialize()
复制代码

调用工具

在 MCP 客户端中,我们使用了以下两个函数来与 MCP 服务器进行交互。


  • list_tools():获取 MCP 服务器提供的所有可用工具。

  • call_tool(name, args):调用指定的工具并获取结果,这里调用 list_indices 来获取 Elasticsearch 集群中的索引信息。


from mcp import ClientSession, StdioServerParametersfrom mcp.client.stdio import stdio_client
async def run(): # 建立连接 async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # 初始化连接 await session.initialize()
# 列出可用工具 tools = await session.list_tools() print("Tools:", tools)
# 调用工具 indices = await session.call_tool("list_indices") print("Indices:", indices)
if __name__ == "__main__": import asyncio asyncio.run(run())
复制代码

运行程序

使用以下命令运行 MCP 客户端:


uv run simple.py <MCP 服务器的代码路径>
复制代码


程序首先会列出所有可用的工具,可以看到总共有 3 个工具:list_indicesget_indexwrite_documents


Tools: meta=None nextCursor=None tools=[    Tool(        name='list_indices',         description='List all Elasticsearch indices',         inputSchema={            'properties': {},             'title': 'list_indicesArguments',             'type': 'object'        }    ),     Tool(        name='get_index',         description='Get detailed information about a specific Elasticsearch index',         inputSchema={            'properties': {                'index': {                    'title': 'Index',                     'type': 'string'                }            },             'required': ['index'],             'title': 'get_indexArguments',             'type': 'object'        }    ),     Tool(        name='write_documents',         description='Write multiple documents to an Elasticsearch index using bulk API',        inputSchema={            'properties': {                'index': {                    'title': 'Index',                     'type': 'string'                },                 'documents': {                    'items': {                        'type': 'object'                    },                     'title': 'Documents',                     'type': 'array'                }            },             'required': ['index', 'documents'],             'title': 'write_documentsArguments',             'type': 'object'        }    )]
复制代码


然后,程序会调用 list_indices 工具,获取 Elasticsearch 集群中的索引信息,并打印出来。


Indices: meta=None content=[    TextContent(        type='text',         text="[            {'health': 'green', 'status': 'open', 'index': 'student', 'uuid': 'iKl3j9ujSJ2i0GF5v8NUDw',              'pri': '1', 'rep': '1', 'docs.count': '3', 'docs.deleted': '0', 'store.size': '12kb', 'pri.store.size': '6kb'}, 
{'health': 'green', 'status': 'open', 'index': 'teacher', 'uuid': 'vfbwvH7yQGWqRGEG-t4FnA', 'pri': '1', 'rep': '1', 'docs.count': '3', 'docs.deleted': '0', 'store.size': '22.6kb', 'pri.store.size': '11.3kb'}, {'health': 'green', 'status': 'open', 'index': 'movies', 'uuid': 'JYhacHmXREWmkvwXBaFPmg', 'pri': '1', 'rep': '1', 'docs.count': '3', 'docs.deleted': '0', 'store.size': '13.2kb', 'pri.store.size': '6.6kb'} ]" )]isError=False
复制代码

进阶示例:集成 LLM

在实际应用中,我们通常希望让 LLM(如 OpenAI、Claude、通义千问等)自主决定调用哪些工具。下面的代码将以通义千问为示例进行演示,并使用 OpenAI SDK 与其交互。为了简化这一过程,我们将借助 OpenRouter -- 一个统一的 LLM 网关,它提供了 OpenAI 兼容的接口,使我们能够通过相同的 OpenAI API 访问包括通义千问在内的多种 LLM。


OpenRouter 的使用方式非常简单。我们只需在创建 OpenAI 客户端时指定 OpenRouter 的 base_urlapi_key,并在调用模型时以 <provider>/<model> 的格式(例如 qwen/qwen-plus)指定目标模型,OpenRouter 就会根据模型名称自动将请求路由到对应的 LLM 上。除此之外,其他代码与标准的 OpenAI SDK 保持一致。关于 OpenRouter 的使用方法可以参考这里


接下来介绍一下 MCP 客户端的主要代码。

初始化客户端类

MCPClient 类的初始化包含以下 3 个组件:


  • self.session:用于存储与 MCP 服务器的会话对象,初始设为 None,将在连接服务器时被赋值。

  • self.exit_stack:使用 AsyncExitStack 来管理异步资源,确保所有资源(如服务器连接、会话等)在程序结束时能够正确关闭。

  • self.client:创建 OpenAI 异步客户端,通过 OpenRouter 来访问 LLM。这里我们:

  • 设置 base_url 为 OpenRouter 的 API 端点。

  • 从环境变量获取 API Key(请确保设置了 OPENROUTER_API_KEY 环境变量)。可以参考该文档创建 OpenRouter 的 API Key。


class MCPClient:    def __init__(self):        self.session: Optional[ClientSession] = None        self.exit_stack = AsyncExitStack()        self.client = AsyncOpenAI(            base_url="https://openrouter.ai/api/v1",            api_key=os.getenv("OPENROUTER_API_KEY"),        )
复制代码

服务器连接

connect_to_server 方法负责建立与 MCP 服务器的连接。它首先配置服务器进程的启动参数,然后通过 stdio_client 建立双向通信通道,最后创建并初始化会话。所有的资源管理都通过 AsyncExitStack 来处理,确保资源能够正确释放。连接成功后,它会打印出 MCP 服务器提供的所有可用工具。


async def connect_to_server(self, server_script_path: str):    server_params = StdioServerParameters(        command="python",        args=[server_script_path],        env=None    )        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))    self.stdio, self.write = stdio_transport    self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))        await self.session.initialize()        # 列出可用工具    response = await self.session.list_tools()    tools = response.tools    print("\nConnected to server with tools:", [tool.name for tool in tools])
复制代码

处理请求

process_query 方法中定义了处理请求的流程:


  • 首先,将用户的查询作为初始消息发送给 LLM,同时提供 MCP 服务器上所有可用工具的描述信息。

  • LLM 分析用户查询,决定是直接回答还是需要调用工具。如果需要工具,它会指定要调用的工具名称和参数。

  • 对于每个工具调用,MCP 客户端执行调用并收集结果。

  • 将工具调用的结果返回给 LLM,让它基于这些新信息生成或更新回答。

  • 如果 LLM 认为还需要更多信息,它会继续请求调用其他工具。这个过程会一直重复,直到 LLM 收集了足够的信息来完整回答用户的查询。


async def process_query(self, query: str) -> str:    """使用 LLM 和 MCP 服务器提供的工具处理查询"""    messages = [        {            "role": "user",            "content": query        }    ]
response = await self.session.list_tools() available_tools = [{ "type": "function", "function": { "name": tool.name, "description": tool.description, "parameters": tool.inputSchema } } for tool in response.tools]
# 初始化 LLM API 调用 response = await self.client.chat.completions.create( model="qwen/qwen-plus", messages=messages, tools=available_tools )
final_text = [] message = response.choices[0].message final_text.append(message.content or "")
# 处理响应并处理工具调用 while message.tool_calls: # 处理每个工具调用 for tool_call in message.tool_calls: tool_name = tool_call.function.name tool_args = json.loads(tool_call.function.arguments) # 执行工具调用 result = await self.session.call_tool(tool_name, tool_args) final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
# 将工具调用和结果添加到消息历史 messages.append({ "role": "assistant", "tool_calls": [ { "id": tool_call.id, "type": "function", "function": { "name": tool_name, "arguments": json.dumps(tool_args) } } ] }) messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": str(result.content) })
# 将工具调用的结果交给 LLM response = await self.client.chat.completions.create( model="qwen/qwen-plus", messages=messages, tools=available_tools ) message = response.choices[0].message if message.content: final_text.append(message.content)
return "\n".join(final_text)
复制代码

交互式界面

chat_loop 方法提供了一个简单的命令行交互界面:


  • 在一个循环中持续接收用户输入,并通过前面定义地 process_query 方法处理查询。

  • 将处理结果格式化后显示给用户。

  • 通过异常处理机制优雅地处理可能出现的错误。

  • 用户输入 quit 时安全退出程序。


async def chat_loop(self):    """运行交互式聊天循环"""    print("\nMCP Client Started!")    print("Type your queries or 'quit' to exit.")        while True:        try:            query = input("\nQuery: ").strip()            if query.lower() == 'quit':                break            response = await self.process_query(query)            print("\n" + response)        except Exception as e:            print(f"\nError: {str(e)}")
复制代码

程序入口

程序入口代码完成以下工作:


  • 检查命令行参数,确保用户提供了服务器脚本的路径。

  • 创建 MCPClient 实例并连接到服务器。

  • 启动交互式聊天循环。

  • 使用 try-finally 结构确保在程序退出时正确清理资源。

  • 通过 asyncio.run() 运行异步主函数。


async def main():    if len(sys.argv) < 2:        print("Usage: uv run client.py <path_to_server_script>")        sys.exit(1)            client = MCPClient()    try:        await client.connect_to_server(sys.argv[1])        await client.chat_loop()    finally:        await client.cleanup()
if __name__ == "__main__": asyncio.run(main())
复制代码

运行程序

使用以下命令运行 MCP 客户端:


uv run client.py <MCP 服务器的代码路径>
复制代码


启动 MCP 客户端后,我们输入问题 Elasticsearch 集群中有哪些索引?,可以看到 MCP 客户端成功调用了 MCP 服务器提供的 list_indices 从 Elasticsearch 集群中获取到了索引信息。


2025-02-03 17:13:56,518 - mcp.server.lowlevel.server - INFO - Processing request of type ListToolsRequest
Connected to server with tools: ['list_indices', 'get_index', 'write_documents']
MCP Client Started!Type your queries or 'quit' to exit.
Query: Elasticsearch 集群中有哪些索引?2025-02-03 17:14:04,537 - mcp.server.lowlevel.server - INFO - Processing request of type ListToolsRequest2025-02-03 17:14:06,185 - mcp.server.lowlevel.server - INFO - Processing request of type CallToolRequest2025-02-03 17:14:06,248 - elastic_transport.transport - INFO - GET https://localhost:9200/_cat/indices?format=json [status:200 duration:0.062s]2025-02-03 17:14:06,248 - mcp.server.lowlevel.server - INFO - Warning: InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings

[Calling tool list_indices with args {}]在Elasticsearch集群中有以下索引:
1. 索引名称: student - 健康状态: green - 状态: open - UUID: iKl3j9ujSJ2i0GF5v8NUDw - 主分片数: 1 - 副本分片数: 1 - 文档数量: 3 - 删除的文档数量: 0 - 存储大小: 12kb - 主分片存储大小: 6kb
2. 索引名称: teacher - 健康状态: green - 状态: open - UUID: vfbwvH7yQGWqRGEG-t4FnA - 主分片数: 1 - 副本分片数: 1 - 文档数量: 3 - 删除的文档数量: 0 - 存储大小: 22.6kb - 主分片存储大小: 11.3kb
3. 索引名称: movies - 健康状态: green - 状态: open - UUID: JYhacHmXREWmkvwXBaFPmg - 主分片数: 1 - 副本分片数: 1 - 文档数量: 3 - 删除的文档数量: 0 - 存储大小: 13.2kb - 主分片存储大小: 6.6kb
复制代码

总结

本文介绍了如何使用 MCP Python SDK 编写一个 MCP 客户端,并集成 LLM 来实现灵活的工具调用和数据处理。通过简单的示例和进阶示例,展示了如何通过标准输入输出(stdio)方式与 MCP 服务器建立连接,并集成 LLM(如通义千问)来实现更复杂的应用场景。

参考资料

  • For Client Developers:https://modelcontextprotocol.io/quickstart/client

  • https://github.com/modelcontextprotocol/python-sdk:https://github.com/modelcontextprotocol/python-sdk

欢迎关注


发布于: 刚刚阅读数: 4
用户头像

Se7en

关注

还未添加个人签名 2020-01-10 加入

喜欢钻研技术,熟悉 Elasticsearch、Kubernetes 。

评论

发布
暂无评论
快速上手:实现你的第一个 MCP Client_Se7en_InfoQ写作社区