写点什么

通俗易懂的 redis 发布订阅原理实现!

用户头像
我是阿沐
关注
发布于: 23 小时前
通俗易懂的redis发布订阅原理实现!

前言

可能小伙伴的工作年限大部分已经超过三年甚至四年五年,不知道是否有一种危机感,我们写了那么多的需求代码没有 20w 行也有个 10w 行了吧,但是出去找工作的时候不是笔试被pass掉就是面试被pass,你会发现好多你只是知道但是回答不上来。这个时候你才知道去补习知识点,其实这种做法对自身发展不太友好的。

我去年疫情期间,在大家都不敢跳槽季节我义无反顾选择跳槽,进入大家说的 bat 一线大厂。最近跟之前老东家的同事聊了聊技术栈、家常;说是面试了很多两三年经验开发者基础太薄弱等等。


所以我也从 4 月底跟随之前的朋友一起开始了写作之路,我基本上是以面对对象是小白讲解方式开展自己的写作模式,期间也有小伙伴让我写高级点的😭 😭 😭 ,但是确实不敢在那么大佬面前造次;还是坚持从 0 到 1 的redis讲解之路,希望能得到小伙伴的支持。

大部分是根据自己以往的面试和同事交流得出来的,有不对的地方还希望小伙伴们多多指正。

那么开启我们新的一轮面试知识点之旅.....

正文

今天要聊的知识点是 redis 的订阅发布功能,虽然说现在大厂都使用了kafkaRabbitMQActiveMQ, RocketMQ;这几种我大概用了三种,其实实现原理和内部使用方式都大同小异。为什么讲 redis 的呢?因为轻量、直接使用,而上面几种适合大数据量,对数据准确性要求高的场景,作为第三方组件,在小公司考虑到成本人力是不是太有好的,存在更多风险。

为什么要用发布订阅

其实理论上我们之前的列表场景使用双端链表就可以实现发布与订阅功能,但是这种通过链表来实现的发布与订阅功能有两个局限性:

  • 1、基于链表实现的消息队列,不能支持一对多的消息分发。

  • 2、假如生产者生成的速率远远大于消费者消费消息的速率,可能会导致未消费消息占用大量的内存(需要开启足够多的消费进程)。

我画两张图进行对比,小伙伴们一眼就能看出来区别:

普通消息队列结构图


PubSub 结构图

从上面的图中可以看出普通消息队列:只能有一个多个消费者去消费,却不能将消息分发给其他消费者;redis 订阅发布:生产者生产完消息通过频道分发消息给订阅该频道的消费者,这样就可以较少队列数据的积攒,导致内存暴增。

所以为了解决这两个局限性,Redis 当中选择了通过其他命令来实现发布与订阅模式。

redis 订阅发布的基本命令

psubscribe指令: psubscribe pattern [pattern ...] 订阅一个或多个符合给定模式的频道;时间复杂度O(n),n是订阅的模式的数量。
注意点:每个模式以 * 作为匹配符;例如 mumu*匹配所有以 mumu 开头的频道:mumu.juejin、mumu.zhihu、mumu.csdn
publist指令:publish channel message 把信息message发送到指定的频道channel;时间复杂度O(n+m),n是频道channel的订阅者数量,m则是使用模式订阅(subscribed patterns)的客户端的数量。
注意点:结果集返回是接收到message的订阅者数量,没有订阅者返回0。
pubsub指令:pubsub channels [argument [argument ...]] 查看订阅与发布系统状态;时间复杂度O(n),n为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常数)。
注意:列出当前的活跃频道(指的是那些至少有一个订阅者的频道, 订阅模式的客户端不计算在内),返回一个活跃频道组成的列表。
punsubscribe指令:punsubscribe [pattern [pattern ...]] 退订所有给定模式的频道;时间复杂度O(n+m),其中n是客户端已订阅的模式的数量, m则是系统中所有客户端订阅的模式的数量。
注意:pattern未指定那么订阅的所有模式都会被退订;否则只会退订指定的订阅的模式
subscribe指令:subscribe channel [channel ...] 订阅给定的一个或多个频道的信息;时间复杂度O(n),其中n是订阅的频道的数量。
unsubscribe指令:unsubscribe channel [channel ...] 指退订给定的频道;时间复杂度O(n),其中n是订阅的频道的数量。
注意:若没有指定退订channel,则默认退订所有频道;否则退订指定频道。BSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。

复制代码

那么在 Redis 中的发布与订阅也分为两种类型,一种是基于频道来实现,一种是基于模式来实现。

基于频道实现讲解

  • subscribe channe1 channel2 channel3 ... :订阅一个或者多个频道

  • unsubscribe channe1 channel2 channel3 ... :退订订阅的指定频道(关闭客户端终端没用,需要命令退订)

  • publish channe1 message:对指定频道发送消息

  • pubsub numsub channel1 channel2:查看指定频道的订阅数

好记性不如烂笔头,光看不练假把戏:

127.0.0.1:6379> SUBSCRIBE mumu_1 mumu_2Reading messages... (press Ctrl-C to quit)1) "subscribe"    -- 返回值的类型:显示订阅成功2) "mumu_1"       -- 订阅的频道名字3) (integer) 1    -- 目前已订阅的频道数量
1) "subscribe"2) "mumu_2"3) (integer) 2
1) "message"      -- 返回值的类型:信息2) "mumu_1"       -- 来源(从那个频道发送过来)3) "\xe6\x88\x91\xe6\x98\xaf\xe9\x98\xbf\xe6\xb2\x90\xe5\x95\x8a" -- 消息内容
复制代码



订阅频道发消息截图

//获取指定频道的订阅的客户端数量127.0.0.1:6379> PUBSUB numsub mumu_1 mumu_21) "mumu_1"    -- 频道名称2) (integer) 1 -- 订阅该频道的客户端数量3) "mumu_2"4) (integer) 1127.0.0.1:6379> pubsub channels1) "mumu_2"  -- 频道名称2) "mumu_1"  -- 频道名称
复制代码


查看订阅数频道信息截图

127.0.0.1:6379> UNSUBSCRIBE mumu_11) "unsubscribe"  -- 返回值的类型:显示取消订阅成功2) "mumu_1"       -- 取消订阅的频道名字3) (integer) 0
复制代码

我们看下基于频道的实现原理:

源码路径redis-5.0.7/src/server.h我把 redis 源码下载到本地查看了;大约 1239 行

struct redisServer {    /* General */    pid_t pid;          //省略百十行        // 百度翻译😜 😜 😜之后意思是: 将频道映射到已订阅客户端的列表(就是保存客户端和订阅的频道信息)    dict *pubsub_channels;  /* Map channels to list of subscribed clients */}
复制代码

pubsub_channels定义的属性是一个字典类型,保存着客户端和频道信息,key 值保存的就是频道名value 是一个链表,链表中保存的是客户端 id


订阅频道内部存储结构

频道订阅:订阅频道时先检查字段内部是否存在;不存在则为当前频道创建一个字典且创建一个链表存储客户端 id;否则直接将客户端 id 插入到链表中。

取消频道订阅:取消时将客户端 id 从对应的链表中删除;如果删除之后链表已经是空链表了,则将会把这个频道从字典中删除。

基于模式实现讲解

  • psubscribe pattern1 pattern2 pattern3 ... :订阅一个或多个符合给定模式的频道,每个模式以 * 作为匹配符

  • punsubscribe pattern1 pattern2 pattern3 ... :取消模式的订阅(关闭客户端终端没用,需要命令退订)

  • pubsub numpat pattern1 返回订阅模式的数量,返回的不是订阅模式的客户端的数量,而是客户端订阅的所有模式的数量总和。时间复杂度 O(1),(具体为啥,请看下面原来解析结构)

说起时那时快,赶紧动手来实践,眼见为实:

127.0.0.1:6379> PSUBSCRIBE mumu.*Reading messages... (press Ctrl-C to quit)1) "psubscribe"       -- 返回值的类型:显示订阅成功2) "mumu.*"           -- 订阅的模式3) (integer) 1        -- 目前已订阅的模式的数量
1) "pmessage"         -- 返回值的类型:信息2) "mumu.*"           -- 信息匹配的模式3) "mumu.list"        -- 信息本身的目标频道4) "i am a mumu"      -- 信息的内容

复制代码



模式订阅实践操作图

127.0.0.1:6379> PUNSUBSCRIBE mumu.*1) "punsubscribe"  -- 返回值的类型:显示退订成功2) "mumu.*"        -- 退订的模式3) (integer) 1     -- 目前已退订的模式的数量
复制代码

我们看下基于模式的实现原理:

源码路径redis-5.0.7/src/server.h我把 redis 源码下载到本地查看了;大约 1240 行

struct redisServer {    /* General */    pid_t pid;          //省略百十行        // 百度翻译😄 😄 😄之后意思是:pubsub订阅的列表信息(大致就是存储订阅模式的信息)    list *pubsub_patterns;  /* A list of pubsub_patterns */}
// 1303行订阅模式列表结构:typedef struct pubsubPattern {    client *client;  -- 订阅模式客户端    robj *pattern;   -- 被订阅的模式} pubsubPattern;
复制代码



模式订阅内部结构图

模式订阅:新增一个pubsub_pattern数据结构添加到链表的最后尾部,同时保存客户端 ID

取消模式订阅:从当前的链表pubsub_patterns结构中删除需要取消的模式订阅。

从上面的一些实际实践结果和结合图形是不是对 redis 发布订阅进一步了解了呢?

那么我们使用 redis 发布订阅能做什么?

发布订阅(pub/sub)可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。

  • 电商中,用户下单成功之后向指定频道发送消息,下游业务订阅支付结果这个频道处理自己相关业务逻辑

  • 粉丝关注功能

  • 文章推送

  • 等等等等

实践编码

消费者订阅 Subscribe.php
<?php/** * Created by 我是阿沐. * Date: 2021/05/04 * Time: 下午16:02 * QQ: 2511221051@qq.com */
// 设置php脚本执行时间set_time_limit(0);
// 申明测试的平道名称$channel_names = ['mumu_test1', 'mumu_test2', 'mumu_test3'];//当前执行时间$cur_time = time();try {    // 实例化redis    $redis = new Redis();
    // 创建redis链接    $redis->pconnect('127.0.0.1', 6379);        //echo "Server is running: " . $redis->ping();    //阻塞获取消息    while (true) {        // 阻塞获取消息 $redis redis的实例  $channel_name 频道名称  $msg 生产者生成的消息体        $redis->subscribe($channel_names, function ($redis, $channel_name, $msg) {              switch ($channel_name) {                case 'mumu_test1':                    echo "channel:".$channel_name.",message:".$msg."\n";                    break;                case 'mumu_test2':
                    break;                case 'mumu_test3':
                    break;            }            if (!$msg) { //当没有收到消息时 就休眠1s钟                echo "channel:".$channel_name.",message: not appoint channel name"."\n";                sleep(1);            }        });        // 本地测试 运行超过10分钟 则自动结束 并关闭redis链接        if (time() - $cur_time > 10*60){            $redis -> close();            break;        }    }} catch (Exception $e) {    echo $e->getMessage();}
复制代码
生产者发送消息 Publish.php
<?php/** * Created by 我是阿沐. * Date: 2021/05/04 * Time: 下午16:04 * QQ: 2511221051@qq.com */
// 申明测试的平道名称$channel_names = ['mumu_test1', 'mumu_test2', 'mumu_test3', 'mumu_test4'];
$channel_name = $channel_names[rand(0,3)];
try {    // 实例化redis类    $redis = new Redis();    // 建立redis链接    $redis->connect('127.0.0.1', 6379);
    for ($i = 0; $i < 10; $i++) {
        $data = array('key' => 'key' . ($i+1), 'msg' => 'I am li a mu !');
        $ret = $redis->publish($channel_name, json_encode($data));
        print_r($ret);    }} catch (Exception $e) {    echo $e->getMessage();}
复制代码

🐧 执行结果集

终端执行消费者订阅,开始阻塞获取消息/usr/local/opt/php@7.2/bin/php Subscribe.php;结果集:
➜  publish-subscribe git:(master) ✗ /usr/local/opt/php@7.2/bin/php Subscribe.phpchannel:mumu_test1,message:{"key":"key1","msg":"I am li a mu !"}channel:mumu_test1,message:{"key":"key2","msg":"I am li a mu !"}channel:mumu_test1,message:{"key":"key3","msg":"I am li a mu !"}channel:mumu_test1,message:{"key":"key4","msg":"I am li a mu !"}channel:mumu_test1,message:{"key":"key5","msg":"I am li a mu !"}channel:mumu_test1,message:{"key":"key6","msg":"I am li a mu !"}channel:mumu_test1,message:{"key":"key7","msg":"I am li a mu !"}channel:mumu_test1,message:{"key":"key8","msg":"I am li a mu !"}channel:mumu_test1,message:{"key":"key9","msg":"I am li a mu !"}channel:mumu_test1,message:{"key":"key10","msg":"I am li a mu !"}
上面就是生产者生成的消息内容:msg字符串
终端执行生产者生产数据,开始发送消息/usr/local/opt/php@7.2/bin/php Publish.php;结果集:➜  publish-subscribe git:(master) ✗ /usr/local/opt/php@7.2/bin/php Publish.php1111111111
// 这种场景是 我模拟发送了没有创建的频道 mumu_test4 导致返回的结果集都是0 说明没有被订阅的channel,消息会被丢弃➜  publish-subscribe git:(master) ✗ /usr/local/opt/php@7.2/bin/php Publish.php0000000000
复制代码

🐮 注意事项

1、订阅的消费者需要一直执行,阻塞获取消息,如果断开则表示退订了。

2、channel 只接收 publish 发送的消息,自身是不存储消息,假如 channel 没有被订阅,则消息会被丢弃掉。

3、生产者生成消息时,只需要向频道内丢入消息即可。

🙈 当然还有这些命令可以玩耍

$redis->pubsub('channels'); // 获取所有频道$redis->pubsub('channels', '*pattern*'); // 仅仅获取指定频道$redis->pubsub('numsub', ['channel1', 'channel2']); // 查看指定频道的订阅数$redis->pubsub('numpat'); // 返回订阅模式的数量$redis->unsubscribe(['channel1', 'channel2']); // 客户端退订指定的频道$redis->punsubscribe(['pattern1', 'pattern2']); // 客户端退订所有指定定模式
复制代码

小伙伴们本地实践操作起来~~~,千看不如写一遍。

redis 发布订阅的优缺点

小伙伴们从上面的实践操作来看,PubSub 生产的消息,如果没有对应的频道或者消费者,消息会被丢弃,直接投递失败返回0状态。假如我们实际生产环境在消费的时候,突然网络波动,导致其中一个消费者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在。也就是说 Redis 本身是不会存储消息体信息的。

那么在我们生产环境数量不大且想节约成本的时候,redis 的发布订阅功能可能比较适合我们公司;轻量级、方便使用配合consul+supervisor+swool可以常驻内存,开多进程消费(消息队列也可以用的)。

🐣 总结

哇哇哇,能有幸看到这里的小伙伴,我很服气你们了,我花了两天的时间去想去画去构思写好的文章;你们竟然也看到了这里,阿沐心里贼开心;阿沐确实很佩服小伙伴们,贼棒👍 、贼有毅力;同时也贼能包容阿沐的不足之处。


本文主要通过整理 PubSub 的实际操作指令,然后结合底层的源码分析它们之间的存储结构;再通过实际的客户端操作,来说明返回参数的具体意思;最最最后通过实践写代码运行展示。同时也列出 PubSub 的优缺点,帮助大家在实际的工作中可以有更好的选择。最后好记性不如多亲自动手实践,唯有实践,才知其本质。


最后,欢迎关注我的个人公众号「我是阿沐」,会不定期的更新后端知识点和学习笔记。也欢迎直接公众号私信或者邮箱联系我,我们可以一起学习,一起进步。


好了,我是阿沐,一个不想 30 岁就被淘汰的打工人 ⛽️ ⛽️ ⛽️ 。创作不易觉得「阿沐」写的有点料话:👍 关注一下,💖 分享一下,我们下期再见。

发布于: 23 小时前阅读数: 17
用户头像

我是阿沐

关注

生活最重要的是开心 | 保持一个好心态 2021.05.29 加入

公众号:我是阿沐 | 思绪来得快去得也快,偶尔会在这里停留

评论

发布
暂无评论
通俗易懂的redis发布订阅原理实现!