Server-Sent Events(SSE)协议原理与实践
这些年,语言类大模型相关的应用成为了非常热门的提效工具。各行各业都可以通过定制化的 AI 工具来提高工作效率。在这类应用的客户端中,由于大模型的运算效率和输出格式的原因,你会发现几乎所有的实时交流都是流式输出的,一个体验良好的客户端会实时接收数据流,进行 MarkDown 格式的内容渲染。本篇文章将主要讨论实现这种流式数据接收的一种方法,使用 SSE 协议:Server-Sent Events,并以 JavaScript 和 Swift 语言分别来对 Web 端和 iOS 端做实践。如果你正需要一种类似语言大模型交流方式的交互体验,希望本篇文章可以为你带来启发。
一 认识 SSE 协议
1 简介
SSE 协议全称为 Server-Sent Events,从名称也可以得知,这是一种服务端向客户端发送事件消息的协议。我们知道,通常在服务端和客户端交互的的 HTTP 请求中,请求会在一次收发数据后结束掉(下载除外),客户端首先发起请求,将数据发送到服务端,服务端根据业务逻辑将数据返回给客户端,则这次请求就完成了。但有时候,我们需要客户端和服务端进行多轮的有状态的通信(每次 HTTP 请求本质都是无状态的),这当然也不复杂,通过建立 Socket 长连接,在同一次连接生命周期中,客户端不仅可以多次随时的向发服务端发送数据,服务端也能够主动的多次随时向客户端推送信息。我们平时使用的 IM 即是通信软件,通常就是使用长链接来实现的。
回到本篇文章讨论的主题,我们的核心需求是客户端一次请求,服务端可以多次向客户端推送消息。这当然使用长连接也可以实现,但 Socket 全双工的机制用在这里会有些浪费,也会增加复杂性和使用成本。SSE 协议则是专门针对这种需求场景所产生的。
总体来说,SSE 与 Socket 类似,都是在客户端和服务端之间建立持久的通信通道。不同的是,SSE 更加轻量,并且不是全双工的,它是一个单向的通道,SSE 的数据流接收过程更类似于下载。我们可以通过以下几点来特性来理解 SSE 协议:
1 SSE 使用的是 HTTP 协议,天然的能在大多数互联网应用中直接使用。
2 SSE 非常轻量,更加面向应用层,比 Socket 使用简单。
3 天然支持断线重连,减少开发成本。
4 通常用来传输文本数据,客户端可以多次接收。
5 丰富的自定义能力。
2 协议细节
SSE 协议本质上依然是一个 HTTP 请求,要使用 SSE,客户端接收到用户端的请求时,需要将 Response Header 中的 Content-Type 字段设置为:text/event-steam,状态码信息则正常返回 200 OK 即可,如下:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
复制代码
text/event-stream 是标记返回数据为流式数据,SSE 理论上需要服务端多次推送消息到客户端,需要保持连接为 keep-alive 状态。
服务端在向客户端发送数据时,每次可以发送多个消息,每个消息间使用\n\n 分隔,每个消息可以有多行组成,行之间使用\n 分隔。
每行的格式如下:
其中“字段”一项是可选的,如果没有字段,以冒号开头,则表示此行为注释。
SSE 协议中约定的字段包括以下 4 种:
下面我们来具体介绍这几种字段以及其值的意义。
data 字段标识此部分信息为数据内容,数据内容如果很长,可以分为多行,例如:
data: conent1\n
data: conent1e1_end\n\n
复制代码
也可以将一个长的 JSON 拆分成多行发送:
data: {\n
data: "a": "a",\n
data: "b", 1\n
data: }\n\n
复制代码
id 字段通常标识数据的编号,便于实现断线重连等逻辑。
event 字段用来标识时间的额类型,例如当服务端数据推送完成后,通常会发送一个特殊的 event 事件表示数据全部发送完,之后断开连接。
retry 字段用来配置一个数值,指定客户端重新发起连接的间隔。
二 实践-基于 Node.js 的服务端 SSE 实践
使用 Node.js 平台来编写一个 SSE 协议的测试程序非常简单,实例代码如下:
var http = require("http");
http.createServer(function (req, res) {
var url = req.url;
if (url === "/stream") {
var count = 0
res.writeHead(200, {
"Content-Type":"text/event-stream",
"Cache-Control":"no-cache",
"Connection":"keep-alive",
"Access-Control-Allow-Origin": '*',
});
// 设置重连间隔
res.write("retry: 10000\n");
// 发起开始事件
res.write("event: start\n");
res.write("data: " + (new Date()) + "\n\n");
// 发送数据
res.write("data: " + (new Date()) + " count:" + (count++) + "\n\n");
// 每秒发送一次数据
interval = setInterval(function () {
if (count == 20) {
// 发送结束事件
res.write("event: end\n");
res.write("data: " + (new Date()) + "\n\n");
res.end();
clearInterval(interval);
} else {
res.write("data: " + (new Date()) + " count:" + (count++) + "\n\n");
}
}, 1000);
req.connection.addListener("close", function () {
clearInterval(interval);
}, false);
}
}).listen(8844, "127.0.0.1");
复制代码
使用 node 执行上面的程序,可以在浏览器中输入如下地址进行测试:
http://localhost:8844/stream
复制代码
SSE 是一个 GET 请求(其实使用 POST 也没有问题)。服务端代码的逻辑是会每秒输出测试数据,直到输出 20 条后结束,并关闭连接。页面展示效果如下:
三 实践-SSE 客户端实践
1 JavaScript 客户端
实现了 SSE 协议的浏览器中会提供一个名为 EventSource 的对象,此对象对 SSE 协议的数据交互提供了支持。
<html>
<script>
var source = new EventSource("http://localhost.charlesproxy.com:8844/stream");
source.onopen = function (event) {
// 连接建立后会回调
console.log(event)
};
source.onmessage = function (event) {
// 接收到数据回调
var data = event.data;
console.log(data)
};
source.onerror = function (event) {
// 连接错误回调
console.log(event)
};
</script>
</html>
复制代码
运行客户端代码,从打印信息可以看到,start 和 end 事件并没有监听到,这是因为这两个事件时我们自定义的事件,要监听自定义事件,方法如下:
// 监听自定义事件
source.addEventListener('start', function (event) {
var data = event.data;
console.log(event)
}, false);
source.addEventListener('end', function (event) {
var data = event.data;
console.log(event)
}, false);
复制代码
2 Swift 客户端
Swift 客户端可以直接使用原生提供的 URLSession 来实现,示例代码如下:
import UIKit
class ViewController: UIViewController, URLSessionDataDelegate {
private var url: URL!
private var task: URLSessionDataTask!
private var session: URLSession!
private var receivedData = Data()
override func viewDidLoad() {
super.viewDidLoad()
self.url = URL(string: "http://localhost.charlesproxy.com:8844/stream")
let configuration = URLSessionConfiguration.default
self.session = URLSession(configuration: configuration, delegate: self, delegateQueue: OperationQueue.main)
startListening()
}
func startListening() {
let request = URLRequest(url: url)
task = session.dataTask(with: request)
task.resume()
}
private func parseSSE(data: Data) {
guard let eventsString = String(data: data, encoding: .utf8) else {
print("Unable to decode data")
return
}
let events = eventsString.split(separator: "\n\n")
for event in events {
let eventLines = event.split(separator: "\n")
var eventData: [String: String] = [:]
for line in eventLines {
let keyValue = line.split(separator: ":", maxSplits: 1)
if keyValue.count == 2 {
let key = String(keyValue[0]).trimmingCharacters(in: .whitespaces)
let value = String(keyValue[1]).trimmingCharacters(in: .whitespaces)
eventData[key] = value
}
}
handleEvent(eventData: eventData)
}
}
private func handleEvent(eventData: [String: String]) {
if let eventType = eventData["event"], let data = eventData["data"] {
print("Event Type: \(eventType), Data: \(data)")
} else if let data = eventData["data"] {
print("Data: \(data)")
}
}
func stopListening() {
task?.cancel()
task = nil
}
// MARK: - URLSessionDataDelegate methods
func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
receivedData.append(data)
// 处理完整的事件
if let eventsString = String(data: receivedData, encoding: .utf8), eventsString.contains("\n\n") {
let eventsData = eventsString.data(using: .utf8)!
parseSSE(data: eventsData)
// 清除已处理的数据
receivedData = Data()
}
}
func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
if let error = error {
print("Error occurred: \(error)")
} else {
print("Connection closed by server")
}
}
}
复制代码
需要注意,要使用代理的方式接收数据,而不是 block 回调的方式,block 回调方式的接口会等待所有数据接收完成后才会回调。运行上面的 iOS 客户端代码,控制台打印数据如下:
Event Type: start, Data: Mon Sep 30 2024 14:11:04 GMT+0800 (China Standard Time)
Data: Mon Sep 30 2024 14:11:04 GMT+0800 (China Standard Time) count:0
Data: Mon Sep 30 2024 14:11:05 GMT+0800 (China Standard Time) count:1
Data: Mon Sep 30 2024 14:11:06 GMT+0800 (China Standard Time) count:2
Data: Mon Sep 30 2024 14:11:07 GMT+0800 (China Standard Time) count:3
Data: Mon Sep 30 2024 14:11:08 GMT+0800 (China Standard Time) count:4
Data: Mon Sep 30 2024 14:11:09 GMT+0800 (China Standard Time) count:5
Data: Mon Sep 30 2024 14:11:10 GMT+0800 (China Standard Time) count:6
Data: Mon Sep 30 2024 14:11:11 GMT+0800 (China Standard Time) count:7
Data: Mon Sep 30 2024 14:11:12 GMT+0800 (China Standard Time) count:8
Data: Mon Sep 30 2024 14:11:13 GMT+0800 (China Standard Time) count:9
Data: Mon Sep 30 2024 14:11:14 GMT+0800 (China Standard Time) count:10
Data: Mon Sep 30 2024 14:11:15 GMT+0800 (China Standard Time) count:11
Data: Mon Sep 30 2024 14:11:16 GMT+0800 (China Standard Time) count:12
Data: Mon Sep 30 2024 14:11:17 GMT+0800 (China Standard Time) count:13
Data: Mon Sep 30 2024 14:11:18 GMT+0800 (China Standard Time) count:14
Data: Mon Sep 30 2024 14:11:19 GMT+0800 (China Standard Time) count:15
Data: Mon Sep 30 2024 14:11:20 GMT+0800 (China Standard Time) count:16
Data: Mon Sep 30 2024 14:11:21 GMT+0800 (China Standard Time) count:17
Data: Mon Sep 30 2024 14:11:22 GMT+0800 (China Standard Time) count:18
Data: Mon Sep 30 2024 14:11:23 GMT+0800 (China Standard Time) count:19
Event Type: end, Data: Mon Sep 30 2024 14:11:24 GMT+0800 (China Standard Time)
Connection closed by server
复制代码
到此,我们对 SSE 协议的原理,定义以及服务端和客户端的用法实践做了简单介绍,对于单向的服务端发送序列数据到客户端且在短时间内会完成的场景,你都可以考虑使用 SSE 实现,希望本篇文章可以为你带来一些收获。
评论