写点什么

深入了解 RocketMQ 之过滤器

用户头像
邱学喆
关注
发布于: 50 分钟前
深入了解RocketMQ之过滤器

一. 概述


RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

目前 RocketMQ 只支持两个模式过滤器,一个是基于 TAG,另外一个是基于 SQL92。其中 TAG 模式相对于比较简单;而另外一个就相当的复杂,实现方式跟 spring 表达式有点相似;同时也提供了一个配置项,来决定是否开启 SQL92;

  • enablePropertyFilter 是否支持根据属性过滤,默认为 false,如果使用基于标的式 SQL92 模式过滤消息,则该参数必须设置为 true.


另外关于类过滤的,很快就过期,官方不推荐使用该模式,所以这里不在这里解读。

二. 数据结构

2.1 ConsumerFilterManager

在 Broker 中是以文件的形式存放;文件存放的格式如下:

{	"filterDataByTopic":{		"Topic":{			"topic": String,			"groupFilterData": {				"consumerGroup":{					"consumerGroup" : String,					"topic":  String,					"expression": String,					"expressionType": String,					"bornTime": long,					"deadTime": long,					"bloomFilterData":{						"bitPos":  int[],						"bitNum": int					},					"clientVersion": long				}			},			....		},		....	}}
复制代码

我们查看其对象的结构,如图所示:

从字面都可以大概猜出其用意;除了那个 BloomFilter 相关的字段属性;

ConsumerFilterManager 对象中的 bloomFilter 属性我们可以理解是一个工具方法;而 ConsumerFilterData 对象中的 bloomFilterData 属性是这个消费组中的数值数组,用来判断是否满足过滤条件的消息;

2.2 SubscriptionData

这个对象是存放在 Broker 服务器中的 ConsumerManager,用来记录客户端中的消费者的订阅信息,里面有包含 tag 以及表达式两个;

重点介绍:tagsSet 与 code 的关系;tagsSet 是原生的 tag 集合信息,而 codeSet 是 tag 的哈希值的集合信息;

三. 过滤原理

客户端向 Broker 端拉取消息时,Broker 从 commitlog、consumequeue 文件中拿到数据,接着会进行过滤,判断是否满足指定的条件;所以,过滤的工作是在于 DefaultMessageStore 对象中的 getMessage 方法,该方法入参中有这样的对象 MessageFilter;该 MessageFilter 是个接口,我们这里只介绍 ExpressionMessageFilter 这个实现类,通过对其进行解读,其他的实现类都会很快明白其运作原理。

我们先看一下其接口的暴露的两个方法:

public interface MessageFilter {  boolean isMatchedByConsumeQueue(final Long tagsCode,                                  final ConsumeQueueExt.CqExtUnit cqExtUnit);  boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,                               final Map<String, String> properties);}
复制代码

从上面的两个方法中,不难猜测出其是对 consumequeue 以及 commitlog 进行过滤;

3.1 TAG 过滤

tag 过滤只针对 consumequeue 的,所以在 MessageFilter 接口的 isMatchedByCommitLog 是默认返回 true;

我们重点看 isMatchedByConsumeQueue 方法的 tag 模式过滤;

//如果consumequeue中没有tag,则返回true。能消费该消息if (tagsCode == null) {  return true;}//如果订阅组中的subString等于*,则说明订阅组是不需要过滤的,返回true,能消费该消息;if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {  return true;}//如果consumequeue中的tag在订阅组的codeSet中,则说明订阅组是能消费该消息的,返回true;否则返回false;return subscriptionData.getCodeSet().contains(tagsCode.intValue());
复制代码

3.2 SQL92

在 isMatchedByConsumeQueue 方法中,并没有 SQL92 进行过滤,而是用 BloomFilter 进行过滤,可以理解为 BloomFilter 是 SQL92 的缓存过滤器。先通过 consumequeue 先过滤不符合的消息,然后在 isMatchedByCommitLog 严格过滤;

然而要使用这个布隆过滤器,需要打开相关的 RocketMQ 配置项才可以生效:

  • enableConsumeQueueExt 是否启用 ConsumeQueue 拓展属性,默认为 false,这样子的话 isMatchedByConsumeQueue 方法,永远都会返回 true;

  • enableCalcFilterBitMap 需要设置为 true,否则永远都不会被命中;因为设置为 true 时,CommitLogDispatcherCalcBitMap 才会去设置 ConsumerFilterData 对象中的 bloomFilterData 数组中的对应的位置为 1;如果不设置为 false,则 bloomFilterData 永远都是为 0;

需要设置 enableConsumeQueueExt 为 true 开启拓展属性,这样子才能使用 BloomFilter 进行过滤;

有关 BloomFilter 可以查阅Bloom Filter概念和原理文章,以及 RocketMQ 的 BloomFilter 对象以及相关的代码进行了解,这里不在过多阐述,毕竟目前我对其细节还不是太明白。


有关 SQL92 语法,没有友好的注释,很难看的出来是否完全参照其语法进行词法解析的;姑且给出这样子的结论:跟我们常规写 SQL 中的 where 后面的语法差不多;field 呢就是映射 message 中的 properties 集合 key。稍微注意的是,有关比较符运算的,类型需继承 Comparable 接口才行;

举个例子:例如 message 对象中的 properties 集合有这样的 key=privince,值有广东省,广西省等省份名称;过滤表达式:"privince in ('gd','gx')";那么就会过滤除广东省、广西省以外的省份的消息;


如果有时间的话,会花时间阅读其源码 SelectorParser;有兴趣的查看Spring 之 EL 表达式,其原理以及解析差不多的;spring 的源码先比 RocketMQ,可读性相对于友好;

四. 初始化

这里的初始化,主要的基于数据结构,也就是 ConsumerFilterManager 和 SubscriptionData 这两个对象;

什么时候初始化两个对象,主要是客户端定时发送心跳推送消费者的订阅信息,订阅信息中包含过滤信息;

重点看 ClientManageProcessor.heartBeat;其他地方目前就不在过多阐述;


引用:


Bloom Filter概念和原理

发布于: 50 分钟前阅读数: 4
用户头像

邱学喆

关注

计算机原理的深度解读,源码分析。 2018.08.26 加入

在IT领域keep Learning。要知其然,也要知其所以然。原理的爱好,源码的阅读。输出我对原理以及源码解读的理解。个人的仓库:https://gitee.com/Michael_Chan

评论

发布
暂无评论
深入了解RocketMQ之过滤器