在构建 AI 应用的过程中,实时通信是不可或缺的要素。Server-Sent Events(SSE)作为一种轻量级的实时通信机制,在提供实时数据更新方面发挥着至关重要的作用。
什么是 SSE
SSE 是一种基于 HTTP 的服务器向客户端推送实时更新数据的机制。它允许服务器通过一个持久的 HTTP 连接向客户端发送事件流,客户端则能够实时接收并处理这些事件。SSE 具有单向通信、轻量级和实时性强的特点,使其成为 AI 应用的理想选择。
为何选择 SSE 而非 WebSocket 或其他技术
ChatGPT 等大语言模型在处理自然语言时需要实时与用户进行交互,因此实时通信机制的选择至关重要。虽然 WebSocket 也提供了实时通信的能力,但它需要建立一个全双工的通信通道,这在某些场景下可能过于复杂和重量级。相比之下,SSE 的单向通信模式更加简单且高效,特别适合用于服务器向客户端推送数据的场景。此外,SSE 还基于 HTTP 协议,无需额外的配置和端口,与现有的 Web 基础设施兼容性更好。
除了 SSE 和 WebSocket 之外,还有一些其他实时通信技术可供选择,如长轮询(long-polling)和轮询(polling)。然而,长轮询可能会导致服务器资源浪费,而轮询则可能因频繁请求而增加网络负担。相比之下,SSE 在性能和资源利用方面更具优势。
如何使用 SSE
创建一个用于处理 SSE 请求的控制器。在该控制器中,定义一个路由方法,用于返回 SSE 响应。在该方法中,设置响应头为text/event-stream
,并通过res.write
方法向客户端发送事件流数据。
首先创建一个 LLMService,这里以智普清言大模型为例:
// service/LLMService.ts
import { Injectable } from '@/core';
import dayjs from 'dayjs';
import http from 'https';
import { PassThrough, Stream } from 'stream';
import jwt from 'jsonwebtoken';
export interface Message {
role: string;
content: string;
}
export interface LLMChatCompletionParams {
model: string;
messages: Message[];
stream?: boolean;
}
@Injectable()
export class LLMService {
private readonly apiUrl: string = 'https://open.bigmodel.cn/api/paas/v4';
private readonly appId: string = '';
private readonly appSecret: string = '';
async chatCompletions(params: LLMChatCompletionParams): Promise<PassThrough> {
const token = await this.getToken();
return new Promise(r => {
const data = JSON.stringify({
...params,
stream: true,
});
const stream = new Stream.PassThrough();
const req = http.request(
`${this.apiUrl}/chat/completions`,
{
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(data),
},
},
res => res.pipe(stream)
);
req.write(data);
req.end();
r(stream);
});
}
private async getToken() {
const currentDate = dayjs();
const header = { alg: 'HS256', sign_type: 'SIGN' };
const payload = {
api_key: this.appId,
exp: currentDate.add(3, 'days').unix() * 1000,
timestamp: Date.now(),
};
return new Promise(r => {
jwt.sign(payload, this.appSecret, { header }, (err, token) => {
if (err) {
throw err;
}
r(token);
});
});
}
}
复制代码
接下来在控制器中定义路由方法:
// controller/HomeController.ts
import { Controller, Get } from '@/core';
import { LLMService } from '@/service/LLMService';
import dayjs from 'dayjs';
import { createParser, type ParsedEvent, type ReconnectInterval } from 'eventsource-parser';
import { Request, Response } from 'hyper-express';
@Controller()
export class HomeController {
constructor(private readonly llmService: LLMService) {}
@Get('chat')
async chat(_: Request, res: Response) {
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const stream = await this.llmService.chatCompletions({
model: 'glm-4',
messages: [
{
role: 'user',
content: '你是唐朝诗人王勃,请创作一首诗句',
},
],
});
const parser = createParser((e: ParsedEvent | ReconnectInterval) => {
if (e.type === 'event') {
if (e.data === '[DONE]') {
res.end();
} else {
const data = JSON.parse(e.data);
const content = data.choices[0]?.delta?.content ?? '';
console.log(dayjs(data?.created * 1000).format('YYYY-MM-DD HH:mm:ss'), content);
res.write(content);
}
}
});
stream
.on('data', chunk => {
parser.feed(Buffer.from(chunk, 'utf-8').toString());
})
.on('end', () => {
res.end();
});
}
}
复制代码
在浏览器中查看效果:
总结
在本文中,我们探讨了使用 Server-Sent Events(SSE)和大语言模型 API 来构建实时、智能化的 Web 应用。SSE 作为一种轻量级且高效的实时通信机制,为服务器向客户端推送实时数据提供了理想的解决方案。
评论