RocketMQ是一款 分布式、隊(duì)列模型的消息中間件宪肖,由阿里巴巴團(tuán)隊(duì)研發(fā)表制,借鑒參考了JMS規(guī)范的MQ實(shí)現(xiàn),更參考了優(yōu)秀的開源消息中間件KAFKA控乾,并結(jié)合阿里實(shí)際業(yè)務(wù)需求么介,在天貓雙十一的場景,實(shí)現(xiàn)業(yè)務(wù)消峰蜕衡、分布式事物的優(yōu)秀框架壤短。
其具有以下特點(diǎn):
1.能夠保證嚴(yán)格的消息順序
2.提供豐富的消息拉取模式
3.高效的訂閱者水平擴(kuò)展能力
4.實(shí)時(shí)的消息訂閱機(jī)制
5.億級消息堆積能力
RocketMQ的順序消息需要滿足2點(diǎn):
1.Producer端保證發(fā)送消息有序,且發(fā)送到同一個(gè)隊(duì)列。
2.consumer端保證消費(fèi)同一個(gè)隊(duì)列久脯。
如何在集群消費(fèi)時(shí)保證消費(fèi)的有序呢纳胧?
1.ConsumeMessageOrderlyService類的start()方法,如果是集群消費(fèi)帘撰,則啟動(dòng)定時(shí)任務(wù)跑慕,定時(shí)向broker發(fā)送批量鎖住當(dāng)前正在消費(fèi)的隊(duì)列集合的消息,具體是consumer端拿到正在消費(fèi)的隊(duì)列集合摧找,發(fā)送鎖住隊(duì)列的消息至broker核行,broker端返回鎖住成功的隊(duì)列集合。consumer收到后蹬耘,設(shè)置是否鎖住標(biāo)志位芝雪。
這里注意2個(gè)變量:
consumer端的RebalanceImpl里的ConcurrentHashMap
processQueueTable,是否鎖住設(shè)置在ProcessQueue里综苔。
broker端的RebalanceLockManager里的ConcurrentHashMap mqLockTable惩系,這里維護(hù)著全局隊(duì)列鎖。
2.ConsumeMessageOrderlyService.ConsumeRequest的run方法是消費(fèi)消息如筛,這里還有個(gè)MessageQueueLock messageQueueLock蛆挫,維護(hù)當(dāng)前consumer端的本地隊(duì)列鎖。保證當(dāng)前只有一個(gè)線程能夠進(jìn)行消費(fèi)妙黍。
3.拉到消息存入ProcessQueue悴侵,然后判斷,本地是否獲得鎖拭嫁,全局隊(duì)列是否被鎖住可免,然后從ProcessQueue里取出消息,用MessageListenerOrderly進(jìn)行消費(fèi)做粤。拉到消息后調(diào)用ProcessQueue.putMessage(final List msgs)存入浇借,具體是存入TreeMap msgTreeMap。然后是調(diào)用ProcessQueue.takeMessags(final
int batchSize)消費(fèi)怕品,具體是把msgTreeMap里消費(fèi)過的消息妇垢,轉(zhuǎn)移到TreeMap msgTreeMapTemp。
4.本地消費(fèi)的事務(wù)控制肉康,ConsumeOrderlyStatus.SUCCESS(提交)闯估,ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(掛起一會再消費(fèi)),在此之前還有一個(gè)變量ConsumeOrderlyContext context的setAutoCommit()是否自動(dòng)提交吼和。
當(dāng)SUSPEND_CURRENT_QUEUE_A_MOMENT時(shí)涨薪,autoCommit設(shè)置為true或者false沒有區(qū)別,本質(zhì)跟消費(fèi)相反炫乓,把消息從msgTreeMapTemp轉(zhuǎn)移回msgTreeMap刚夺,等待下次消費(fèi)献丑。
當(dāng)SUCCESS時(shí),autoCommit設(shè)置為true時(shí)比設(shè)置為false多做了2個(gè)動(dòng)作侠姑,consumeRequest.getProcessQueue().commit()和this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),commitOffset, false);
ProcessQueue.commit():本質(zhì)是刪除msgTreeMapTemp里的消息创橄,msgTreeMapTemp里的消息在上面消費(fèi)時(shí)從msgTreeMap轉(zhuǎn)移過來的。
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset():本質(zhì)是把拉消息的偏移量更新到本地內(nèi)存中莽红,然后定時(shí)更新到broker妥畏。
那么少了這2個(gè)動(dòng)作會怎么樣呢,隨著消息的消費(fèi)進(jìn)行船老,msgTreeMapTemp里的消息堆積越來越多咖熟,消費(fèi)消息的偏移量一直沒有更新到broker導(dǎo)致consumer每次重新啟動(dòng)后都要從頭開始重復(fù)消費(fèi)圃酵。就算更新了offset到broker柳畔,那么msgTreeMapTemp里的消息堆積呢?不知道這算不算bug所以郭赐,還是把a(bǔ)utoCommit設(shè)置為true吧薪韩。
最后要感謝這個(gè)優(yōu)秀的平臺,可以讓我們相互交流捌锭,如果想進(jìn)一步學(xué)習(xí)交流俘陷,可以加群460570824,希望大家可以一起學(xué)習(xí)進(jìn)步观谦!