<返回更多

RocketMQ源码分析之过滤器ExpressionMessageFilter

2022-11-27  今日头条  程序员阿龙
加入收藏

一、前言

RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

目前 RocketMQ 只支持两个模式过滤器,一个是基于 TAG,另外一个是基于 SQL92。其中 TAG 模式相对于比较简单;而另外一个就相当的复杂,实现方式跟 spring 表达式有点相似;同时也提供了一个配置项,来决定是否开启 SQL92;

 

 

另外关于类过滤的,很快就过期,官方不推荐使用该模式,所以这里不在这里解读。

二、源码导读

1、消费者过滤器管理组件consumerFilterManager源码分析,其中布隆过滤器的数据结构是怎样的,怎么进行注册的;

2、过滤原理,通过解读ExpressionMessageFilter类来分析其运作原理;

三、过滤数据管理

BrokerController中有一个ConsumerFilterManager,就是用来管理消费者过滤器数据的;


 

 

  1. 构造方法;
  2. 数据对象的结构;
  3. 消费组批量注册过滤数据对象;
  4. 根据消费组取消注册;
  5. 判断数据是否死亡;

 

这里就分析这几个核心方法吧,其余的方法其实也差不多;

1、构造方法public class ConsumerFilterManager extends ConfigManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);// 定义了24小时常量private static final long MS_24_HOUR = 24 * 3600 * 1000;// topic -> filtersprivate ConcurrentMapfilterDataByTopic = new ConcurrentHashMap(256);private transient BrokerController brokerController;// 布隆过滤器,简单解释一下什么叫做布隆过滤器,bit位数组,写入数据先hash再就更正bit位里的01// 查询的时候,可以对查询数据计算hash再到一个位置找是01,如果是0肯定没出现过这条数据,如果是1,有可能出现过// 可以快速筛查你的数据要不然是肯定没出现过,要不然是可能出现过private transient BloomFilter bloomFilter;public ConsumerFilterManager() {// just for test,仅限于测试this.bloomFilter = BloomFilter.createByFn(20, 64);public ConsumerFilterManager(BrokerController brokerController) {this.brokerController = brokerController;this.BloomFilter = BloomFilter.createByFn( // 根据配置创建布隆过滤器brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),brokerController.getBrokerConfig().getExpectConsumerNumUseFilter()// then set bit map length of store config.brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(this.bloomFilter.getM()2、数据对象的结构

我们可以发现ConsumerFilterManager是继承至ConfigManager,如果看过我以前的文章就知道ConfigManager有一个抽象方法configFilePath,是用来标明文件持久化路径的;

我们直接找到子类的对应方法

@Overridepublic String configFilePath() {if (this.brokerController != null) {return BrokerPathConfigHelper.getConsumerFilterPath(this.brokerController.getMessageStoreConfig().getStorePathrootDir()return BrokerPathConfigHelper.getConsumerFilterPath("./unit_test");public static String getConsumerFilterPath(final String rootDir) {return rootDir + File.separator + "config" + File.separator + "consumerFilter.json";

再找到对应的文件,只不过这个文件中数据为空


 

但是没事,我在网上找到一个文件数据结构,文件存放的格式如下:

"filterDataByTopic":{"Topic":{"topic": String,"groupFilterData": {"consumerGroup":{"consumerGroup" : String,"topic": String,"expression": String,"expressionType": String,"bornTime": long,"deadTime": long,"bloomFilterData":{"bitPos": int[],"bitNum": int},"clientVersion": long},},


 

从字面都可以大概猜出其用意;除了那个 BloomFilter 相关的字段属性;

ConsumerFilterManager 对象中的 bloomFilter 属性我们可以理解是一个工具方法;而 ConsumerFilterData 对象中的 bloomFilterData 属性是这个消费组中的数值数组,用来判断是否满足过滤条件的消息;

// 一个topic的各个消费组的过滤数据public static class FilterDataMapByTopic {// 一个topic,针对他订阅的各个消费组的过滤数据映射关系private ConcurrentMapgroupFilterData = new ConcurrentHashMap();private String topic;public class ConsumerFilterData {private String consumerGroup;private String topic;private String expression;private String expressionType;private transient Expression compiledExpression;private long bornTime;private long deadTime = 0;private BloomFilterData bloomFilterData;private long clientVersion;3、消费组批量注册过滤数据对象

场景:当一个消费组里的一个消费者客户端跟我的broker之间建立了连接了以后,注册消费者之后就需要批量注册布隆过滤器,会通过DefaultConsumerIdsChangeListener监听器的handle方法调用ConsumerFilterManager的register方法完成批量注册,之前在分析ConsumerManager消费者管理组件有分析过的;

// 注册消费组里的最新的一波订阅和过滤public void register(final String consumerGroup,final Collection subList) {// 对订阅数据进行遍历for (SubscriptionData subscriptionData : subList) {// 注册register(subscriptionData.getTopic(),consumerGroup,subscriptionData.getSubString(),subscriptionData.getexpressionType(),subscriptionData.getSubVersion()// make illegal topic dead.// 对我的一个消费组拿到我对各个topic的过滤数据Collection groupFilterData = getByGroup(consumerGroup);Iterator iterator = groupFilterData.iterator();while (iterator.hasNext()) {ConsumerFilterData filterData = iterator.next();boolean exist = false;for (SubscriptionData subscriptionData : subList) {// 判断是否存在if (subscriptionData.getTopic().equals(filterData.getTopic())) {exist = true;break;if (!exist && !filterData.isDead()) {filterData.setDeadTime(System.currentTimeMillis());log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);public boolean register(// topicfinal String topic,// 消费组final String consumerGroup,// 过滤表达式final String expression,// 过滤类型final String type,// 客户端版本号final long clientVersion) {if (ExpressionType.isTagType(type)) {return false;if (expression == null || expression.length() == 0) {return false;// 根据topic获取topic的各个消费组的过滤数据FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);if (filterDataMapByTopic == null) {FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);filterDataMapByTopic = prev != null ? prev : temp;// 生成了一个消费组对一个topic的布隆过滤器BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);

再调用FilterDataMapByTopic的register方法

public boolean register(// 消费组String consumerGroup,// 过滤表达式String expression,// 过滤类型String type,// 布隆过滤器数据BloomFilterData bloomFilterData,// 客户端版本long clientVersion) {// 获取到这个消费组的过滤器数据ConsumerFilterData old = this.groupFilterData.get(consumerGroup);if (old == null) {ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {return false;// 给这个过滤数据里设置进去一个布隆过滤器consumerFilterData.setBloomFilterData(bloomFilterData);old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);if (old == null) {log.info("New consumer filter registered: {}", consumerFilterData);return true;} else {// 如果存在则比较一下版本是否一致if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);// 如果数据已存在,并且数据状态为死亡,则重新激活if (clientVersion == old.getClientVersion() && old.isDead()) {// 重新激活reAlive(old);return true;return false;} else {this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);return true;} else {if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);if (clientVersion == old.getClientVersion() && old.isDead()) {reAlive(old);return true;return false;boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);if (old.getBloomFilterData() == null && bloomFilterData != null) {change = true;if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {change = true;// if subscribe data is changed, or consumer is died too long.if (change) {ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {// new expression compile error, remove old, let client report error.this.groupFilterData.remove(consumerGroup);return false;consumerFilterData.setBloomFilterData(bloomFilterData);this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("Consumer filter info change, old: {}, new: {}, change: {}",old, consumerFilterData, change);return true;} else {old.setClientVersion(clientVersion);if (old.isDead()) {reAlive(old);return true;public static ConsumerFilterData build(// topicfinal String topic,// 消费组final String consumerGroup,// 过滤表达式final String expression,// 过滤类型final String type,// 客户端版本final long clientVersion) {if (ExpressionType.isTagType(type)) {return null;ConsumerFilterData consumerFilterData = new ConsumerFilterData();consumerFilterData.setTopic(topic);consumerFilterData.setConsumerGroup(consumerGroup);consumerFilterData.setBornTime(System.currentTimeMillis());consumerFilterData.setDeadTime(0);consumerFilterData.setExpression(expression);consumerFilterData.setExpressionType(type);consumerFilterData.setClientVersion(clientVersion);try {consumerFilterData.setCompiledExpression(FilterFactory.INSTANCE.get(type).compile(expression)} catch (Throwable e) {log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());return null;return consumerFilterData;

重新激活

这里重新激活就是将死亡时间设置为0,判断是否死亡就是死亡时间deadTime是否大于出生时间bornTime;

protected void reAlive(ConsumerFilterData filterData) {long oldDeadTime = filterData.getDeadTime();filterData.setDeadTime(0);log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime);4、根据消费组取消注册public void unRegister(final String consumerGroup) {for (Entry entry : filterDataByTopic.entrySet()) {entry.getValue().unRegister(consumerGroup);

org.Apache.rocketmq.broker.filter.ConsumerFilterManager.FilterDataMapByTopic#unRegister

public void unRegister(String consumerGroup) {if (!this.groupFilterData.containsKey(consumerGroup)) {return;// 获取消费组对应的过滤数据ConsumerFilterData data = this.groupFilterData.get(consumerGroup);// 如果为空,或者已经死亡或者说已不可用了则直接返回if (data == null || data.isDead()) {return;long now = System.currentTimeMillis();log.info("Unregister consumer filter: {}, deadTime: {}", data, now);// 设置数据死亡时间data.setDeadTime(now);5、判断数据是否死亡

org.apache.rocketmq.broker.filter.ConsumerFilterData#isDead

public boolean isDead() {return this.deadTime >= this.bornTime;四、过滤原理

客户端向 Broker 端拉取消息时,Broker 从 commitlog、consumequeue 文件中拿到数据,接着会进行过滤,判断是否满足指定的条件;所以,过滤的工作是在于 DefaultMessageStore 对象中的 getMessage 方法,该方法入参中有这样的对象 MessageFilter;ExpressionMessageFilter实现其接口;

我们先看一下其接口的暴露的两个方法:

public interface MessageFilter {boolean isMatchedByConsumeQueue(final Long tagsCode,final ConsumeQueueExt.CqExtUnit cqExtUnit);boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,final Map properties);

从上面的两个方法中,不难猜测出其是对 consumequeue 以及 commitlog 进行过滤;

1、TAG 过滤

tag 过滤只针对 consumequeue 的,所以在 MessageFilter 接口的 isMatchedByCommitLog 是默认返回 true;

2、SQL92

在 isMatchedByConsumeQueue 方法中,并没有 SQL92 进行过滤,而是用 BloomFilter 进行过滤,可以理解为 BloomFilter 是 SQL92 的缓存过滤器。先通过 consumequeue 先过滤不符合的消息,然后在 isMatchedByCommitLog 严格过滤;

然而要使用这个布隆过滤器,需要打开相关的 RocketMQ 配置项才可以生效:

 

 

需要设置 enableConsumeQueueExt 为 true 开启拓展属性,这样子才能使用 BloomFilter 进行过滤;

话不多说,上代码,我们重点看 isMatchedByConsumeQueue 方法的 tag 模式过滤;

public class ExpressionMessageFilter implements MessageFilter {protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);protected final SubscriptionData subscriptionData;protected final ConsumerFilterData consumerFilterData;protected final ConsumerFilterManager consumerFilterManager;protected final boolean bloomDataValid;public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,ConsumerFilterManager consumerFilterManager) {this.subscriptionData = subscriptionData;this.consumerFilterData = consumerFilterData;this.consumerFilterManager = consumerFilterManager;if (consumerFilterData == null) {bloomDataValid = false;return;BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) {bloomDataValid = true;} else {bloomDataValid = false;@Overridepublic boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {if (null == subscriptionData) {return true;if (subscriptionData.isClassFilterMode()) {return true;// by tags code.if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {// 如果consumequeue中没有tag,则返回true。能消费该消息if (tagsCode == null) {return true;// 如果订阅组中的subString等于*,则说明订阅组是不需要过滤的,返回true,能消费该消息;if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {return true;// 如果consumequeue中的tag在订阅组的codeSet中,则说明订阅组是能消费该消息的,返回true;否则返回false;return subscriptionData.getCodeSet().contains(tagsCode.intValue());} else {// no expression or no bloomif (consumerFilterData == null || consumerFilterData.getExpression() == null|| consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {return true;// message is before consumerif (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);return true;byte[] filterBitMap = cqExtUnit.getFilterBitMap();BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();if (filterBitMap == null || !this.bloomDataValid|| filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {return true;BitsArray bitsArray = null;try {bitsArray = BitsArray.create(filterBitMap);boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);return ret;} catch (Throwable e) {log.error("bloom filter error, sub=" + subscriptionData+ ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);return true;@Overridepublic boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map properties) {if (subscriptionData == null) {return true;if (subscriptionData.isClassFilterMode()) {return true;if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {return true;ConsumerFilterData realFilterData = this.consumerFilterData;Map tempProperties = properties;// no expressionif (realFilterData == null || realFilterData.getExpression() == null|| realFilterData.getCompiledExpression() == null) {return true;if (tempProperties == null && msgBuffer != null) {tempProperties = MessageDecoder.decodeProperties(msgBuffer);Object ret = null;try {MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);ret = realFilterData.getCompiledExpression().evaluate(context);} catch (Throwable e) {log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);if (ret == null || !(ret instanceof Boolean)) {return false;return (Boolean) ret;五、总结

  1. ConsumerFilterManager管理topic对应的各个消费组的过滤数据,提供了注册数据过滤对象,取消注册过滤数据对象等;
  2. ExpressionMessageFilter调用ConsumerFilterManager的BloomFilter组件中的isHit进行过滤;
声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>