写点什么

把酒言欢话聊天,基于 Vue3.0+Tornado6.1+Redis 发布订阅 (pubsub) 模式打造异步非阻塞 (aioredis) 实时 (websocket) 通信聊天系统

  • 2021 年 12 月 21 日
  • 本文字数:8758 字

    阅读完需:约 29 分钟

把酒言欢话聊天,基于Vue3.0+Tornado6.1+Redis发布订阅(pubsub)模式打造异步非阻塞(aioredis)实时(websocket)通信聊天系统

原文转载自「刘悦的技术博客」https://v3u.cn/a_id_202


“表达欲”是人类成长史上的强大“源动力”,恩格斯早就直截了当地指出,处在蒙昧时代即低级阶段的人类,“以果实、坚果、根作为食物;音节清晰的语言的产生是这一时期的主要成就”。而在网络时代人们的表达欲往往更容易被满足,因为有聊天软件的存在。通常意义上,聊天大抵都基于两种形式:群聊和单聊。群聊或者群组聊天我们可以理解为聊天室,可以有人数上限,而单聊则可以认为是上限为 2 个人的特殊聊天室。


为了开发高质量的聊天系统,开发者应该具备客户机和服务器如何通信的基本知识。在聊天系统中,客户端可以是移动应用程序(C 端)或 web 应用程序(B 端)。客户端之间不直接通信。相反,每个客户端都连接到一个聊天服务,该服务支撑双方通信的功能。所以该服务在业务上必须支持的最基本功能:


1.能够实时接收来自其他客户端的信息。


2.能够将每条信息实时推送给收件人。


当客户端打算启动聊天时,它会使用一个或多个网络协议连接聊天服务。对于聊天服务,网络协议的选择至关重要,这里,我们选择 Tornado 框架内置 Websocket 协议的接口,简单而又方便,安装 tornado6.1


pip3 install tornado==6.1
复制代码


随后编写程序启动文件 main.py:


import tornado.httpserver  import tornado.websocket    import tornado.ioloop    import tornado.web    import redis    import threading    import asyncio    # 用户列表  users = []    # websocket协议  class WB(tornado.websocket.WebSocketHandler):        # 跨域支持    def check_origin(self,origin):        return True      # 开启链接    def open(self):                    users.append(self)        # 接收消息    def on_message(self,message):        self.write_message(message['data'])      # 断开    def on_close(self):        users.remove(self)
# 建立torando实例 app = tornado.web.Application( [ (r'/wb/',WB) ],debug=True ) if __name__ == '__main__': # 声明服务器 http_server_1 = tornado.httpserver.HTTPServer(app) # 监听端口 http_server_1.listen(8000) # 开启事件循环 tornado.ioloop.IOLoop.instance().start()
复制代码


如此,就在短时间搭建起了一套 websocket 协议服务,每一次有客户端发起 websocket 连接请求,我们都会将它添加到用户列表中,等待用户的推送或者接收信息的动作。


下面我们需要通过某种形式将消息的发送方和接收方联系起来,以达到“聊天”的目的,这里选择 Redis 的发布订阅模式(pubsub),以一个 demo 来实例说明,server.py


import redis    r = redis.Redis()  r.publish("test",'hello')
复制代码


随后编写 client.py:


import redis  r = redis.Redis()  ps = r.pubsub()  ps.subscribe('test')    for item in ps.listen():       if item['type'] == 'message':          print(item['data'])
复制代码


可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道(channel)发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。


频道不仅可以联系发布者和订阅者,同时,也可以利用频道进行“消息隔离”,即不同频道的消息只会给订阅该频道的用户进行推送:



根据发布者订阅者逻辑,改写 main.py:


import tornado.httpserver  import tornado.websocket    import tornado.ioloop    import tornado.web    import redis    import threading    import asyncio    # 用户列表  users = []    # 频道列表  channels = ["channel_1","channel_2"]      # websocket协议  class WB(tornado.websocket.WebSocketHandler):        # 跨域支持    def check_origin(self,origin):        return True      # 开启链接    def open(self):          users.append(self)        # 接收消息    def on_message(self,message):        self.write_message(message['data'])      # 断开    def on_close(self):        users.remove(self)              # 基于redis监听发布者发布消息  def redis_listener(loop):      asyncio.set_event_loop(loop)      async def listen():         r = redis.Redis(decode_responses=True)        # 声明pubsb实例      ps = r.pubsub()        # 订阅聊天室频道        ps.subscribe(["channel_1","channel_2"])          # 监听消息      for message in ps.listen():          print(message)          # 遍历链接上的用户        for user in users:            print(user)            if message["type"] == "message" and message["channel"] == user.get_cookie("channel"):                user.write_message(message["data"])      future = asyncio.gather(listen())    loop.run_until_complete(future)        # 接口  发布信息  class Msg(tornado.web.RequestHandler):        # 重写父类方法    def set_default_headers(self):        # 设置请求头信息      print("开始设置")      # 域名信息      self.set_header("Access-Control-Allow-Origin","*")      # 请求信息      self.set_header("Access-Control-Allow-Headers","x-requested-with")      # 请求方式      self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")            # 发布信息    async def post(self):        data = self.get_argument("data",None)        channel = self.get_argument("channel","channel_1")        print(data)        # 发布      r = redis.Redis()        r.publish(channel,data)        return self.write("ok")      # 建立torando实例    app = tornado.web.Application(      [      (r'/send/',Msg),    (r'/wb/',WB)      ],debug=True    )    if __name__ == '__main__':        loop = asyncio.new_event_loop()      # 单线程启动订阅者服务    threading.Thread(target=redis_listener,args=(loop,)).start()        # 声明服务器    http_server_1 = tornado.httpserver.HTTPServer(app)      # 监听端口    http_server_1.listen(8000)      # 开启事件循环    tornado.ioloop.IOLoop.instance().start()
复制代码


这里假设默认有两个频道,逻辑是这样的:由前端控制 websocket 链接用户选择将消息发布到那个频道上,同时每个用户通过前端 cookie 的设置具备频道属性,当具备频道属性的用户对该频道发布了一条消息之后,所有其他具备该频道属性的用户通过 redis 进行订阅后主动推送刚刚发布的消息,而频道的推送只匹配订阅该频道的用户,达到消息隔离的目的。


需要注意的一点是,通过线程启动 redis 订阅服务时,需要将当前的 loop 实例传递给协程对象,否则在订阅方法内将会获取不到 websocket 实例,报这个错误:


IOLoop.current() doesn't work in non-main
复制代码


这是因为 Tornado 底层基于事件循环 ioloop,而同步框架模式的 Django 或者 Flask 则没有这个问题。


下面编写前端代码,这里我们使用时下最流行的 vue3.0 框架,编写 chat.vue:


<template>    <div>                  <h1>聊天窗口</h1>                  <van-tabs v-model:active="active" @click="change_channel">                  <van-tab title="客服1号">                      <table>                                <tr v-for="item,index in msglist" :key="index">                                    {{ item }}                  </tr>                </table>                                      </van-tab>                    <van-tab title="客服2号">                                      <table>                                <tr v-for="item,index in msglist" :key="index">                                    {{ item }}                  </tr>                </table>                    </van-tab>                </van-tabs>                                    <van-field label="聊天信息" v-model="msg" />                <van-button color="gray" @click="commit">发送</van-button>           </div>  </template>    <script>    export default {   data() {      return {        auditlist:[],          //聊天记录        msglist:[],        msg:"",         websock: null, //建立的连接        lockReconnect: false, //是否真正建立连接        timeout: 3 * 1000, //30秒一次心跳        timeoutObj: null, //外层心跳倒计时        serverTimeoutObj: null, //内层心跳检测        timeoutnum: null, //断开 重连倒计时        active:0,        channel:"channel_1"             }    },    methods:{          //切换频道      change_channel:function(){                if(this.active === 0){                      this.channel = "channel_1";                    var name = "channel";            var value = "channel_1";                            }else{                    this.channel = "channel_2";                    var name = "channel";            var value = "channel_2";                }                //清空聊天记录            this.msglist = [];                var d = new Date();            d.setTime(d.getTime() + (24 * 60 * 60 * 1000));            var expires = "expires=" + d.toGMTString();            document.cookie = name + "=" + value + "; " + expires;                this.reconnect();          },       initWebSocket() {        //初始化weosocket        const wsuri = "ws://localhost:8000/wb/";        this.websock = new WebSocket(wsuri);        this.websock.onopen = this.websocketonopen;        this.websock.onmessage = this.websocketonmessage;        this.websock.onerror = this.websocketonerror;        this.websock.onclose = this.websocketclose;      },        reconnect() {        //重新连接        var that = this;        if (that.lockReconnect) {          // 是否真正建立连接          return;        }        that.lockReconnect = true;        //没连接上会一直重连,设置延迟避免请求过多        that.timeoutnum && clearTimeout(that.timeoutnum);        // 如果到了这里断开重连的倒计时还有值的话就清除掉        that.timeoutnum = setTimeout(function() {          //然后新连接          that.initWebSocket();          that.lockReconnect = false;        }, 5000);      },         reset() {        //重置心跳        var that = this;        //清除时间(清除内外两个心跳计时)        clearTimeout(that.timeoutObj);        clearTimeout(that.serverTimeoutObj);        //重启心跳        that.start();      },        start() {        //开启心跳        var self = this;        self.timeoutObj && clearTimeout(self.timeoutObj);        // 如果外层心跳倒计时存在的话,清除掉        self.serverTimeoutObj && clearTimeout(self.serverTimeoutObj);        // 如果内层心跳检测倒计时存在的话,清除掉        self.timeoutObj = setTimeout(function() {          // 重新赋值重新发送 进行心跳检测          //这里发送一个心跳,后端收到后,返回一个心跳消息,          if (self.websock.readyState == 1) {            //如果连接正常            // self.websock.send("heartCheck");          } else {            //否则重连            self.reconnect();          }          self.serverTimeoutObj = setTimeout(function() {            // 在三秒一次的心跳检测中如果某个值3秒没响应就关掉这次连接            //超时关闭           // self.websock.close();          }, self.timeout);        }, self.timeout);        // 3s一次      },        websocketonopen(e) {        //连接建立之后执行send方法发送数据        console.log("成功");         // this.websock.send("123");        // this.websocketsend(JSON.stringify(actions));      },      websocketonerror() {        //连接建立失败重连        console.log("失败");        this.initWebSocket();      },      websocketonmessage(e) {          console.log(e);        //数据接收        //const redata = JSON.parse(e.data);        const redata = e.data;          //累加        this.msglist.push(redata);          console.log(redata);               },      websocketsend(Data) {        //数据发送        this.websock.send(Data);      },      websocketclose(e) {        //关闭        this.reconnect()        console.log("断开连接", e);      },        //提交表单      commit:function(){              //发送请求            this.myaxios("http://localhost:8000/send/","post",{"data":this.msg,channel:this.channel}).then(data =>{              console.log(data);            });            },          },      mounted(){            //连接后端websocket服务        this.initWebSocket();              var d = new Date();            d.setTime(d.getTime() + (24 * 60 * 60 * 1000));            var expires = "expires=" + d.toGMTString();            document.cookie = "channel" + "=" + "channel_1" + "; " + expires;              }    }  </script>      <style scoped>    @import url("../assets/style.css");      .chatbox{          color:black;      }      .mymsg{          background-color:green;      }      </style>
复制代码


这里前端在线客户端定期向状态服务器发送心跳事件。如果服务端在特定时间内(例如 x 秒)从客户端接收到心跳事件,则认为用户处于联机状态。否则,它将处于脱机状态,脱机后在阈值时间内可以进行重新连接的动作。同时利用 vant 框架的标签页可以同步切换频道,切换后将频道标识写入 cookie,便于后端服务识别后匹配推送。


效果是这样的:



诚然,功能业已实现,但是如果我们处在一个高并发场景之下呢?试想一下如果一个频道有 10 万人同时在线,每秒有 100 条新消息,那么后台 tornado 的 websocket 服务推送频率是 100w*10/s = 1000w/s 。


这样的系统架构如果不做负载均衡的话,很难抗住压力,那么瓶颈在哪里呢?没错,就是数据库 redis,这里我们需要异步 redis 库 aioredis 的帮助:


pip3 install aioredis
复制代码


aioredis 通过协程异步操作 redis 读写,避免了 io 阻塞问题,使消息的发布和订阅操作非阻塞。


此时,可以新建一个异步订阅服务文件 main_with_aioredis.py:


import asyncio  import aioredis  from tornado import web, websocket  from tornado.ioloop import IOLoop  import tornado.httpserver  import async_timeout
复制代码


之后主要的修改逻辑是,通过 aioredis 异步建立 redis 链接,并且异步订阅多个频道,随后通过原生协程的 asyncio.create_task 方法(也可以使用 asyncio.ensure_future)注册订阅消费的异步任务 reader:


async def setup():      r = await aioredis.from_url("redis://localhost", decode_responses=True)      pubsub = r.pubsub()        print(pubsub)      await pubsub.subscribe("channel_1","channel_2")        #asyncio.ensure_future(reader(pubsub))      asyncio.create_task(reader(pubsub))
复制代码


在订阅消费方法中,异步监听所订阅频道中的发布信息,同时和之前的同步方法一样,比对用户的频道属性并且进行按频道推送:


async def reader(channel: aioredis.client.PubSub):      while True:          try:              async with async_timeout.timeout(1):                  message = await channel.get_message(ignore_subscribe_messages=True)                  if message is not None:                      print(f"(Reader) Message Received: {message}")                        for user in users:                            if user.get_cookie("channel") == message["channel"]:                                user.write_message(message["data"])                            await asyncio.sleep(0.01)          except asyncio.TimeoutError:              pass
复制代码


最后,利用 tornado 事件循环 IOLoop 传递中执行回调方法,将 setup 方法加入到事件回调中:


if __name__ == '__main__':        # 监听端口      application.listen(8000)        loop = IOLoop.current()      loop.add_callback(setup)      loop.start()
复制代码


完整的异步消息发布、订阅、推送服务改造 main_aioredis.py:


import asyncio  import aioredis  from tornado import web, websocket  from tornado.ioloop import IOLoop  import tornado.httpserver  import async_timeout    users = []    # websocket协议  class WB(tornado.websocket.WebSocketHandler):          # 跨域支持      def check_origin(self,origin):            return True        # 开启链接      def open(self):              users.append(self)          # 接收消息      def on_message(self,message):            self.write_message(message['data'])        # 断开      def on_close(self):            users.remove(self)      class Msg(web.RequestHandler):          # 重写父类方法      def set_default_headers(self):            # 设置请求头信息          print("开始设置")          # 域名信息          self.set_header("Access-Control-Allow-Origin","*")          # 请求信息          self.set_header("Access-Control-Allow-Headers","x-requested-with")          # 请求方式          self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")          # 发布信息      async def post(self):            data = self.get_argument("data",None)            channel = self.get_argument("channel","channel_1")            print(data)            # 发布          r = await aioredis.from_url("redis://localhost", decode_responses=True)            await r.publish(channel,data)            return self.write("ok")      async def reader(channel: aioredis.client.PubSub):      while True:          try:              async with async_timeout.timeout(1):                  message = await channel.get_message(ignore_subscribe_messages=True)                  if message is not None:                      print(f"(Reader) Message Received: {message}")                        for user in users:                            if user.get_cookie("channel") == message["channel"]:                                user.write_message(message["data"])                            await asyncio.sleep(0.01)          except asyncio.TimeoutError:              pass      async def setup():      r = await aioredis.from_url("redis://localhost", decode_responses=True)      pubsub = r.pubsub()        print(pubsub)      await pubsub.subscribe("channel_1","channel_2")        #asyncio.ensure_future(reader(pubsub))      asyncio.create_task(reader(pubsub))      application = web.Application([      (r'/send/',Msg),      (r'/wb/', WB),  ],debug=True)          if __name__ == '__main__':        # 监听端口      application.listen(8000)        loop = IOLoop.current()      loop.add_callback(setup)      loop.start()
复制代码


从程序设计角度上讲,充分利用了协程的异步执行思想,更加地丝滑流畅。


结语:实践操作来看,Redis 发布订阅模式,非常契合这种实时(websocket)通信聊天系统的场景,但是发布的消息如果没有对应的频道或者消费者,消息则会被丢弃,假如我们在生产环境在消费的时候,突然断网,导致其中一个订阅者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在,所以如果想要保证系统的健壮性,还需要其他服务来设计高可用的实时存储方案,不过那就是另外一个故事了,最后奉上项目地址,与众乡亲同飨:https://github.com/zcxey2911/tornado_redis_vue3_chatroom


原文转载自「刘悦的技术博客」 https://v3u.cn/a_id_202

发布于: 2 小时前阅读数: 4
用户头像

专注技术,凝聚意志,解决问题 v3u.cn 2020.12.21 加入

还未添加个人简介

评论

发布
暂无评论
把酒言欢话聊天,基于Vue3.0+Tornado6.1+Redis发布订阅(pubsub)模式打造异步非阻塞(aioredis)实时(websocket)通信聊天系统