Redis stream Java API 实践
最近工作中使用到了消息中间件,另外一个组的同事经过评估选择了 Redis stream 作为最终选择。我自己写的性能测试框架自然也需要接入这套消息系统。所以我也抓紧学习起来。
Redis Stream 是 Redis 5.0 版本新增加的数据结构。Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。
之前还没发现 Redis 还有这种使用方法,着实有点少见过怪了。照例我后面会进行一些基本功能的性能测试,下面分享基本功能的使用演示。
准备工作
依赖
如果想自己操作的话,请注意这个版本,因为在我找资料的过程中发现,不同版本的 API 有不少的差异,算是踩了一些坑。如果你使用其他版本的redis.clients
遇到代码无法运行的时候,可以直接翻看源码查看相关参数类型。
Maven 依赖:
Gradle 依赖:
Redis server 版本:Redis 6.2.5
。
XADD - 添加消息到末尾
如果 key 对应的队列不存在,则会自动创建。
首先我们需要创建一个redis.clients.jedis.params.XAddParams
,顾名思义就是查询参数,这里面有重要的参数:redis.clients.jedis.params.XAddParams#maxLen
表示设置队列的长度,但是不常用。语法如下:
def len = XAddParams.xAddParams()
xadd API 使用方式如下:
XTRIM - 对流进行修剪,限制长度
这个 API 就是设置队列长度。使用方式也非常简单。
返回值是丢弃的消息的数量。
XDEL - 删除消息
这个就是删除某个消息,使用更简单了。
XLEN - 获取流包含的元素数量,即消息长度
话不多说了,使用如下:
XREAD - 以阻塞或非阻塞方式获取消息列表
这个要着重介绍一下,因为我用的就是这个,首先我们需要创建一个redis.clients.jedis.params.XReadParams
,这里有两个参数:redis.clients.jedis.params.XReadParams#count
和redis.clients.jedis.params.XReadParams#block
。前者控制返回数量,后者控制阻塞时间,如果时间小于 0 则认为不阻塞,等于 0 则一直会阻塞,小于 0 会报错。不设置该参数责任无非阻塞模式。PS:数量不足不会造成阻塞。示例如下:
还有我们需要redis.clients.jedis.Jedis#xread(redis.clients.jedis.params.XReadParams, java.util.Map<java.lang.String,redis.clients.jedis.StreamEntryID>)
第二个参数,这里常用的两种:
遍历消息:
控制台响应如下:
XRANGE - 获取消息列表,会自动过滤已经删除的消息
这个 API 获取某个范围内的消息,有个start
和end
的参数,可以传String
类型的消息 ID,也可以传redis.clients.jedis.StreamEntryID
,方法重载的比较多,有兴趣可以翻一翻源码。
后面会对 Redis stream API 进行性能测试,欢迎继续关注 FunTester。
Have Fun ~ Tester !
阅读原文,跳转我的仓库地址
版权声明: 本文为 InfoQ 作者【FunTester】的原创文章。
原文链接:【http://xie.infoq.cn/article/60a973bdc4461c613c8ec779a】。文章转载请联系作者。
评论