在 MCP Server 开发实战:无缝对接 LLM 和 Elasticsearch 一文中,我们详细介绍了如何利用 MCP Python SDK 编写一个 Elasticsearch MCP 服务器,并通过 Claude Desktop 作为 MCP 客户端进行交互。本文将进一步介绍如何使用 MCP Python SDK 编写一个 MCP 客户端,以便更加灵活地与 MCP 服务器进行通信和集成。本文的完整代码可以在 Github 上找到:https://github.com/cr7258/hands-on-lab/tree/main/ai/claude/mcp/client/elasticsearch-mcp-client-example
MCP 系列文章:
MCP 客户端的作用
MCP 客户端充当 LLM 和 MCP 服务器之间的桥梁,MCP 客户端的工作流程如下:
MCP 客户端首先从 MCP 服务器获取可用的工具列表。
将用户的查询连同工具描述通过 function calling 一起发送给 LLM。
LLM 决定是否需要使用工具以及使用哪些工具。
如果需要使用工具,MCP 客户端会通过 MCP 服务器执行相应的工具调用。
工具调用的结果会被发送回 LLM。
LLM 基于所有信息生成自然语言响应。
最后将响应展示给用户。
MCP 通信方式
MCP 支持两种通信方式:
标准输入输出(Standard Input/Output, stdio):客户端通过启动服务器子进程并使用标准输入(stdin)和标准输出(stdout)建立双向通信,一个服务器进程只能与启动它的客户端通信(1:1 关系)。stdio 适用于本地快速集成的场景,在本文中,我们将使用这种方式来编写 MCP 客户端。
服务器发送事件(Server-Sent Events, SSE):服务器作为独立进程运行,客户端和服务器代码完全解耦,支持多个客户端随时连接和断开。这种方式将在后续的系列文章中单独进行介绍。
简单的示例
在开始构建复杂的应用之前,让我们先创建一个最简单的 MCP 客户端。这个基础示例将展示如何连接到 MCP 服务器并使用其提供的工具。
MCP 服务器的代码以及 Elasticsearch 集群的配置文件请参考 MCP Server 开发实战:无缝对接 LLM 和 Elasticsearch 一文。
添加依赖
在本教程中,我们将使用 MCP Python SDK 来编写项目,使用 uv 来管理 Python 项目依赖。需要添加如下依赖:
uv add mcp elasticsearch openai
复制代码
设置服务器连接参数
在使用 stdio
方式进行通信时,MCP 服务器的进程由 MCP 客户端程序负责启动。因此,我们通过 StdioServerParameters
来配置服务器进程的启动参数,包括运行 MCP 服务器的命令及其对应的参数。其中,sys.argv[1]
代表客户端程序运行时传入的第一个参数,用于指定服务器脚本的路径,从而确保 MCP 客户端能够正确启动并连接到 MCP 服务器。
server_params = StdioServerParameters(
command="python", # 运行命令
args=[sys.argv[1]], # 服务器脚本路径
env=None # 可选的环境变量
)
复制代码
建立服务器连接
stdio_client
负责启动服务器进程并建立双向通信通道,它返回用于读写数据的流对象。ClientSession
则在这些流的基础上提供高层的会话管理,包括初始化连接、维护会话状态等。
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
复制代码
调用工具
在 MCP 客户端中,我们使用了以下两个函数来与 MCP 服务器进行交互。
list_tools()
:获取 MCP 服务器提供的所有可用工具。
call_tool(name, args)
:调用指定的工具并获取结果,这里调用 list_indices
来获取 Elasticsearch 集群中的索引信息。
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def run():
# 建立连接
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# 初始化连接
await session.initialize()
# 列出可用工具
tools = await session.list_tools()
print("Tools:", tools)
# 调用工具
indices = await session.call_tool("list_indices")
print("Indices:", indices)
if __name__ == "__main__":
import asyncio
asyncio.run(run())
复制代码
运行程序
使用以下命令运行 MCP 客户端:
uv run simple.py <MCP 服务器的代码路径>
复制代码
程序首先会列出所有可用的工具,可以看到总共有 3 个工具:list_indices
、get_index
和 write_documents
。
Tools:
meta=None
nextCursor=None
tools=[
Tool(
name='list_indices',
description='List all Elasticsearch indices',
inputSchema={
'properties': {},
'title': 'list_indicesArguments',
'type': 'object'
}
),
Tool(
name='get_index',
description='Get detailed information about a specific Elasticsearch index',
inputSchema={
'properties': {
'index': {
'title': 'Index',
'type': 'string'
}
},
'required': ['index'],
'title': 'get_indexArguments',
'type': 'object'
}
),
Tool(
name='write_documents',
description='Write multiple documents to an Elasticsearch index using bulk API',
inputSchema={
'properties': {
'index': {
'title': 'Index',
'type': 'string'
},
'documents': {
'items': {
'type': 'object'
},
'title': 'Documents',
'type': 'array'
}
},
'required': ['index', 'documents'],
'title': 'write_documentsArguments',
'type': 'object'
}
)
]
复制代码
然后,程序会调用 list_indices
工具,获取 Elasticsearch 集群中的索引信息,并打印出来。
Indices:
meta=None
content=[
TextContent(
type='text',
text="[
{'health': 'green', 'status': 'open', 'index': 'student', 'uuid': 'iKl3j9ujSJ2i0GF5v8NUDw',
'pri': '1', 'rep': '1', 'docs.count': '3', 'docs.deleted': '0', 'store.size': '12kb', 'pri.store.size': '6kb'},
{'health': 'green', 'status': 'open', 'index': 'teacher', 'uuid': 'vfbwvH7yQGWqRGEG-t4FnA',
'pri': '1', 'rep': '1', 'docs.count': '3', 'docs.deleted': '0', 'store.size': '22.6kb', 'pri.store.size': '11.3kb'},
{'health': 'green', 'status': 'open', 'index': 'movies', 'uuid': 'JYhacHmXREWmkvwXBaFPmg',
'pri': '1', 'rep': '1', 'docs.count': '3', 'docs.deleted': '0', 'store.size': '13.2kb', 'pri.store.size': '6.6kb'}
]"
)
]
isError=False
复制代码
进阶示例:集成 LLM
在实际应用中,我们通常希望让 LLM(如 OpenAI、Claude、通义千问等)自主决定调用哪些工具。下面的代码将以通义千问为示例进行演示,并使用 OpenAI SDK 与其交互。为了简化这一过程,我们将借助 OpenRouter -- 一个统一的 LLM 网关,它提供了 OpenAI 兼容的接口,使我们能够通过相同的 OpenAI API 访问包括通义千问在内的多种 LLM。
OpenRouter 的使用方式非常简单。我们只需在创建 OpenAI 客户端时指定 OpenRouter 的 base_url
和 api_key
,并在调用模型时以 <provider>/<model>
的格式(例如 qwen/qwen-plus
)指定目标模型,OpenRouter 就会根据模型名称自动将请求路由到对应的 LLM 上。除此之外,其他代码与标准的 OpenAI SDK 保持一致。关于 OpenRouter 的使用方法可以参考这里。
接下来介绍一下 MCP 客户端的主要代码。
初始化客户端类
MCPClient
类的初始化包含以下 3 个组件:
self.session
:用于存储与 MCP 服务器的会话对象,初始设为 None
,将在连接服务器时被赋值。
self.exit_stack
:使用 AsyncExitStack
来管理异步资源,确保所有资源(如服务器连接、会话等)在程序结束时能够正确关闭。
self.client
:创建 OpenAI 异步客户端,通过 OpenRouter 来访问 LLM。这里我们:
设置 base_url
为 OpenRouter 的 API 端点。
从环境变量获取 API Key(请确保设置了 OPENROUTER_API_KEY
环境变量)。可以参考该文档创建 OpenRouter 的 API Key。
class MCPClient:
def __init__(self):
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.client = AsyncOpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=os.getenv("OPENROUTER_API_KEY"),
)
复制代码
服务器连接
connect_to_server
方法负责建立与 MCP 服务器的连接。它首先配置服务器进程的启动参数,然后通过 stdio_client
建立双向通信通道,最后创建并初始化会话。所有的资源管理都通过 AsyncExitStack
来处理,确保资源能够正确释放。连接成功后,它会打印出 MCP 服务器提供的所有可用工具。
async def connect_to_server(self, server_script_path: str):
server_params = StdioServerParameters(
command="python",
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# 列出可用工具
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
复制代码
处理请求
process_query
方法中定义了处理请求的流程:
首先,将用户的查询作为初始消息发送给 LLM,同时提供 MCP 服务器上所有可用工具的描述信息。
LLM 分析用户查询,决定是直接回答还是需要调用工具。如果需要工具,它会指定要调用的工具名称和参数。
对于每个工具调用,MCP 客户端执行调用并收集结果。
将工具调用的结果返回给 LLM,让它基于这些新信息生成或更新回答。
如果 LLM 认为还需要更多信息,它会继续请求调用其他工具。这个过程会一直重复,直到 LLM 收集了足够的信息来完整回答用户的查询。
async def process_query(self, query: str) -> str:
"""使用 LLM 和 MCP 服务器提供的工具处理查询"""
messages = [
{
"role": "user",
"content": query
}
]
response = await self.session.list_tools()
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
} for tool in response.tools]
# 初始化 LLM API 调用
response = await self.client.chat.completions.create(
model="qwen/qwen-plus",
messages=messages,
tools=available_tools
)
final_text = []
message = response.choices[0].message
final_text.append(message.content or "")
# 处理响应并处理工具调用
while message.tool_calls:
# 处理每个工具调用
for tool_call in message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
# 执行工具调用
result = await self.session.call_tool(tool_name, tool_args)
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
# 将工具调用和结果添加到消息历史
messages.append({
"role": "assistant",
"tool_calls": [
{
"id": tool_call.id,
"type": "function",
"function": {
"name": tool_name,
"arguments": json.dumps(tool_args)
}
}
]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result.content)
})
# 将工具调用的结果交给 LLM
response = await self.client.chat.completions.create(
model="qwen/qwen-plus",
messages=messages,
tools=available_tools
)
message = response.choices[0].message
if message.content:
final_text.append(message.content)
return "\n".join(final_text)
复制代码
交互式界面
chat_loop
方法提供了一个简单的命令行交互界面:
async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("\n" + response)
except Exception as e:
print(f"\nError: {str(e)}")
复制代码
程序入口
程序入口代码完成以下工作:
async def main():
if len(sys.argv) < 2:
print("Usage: uv run client.py <path_to_server_script>")
sys.exit(1)
client = MCPClient()
try:
await client.connect_to_server(sys.argv[1])
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
复制代码
运行程序
使用以下命令运行 MCP 客户端:
uv run client.py <MCP 服务器的代码路径>
复制代码
启动 MCP 客户端后,我们输入问题 Elasticsearch 集群中有哪些索引?
,可以看到 MCP 客户端成功调用了 MCP 服务器提供的 list_indices
从 Elasticsearch 集群中获取到了索引信息。
2025-02-03 17:13:56,518 - mcp.server.lowlevel.server - INFO - Processing request of type ListToolsRequest
Connected to server with tools: ['list_indices', 'get_index', 'write_documents']
MCP Client Started!
Type your queries or 'quit' to exit.
Query: Elasticsearch 集群中有哪些索引?
2025-02-03 17:14:04,537 - mcp.server.lowlevel.server - INFO - Processing request of type ListToolsRequest
2025-02-03 17:14:06,185 - mcp.server.lowlevel.server - INFO - Processing request of type CallToolRequest
2025-02-03 17:14:06,248 - elastic_transport.transport - INFO - GET https://localhost:9200/_cat/indices?format=json [status:200 duration:0.062s]
2025-02-03 17:14:06,248 - mcp.server.lowlevel.server - INFO - Warning: InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings
[Calling tool list_indices with args {}]
在Elasticsearch集群中有以下索引:
1. 索引名称: student
- 健康状态: green
- 状态: open
- UUID: iKl3j9ujSJ2i0GF5v8NUDw
- 主分片数: 1
- 副本分片数: 1
- 文档数量: 3
- 删除的文档数量: 0
- 存储大小: 12kb
- 主分片存储大小: 6kb
2. 索引名称: teacher
- 健康状态: green
- 状态: open
- UUID: vfbwvH7yQGWqRGEG-t4FnA
- 主分片数: 1
- 副本分片数: 1
- 文档数量: 3
- 删除的文档数量: 0
- 存储大小: 22.6kb
- 主分片存储大小: 11.3kb
3. 索引名称: movies
- 健康状态: green
- 状态: open
- UUID: JYhacHmXREWmkvwXBaFPmg
- 主分片数: 1
- 副本分片数: 1
- 文档数量: 3
- 删除的文档数量: 0
- 存储大小: 13.2kb
- 主分片存储大小: 6.6kb
复制代码
总结
本文介绍了如何使用 MCP Python SDK 编写一个 MCP 客户端,并集成 LLM 来实现灵活的工具调用和数据处理。通过简单的示例和进阶示例,展示了如何通过标准输入输出(stdio)方式与 MCP 服务器建立连接,并集成 LLM(如通义千问)来实现更复杂的应用场景。
参考资料
欢迎关注
评论