系列
說到rocketMq的consumer撤缴,該篇文章特指pushConsumer垂蜗,pullConsumer在后續(xù)文章中在分享讲仰。
提到consumer抗斤,需要搞清楚幾個核心問題登钥,分別是consumer的初始化過程做了哪些事情,消息是如何消費(fèi)吭敢,consumer如何動態(tài)平衡的埂奈,整個邏輯還是比較繞的,其中這章節(jié)主要會講清楚兩個事情奈应,1澜掩、初始化過程中client做了哪些事情;2杖挣、consumer如何動態(tài)平衡拉取任務(wù)肩榕,具體的任務(wù)消費(fèi)會由額外的一章進(jìn)行講解。
consumer的初始化過程
說明:整體執(zhí)行過程如下惩妇,著重介紹subscribe和start兩個過程
? ? 1株汉、創(chuàng)建consumer并設(shè)置消費(fèi)分組
? ? 2、設(shè)置消費(fèi)位移
? ? 3歌殃、設(shè)置訂閱topic
? ? 4乔妈、設(shè)置消費(fèi)執(zhí)行的回調(diào)函數(shù)
? ? 5、啟動consumer
consumer的初始化流程圖
consumer內(nèi)部初始化過程
說明:整個初始化比較復(fù)雜氓皱,為了大家能夠理解路召,先用簡單的語句概述一遍
? ? 1、構(gòu)建consumer的訂閱信息波材,包括consumer本身的訂閱和消費(fèi)分組的重試隊列股淡。
? ? 2、創(chuàng)建Rebalance服務(wù)廷区,該服務(wù)每隔20s進(jìn)行消費(fèi)端負(fù)責(zé)的messageQueue的消費(fèi)唯灵。
? ? 3、啟動消費(fèi)偏移量獲取服務(wù)隙轻,獲取上一次消費(fèi)位移埠帕。
? ? 4垢揩、啟動定時任務(wù),其中核心任務(wù)之一是定時去namesrv拉取broker信息敛瓷。
? ? 5叁巨、啟動pullMessageService,負(fù)責(zé)從broker拉取待消費(fèi)消息
? ? 6琐驴、啟動rebalanceService俘种,負(fù)責(zé)定期調(diào)整consumer端負(fù)載均衡包括第一次觸發(fā)拉取任務(wù)
? ? 7秤标、其中rebalanceService和pullMessageService相互配合使用绝淡,前者負(fù)責(zé)將新加入messageQueue拉取任務(wù)加入到pullMessageservice當(dāng)中,將舊的messageQueue的拉取任務(wù)從pullMessageService中停止苍姜,兩者之間通過消息隊列的形式進(jìn)行通信牢酵。
構(gòu)建subscription過程
說明:參見DefaultMQPushConsumerImpl類
? ? 1、訂閱消息最后保存至RebalanceImpl當(dāng)中衙猪,因為這個是后面動態(tài)負(fù)載均衡的核心馍乙。
client端啟動過程
說明:參見DefaultMQPushConsumerImpl類
說明:參見DefaultMQPushConsumerImpl類
? ? 1、啟動了獲取消費(fèi)進(jìn)度的服務(wù)
說明:參見MQClientInstance類
? ? 1垫释、啟動定時任務(wù)丝格,主要是從namsrv中拉取broker的信息
? ? 2、啟動client從broker拉取消息的服務(wù)
? ? 3棵譬、啟動Rebalance服務(wù)显蝌,負(fù)責(zé)觸發(fā)消息拉取的任務(wù)
? ? 4、步驟3和步驟4之間的兩個服務(wù)通過消息隊列通信
說明:參見MQClientInstance類
? ? 1订咸、負(fù)責(zé)從namesrv拉取broker的信息
拉取任務(wù)的執(zhí)行過程
說明:參見PullMessageService類
? ? 1曼尊、負(fù)責(zé)從pullRequestQueue中獲取拉取任務(wù)并執(zhí)行,該任務(wù)由Rebalance服務(wù)投遞
拉取任務(wù)的生成過程
說明:參見RebalanceService類
? ? 1脏嚷、consumer端負(fù)載均衡的入口
說明:參見MQClientInstance類
? ? 1骆撇、每個consumer客戶端只會有一個對象,所以這里for循環(huán)只有一次父叙。
說明:參見MQClientInstance類
1神郊、針對每個訂閱信息都進(jìn)行動態(tài)負(fù)責(zé)均衡,包括consumer本身的訂閱分組和consumerGroup的重試分組趾唱。
說明:參見RebalanceImpl類
? ? 1屿岂、動態(tài)負(fù)載均衡就是一個topic下所有的messageQueue和消費(fèi)分組里面的消費(fèi)者按照一定的動態(tài)調(diào)整策略進(jìn)行分配,同一個消費(fèi)分組里面的消費(fèi)者每人負(fù)責(zé)一部分的messageQueue鲸匿。
說明:參見RebalanceImpl類
? ? 1爷怀、consumer新負(fù)責(zé)的messageQueue加入到拉取任務(wù)當(dāng)中來
? ? ? ? 2、consumer不負(fù)責(zé)的messageQueue從拉取任務(wù)中剔除带欢。
說明:參見PullMessageService類
說明:參見PullMessageService類
訂閱重試隊列邏輯
說明:
? ? 核心代碼邏輯运授,這個表明了consumer訂閱了重試隊列并對重試隊列進(jìn)行消費(fèi)烤惊。