????????在使用RocketMQ時,發(fā)現(xiàn)同一個consumerGroup下的不同topic在初始化不同的DefaultMQPushConsumer時會相互覆蓋前面的topic導(dǎo)致先初始化的topic無法消費外盯,所以總結(jié)了以下場景下消費者的消費情況
假設(shè)現(xiàn)在有一個producer和兩個consumer交播,分別為consumer1和consumer2,consumer1和consumer2依據(jù)topic的個數(shù)初始化DefaultMQPushConsumer。
consumer1和consumer2配置相同的單個consumerGroup挂签,單個topic和tags
????????此種場景下,consumer1與consumer2能正常消費到producer發(fā)送的所有消息盼产,并遵循broker端的lb策略饵婆,consumer1與consumer2消費到的消息之和等于producer發(fā)送的消息數(shù)目?
consumer1和consumer2配置相同的單個consumerGroup,多個topic和tags
? ? ? ? 此種場景下,若客戶端初始化DefaultMQPushConsumer依據(jù)topic的數(shù)量進(jìn)行初始化侨核,則consumer1與consumer2與broker建立心跳鏈接時會相互覆蓋相同consumerGroup下的消費topic信息草穆,這樣會導(dǎo)致部分消息消費不了。
????????詳情見RecketMQ-同一個訂閱組下有多個Topic搓译,消息能發(fā)送到Topic中悲柱,但無法被監(jiān)聽到 - CSDN博客
? ? ? ? 若想保證消費同一consumerGroup下的多個topic和tag的消息,客戶端在初始化時需要依據(jù)consumerGroup的數(shù)量只初始化一個DefaultMQPushConsumer實例些己,之后在設(shè)置consumer訂閱的topic和tag時豌鸡,多次設(shè)置topic和tag的組合,即
? ??????this.consumer.subscribe(topicA, tagsA);? ? ? ??
????????this.consumer.subscribe(topicB, tagsB);
? ? ? ? ...
????????DefaultMQPushConsumerImpl中關(guān)于subscribe源碼實現(xiàn)? ? ? ??
?????????public void subscribe(String topic, String subExpression)throws MQClientException {? ? ? ??
????????????????try {
????????????????????????SubscriptionData subscriptionData = ????????????????????????FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression);
? ? ? ????????????????? this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
? ? ? ? ????????????????if (this.mQClientFactory !=null) {
????????????????????????????????this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
? ? ? ????????????????? }
????????????????}catch (Exception var4) {
????????????????????????throw new MQClientException("subscription exception", var4);
? ? ????????????}
????????}
????????由DefaultMQPushConsumerImpl中對subscribe的實現(xiàn)源碼可知段标,不同topic和對應(yīng)的tag以key-value的形式存在于subsCriptionInner中涯冠,該對象的類型為ConcurrentHashMap,也就意味著如果重復(fù)注冊相同topic逼庞,會覆蓋之前關(guān)于相同topic的訂閱蛇更,這點在使用單個DefaultMQPushConsumer要注意。
consumer1和consumer2配置相同的多個consumerGroup赛糟,topic和tags與consumerGroup對應(yīng)
? ? ? ? 此種情況相當(dāng)于多個情況1的實現(xiàn)派任,消費者consumer1和consumer2可以正確消費所有消息。
consumer1和consumer2配置相同的單個consumerGroup璧南,單個topic和多個tag
????????此種情況要說一下為啥RocketMQ對consumerGroup和topic都不追加s定義掌逛,只對tag用復(fù)數(shù)詞修飾。這是因為tags表示的意思為一個或多個tag的意思司倚,RocketMQ自身會根據(jù)tags字符串中的分隔符“|”來訂閱不同的tag颤诀。實際上在我們定義topic和tags時也就默認(rèn)了一對多的情況,并不用針對多個tag設(shè)置多個相同名字的topic对湃,這樣反而會出現(xiàn)情況2中的tag覆蓋崖叫,丟失部分消息。? ?
????????所以此種情況下與1情況相同拍柒,consumer1和consumer2能正確消費所有消息心傀。? ? ?
consumer1和consumer2配置不同的多個consumerGroup,相同的單個topic和tags
????????此種情況用于consumer1和consumer2需要同時消費所有producer發(fā)出的消息拆讯。consumer1和consumer2消費的消息之和等于producer生產(chǎn)的消息的2倍脂男,所有發(fā)出的消息都會同時出現(xiàn)在consumer1和consumer2的消費隊列中? ? ?