一晃酒、 關(guān)鍵特性
1 消息發(fā)送和消費
1)消息發(fā)送者步驟分析:
- 創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
- 指定NameServer地址
- 啟動producer
- 創(chuàng)建消息對象派近,指定主題Topic涂屁、Tag和消息體
- 發(fā)送消息
- 關(guān)閉生產(chǎn)者producer
2)消息消費者步驟分析:
- 創(chuàng)建消費者consumer,制定消費者組名
- 指定NameServer地址
- 訂閱主題Topic和Tag
- 設(shè)置回調(diào)函數(shù)井仰,處理消息
- 啟動消費者consumer
2 消息類型
使用RocketMQ可以發(fā)送普通消息恶守、順序消息第献、事務(wù)消息,順序消息能實現(xiàn)有序消費兔港,事務(wù)消息可以解決分布式事務(wù)實現(xiàn)數(shù)據(jù)最終一致痊硕。
1)普通消息
消息隊列 MQ 提供三種方式來發(fā)送普通消息:
- 可靠同步發(fā)送
同步發(fā)送是指消息發(fā)送方發(fā)出數(shù)據(jù)后,會在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個數(shù)據(jù)包的通訊方式押框。這種可靠的消息發(fā)送方式使用的比較廣泛,比如:重要的消息通知理逊,短信通知橡伞。
public class SyncProducer {
public static void main(String[] args) throws Exception {
//- 創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//- 指定NameServer地址
producer.setNamesrvAddr("192.168.217.130:9876");
//- 啟動producer
producer.start();
//- 創(chuàng)建消息對象晋被,指定主題Topic兑徘、Tag和消息體
Message message = new Message("base","Tag1","keys_1",("hello").getBytes());
//- 發(fā)送消息
SendResult result = producer.send(message);
//發(fā)送狀態(tài)
SendStatus sendStatus = result.getSendStatus();
//消息id
String msgId = result.getMsgId();
//消息接受隊列id
int queueId = result.getMessageQueue().getQueueId();
TimeUnit.SECONDS.sleep(3);
System.out.println("發(fā)送狀態(tài)"+result+",消息id"+msgId+",隊列"+queueId);
//- 關(guān)閉生產(chǎn)者producer
producer.shutdown();
}
}
- 可靠異步發(fā)送
異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng)羡洛,接著發(fā)送下個數(shù)據(jù)包的通訊方式挂脑,發(fā)送方通過回調(diào)接口接收服務(wù)器響應(yīng),并對響應(yīng)結(jié)果進行處理。異步消息通常用在對響應(yīng)時間敏感的業(yè)務(wù)場景崭闲,即發(fā)送端不能容忍長時間地等待Broker的響應(yīng)肋联。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//- 創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//- 指定NameServer地址
producer.setNamesrvAddr("192.168.217.130:9876");
//- 啟動producer
producer.start();
//- 創(chuàng)建消息對象刁俭,指定主題Topic橄仍、Tag和消息體
for (int i = 0; i < 3; i++) {
Message message = new Message("base","Tag2",("hello"+i).getBytes());
//- 發(fā)送異步消息
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("發(fā)送成功:"+sendResult);
}
public void onException(Throwable throwable) {
System.out.println("發(fā)送異常:"+throwable);
}
});
TimeUnit.SECONDS.sleep(3);
}
//- 關(guān)閉生產(chǎn)者producer
producer.shutdown();
}
}
- 單向發(fā)送消息
這種方式注意用在不特別關(guān)心發(fā)送結(jié)果的場景,例如日志發(fā)送牍戚。
public class OnewayProducer {
public static void main(String[] args) throws Exception {
//- 創(chuàng)建消息生產(chǎn)者producer侮繁,并制定生產(chǎn)者組名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//- 指定NameServer地址
producer.setNamesrvAddr("192.168.217.130:9876");
//- 啟動producer
producer.start();
//- 創(chuàng)建消息對象,指定主題Topic如孝、Tag和消息體
for (int i = 0; i < 3; i++) {
Message message = new Message("base","Tag3",("hello"+i).getBytes());
//- 發(fā)送單向消息
producer.sendOneway(message);
TimeUnit.SECONDS.sleep(3);
}
//- 關(guān)閉生產(chǎn)者producer
producer.shutdown();
}
}
- 編寫消息消費者消費消息( 啟動時需要先啟動消費者監(jiān)聽)
public class Consumer {
public static void main(String[] args) throws MQClientException {
//- 創(chuàng)建消費者consumer宪哩,制定消費者組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//- 指定NameServer地址
consumer.setNamesrvAddr("192.168.217.130:9876");
//- 訂閱主題Topic和Tag
consumer.subscribe("base","*");
//- 設(shè)置回調(diào)函數(shù),處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接收消息內(nèi)容
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//- 啟動消費者consumer
consumer.start();
}
}
RocketMQ 常見異常處理
2) 延時消息
消息在發(fā)送到消息隊列 MQ 服務(wù)端后并不會立馬投遞第晰,而是根據(jù)消息中的屬性延遲固定時間后才投遞給消費者锁孟。但是RocketMQ不支持任意時間精度,僅支持特定的 level但荤,例如定時 5s罗岖, 10s, 1m 等腹躁。其中桑包,level=0 級表示不延時,level=1 表示 1 級延時纺非,level=2 表示 2 級延時哑了,以此類推。
在服務(wù)器端(rocketmq-broker端)的屬性配置文件中加入以下行:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
描述了各級別與延時時間的對應(yīng)映射關(guān)系烧颖。
? 這個配置項配置了從1級開始弱左,各級延時的時間,可以修改這個指定級別的延時時間炕淮;
? 時間單位支持:s拆火、m、h涂圆、d们镜,分別表示秒、分润歉、時模狭、天;
? 默認值就是上面聲明的踩衩,可手工調(diào)整嚼鹉;
? 默認值已夠用贩汉,不建議修改這個值。
public class DelayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.217.130:9876");
producer.start();
//延時10s
Message message = new Message("base","Tag1","keys_1",("hello").getBytes());
message.setDelayTimeLevel(3);
producer.send(message);
producer.shutdown();
}
}
如果你使用阿里云服務(wù)器锚赤,可以使用阿里封裝的api匹舞,它支持定時消息和延時消息,可以適應(yīng)更多場景宴树。
詳細介紹和代碼示例
3) 順序消息
消息有序指的是可以按照消息的發(fā)送順序來消費(FIFO)策菜。RocketMQ可以嚴格的保證消息有序,可以分為分區(qū)有序或者全局有序酒贬。
詳細介紹
4) 事務(wù)消息
消息隊列 MQ 提供類似 X/Open XA 的分布式事務(wù)功能又憨,通過消息隊列 MQ 事務(wù)消息能達到分布式事務(wù)的最終一致。上圖說明了事務(wù)消息的大致流程:正常事務(wù)消息的發(fā)送和提交锭吨、事務(wù)消息的補償流程蠢莺。
事務(wù)消息發(fā)送及提交:
①發(fā)送消息(half消息);
②服務(wù)端響應(yīng)消息寫入結(jié)果零如;
③根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫入失敗躏将,此時half消息對業(yè)務(wù)不可見,本地邏輯不執(zhí)行)考蕾;
④根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或Rollback(Commit操作生成消息索引祸憋,消息對消費者可見)。事務(wù)消息的補償流程:
①對沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息)肖卧,從服務(wù)端發(fā)起一次“回查”蚯窥;
②Producer收到回查消息,檢查回查消息對應(yīng)的本地事務(wù)的狀態(tài)塞帐。
③根據(jù)本地事務(wù)狀態(tài)拦赠,重新Commit或RollBack
其中,補償階段用于解決消息Commit或Rollback發(fā)生超時或者失敗的情況葵姥。
- 事務(wù)消息狀態(tài):
事務(wù)消息共有三種狀態(tài):提交狀態(tài)荷鼠、回滾狀態(tài)、中間狀態(tài):
①TransactionStatus.CommitTransaction:提交事務(wù)榔幸,它允許消費者消費此消息允乐。
②TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除削咆,不允許被消費喳篇。
③TransactionStatus.Unkonwn:中間狀態(tài),它代表需要檢查消息隊列來確定消息狀態(tài)态辛。
詳細介紹和代碼示例
消息類型對比:
Topic的消息類型 | 是否支持事務(wù)消息 | 是否支持定時/延時消息 | 性能 |
---|---|---|---|
無序消息(普通、事務(wù)挺尿、定時/延時消息) | 是 | 是 | 最高 |
分區(qū)順序消息 | 否 | 否 | 高 |
全局順序消息 | 否 | 否 | 一般 |
發(fā)送方式對比:
消息類型 | 是否支持同步發(fā)送 | 是否支持異步發(fā)送 | 是否支持單向發(fā)送 |
---|---|---|---|
無序消息(普通奏黑、事務(wù)炊邦、定時/延時消息) | 是 | 是 | 最高 |
分區(qū)順序消息 | 是 | 否 | 否 |
全局順序消息 | 是 | 否 | 否 |
3 批量消息
批量發(fā)送消息能顯著提高傳遞消息的性能,限制是這些消息應(yīng)該具有相同的topic熟史,相同的waitStoreMsgOK馁害,而且不能是延時消息。此外蹂匹,這一批量消息的總大小不應(yīng)超過1MB碘菜。如果超過,需要把消息分割限寞。
不超過1M忍啸,直接producer.send(msg)就可以了。
超過IM,消息分割代碼示例
4 消息消費方式
(1)負載均衡模式
消費者默認采用負載均衡方式履植,多個消費者共同消費隊列消息计雌,每個消費者處理的消息不同。
(2)廣播模式
消費者采用廣播的方式消費消息玫霎,每個消費者消費的消息都是相同的凿滤。
Producer負載均衡
Producer端,每個實例在發(fā)消息的時候庶近,默認會輪詢所有的message queue發(fā)送翁脆,以達 到讓消息平均落在不同的queue上。而由于queue可以散落在不同的broker鼻种,所以消息 就發(fā)送到不同的broker下反番,如下圖:
圖中箭頭線條上的標號代表順序,發(fā)布方會把第一條消息發(fā)送至 Queue 0普舆,然后第二條 消息發(fā)送至 Queue 1恬口,以此類推。
Consumer負載均衡
1)集群模式
在集群消費模式下沼侣,每條消息只需要投遞到訂閱這個topic的Consumer Group下的一個 實例即可祖能。RocketMQ采用主動拉取的方式拉取并消費消息,在拉取的時候需要明確指定 拉取哪一條message queue蛾洛。 而每當實例的數(shù)量有變更养铸,都會觸發(fā)一次所有實例的負載均衡,這時候會按照queue的 數(shù)量和實例的數(shù)量平均分配queue給每個實例轧膘。 默認的分配算法是AllocateMessageQueueAveragely钞螟,如下圖:
還有另外一種平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分攤 每一條queue谎碍,只是以環(huán)狀輪流分queue的形式鳞滨,如下圖:
需要注意的是,集群模式下蟆淀,queue都是只允許分配只一個實例拯啦,這是由于如果多個實 例同時消費一個queue的消息澡匪,由于拉取哪些消息是consumer主動控制的,那樣會導(dǎo)致 同一個消息在不同的實例下被消費多次褒链,所以算法上都是一個queue只分給一個 consumer實例唁情,一個consumer實例可以允許同時分到不同的queue。 通過增加consumer實例去分攤queue的消費甫匹,可以起到水平擴展的消費能力的作用甸鸟。而 有實例下線的時候,會重新觸發(fā)負載均衡兵迅,這時候原來分配到的queue將分配到其他實 例上繼續(xù)消費抢韭。 但是如consumer實例的數(shù)量比message queue的總數(shù)量還多的話,多出來的 consumer實例將無法分到queue喷兼,也就無法消費到消息篮绰,也就無法起到分攤負載的作用了。所以需要控制讓queue的總數(shù)量大于等于consumer的數(shù)量季惯。
2)廣播模式
由于廣播模式下要求一條消息需要投遞到一個消費組下面所有的消費者實例吠各,所以也就 沒有消息被分攤消費的說法。 在實現(xiàn)上勉抓,就是在consumer分配queue的時候贾漏,所有consumer都分到所 有的queue。
《深入理解RocketMQ》- MQ消息的投遞機制
5 簡單消息過濾
1) Tag過濾
RocketMQ 的消息過濾方式有別于其他消息中間件藕筋,是在訂閱時纵散,再做過濾,先來看下 Consume Queue 的存儲結(jié)構(gòu)隐圾。
(1)在 Broker 端進行 Message Tag 比對伍掀,先遍歷 Consume Queue,如果存儲的 Message Tag 與訂閱的 Message Tag 不符合暇藏,則跳過蜜笤,繼續(xù)比對下一個,符合則傳輸給 Consumer盐碱。注意:Message Tag 是字符串形式把兔,Consume Queue 中存儲的是其對應(yīng)的 hashcode,比對時也是比對 hashcode瓮顽。
(2)Consumer 收到過濾后的消息后县好,同樣也要執(zhí)行在 Broker 端的操作,但是比對的是真實的 Message Tag 字 符串暖混,而不是 Hashcode缕贡。
為什么過濾要這樣做?
(1)Message Tag 存儲 Hashcode,是為了在 Consume Queue 定長方式存儲善绎,節(jié)約空間黔漂。
(2)過濾過程中不會訪問 Commit Log 數(shù)據(jù),可以保證堆積情況下也能高效過濾禀酱。
(3) 即使存在 Hash 沖突,也可以在 Consumer 端進行修正牧嫉,保證萬無一失剂跟。
簡單消息過濾通過指定多個 Tag 來過濾消息,過濾動作在服務(wù)器進行酣藻。如:
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
以上方式對于復(fù)雜的場景可能不起作用曹洽,因為一個消息只能有一個tag。這種情況下辽剧,可以使用SQL表達式篩選消息送淆。
2) SQL語法過濾
consumer.subscribe("TopicTest",MessageSelector
.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + "and (a is not null and a between 0 3)"));
注意:只有使用push模式的消費者此案使用SQL92標準的sql語句。
6 消息重試
1)順序消息的重試
對于順序消息怕轿,當消費者消費消息失敗后偷崩,消息隊列 RocketMQ 會自動不斷進行消息重 試(每次間隔時間為 1 秒),這時撞羽,應(yīng)用會出現(xiàn)消息消費被阻塞的情況阐斜。因此,在使用順序消息時诀紊,務(wù)必保證應(yīng)用能夠及時監(jiān)控并處理消費失敗的情況谒出,避免阻塞現(xiàn)象的發(fā)生。
2) 無序消息的重試
對于無序消息(普通邻奠、定時笤喳、延時、事務(wù)消息)碌宴,當消費者消費消息失敗時杀狡,您可以通過設(shè)置返回狀態(tài)達到消息重試的結(jié)果。
無序消息的重試只針對集群消費方式生效唧喉;廣播方式不提供失敗重試特性捣卤,即消費失敗后,失敗消息不再重試八孝,繼續(xù)消費新的消息董朝。
3)死信隊列
當一條消息初次消費失敗,消息隊列 RocketMQ 會自動進行消息重試干跛;達到最大重試次 數(shù)后子姜,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時哥捕,消息隊列 RocketMQ 不會立刻將消息丟棄牧抽,而是將其發(fā)送到該消費者對應(yīng)的特殊隊列中。 在消息隊列 RocketMQ 中遥赚,這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message)扬舒,存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。
7 消費冪等
二凫佛、消息存儲
分布式隊列因為有高可靠性的要求讲坎,所以數(shù)據(jù)要進行持久化存儲。
流程:
(1) 消息生成者發(fā)送消息愧薛;
(2) MQ收到消息晨炕,將消息進行持久化,在存儲中新增一條記錄 毫炉;
(3) 返回ACK給生產(chǎn)者瓮栗;
(4) MQ push 消息給對應(yīng)的消費者,然后等待消費者返回ACK瞄勾;
(5) 如果消息消費者在指定時間內(nèi)成功返回ack费奸,那么MQ認為消息消費成功,在存儲中 刪除消息丰榴,即執(zhí)行第6步货邓;如果MQ在指定時間內(nèi)沒有收到ACK,則認為消息消費失 敗四濒,會嘗試重新push消息,重復(fù)執(zhí)行4换况、5、6步驟
(6) MQ刪除消息盗蟆。
1 存儲介質(zhì)
(1) 關(guān)系型數(shù)據(jù)庫DB
Apache下開源的另外一款MQ—ActiveMQ(默認采用的KahaDB做消息存儲)可選用 JDBC的方式來做消息持久化戈二,通過簡單的xml配置信息即可實現(xiàn)JDBC消息存儲。由于喳资, 普通關(guān)系型數(shù)據(jù)庫(如Mysql)在單表數(shù)據(jù)量達到千萬級別的情況下觉吭,其IO讀寫性能往往 會出現(xiàn)瓶頸。在可靠性方面仆邓,該種方案非常依賴DB鲜滩,如果一旦DB出現(xiàn)故障,則MQ的消 息就無法落盤存儲會導(dǎo)致線上故障节值。
(2) 文件系統(tǒng)
目前業(yè)界較為常用的幾款產(chǎn)品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盤 至所部署虛擬機/物理機的文件系統(tǒng)來做持久化(刷盤一般可以分為異步刷盤和同步刷盤兩種模式)徙硅。消息刷盤為消息存儲提供了一種高效率、高可靠性和高性能的數(shù)據(jù) 持久化方式搞疗。除非部署MQ機器本身或是本地磁盤掛了嗓蘑,否則一般是不會出現(xiàn)無法持 久化的故障問題。
2 性能對比
文件系統(tǒng)>關(guān)系型數(shù)據(jù)庫DB
3 消息的存儲和發(fā)送
1)消息存儲
磁盤如果使用得當,磁盤的速度完全可以匹配上網(wǎng)絡(luò) 的數(shù)據(jù)傳輸速度桩皿。目前的高性能磁 盤豌汇,順序?qū)懰俣瓤梢赃_到600MB/s, 超過了一般網(wǎng)卡的傳輸速度泄隔。但是磁盤隨機寫的速 度只有大概100KB/s拒贱,和順序?qū)懙男阅芟嗖?000倍!因為有如此巨大的速度差別佛嬉,好的 消息隊列系統(tǒng)會比普通的消息隊列系統(tǒng)速度快多個數(shù)量級柜思。RocketMQ的消息用順序?qū)? 保證了消息存儲的速度。
2)消息發(fā)送
Linux操作系統(tǒng)分為【用戶態(tài)】和【內(nèi)核態(tài)】巷燥,文件操作、網(wǎng)絡(luò)操作需要涉及這兩種形態(tài) 的切換号枕,免不了進行數(shù)據(jù)復(fù)制缰揪。 一臺服務(wù)器 把本機磁盤文件的內(nèi)容發(fā)送到客戶端,一般分為兩個步驟:
1)read葱淳;讀取本地文件內(nèi)容钝腺;
2)write;將讀取的內(nèi)容通過網(wǎng)絡(luò)發(fā)送出去赞厕。
這兩個看似簡單的操作艳狐,實際進行了4 次數(shù)據(jù)復(fù)制,分別是:
1. 從磁盤復(fù)制數(shù)據(jù)到內(nèi)核態(tài)內(nèi)存皿桑;
2. 從內(nèi)核態(tài)內(nèi)存復(fù) 制到用戶態(tài)內(nèi)存毫目;
3. 然后從用戶態(tài) 內(nèi)存復(fù)制到網(wǎng)絡(luò)驅(qū)動的內(nèi)核態(tài)內(nèi)存;
4. 最后是從網(wǎng)絡(luò)驅(qū)動的內(nèi)核態(tài)內(nèi)存復(fù) 制到網(wǎng)卡中進行傳輸诲侮。
Consumer 消費消息過程镀虐,使用了零拷貝,零拷貝包含以下兩種方式
- 使用 mmap + write 方式
優(yōu)點:即使頻繁調(diào)用沟绪,使用小塊文件傳輸刮便,效率也很高
缺點:不能很好的利用 DMA 方式,會比 sendfile 多消耗 CPU绽慈,內(nèi)存安全性控制復(fù)雜恨旱,需要避免 JVM Crash問題。 - 使用 sendfile 方式
優(yōu)點:可以利用 DMA 方式坝疼,消耗 CPU 較少搜贤,大塊文件傳輸效率高,無內(nèi)存安全新問題裙士。
缺點:小塊文件效率低于 mmap 方式入客,只能是 BIO 方式傳輸,不能使用 NIO。
RocketMQ 選擇了第一種方式桌硫,mmap+write 方式夭咬,因為有小塊數(shù)據(jù)傳輸?shù)男枨螅Ч麜?sendfile 更好铆隘。
關(guān)于 Zero Copy 的更詳細介紹卓舵,請參考以下文章
http://www.linuxjournal.com/article/6345
通過使用mmap的方式,可以省去向用戶態(tài)的內(nèi)存復(fù)制膀钠,提高速度掏湾。這種機制在Java中是 通過MappedByteBuffer實現(xiàn)的 RocketMQ充分利用了上述特性,提高消息存盤和網(wǎng)絡(luò)發(fā)送 的速度肿嘲。
這里需要注意的是融击,采用MappedByteBuffer這種內(nèi)存映射的方式有幾個限制,其 中之一是一次只能映射1.5~2G 的文件至用戶態(tài)的虛擬內(nèi)存雳窟,這也是為何RocketMQ 默認設(shè)置單個CommitLog日志數(shù)據(jù)文件為1G的原因了尊浪。
MQ消息最終一致性解決方案
4 消息存儲結(jié)構(gòu)
RocketMQ消息的存儲是由ConsumeQueue和CommitLog配合完成 的,消息真正的物 理存儲文件是CommitLog封救,ConsumeQueue是消息的邏輯隊列拇涤,類似數(shù)據(jù)庫的索引文 件,存儲的是指向物理存儲的地址誉结。每 個Topic下的每個Message Queue都有一個對應(yīng) 的ConsumeQueue文件鹅士。
CommitLog:存儲消息的元數(shù)據(jù)
ConsumerQueue:存儲消息在CommitLog的索引
IndexFile:為了消息查詢提供了一種通過key或時間區(qū)間來查詢消息的方法,這種通過IndexFile來查找消息的方法不影響發(fā)送與消費消息的主流程
5 刷盤機制
RocketMQ的消息是存儲到磁盤上的惩坑,這樣既能保證斷電后恢復(fù)掉盅, 又可以讓存儲的消息 量超出內(nèi)存的限制。RocketMQ為了提高性能旭贬,會盡可能地保證磁盤的順序?qū)懻印O⒃谕?過Producer寫入RocketMQ的時 候,有兩種寫磁盤方式稀轨,分布式同步刷盤和異步刷盤扼脐。
1)同步刷盤
在返回寫成功狀態(tài)時,消息已經(jīng)被寫入磁盤奋刽。具體流程是瓦侮,消息寫入內(nèi)存的PAGECACHE 后,立刻通知刷盤線程刷盤佣谐, 然后等待刷盤完成肚吏,刷盤線程執(zhí)行完成后喚醒等待的線 程,返回消息寫 成功的狀態(tài)狭魂。
2)異步刷盤
在返回寫成功狀態(tài)時罚攀,消息可能只是被寫入了內(nèi)存的PAGECACHE党觅,寫操作的返回快,吞 吐量大斋泄;當內(nèi)存里的消息量積累到一定程度時杯瞻,統(tǒng)一觸發(fā)寫磁盤動作,快速寫入炫掐。
3)配置
同步刷盤還是異步刷盤魁莉,都是通過Broker配置文件里的flushDiskType 參數(shù)設(shè)置的, 這個參數(shù)被配置成SYNC_FLUSH募胃、ASYNC_FLUSH中的 一個旗唁。
三、高可用性機制
RocketMQ分布式集群是通過Master和Slave的配合達到高可用性的痹束。
Master和Slave的區(qū)別:在Broker的配置文件中检疫,參數(shù) brokerId的值為0表明這個Broker 是Master,大于0表明這個Broker是 Slave祷嘶,同時brokerRole參數(shù)也會說明這個Broker 是Master還是Slave电谣。 Master角色的Broker支持讀和寫,Slave角色的Broker僅支持讀抹蚀,也就是 Producer只能 和Master角色的Broker連接寫入消息;Consumer可以連接 Master角色的Broker企垦,也可 以連接Slave角色的Broker來讀取消息环壤。
1 消息消費高可用
在Consumer的配置文件中,并不需要設(shè)置是從Master讀還是從Slave 讀钞诡,當Master不 可用或者繁忙的時候郑现,Consumer會被自動切換到從Slave 讀。有了自動切換Consumer 這種機制荧降,當一個Master角色的機器出現(xiàn)故障后接箫,Consumer仍然可以從Slave讀取消 息,不影響Consumer程序朵诫。這就達到了消費端的高可用性辛友。
2 消息發(fā)送高可用
在創(chuàng)建Topic的時候,把Topic的多個Message Queue創(chuàng)建在多個Broker組上(相同 Broker名稱剪返,不同 brokerId的機器組成一個Broker組)废累,這樣當一個Broker組的 Master不可 用后,其他組的Master仍然可用脱盲,Producer仍然可以發(fā)送消息邑滨。 RocketMQ目前還不支持把Slave自動轉(zhuǎn)成Master,如果機器資源不足钱反, 需要把Slave轉(zhuǎn) 成Master掖看,則要手動停止Slave角色Broker匣距,更改配置文 件,用新的配置文件啟動 Broker哎壳。
3 消息主從復(fù)制
如果一個Broker組有Master和Slave毅待,消息需要從Master復(fù)制到Slave 上随珠,有同步和異步兩種復(fù)制方式伞访。
1)同步復(fù)制
同步復(fù)制方式是等Master和Slave均寫 成功后才反饋給客戶端寫成功狀態(tài);
在同步復(fù)制方式下沛善,如果Master出故障蹲坷, Slave上有全部的備份數(shù)據(jù)驶乾,容易恢復(fù),但是同 步復(fù)制會增大數(shù)據(jù)寫入 延遲循签,降低系統(tǒng)吞吐量级乐。
2)異步復(fù)制
異步復(fù)制方式是只要Master寫成功 即可反饋給客戶端寫成功狀態(tài)。在異步復(fù)制方式下县匠,系統(tǒng)擁有較低的延遲和較高的吞吐量风科,但是如果Master出了故障, 有些數(shù)據(jù)因為沒有被寫 入Slave乞旦,有可能會丟失贼穆;
3)配置
同步復(fù)制和異步復(fù)制是通過Broker配置文件里的brokerRole參數(shù)進行設(shè)置的,這個參數(shù) 可以被設(shè)置成ASYNC_MASTER兰粉、 SYNC_MASTER故痊、SLAVE三個值中的一個。
4) 總結(jié)
實際應(yīng)用中要結(jié)合業(yè)務(wù)場景玖姑,合理設(shè)置刷盤方式和主從復(fù)制方式愕秫, 尤其是SYNC_FLUSH 方式,由于頻繁地觸發(fā)磁盤寫動作焰络,會明顯降低 性能戴甩。通常情況下,應(yīng)該把Master和 Save配置成ASYNC_FLUSH的刷盤 方式闪彼,主從之間配置成SYNC_MASTER的復(fù)制方式甜孤,這 樣即使有一臺 機器出故障,仍然能保證數(shù)據(jù)不丟畏腕,是個不錯的選擇课蔬。