RocketMq ConsumerGroup Tag對消費的影響

官方對consumer的定義如下:
Similar to previously mentioned producer group, consumers of the exactly same role are grouped together and named Consumer Group.
Consumer Group is a great concept with which achieving goals of load-balance and fault-tolerance, in terms of message consuming, is super easy.
Warning: consumer instances of a consumer group must have exactly the same topic subscription(s).
大意是消費者準(zhǔn)確按照相同角色來分組,分組的目的是負(fù)載均衡和失敗轉(zhuǎn)移妨托,并且警告同個分組中的消費者一定要訂閱相同的topic。
在代碼中看到consumerGroup變量定義注釋如下镀岛,大意差不多竣灌。
Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve load balance. It's required and needs to be globally unique.

通過上面的定義,可以準(zhǔn)確知道同一組的consumer一定要訂閱相同的topic,那么問題來了衩侥,訂閱的時候除了topic還有tags,這個tag會有影響嗎矛物?沒有找到相關(guān)資料茫死,我就自己做了測試。

instanceName groupName Topic Tag
A GroupA TopicA TagA
B GroupA TopicA TagB

測試目標(biāo):測試在相同的消費組中的消費者泽谨,訂閱相同的topic時璧榄,tag不同會不會影響消費和負(fù)載均衡特漩。
測試計劃:
1.創(chuàng)建生產(chǎn)者發(fā)送100條消息;
2.創(chuàng)建消費者

instanceName groupName Topic Tag
A GroupA TopicA TagA
B GroupA TopicA TagB

3.觀察消息消費情況和隊列分配情況骨杂。

發(fā)消息

public class SimpleProducer {
    public static void sendSync() throws Exception {
        ClientConfig clientConfig=new ClientConfig();
        clientConfig.setNamesrvAddr("localhost:9876");
        MQClientInstance clientInstance=MQClientManager.getInstance().getAndCreateMQClientInstance(clientConfig);
        DefaultMQProducer producer = clientInstance.getDefaultMQProducer();
        producer.setProducerGroup("GroupA");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicA", "TagA", ("Hello mq" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.println("send " + i + " , result:" + sendResult.getMsgId());
        }
        producer.shutdown();
    }
}

消費者

public class SimpleConsumer {
    public static void pushConsume(final String instanceName, final String group, final String topic, final String tag) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(topic, tag);
        consumer.setInstanceName(instanceName);
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("[" + instanceName + "," + group + "," + topic + "," + tag + "] consume: " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

測試

public static void main(String[] args) throws Exception {
        
        try {
            SimpleProducer.sendSync();
        } catch (Exception e) {
            e.printStackTrace();
        }

        Thread t2 = new Thread() {
            @Override
            public void run() {
                try {
                    SimpleConsumer.pushConsume("A", "GroupA", "TopicA", "TagA");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        t2.start();

        Thread t3 = new Thread() {
            @Override
            public void run() {
                try {
                    SimpleConsumer.pushConsume("B", "GroupA", "TopicA", "TagB");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        t3.start();


        t2.join();
        t3.join();
    }

結(jié)果:

....
send 97 , result:AC1100013B2118B4AAC2808264CC0061
send 98 , result:AC1100013B2118B4AAC2808264CD0062
send 99 , result:AC1100013B2118B4AAC2808264CE0063
[A,GroupA,TopicA,TagA] consume: Hello mq1
[A,GroupA,TopicA,TagA] consume: Hello mq2
[A,GroupA,TopicA,TagA] consume: Hello mq5
....

查看隊列分布情況:
消費者A

./bin/mqadmin consumerStatus -n "localhost:9876" -g "GroupA" -i "172.17.0.1@A"
#Consumer MQ Detail#
#Topic                            #Broker Name                      #QID  #ProcessQueueInfo   
%RETRY%GroupA                     mo-x                              0     ProcessQueueInfo [commitOffset=0, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231130503, droped=false, lastPullTimestamp=20180625231132580, lastConsumeTimestamp=20180625231129554]
TopicA                            mo-x                              0     ProcessQueueInfo [commitOffset=50, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231130503, droped=true, lastPullTimestamp=20180625231132552, lastConsumeTimestamp=20180625231129547]
TopicA                            mo-x                              1     ProcessQueueInfo [commitOffset=50, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231130503, droped=true, lastPullTimestamp=20180625231132572, lastConsumeTimestamp=20180625231129549]

消費者B

./bin/mqadmin consumerStatus -n "localhost:9876" -g "GroupA" -i "172.17.0.1@B"
#Consumer MQ Detail#
#Topic                            #Broker Name                      #QID  #ProcessQueueInfo   
TopicA                            mo-x                              2     ProcessQueueInfo [commitOffset=25, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231150500, droped=false, lastPullTimestamp=20180625231209016, lastConsumeTimestamp=20180625231149571]
TopicA                            mo-x                              3     ProcessQueueInfo [commitOffset=25, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231150500, droped=false, lastPullTimestamp=20180625231209016, lastConsumeTimestamp=20180625231149565]
mo@mo-x:~/rocket-mq$ ./bin/mqadmin consumerProgress -n localhost:9876 -g "GroupA"
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
#Topic                            #Broker Name                      #QID  #Broker Offset        #Consumer Offset      #Client IP           #Diff                 #LastTime
%RETRY%GroupA                     mo-x                              0     0                     0                     172.17.0.1           0                     1970-01-01 08:00:00
TopicA                            mo-x                              0     25                    25                    172.17.0.1           0                     2018-06-25 23:11:29
TopicA                            mo-x                              1     25                    25                    172.17.0.1           0                     2018-06-25 23:11:29
TopicA                            mo-x                              2     25                    25                    172.17.0.1           0                     2018-06-25 23:11:29
TopicA                            mo-x                              3     25                    25                    172.17.0.1           0                     2018-06-25 23:11:29

測試結(jié)果總結(jié):
1.生產(chǎn)者發(fā)送了100條TagA消息到TopicA
2.消費者A和消費者B都在GroupA中涂身,都訂閱TopicA
3.消費者A訂閱TagA,消費者B訂閱TagB
4.消費者A收到了部分消息
5.消費者A分配到了兩個GroupA-TopicA的隊列
6.消費者B分配到了兩個GroupA-TopicA的隊列

總結(jié):
Tag對同組同Topic的消費者有影響搓蚪,當(dāng)存在不同Tag的時候蛤售,會導(dǎo)致消費混亂,比如TagA的消息被TagB的消費者消費了妒潭。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末悴能,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子雳灾,更是在濱河造成了極大的恐慌漠酿,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,029評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谎亩,死亡現(xiàn)場離奇詭異炒嘲,居然都是意外死亡,警方通過查閱死者的電腦和手機匈庭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,395評論 3 385
  • 文/潘曉璐 我一進店門夫凸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人阱持,你說我怎么就攤上這事夭拌。” “怎么了衷咽?”我有些...
    開封第一講書人閱讀 157,570評論 0 348
  • 文/不壞的土叔 我叫張陵鸽扁,是天一觀的道長。 經(jīng)常有香客問我镶骗,道長,這世上最難降的妖魔是什么卖词? 我笑而不...
    開封第一講書人閱讀 56,535評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮此蜈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘裆赵。我一直安慰自己东囚,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,650評論 6 386
  • 文/花漫 我一把揭開白布战授。 她就那樣靜靜地躺著桨嫁,像睡著了一般份帐。 火紅的嫁衣襯著肌膚如雪璃吧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,850評論 1 290
  • 那天废境,我揣著相機與錄音畜挨,去河邊找鬼。 笑死噩凹,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的驮宴。 我是一名探鬼主播,決...
    沈念sama閱讀 39,006評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼禀忆,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起离熏,我...
    開封第一講書人閱讀 37,747評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎滋戳,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體奸鸯,經(jīng)...
    沈念sama閱讀 44,207評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,536評論 2 327
  • 正文 我和宋清朗相戀三年窗怒,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片扬虚。...
    茶點故事閱讀 38,683評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡球恤,死狀恐怖辜昵,靈堂內(nèi)的尸體忽然破棺而出咽斧,到底是詐尸還是另有隱情堪置,我是刑警寧澤,帶...
    沈念sama閱讀 34,342評論 4 330
  • 正文 年R本政府宣布岭洲,位于F島的核電站,受9級特大地震影響钦椭,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜彪腔,卻給世界環(huán)境...
    茶點故事閱讀 39,964評論 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望进栽。 院中可真熱鬧德挣,春花似錦、人聲如沸快毛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,772評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽唠帝。三九已至屯掖,卻和暖如春襟衰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背瀑晒。 一陣腳步聲響...
    開封第一講書人閱讀 32,004評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留轩褐,地道東北人。 一個月前我還...
    沈念sama閱讀 46,401評論 2 360
  • 正文 我出身青樓把介,卻偏偏與公主長得像蟋座,于是被迫代替她去往敵國和親劳澄。 傳聞我的和親對象是個殘疾皇子蜈七,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,566評論 2 349

推薦閱讀更多精彩內(nèi)容