RocketMQ集群消費時隊列分配

RocketMQ集群消費時隊列分配

何時需要消息隊列
業(yè)務(wù)解耦
最終一致性
廣播
錯峰流控
RocketMQ的核心概念

詳情見于文檔

PushConsumer調(diào)用負載均衡的方法
    public void doRebalance() {

        if (this.rebalanceImpl != null && !this.pause) {

            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());

        }

    }

可以看出真正負載均衡的是rebalanceImpl這個成員變量的工作,在RebelanceImpl類中代碼為


public void doRebalance(final boolean isOrder) {

        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();

        if (subTable != null) {

            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {

                final String topic = entry.getKey();

                try {

                    this.rebalanceByTopic(topic, isOrder);

                } catch (Exception e) {

                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

                        log.warn("rebalanceByTopic Exception", e);

                    }

                }

            }

        }



        this.truncateMessageQueueNotMyTopic();

    }

  1. 獲取訂閱關(guān)系,訂閱關(guān)系是在consumer.start()中從consumer中復(fù)制過來的,訂閱關(guān)系是以topic為key,tags組成的subscriptionData鹉勒。

  2. 第二步遍歷訂閱關(guān)系馋贤,調(diào)用rebalanceByTopc(topic,isOrder)方法,根據(jù)topic和isOrder進行負載均衡

  3. 遍歷結(jié)束制圈,調(diào)用truncateMessageQueueNotMyTopic()方法,去除不屬于當前consumer的topic對應(yīng)的消息隊列

分析rebalanceByTopic方法


private void rebalanceByTopic(final String topic, final boolean isOrder) {

        switch (messageModel) {

            case CLUSTERING: {

                //SD:集群模式下,從訂閱信息表中獲取topic對應(yīng)的所有消息隊列

                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

                //SD:根據(jù)topic和consumerGroupName獲取所有相關(guān)的consumerId

                //SD:此處如果同一個consumerGroupName下面的訂閱關(guān)系不一致的話,會導(dǎo)致消息消費失敗

                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

                if (null == mqSet) {

                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);

                    }

                }

                if (null == cidAll) {

                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);

                }

                if (mqSet != null && cidAll != null) {

                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();

                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);

                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;

                    //把所有的消息隊列按照配置的分配策略進行分配淹接,獲取當前consumer獲得的消息隊列

                    try {

                        allocateResult = strategy.allocate(//

                                this.consumerGroup, //

                                this.mQClientFactory.getClientId(), //

                                mqAll, //

                                cidAll);

                    } catch (Throwable e) {

                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),

                                e);

                        return;

                    }



                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();

                    if (allocateResult != null) {

                        allocateResultSet.addAll(allocateResult);

                    }

                    //根據(jù)負載均衡的結(jié)果更新處理隊列,consumer根據(jù)消息隊列拉取消息

                    //移除不屬于當前consumer的隊列或者距離上次拉取時間超過最大間隔的隊列

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

                    if (changed) {

                        log.info(

                                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",

                                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),

                                allocateResultSet.size(), allocateResultSet);

                        this.messageQueueChanged(topic, mqSet, allocateResultSet);

                    }

                }

                break;

            }

            default:

                break;

        }

    }

默認情況下叛溢,使用平均分配消息隊列的策略;
分析truncateMessageQueueNotMyTopic方法


private void truncateMessageQueueNotMyTopic() {

        //獲取從consumer處copy的訂閱關(guān)系

        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();

        //遍歷自己的執(zhí)行隊列的消息隊列集合

        // 如果目標消息隊列的topic不存在當前的訂閱關(guān)系中塑悼,移除這個消息隊列

        for (MessageQueue mq : this.processQueueTable.keySet()) {

            if (!subTable.containsKey(mq.getTopic())) {



                ProcessQueue pq = this.processQueueTable.remove(mq);

                if (pq != null) {

                    pq.setDropped(true);

                    log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);

                }

            }

        }

    }

什么時候負載均衡

在consumer啟動的過程中,rebalanceImpl會從consumer處復(fù)制訂閱關(guān)系


public void start() throws MQClientException {

        switch (this.serviceState) {

            case CREATE_JUST:

                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),

                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());

                this.serviceState = ServiceState.START_FAILED;



                this.checkConfig();

                //復(fù)制訂閱關(guān)系

                this.copySubscription();



                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {

                    this.defaultMQPushConsumer.changeInstanceNameToPID();

                }



                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);



                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());

                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());

                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);



                this.pullAPIWrapper = new PullAPIWrapper(//

                        mQClientFactory, //

                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());

                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);



                if (this.defaultMQPushConsumer.getOffsetStore() != null) {

                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();

                } else {

                    switch (this.defaultMQPushConsumer.getMessageModel()) {

                        case BROADCASTING:

                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

                            break;

                        case CLUSTERING:

                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

                            break;

                        default:

                            break;

                    }

                }

                this.offsetStore.load();



                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {

                    this.consumeOrderly = true;

                    this.consumeMessageService =

                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());

                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {

                    this.consumeOrderly = false;

                    this.consumeMessageService =

                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());

                }

                //啟動消費消息的服務(wù)

                this.consumeMessageService.start();



                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);

                if (!registerOK) {

                    this.serviceState = ServiceState.CREATE_JUST;

                    this.consumeMessageService.shutdown();

                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()

                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

                            null);

                }

                //啟動工廠方法

                mQClientFactory.start();

                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());

                this.serviceState = ServiceState.RUNNING;

                break;

            case RUNNING:

            case START_FAILED:

            case SHUTDOWN_ALREADY:

                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "http://

                        + this.serviceState//

                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),

                        null);

            default:

                break;

        }

        //當訂閱關(guān)系發(fā)生變化省楷掉,更新訂閱關(guān)系

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();



        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        //立即負載均衡

        this.mQClientFactory.rebalanceImmediately();

    }

工廠方法啟動邏輯


public void start() throws MQClientException {



        synchronized (this) {

            switch (this.serviceState) {

                case CREATE_JUST:

                    this.serviceState = ServiceState.START_FAILED;

                    // If not specified,looking address from name server

                    if (null == this.clientConfig.getNamesrvAddr()) {

                        this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());

                    }

                    // Start request-response channel

                    this.mQClientAPIImpl.start();

                    // Start various schedule tasks

                    this.startScheduledTask();

                    // Start pull service

                    this.pullMessageService.start();

                    // Start rebalance service

                    this.rebalanceService.start();

                    // Start push service

                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

                    log.info("the client factory [{}] start OK", this.clientId);

                    this.serviceState = ServiceState.RUNNING;

                    break;

                case RUNNING:

                    break;

                case SHUTDOWN_ALREADY:

                    break;

                case START_FAILED:

                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);

                default:

                    break;

            }

        }

    }

rebalanceService是一個線程任務(wù)類,在線程任務(wù)中定時factory執(zhí)行調(diào)用負載均衡


    public void run() {

        log.info(this.getServiceName() + " service started");



        while (!this.isStoped()) {

            this.waitForRunning(WaitInterval);

            this.mqClientFactory.doRebalance();

        }



        log.info(this.getServiceName() + " service end");

    }

factory執(zhí)行負載均衡厢蒜,其實就是遍歷factory中的所有consumer,調(diào)用doRebalance()方法


    public void doRebalance() {

        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {

            MQConsumerInner impl = entry.getValue();

            if (impl != null) {

                try {

                    impl.doRebalance();

                } catch (Exception e) {

                    log.error("doRebalance exception", e);

                }

            }

        }

    }

AllocateMessageQueueStrategy 隊列分配策略

默認情況下PushConsumer的AllocateMessageQueueStrategy
    public DefaultMQPushConsumer() {

        this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());

    }

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市斑鸦,隨后出現(xiàn)的幾起案子愕贡,更是在濱河造成了極大的恐慌,老刑警劉巖巷屿,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件固以,死亡現(xiàn)場離奇詭異,居然都是意外死亡嘱巾,警方通過查閱死者的電腦和手機憨琳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來旬昭,“玉大人栽渴,你說我怎么就攤上這事∥壤粒” “怎么了闲擦?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長场梆。 經(jīng)常有香客問我墅冷,道長,這世上最難降的妖魔是什么或油? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任寞忿,我火速辦了婚禮,結(jié)果婚禮上顶岸,老公的妹妹穿的比我還像新娘腔彰。我一直安慰自己,他們只是感情好辖佣,可當我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布霹抛。 她就那樣靜靜地躺著,像睡著了一般卷谈。 火紅的嫁衣襯著肌膚如雪杯拐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天世蔗,我揣著相機與錄音端逼,去河邊找鬼。 笑死污淋,一個胖子當著我的面吹牛顶滩,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播寸爆,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼礁鲁,長吁一口氣:“原來是場噩夢啊……” “哼盐欺!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起救氯,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤找田,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后着憨,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體墩衙,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年甲抖,在試婚紗的時候發(fā)現(xiàn)自己被綠了漆改。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡准谚,死狀恐怖挫剑,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情柱衔,我是刑警寧澤樊破,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站唆铐,受9級特大地震影響哲戚,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜艾岂,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一顺少、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧王浴,春花似錦脆炎、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至筛婉,卻和暖如春簇爆,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背爽撒。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留响蓉,地道東北人硕勿。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像枫甲,于是被迫代替她去往敵國和親源武。 傳聞我的和親對象是個殘疾皇子扼褪,可洞房花燭夜當晚...
    茶點故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容