1 RocketMQ基本理論
1.1 發(fā)展歷史
阿里巴巴消息中間件起源于2001年的五彩石項(xiàng)目哑芹,Notify在這期間應(yīng)運(yùn)而生花盐,用于交易核心消息的流轉(zhuǎn)执虹。
2010年拓挥,B2B開(kāi)始大規(guī)模使用ActiveMQ作為消息內(nèi)核,隨著阿里業(yè)務(wù)的快速發(fā)展袋励,急需一款支持順序消息侥啤、擁有海量消息堆積能力的消息中間件当叭,MetaQ1.0在2011年誕生。
2012年盖灸,MetaQ已經(jīng)發(fā)展到了3.0版本蚁鳖,并抽象出了通用的消息引擎RocketMQ。隨后赁炎,對(duì)RocketMQ進(jìn)行了開(kāi)源醉箕,阿里的消息中間件正式走人了公眾視野。
2015年徙垫,RocketMQ已經(jīng)經(jīng)歷了多年雙十一的洗禮讥裤,在可用性、可靠性以及穩(wěn)定性等方面都有出色的表現(xiàn)松邪。與此同時(shí)坞琴,云計(jì)算大行其道,阿里消息中間件基于RocketMQ推出了AliwareMQ1.0逗抑,開(kāi)始為阿里云上成千上萬(wàn)家企業(yè)提供消息服務(wù)剧辐。
2016年,MetaQ在雙十一期間承載了萬(wàn)億級(jí)消息的流轉(zhuǎn)邮府,跨越了一個(gè)新的里程碑荧关,同時(shí)RocketMQ進(jìn)入Apache孵化。
1.2 RocketMQ基本術(shù)語(yǔ)
1.2.1 message
代表一條消息褂傀,使用messageId唯一識(shí)別忍啤,用戶在發(fā)送時(shí)可以設(shè)置messageKey,便于之后查詢和跟蹤仙辟。
一個(gè)message必須指定topic同波,相當(dāng)于寄信的地址。message還有一個(gè)可選的tag設(shè)置叠国,以便消費(fèi)端可以基于tag進(jìn)行過(guò)濾消息未檩。也可以添加額外的鍵值對(duì),例如你需要一個(gè)業(yè)務(wù)key來(lái)查找broker上的消息粟焊,方便在開(kāi)發(fā)過(guò)程中診斷問(wèn)題冤狡。
1.2.2 topic
topic用于將消息按主題做劃分,producer將消息發(fā)往指定的topic项棠,consumer訂閱該topic就可以收到這條消息悲雳。topic跟發(fā)送方和消費(fèi)方都沒(méi)有強(qiáng)關(guān)聯(lián)關(guān)系,發(fā)送方可以同時(shí)往多個(gè)topic投放消息香追,消費(fèi)方也可以訂閱多個(gè)topic的消息合瓢。在RocketMQ中,topic是一個(gè)上邏輯概念透典。消息存儲(chǔ)不會(huì)按topic分開(kāi)晴楔。
topic 表示消息的第一級(jí)類(lèi)型迁央,比如一個(gè)電商系統(tǒng)的消息可以分為:交易消息、物流消息等滥崩。一條消息必須有一個(gè) topic。topic 是最細(xì)粒度的訂閱單位讹语,一個(gè) Group 可以訂閱多個(gè) topic 的消息钙皮。
1.2.3 queue
消息的物理管理單位。一個(gè)topic下可以有多個(gè)queue顽决,queue的引入使得消息的存儲(chǔ)可以分布式集群化短条,具有了水平擴(kuò)展能力。
topic和queue是一對(duì)多的關(guān)系才菠,一個(gè)topic下可以包含多個(gè)queue茸时,主要用于負(fù)載均衡。發(fā)送消息時(shí)赋访,用戶只指定topic可都,producer會(huì)根據(jù)topic的路由信息選擇具體發(fā)到哪個(gè)queue上。consumer訂閱消息時(shí)蚓耽,會(huì)根據(jù)負(fù)載均衡策略決定訂閱哪些queue的消息渠牲。
RocketMQ是磁盤(pán)消息隊(duì)列的模式,對(duì)于同一個(gè)消費(fèi)組步悠,一個(gè)分區(qū)只支持一個(gè)消費(fèi)線程來(lái)消費(fèi)消息签杈。過(guò)少的分區(qū),會(huì)導(dǎo)致消費(fèi)速度大大落后于消息的生產(chǎn)速度鼎兽。所以在實(shí)際生產(chǎn)環(huán)境中答姥,一個(gè)topic會(huì)設(shè)置成多分區(qū)的模式,來(lái)支持多個(gè)消費(fèi)者谚咬,參照下圖:
在RocketMQ中鹦付,所有消息隊(duì)列都是持久化,長(zhǎng)度無(wú)限的數(shù)據(jù)結(jié)構(gòu)序宦。所謂長(zhǎng)度無(wú)限是指隊(duì)列中的每個(gè)存儲(chǔ)單元都是定長(zhǎng)睁壁,訪問(wèn)其中的存儲(chǔ)單元使用offset來(lái)訪問(wèn),offset為java的long類(lèi)型互捌,理論上在100年內(nèi)不會(huì)溢出潘明,所以認(rèn)為是長(zhǎng)度無(wú)限。 另外隊(duì)列中只保存最近幾天的數(shù)據(jù)秕噪,之前的數(shù)據(jù)會(huì)按照過(guò)期時(shí)間來(lái)刪除钳降。
可以認(rèn)為messagequeue是一個(gè)長(zhǎng)度無(wú)限的數(shù)組,offset就是下標(biāo)腌巾。
1.2.4 offset
RocketMQ在存儲(chǔ)消息時(shí)會(huì)為每個(gè)topic下的每個(gè)queue生成一個(gè)消息的索引文件遂填,每個(gè)queue都對(duì)應(yīng)一個(gè)offset記錄當(dāng)前queue中消息條數(shù)
1.2.5 tag
標(biāo)簽可以被認(rèn)為是對(duì)topic進(jìn)一步細(xì)化铲觉。一般在相同業(yè)務(wù)模塊中通過(guò)引入標(biāo)簽來(lái)標(biāo)記不同用途的消息。tag表示消息的第二級(jí)類(lèi)型吓坚,比如交易消息又可以分為:交易創(chuàng)建消息撵幽,交易完成消息等。
RocketMQ提供2級(jí)消息分類(lèi)礁击,方便靈活控制盐杂。
1.2.6 nameserver
1.2.6.1 基本概念
nameserver是整個(gè)消息隊(duì)列中的狀態(tài)服務(wù)器,集群的各個(gè)組件通過(guò)它來(lái)了解全局的信息 哆窿。同時(shí)链烈,各個(gè)角色的機(jī)器都要定期向 nameserver上報(bào)自己的狀態(tài),超時(shí)不上報(bào)的話挚躯, nameserver會(huì)認(rèn)為某個(gè)機(jī)器出故障不可用了强衡,其他的組件會(huì)把這個(gè)機(jī)器從可用列表里移除 。
nameserver維護(hù)這些配置信息 码荔、 狀態(tài)信 息漩勤,其他角色都通過(guò) nameserver 來(lái)協(xié)同執(zhí)行
nameserver可以部署多個(gè),相互之間獨(dú)立目胡,其他角色同時(shí)向多個(gè) nameserver機(jī)器上報(bào)狀態(tài)信息锯七,從而達(dá)到熱備份的目的。 nameserver本身是無(wú)狀態(tài)的誉己,也就是說(shuō) nameserver中的 Broker眉尸、 Topic 等狀態(tài)信息不會(huì)持久存儲(chǔ),都是由各個(gè)角色定時(shí)上報(bào)并存儲(chǔ)到內(nèi)存中的巨双。
相對(duì)來(lái)說(shuō)噪猾,nameserver的穩(wěn)定性非常高,原因有二:
(1)nameserver互相獨(dú)立筑累,彼此沒(méi)有通信關(guān)系袱蜡,單臺(tái)nameserver掛掉,不影響其他nameserver慢宗,即使全部掛掉坪蚁,也不影響業(yè)務(wù)系統(tǒng)使用。
(2)nameserver不會(huì)有頻繁的讀寫(xiě)镜沽,所以性能開(kāi)銷(xiāo)非常小敏晤,穩(wěn)定性很高。
1.2.6.2 nameserver存在意義
服務(wù)發(fā)現(xiàn)機(jī)制:當(dāng)發(fā)出請(qǐng)求服務(wù)時(shí)缅茉,客戶端通過(guò)注冊(cè)中心服務(wù)知道所有的服務(wù)實(shí)例嘴脾。客戶端接著使用負(fù)載均衡算法選擇可用的服務(wù)實(shí)例中的一個(gè)并進(jìn)行發(fā)送。
總結(jié):nameserver是一個(gè)幾乎無(wú)狀態(tài)的節(jié)點(diǎn)译打,可集群部署耗拓,節(jié)點(diǎn)之間無(wú)任何信息同步。
1.2.7 broker
broker是RocketMQ的核心模塊奏司,負(fù)責(zé)接收并存儲(chǔ)消息乔询,同時(shí)提供Push/Pull接口來(lái)將消息發(fā)送給consumer。consumer可選擇從master或者slave讀取數(shù)據(jù)韵洋。
broker通常都是以集群的方式存在哥谷,多個(gè)主/從組成broker集群,集群內(nèi)的master節(jié)點(diǎn)之間不做數(shù)據(jù)交互麻献。
broker同時(shí)提供消息查詢的功能,可以通過(guò)messageID和messageKey來(lái)查詢消息猜扮。
broker會(huì)將自己的topic配置信息實(shí)時(shí)同步到nameserver勉吻。
broker部署相對(duì)復(fù)雜,broker分為master與slave旅赢。一個(gè)master可以對(duì)應(yīng)多個(gè)slave齿桃,但是一個(gè)slave只能對(duì)應(yīng)一個(gè)master。master與slave的對(duì)應(yīng)關(guān)系通過(guò)指定相同的brokerName不同的brokerId來(lái)定義煮盼,brokerId為0表示master短纵,非0表示slave。master也可以部署多個(gè)僵控。每個(gè)broker與nameserver集群中的所有節(jié)點(diǎn)建立長(zhǎng)連接香到,定時(shí)注冊(cè)topic信息到所有nameserver。
1.2.8 producer
消息生產(chǎn)者报破,位于用戶的進(jìn)程內(nèi)悠就,producer 通過(guò) nameserver 獲取所有 broker 的路由信息,根據(jù)負(fù)載均衡策略選擇將消息發(fā)到哪個(gè) broker充易,然后調(diào)用 broker 接口提交消息梗脾。
producer與nameserver集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇,但不同于上一次)建立長(zhǎng)連接盹靴,定期從nameserver取topic路由信息炸茧,并向提供topic服務(wù)的master建立長(zhǎng)連接,且定時(shí)向master發(fā)送心跳稿静。
1.2.9 producerGroup
生產(chǎn)者組梭冠,簡(jiǎn)單來(lái)說(shuō)就是多個(gè)發(fā)送同一類(lèi)消息的生產(chǎn)者稱之為一個(gè)生產(chǎn)者組。
1.2.10 consumer
消息消費(fèi)者自赔,位于用戶進(jìn)程內(nèi)妈嘹。consumer通過(guò)nameserver獲取所有broker的路由信息后,向broker發(fā)送Pull請(qǐng)求來(lái)獲取消息數(shù)據(jù)绍妨。consumer可以以兩種模式啟動(dòng)润脸,廣播(Broadcast)和集群(Cluster)柬脸,廣播模式下,一條消息會(huì)發(fā)送給所有consumer毙驯,集群模式下消息只會(huì)發(fā)送給一個(gè)consumer倒堕。
consumer與nameserver集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇,但不同于上一次)建立長(zhǎng)連接爆价,定期從nameserver取topic路由信息垦巴,并向提供topic服務(wù)的master、slave建立長(zhǎng)連接铭段,且定時(shí)向master骤宣、slave發(fā)送心跳
1.2.16 consumerGroup
消費(fèi)者組,和生產(chǎn)者類(lèi)似序愚,消費(fèi)同一類(lèi)消息的多個(gè)consumer實(shí)例組成一個(gè)消費(fèi)者組憔披。
1.3 消息發(fā)送
1.3.1 簡(jiǎn)化流程
一個(gè)消息從發(fā)送到接收最簡(jiǎn)單的步驟:producer、topic爸吮、consumer芬膝。
消息先由生產(chǎn)者發(fā)送到topic,然后消費(fèi)者去topic拿消息形娇。topic在這里只是個(gè)概念锰霜。
1.3.2 細(xì)化流程
詳細(xì)的消息發(fā)送及接收流程:
消息被發(fā)送到queue中進(jìn)行標(biāo)記:
1.3.3 發(fā)送消息的三種方式
從功能上來(lái)說(shuō),rocketmq支持三種發(fā)送消息的方式桐早,分別是同步發(fā)送(sync)癣缅,異步發(fā)送(async)和直接發(fā)送(oneway)。順序消息只支持同步發(fā)送. 下面來(lái)簡(jiǎn)單說(shuō)明一下這三種發(fā)送消息的方式哄酝,以便了解它們之間的差異所灸。
同步發(fā)送 sync
發(fā)送消息采用同步模式,這種方式只有在消息完全發(fā)送完成之后才返回結(jié)果炫七,此方式存在需要同步等待發(fā)送結(jié)果的時(shí)間代價(jià)爬立。
這種方式具有內(nèi)部重試機(jī)制,即在主動(dòng)聲明本次消息發(fā)送失敗之前万哪,內(nèi)部實(shí)現(xiàn)將重試一定次數(shù)侠驯,默認(rèn)為2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 發(fā)送的結(jié)果存在同一個(gè)消息可能被多次發(fā)送給給broker奕巍,這里需要應(yīng)用的開(kāi)發(fā)者自己在消費(fèi)端處理冪等性問(wèn)題吟策。
public class {
public static void main(String[] args) Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
//指定NameServer地址
//修改為自己的
//多個(gè)可以用";"隔開(kāi)
//producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876");
producer.setNamesrvAddr("112.74.43.136:9876");
/*
* Producer對(duì)象在使用之前必須要調(diào)用start初始化,初始化一次即可
* 注意:切記不可以在每次發(fā)送消息時(shí)的止,都調(diào)用start方法
*/
producer.start();
for (int i = 0; i <= 100; i++) {
/*
構(gòu)建消息
參數(shù) topic: Message 所屬的 Topic
tags: 可理解為對(duì)消息進(jìn)行再歸類(lèi)檩坚,方便 Consumer 指定過(guò)濾條件在 MQ 服務(wù)器過(guò)濾
keys: 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請(qǐng)盡可能全局唯一,以方便您在無(wú)法正常收到消息情況下,可通過(guò)阿里云服務(wù)器管理控制臺(tái)查詢消息并補(bǔ)發(fā)匾委。注意:不設(shè)置也不會(huì)影響消息正常收發(fā)
body: body可以是任何二進(jìn)制形式的數(shù)據(jù)拖叙,MQ不做任何干預(yù),需要 Producer 與 Consumer 協(xié)商好一致的序列化和反序列化方式
*/
Message msg = new Message("TopicTest", "TagA", "keys", ("測(cè)試RocketMQ" + i).getBytes("UTF-8")
);
try {
//發(fā)送同步消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
// 消息發(fā)送失敗赂乐,需要進(jìn)行重試處理薯鳍,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
Thread.sleep(101);
}
}
producer.shutdown();
}
}
異步發(fā)送 async
發(fā)送消息采用異步發(fā)送模式,消息發(fā)送后立刻返回挨措,當(dāng)消息完全完成發(fā)送后挖滤,會(huì)調(diào)用回調(diào)函數(shù)sendCallback來(lái)告知發(fā)送者本次發(fā)送是成功或者失敗。異步模式通常用于響應(yīng)時(shí)間敏感業(yè)務(wù)場(chǎng)景浅役,即承受不了同步發(fā)送消息時(shí)等待返回的耗時(shí)代價(jià)斩松。
同同步發(fā)送一樣,異步模式也在內(nèi)部實(shí)現(xiàn)了重試機(jī)制觉既,默認(rèn)次數(shù)為2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})砸民。發(fā)送的結(jié)果同樣存在同一個(gè)消息可能被多次發(fā)送給給broker,需要應(yīng)用的開(kāi)發(fā)者自己在消費(fèi)端處理冪等性問(wèn)題奋救。
public class AsynProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
producer.setNamesrvAddr("112.74.43.136:9876");
producer.start();
for (int i = 0; i <= 100; i++) {
Message msg = new Message("TopicTest", "TagA", "keys", ("測(cè)試RocketMQ" + i).getBytes("UTF-8")
);
try {
// 異步發(fā)送消息, 發(fā)送結(jié)果通過(guò) callback 返回給客戶端。
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
// 消費(fèi)發(fā)送成功
System.out.println("SUCCESS信息:" + sendResult.toString());
System.out.println("send message success. topic=" + sendResult.getRegionId() + ", msgId=" + sendResult.getMsgId());
}
public void onException(Throwable throwable) {
// 消息發(fā)送失敗反惕,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理
System.out.println("FAIL信息:" + throwable.getMessage());
}
});
} catch (Exception e) {
e.printStackTrace();
// 消息發(fā)送失敗,需要進(jìn)行重試處理第租,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
Thread.sleep(101);
}
}
producer.shutdown();
}
}
直接發(fā)送 one-way
采用one-way發(fā)送模式發(fā)送消息的時(shí)候勉痴,發(fā)送端發(fā)送完消息后會(huì)立即返回,不會(huì)等待來(lái)自broker的ack來(lái)告知本次消息發(fā)送是否完全完成發(fā)送悬赏。這種方式吞吐量很大狡汉,但是存在消息丟失的風(fēng)險(xiǎn),所以其適用于不重要的消息發(fā)送闽颇,比如日志收集盾戴。
public class OneWayProducer {
public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
producer.setNamesrvAddr("112.74.43.136:9876");
producer.start();
for (int i = 0; i <= 10; i++) {
Message msg = new Message("TopicTest", "TagA", "keys", ("測(cè)試RocketMQ" + i).getBytes("UTF-8")
);
try {
// 由于在 oneway 方式發(fā)送消息時(shí)沒(méi)有請(qǐng)求應(yīng)答處理,一旦出現(xiàn)消息發(fā)送失敗兵多,則會(huì)因?yàn)闆](méi)有重試而導(dǎo)致數(shù)據(jù)丟失尖啡。若數(shù)據(jù)不可丟,建議選用可靠同步或可靠異步發(fā)送方式剩膘。
producer.sendOneway(msg);
} catch (Exception e) {
e.printStackTrace();
// 消息發(fā)送失敗衅斩,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
Thread.sleep(101);
}
}
producer.shutdown();
}
}
1.4 消息存儲(chǔ)
topic是一個(gè)邏輯上的概念怠褐,實(shí)際上message是在每個(gè)broker上以queue的形式記錄畏梆。
從上面的圖片可以總結(jié)下幾條結(jié)論:
(1)消費(fèi)者發(fā)送的message會(huì)在broker中的queue隊(duì)列中記錄
(2)一個(gè)topic的數(shù)據(jù)可能會(huì)存在多個(gè)broker中
(3)一個(gè)broker存在多個(gè)queue
也就是說(shuō)每個(gè)topic在broker上會(huì)劃分成幾個(gè)邏輯隊(duì)列,每個(gè)邏輯隊(duì)列保存一部分消息數(shù)據(jù),但是保存的消息數(shù)據(jù)實(shí)際上不是真正的消息數(shù)據(jù)奠涌,而是指向commitlog的消息索引
1.5 消息消費(fèi)
1.5.1 廣播消費(fèi)
一條消息被多個(gè)Consumer消費(fèi)宪巨,即使這些Consumer屬于同一個(gè)ConsumerGroup,消息也會(huì)被ConsumerGroup中的每個(gè)Consumer消費(fèi)一次铣猩,廣播消費(fèi)中的ConsumerGroup概念可以認(rèn)為在消息劃分層面沒(méi)有意義揖铜,適用于一些分發(fā)消息的場(chǎng)景,比如我訂單下單成功了达皿,需要通知財(cái)務(wù)系統(tǒng)天吓,客服系統(tǒng)等等這種分發(fā)的場(chǎng)景,可以通過(guò)修改Consumer中的MessageModel來(lái)設(shè)置消費(fèi)方式為廣播消費(fèi) 峦椰。consumer.setMessageModel(MessageModel.BROADCASTING)
1.5.2 集群消費(fèi)
一個(gè)ConsumerGroup中的Consumer實(shí)例平均分?jǐn)傁M(fèi)生產(chǎn)者發(fā)送的消息龄寞。例如某個(gè)Topic有九條消息,其中一個(gè)Consumer Group有三個(gè)實(shí)例(可能是3個(gè)進(jìn)程汤功,或者3臺(tái)機(jī)器)物邑,那么每個(gè)實(shí)例只消費(fèi)其中的3條消息,Consumer不指定消費(fèi)方式的話默認(rèn)是集群消費(fèi)的滔金,適用于大部分消息的業(yè)務(wù) 色解。
1.6 網(wǎng)絡(luò)架構(gòu)
對(duì)于上圖中幾個(gè)角色的說(shuō)明:
(1)nameserver:RocketMQ集群的命名服務(wù)器(也可以說(shuō)是注冊(cè)中心),它本身是無(wú)狀態(tài)的(實(shí)際情況下可能存在每個(gè)nameserver實(shí)例上的數(shù)據(jù)有短暫的不一致現(xiàn)象餐茵,但是通過(guò)定時(shí)更新科阎,在大部分情況下都是一致的),用于管理集群的元數(shù)據(jù)( 例如忿族,KV配置锣笨、topic、broker的注冊(cè)信息)道批。
(2)broker(master):RocketMQ消息代理服務(wù)器主節(jié)點(diǎn)错英,起到串聯(lián)producer的消息發(fā)送和consumer的消息消費(fèi),和將消息的落盤(pán)存儲(chǔ)的作用隆豹;
(3)broker(slave):RocketMQ消息代理服務(wù)器備份節(jié)點(diǎn)椭岩,主要是通過(guò)同步/異步的方式將主節(jié)點(diǎn)的消息同步過(guò)來(lái)進(jìn)行備份,為RocketMQ集群的高可用性提供保障璃赡;
(4)producer(消息生產(chǎn)者):在這里為普通消息的生產(chǎn)者簿煌,主要基于RocketMQ-Client模塊將消息發(fā)送至RocketMQ的主節(jié)點(diǎn)。
對(duì)于上面圖中幾條通信鏈路的關(guān)系:
(1)producer與NamerServer:每一個(gè)producer會(huì)與nameserver集群中的一個(gè)實(shí)例建立TCP連接鉴吹,從這個(gè)nameserver實(shí)例上拉取topic路由信息姨伟;
(2)producer和broker:producer會(huì)和它要發(fā)送的topic相關(guān)聯(lián)的master的broker代理服務(wù)器建立TCP連接,用于發(fā)送消息以及定時(shí)的心跳信息豆励;
(3)broker和NamerServer:broker(master or slave)均會(huì)和每一個(gè)nameserver實(shí)例來(lái)建立TCP連接夺荒。broker在啟動(dòng)的時(shí)候會(huì)注冊(cè)自己配置的topic信息到nameserver集群的每一臺(tái)機(jī)器中瞒渠。
即每一個(gè)nameserver均有該broker的topic路由配置信息。其中技扼,master與master之間無(wú)連接伍玖,master與slave之間有連接;
2 深入RocketMQ
2.1 發(fā)送消息負(fù)載均衡
發(fā)送消息通過(guò)輪詢隊(duì)列的方式發(fā)送剿吻,每個(gè)隊(duì)列接收平均的消息量窍箍。通過(guò)增加機(jī)器,可以水平擴(kuò)展隊(duì)列容量丽旅。另外也可以自定義方式選擇發(fā)往哪個(gè)隊(duì)列椰棘。注:另外多個(gè)隊(duì)列可以部署在一臺(tái)機(jī)器上,也可以分別部署在多臺(tái)不同的機(jī)器上榄笙。
2.1.1 消息發(fā)送時(shí)選擇queue的算法
分為兩種邪狞,一種是直接發(fā)消息,client內(nèi)部有選擇queue的算法茅撞,不允許外界改變帆卓。還有一種是可以自定義queue的選擇算法(內(nèi)置了三種算法,不喜歡的話可以自定義算法實(shí)現(xiàn))
public class org.apache.rocketmq.client.producer.DefaultMQProducer {
// 只發(fā)送消息米丘,queue的選擇由默認(rèn)的算法來(lái)實(shí)現(xiàn)
@Override
public SendResult send(Collection<Message> msgs) {}
// 自定義選擇queue的算法進(jìn)行發(fā)消息
@Override
public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) {}
}
2.1.1.1 send(msg, mq)
2.1.1.1.1 使用場(chǎng)景
有時(shí)候我們不希望默認(rèn)的queue選擇算法剑令,而是需要自定義,一般最常用的場(chǎng)景在順序消息拄查,順序消息的發(fā)送一般都會(huì)指定某組特征的消息都發(fā)當(dāng)同一個(gè)queue里吁津,這樣才能保證順序,因?yàn)閱蝢ueue是有序的靶累。
2.1.1.1.2 原理剖析
內(nèi)置了SelectMessageQueueByRandom、SelectMessageQueueByHash癣疟、SelectMessageQueueByMachineRoom三種算法挣柬,都實(shí)現(xiàn)了一個(gè)共同的接口:org.apache.rocketmq.client.producer.MessageQueueSelector.要想自定義邏輯的話,直接實(shí)現(xiàn)接口重寫(xiě)select方法即可睛挚。
很典型的策略模式邪蛔,不同算法不同實(shí)現(xiàn)類(lèi),有個(gè)頂層接口扎狱。
SelectMessageQueueByRandom
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// mqs.size():隊(duì)列的個(gè)數(shù)侧到。假設(shè)隊(duì)列個(gè)數(shù)是4,那么這個(gè)value就是0-3之間隨機(jī)淤击。
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
}
SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
// 防止出現(xiàn)負(fù)數(shù)匠抗,取個(gè)絕對(duì)值,這也是我們平時(shí)開(kāi)發(fā)中需要注意到的點(diǎn)
if (value < 0) {
value = Math.abs(value);
}
// 直接取余隊(duì)列個(gè)數(shù)污抬。
value = value % mqs.size();
return mqs.get(value);
}
}
SelectMessageQueueByMachineRoom
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
public Set<String> getConsumeridcs() {
return consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
自定義算法
public class MySelectMessageQueue implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
}
2.1.1.2 send(msg)
2.1.1.2.1 使用場(chǎng)景
一般沒(méi)特殊需求的場(chǎng)景都用這個(gè)汞贸。因?yàn)樗J(rèn)的queue選擇算法很不錯(cuò)绳军,各種優(yōu)化場(chǎng)景都替我們想到了。我們稱之為隨機(jī)遞增取模算法矢腻。
2.1.1.2.2 原理剖析
// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl}
// 這是發(fā)送消息核心原理
// 選擇消息要發(fā)送的隊(duì)列
MessageQueue mq = null;
for (int times = 0; times < 3; times++) {
// 首次肯定是null
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 調(diào)用下面的方法進(jìn)行選擇queue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
// 給mq賦值门驾,如果首次失敗了,那么下次重試的時(shí)候(也就是下次for的時(shí)候)多柑,mq就有值了奶是。
mq = mqSelected;
......
// 很關(guān)鍵,能解答下面會(huì)提到的兩個(gè)問(wèn)題:
// 1.faultItemTable是什么時(shí)候放進(jìn)去的竣灌?
// 2.isAvailable() 為什么只是判斷一個(gè)時(shí)間就可以知道Broker是否可用聂沙?
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
}
}
選擇queue的主入口
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 默認(rèn)為false,代表不啟用broker故障延遲
if (this.sendLatencyFaultEnable) {
try {
// 隨機(jī)數(shù)且+1
int index = tpInfo.getSendWhichQueue().getAndIncrement();
// 遍歷
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 先(隨機(jī)數(shù) +1) % queue.size()
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0) {
pos = 0;
}
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 看找到的這個(gè)queue所屬的broker是不是可用的
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
// 非失敗重試帐偎,直接返回到的隊(duì)列
// 失敗重試的情況逐纬,如果和選擇的隊(duì)列是上次重試是一樣的,則返回
// 也就是說(shuō)如果你這個(gè)queue所在的broker可用削樊,
// 且不是重試進(jìn)來(lái)的或失敗重試的情況豁生,如果和選擇的隊(duì)列是上次重試是一樣的,那你就是天選之子了漫贞。
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
}
// 如果所有隊(duì)列都不可用甸箱,那么選擇一個(gè)相對(duì)好的broker,不考慮可用性的消息隊(duì)列
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
// 隨機(jī)選擇一個(gè)queue
return tpInfo.selectOneMessageQueue();
}
// 當(dāng)sendLatencyFaultEnable=false的時(shí)候選擇queue的方法迅脐,默認(rèn)就是false芍殖。
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
2.1.1.2.2.1 不啟用broker故障延遲
既然sendLatencyFaultEnable默認(rèn)是false,那就先看當(dāng)sendLatencyFaultEnable=false時(shí)候的邏輯
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
// 第一次就是null谴蔑,第二次(也就是重試的時(shí)候)就不是null了豌骏。
if (lastBrokerName == null) {
// 第一次選擇隊(duì)列的邏輯
return selectOneMessageQueue();
} else {
// 第一次選擇隊(duì)列發(fā)送消息失敗了,第二次重試的時(shí)候選擇隊(duì)列的邏輯
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 過(guò)濾掉上次發(fā)送消息失敗的隊(duì)列
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
那就繼續(xù)看第一次選擇隊(duì)列的邏輯:
public MessageQueue selectOneMessageQueue() {
// 當(dāng)前線程有個(gè)ThreadLocal變量隐锭,存放了一個(gè)隨機(jī)數(shù)
// {@link org.apache.rocketmq.client.common.ThreadLocalIndex#getAndIncrement}
// 然后取出隨機(jī)數(shù)根據(jù)隊(duì)列長(zhǎng)度取模且將隨機(jī)數(shù)+1
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0) {
pos = 0;
}
return this.messageQueueList.get(pos);
}
其實(shí)也有點(diǎn)隨機(jī)一個(gè)的意思窃躲。但是亮點(diǎn)在于取出隨機(jī)數(shù)根據(jù)隊(duì)列長(zhǎng)度取模且將隨機(jī)數(shù)+1,(getAndIncrement cas +1)钦睡。
當(dāng)消息第一次發(fā)送失敗時(shí)蒂窒,lastBrokerName會(huì)存放當(dāng)前選擇失敗的broker(mq = mqSelected),通過(guò)重試荞怒,此時(shí)lastBrokerName有值洒琢,代表上次選擇的boker發(fā)送失敗,則重新對(duì)sendWhichQueue本地線程變量+1褐桌,遍歷選擇消息隊(duì)列衰抑,直到不是上次的broker,也就是為了規(guī)避上次發(fā)送失敗的broker的邏輯所在荧嵌。
舉個(gè)例子:你這次隨機(jī)數(shù)是1停士,隊(duì)列長(zhǎng)度是4挖帘,1%4=1,這時(shí)候失敗了恋技,進(jìn)入重試拇舀,那么重試之前,也就是在上一步1%4之后蜻底,他把1進(jìn)行了++操作骄崩,變成了2,那么你這次重試的時(shí)候就是2%4=2薄辅,直接過(guò)濾掉了剛才失敗的broker要拂。
那就繼續(xù)看第二次重試選擇隊(duì)列的邏輯:
// +1
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
// 取模
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 過(guò)濾掉上次發(fā)送消息失敗的隊(duì)列
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
// 沒(méi)找到能用的queue的話繼續(xù)走默認(rèn)的那個(gè)
return selectOneMessageQueue();
2.1.1.2.2.2 啟用broker故障延遲
也就是下面if里的邏輯
if (this.sendLatencyFaultEnable) {
....
}
我先(隨機(jī)數(shù) +1) % queue.size(),然后看你這個(gè)queue所屬的broker是否可用站楚,可用的話且不是重試進(jìn)來(lái)的或失敗重試的情況脱惰,如果和選擇的隊(duì)列是上次重試是一樣的,那直接return你就完事了窿春。那么怎么看broker是否可用的呢拉一?
// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#isAvailable(String)}
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable()}
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
faultItemTable是什么時(shí)候放進(jìn)去的?isAvailable() 為什么只是判斷一個(gè)時(shí)間就可以知道Broker是否可用旧乞?這就需要上面發(fā)送消息完成后所調(diào)用的這個(gè)方法了:
// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem}
// 發(fā)送開(kāi)始時(shí)間
beginTimestampPrev = System.currentTimeMillis();
// 進(jìn)行發(fā)送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
// 發(fā)送結(jié)束時(shí)間
endTimestamp = System.currentTimeMillis();
// 更新broker的延遲情況
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
細(xì)節(jié)邏輯如下:
// {@link org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem}
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
// 首次isolation傳入的是false蔚润,currentLatency是發(fā)送消息所耗費(fèi)的時(shí)間,如下
// this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
long duration = computeNotAvailableDuration(isolation ? 3010 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long[] latencyMax = {50L, 100L, 550L, 101L, 201L, 301L, 1501L};
private long[] notAvailableDuration = {0L, 0L, 3010L, 6010L, 12010L, 18010L, 60100L};
// 根據(jù)延遲時(shí)間對(duì)比MQFaultStrategy中的延遲級(jí)別數(shù)組latencyMax 不可用時(shí)長(zhǎng)數(shù)組notAvailableDuration 來(lái)將該broker加進(jìn)faultItemTable中尺栖。
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
// 假設(shè)currentLatency花費(fèi)了10ms嫡纠,那么latencyMax里的數(shù)據(jù)顯然不符合下面的所有判斷,所以直接return 0;
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem()}
@Override
// 其實(shí)主要就是給startTimestamp賦值為當(dāng)前時(shí)間+computeNotAvailableDuration(isolation ? 3010 : currentLatency);的結(jié)果延赌,給isAvailable()所用
// 也就是說(shuō)只有notAvailableDuration == 0的時(shí)候除盏,isAvailable()才會(huì)返回true。
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
// 給startTimestamp賦值為當(dāng)前時(shí)間+computeNotAvailableDuration(isolation ? 3010 : currentLatency);的結(jié)果挫以,給isAvailable()所用
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
// 給startTimestamp賦值為當(dāng)前時(shí)間+computeNotAvailableDuration(isolation ? 3010 : currentLatency);的結(jié)果者蠕,給isAvailable()所用
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
// 給startTimestamp賦值為當(dāng)前時(shí)間+computeNotAvailableDuration(isolation ? 3010 : currentLatency);的結(jié)果,給isAvailable()所用
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
RocketMQ為每個(gè)Broker預(yù)測(cè)了個(gè)可用時(shí)間(當(dāng)前時(shí)間+notAvailableDuration)屡贺,當(dāng)當(dāng)前時(shí)間大于該時(shí)間蠢棱,才代表Broker可用锌杀,而notAvailableDuration有6個(gè)級(jí)別和latencyMax的區(qū)間一一對(duì)應(yīng)甩栈,根據(jù)傳入的currentLatency去預(yù)測(cè)該Broker在什么時(shí)候可用。所以再來(lái)看這個(gè)
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
根據(jù)執(zhí)行時(shí)間來(lái)看落入哪個(gè)區(qū)間糕再,在0~100的時(shí)間內(nèi)notAvailableDuration都是0量没,都是可用的,大于該值后突想,可用的時(shí)間就會(huì)開(kāi)始變大了殴蹄,就認(rèn)為不是最優(yōu)解究抓,直接舍棄。
2.1.1.3 總結(jié)
在不開(kāi)啟容錯(cuò)的情況下袭灯,輪詢隊(duì)列進(jìn)行發(fā)送刺下,如果失敗了,重試的時(shí)候過(guò)濾失敗的Broker
如果開(kāi)啟了容錯(cuò)策略稽荧,會(huì)通過(guò)RocketMQ的預(yù)測(cè)機(jī)制來(lái)預(yù)測(cè)一個(gè)Broker是否可用
如果上次失敗的Broker可用那么還是會(huì)選擇該Broker的隊(duì)列
如果上述情況失敗橘茉,則隨機(jī)選擇一個(gè)進(jìn)行發(fā)送
在發(fā)送消息的時(shí)候會(huì)記錄一下調(diào)用的時(shí)間與是否報(bào)錯(cuò),根據(jù)該時(shí)間去預(yù)測(cè)broker的可用時(shí)間
2.2 消息存儲(chǔ)
2.2.1 存儲(chǔ)模型
RocketMQ文件存儲(chǔ)模型層次結(jié)構(gòu)如上圖所示姨丈,根據(jù)類(lèi)別和作用從概念模型上大致可以劃分為5層畅卓,下面將從各個(gè)層次分別進(jìn)行分析和闡述:
(1)RocketMQ業(yè)務(wù)處理器層:Broker端對(duì)消息進(jìn)行讀取和寫(xiě)入的業(yè)務(wù)邏輯入口,這一層主要包含了業(yè)務(wù)邏輯相關(guān)處理操作(根據(jù)解析RemotingCommand中的RequestCode來(lái)區(qū)分具體的業(yè)務(wù)操作類(lèi)型蟋恬,進(jìn)而執(zhí)行不同的業(yè)務(wù)處理流程)翁潘,比如前置的檢查和校驗(yàn)步驟、構(gòu)造MessageExtBrokerInner對(duì)象歼争、decode反序列化拜马、構(gòu)造Response返回對(duì)象等;
(2)RocketMQ數(shù)據(jù)存儲(chǔ)組件層矾飞;該層主要是RocketMQ的存儲(chǔ)核心類(lèi)—DefaultMessageStore一膨,其為RocketMQ消息數(shù)據(jù)文件的訪問(wèn)入口,通過(guò)該類(lèi)的“putMessage()”和“getMessage()”方法完成對(duì)CommitLog消息存儲(chǔ)的日志數(shù)據(jù)文件進(jìn)行讀寫(xiě)操作(具體的讀寫(xiě)訪問(wèn)操作還是依賴下一層中CommitLog對(duì)象模型提供的方法)洒沦;另外豹绪,在該組件初始化時(shí)候,還會(huì)啟動(dòng)很多存儲(chǔ)相關(guān)的后臺(tái)服務(wù)線程申眼,包括AllocateMappedFileService(MappedFile預(yù)分配服務(wù)線程)瞒津、ReputMessageService(回放存儲(chǔ)消息服務(wù)線程)、HAService(Broker主從同步高可用服務(wù)線程)括尸、StoreStatsService(消息存儲(chǔ)統(tǒng)計(jì)服務(wù)線程)巷蚪、IndexService(索引文件服務(wù)線程)等;
(3)RocketMQ存儲(chǔ)邏輯對(duì)象層:該層主要包含了RocketMQ數(shù)據(jù)文件存儲(chǔ)直接相關(guān)的三個(gè)模型類(lèi)IndexFile濒翻、ConsumerQueue和CommitLog屁柏。IndexFile為索引數(shù)據(jù)文件提供訪問(wèn)服務(wù),ConsumerQueue為邏輯消息隊(duì)列提供訪問(wèn)服務(wù)有送,CommitLog則為消息存儲(chǔ)的日志數(shù)據(jù)文件提供訪問(wèn)服務(wù)淌喻。這三個(gè)模型類(lèi)也是構(gòu)成了RocketMQ存儲(chǔ)層的整體結(jié)構(gòu)(對(duì)于這三個(gè)模型類(lèi)的深入分析將放在后續(xù)篇幅中);
(4)封裝的文件內(nèi)存映射層:RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數(shù)據(jù)文件的讀寫(xiě)雀摘。其中裸删,采用MappedByteBuffer這種內(nèi)存映射磁盤(pán)文件的方式完成對(duì)大文件的讀寫(xiě),在RocketMQ中將該類(lèi)封裝成MappedFile類(lèi)阵赠。這里限制的問(wèn)題在上面已經(jīng)講過(guò)涯塔;對(duì)于每類(lèi)大文件(IndexFile/ConsumerQueue/CommitLog)肌稻,在存儲(chǔ)時(shí)分隔成多個(gè)固定大小的文件(單個(gè)IndexFile文件大小約為400M、單個(gè)ConsumerQueue文件大小約5.72M匕荸、單個(gè)CommitLog文件大小為1G)爹谭,其中每個(gè)分隔文件的文件名為前面所有文件的字節(jié)大小數(shù)+1,即為文件的起始偏移量榛搔,從而實(shí)現(xiàn)了整個(gè)大文件的串聯(lián)旦棉。這里,每一種類(lèi)的單個(gè)文件均由MappedFile類(lèi)提供讀寫(xiě)操作服務(wù)(其中药薯,MappedFile類(lèi)提供了順序?qū)?隨機(jī)讀绑洛、內(nèi)存數(shù)據(jù)刷盤(pán)、內(nèi)存清理等和文件相關(guān)的服務(wù))童本;
(5)磁盤(pán)存儲(chǔ)層:主要指的是部署RocketMQ服務(wù)器所用的磁盤(pán)真屯。這里,需要考慮不同磁盤(pán)類(lèi)型(如SSD或者普通的HDD)特性以及磁盤(pán)的性能參數(shù)(如IOPS穷娱、吞吐量和訪問(wèn)時(shí)延等指標(biāo))對(duì)順序?qū)?隨機(jī)讀操作帶來(lái)的影響
2.2.2 存儲(chǔ)流程
(1)RocketMQ消息存儲(chǔ)結(jié)構(gòu)類(lèi)型及缺點(diǎn)
上圖即為RocketMQ的消息存儲(chǔ)整體架構(gòu)绑蔫,RocketMQ采用的是混合型的存儲(chǔ)結(jié)構(gòu),即為Broker單個(gè)實(shí)例下所有的隊(duì)列共用一個(gè)日志數(shù)據(jù)文件(即為CommitLog)來(lái)存儲(chǔ)泵额。而Kafka采用的是獨(dú)立型的存儲(chǔ)結(jié)構(gòu)配深,每個(gè)隊(duì)列一個(gè)文件。這里小編認(rèn)為嫁盲,RocketMQ采用混合型存儲(chǔ)結(jié)構(gòu)的缺點(diǎn)在于篓叶,會(huì)存在較多的隨機(jī)讀操作,因此讀的效率偏低羞秤。同時(shí)消費(fèi)消息需要依賴ConsumeQueue缸托,構(gòu)建該邏輯消費(fèi)隊(duì)列需要一定開(kāi)銷(xiāo)。
(2)RocketMQ消息存儲(chǔ)架構(gòu)深入分析
從上面的整體架構(gòu)圖中可見(jiàn)瘾蛋,RocketMQ的混合型存儲(chǔ)結(jié)構(gòu)針對(duì)Producer和Consumer分別采用了數(shù)據(jù)和索引部分相分離的存儲(chǔ)結(jié)構(gòu)俐镐,Producer發(fā)送消息至Broker端,然后Broker端使用同步或者異步的方式對(duì)消息刷盤(pán)持久化哺哼,保存至CommitLog中佩抹。只要消息被刷盤(pán)持久化至磁盤(pán)文件CommitLog中,那么Producer發(fā)送的消息就不會(huì)丟失取董。正因?yàn)槿绱斯髌唬珻onsumer也就肯定有機(jī)會(huì)去消費(fèi)這條消息,至于消費(fèi)的時(shí)間可以稍微滯后一些也沒(méi)有太大的關(guān)系甲葬。退一步地講廊勃,即使Consumer端第一次沒(méi)法拉取到待消費(fèi)的消息懈贺,Broker服務(wù)端也能夠通過(guò)長(zhǎng)輪詢機(jī)制等待一定時(shí)間延遲后再次發(fā)起拉取消息的請(qǐng)求经窖。
這里坡垫,RocketMQ的具體做法是,使用Broker端的后臺(tái)服務(wù)線程—ReputMessageService不停地分發(fā)請(qǐng)求并異步構(gòu)建ConsumeQueue(邏輯消費(fèi)隊(duì)列)和IndexFile(索引文件)數(shù)據(jù)画侣。然后冰悠,Consumer即可根據(jù)ConsumerQueue來(lái)查找待消費(fèi)的消息了。其中配乱,ConsumeQueue(邏輯消費(fèi)隊(duì)列)作為消費(fèi)消息的索引溉卓,保存了指定Topic下的隊(duì)列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值搬泥。而IndexFile(索引文件)則只是為了消息查詢提供了一種通過(guò)key或時(shí)間區(qū)間來(lái)查詢消息的方法(ps:這種通過(guò)IndexFile來(lái)查找消息的方法不影響發(fā)送與消費(fèi)消息的主流程)桑寨。
(3)PageCache與Mmap內(nèi)存映射
這里有必要先稍微簡(jiǎn)單地介紹下page cache的概念。系統(tǒng)的所有文件I/O請(qǐng)求忿檩,操作系統(tǒng)都是通過(guò)page cache機(jī)制實(shí)現(xiàn)的尉尾。對(duì)于操作系統(tǒng)來(lái)說(shuō),磁盤(pán)文件都是由一系列的數(shù)據(jù)塊順序組成燥透,數(shù)據(jù)塊的大小由操作系統(tǒng)本身而決定沙咏,x86的linux中一個(gè)標(biāo)準(zhǔn)頁(yè)面大小是4KB。
操作系統(tǒng)內(nèi)核在處理文件I/O請(qǐng)求時(shí)班套,首先到page cache中查找(page cache中的每一個(gè)數(shù)據(jù)塊都設(shè)置了文件以及偏移量地址信息)肢藐,如果未命中,則啟動(dòng)磁盤(pán)I/O吱韭,將磁盤(pán)文件中的數(shù)據(jù)塊加載到page cache中的一個(gè)空閑塊吆豹,然后再copy到用戶緩沖區(qū)中。
page cache本身也會(huì)對(duì)數(shù)據(jù)文件進(jìn)行預(yù)讀取理盆,對(duì)于每個(gè)文件的第一個(gè)讀請(qǐng)求操作瞻讽,系統(tǒng)在讀入所請(qǐng)求頁(yè)面的同時(shí)會(huì)讀入緊隨其后的少數(shù)幾個(gè)頁(yè)面。因此熏挎,想要提高page cache的命中率(盡量讓訪問(wèn)的頁(yè)在物理內(nèi)存中)速勇,從硬件的角度來(lái)說(shuō)肯定是物理內(nèi)存越大越好。從操作系統(tǒng)層面來(lái)說(shuō)坎拐,訪問(wèn)page cache時(shí)烦磁,即使只訪問(wèn)1k的消息,系統(tǒng)也會(huì)提前預(yù)讀取更多的數(shù)據(jù)哼勇,在下次讀取消息時(shí), 就很可能可以命中內(nèi)存都伪。
在RocketMQ中,ConsumeQueue邏輯消費(fèi)隊(duì)列存儲(chǔ)的數(shù)據(jù)較少积担,并且是順序讀取陨晶,在page cache機(jī)制的預(yù)讀取作用下,Consume Queue的讀性能會(huì)比較高近乎內(nèi)存,即使在有消息堆積情況下也不會(huì)影響性能先誉。而對(duì)于CommitLog消息存儲(chǔ)的日志數(shù)據(jù)文件來(lái)說(shuō)湿刽,讀取消息內(nèi)容時(shí)候會(huì)產(chǎn)生較多的隨機(jī)訪問(wèn)讀取,嚴(yán)重影響性能褐耳。如果選擇合適的系統(tǒng)IO調(diào)度算法诈闺,比如設(shè)置調(diào)度算法為“Noop”(此時(shí)塊存儲(chǔ)采用SSD的話),隨機(jī)讀的性能也會(huì)有所提升铃芦。
另外雅镊,RocketMQ主要通過(guò)MappedByteBuffer對(duì)文件進(jìn)行讀寫(xiě)操作。其中刃滓,利用了NIO中的FileChannel模型直接將磁盤(pán)上的物理文件直接映射到用戶態(tài)的內(nèi)存地址中(這種Mmap的方式減少了傳統(tǒng)IO將磁盤(pán)文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū)和用戶應(yīng)用程序地址空間的緩沖區(qū)之間來(lái)回進(jìn)行拷貝的性能開(kāi)銷(xiāo))仁烹,將對(duì)文件的操作轉(zhuǎn)化為直接對(duì)內(nèi)存地址進(jìn)行操作,從而極大地提高了文件的讀寫(xiě)效率(這里需要注意的是咧虎,采用MappedByteBuffer這種內(nèi)存映射的方式有幾個(gè)限制晃危,其中之一是一次只能映射1.5~2G 的文件至用戶態(tài)的虛擬內(nèi)存,這也是為何RocketMQ默認(rèn)設(shè)置單個(gè)CommitLog日志數(shù)據(jù)文件為1G的原因了)老客。
2.3 RocketMQ消費(fèi)者端消息列隊(duì)六種負(fù)載均衡算法
在RocketMQ啟動(dòng)的時(shí)候會(huì)啟動(dòng)負(fù)載均衡線程僚饭,過(guò)程如下:
//DefaultMQPullConsumerImpl.start()
mQClientFactory.start();
//上面點(diǎn)進(jìn)去 ->MQClientInstance.start(),rebalanceService繼承了ServiceThread胧砰,
//ServiceThread實(shí)現(xiàn)了Runnable接口
this.rebalanceService.start();
//繼續(xù)下一層鳍鸵,MQClientInstance.doRebalance()找到下面
impl.doRebalance();
//..在一層層點(diǎn)進(jìn)去,最后找到RebalanceImpl.rebalanceByTopic方法,找到
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
AllocateMessageQueueStrategy就是實(shí)現(xiàn)消費(fèi)者消息隊(duì)列負(fù)載均衡算法的接口尉间。
該接口在rocketMq-4.3.0版本中有六種實(shí)現(xiàn)方法:
AllocateMessageQueueAveragely:平均算法
AllocateMessageQueueAveragelyByCircle:環(huán)形平均算法
AllocateMessageQueueByConfig:根據(jù)配置負(fù)載均衡算法
AllocateMessageQueueByMachineRoom:根據(jù)機(jī)房負(fù)載均衡算法
AllocateMessageQueueConsistentHash:一致性哈希負(fù)載均衡算法
AllocateMachineRoomNearby:靠近機(jī)房策略
在客戶端沒(méi)有指定的情況下偿乖,RocketMQ默認(rèn)使用AllocateMessageQueueAveragely平均算法。
2.3.1 AllocateMessageQueueAveragely 平均負(fù)載均衡算法
平均算法顧名思義就是取平均值哲嘲,該方法四個(gè)參數(shù)贪薪,consumerGroup(消費(fèi)者組名稱)、
currentCID(當(dāng)前消費(fèi)者的id)眠副、mqAll(當(dāng)前topic下面所有的消息隊(duì)列)画切、cidAll(當(dāng)前消費(fèi)者組下面所有的消費(fèi)者id)己单。算法思想就是把算出平均值然后將連續(xù)的隊(duì)列分配給每個(gè)消費(fèi)者错忱。假設(shè)隊(duì)列大小是8(編號(hào)0-7),消費(fèi)者數(shù)量3(編號(hào)0-2)身弊,分配結(jié)果就是:消費(fèi)者0:隊(duì)列0娃弓,1典格,2;消費(fèi)者1:隊(duì)列3台丛,4耍缴,5;消費(fèi)者2:隊(duì)列6,7防嗡。
2.3.2 AllocateMessageQueueAveragelyByCircle 環(huán)形平均分配算法
環(huán)形分配就可以看成:所有消費(fèi)者圍成一個(gè)環(huán)变汪,然后循環(huán)這個(gè)環(huán)分配隊(duì)列。AllocateMessageQueueAveragely方法平均分配的是連續(xù)的隊(duì)列本鸣,環(huán)形分配的就是間隔的隊(duì)列。核心代碼就一個(gè)for循環(huán)硅蹦,也很好理解荣德。假設(shè)mq8個(gè),消費(fèi)者3個(gè)童芹,分配后的結(jié)果就是{0涮瞻,3,6}假褪,{1署咽,4,7}生音,{2宁否,5}。
2.3.3 AllocateMessageQueueByMachineRoom 機(jī)房分配算法
機(jī)房分配現(xiàn)根據(jù)MQ中的brokerName找出有效的機(jī)房信息(也就是消息隊(duì)列)缀遍,然后再平分慕匠。這個(gè)算法的邏輯是先算出平均值和余數(shù),它和AllocateMessageQueueAveragely平均算法的不同在于域醇,它是先給每個(gè)消費(fèi)者分配mod(平均值個(gè)數(shù))個(gè)消息隊(duì)列台谊,然后余數(shù)在從頭開(kāi)始一個(gè)個(gè)分配。假設(shè)mq有8個(gè)譬挚,消費(fèi)者3個(gè)锅铅,那么平均值mod = 2,余數(shù)2减宣,分配方式就是每個(gè)消費(fèi)者先分配兩個(gè)mq盐须,{0,1}漆腌,{2丰歌,3},{4屉凯,5}立帖,然后余數(shù)2個(gè)在從頭開(kāi)始分配,最后就是{0悠砚,1晓勇,6},{2,3绑咱,7}绰筛,{4,5}描融。
2.3.4 AllocateMessageQueueConsistentHash 一致性哈希負(fù)載均衡算法
一致性哈希負(fù)載均衡的目的是要保證相同的請(qǐng)求盡可能落在同一個(gè)服務(wù)器上铝噩。為什么是說(shuō)盡可能?因?yàn)榉?wù)器會(huì)發(fā)生上下線窿克,在少數(shù)服務(wù)器變化的時(shí)候不應(yīng)該影響大多數(shù)的請(qǐng)求骏庸。
普通hash算法存在的問(wèn)題
普通hash算法我們可以簡(jiǎn)單理解為對(duì)key值進(jìn)行hash之后對(duì)服務(wù)器取模,也就是hash(key) % n年叮。這個(gè)時(shí)候如果我們的一臺(tái)服務(wù)器宕機(jī)或者需要新增一臺(tái)服務(wù)器具被,那么我們的n值就會(huì)變更,這樣就會(huì)導(dǎo)致我們所有的請(qǐng)求都會(huì)變更只损。舉個(gè)簡(jiǎn)單的例子一姿,我們有個(gè)redis集群,部署了4臺(tái)服務(wù)器跃惫,如果我們將key1使用隨機(jī)存儲(chǔ)叮叹,那么我們找key1的時(shí)候可能就需要遍歷4服務(wù)器,效率差爆存。換種方式衬横,對(duì)key1哈希操作后取模,將它定位到一臺(tái)服務(wù)器上终蒂,這樣在查找key1的時(shí)候我們就可以很快的定位到一臺(tái)服務(wù)器上蜂林。可是這樣還有種問(wèn)題就是之前所說(shuō)的拇泣,如果我們r(jià)edis集群增加了一臺(tái)服務(wù)器或者有一臺(tái)服務(wù)器宕機(jī)噪叙,這樣再通過(guò)哈希算法算出的值就發(fā)生了變化,短時(shí)間發(fā)生緩存雪崩霉翔。
一致性hash算法
哈希環(huán):剛才的hash算法是對(duì)服務(wù)器取模睁蕾,一致性哈希算法使用的是對(duì)232取模,就是一致性哈希將整個(gè)hash空間組織成了一個(gè)圓環(huán)债朵,0-232-1子眶。
物理節(jié)點(diǎn):將服務(wù)器(ip+端口)進(jìn)行hash,映射成環(huán)上的一個(gè)節(jié)點(diǎn)序芦。當(dāng)請(qǐng)求到來(lái)時(shí)臭杰,根據(jù)請(qǐng)求的key,hash映射到環(huán)上谚中,順時(shí)針選取最近的一個(gè)服務(wù)器進(jìn)行請(qǐng)求渴杆。
虛擬節(jié)點(diǎn):當(dāng)環(huán)上的服務(wù)器較少的時(shí)候寥枝,會(huì)出現(xiàn)分配不均勻的情況,即大量的請(qǐng)求落在同一臺(tái)服務(wù)器上磁奖,為了避免這種情況囊拜,就引入了虛擬節(jié)點(diǎn)。比如通過(guò)添加后綴的方式給物理節(jié)點(diǎn)克隆出三個(gè)虛擬節(jié)點(diǎn)比搭,如果兩臺(tái)物理節(jié)點(diǎn)冠跷,都克隆三個(gè)虛擬節(jié)點(diǎn),那么環(huán)上就一共有8個(gè)節(jié)點(diǎn)身诺。只是被克隆的虛擬節(jié)點(diǎn)最后還是會(huì)定位到實(shí)際物理節(jié)點(diǎn)上蜜托,但是可以有效的分?jǐn)傉?qǐng)求。
一致性哈希相對(duì)于普通hash戚长,優(yōu)點(diǎn)在于映射到環(huán)上的其請(qǐng)求盗冷,是發(fā)送到環(huán)上離他最近的一個(gè)服務(wù)器怠苔,如果我們一臺(tái)服務(wù)器宕機(jī)或者新增一臺(tái)服務(wù)器同廉,那么影響的請(qǐng)求只有這臺(tái)服務(wù)器和前一個(gè)服務(wù)器節(jié)點(diǎn)之間的請(qǐng)求,其他的并不會(huì)影響柑司。
2.3.5 AllocateMessageQueueByConfig 通過(guò)配置負(fù)載均衡
這個(gè)沒(méi)什么好說(shuō)的迫肖,自定義配置。
2.3.6 AllocateMachineRoomNearby 靠近機(jī)房策略
2.3 順序消息
2.3.1 什么是順序消息
順序消息(FIFO 消息)是 MQ 提供的一種嚴(yán)格按照順序進(jìn)行發(fā)布和消費(fèi)的消息類(lèi)型攒驰。順序消息由兩個(gè)部分組成:順序發(fā)布和順序消費(fèi)蟆湖。
順序消息包含兩種類(lèi)型:
分區(qū)順序:一個(gè)Partition內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi)
全局順序:一個(gè)topic內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi)
這是阿里云上對(duì)順序消息的定義,把順序消息拆分成了順序發(fā)布和順序消費(fèi)玻粪。
那么多線程中發(fā)送消息算不算順序發(fā)布隅津?
多線程中若沒(méi)有因果關(guān)系則沒(méi)有順序。那么用戶在多線程中去發(fā)消息就意味著用戶不關(guān)心那些在不同線程中被發(fā)送的消息的順序劲室。即多線程發(fā)送的消息伦仍,不同線程間的消息不是順序發(fā)布的,同一線程的消息是順序發(fā)布的很洋。這是需要用戶自己去保障的
而對(duì)于順序消費(fèi)充蓝,則需要保證哪些來(lái)自同一個(gè)發(fā)送線程的消息在消費(fèi)時(shí)是按照相同的順序被處理的(為什么不說(shuō)他們應(yīng)該在一個(gè)線程中被消費(fèi)呢?)
全局順序其實(shí)是分區(qū)順序的一個(gè)特例喉磁,即使topic只有一個(gè)分區(qū)(以下不在討論全局順序谓苟,因?yàn)槿猪? 序?qū)⒚媾R性能的問(wèn)題,而且絕大多數(shù)場(chǎng)景都不需要全局順序)
2.3.2 如何保證順序
在MQ的模型中协怒,順序需要由3個(gè)階段去保障:
1.消息被發(fā)送時(shí)保持順序
2.消息被存儲(chǔ)時(shí)保持和發(fā)送的順序一致
3.消息被消費(fèi)時(shí)保持和存儲(chǔ)的順序一致
發(fā)送時(shí)保持順序意味著對(duì)于有順序要求的消息涝焙,用戶應(yīng)該在同一個(gè)線程中采用同步的方式發(fā)送。存儲(chǔ)保持和發(fā)送的順序一致則要求在同一線程中被發(fā)送出來(lái)的消息A和B孕暇,存儲(chǔ)時(shí)在空間上A一定在B之前纱皆。而消費(fèi)保持和存儲(chǔ)一致則要求消息A湾趾、B到達(dá)消費(fèi)者之后必須按照先A后B的順序被處理。
對(duì)于兩個(gè)訂單的消息的原始數(shù)據(jù):a1派草、b1搀缠、b2、a2近迁、a3艺普、b3(絕對(duì)時(shí)間下發(fā)生的順序):
在發(fā)送時(shí),a訂單的消息需要保持a1鉴竭、a2歧譬、a3的順序,b訂單的消息也相同搏存,但是a瑰步、b訂單之間的消息沒(méi)有順序關(guān)系,這意味著a璧眠、b訂單的消息可以在不同的線程中被發(fā)送出去
在存儲(chǔ)時(shí)缩焦,需要分別保證a、b訂單的消息的順序责静,但是a袁滥、b訂單之間的消息的順序可以不保證
a1、b1灾螃、b2题翻、a2、a3腰鬼、b3是可以接受的
a1嵌赠、a2、b1熄赡、b2姜挺、a3、b3是可以接受的
a1本谜、a3初家、b1、b2乌助、a2溜在、b3是不可以接受的
消費(fèi)時(shí)保證順序的簡(jiǎn)單方式就是“什么都不做”,不對(duì)收到的消息的順序進(jìn)行調(diào)整他托,即只要一個(gè)分區(qū)的消息只由一個(gè)線程處理即可掖肋;當(dāng)然,如果a赏参、b在一個(gè)分區(qū)中志笼,在收到消息后也可以將他們拆分到不同線程中處理沿盅,不過(guò)要權(quán)衡一下收益
2.3.3 RocketMQ順序消息實(shí)現(xiàn)
上圖是RocketMQ順序消息原理的介紹,將不同訂單的消息路由到不同的分區(qū)中纫溃。文檔只是給出了Producer順序的處理腰涧,Consumer消費(fèi)時(shí)通過(guò)一個(gè)分區(qū)只能有一個(gè)線程消費(fèi)的方式來(lái)保證消息順序,具體實(shí)現(xiàn)如下紊浩。
Producer端
Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區(qū)窖铡,在RocketMQ中,通過(guò)MessageQueueSelector來(lái)實(shí)現(xiàn)分區(qū)的選擇坊谁。
List<MessageQueue> mqs:消息要發(fā)送的Topic下所有的分區(qū)
Message msg:消息對(duì)象
-
額外的參數(shù):用戶可以傳遞自己的參數(shù)
比如如下實(shí)現(xiàn)就可以保證相同的訂單的消息被路由到相同的分區(qū):
long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());
Consumer端
RocketMQ消費(fèi)端有兩種類(lèi)型:MQPullConsumer和MQPushConsumer费彼。
MQPullConsumer由用戶控制線程,主動(dòng)從服務(wù)端獲取消息口芍,每次獲取到的是一個(gè)MessageQueue中的消息箍铲。PullResult中的List msgFoundList自然和存儲(chǔ)順序一致,用戶需要再拿到這批消息后自己保證消費(fèi)的順序鬓椭。
對(duì)于MQPushConsumer颠猴,由用戶注冊(cè)MessageListener來(lái)消費(fèi)消息,在客戶端中需要保證調(diào)用MessageListener時(shí)消息的順序性膘融。RocketMQ中的實(shí)現(xiàn)如下:
(1) PullMessageService單線程的從Broker獲取消息
(2) PullMessageService將消息添加到ProcessQueue中(ProcessMessage是一個(gè)消息的緩存)芙粱,之后提交一個(gè)消費(fèi)任務(wù)到ConsumeMessageOrderlyService
(3) ConsumeMessageOrderlyService多線程執(zhí)行祭玉,每個(gè)線程在消費(fèi)消息時(shí)需要拿到MessageQueue的鎖
(4) 拿到鎖之后從ProcessQueue中獲取消息
保證消費(fèi)順序的核心思想是:
(1) 獲取到消息后添加到ProcessQueue中氧映,單線程執(zhí)行,所以ProcessQueue中的消息是順序的
(2) 提交的消費(fèi)任務(wù)時(shí)提交的是“對(duì)某個(gè)MQ進(jìn)行一次消費(fèi)”脱货,這次消費(fèi)請(qǐng)求是從ProcessQueue中獲取消息消費(fèi)岛都,所以也是順序的(無(wú)論哪個(gè)線程獲取到鎖,都是按照ProcessQueue中消息的順序進(jìn)行消費(fèi))
順序和異常的關(guān)系
順序消息需要Producer和Consumer都保證順序振峻。Producer需要保證消息被路由到正確的分區(qū)臼疫,消息需要保證每個(gè)分區(qū)的數(shù)據(jù)只有一個(gè)線程消息,那么就會(huì)有一些缺陷:
(1) 發(fā)送順序消息無(wú)法利用集群的Failover特性扣孟,因?yàn)椴荒芨鼡QMessageQueue進(jìn)行重試
(2) 因?yàn)榘l(fā)送的路由策略導(dǎo)致的熱點(diǎn)問(wèn)題烫堤,可能某一些MessageQueue的數(shù)據(jù)量特別大
(3) 消費(fèi)的并行讀依賴于分區(qū)數(shù)量
(4) 消費(fèi)失敗時(shí)無(wú)法跳過(guò)
不能更換MessageQueue重試就需要MessageQueue有自己的副本,通過(guò)Raft凤价、Paxos之類(lèi)的算法保證有可用的副本鸽斟,或者通過(guò)其他高可用的存儲(chǔ)設(shè)備來(lái)存儲(chǔ)MessageQueue。
熱點(diǎn)問(wèn)題好像沒(méi)有什么好的解決辦法利诺,只能通過(guò)拆分MessageQueue和優(yōu)化路由方法來(lái)盡量均衡的將消息分配到不同的MessageQueue富蓄。
消費(fèi)并行讀理論上不會(huì)有太大問(wèn)題,因?yàn)镸essageQueue的數(shù)量可以調(diào)整慢逾。
消費(fèi)失敗的無(wú)法跳過(guò)是不可避免的立倍,因?yàn)樘^(guò)可能導(dǎo)致后續(xù)的數(shù)據(jù)處理都是錯(cuò)誤的灭红。不過(guò)可以提供一些策略,由用戶根據(jù)錯(cuò)誤類(lèi)型來(lái)決定是否跳過(guò)口注,并且提供重試隊(duì)列之類(lèi)的功能变擒,在跳過(guò)之后用戶可以在“其他”地方重新消費(fèi)到這條消息。
2.4 消息去重
造成消息重復(fù)的根本原因是網(wǎng)絡(luò)不可達(dá)寝志。只要通過(guò)網(wǎng)絡(luò)交換數(shù)據(jù)赁项,就無(wú)法避免這個(gè)問(wèn)題。所以解決這個(gè)問(wèn)題的辦法就是繞過(guò)這個(gè)問(wèn)題澈段。那么問(wèn)題就變成了:如果消費(fèi)端收到兩條一樣的消息悠菜,應(yīng)該怎樣處理?
1.消費(fèi)端處理消息的業(yè)務(wù)邏輯保持冪等性
2.保證每條消息都有唯一編號(hào)且保證消息處理成功與去重表的日志同時(shí)出現(xiàn)
第1條很好理解败富,只要保持冪等性悔醋,不管來(lái)多少條重復(fù)消息,最后處理的結(jié)果都一樣兽叮。第2條原理就是利用一張日志表來(lái)記錄已經(jīng)處理成功的消息的ID芬骄,如果新到的消息ID已經(jīng)在日志表中,那么就不再處理這條消息鹦聪。
第1條解決方案账阻,很明顯應(yīng)該在消費(fèi)端實(shí)現(xiàn),不屬于消息系統(tǒng)要實(shí)現(xiàn)的功能泽本。第2條可以消息系統(tǒng)實(shí)現(xiàn)淘太,也可以業(yè)務(wù)端實(shí)現(xiàn)。正常情況下出現(xiàn)重復(fù)消息的概率其實(shí)很小规丽,如果由消息系統(tǒng)來(lái)實(shí)現(xiàn)的話蒲牧,肯定會(huì)對(duì)消息系統(tǒng)的吞吐量和高可用有影響,所以最好還是由業(yè)務(wù)端自己處理消息重復(fù)的問(wèn)題赌莺,這也是RocketMQ不解決消息重復(fù)的問(wèn)題的原因冰抢。
RocketMQ不保證消息不重復(fù),如果你的業(yè)務(wù)需要保證嚴(yán)格的不重復(fù)消息艘狭,需要你自己在業(yè)務(wù)端去重挎扰。
那么msgID記錄在哪里呢?當(dāng)然是緩存巢音。具體做法如下:
? 消費(fèi)端接收到消息的時(shí)候遵倦,調(diào)用redis提供的incr方法,以msgID作為key(具有唯一性)港谊,value則默認(rèn)從1開(kāi)始遞增骇吭。
? 當(dāng)incr返回值為1時(shí),設(shè)置其失效時(shí)間為兩分鐘以后歧寺,并且該消息需要被消費(fèi)燥狰。
? 當(dāng)incr返回值大于1時(shí)棘脐,則忽略該消息。
public long incr(String key,Date expireTime){
long count = redisNumber.incre(key);
if(count==1){
redisCache.setExpireTime(key,expireTime);
}
return count;
}
for(MsgExt msg: msgs){
long currentTime = System.currentTimeMillis();
currentTime += Constants.MSG_EXPIRES_TIME_MILLS;
Date expireTime = new Date(currentTime);
long msgIDCount = redisCacheHelper.incr(msg.getKeys(),expireTime);
if(msgIDCount>1){
continue
}
...
}
2.5 消息堆積
消息中間件的主要功能是異步解耦龙致,還有個(gè)重要功能是擋住前端的數(shù)據(jù)洪峰蛀缝,保證后端系統(tǒng)的穩(wěn)定性,這就要求消息中間件具有一定的消息堆積能力目代。消息堆積分以下兩種情況:
消息堆積在內(nèi)存Buffer屈梁,一旦超過(guò)內(nèi)存Buffer,可以根據(jù)一定的丟棄策略來(lái)丟棄消息榛了,如CORBA Notification規(guī)范中描述在讶。適合能容忍丟棄消息的業(yè)務(wù),這種情況消息的堆積能力主要在于內(nèi)存Buffer大小霜大,而且消息堆積后构哺,性能下降不會(huì)太大,因?yàn)閮?nèi)存中數(shù)據(jù)多少對(duì)于對(duì)外提供的訪問(wèn)能力影響有限战坤。
消息堆積到持久化存儲(chǔ)系統(tǒng)中曙强,例如DB,KV存儲(chǔ)途茫,文件記錄形式碟嘴。當(dāng)消息不能在內(nèi)存Cache命中時(shí),要不可避免的訪問(wèn)磁盤(pán)囊卜,會(huì)產(chǎn)生大量讀IO娜扇,讀IO的吞吐量直接決定了消息堆積后的訪問(wèn)能力。
評(píng)估消息堆積能力主要有以下四點(diǎn):
? 消息能堆積多少條边败,多少字節(jié)?即消息的堆積容量袱衷。
? 消息堆積后捎废,發(fā)消息的吞吐量大小笑窜,是否會(huì)受堆積影響。
? 消息堆積后登疗,正常消費(fèi)的Consumer是否會(huì)受影響排截。
? 消息堆積后,訪問(wèn)堆積在磁盤(pán)的消息時(shí)辐益,吞吐量有多大断傲。
2.5.1 消息積壓的生產(chǎn)故障的處理
首先要找到是什么原因?qū)е碌南⒍逊e,是Producer太多了智政,Consumer太少了導(dǎo)致的還是說(shuō)其他情況认罩,總之先定位問(wèn)題。然后看下消息消費(fèi)速度是否正常续捂,正常的話垦垂,可以通過(guò)上線更多consumer臨時(shí)解決消息堆積問(wèn)題宦搬。
2.5.1.1 提高消費(fèi)并行度
絕大部分消息消費(fèi)行為都屬于 IO 密集型,即可能是操作數(shù)據(jù)庫(kù)劫拗,或者調(diào)用 RPC间校,這類(lèi)消費(fèi)行為的消費(fèi)速度在于后端數(shù)據(jù)庫(kù)或者外系統(tǒng)的吞吐量,通過(guò)增加消費(fèi)并行度页慷,可以提高總的消費(fèi)吞吐量憔足,但是并行度增加到一定程度,反而會(huì)下降酒繁。所以滓彰,應(yīng)用必須要設(shè)置合理的并行度。 如下有幾種修改消費(fèi)并行度的方法:
同一個(gè) ConsumerGroup 下州袒,通過(guò)增加 Consumer 實(shí)例數(shù)量來(lái)提高并行度(需要注意的是超過(guò)訂閱隊(duì)列數(shù)的 Consumer 實(shí)例無(wú)效)找蜜。可以通過(guò)加機(jī)器稳析,或者在已有機(jī)器啟動(dòng)多個(gè)進(jìn)程的方式洗做。 提高單個(gè) Consumer 的消費(fèi)并行線程,通過(guò)修改參數(shù) consumeThreadMin彰居、consumeThreadMax 實(shí)現(xiàn)诚纸。
2.5.1.2 批量方式消費(fèi)
某些業(yè)務(wù)流程如果支持批量方式消費(fèi),則可以很大程度上提高消費(fèi)吞吐量陈惰,例如訂單扣款類(lèi)應(yīng)用畦徘,一次處理一個(gè)訂單耗時(shí) 1 s,一次處理 10 個(gè)訂單可能也只耗時(shí) 2 s抬闯,這樣即可大幅度提高消費(fèi)的吞吐量井辆,通過(guò)設(shè)置 consumer 的 consumeMessageBatchMaxSize 返個(gè)參數(shù),默認(rèn)是 1溶握,即一次只消費(fèi)一條消息杯缺,例如設(shè)置為 N,那么每次消費(fèi)的消息數(shù)小于等于 N睡榆。
2.5.1.3 跳過(guò)非重要消息
發(fā)生消息堆積時(shí)萍肆,如果消費(fèi)速度一直追不上發(fā)送速度,可以選擇丟棄不重要的消息
public ConsumeConcurrentlyStatus consumemessage
(List<messageExt> msgs,ConsumeConcurrentlyContext context){
long offset = msgs.get(0).getqueueOffset();
String maxOffset = msgs.get(0).getProperty(message.PROPERTY_MAX_OFFSET);
long diff= Long.parseLong(max0ffset)- offset;
if (diff > 10100) {//消息堆積情況的特殊處理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//正常消費(fèi)過(guò)程
return ConsumeConcurrentlyStatus.COMSUME_SUCCESS;
}
如以上代碼所示胀屿,當(dāng)某個(gè)隊(duì)列的消息數(shù)堆積到 10100 條以上塘揣,則嘗試丟棄部分戒全部消息,返樣就可以快速追上収送消息的速度宿崭。
如果Consumer和Queue不對(duì)等亲铡,上線了多臺(tái)也在短時(shí)間內(nèi)無(wú)法消費(fèi)完堆積的消息怎么辦?
? 準(zhǔn)備一個(gè)臨時(shí)的topic,newTopic
? queue的數(shù)量是原先queue的幾倍
? queue分布到多個(gè)Broker中
? 上線一臺(tái)Consumer做消息的搬運(yùn)工奖蔓,把oldTopic中的消息挪到newTopic里琅摩,不做業(yè)務(wù)邏輯處理,只是挪過(guò)去
? 上線N臺(tái)Consumer同時(shí)消費(fèi)newTopic中的數(shù)據(jù)
? 改bug
? 恢復(fù)原來(lái)的Consumer锭硼,繼續(xù)消費(fèi)之前的topic房资,oldTopic
2.6 延時(shí)消息
2.6.1 什么是延時(shí)消息
延時(shí)消息是指消息發(fā)送到 broker 后,不能立刻被 consumer 消費(fèi)檀头,要到特定的時(shí)間點(diǎn)或者等待特定的時(shí)間后才能被消費(fèi)轰异。 RocketMQ 支持延時(shí)消息,但是不支持任意時(shí)間精度暑始,支持特定的 level搭独,例如定時(shí) 5s,10s廊镜,1m 等牙肝。
2.6.2 延時(shí)消息使用方法
(1)broker.conf 配置文件配置
#broker.conf配置文件
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_master
flushDiskType = ASYNC_FLUSH
#可以設(shè)置消息延時(shí)級(jí)別
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
延遲配置說(shuō)明:
配置項(xiàng)配置了從1級(jí)開(kāi)始,各級(jí)延時(shí)的時(shí)間嗤朴,可以修改這個(gè)指定級(jí)別的延時(shí)時(shí)間配椭; 時(shí)間單位支持:s、m雹姊、h股缸、d,分別表示秒吱雏、分敦姻、時(shí)、天歧杏;
默認(rèn)值就是上面聲明的镰惦,可手工調(diào)整; 默認(rèn)值已夠用犬绒,不建議修改這個(gè)值旺入。
(2)設(shè)置消息延時(shí)級(jí)別
//創(chuàng)建消息對(duì)象
message message = new message("topic-A", "tagB", ("hello" +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//設(shè)置消息延時(shí)級(jí)別
message.setDelayTimeLevel(6);
2.6.3 延時(shí)消息使用場(chǎng)景
我們經(jīng)常購(gòu)物,知道從下訂單到支付中間需要一段時(shí)間懂更。這個(gè)過(guò)程中涉及到訂單服務(wù)和支付服務(wù)眨业,當(dāng)支付完成時(shí)由支付服務(wù)修改數(shù)據(jù)庫(kù)訂單狀態(tài),代表訂單完成沮协。購(gòu)物中經(jīng)常有這樣的處理情況:超過(guò)30分鐘未支付的訂單叫做超時(shí)訂單,超時(shí)訂單必須關(guān)閉卓嫂。傳統(tǒng)的做法是起一個(gè)定時(shí)任務(wù)服務(wù)慷暂,每隔一段時(shí)間掃描訂單表,查詢是否有超時(shí)訂單,然后修改訂單狀態(tài)關(guān)閉訂單行瑞。這種做法的弊端是掃描表數(shù)據(jù)量大奸腺,對(duì)數(shù)據(jù)庫(kù)造成很大壓力。我們可以使用延時(shí)消息來(lái)提高此需求的執(zhí)行效率血久。首先在客戶下訂單后將訂單發(fā)送的rocketMQ突照,消息包含訂單編號(hào)并設(shè)置延時(shí)時(shí)間30分鐘,然后添加一個(gè)訂單超時(shí)服務(wù)氧吐,訂單超時(shí)服務(wù)訂閱延時(shí)消息然后處理訂單表讹蘑。
2.7 事務(wù)消息
RocketMQ除了支持普通消息,順序消息筑舅,另外還支持事務(wù)消息座慰。首先討論一下什么是事務(wù)消息以及支持事務(wù)消息的必要性。
2.7.1 相關(guān)概念
RocketMQ在其消息定義的基礎(chǔ)上翠拣,對(duì)事務(wù)消息擴(kuò)展了兩個(gè)相關(guān)的概念:
1.Half(Prepare) message——半消息(預(yù)處理消息)
半消息是一種特殊的消息類(lèi)型版仔,該狀態(tài)的消息暫時(shí)不能被consumer消費(fèi)。當(dāng)一條事務(wù)消息被成功投遞到broker上误墓,但是broker并沒(méi)有接收到producer發(fā)出的二次確認(rèn)時(shí)蛮粮,該事務(wù)消息就處于"暫時(shí)不可被消費(fèi)"狀態(tài)斯棒,該狀態(tài)的事務(wù)消息被稱為半消息闪朱。
2.message Status Check——消息狀態(tài)回查
由于網(wǎng)絡(luò)抖動(dòng)脑题、producer重啟等原因棺榔,可能導(dǎo)致producer向broker發(fā)送的二次確認(rèn)消息沒(méi)有成功送達(dá)饥追。如果broker檢測(cè)到某條事務(wù)消息長(zhǎng)時(shí)間處于半消息狀態(tài)俩垃,則會(huì)主動(dòng)向producer端發(fā)起回查操作叫确,查詢?cè)撌聞?wù)消息在producer端的事務(wù)狀態(tài)(Commit 或 Rollback)归露∥蹩ǎ可以看出杖刷,message Status Check主要用來(lái)解決分布式事務(wù)中的超時(shí)問(wèn)題。
2.7.2 執(zhí)行流程
上面是官網(wǎng)提供的事務(wù)消息執(zhí)行流程圖驳癌,下面對(duì)具體流程進(jìn)行分析:
1.Step1:producer向broker端發(fā)送Half message滑燃;
2.Step2:broker ACK,Half message發(fā)送成功颓鲜;
3.Step3:producer執(zhí)行本地事務(wù)表窘;
4.Step4:本地事務(wù)完畢,根據(jù)事務(wù)的狀態(tài)甜滨,producer向broker發(fā)送二次確認(rèn)消息乐严,確認(rèn)該Half message的Commit或者Rollback狀態(tài)。broker收到二次確認(rèn)消息后衣摩,對(duì)于Commit狀態(tài)昂验,則直接發(fā)送到consumer端執(zhí)行消費(fèi)邏輯,而對(duì)于Rollback則直接標(biāo)記為失敗,一段時(shí)間后清除既琴,并不會(huì)發(fā)給consumer占婉。正常情況下,到此分布式事務(wù)已經(jīng)完成甫恩,剩下要處理的就是超時(shí)問(wèn)題逆济,即一段時(shí)間后broker仍沒(méi)有收到producer的二次確認(rèn)消息;
5.Step5:針對(duì)超時(shí)狀態(tài)磺箕,broker主動(dòng)向producer發(fā)起消息回查奖慌;
6.Step6:producer處理回查消息,返回對(duì)應(yīng)的本地事務(wù)的執(zhí)行結(jié)果滞磺;
7.Step7:broker針對(duì)回查消息的結(jié)果升薯,執(zhí)行Commit或Rollback操作,同Step4
2.7.3 實(shí)際案例
我們以一個(gè)轉(zhuǎn)帳的場(chǎng)景為例來(lái)說(shuō)明這個(gè)問(wèn)題:Bob向Smith轉(zhuǎn)賬100塊击困。
圖中執(zhí)行本地事務(wù)(Bob賬戶扣款)和發(fā)送異步消息應(yīng)該保持同時(shí)成功或者失敗中涎劈,也就是扣款成功了,發(fā)送消息一定要成功阅茶,如果扣款失敗了蛛枚,就不能再發(fā)送消息。那問(wèn)題是我們是先扣款還是先發(fā)送消息呢脸哀?
首先我們看下蹦浦,先發(fā)送消息,大致的示意圖如下:
存在的問(wèn)題是:如果消息發(fā)送成功撞蜂,但是扣款失敗盲镶,消費(fèi)端就會(huì)消費(fèi)此消息,進(jìn)而向Smith賬戶加錢(qián)蝌诡。
先發(fā)消息不行溉贿,那我們就先扣款唄,大致的示意圖如下:
存在的問(wèn)題跟上面類(lèi)似:如果扣款成功浦旱,發(fā)送消息失敗宇色,就會(huì)出現(xiàn)Bob扣錢(qián)了,但是Smith賬戶未加錢(qián)颁湖。
可能大家會(huì)有很多的方法來(lái)解決這個(gè)問(wèn)題宣蠕,比如:直接將發(fā)消息放到Bob扣款的事務(wù)中去,如果發(fā)送失敗甥捺,拋出異常抢蚀,事務(wù)回滾。這樣的處理方式也符合“恰好”不需要解決的原則涎永。
RocketMQ支持事務(wù)消息思币,下面我們來(lái)看看RocketMQ是怎樣來(lái)實(shí)現(xiàn)的鹿响。
RocketMQ第一階段發(fā)送Prepared消息時(shí)會(huì)拿到消息的地址羡微,第二階段執(zhí)行本地事物谷饿,第三階段通過(guò)第一階段拿到的地址去訪問(wèn)消息并修改狀態(tài)。細(xì)心的你可能又發(fā)現(xiàn)問(wèn)題了妈倔,如果確認(rèn)消息發(fā)送失敗了怎么辦博投?RocketMQ會(huì)定期掃描消息集群中的事物消息,這時(shí)候發(fā)現(xiàn)了Prepared消息 盯蝴,它會(huì)向消息發(fā)送者確認(rèn)毅哗,Bob的錢(qián)到底是減了還是沒(méi)減呢,如果減了是回滾還是繼續(xù)發(fā)送確認(rèn)消息呢捧挺,RocketMQ 會(huì)根據(jù)發(fā)送端設(shè)置的策略來(lái)決定是回滾還是繼續(xù)發(fā)送確認(rèn)消息虑绵。這樣就保證了消息發(fā)送與本地事務(wù)同時(shí)成功或同時(shí)失敗。
3 高可靠性
3.1 RocketMQ可用性
多master部署闽烙,防止單點(diǎn)故障
3.2 RocketMQ可靠性
3.2.1 消息發(fā)送
producer 的 send 方法本身支持內(nèi)部重試翅睛,重試邏輯如下∶
1.至多重試3次。
2.如果發(fā)送失敗黑竞,則輪轉(zhuǎn)到下一個(gè) broker捕发。
3.這個(gè)方法的總耗時(shí)時(shí)間不超過(guò) sendMsgTimeout 設(shè)置的值,默認(rèn)10s很魂。
所以扎酷,如果本身向 broker發(fā)送消息產(chǎn)生超時(shí)異常,就不會(huì)再做重試遏匆。
以上策略仍然不能保證消息一定發(fā)送成功法挨,為保證消息一定成功,建議應(yīng)用這樣做:如果調(diào)用send同步方法發(fā)送失敗幅聘,則嘗試將消息存儲(chǔ)到db凡纳,由后臺(tái)線程定時(shí)重試,保證消息一定到達(dá) broker喊暖。
3.2.2 broker服務(wù)
所有發(fā)往broker的消息惫企,有同步刷盤(pán)和異步刷盤(pán)機(jī)制,總的來(lái)說(shuō)陵叽,可靠性非常高
同步刷盤(pán)時(shí)狞尔,消息寫(xiě)入物理文件才會(huì)返回成功,因此非彻簦可靠
異步刷盤(pán)時(shí)偏序,只有機(jī)器宕機(jī),才會(huì)產(chǎn)生消息丟失胖替,broker掛掉可能會(huì)發(fā)生研儒,但是機(jī)器宕機(jī)崩潰是很少發(fā)生的豫缨,除非突然斷電
3.2.3 消息消費(fèi)
RocketMQ的消費(fèi)與存儲(chǔ)結(jié)構(gòu)
正常情況下,P發(fā)送消息到broker端朵,消息內(nèi)容寫(xiě)到commitlog好芭,消息內(nèi)容在commitlog的位置信息(索引)寫(xiě)到consumerqueue,C讀取consumerqueue的內(nèi)容消費(fèi)消息冲呢。
CONSUME_SUCCESS表示消費(fèi)成功舍败,這是正常業(yè)務(wù)代碼中返回的狀態(tài)。
RECONSUME_LATER表示當(dāng)前消費(fèi)失敗敬拓,需要稍后進(jìn)行重試邻薯。
在RocketMQ中只有業(yè)務(wù)消費(fèi)者側(cè)返了CONSUME_SUCCESS才會(huì)認(rèn)為消息消費(fèi)時(shí)成功的,如果返回RECONSUME_LATER乘凸,RocketMQ則會(huì)認(rèn)為消費(fèi)失敗厕诡,需要重新投遞。
為了保證消息至少被成功消費(fèi)一次营勤,RocketMQ會(huì)把認(rèn)為消費(fèi)失敗的消息發(fā)回broker灵嫌,在接下來(lái)的某個(gè)時(shí)間點(diǎn)(默認(rèn)是10秒,可修改)再次投遞給消費(fèi)者冀偶。如果一直重復(fù)消息都失敗的話醒第,當(dāng)失敗累積到一定次數(shù)后(默認(rèn)16次)將消息投遞到死信隊(duì)列(Dead Letter queue)中,此時(shí)需要監(jiān)控死信隊(duì)列進(jìn)行人工干預(yù)进鸠。