Redis 高级数据结构 Stream 和 HyperLogLog
队列与 Stream
redis stream 结构如上图所示
消息链表
,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。
Stream唯一名称
,它就是 Redis 的 key,在我们首次使用xadd
指令追加消息时自动创建。
消费组
,一个 stream 支持多个last_delivered_id
,表示当前消费组已经消费到哪条消息了。
每个消费者组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create
进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化 last_delivered_id 变量。
同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系
,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。
pending_ids
,它记录了当前消费者已经被客户端读取,但是还没有 ack 的消息。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为PEL
,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次
,而不会在网络传输的中途丢失了没处理。
消息ID
的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第 5 条消息。
消息内容就是键值对
,形如 hash 结构的键值对,这没什么特别之处。
常用命令
版本:redis-6.2.8
生产端
streamtest
表示当前这个队列的名字,也就是我们一般意义上 Redis 中的 key。
*
号表示服务器自动生成 ID,后面顺序跟着,是我们存入当前 streamtest 这个队列的消息,采用的也是 key/value 的存储形式,返回值1672574363910-0
则是生成的消息 ID,由两部分组成:时间戳-序号
。时间戳时毫秒级单位,是生成消息的 Redis 服务器时间,它是个 64 位整型。序号是在这个毫秒时间点内的消息序号。它也是个 64 位整型。建议使用 Redis 的方案生成消息 ID,因为这种时间戳+序号的单调递增的 ID 方案,几乎可以满足全部的需求,但 ID 是支持自定义的。
为了保证消息是有序的,因此 Redis 生成的ID是单调递增有序
的。由于 ID 中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis 的每个 Stream 类型数据都维护一个 latest_generated_id 属性,用于记录最后一个消息的 ID。若发现当前时间戳退后(小于 latest_generated_id 所记录的),则采用时间戳不变而序号递增的方案来作为新消息 ID(这也是序号为什么使用 int64 的原因,保证有足够多的的序号),从而保证 ID 的单调递增性质。
消费端
单消费者
Redis 设计了一个单独的消费指令 xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list)。
一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。
消费者组
创建消费者组
XINFO stream streamtest
查看消息队列信息
XINFO groups streamtest
查看消息者组情况
消费消息
有了消费组,自然还需要消费者,Stream 提供了 xreadgroup
指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样,也可以阻塞等待新消息
。读到新消息后,对应的消息 ID 就会进入消费者的PEL(正在处理的消息)
结构里,客户端处理完毕后使用xack
指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。
消费者组状态
更多的 Redis 的 Stream 命令请大家参考Redis官方文档:
Redis 队列几种实现的总结
基于 List 的 LPUSH+BRPOP 的实现
足够简单,消费消息延迟几乎为零,但是需要处理空闲连接的问题。
如果线程一直阻塞在那里,Redis 客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候 blpop 和 brpop 或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常需要重试。
其他缺点包括:
做消费者确认 ACK 麻烦,不能保证消费者消费消息后是否成功处理的问题(宕机或处理异常等),通常需要维护一个 Pending 列表,保证消息处理确认;不能做广播模式,如 pub/sub,消息发布/订阅模型;不能重复消费,一旦消费就会被删除;不支持分组消费。
基于 Sorted-Set 的实现
多用来实现延迟队列,当然也可以实现有序的普通的消息队列,但是消费者无法阻塞的获取消息,只能轮询,不允许重复消息
。
PUB/SUB,订阅/发布模式
优点:
典型的广播模式,一个消息可以发布到多个消费者;多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息;消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息。
缺点:
消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回;不能保证每个消费者接收的时间是一致的;若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时;可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播
,即时通讯,即时反馈的业务。
基于 Stream 类型的实现
基本上已经有了一个消息中间件的雏形,可以考虑在生产过程中使用,当然真正要在生产中应用,要做的事情还很多,比如消息队列的管理和监控
就需要花大力气去实现,而专业消息队列都已经自带或者存在着很好的第三方方案和插件。不保证消息不丢失。
消息队列问题
从我们上面对 Stream 的使用表明,Stream 已经具备了一个消息队列的基本要素,生产者 API、消费者 API,消息 Broker,消息的确认机制等等,所以在使用消息中间件中产生的问题,这里一样也会遇到。
Stream 消息太多怎么办?
要是消息积累太多,Stream 的链表岂不是很长,内容会不会爆掉?xdel
指令又不会删除消息,它只是给消息做了个标志位。
Redis 自然考虑到了这一点,所以它提供了一个定长 Stream 功能。在 xadd 的指令提供一个定长长度 maxlen
,就可以将老的消息干掉,确保最多不超过指定长度。
消息如果忘记 ACK 会怎样?
Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL,如果消费者收到了消息处理完了但是没有回复 ack,就会导致 PEL 列表不断增长
,如果有很多消费组的话,那么这个 PEL 占用的内存就会放大。所以消息要尽可能的快速消费并确认。
PEL 如何避免消息丢失?
在客户端消费者读取 Stream 消息时,Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是 PEL 里已经保存了发出去的消息 ID。待客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息 ID 不能为参数>,而必须是任意有效的消息 ID
,一般将参数设为 0-0,表示读取所有的 PEL 消息以及自 last_delivered_id 之后的新消息。
死信问题
如果某个消息,不能被消费者处理(处理失败)
,也就是不能被 XACK,这是要长时间处于 Pending 列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter
(通过 XPENDING 可以查询到)就会累加,当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息)
,删除即可。
删除一个消息,使用 XDEL 语法,注意,这个命令并没有删除 Pending 中的消息,因此查看 Pending,消息还会在,可以在执行执行 XDEL 之后,XACK 这个消息标识其处理完毕。
Stream 的高可用
Stream 的高可用是建立主从复制
基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。不过鉴于 Redis 的指令复制是异步
的,在 failover 发生时,Redis 可能会丢失极小部分数据,这点 Redis 的其它数据结构也是一样的。
分区 Partition
Redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个 Stream,然后在客户端使用一定的策略来生产消息到不同的 Stream。
Stream 小结
Stream 的消费模型借鉴了 Kafka 的消费分组的概念,它弥补了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafka,Kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的话,得在客户端做,提供不同的 Stream 名称,对消息进行 hash 取模来选择往哪个 Stream 里塞。
总的来说,如果是中小项目和企业,在工作中已经使用了 Redis,在业务量不是很大,而又需要消息中间件功能的情况下,可以考虑使用 Redis 的 Stream 功能。但是如果并发量很高,资源足够支持下,还是以专业的消息中间件,比如 RocketMQ、Kafka 等来支持业务更好。
HyperLogLog
HyperLogLog 并不是一种新的数据结构(实际类型为字符串类型),而是一种基数算法,通过 HyperLogLog 可以利用极小的内存空间完成独立总数的统计
,数据集可以是 IP、Email、ID 等。
如果你负责开发维护一个大型的网站,有一天产品经理要网站每个网页每天的 UV 数据,然后让你来开发这个统计模块,你会如何实现?
如果统计 PV 那非常好办,给每个网页一个独立的 Redis 计数器就可以了,这个计数器的 key 后缀加上当天的日期。这样来一个请求,incrby 一次,最终就可以统计出所有的 PV 数据。
但是 UV 不一样,它要去重,一个简单的方案,那就是为每一个页面一个独立的 set 集合来存储所有当天访问过此页面的用户 ID。当一个请求过来时,我们使用 sadd 将用户 ID 塞进去就可以了。通过 scard 可以取出这个集合的大小,这个数字就是这个页面的 UV 数据。
但是,如果你的页面访问量非常大,比如一个爆款页面几千万的 UV,你需要一个很大的 set 集合来统计,这就非常浪费空间。如果这样的页面很多,那所需要的存储空间是惊人的。为这样一个去重功能就耗费这样多的存储空间,值得么?其实需要的数据又不需要太精确,1050w 和 1060w 这两个数字对于老板们来说并没有多大区别,So,有没有更好的解决方案呢?
Redis 提供了 HyperLogLog 数据结构就是用来解决这种统计问题的。HyperLogLog 提供不精确的去重计数方案
,虽然不精确但是也不是非常不精确,Redis 官方给出标准误差是 0.81%
,这样的精确度已经可以满足上面的 UV 统计需求了。
HyperLogLog 提供了 3 个命令: pfadd、pfcount、pfmerge。
以使用集合类型和 HperLogLog 统计百万级用户访问次数的占用空间对比:
可以看到,HyperLogLog 内存占用量小得惊人,但是用如此小空间来估算如此巨大的数据,必然不是 100%的正确,其中一定存在误差率。前面说过,Redis 官方给出的数字是 0.81%的失误率。
Redis 事务
简单地说,事务表示一组动作,要么全部执行,要么全部不执行。例如在社交网站上用户 A 关注了用户 B,那么需要在用户 A 的关注表中加入用户 B,并且在用户 B 的粉丝表中添加用户 A,这两个行为要么全部执行,要么全部不执行,否则会出现数据不一致的情况。
可以看到 sadd 命令此时的返回结果是 QUEUED,代表命令并没有真正执行,而是暂时保存在 Redis 中的一个缓存队列(所以 discard 也只是丢弃这个缓存队列中的未执行命令,并不会回滚已经操作过的数据,这一点要和关系型数据库的 Rollback 操作区分开)。如果此时另一个客户端执行 sismember u:a:follow ub 返回结果应该为 0。
事务中出现错误
命令错误,属于语法错误,会造成整个事务无法执行
运行时错误,例如用户 B 在添加粉丝列表时,误把 sadd 命令(针对集合)写成了 zadd 命令(针对有序集合),这种就是运行时命令,因为语法是正确的,那第一条执行成功,第二条执行失败,可以看到 Redis 并不支持回滚功能,开发人员需要自己修复这类问题。
watch
有些应用场景需要在事务之前,确保事务中的 key 没有被其他客户端修改过,才执行事务,否则不执行(类似乐观锁)。Redis 提供了 watch 命令来解决这类问题。
可以看到“客户端-1”在执行 multi 之前执行了 watch 命令,“客户端-2”在“客户端-1”执行 exec 之前修改了 key 值,造成客户端-1 事务没有执行(exec 结果为 nil)。
Pipeline 和事务的区别
1、pipeline是客户端的行为
,对于服务器来说是透明的,可以认为服务器无法区分客户端发送来的查询命令是以普通命令的形式还是以 pipeline 的形式发送到服务器的;
2、而事务则是实现在服务器端的行为
,用户执行 MULTI 命令时,服务器会将对应这个用户的客户端对象设置为一个特殊的状态,在这个状态下后续用户执行的查询命令不会被真的执行,而是被服务器缓存起来,直到用户执行 EXEC 命令为止,服务器会将这个用户对应的客户端对象中缓存的命令按照提交的顺序依次执行
。
3、应用 pipeline 可以提服务器的吞吐能力,并提高 Redis 处理查询请求的能力。
存在问题,当通过 pipeline 提交的查询命令数据较少时(可以被内核缓冲区所容纳),Redis 可以保证这些命令执行的原子性。然而一旦数据量过大,超过了内核缓冲区的接收大小,那么命令的执行将会被打断,原子性也就无法得到保证。
因此 pipeline 只是一种提升服务器吞吐能力的机制,如果想要命令以事务的方式原子性的被执行,还是需要事务机制,或者使用更高级的脚本功能以及模块功能。
4、可以将事务和 pipeline 结合起来使用,减少事务的命令在网络上的传输时间,将多次网络 IO 缩减为一次网络 IO。
Redis 提供了简单的事务,之所以说它简单,主要是因为它不支持事务中的回滚特性,同时无法实现命令之间的逻辑关系计算,当然也体现了 Redis 的“keep it simple”的特性。
作者:咖啡冲不冲
链接:https://juejin.cn/post/7184007074945171513
来源:稀土掘金
评论