RocketMQ介紹
RocketMQ是一個純java、分布式唤反、隊列模型的開源消息中間件,具有以下特點:
能夠保證嚴格的消息順序
提供豐富的消息拉取模式
高效的訂閱者水平擴展能力
實時的消息訂閱機制
億級消息堆積能力
選用理由:
強調(diào)集群無單點春瞬,可擴展柴信,任意一點高可用,水平可擴展?
海量消息堆積能力宽气,消息堆積后随常,寫入低延遲
支持上萬個隊列
消息失敗重試機制
消息可查詢
開源社區(qū)活躍
成熟度(經(jīng)過雙十一考驗)
RocketMQ的各部分角色介紹
角色名稱功能
Producer:消息生產(chǎn)者,負責產(chǎn)生消息萄涯,一般由業(yè)務(wù)系統(tǒng)負責產(chǎn)生消息
Consumer:消息消費者绪氛,負責消費消息,一般是后臺系統(tǒng)負責異步消費
Broker:消息中轉(zhuǎn)角色涝影,負責存儲消息枣察,轉(zhuǎn)發(fā)消息,一般也稱為Server
NameServer:管理中心,一般存儲Broker的信息
????????RocketMQ這四個角色就相當于我們現(xiàn)實生活中的郵政系統(tǒng)序目,其中Producer臂痕、Consumer、 Broker宛琅、NameServer分別代表發(fā)信者刻蟹、收信者、負責暫存和傳輸?shù)泥]局嘿辟、以及協(xié)調(diào)各個地方郵局的管理機構(gòu)舆瘪。
????????啟動RocketMQ之前先要啟動NameServer,再啟動Broker红伦,這時候消息隊列已經(jīng)在開始工作了英古。如果想要發(fā)送消息,就用Producer昙读;接受消息就用Consumer召调。如果程序中既要接收也要發(fā)送,可以啟動多個Producer和Consumer蛮浑。如果想要增加可靠性或者增大吞吐量唠叛,防止單點故障也可以在多臺機器上部署多個NameServer和Broker,并且每個Broker也可以部署一個或者過個Slave沮稚。
????????大致了解了基本角色功能后艺沼,再介紹兩個重要的名詞概念Topic(主題)和Message Queue(消息隊列)。當一個企業(yè)搭建好消息平臺后會有多條業(yè)務(wù)線接入進來蕴掏,同一個業(yè)務(wù)也會有不同類型的消息需要投遞障般,如何保證這消息準確地進行,就需要給不同類型的消息加上Topic名稱來進行區(qū)分盛杰。所以在發(fā)送消息和接受消息時挽荡,需要先創(chuàng)建Topic。有了Topic后即供,仍然還有性能問題需要考慮定拟。當一個Topic下的消息投遞量或者發(fā)送量過大怎么辦,這就需要在一個Topic下設(shè)置一個或者多個Message Queue來提高并行處理速度逗嫡。有了Message Queue后青自,消息就可以并行地向各個Message Queue進行分發(fā),從而消費者也可以從多個隊列中讀取消息祸穷,滿足性能要求性穿。
RocketMQ單點安裝
參照官網(wǎng):Downloading the Apache RocketMQ Releases - Apache RocketMQ
RocketMQ多級集群部署以及安裝
本次先講如何利用兩臺物理機,搭建出雙主雙從無單點故障的高可用RoketMQ集群雷滚。假設(shè)這兩臺物理機的ip分別為192.168.218.51和192.168.218.52需曾,端口號默認為9876。
1.啟動多個NameServer和Broker
首先按照單點部署,在兩臺服務(wù)器上分別安裝RocketMQ呆万,服務(wù)地址分別為192.168.218.51:9876和192.168.218.52:9876商源,然后啟動NameServer(nohup sh bin/mqnamesrv &)
啟動Broker,每臺機器都需啟動一個Master角色和一個Slave角色谋减,作為主備牡彻。修改的配置文件在安裝目錄下的conf/2m-2s-sync下。
192.168.218.51機器上的Master Broker配置文件(conf/2m-2s-sync/broker-a.properties)
brokerClusterName=ifind-rocketmq-cluster? ? ?所屬集群名字出爹,集群比較多可以分成多個Cluster庄吼,每個供一個業(yè)務(wù)使用
brokerName=broker-a? ??broker名字,注意此處不同的配置文件填寫的不一樣严就,2選1
brokerId=1????0表示 Master总寻,>0表示 Slave
namesrvAddr=192.168.216.57:9876;192.168.216.61:9876;192.168.216.58:9876;192.168.216.62:9876????nameServer地址,分號分割
defaultTopicQueueNums=4????在發(fā)送消息時梢为,自動創(chuàng)建服務(wù)器不存在的topic渐行,默認創(chuàng)建的隊列數(shù)
autoCreateTopicEnable=true????是否允許 Broker自動創(chuàng)建Topic,建議線下開啟铸董,線上關(guān)閉
autoCreateSubscriptionGroup=true????是否允許 Broker自動創(chuàng)建訂閱組祟印,建議線下開啟,線上關(guān)閉
listenPort=10911????Broker對外服務(wù)的監(jiān)聽端口
deleteWhen=04????刪除文件時間點粟害,默認凌晨 4點
fileReservedTime=120????文件保留時間蕴忆,默認 48小時
mapedFileSizeCommitLog=1073741824????commitLog每個文件的大小默認1G
brokerRole=ASYNC_MASTER????Broker的角色
flushDiskType=ASYNC_FLUSH????刷盤方式????
修改192.168.218.51機器上的Master Broker配置文件(conf/2m-2s-sync/broker-a-s.properties)
修改192.168.218.52機器上的Master Broker配置文件(conf/2m-2s-sync/broker-b.properties)
修改192.168.218.52機器上的Master Broker配置文件(conf/2m-2s-sync/broker-b-s.properties)
幾個配置參數(shù)的含義
參數(shù)名含義
brokerId有三種:SYNCMASTER ASYNCMASTER SLAVE,SYNC表示當Slave和Master消息同步完成 后,再返回發(fā)送成功的狀態(tài)
flushDiskType表示刷盤策略我磁,分為SYNCFLUSH和ASYNCFLUSH兩種孽文,代表同步刷盤和異步刷盤驻襟。同 步狀態(tài)下夺艰,消息真正寫入磁盤才返回成功狀態(tài);異步刷盤情況下沉衣,消息寫入緩存后才返回成功狀態(tài)
2.發(fā)送和接受消息的demo
procucer
public class SyncProducer {
? ? public static void main(String[] args) throws Exception {
? ? ? ? //實例化一個生產(chǎn)者
? ? ? ? DefaultMQProducer producer = new DefaultMQProducer("please_input_group_name"); //生產(chǎn)組名稱
? ? ? ? producer.setNamesrvAddr("192.168.218.51"); //確定服務(wù)地址郁副,集群時通過讀取配置文件變量賦值
? ? ? ? producer.start();? //生產(chǎn)者開始工作
? ? ? ? //發(fā)送消息
? ? ? ? for () {
? ? ? ? ? ? //三個參數(shù),第一個topic豌习,第二個tag標識存谎,第三個是消息內(nèi)容
? ? ? ? ? ? Message msg = new Message("test-topic","tag-a","msg");
? ? ? ? ? ? SendResult sendResult = producer.send(msg); //生產(chǎn)者發(fā)送消息
? ? ? ? }
? ? ? ? producer.shutdown();
? ? }
}
consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_input_group_name"); //消費組名稱
consumer.subscribe("topic","*");? ? //消費者訂閱
consumer.registerMessageListener(new MessageListenerConcurrentliy) {
? ? public ConsumeConcurrentStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
? ? ? ? //處理的邏輯代碼
? ? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
? ? }
};
consumer.start();
消息隊列的協(xié)調(diào)者
NameServer的功能
NameServer是消息隊列中的狀態(tài)服務(wù)器,集群中的各個組件通過它了解全局的信息肥隆。同時既荚,各個機器都要定期向NameServer上報自己的狀態(tài),如果超時不上報的話栋艳,其它組件會把這個機器從列表中移除恰聘。NameServer可以部署多個,本身是無狀態(tài)的,也就是Broker晴叨、Topic等狀態(tài)信息不會持久存儲凿宾,是由各個角色上報存儲到內(nèi)存的。
集群狀態(tài)的存儲結(jié)構(gòu)
private final HashMap> topicQueue topicQueueTable //這個map的key是Topic的名稱兼蕊,存儲了所有topic的信息初厚。Value存儲著Broker的名稱、讀寫Queue的數(shù)量以及同步標識等?
private final HashMap?//這個結(jié)構(gòu)key是BrokerName孙技,value存儲著地址信息以及所屬Cluster的名稱?
private final HashMap?//這個結(jié)構(gòu)的key是Broker的ip地址产禾,value為Broker機器的實時狀態(tài),包括上次更新狀態(tài)的時間戳?
private final HashMap> //key為Cluster的名稱牵啦,set中存儲的是Broker的名稱下愈,就是集群的BrokerName的集合
?private final HashMap> filterServerTable //key是Broker的地址,value是和這個Broker關(guān)聯(lián)的多個過濾器的地址
以上五個變量的定義蕾久,可以清楚的看出各個組件的狀態(tài)是如何進行存儲的势似,而NameServer的作用便是維護這五個變量中存儲的信息。