写点什么

读懂才会用:Redis ZSet 的几种使用场景

发布于: 2020 年 06 月 11 日
读懂才会用:Redis ZSet 的几种使用场景

上一篇 (向右转),我们介绍了 Redis 的 ZSet 结构,在文末提了延时队列、排行榜、限流三种使用场景。本文展示具体的使用方法和问题,供大家参考。代码示例,仅仅是 Demo,在生产使用需要考虑更多细节问题。


1. 延时队列


zset 会按 score 进行排序,如果 score 代表想要执行时间的时间戳。在某个时间将它插入 zset 集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序。


起一个死循环线程不断地进行取第一个 key 值,如果当前时间戳大于等于该 key 值的 score 就将它取出来进行消费删除,可以达到延时执行的目的。



发送消息


代码如下:

public void sendMessage(long messageId, String message) {    System.out.println("发送消息");    Jedis client = jedisPool.getResource();    Pipeline pipeline = client.pipelined();    // score 设置成当前时间戳 + 延迟时间    pipeline.zadd(DELAY_QUEUE, System.currentTimeMillis() + DELAY_TIME * 1000,                  String.format(MEMBER_PREFIX, messageId));    Map<String, String> map = new HashMap<>();    map.put(String.format(MEMBER_PREFIX, messageId), message);    pipeline.hset(DELAY_MESSAGE, map);    pipeline.syncAndReturnAll();    pipeline.close();    client.close();    System.out.println("发送消息 over");}
复制代码

采用 pipeline 的方式,同时写入 zset 和 hash 中


消费消息


代码如下:

public void consumer() {    System.out.println("消费消息开始");    Jedis client = jedisPool.getResource();    Set<Tuple> tupleSet = client.zrangeByScoreWithScores(DELAY_QUEUE, 0, System.currentTimeMillis());    for (Tuple t : tupleSet) {        long messageId = Long.valueOf(t.getElement().replaceAll("[^0-9]", ""));        messageHandler(messageId);    }    client.close();    System.out.println("消费消息 over");}
public void messageHandler(long messageId) { System.out.println("==="); pool.execute(() -> { // 放到线程池处理 Jedis client = jedisPool.getResource(); String message = client.hget(DELAY_MESSAGE, String.format(MEMBER_PREFIX, messageId)); System.out.println("处理消息体" + message); System.out.println("处理消息体成功"); Pipeline pipeline = client.pipelined(); pipeline.multi(); pipeline.hdel(DELAY_MESSAGE, String.format(MEMBER_PREFIX, messageId)); pipeline.zrank(DELAY_QUEUE, String.format(MEMBER_PREFIX, messageId)); pipeline.exec(); pipeline.close(); client.close(); });}
复制代码


问题


  1. 没有 ack 机制,当消费失败的情况下队列如何处理?

  2. 这是 topic 模式,广播模式如何搞


示例代码是 demo,简单应用,投入生产中还需要考虑各种细节问题


2. 排行榜


经常浏览技术社区的话,应该对 “1 小时最热门” 这类榜单不陌生。如何实现呢?如果记录在数据库中,不太容易对实时统计数据做区分。我们以当前小时的时间戳作为 zset 的 key,把贴子 ID 作为 member ,点击数评论数等作为 score,当 score 发生变化时更新 score。利用 ZREVRANGE 或者 ZRANGE 查到对应数量的记录。



记录回复数


代码如下:

/**  * 模拟每次针对贴子的回复数加 1  *  * @param id*/public void post(long id) {    Jedis client = jedisPool.getResource();    client.zincrby(POSTLIST, 1, String.format(MEMBER_PREFIX, id));    client.close();}
复制代码


获取列表


代码如下:

/**  * 获取 Top 的贴子列表 ID  *  * @param size  * @return*/public List<Integer> getTopList(int size) {    List<Integer> result = new ArrayList<>();    if (size <= 0 || size > 100) {        return result;    }    Jedis client = jedisPool.getResource();    Set<Tuple> tupleSet = client.zrevrangeWithScores(POSTLIST, 0, size - 1);    client.close();    for (Tuple tuple : tupleSet) {        String t = tuple.getElement().replaceAll("[^0-9]", "");        result.add(Integer.valueOf(t));    }    return result;}
复制代码


模拟用户发帖的行为


代码如下:

public void test() throws InterruptedException {    int threadSize = 200;    long[] ids = {100, 102, 103, 104, 105, 106, 101, 108, 107, 200, 109, 201, 202};    CountDownLatch countDownLatch = new CountDownLatch(threadSize);    for (int i = 0; i < threadSize; i++) {        pool.execute(() -> {            for (int j = 0; j < 3; j++) {                Random r = new Random();                int index = (int) (r.nextDouble() * ids.length);                post(ids[index]);            }            countDownLatch.countDown();        });    }    countDownLatch.await();}
复制代码


问题


  1. 数量过大时会占用大量内存,需要清理很多冷数据

  2. 适合处理点击数、访问量之类,处理发帖回复这种还需要考虑,帖子审核不通过的情况


3. 限流


滑动窗口是限流常见的一种策略。如果我们把一个用户的 ID 作为 key 来定义一个 zset ,member 或者 score 都为访问时的时间戳。我们只需统计某个 key 下在指定时间戳区间内的个数,就能得到这个用户滑动窗口内访问频次,与最大通过次数比较,来决定是否允许通过。

滑动窗口



代码如下:

/**  *  * @param userId  * @param period 窗口大小  * @param maxCount 最大频次限制  * @return*/public boolean isActionAllowed(String userId, int period, int maxCount) {    String key = String.format(KEY, userId);    long nowTs = System.currentTimeMillis();    Jedis client = jedisPool.getResource();    Pipeline pipe = client.pipelined();    pipe.multi();    pipe.zadd(key, nowTs, String.format(MEMBER, userId, nowTs));    pipe.zremrangeByScore(key, 0, nowTs - period * 1000);    Response<Long> count = pipe.zcard(key);    pipe.expire(key, period + 1);    pipe.exec();    pipe.close();    client.close();    return count.get() <= maxCount;}
复制代码


思路是每一个请求到来时,将时间窗口外的记录全部清理掉,只保留窗口内的记录。zset 中只有 score 值非常重要,value 值没有特别的意义,只需要保证它是唯一的就可以了

问题


  1. 需要清理额外的数据

  2. 限制的请求量过大时,会占用大量内存


本次分享先到这里,欢迎关注我一起交流,随时指出各种错误和不足。


发布于: 2020 年 06 月 11 日阅读数: 1919
用户头像

欢迎关注公众号“小眼睛聊技术” 2018.11.12 加入

互联网老兵,关注产品、技术、管理

评论 (4 条评论)

发布
用户头像
没有 ack 机制,当消费失败的情况下队列如何处理?
Redis List的特点就是只有消费,没有查看,所有只要拿出来就会消费,如果消费失败,可以再次写入队列,或者写入单独的队列做补偿


这是 topic 模式,广播模式如何搞
Reds也提供了订阅发布模式,只是不专长,不稳定,可以考虑专门的消息队列组件


限流的思路很不错


欢迎讨论
展开
2020 年 06 月 12 日 09:25
回复
哈哈,您就是群里跟我聊得那个小哥吧

赞同您的回复,消费失败,放入一个单独的队列更方便一些
2020 年 06 月 12 日 12:52
回复
是的 看回复内容就可以看出来的
2020 年 06 月 16 日 09:07
回复
用户头像
感谢持续分享,InfoQ首页推荐。
2020 年 06 月 12 日 09:12
回复
没有更多了
读懂才会用:Redis ZSet 的几种使用场景