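RocketMQ provides two ways to filter messages: TAG filtering and SQL92 filtering.
SQL filtering is disabled by default. To enable it, set the following in the broker configuration file: enablePropertyFilter = true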
1. Sample code

1.1 Producer code

```java
import java.nio.charset.StandardCharsets;
import java.util.Random;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("tag_p_g");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        String[] tags = {"TAG_A", "TAG_B", "TAG_C"};
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            byte[] body = ("Hi filter message," + i).getBytes(StandardCharsets.UTF_8);
            // Cycle through the tags; TAG filtering matches on this value
            String tag = tags[i % tags.length];
            Message msg = new Message("MY_topic", tag, body);

            // User properties are what SQL92 filtering evaluates
            msg.putUserProperty("age", String.valueOf(i));
            msg.putUserProperty("name", "name" + (i + 1));
            msg.putUserProperty("isGender", String.valueOf(random.nextBoolean()));

            SendResult sendResult = producer.send(msg);
            System.out.println("sendResult = " + sendResult);
        }
        producer.shutdown();
    }
}
```
1.2 Consumer code

1.2.1 TAG filtering

```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c_tag_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // Subscribe only to messages tagged TAG_A or TAG_C
        consumer.subscribe("MY_topic", "TAG_A || TAG_C");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Filter Tag Consumer Started");
    }
}
```
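To see what the TAG consumer should receive, it helps to work out how the producer's ten messages are distributed over the three tags. The following is a minimal, RocketMQ-free sketch; the class and method names are made up for illustration and only the tag array, the `i % tags.length` assignment and the `TAG_A || TAG_C` subscription come from the sample code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Illustrative only: replays the producer's tag assignment without a broker. */
final class TagDistributionSketch {
    static final String[] TAGS = {"TAG_A", "TAG_B", "TAG_C"};

    /** Counts how many of the first n messages carry one of the subscribed tags. */
    static int countMatching(int n, Set<String> subscribed) {
        int count = 0;
        for (int i = 0; i < n; i++) {
            // Same assignment rule as the producer: tags[i % tags.length]
            if (subscribed.contains(TAGS[i % TAGS.length])) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Set<String> subscribed = new HashSet<>(Arrays.asList("TAG_A", "TAG_C"));
        // Indices 0, 3, 6, 9 get TAG_A and indices 2, 5, 8 get TAG_C
        System.out.println(countMatching(10, subscribed)); // prints 7
    }
}
```

So, assuming all ten sends succeed, the TAG consumer should print seven messages and never see the three tagged TAG_B.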
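1.2.2 SQL92 filtering

```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        // MQConstant is a constants class that is not shown in this article;
        // the topic it names must be the one the producer sends to.
        consumer.setNamesrvAddr(MQConstant.NAME_SERVER_ADDR);

        // Filter on user properties with an SQL92 expression
        consumer.subscribe(MQConstant.FILTER_SQL_TOPIC,
            MessageSelector.bySql("(age between 6 and 9) AND (name IS NOT NULL) AND (isGender = TRUE)"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Filter SQL Consumer Started");
    }
}
```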
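2. Explanation

When a consumer pulls messages from the broker, the messages are filtered once by the broker and then once more by the consumer.

With TAG filtering, the broker first filters by the tag hash code stored in the ConsumeQueue, and the consumer then filters by the tag value. To keep lookups cheap, every ConsumeQueue entry has a fixed length of 20 bytes, so the last eight bytes of an entry store the hash code of the message tag rather than the tag string itself. When the broker reads messages from disk it therefore only compares the tag hash code in the ConsumeQueue and does not need to parse the tag value out of the CommitLog. Because different tags can share a hash code, a collision can let an unwanted message through; the consumer client catches those cases by checking the actual tag value of each message.

With SQL92 filtering, all filtering is done by the broker. SQL filtering evaluates message property values, so the broker has to deserialize the properties from the CommitLog anyway. Since the broker has already performed an exact match, the client can skip that step.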
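The fixed-length entry can be pictured with a small sketch. This is not RocketMQ code: the class and method names are hypothetical, and the split of the first twelve bytes into an 8-byte CommitLog offset and a 4-byte message size is not stated in the text above but is assumed here from the commonly documented layout. Only the 20-byte total and the trailing 8-byte tag hash code come from the article.

```java
import java.nio.ByteBuffer;

/** Illustrative only: mimics a fixed-length 20-byte ConsumeQueue entry. */
final class ConsumeQueueEntrySketch {
    // Assumed layout: 8 bytes CommitLog offset + 4 bytes size + 8 bytes tag hash code
    static final int ENTRY_SIZE = 20;
    static final int TAGS_CODE_POSITION = 12;

    /** Encodes one entry; the tag is reduced to its hash code and the string itself is not stored. */
    static byte[] encode(long commitLogOffset, int messageSize, String tag) {
        ByteBuffer buffer = ByteBuffer.allocate(ENTRY_SIZE);
        buffer.putLong(commitLogOffset);
        buffer.putInt(messageSize);
        buffer.putLong(tag.hashCode()); // int hash widened to 8 bytes
        return buffer.array();
    }

    /** Reads the tag hash code straight from the last eight bytes of an entry. */
    static long readTagsCode(byte[] entry) {
        return ByteBuffer.wrap(entry).getLong(TAGS_CODE_POSITION);
    }

    public static void main(String[] args) {
        byte[] entry = encode(1024L, 256, "TAG_A");
        System.out.println(entry.length);
        System.out.println(readTagsCode(entry) == "TAG_A".hashCode());
    }
}
```

Because every entry has the same size, the n-th entry starts at byte n * 20, and the tag check needs only a single 8-byte read per entry.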
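3. Consumer startup: registering the subscription with the broker

The consumer keeps its subscription in a SubscriptionData object. After the consumer starts, it first sends the subscription to the broker through a heartbeat. On the broker side this mainly results in two things:

- The SubscriptionData object sent by the consumer is stored.
- A ConsumerFilterData object is built for SQL filtering.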
Let's look at how the consumer builds the subscription data and sends it to the broker:

```java
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
    try {
        if (messageSelector == null) {
            // No selector: subscribe to every message of the topic
            subscribe(topic, SubscriptionData.SUB_ALL);
            return;
        }

        // Build the subscription data from the selector's expression and type
        SubscriptionData subscriptionData = FilterAPI.build(topic,
            messageSelector.getExpression(), messageSelector.getExpressionType());

        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
        if (this.mQClientFactory != null) {
            // Send the updated subscription to all brokers via heartbeat
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
```
Next, the FilterAPI.build(...) method:

```java
public static SubscriptionData build(final String topic, final String subString,
    final String type) throws Exception {
    // TAG expressions (or no type at all) go through the tag-specific builder
    if (ExpressionType.TAG.equals(type) || type == null) {
        return buildSubscriptionData(topic, subString);
    }

    if (subString == null || subString.length() < 1) {
        throw new IllegalArgumentException("Expression can't be null! " + type);
    }

    // Any other type (such as SQL92): keep the raw expression and its type
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);
    subscriptionData.setExpressionType(type);

    return subscriptionData;
}
```
For TAG filtering, the consumer does some extra processing: it splits the expression into individual tags and records both the tag values and their hash codes.

```java
// Note: the parameter list of this method differs between RocketMQ versions;
// the variant shown here takes a leading consumerGroup argument.
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
    String subString) throws Exception {
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);

    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
        // Empty or wildcard expression: subscribe to all tags
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        // Tags are separated by "||"
        String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        // Tag value: used by the consumer for the exact match
                        subscriptionData.getTagsSet().add(trimString);
                        // Tag hash code: used by the broker for the first pass
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            throw new Exception("subString split error");
        }
    }

    return subscriptionData;
}
```
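The parsing step can be tried out without any RocketMQ dependency. The sketch below is a simplified stand-in for the method above, not the library code: the class name is hypothetical, and treating "*" as the subscribe-all expression is an assumption about the value of SubscriptionData.SUB_ALL.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Illustrative only: splits a tag expression into tag values and their hash codes. */
final class TagExpressionSketch {
    final Set<String> tags = new LinkedHashSet<>();
    final Set<Integer> codes = new LinkedHashSet<>();

    static TagExpressionSketch parse(String expression) {
        TagExpressionSketch result = new TagExpressionSketch();
        // Assumption: "*" plays the role of SUB_ALL; empty sets then mean "everything"
        if (expression == null || expression.isEmpty() || expression.equals("*")) {
            return result;
        }
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                result.tags.add(tag);             // exact value, checked by the consumer
                result.codes.add(tag.hashCode()); // hash code, checked by the broker
            }
        }
        return result;
    }

    public static void main(String[] args) {
        TagExpressionSketch parsed = parse("TAG_A || TAG_C");
        System.out.println(parsed.tags);
        System.out.println(parsed.codes);
    }
}
```

For the subscription in the sample consumer, `parse("TAG_A || TAG_C")` yields two tags and two hash codes; the surrounding spaces are removed by `trim()`.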
The consumer's subscription is now ready. When the consumer starts, it sends the heartbeat data:

```java
public synchronized void start() throws MQClientException {
    // ... (other startup steps omitted)
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // ...
}
```
Now let's see how the broker handles the heartbeat data:

```java
public class ClientManageProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT:
                // Heartbeats carry the consumer's subscription data
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT:
                return this.unregisterClient(ctx, request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
}
```
The heartBeat method:

```java
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    // ... (creation of response, heartbeatData and clientChannelInfo omitted)

    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        // ... (isNotifyConsumerIdsChangedEnable is derived here; omitted)

        // Register the consumer together with its subscription data
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );
        // ...
    }

    // ...
    return response;
}
```
Moving on to registerConsumer:

```java
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    // ... (lookup or creation of consumerGroupInfo omitted)

    // Store the SubscriptionData sent by the consumer
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    // ...

    // Notify the listener, which leads to the registration of the SQL filter data
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    // ... (return statement omitted)
}
```
Going one level deeper, we reach the register method (ConsumerFilterManager#register), which handles the filter data:

```java
public boolean register(final String topic, final String consumerGroup, final String expression,
    final String type, final long clientVersion) {
    // TAG subscriptions need no ConsumerFilterData
    if (ExpressionType.isTagType(type)) {
        return false;
    }

    if (expression == null || expression.length() == 0) {
        return false;
    }

    // Get or create the per-topic filter data map
    FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);

    if (filterDataMapByTopic == null) {
        FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
        FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
        // If another thread inserted first, use its instance
        filterDataMapByTopic = prev != null ? prev : temp;
    }

    BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);

    return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}
```
Internally, registration mainly builds a ConsumerFilterData object:

```java
public static ConsumerFilterData build(final String topic, final String consumerGroup,
    final String expression, final String type,
    final long clientVersion) {
    // TAG subscriptions have no ConsumerFilterData
    if (ExpressionType.isTagType(type)) {
        return null;
    }

    ConsumerFilterData consumerFilterData = new ConsumerFilterData();
    consumerFilterData.setTopic(topic);
    consumerFilterData.setConsumerGroup(consumerGroup);
    consumerFilterData.setBornTime(System.currentTimeMillis());
    consumerFilterData.setDeadTime(0);
    consumerFilterData.setExpression(expression);
    consumerFilterData.setExpressionType(type);
    consumerFilterData.setClientVersion(clientVersion);
    try {
        // Compile the expression once, so that it can be evaluated per message later
        consumerFilterData.setCompiledExpression(
            FilterFactory.INSTANCE.get(type).compile(expression)
        );
    } catch (Throwable e) {
        log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());
        return null;
    }

    return consumerFilterData;
}
```
The class that ultimately does the work is SqlFilter, which parses the expression:

```java
public class SqlFilter implements FilterSpi {

    @Override
    public Expression compile(final String expr) throws MQFilterException {
        // Parse the SQL92 expression into an evaluable Expression
        return SelectorParser.parse(expr);
    }

    @Override
    public String ofType() {
        return ExpressionType.SQL92;
    }
}
```
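That completes the groundwork. Next we look at the filtering process itself, in which the two objects above (SubscriptionData and ConsumerFilterData) are put to work.

4. Pulling messages

The broker handles pull requests in PullMessageProcessor. The method is long, so only the parts related to filtering are shown: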
```java
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
        (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    // ... (validation and the computation of hasSubscriptionFlag omitted)

    SubscriptionData subscriptionData = null;
    ConsumerFilterData consumerFilterData = null;
    if (hasSubscriptionFlag) {
        // The pull request carries its own subscription: build the filter data from it
        try {
            subscriptionData = FilterAPI.build(
                requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
            );
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = ConsumerFilterManager.build(
                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                    requestHeader.getExpressionType(), requestHeader.getSubVersion()
                );
                assert consumerFilterData != null;
            }
        } catch (Exception e) {
            log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
        }
    } else {
        // Otherwise use the subscription registered earlier through the heartbeat
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());

        subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());

        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
                requestHeader.getConsumerGroup());
        }
    }

    // Wrap both objects in a MessageFilter that the message store applies while reading
    MessageFilter messageFilter;
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }

    final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

    // ... (building the response from getMessageResult omitted)
}
```
Next, let's look at how messages are read from the CommitLog and filtered:

```java
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
    final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
    // ... (setup omitted; the statements below run inside a loop over ConsumeQueue entries)

    // 1. Filter by the tag hash code stored in the ConsumeQueue entry
    if (messageFilter != null
        && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        continue;
    }

    // 2. Read the message from the CommitLog
    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
    if (null == selectResult) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.MESSAGE_WAS_REMOVING;
        }
        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
        continue;
    }

    // 3. Filter by the message content (SQL filtering)
    if (messageFilter != null
        && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        // Release the buffer of a message that did not match
        selectResult.release();
        continue;
    }

    // ... (adding the matched message to getResult and the rest of the method omitted)
}
```
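The method mainly does three things:

- Before reading from the CommitLog, it filters the ConsumeQueue entries by tag hash code. If the tag hash code stored in the ConsumeQueue does not match any tag hash code the consumer group subscribed to, the message is not read from the CommitLog at all. The broker only performs this hash code check; the consumer later completes the filtering by tag value.
- It reads the message from the CommitLog.
- After the message has been read from the CommitLog, and if SQL filtering is in use, it completes the SQL filtering on the broker.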
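The three steps can be sketched as a plain loop over an in-memory list. Everything in this sketch is hypothetical scaffolding (the `StoredMessage` class, the `pull` method and the two predicates standing in for `isMatchedByConsumeQueue` and `isMatchedByCommitLog`); it only illustrates the order of the checks, not the real store implementation. In the excerpts shown in this article, whichever check does not apply to the expression type simply returns true.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Illustrative only: the order of the two broker-side checks around the CommitLog read. */
final class BrokerPullSketch {
    /** Hypothetical stand-in for a stored message: a tag plus user properties. */
    static final class StoredMessage {
        final String tag;
        final Map<String, String> properties;

        StoredMessage(String tag, Map<String, String> properties) {
            this.tag = tag;
            this.properties = properties;
        }
    }

    static List<StoredMessage> pull(List<StoredMessage> commitLog,
                                    Predicate<Long> matchByConsumeQueue,
                                    Predicate<Map<String, String>> matchByCommitLog) {
        List<StoredMessage> result = new ArrayList<>();
        for (StoredMessage stored : commitLog) {
            // Step 1: only the hash code is available at this point
            long tagsCode = stored.tag.hashCode();
            if (!matchByConsumeQueue.test(tagsCode)) {
                continue; // the message body is never read
            }
            // Step 2: "read" the full message (here it is already in memory)
            StoredMessage message = stored;
            // Step 3: check the message content, which is where SQL filtering happens
            if (!matchByCommitLog.test(message.properties)) {
                continue;
            }
            result.add(message);
        }
        return result;
    }
}
```

A TAG subscription would pass something like `code -> codes.contains(code.intValue())` as the first predicate and `props -> true` as the second; an SQL subscription would do the reverse.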
4.1 Tag hash code filtering on the broker

TAG filtering happens in the ExpressionMessageFilter#isMatchedByConsumeQueue(..) method:

```java
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    if (null == subscriptionData) {
        return true;
    }

    if (subscriptionData.isClassFilterMode()) {
        return true;
    }

    // TAG filtering
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {

        // No usable tag hash code: let the message through
        if (tagsCode == null) {
            return true;
        }

        // Subscribed to all tags
        if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
            return true;
        }

        // Match on the tag hash code only
        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    } else {
        // ... (handling for non-TAG expression types omitted)
    }

    return true;
}
```
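This method filters by the tag's hash code, but that is only a preliminary pass: two different tags can have the same hash code, so the result may contain false positives. The exact TAG filtering is left to the consumer.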
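A concrete collision makes the limitation visible. In Java the strings "Aa" and "BB" both hash to 2112, so a broker-side check that only sees hash codes cannot tell them apart. The sketch below is a simplified stand-in for the tag branch of the method above; the class and method names are hypothetical, and treating an empty code set as "subscribe to all" is a convention of this sketch only.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative only: a hash-code-only match, as performed in the broker's first pass. */
final class TagCodeMatchSketch {
    static boolean matchesByCode(Set<Integer> subscribedCodes, Long tagsCode) {
        if (tagsCode == null) {
            return true; // nothing to compare against, let the message through
        }
        if (subscribedCodes.isEmpty()) {
            return true; // sketch convention: an empty set means "subscribe to all"
        }
        return subscribedCodes.contains(tagsCode.intValue());
    }

    public static void main(String[] args) {
        Set<Integer> subscribedCodes = new HashSet<>();
        subscribedCodes.add("Aa".hashCode());

        // "BB" was never subscribed, yet it passes because "Aa" and "BB" share a hash code
        System.out.println(matchesByCode(subscribedCodes, (long) "BB".hashCode())); // prints true
        System.out.println("Aa".hashCode() == "BB".hashCode());                     // prints true
    }
}
```

A consumer subscribed only to "Aa" would therefore still be sent messages tagged "BB", which is why the second pass on the tag value is needed.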
4.2 SQL filtering on the broker

SQL filtering happens in the ExpressionMessageFilter#isMatchedByCommitLog(..) method:

```java
@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
    if (subscriptionData == null) {
        return true;
    }

    if (subscriptionData.isClassFilterMode()) {
        return true;
    }

    // TAG subscriptions are not filtered here
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        return true;
    }

    ConsumerFilterData realFilterData = this.consumerFilterData;
    Map<String, String> tempProperties = properties;

    // ... (omitted in this excerpt; since getMessage passes null for properties,
    // they have to be obtained from msgBuffer before the expression is evaluated)

    Object ret = null;
    try {
        // Evaluate the compiled SQL expression against the message properties
        MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);

        ret = realFilterData.getCompiledExpression().evaluate(context);
    } catch (Throwable e) {
        log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
    }

    log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);

    // Anything other than a Boolean result counts as "no match"
    if (ret == null || !(ret instanceof Boolean)) {
        return false;
    }

    return (Boolean) ret;
}
```
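Here the message is filtered by the SQL expression. If the message is one the consumer wants, it is added to the result container and returned to the consumer; otherwise it is discarded and the broker moves on to the next message.

"Discarded" here only means the message is not returned to this consumer. The message stays in the CommitLog until the CommitLog file is cleaned up.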
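To make the effect of the sample expression concrete, the condition `(age between 6 and 9) AND (name IS NOT NULL) AND (isGender = TRUE)` can be written out by hand as a Java predicate over the user properties. This is not how RocketMQ evaluates it (the broker uses the compiled expression shown above), and the way string properties are converted to numbers and booleans here is an assumption of this sketch; the class and helper names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a hand-written equivalent of the sample SQL92 condition. */
final class SqlConditionSketch {
    static boolean matches(Map<String, String> properties) {
        String age = properties.get("age");
        if (age == null) {
            return false;
        }
        int ageValue;
        try {
            ageValue = Integer.parseInt(age);
        } catch (NumberFormatException e) {
            return false; // sketch convention: a non-numeric age never matches
        }
        boolean ageInRange = ageValue >= 6 && ageValue <= 9;          // age between 6 and 9
        boolean hasName = properties.get("name") != null;             // name IS NOT NULL
        boolean isGender = Boolean.parseBoolean(properties.get("isGender")); // isGender = TRUE
        return ageInRange && hasName && isGender;
    }

    /** Builds the properties the sample producer attaches to message i. */
    static Map<String, String> props(int i, boolean gender) {
        Map<String, String> properties = new HashMap<>();
        properties.put("age", String.valueOf(i));
        properties.put("name", "name" + (i + 1));
        properties.put("isGender", String.valueOf(gender));
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(matches(props(7, true)));
        System.out.println(matches(props(7, false)));
        System.out.println(matches(props(3, true)));
    }
}
```

Of the producer's ten messages, only those with index 6 to 9 can match, and of these only the ones whose random `isGender` value came out true.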
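5. Consuming messages

As described above, with TAG filtering the broker first filters by tag hash code. That pass is incomplete because different tags can have the same hash code, so the consumer performs the final filtering by tag value.

With SQL filtering, the work is done entirely by the broker, and the consumer does nothing further.

Let's look at the filtering logic on the consumer side: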
```java
public void pullMessage(final PullRequest pullRequest) {
    // ... (checks and lookup of subscriptionData omitted)

    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                // Decode the pulled messages and filter them again on the client side
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                    pullRequest.getMessageQueue(), pullResult, subscriptionData);
                // ...
            }
        }

        // ... (onException omitted)
    };

    // ... (sending the pull request omitted)
}
```
Now let's look at the implementation of processPullResult:

```java
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;

    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

        List<MessageExt> msgListFilterAgain = msgList;

        // Second pass: only runs when the subscription lists specific tags
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // Exact match on the tag value, which removes hash collisions
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }

        pullResultExt.setMsgFoundList(msgListFilterAgain);
    }

    return pullResult;
}
```
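The client-side pass can be reduced to a few lines for experimentation. In the sketch below each pulled message is represented only by its tag string, and the class and method names are hypothetical; the logic mirrors the loop above, including the rule that an empty tag set skips the second pass entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: the consumer's exact match on tag values. */
final class ConsumerTagRefilterSketch {
    static List<String> refilter(List<String> pulledTags, Set<String> subscribedTags) {
        if (subscribedTags.isEmpty()) {
            return pulledTags; // nothing to check against, keep the list unchanged
        }
        List<String> kept = new ArrayList<>(pulledTags.size());
        for (String tag : pulledTags) {
            // Messages without a tag, or with a tag that merely shares a hash code, are dropped
            if (tag != null && subscribedTags.contains(tag)) {
                kept.add(tag);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Set<String> subscribedTags = new HashSet<>(Arrays.asList("Aa"));
        // "BB" has the same hash code as "Aa", so a hash-only check would have let it through
        System.out.println(refilter(Arrays.asList("Aa", "BB", "Aa"), subscribedTags)); // prints [Aa, Aa]
    }
}
```

Since an SQL subscription leaves the tag set empty (FilterAPI.build only fills it for TAG expressions), this pass is a no-op for SQL filtering, consistent with the broker doing all the work in that case.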
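6. Summary

- RocketMQ supports two kinds of message filtering: TAG and SQL92.
- To use SQL filtering, the broker must be configured with enablePropertyFilter = true.
- TAG filtering is done in two stages:
  - Stage 1: the broker filters by the tag's hash code.
  - Stage 2: the consumer completes the filtering by tag value.
- SQL filtering is done only on the broker.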