關(guān)于RocketMQ消息消費(fèi)與重平衡的一些問(wèn)題探討

其實(shí)最好的學(xué)習(xí)方式就是互相交流晾嘶,最近也有跟網(wǎng)友討論了一些關(guān)于 RocketMQ 消息拉取與重平衡的問(wèn)題吃衅,我姑且在這里寫(xiě)下我的一些總結(jié)。

關(guān)于 push 模式下的消息循環(huán)拉取問(wèn)題

之前發(fā)表了一篇關(guān)于重平衡的文章:「Kafka 重平衡機(jī)制」由桌,里面有說(shuō)到 RocketMQ 重平衡機(jī)制是每隔 20s 從任意一個(gè) Broker 節(jié)點(diǎn)獲取消費(fèi)組的消費(fèi) ID 以及訂閱信息失球,再根據(jù)這些訂閱信息進(jìn)行分配,然后將分配到的信息封裝成 pullRequest 對(duì)象 pull 到 pullRequestQueue 隊(duì)列中霸奕,拉取線程喚醒后執(zhí)行拉取任務(wù)溜宽,流程圖如下:

[站外圖片上傳中...(image-a5d0cc-1573041585174)]

但是其中有一些是沒(méi)有詳細(xì)說(shuō)的,比如每次拉消息都要等 20s 嗎质帅?真的有個(gè)網(wǎng)友問(wèn)了我如下問(wèn)題:

[圖片上傳失敗...(image-10ed11-1573041585174)]

很顯然他的項(xiàng)目是用了 push 模式進(jìn)行消息拉取适揉,要回答這個(gè)問(wèn)題,就要從 RockeMQ 的消息拉取說(shuō)起:

RocketMQ 的 push 模式的實(shí)現(xiàn)是基于 pull 模式临梗,只不過(guò)在 pull 模式上套了一層涡扼,所以RocketMQ push 模式并不是真正意義上的 ”推模式“,因此盟庞,在 push 模式下吃沪,消費(fèi)者拉取完消息后,立馬就有開(kāi)始下一個(gè)拉取任務(wù)什猖,并不會(huì)真的等 20s 重平衡后才拉取票彪,至于 push 模式是怎么實(shí)現(xiàn)的,那就從源碼去找答案不狮。

之前有寫(xiě)過(guò)一篇文章:「RocketMQ為什么要保證訂閱關(guān)系的一致性降铸?」,里面有說(shuō)過(guò) 消息拉取是從 PullRequestQueue 阻塞隊(duì)列中取出 PullRequest 拉取任務(wù)進(jìn)行消息拉取的摇零,但 PullRequest 是怎么放進(jìn) PullRequestQueue 阻塞隊(duì)列中的呢推掸?

RocketMQ 一共提供了以下方法:

org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately:

public void executePullRequestImmediately(final PullRequest pullRequest) {
  try {
    this.pullRequestQueue.put(pullRequest);
  } catch (InterruptedException e) {
    log.error("executePullRequestImmediately pullRequestQueue.put", e);
  }
}

從調(diào)用鏈發(fā)現(xiàn),除了重平衡會(huì)調(diào)用該方法之外驻仅,在 push 模式下谅畅,PullCallback 回調(diào)對(duì)象中的 onSuccess 方法在消息消費(fèi)時(shí),也調(diào)用了該方法:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

case FOUND:

// 如果本次拉取消息為空噪服,則繼續(xù)將pullRequest放入阻塞隊(duì)列中
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
  // 將消息放入消費(fèi)者消費(fèi)線程去執(zhí)行
  DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
    pullResult.getMsgFoundList(), //
    processQueue, //
    pullRequest.getMessageQueue(), //
    dispathToConsume);
  // 將pullRequest放入阻塞隊(duì)列中
  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);  
}

當(dāng)從 broker 拉取到消息后毡泻,如果消息被過(guò)濾掉,則繼續(xù)將pullRequest放入阻塞隊(duì)列中繼續(xù)循環(huán)執(zhí)行消息拉取任務(wù)粘优,否則將消息放入消費(fèi)者消費(fèi)線程去執(zhí)行仇味,在pullRequest放入阻塞隊(duì)列中。

case NO_NEW_MESSAGE:

case NO_MATCHED_MSG:

pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

如果從 broker 端沒(méi)有可拉取的新消息或者沒(méi)有匹配到消息雹顺,則將pullRequest放入阻塞隊(duì)列中繼續(xù)循環(huán)執(zhí)行消息拉取任務(wù)丹墨。

從以上消息消費(fèi)邏輯可以看出,當(dāng)消息處理完后嬉愧,立即將 pullRequest 重新放入阻塞隊(duì)列中带到,因此這就很好解釋為什么 push 模式可以持續(xù)拉取消息了:

在 push 模式下消息消費(fèi)完后,還會(huì)調(diào)用該方法重新將 PullRequest 對(duì)象放進(jìn) PullRequestQueue 阻塞隊(duì)列中,不斷地從 broker 中拉取消息揽惹,實(shí)現(xiàn) push 效果被饿。

重平衡后隊(duì)列被其它消費(fèi)者分配后如何處理?

繼續(xù)再想一個(gè)問(wèn)題搪搏,如果重平衡后狭握,發(fā)現(xiàn)某個(gè)隊(duì)列被新的消費(fèi)者分配了,怎么辦疯溺,總不能繼續(xù)從該隊(duì)列中拉取消息吧论颅?

RocketMQ 重平衡后會(huì)檢查 pullRequest 是否還在新分配的列表中,如果不在囱嫩,則丟棄恃疯,調(diào)用 isDrop() 可查出該pullRequest是否已丟棄:

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage:

final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
  log.info("the pull request[{}] is dropped.", pullRequest.toString());
  return;
}

在消息拉取之前,首先判斷該隊(duì)列是否被丟棄墨闲,如果已丟棄今妄,則直接放棄本次拉取任務(wù)。

那什么時(shí)候隊(duì)列被丟棄呢鸳碧?

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
  Entry<MessageQueue, ProcessQueue> next = it.next();
  MessageQueue mq = next.getKey();
  ProcessQueue pq = next.getValue();

  if (mq.getTopic().equals(topic)) {
    // 判斷當(dāng)前緩存 MessageQueue 是否包含在最新的 mqSet 中盾鳞,如果不存在則將隊(duì)列丟棄
    if (!mqSet.contains(mq)) {
      pq.setDropped(true);
      if (this.removeUnnecessaryMessageQueue(mq, pq)) {
        it.remove();
        changed = true;
        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
      }
    } else if (pq.isPullExpired()) {
      // 如果隊(duì)列拉取過(guò)期則丟棄
      switch (this.consumeType()) {
        case CONSUME_ACTIVELY:
          break;
        case CONSUME_PASSIVELY:
          pq.setDropped(true);
          if (this.removeUnnecessaryMessageQueue(mq, pq)) {
            it.remove();
            changed = true;
            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                      consumerGroup, mq);
          }
          break;
        default:
          break;
      }
    }
  }
}

updateProcessQueueTableInRebalance 方法在重平衡時(shí)執(zhí)行,用于更新 processQueueTable瞻离,它是當(dāng)前消費(fèi)者的隊(duì)列緩存列表腾仅,以上方法邏輯判斷當(dāng)前緩存 MessageQueue 是否包含在最新的 mqSet 中,如果不包含其中套利,則說(shuō)明經(jīng)過(guò)這次重平衡后推励,該隊(duì)列被分配給其它消費(fèi)者了,或者拉取時(shí)間間隔太大過(guò)期了肉迫,則調(diào)用 setDropped(true) 方法將隊(duì)列置為丟棄狀態(tài)验辞。

可能你會(huì)問(wèn),processQueueTable 跟 pullRequest 里面 processQueue 有什么關(guān)聯(lián)昂拂,往下看:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

// 新建 ProcessQueue 
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
  // 將ProcessQueue放入processQueueTable中
  ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
  if (pre != null) {
    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
  } else {
    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
    PullRequest pullRequest = new PullRequest();
    pullRequest.setConsumerGroup(consumerGroup);
    pullRequest.setNextOffset(nextOffset);
    pullRequest.setMessageQueue(mq);
    // 將ProcessQueue放入pullRequest拉取任務(wù)對(duì)象中
    pullRequest.setProcessQueue(pq);
    pullRequestList.add(pullRequest);
    changed = true;
  }
}

可以看出受神,重平衡時(shí)會(huì)創(chuàng)建 ProcessQueue 對(duì)象抛猖,將其放入 processQueueTable 緩存隊(duì)列表中格侯,再將其放入 pullRequest 拉取任務(wù)對(duì)象中,也就是 processQueueTable 中的 ProcessQueue 與 pullRequest 的中 ProcessQueue 是同一個(gè)對(duì)象财著。

重平衡后會(huì)導(dǎo)致消息重復(fù)消費(fèi)嗎联四?

之前在群里有個(gè)網(wǎng)友提了這個(gè)問(wèn)題:

[站外圖片上傳中...(image-22b929-1573041585174)]

我當(dāng)時(shí)回答他 RocketMQ 正常也是沒(méi)有重復(fù)消費(fèi),但后來(lái)發(fā)現(xiàn)其實(shí) RocketMQ 在某些情況下撑教,也是會(huì)出現(xiàn)消息重復(fù)消費(fèi)的現(xiàn)象朝墩。

前面講到,RocketMQ 消息消費(fèi)時(shí)伟姐,會(huì)將消息放進(jìn)消費(fèi)線程中去執(zhí)行收苏,代碼如下:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
  pullResult.getMsgFoundList(), //
  processQueue, //
  pullRequest.getMessageQueue(), //
  dispathToConsume);

ConsumeMessageService 類實(shí)現(xiàn)消息消費(fèi)的邏輯亿卤,它有兩個(gè)實(shí)現(xiàn)類:

// 并發(fā)消息消費(fèi)邏輯實(shí)現(xiàn)類
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 順序消息消費(fèi)邏輯實(shí)現(xiàn)類
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;

先看并發(fā)消息消費(fèi)相關(guān)處理邏輯:

ConsumeMessageConcurrentlyService:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:

if (this.processQueue.isDropped()) {
  log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
  return;
}

// 消息消費(fèi)邏輯
// ...

// 如果隊(duì)列被設(shè)置為丟棄狀態(tài),則不提交消息消費(fèi)進(jìn)度
if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}

ConsumeRequest 是一個(gè)繼承了 Runnable 的類鹿霸,它是消息消費(fèi)核心邏輯的實(shí)現(xiàn)類排吴,submitConsumeRequest 方法將 ConsumeRequest 放入 消費(fèi)線程池中執(zhí)行消息消費(fèi),從它的 run 方法中可看出懦鼠,如果在執(zhí)行消息消費(fèi)邏輯中有節(jié)點(diǎn)加入钻哩,重平衡后該隊(duì)列被分配給其它節(jié)點(diǎn)進(jìn)行消費(fèi)了,此時(shí)的隊(duì)列被丟棄肛冶,則不提交消息消費(fèi)進(jìn)度街氢,因?yàn)橹耙呀?jīng)消費(fèi)了,此時(shí)就會(huì)造成消息重復(fù)消費(fèi)的情況睦袖。

再來(lái)看看順序消費(fèi)相關(guān)處理邏輯:

ConsumeMessageOrderlyService:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:

public void run() {
  // 判斷隊(duì)列是否被丟棄
  if (this.processQueue.isDropped()) {
    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
    return;
  }

  final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
  synchronized (objLock) {
    // 如果不是廣播模式珊肃,且隊(duì)列已加鎖且鎖沒(méi)有過(guò)期
    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
      final long beginTime = System.currentTimeMillis();
      for (boolean continueConsume = true; continueConsume; ) {
        // 再次判斷隊(duì)列是否被丟棄
        if (this.processQueue.isDropped()) {
          log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
          break;
        }
        
        // 消息消費(fèi)處理邏輯
        // ...
        
          continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
        } else {
          continueConsume = false;
        }
      }
    } else {
      if (this.processQueue.isDropped()) {
        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
      }
      ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
    }
  }
}

RocketMQ 順序消息消費(fèi)會(huì)將隊(duì)列鎖定,當(dāng)隊(duì)列獲取鎖之后才能進(jìn)行消費(fèi)扣泊,所以近范,即使消息在消費(fèi)過(guò)程中有節(jié)點(diǎn)加入,重平衡后該隊(duì)列被分配給其它節(jié)點(diǎn)進(jìn)行消費(fèi)了延蟹,此時(shí)的隊(duì)列被丟棄评矩,依然不會(huì)造成重復(fù)消費(fèi)。

image.png

公眾號(hào)「后端進(jìn)階」阱飘,專注后端技術(shù)分享:Java斥杜、Golang、WEB框架沥匈、分布式中間件蔗喂、服務(wù)治理等等。
關(guān)注公眾號(hào)回復(fù)關(guān)鍵字「后端」免費(fèi)領(lǐng)取后端開(kāi)發(fā)大禮包高帖!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末缰儿,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子散址,更是在濱河造成了極大的恐慌乖阵,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,270評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件预麸,死亡現(xiàn)場(chǎng)離奇詭異瞪浸,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)吏祸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門对蒲,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人,你說(shuō)我怎么就攤上這事蹈矮∨槁撸” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,630評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵泛鸟,是天一觀的道長(zhǎng)诱渤。 經(jīng)常有香客問(wèn)我,道長(zhǎng)谈况,這世上最難降的妖魔是什么勺美? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,906評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮碑韵,結(jié)果婚禮上赡茸,老公的妹妹穿的比我還像新娘。我一直安慰自己祝闻,他們只是感情好占卧,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,928評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著联喘,像睡著了一般华蜒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上豁遭,一...
    開(kāi)封第一講書(shū)人閱讀 51,718評(píng)論 1 305
  • 那天叭喜,我揣著相機(jī)與錄音,去河邊找鬼蓖谢。 笑死捂蕴,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的闪幽。 我是一名探鬼主播啥辨,決...
    沈念sama閱讀 40,442評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼盯腌!你這毒婦竟也來(lái)了溉知?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,345評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤腕够,失蹤者是張志新(化名)和其女友劉穎级乍,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體燕少,經(jīng)...
    沈念sama閱讀 45,802評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡卡者,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,984評(píng)論 3 337
  • 正文 我和宋清朗相戀三年蒿囤,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了客们。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,117評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖底挫,靈堂內(nèi)的尸體忽然破棺而出恒傻,到底是詐尸還是另有隱情,我是刑警寧澤建邓,帶...
    沈念sama閱讀 35,810評(píng)論 5 346
  • 正文 年R本政府宣布盈厘,位于F島的核電站,受9級(jí)特大地震影響官边,放射性物質(zhì)發(fā)生泄漏沸手。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,462評(píng)論 3 331
  • 文/蒙蒙 一注簿、第九天 我趴在偏房一處隱蔽的房頂上張望契吉。 院中可真熱鬧,春花似錦诡渴、人聲如沸捐晶。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,011評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)惑灵。三九已至,卻和暖如春眼耀,著一層夾襖步出監(jiān)牢的瞬間英支,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,139評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工哮伟, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留潭辈,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,377評(píng)論 3 373
  • 正文 我出身青樓澈吨,卻偏偏與公主長(zhǎng)得像把敢,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子谅辣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,060評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容