写点什么

构建基于 SSE 协议通信的 MCP Server 和 Client

作者:Se7en
  • 2025-02-09
    上海
  • 本文字数:3880 字

    阅读完需:约 13 分钟

在之前的系列教程中,我们编写的 MCP 服务器与 MCP 客户端是通过 **stdio(Standard Input/Output,标准输入输出)**来进行交互的。客户端通过启动服务器子进程,并利用标准输入(stdin)和标准输出(stdout)建立双向通信。这种模式导致 MCP 客户端与服务器之间存在强耦合,且每个服务器进程只能与启动它的客户端通信(1:1 的关系)。


为了解耦 MCP 客户端与服务器,本文将演示如何使用 **SSE(Server-Sent Events,服务器发送事件)**协议进行通信,使服务器能够作为一个独立运行的进程,支持多个客户端的灵活连接、使用和断开。换句话说,基于 SSE 的服务器和客户端可以是完全解耦的进程,甚至运行在不同的节点上,从而提供更高的灵活性和扩展性。


本文使用到的所有代码可以在 Github 上找到:https://github.com/cr7258/hands-on-lab/tree/main/ai/claude/mcp/sse/elasticsearch-mcp-sse


MCP 系列文章:


什么是 SSE?

Server-Sent Events(SSE,服务器发送事件)是一种基于 HTTP 协议的技术,允许服务器向客户端单向、实时地推送数据。在 SSE 模式下,客户端通过创建一个 EventSource 对象与服务器建立持久连接,服务器则通过该连接持续发送数据流,而无需客户端反复发送请求。


SSE 的主要特点包括:


  • 单向通信:服务器主动向客户端推送数据,客户端无法通过同一连接向服务器发送数据。

  • 基于 HTTP 协议:利用现有的 HTTP 协议,无需额外的协议支持,易于实现和部署。

  • 轻量级:实现简单,适用于需要实时更新的应用场景,如新闻推送、股票行情等。

  • 自动重连:客户端在连接断开时会自动尝试重新连接,确保数据传输的连续性。


需要注意的是,SSE 仅支持服务器到客户端的单向通信,若应用场景需要双向实时通信,可能需要考虑使用 WebSocket 等其他技术。

环境准备

在本教程中,我们将使用 MCP Python SDK 来编写项目,使用 uv 来管理 Python 项目依赖。


uv add "mcp[cli]" elasticsearch python-dotenv uvicorn openai
复制代码


实验用到的 Elasticsearch 集群可以通过 Github 中的 docker-compose.yaml 文件启动:


docker-compose up -d
复制代码


浏览器输入 http://localhost:5601 访问 Kibana 界面,用户名 elastic,密码 test123


.env 文件中设置好 Elasticsearch 的连接信息。


ELASTIC_HOST=https://localhost:9200ELASTIC_USERNAME=elasticELASTIC_PASSWORD=test123
复制代码

准备测试数据

在 Kibana 中打开 Management -> Dev Tools 页面, 执行以下代码创建两个索引 studentteacher,分别插入几条数据:


POST /student/_doc{  "name": "Alice",  "age": 20,  "major": "Computer Science"}
POST /student/_doc{ "name": "Bob", "age": 22, "major": "Mathematics"}
POST /student/_doc{ "name": "Carol", "age": 21, "major": "Physics"}
POST /teacher/_doc{ "name": "Tom", "subject": "English", "yearsOfExperience": 10}
POST /teacher/_doc{ "name": "John", "subject": "History", "yearsOfExperience": 7}
POST /teacher/_doc{ "name": "Lily", "subject": "Mathematics", "yearsOfExperience": 5}
复制代码

实现基于 SSE 协议的 MCP Server

MCP 服务器部分的代码与MCP Server 开发实战:无缝对接 LLM 和 Elasticsearch一文中介绍的内容相同,因此在此不再赘述,读者可参阅该文章以获取详细信息。


这里重点说明一下如何让 MCP 服务器运行在 SSE 协议之上。在本教程中,我们使用 Starlette 框架来实现 SSE 服务器,代码主要分为以下几个部分:


  1. SSE 传输对象的初始化

  2. 在函数开始处,通过创建 SseServerTransport 对象,并指定基础路径 /messages/,用于后续管理 SSE 连接和消息传递。

  3. 定义 SSE 连接处理函数

  4. handle_sse 函数:这是一个异步请求处理函数,当客户端请求建立 SSE 连接时会被调用。

  5. 连接过程

  6. 利用 sse.connect_sse 方法,传入当前请求的 scopereceive 方法和 _send 方法,建立一个异步上下文管理器。

  7. 管理器返回两个数据流:read_stream 用于读取客户端发送的数据,write_stream 用于向客户端发送数据。

  8. 启动 MCP 服务器:在成功建立连接后,调用 mcp_server.run 方法,并传入读取、写入流以及由 mcp_server.create_initialization_options() 生成的初始化参数。这一过程实现了 MCP 服务器与客户端之间的实时数据交互。

  9. Starlette 应用及路由配置

  10. Starlette 实例化:函数返回一个新的 Starlette 应用实例,其调试模式根据传入的 debug 参数设置。

  11. 路由设置

  12. 使用 Route("/sse", endpoint=handle_sse) 定义 /sse 路径,当客户端访问此路径时将触发 handle_sse 函数处理 SSE 连接。

  13. 使用 Mount("/messages/", app=sse.handle_post_message)/messages/ 路径挂载到 sse.handle_post_message 应用上,用于处理通过 POST 请求发送的消息,实现与 SSE 长连接的消息传递功能。


def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:    """Create a Starlette application that can server the provied mcp server with SSE."""    sse = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> None: async with sse.connect_sse( request.scope, request.receive, request._send, ) as (read_stream, write_stream): await mcp_server.run( read_stream, write_stream, mcp_server.create_initialization_options(), )
return Starlette( debug=debug, routes=[ Route("/sse", endpoint=handle_sse), Mount("/messages/", app=sse.handle_post_message), ], )
复制代码


接下来就是创建 MCP 服务器实例,然后通过上面定义的 create_starlette_app 方法创建 Starlette 应用,最后使用 uvicorn 启动 ASGI 服务器,实现实时的 SSE 数据传输。


if __name__ == "__main__":    mcp_server = mcp._mcp_server
parser = argparse.ArgumentParser(description='Run MCP SSE-based server') parser.add_argument('--host', default='0.0.0.0', help='Host to bind to') parser.add_argument('--port', type=int, default=18080, help='Port to listen on') args = parser.parse_args()
# Bind SSE request handling to MCP server starlette_app = create_starlette_app(mcp_server, debug=True)
uvicorn.run(starlette_app, host=args.host, port=args.port)
复制代码

启动 MCP 服务器

执行以下命令可以启动 MCP 服务器,默认监听在 18080 端口。


uv run server.py
# 输出INFO: Started server process [82035]INFO: Waiting for application startup.INFO: Application startup complete.INFO: Uvicorn running on http://0.0.0.0:18080 (Press CTRL+C to quit)
复制代码

实现基于 SSE 协议的 MCP Client

MCP 客户端部分的代码可以参考快速上手:实现你的第一个 MCP Client一文,这里不再重复说明。


唯一的区别就是在连接 MCP 服务器的时候通过 SSE 协议进行交互。以下是具体的代码:


async def connect_to_sse_server(self, server_url: str):    """Connect to an MCP server running with SSE transport"""    # 创建 SSE 客户端连接上下文管理器    self._streams_context = sse_client(url=server_url)    # 异步初始化 SSE 连接,获取数据流对象    streams = await self._streams_context.__aenter__()
# 使用数据流创建 MCP 客户端会话上下文 self._session_context = ClientSession(*streams) # 初始化客户端会话对象 self.session: ClientSession = await self._session_context.__aenter__()
# 执行 MCP 协议初始化握手 await self.session.initialize()
复制代码

启动 MCP 客户端

执行以下命令启动 MCP 客户端并连接 MCP 服务器。


uv run client.py http://localhost:18080/sse
复制代码


启动 MCP 客户端后,我们输入问题 Elasticsearch 集群中有哪些索引?,可以看到 MCP 客户端成功调用了 MCP 服务器提供的 list_indices 从 Elasticsearch 集群中获取到了索引信息。


Initialized SSE client...Listing tools...
Connected to server with tools: ['list_indices', 'get_index']
MCP Client Started!Type your queries or 'quit' to exit.
Query: 集群中有哪些索引?

[Calling tool list_indices with args {}]在集群中,有以下索引:
1. 索引名称: student - 健康状况: green - 状态: open - UUID: gPUyqTHZQ12rSTEYX-Ho3w - 主分片数: 1 - 副本分片数: 1 - 文档数量: 3 - 已删除的文档数量: 0 - 存储大小: 12kb - 主分片的存储大小: 6kb
2. 索引名称: teacher - 健康状况: green - 状态: open - UUID: ygkfkvjJSMaB946myzanNg - 主分片数: 1 - 副本分片数: 1 - 文档数量: 3 - 已删除的文档数量: 0 - 存储大小: 22.6kb - 主分片的存储大小: 6kb
复制代码

总结

本文介绍了如何利用 Server-Sent Events(SSE)协议,实现 MCP 服务器与客户端之间的解耦通信。通过采用 SSE,服务器能够作为独立进程运行,支持多个客户端的灵活连接与断开,从而提升系统的灵活性和可扩展性。

欢迎关注


发布于: 刚刚阅读数: 3
用户头像

Se7en

关注

还未添加个人签名 2020-01-10 加入

喜欢钻研技术,熟悉 Elasticsearch、Kubernetes 。

评论

发布
暂无评论
构建基于 SSE 协议通信的 MCP Server 和 Client_Se7en_InfoQ写作社区