深入了解 RocketMQ 之过滤器
一. 概述
RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。
目前 RocketMQ 只支持两个模式过滤器,一个是基于 TAG,另外一个是基于 SQL92。其中 TAG 模式相对于比较简单;而另外一个就相当的复杂,实现方式跟 spring 表达式有点相似;同时也提供了一个配置项,来决定是否开启 SQL92;
enablePropertyFilter 是否支持根据属性过滤,默认为 false,如果使用基于标的式 SQL92 模式过滤消息,则该参数必须设置为 true.
另外关于类过滤的,很快就过期,官方不推荐使用该模式,所以这里不在这里解读。
二. 数据结构
2.1 ConsumerFilterManager
在 Broker 中是以文件的形式存放;文件存放的格式如下:
我们查看其对象的结构,如图所示:
从字面都可以大概猜出其用意;除了那个 BloomFilter 相关的字段属性;
ConsumerFilterManager 对象中的 bloomFilter 属性我们可以理解是一个工具方法;而 ConsumerFilterData 对象中的 bloomFilterData 属性是这个消费组中的数值数组,用来判断是否满足过滤条件的消息;
2.2 SubscriptionData
这个对象是存放在 Broker 服务器中的 ConsumerManager,用来记录客户端中的消费者的订阅信息,里面有包含 tag 以及表达式两个;
重点介绍:tagsSet 与 code 的关系;tagsSet 是原生的 tag 集合信息,而 codeSet 是 tag 的哈希值的集合信息;
三. 过滤原理
客户端向 Broker 端拉取消息时,Broker 从 commitlog、consumequeue 文件中拿到数据,接着会进行过滤,判断是否满足指定的条件;所以,过滤的工作是在于 DefaultMessageStore 对象中的 getMessage 方法,该方法入参中有这样的对象 MessageFilter;该 MessageFilter 是个接口,我们这里只介绍 ExpressionMessageFilter 这个实现类,通过对其进行解读,其他的实现类都会很快明白其运作原理。
我们先看一下其接口的暴露的两个方法:
从上面的两个方法中,不难猜测出其是对 consumequeue 以及 commitlog 进行过滤;
3.1 TAG 过滤
tag 过滤只针对 consumequeue 的,所以在 MessageFilter 接口的 isMatchedByCommitLog 是默认返回 true;
我们重点看 isMatchedByConsumeQueue 方法的 tag 模式过滤;
3.2 SQL92
在 isMatchedByConsumeQueue 方法中,并没有 SQL92 进行过滤,而是用 BloomFilter 进行过滤,可以理解为 BloomFilter 是 SQL92 的缓存过滤器。先通过 consumequeue 先过滤不符合的消息,然后在 isMatchedByCommitLog 严格过滤;
然而要使用这个布隆过滤器,需要打开相关的 RocketMQ 配置项才可以生效:
enableConsumeQueueExt 是否启用 ConsumeQueue 拓展属性,默认为 false,这样子的话 isMatchedByConsumeQueue 方法,永远都会返回 true;
enableCalcFilterBitMap 需要设置为 true,否则永远都不会被命中;因为设置为 true 时,CommitLogDispatcherCalcBitMap 才会去设置 ConsumerFilterData 对象中的 bloomFilterData 数组中的对应的位置为 1;如果不设置为 false,则 bloomFilterData 永远都是为 0;
需要设置 enableConsumeQueueExt 为 true 开启拓展属性,这样子才能使用 BloomFilter 进行过滤;
有关 BloomFilter 可以查阅Bloom Filter概念和原理文章,以及 RocketMQ 的 BloomFilter 对象以及相关的代码进行了解,这里不在过多阐述,毕竟目前我对其细节还不是太明白。
有关 SQL92 语法,没有友好的注释,很难看的出来是否完全参照其语法进行词法解析的;姑且给出这样子的结论:跟我们常规写 SQL 中的 where 后面的语法差不多;field 呢就是映射 message 中的 properties 集合 key。稍微注意的是,有关比较符运算的,类型需继承 Comparable 接口才行;
举个例子:例如 message 对象中的 properties 集合有这样的 key=privince,值有广东省,广西省等省份名称;过滤表达式:"privince in ('gd','gx')";那么就会过滤除广东省、广西省以外的省份的消息;
如果有时间的话,会花时间阅读其源码 SelectorParser;有兴趣的查看Spring 之 EL 表达式,其原理以及解析差不多的;spring 的源码先比 RocketMQ,可读性相对于友好;
四. 初始化
这里的初始化,主要的基于数据结构,也就是 ConsumerFilterManager 和 SubscriptionData 这两个对象;
什么时候初始化两个对象,主要是客户端定时发送心跳推送消费者的订阅信息,订阅信息中包含过滤信息;
重点看 ClientManageProcessor.heartBeat;其他地方目前就不在过多阐述;
引用:
版权声明: 本文为 InfoQ 作者【邱学喆】的原创文章。
原文链接:【http://xie.infoq.cn/article/cd087fb325ac2ceb0bf0ccf2d】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论