rocketmq源碼系列(3)-topic與group和tag之間的關(guān)系

概述
rocketmq原理想必大家都有了解了,網(wǎng)上也有很多博客和資料講述的很詳細(xì)支竹。本章主要是想講一講rocketmq中topic字逗、group、tag之間的關(guān)系醒陆。

我一般喜歡帶著問題去查看源碼從而驗(yàn)證問題的結(jié)論瀑构。所以先來看看我的問題:

1、在consumer訂閱消息中中允許topic刨摩、tag相同寺晌、group不同的消費(fèi)者同時(shí)消費(fèi)消息嗎?

2澡刹、在consumer訂閱消息中允許group呻征、tag相同、topic不同的消費(fèi)者同時(shí)消費(fèi)消息嗎罢浇?

3陆赋、在consumer訂閱消息中允許group、topic相同嚷闭、tag不同的消費(fèi)者同時(shí)消費(fèi)消息嗎攒岛?

想要知道上面的問題“蹋看看rocketmq中是如何實(shí)現(xiàn)訂閱關(guān)系的吧灾锯。

1、訂閱關(guān)系核心管理類方法:ConsumerManager#registerConsumer


    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        //1嗅榕、獲取consumer組信息
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) {
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }
        //2顺饮、更新消費(fèi)客戶端ip信息
        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        //3、更新消費(fèi)端訂閱的topic凌那、tag等信息
        boolean r2 = consumerGroupInfo.updateSubscription(subList);

        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }

        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

        return r1 || r2;
    }

說明:consumerTable是一個(gè)Map類型變量兼雄,存放的是所有消費(fèi)組信息,key存放的是groupName案怯,value存放的是ConsumerGroupInfo組信息君旦,我們再繼續(xù)往下看consumerGroupInfo.updateSubscription方法澎办,該方法是更新消費(fèi)組信息的核心方法嘲碱。

    public boolean updateSubscription(final Set<SubscriptionData> subList) {
        boolean updated = false;
        //循環(huán)consumer訂閱信息
        for (SubscriptionData sub : subList) {
            //根據(jù)topic獲取訂閱信息
            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
                //如果訂閱信息不存在金砍,則直接新增
                if (old == null) {
                SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
                if (null == prev) {
                    updated = true;
                    log.info("subscription changed, add new topic, group: {} {}",
                        this.groupName,
                        sub.toString());
                }
            } 
            //如果訂閱信息存在,判斷新的訂閱信息版本高于老的訂閱信息(一般都是高于)
          else if (sub.getSubVersion() > old.getSubVersion()) {
                if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                    log.info("subscription changed, group: {} OLD: {} NEW: {}",
                        this.groupName,
                        old.toString(),
                        sub.toString()
                    );
                }
                //新的訂閱信息覆蓋老的訂閱信息麦锯,這里可以看出來恕稠,同一個(gè)group和topic的情況下,tag不同扶欣,      
                //也會被覆蓋掉鹅巍,所以問題3的答案有了
                this.subscriptionTable.put(sub.getTopic(), sub);
            }
        }

        Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
        //這里循環(huán)判斷subscriptionTable 與本次注冊進(jìn)來的subList比較
        while (it.hasNext()) {
            Entry<String, SubscriptionData> next = it.next();
            String oldTopic = next.getKey();

            boolean exist = false;
            for (SubscriptionData sub : subList) {
                //判斷當(dāng)前subscriptionTable中所有topic訂閱信息是否都在新注冊的列表中
                if (sub.getTopic().equals(oldTopic)) {
                    exist = true;
                    break;
                }
            }
            
            if (!exist) {
                log.warn("subscription changed, group: {} remove topic {} {}",
                    this.groupName,
                    oldTopic,
                    next.getValue().toString()
                );
                //如果不存在,則刪除group對應(yīng)的topic訂閱信息料祠,
                it.remove();
                updated = true;
            }
        }

        this.lastUpdateTimestamp = System.currentTimeMillis();

        return updated;
    }

現(xiàn)在我們來解答前面提的三個(gè)問題:
1骆捧、在consumer訂閱消息中中允許topic、tag相同髓绽、group不同的消費(fèi)者同時(shí)消費(fèi)消息嗎敛苇?
答:可以。因?yàn)樵贑onsumerManage#consumerTable 中是以groupName為key的顺呕,每個(gè)groupName對應(yīng)的ConsumerGroupInfo相互隔離的枫攀。
2、在consumer訂閱消息中允許group株茶、tag相同来涨、topic不同的消費(fèi)者同時(shí)消費(fèi)消息嗎?
答:不可以启盛。如下場景:
現(xiàn)有消費(fèi)客戶端consumer1蹦掐,consumer2,topic1僵闯,topic2笤闯,group:group1,tag:tag1
consumer1訂閱topic1棍厂,group1颗味,tag1的訂閱信息
consumer2訂閱topic2,group1牺弹,tag1的訂閱信息
步驟1:consumer1注冊consumerGroupInfo信息調(diào)用updateSubscription方法更新subscriptionTableMap信息
步驟2:consumer2注冊consumerGroupInfo信息調(diào)用updateSubscription方法時(shí)浦马,如上述源碼所示,因?yàn)閏onsumer2只訂閱了topic2张漂,所以consumer1訂閱的topic1訂閱信息會被刪除掉晶默。
3、在consumer訂閱消息中允許group航攒、topic相同磺陡、tag不同的消費(fèi)者同時(shí)消費(fèi)消息嗎?
答:不可以,如下場景:
現(xiàn)有消費(fèi)客戶端consumer1币他,consumer2坞靶,topic1,group:group1蝴悉,tag:tag1彰阴,tag2
步驟1:consumer1訂閱信息為:topic1,group1拍冠,tag1
步驟2:consumer2訂閱信息為:topic1尿这,group1,tag2庆杜,此時(shí)會更新訂閱信息的時(shí)候會拿consumer2的訂閱信息覆蓋掉consumer1的訂閱信息射众,具體代碼請參考ConsumerGroupInfo#updateSubscription方法

那么。晃财。责球。product中的group是用來干嘛的?ConsumerManage中的consumerGroupInfo信息從哪來的拓劝?
請聽下回分解雏逾。rocketmq源碼系列(4)-consumer啟動過程的那些事

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市郑临,隨后出現(xiàn)的幾起案子栖博,更是在濱河造成了極大的恐慌,老刑警劉巖厢洞,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仇让,死亡現(xiàn)場離奇詭異,居然都是意外死亡躺翻,警方通過查閱死者的電腦和手機(jī)丧叽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來公你,“玉大人踊淳,你說我怎么就攤上這事∩驴浚” “怎么了迂尝?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長剪芥。 經(jīng)常有香客問我垄开,道長,這世上最難降的妖魔是什么税肪? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任溉躲,我火速辦了婚禮榜田,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘锻梳。我一直安慰自己箭券,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布唱蒸。 她就那樣靜靜地躺著,像睡著了一般灸叼。 火紅的嫁衣襯著肌膚如雪神汹。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天古今,我揣著相機(jī)與錄音屁魏,去河邊找鬼。 笑死捉腥,一個(gè)胖子當(dāng)著我的面吹牛氓拼,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播抵碟,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼桃漾,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了拟逮?” 一聲冷哼從身側(cè)響起撬统,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎敦迄,沒想到半個(gè)月后恋追,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡罚屋,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年苦囱,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片脾猛。...
    茶點(diǎn)故事閱讀 39,965評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡撕彤,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出猛拴,到底是詐尸還是另有隱情喉刘,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布漆弄,位于F島的核電站睦裳,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏撼唾。R本人自食惡果不足惜廉邑,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蛛蒙,春花似錦糙箍、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至诺苹,卻和暖如春咕晋,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背收奔。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工掌呜, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人坪哄。 一個(gè)月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓质蕉,卻偏偏與公主長得像,于是被迫代替她去往敵國和親翩肌。 傳聞我的和親對象是個(gè)殘疾皇子模暗,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 參考資料: https://blog.csdn.net/adaihao_/article/details/5429...
    ASD_92f7閱讀 3,306評論 0 0
  • 背景介紹 Kafka簡介 Kafka是一種分布式的汰蓉,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O...
    高廣超閱讀 12,833評論 8 167
  • 目標(biāo) 高吞吐量來支持高容量的事件流處理 支持從離線系統(tǒng)加載數(shù)據(jù) 低延遲的消息系統(tǒng) 持久化 依賴文件系統(tǒng)棒卷,持久化到本...
    jiangmo閱讀 1,284評論 0 4
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,469評論 0 34
  • 摘自Jason’s Blog顾孽,原文鏈接http://www.jasongj.com/2015/01/02/Kafk...
    誰動了MyWorld閱讀 425評論 0 5