写点什么

Serverless 与 WebSocket 的聊天工具

用户头像
刘宇
关注
发布于: 刚刚
Serverless与WebSocket的聊天工具

前言

WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信,即允许服务器主动发送信息给客户端。WebSocket 在服务端有数据推送需求时,可以主动发送数据至客户端。而原有 HTTP 协议的服务端对于需推送的数据,仅能通过轮询或 long poll 的方式来让客户端获得。


基于传统架构实现 WebSocket 协议,在一定程度上是比较困难的。那么在 Serverless 架构上实现 WebSocket 协议呢?众所周知,Serverless 架构中,部署在 FaaS 平台的函数通常情况下是事件驱动的,且并不支持 WebSocket 协议,Serverless 架构下是否可以实现 WebSocket 协议就是一个问题,如果可以实现,相对传统架构来说,难度是否会降低也是一个值得探索的内容。


其实 Serverless 架构是可以实现 WebSocket 协议的,而且基于 Serverless 架构实现的 WebSocket 协议会非常的简单,在 FaaS 平台与 API 网关触发器的加持下,Serverless 架构是可以借助 API 网关等产品更简单的实现 WebSocket 协议。本文将会以阿里云函数计算为例,通过阿里云 API 网关,以及函数计算的 API 网关触发器实现一个基于 WebSocket 协议的聊天工具。

原理解析


由于函数计算是无状态且触发式的,即在有事件到来时才会被触发,因此,如图所示,为了实现 WebSocket,函数计算与 API 网关相结合,通过 API 网关承接及保持与客户端的连接,即 API 网关与函数计算一起实现了服务端。当客户端有消息发出时,会先传递给 API 网关,再由 API 网关触发函数执行。当服务端云函数要向客户端发送消息时,会先由云函数将消息 POST 到 API 网关的反向推送链接,再由 API 网关向客户端完成消息的推送。


在 API 网关处的业务简图:



整个流程为:


  1. 客户端在启动的时候和 API 网关建立了 WebSocket 连接,并且将自己的设备 ID 告知 API 网关;

  2. 客户端在 WebSocket 通道上发起注册信令;

  3. API 网关将注册信令转换成 HTTP 协议发送给用户后端服务,并且在注册信令上加上设备 ID 参数(增加在名称为 x-ca-deviceid 的 header 中);

  4. 用户后端服务验证注册信令,如果验证通过,记住用户设备 ID,返回 200 应答;

  5. 用户后端服务通过 HTTP/HTTPS/WebSocket 三种协议中的任意一种向 API 网关发送下行通知信令,请求中携带接收请求的设备 ID;

  6. API 网关解析下行通知信令,找到指定设备 ID 的连接,将下行通知信令通过 WebSocket 连接发送给指定客户端;

  7. 客户端在不想收到用户后端服务通知的时候,通过 WebSocket 连接发送注销信令给 API 网关,请求中不携带设备 ID;

  8. API 网关将注销信令转换成 HTTP 协议发送给用户后端服务,并且在注册信令上加上设备 ID 参数;

  9. 用户后端服务删除设备 ID,返回 200 应答。


上述整个流程,完整流程如图:



固然,想要在 API 网关与 FaaS 平台基础上,实现一个 WebSocket 协议的功能,步骤是比较多的,但是其实这里面已经有很多工作是 API 网关帮助我们完成的。如果说将上面的整个流程,进一步压缩,压缩成我们所需要执行的操作,那么整进一步简化后,可以得到核心的四个流程:


  1. 开通分组绑定的域名的 WebSocket 通道;

  2. 创建注册、下行通知、注销三个 API,给这三个 API 授权、并上线;

  3. 用户后端服务实现注册,注销信令逻辑,通过 SDK 发送下行通知;

  4. 下载 SDK,嵌入到客户端,建立 WebSocket 连接,发送注册请求,监听下行通知;


在这四个流程中,第一个流程是准备工作,第二个流程是涉及到 API 网关实现 WebSocket 协议的配置流程,第三个流程和第四个流程涉及到在 Serverless 架构下基于 API 网关实现 WebSocket 协议信息推动的核心功能。在上面的第二个流程中,涉及到注册、下行、注销三个 API,这三个 API 在阿里云 API 网关中,实际上是所需要实现 WebSocket 的三种管理信令对应的行为:


  1. 注册信令:注册信令是客户端发送给用户后端服务的信令,起到两个作用:


  • 将客户端的设备 ID 发送给用户后端服务,用户后端服务需要记住这个设备 ID。用户不需要定义设备 ID 字段,设备 ID 字段由 API 网关的 SDK 自动生成;

  • 用户可以将此信令定义为携带用户名和密码的 API,用户后端服务在收到注册信令的验证客户端的合法性。用户后端服务在返回注册信令应答的时候,返回非 200 时,API 网关会视此情况为注册失败。

  • 客户端要想收到用户后端服务发送过来的通知,需要先发送注册信令给 API 网关,收到用户后端服务的 200 应答后正式注册成功。


  1. 下行通知信令:用户后端服务,在收到客户端发送的注册信令后,记住注册信令中的设备 ID 字段,然后就可以向 API 网关发送接收方为这个设备的下行通知信令了。只要这个设备在线,API 网关就可以将此下行通知发送到端。

  2. 注销信令:客户端在不想收到用户后端服务的通知时发送注销信令发送给 API 网关,收到用户后端服务的 200 应答后注销成功,不再接受用户后端服务推送的下行消息。

匿名聊天室

API 网关配置

首先,我们需要在函数计算处新建三个事件函数分别对应三种信令,或者辅助三种信令进行工作:



创建完成三个基本的测试函数(使用默认函数代码即可,之后会重新实现这三个函数的业务逻辑)之后,我们需要在 API 网关处配置这三个测试函数的相关接口,首先需要创建一个 API 网管分组:



创建 API 网关分组之后,可以对该分组进行域名的绑定。这里需要额外注意的是,绑定域名之后,需要开启 WebSocket 通信状态:



配置域名之后,我们需要在这个 API 分组下面创建四个 API,这四个 API,分别用来实现三种信令,以及一个上行数据的接口:



websocket_register: 是实现注册信令,对应后端的函数为 register 函数:



websocket_notify: 为下行通知请求,协议为 HTTP 以及 Websocket,无需配置后端函数:



websocket_clean:注销请求,对应的后端函数计算中的 clean 函数:



websocket_send: 接收上行数据的普通请求,对应后端函数计算中的 send 函数:



创建完成之后,需要将这些 API 进行发布,并且创建应用:



创建应用完成之后,需要对 websocket_notify 接口进行授权:



并且创建对应的 AppKey:



完成上述配置,我们即完成了一个基于 Serverless 架构的 WebSocket 协议服务的框架搭建,接下来,只需要根据业务需求,进行对应函数的实现即可,这里所涉及到的对应函数包括注册函数、传输函数以及清理函数等。

函数计算配置

为了实现基于 Serverless 架构的匿名聊天室的功能,除了配置 API 网关之外,还需要对之前我们所创建的三个函数进行业务逻辑的实现,所涉及到的函数以及对应处理的业务逻辑主要为:


  • register 函数:注册函数,当函数注册时,将用户的 Id/设备 Id 存储到对象存储中;

  • send 函数:传输函数,当一个客户端发送消息后,通过 send 函数接收,并将消息通过 API 网关的下行通知请求发送给在线的其他客户端。判断在线的其他客户端的方法是通过对象存储中的 object 来进行判断;

  • clean 函数:清理函数,用来断开连接,并清理链接对象存储在对象存储中的 object 信息;


这其中 register 函数主要是将客户端在发起请求建链时携带的 x-ca-deviceid 进行持久化,可以选择存储到数据库中,也可以选择存储到对象存储等其他可持久化的平台上,以便我们可以随时查询和确定客户端的链接 ID,这一部分的代码实现为:


# -*- coding: utf-8 -*-import oss2import jsonossClient = oss2.Bucket(oss2.Auth('<AccessKeyID>', '<AccessKeySecret>'),                        'http://oss-cn-hongkong.aliyuncs.com',                        '<BucketName>')
def register(event, context): userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid'] # 注册的时候,将链接写入到对象存储 ossClient.put_object(userId, 'user-id') # 返回客户端注册结果 return { 'isBase64Encoded': 'false', 'statusCode': '200', 'body': { 'userId': userId }, }
复制代码


send 函数的作用主要是两个:


  • 接收客户端通过 API 网关发送过来的信息;

  • 将收到的信息推动到目前已有链接的其他客户端上;


除了上述两部分作用之外,该函数还会涉及到意外断开的客户端清理相关的操作,例如当向某客户端推送数据失败时,可以认为是该客户端已经断开链接,此时可以讲该客户端的 ID 在对象存储中清理掉。当然,这一部分还可以进行更多的能力建设,例如:


  • 针对用户发送的信息,进行鉴黄鉴恐的筛选;

  • 针对用的发送的信息内容,进行部分的持久化和分析,进而判断用户的聊天热点话题等;


针对这一部分的整体代码实现为:


# -*- coding: utf-8 -*-import oss2import jsonimport base64from apigateway import clientfrom apigateway.http import requestfrom apigateway.common import constant
ossClient = oss2.Bucket(oss2.Auth('<AccessKeyID>', '<AccessKeySecret>'), 'http://oss-cn-hongkong.aliyuncs.com', '<BucketName>')apigatewayClient = client.DefaultClient(app_key="<app_key>", app_secret="<app_secret>")
def send(event, context):
host = "http://websocket.serverless.fun" url = "/notify" userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid']
# 获取链接对象 for obj in oss2.ObjectIterator(ossClient): if obj.key != userId: req_post = request.Request(host=host, protocol=constant.HTTP, url=url, method="POST", time_out=30000, headers={'x-ca-deviceid': obj.key}) req_post.set_body(json.dumps({ "from": userId, "message": base64.b64decode(json.loads(event.decode("utf-8"))['body']).decode("utf-8") })) req_post.set_content_type(constant.CONTENT_TYPE_STREAM) result = apigatewayClient.execute(req_post) print(result) if result[0] != 200: # 删除链接记录 ossClient.delete_object(obj.key) return { 'isBase64Encoded': 'false', 'statusCode': '200', 'body': { 'status': "ok" }, }
复制代码


在 send 函数中,涉及到向其他客户端推送相关信息的操作,针对这一部分,需要引入 API 网关提供的对应的 SDK 实现。


通过 API 网关提供的对应语言的 SDK,可以非常简单的向下行通知请求接口发起请求的行为:



最后一部分是 clean 函数,这一部分主要是当客户端发起断开连接的请求时,通过 API 网关触发函数计算,将所对应的 x-ca-deviceid 信息在对象存储中进行清理,其整体逻辑为:


# -*- coding: utf-8 -*-import oss2import jsonossClient = oss2.Bucket(oss2.Auth('<AccessKeyID>', '<AccessKeySecret>'),                        'http://oss-cn-hongkong.aliyuncs.com',                        '<BucketName>')
def clean(event, context): userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid'] # 删除链接记录 ossClient.delete_object(userId)
复制代码


至此,我们完成了匿名聊天似的服务端建设。

体验与测试

在完成上面功能编写之后,我们可以在本地进行基本的测试。在测试过程中,主要有创建链接、发送消息、接受推送的三个部分。


关于创建链接、断开链接以及接收消息部分,可以通过 WebSokcet 的相关模块实现:


const uuid = require('uuid');const util = require('util');
const register = function (editor, deviceId) { const ws = new WebSocket('ws://websocket.serverless.fun:8080'); const now = new Date();
const reg = { method: 'GET', host: 'websocket.serverless.fun:8080', querys: {}, headers: { 'x-ca-websocket_api_type': ['REGISTER'], 'x-ca-seq': ['0'], 'x-ca-nonce': [uuid.v4().toString()], 'date': [now.toUTCString()], 'x-ca-timestamp': [now.getTime().toString()], 'CA_VERSION': ['1'], }, path: '/register', body: '', };
ws.onopen = function open() { ws.send('RG#' + deviceId); };
var registered = false; var hbStarted = false;
ws.onmessage = function incoming(event) { if (event.data.startsWith('NF#')) { const msg = JSON.parse(event.data.substr(3)); editor.addHistory(util.format('%s > %s', msg.from, msg.message)); editor.setState({'prompt': deviceId + " > "}); return; } if (!hbStarted && event.data.startsWith('RO#')) { console.log('Login successfully'); if (!registered) { registered = true; ws.send(JSON.stringify(reg)); } hbStarted = true; setInterval(function () { ws.send('H1'); }, 15 * 1000); return; } };
ws.onclose = function (event) { console.log('ws closed:', event); };};
module.exports = register;
复制代码


发送信息到 send 函数:


execShellCommand: function (cmd) {    /*    cmd 是客户端发送的文本    post到ShellApi是send函数对应的接口,例如http://websocket.serverless.fun/send     */    const that = this;    that.setState({'prompt': ''})    that.offset = 0    that.cmds.push(cmd)    axios.post(ShellApi, cmd, {        headers: {            'Content-Type': 'application/octet-stream',            "x-ca-deviceid": deviceId        }    }).then(function (res) {        that.setState({'prompt': Prompt});    }).catch(function (err) {        const errText = err.response ? err.response.status + ' ' + err.response.statusText : err.toString();        that.addHistory(errText);        that.setState({'prompt': Prompt})    });}
复制代码


完成客户端的核心逻辑编辑之后,可以通过 HTML 和 CSS 实现部分页面,便于测试。如图所示,当我们完成页面样式的编辑,和本地逻辑的编辑之后,我们可以打开两个窗口进行项目的测试。我们可以看到,当我们打开两个窗口之后,每个窗口都会随机的生成一个客户端 ID:



如果所示,当我们在左侧窗口输入一个字符串并按回车发送:



可以看到在右侧,出现了左侧窗口的 ID,并且出现了左侧刚刚发送的信息。此时我们进一步测试,我们可以在右侧同样输入字符串,并按回车按钮发送:



可以看到左侧,也同样出现了相关的效果。至此,我们已经基于 Serverless 架构实现了匿名聊天室的功能,完成服务端的建设,和客户端的测试,可以确保项目创建链接、发送消息、接收消息。

总结

通过函数计算和 API 网关进行 WebSocket 的实践,绝对不仅仅是一个聊天工具这么简单,他可以用在很多方面,例如通过 WebSocket 进行实时日志系统的制作等。单独的函数计算,仅仅是一个计算平台,只有和周边的 BaaS 结合,才能展示出 Serverless 架构的价值和真正的能力,意义。这也是为什么很多人 Serverless = FaaS + BaaS 的一个原因。通过本文的抛砖引玉,希望读者可以进一步对 Serverless 有更深的认识,可以将 Serverless 和更多的触发器、事件源等进一步结合,探索更多有趣的应用,并将其更简单的应用到自己的项目中。

发布于: 刚刚阅读数: 2
用户头像

刘宇

关注

阿里云Serverless云布道师 2020.01.04 加入

阿里云Serverless产品经理,国防科大在读博士,《Serverless架构》、《Serverless实践》、《人人都能学会的Serverless架构》等书籍作者,Serverless Devs发起人,Anycodes在线编程负责人。

评论

发布
暂无评论
Serverless与WebSocket的聊天工具