需求:
根據(jù)數(shù)據(jù)的特征發(fā)送同步至不同的consumer端蚕苇,比如說(shuō)某個(gè)message特征數(shù) 10,
有可能有的consumer消費(fèi)是消費(fèi)1-20凿叠,有的consumer消費(fèi)的數(shù)據(jù)特征為3-13這種不相同不固定的消費(fèi)范圍
這種需求我一下就感覺(jué)RocketMQ的SQL剛好合適涩笤,并且根據(jù)這種需求,當(dāng)然是使用廣播模式盒件,
然后我就看到有的文檔上寫著蹬碧,廣播模式的consumerGroup相當(dāng)于無(wú)效的,因?yàn)槭墙o所有consumer發(fā)送message
這時(shí)炒刁,我的想法是在同一個(gè)group中有著各自不相同的sql條件恩沽,每個(gè)consumer根據(jù)自己的限制條件進(jìn)行限制消費(fèi)。
關(guān)于pull還是push?
- push支持SQL限制tag切心,pull不能使用SQL92(官方文檔這么寫的)飒筑;
- 如果不需要這個(gè)SQL過(guò)濾,我們就可以使用pull
于是绽昏,我就有了以下的想法:(producer 生成10條測(cè)試數(shù)據(jù),coal值從0-9)
//第一種consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup1");
consumer.setNamesrvAddr("localhost:9876");
consumer.setInstanceName("consumer 1");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", MessageSelector.bySql("coal < 3"));
consumer.registerMessageListener(MessageListener.getInstance()); // 簡(jiǎn)單自定義打印
consumer.start();
//第二種consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup1");
consumer.setNamesrvAddr("localhost:9876");
consumer.setInstanceName("consumer 2");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", MessageSelector.bySql("coal >=2 and coal <= 7"));
consumer.registerMessageListener(MessageListener.getInstance()); // 簡(jiǎn)單自定義打印
consumer.start();
然后我就發(fā)現(xiàn):consumer并沒(méi)有按照我理想中的情況各自消費(fèi)
經(jīng)過(guò)多次測(cè)試俏脊,得到結(jié)論:
** 一個(gè)consumerGroup只能指定一種過(guò)濾條件(不管是Tag還是SQL全谤,都一樣)**
后生效的consumer的過(guò)濾條件會(huì)覆蓋之前congsumer的過(guò)濾條件
然后多次各種情況下測(cè)試,發(fā)現(xiàn)tag的“*”不太一樣爷贫。
tag的“*”不會(huì)覆蓋同組其他的tag认然,除了“*”其他tag都會(huì)在consumerGroup中覆蓋生效
我是這么理解:
同一個(gè)consumerGroup組补憾,
tag的“*”其實(shí)就是相當(dāng)于沒(méi)有tag.
詳細(xì)測(cè)試情況如下概述:
producer生產(chǎn)10條數(shù)據(jù),tag為0-9第一個(gè)生效的consumer1的tag是“*”
第二個(gè)生效的consumer2的tag是“1”
則consumer1和consumer2都只會(huì)消費(fèi)tag為“1”的message反過(guò)來(lái)卷员,第一個(gè)生效的consumer1的tag是“1”盈匾,第二個(gè)生效的consumer2的tag是“*”
則consumer1消費(fèi)tag為“1”的message,consumer2消費(fèi)所有的10條message
隨后我就想到了一種騷操作:
既然consumer的過(guò)濾條件會(huì)沖突毕骡,那么tag和sql之間會(huì)沖突嗎削饵?
代碼:
//第一種consumer
...
consumer.subscribe("TopicTest", MessageSelector.byTag("a"));
...
//第二種consumer
...
consumer.subscribe("TopicTest", MessageSelector.bySql("coal >=2 and coal <= 7"));
...
結(jié)論:
麻蛋,果然會(huì)沖突未巫。