最近 RocketMQ 剛剛上生產(chǎn)環(huán)境鸿秆,閑暇之時在這里做一些分享鸠项,主要目的是讓初學(xué)者能快速上手RocketMQ。
RocketMQ 是什么
Github 上關(guān)于 RocketMQ 的介紹:
RcoketMQ 是一款低延遲嗽测、高可靠、可伸縮铐炫、易于使用的消息中間件。具有以下特性:
- 支持發(fā)布/訂閱(Pub/Sub)和點對點(P2P)消息模型
- 在一個隊列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞
- 支持拉(pull)和推(push)兩種消息模式
- 單一隊列百萬消息的堆積能力
- 支持多種消息協(xié)議钓瞭,如 JMS驳遵、MQTT 等
- 分布式高可用的部署架構(gòu),滿足至少一次消息傳遞語義
- 提供 docker 鏡像用于隔離測試和云集群部署
- 提供配置、指標(biāo)和監(jiān)控等功能豐富的 Dashboard
對于這些特性描述山涡,大家簡單過一眼就即可堤结,深入學(xué)習(xí)之后自然就明白了。
專業(yè)術(shù)語
Producer
消息生產(chǎn)者鸭丛,生產(chǎn)者的作用就是將消息發(fā)送到 MQ竞穷,生產(chǎn)者本身既可以產(chǎn)生消息,如讀取文本信息等鳞溉。也可以對外提供接口瘾带,由外部應(yīng)用來調(diào)用接口,再由生產(chǎn)者將收到的消息發(fā)送到 MQ熟菲。
Producer Group
生產(chǎn)者組看政,簡單來說就是多個發(fā)送同一類消息的生產(chǎn)者稱之為一個生產(chǎn)者組。在這里可以不用關(guān)心抄罕,只要知道有這么一個概念即可允蚣。
Consumer
消息消費者,簡單來說呆贿,消費 MQ 上的消息的應(yīng)用程序就是消費者嚷兔,至于消息是否進(jìn)行邏輯處理,還是直接存儲到數(shù)據(jù)庫等取決于業(yè)務(wù)需要做入。
Consumer Group
消費者組冒晰,和生產(chǎn)者類似,消費同一類消息的多個 consumer 實例組成一個消費者組竟块。
Topic
Topic 是一種消息的邏輯分類壶运,比如說你有訂單類的消息,也有庫存類的消息浪秘,那么就需要進(jìn)行分類前弯,一個是訂單 Topic 存放訂單相關(guān)的消息,一個是庫存 Topic 存儲庫存相關(guān)的消息秫逝。
Message
Message 是消息的載體。一個 Message 必須指定 topic询枚,相當(dāng)于寄信的地址违帆。Message 還有一個可選的 tag 設(shè)置,以便消費端可以基于 tag 進(jìn)行過濾消息金蜀。也可以添加額外的鍵值對刷后,例如你需要一個業(yè)務(wù) key 來查找 broker 上的消息的畴,方便在開發(fā)過程中診斷問題。
Tag
標(biāo)簽可以被認(rèn)為是對 Topic 進(jìn)一步細(xì)化尝胆。一般在相同業(yè)務(wù)模塊中通過引入標(biāo)簽來標(biāo)記不同用途的消息丧裁。
Broker
Broker 是 RocketMQ 系統(tǒng)的主要角色,其實就是前面一直說的 MQ含衔。Broker 接收來自生產(chǎn)者的消息煎娇,儲存以及為消費者拉取消息的請求做好準(zhǔn)備。
Name Server
Name Server 為 producer 和 consumer 提供路由信息贪染。
RocketMQ 架構(gòu)
由這張圖可以看到有四個集群缓呛,分別是 NameServer 集群、Broker 集群杭隙、Producer 集群和 Consumer 集群:
- NameServer: 提供輕量級的服務(wù)發(fā)現(xiàn)和路由哟绊。 每個 NameServer 記錄完整的路由信息,提供等效的讀寫服務(wù)痰憎,并支持快速存儲擴(kuò)展票髓。
- Broker: 通過提供輕量級的 Topic 和 Queue 機(jī)制來處理消息存儲,同時支持推(push)和拉(pull)模式以及主從結(jié)構(gòu)的容錯機(jī)制。
- Producer:生產(chǎn)者铣耘,產(chǎn)生消息的實例洽沟,擁有相同 Producer Group 的 Producer 組成一個集群。
- Consumer:消費者涡拘,接收消息進(jìn)行消費的實例玲躯,擁有相同 Consumer Group 的
Consumer 組成一個集群。
簡單說明一下圖中箭頭含義鳄乏,從 Broker 開始跷车,Broker Master1 和 Broker Slave1 是主從結(jié)構(gòu),它們之間會進(jìn)行數(shù)據(jù)同步橱野,即 Date Sync朽缴。同時每個 Broker 與
NameServer 集群中的所有節(jié)
點建立長連接,定時注冊 Topic 信息到所有 NameServer 中水援。
Producer 與 NameServer 集群中的其中一個節(jié)點(隨機(jī)選擇)建立長連接密强,定期從 NameServer 獲取 Topic 路由信息,并向提供 Topic 服務(wù)的 Broker Master 建立長連接蜗元,且定時向 Broker 發(fā)送心跳或渤。Producer 只能將消息發(fā)送到 Broker master,但是 Consumer 則不一樣奕扣,它同時和提供 Topic 服務(wù)的 Master 和 Slave
建立長連接薪鹦,既可以從 Broker Master 訂閱消息,也可以從 Broker Slave 訂閱消息。
RocketMQ 集群部署模式
- 單 master 模式
也就是只有一個 master 節(jié)點池磁,稱不上是集群奔害,一旦這個 master 節(jié)點宕機(jī),那么整個服務(wù)就不可用地熄,適合個人學(xué)習(xí)使用华临。 - 多 master 模式
多個 master 節(jié)點組成集群,單個 master 節(jié)點宕機(jī)或者重啟對應(yīng)用沒有影響端考。
優(yōu)點:所有模式中性能最高
缺點:單個 master 節(jié)點宕機(jī)期間雅潭,未被消費的消息在節(jié)點恢復(fù)之前不可用,消息的實時性就受到影響跛梗。
注意:使用同步刷盤可以保證消息不丟失寻馏,同時 Topic 相對應(yīng)的 queue 應(yīng)該分布在集群中各個節(jié)點,而不是只在某各節(jié)點上核偿,否則诚欠,該節(jié)點宕機(jī)會對訂閱該 topic 的應(yīng)用造成影響。 - 多 master 多 slave 異步復(fù)制模式
在多 master 模式的基礎(chǔ)上漾岳,每個 master 節(jié)點都有至少一個對應(yīng)的 slave轰绵。master
節(jié)點可讀可寫,但是 slave 只能讀不能寫尼荆,類似于 mysql 的主備模式左腔。
優(yōu)點: 在 master 宕機(jī)時,消費者可以從 slave 讀取消息捅儒,消息的實時性不會受影響液样,性能幾乎和多 master 一樣。
缺點:使用異步復(fù)制的同步方式有可能會有消息丟失的問題巧还。 - 多 master 多 slave 同步雙寫模式
同多 master 多 slave 異步復(fù)制模式類似鞭莽,區(qū)別在于 master 和 slave 之間的數(shù)據(jù)同步方式。
優(yōu)點:同步雙寫的同步模式能保證數(shù)據(jù)不丟失麸祷。
缺點:發(fā)送單個消息 RT 會略長澎怒,性能相比異步復(fù)制低10%左右。
刷盤策略:同步刷盤和異步刷盤(指的是節(jié)點自身數(shù)據(jù)是同步還是異步存儲)
同步方式:同步雙寫和異步復(fù)制(指的一組 master 和 slave 之間數(shù)據(jù)的同步)
注意:要保證數(shù)據(jù)可靠阶牍,需采用同步刷盤和同步雙寫的方式喷面,但性能會較其他方式低。
RocketMQ 單主部署
鑒于是快速入門走孽,我選擇的是第一種單 master 的部署模式惧辈。先說明一下我的安裝環(huán)境:
- Centos 7.2
- jdk 1.8
- Maven 3.2.x
- Git
這里 git 可用可不用,主要是用來直接下載 github 上的源碼磕瓷。也可以選擇自己到
github 上下載盒齿,然后上傳到服務(wù)器上。以git操作為示例。
- clone 源碼并用 maven 編譯
> git clone https://github.com/alibaba/RocketMQ.git /opt/RocketMQ
> cd /opt/RocketMQ && mvn -Dmaven.test.skip=true clean package install assembly:assembly -U
> cd target/alibaba-rocketmq-broker/alibaba-rocketmq
此處可能遇到的問題
一县昂、執(zhí)行"git clone https://github.com/alibaba/RocketMQ.git /home/inspkgs/RocketMQ"時出現(xiàn)以下提示:
fatal: unable to access 'https://github.com/alibaba/RocketMQ.git/': Could not resolve host: github.com; Unknown error
解決辦法:一般是由于網(wǎng)絡(luò)原因造成的,執(zhí)行以下命令
> ping github.com
確定可以 ping 通之后陷舅,再重新執(zhí)行 git clone 命令倒彰。
二、執(zhí)行"mvn -Dmaven.test.skip=true clean package install assembly:assembly -U"編譯時莱睁,可能出現(xiàn)下載相關(guān)jar很慢的情況待讳。
這也是由于默認(rèn) maven 中央倉庫在國外的原因,可以根據(jù)需要在 /home/maven/conf/setting.xml 中的 <mirrors></mirrors> 添加以下內(nèi)容后重新編譯:
<mirror>
<id>aliyun</id>
<mirrorOf>central</mirrorOf>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
- 啟動 Name Server
> nohup sh /opt/RocketMQ/bin/mqnamesrv &
//執(zhí)行 jps 查看進(jìn)程
> jps
25913 NamesrvStartup
//查看日志確保服務(wù)已正常啟動
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
- 啟動 broker
> nohup sh /opt/RocketMQ/bin/mqbroker -n localhost:9876 &
//執(zhí)行 jps 查看進(jìn)程
> jps
25954 BrokerStartup
//查看日志確保服務(wù)已正常啟動
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a, 10.1.54.121:10911] boot success...
- 發(fā)送和接收消息
發(fā)送/接收消息之前,我們需要告訴客戶端 NameServer 地址仰剿。RocketMQ 提供了多種方式來實現(xiàn)這一目標(biāo)创淡。為簡單起見,我們使用環(huán)境變量 NAMESRV_ADDR。
> export NAMESRV_ADDR=localhost:9876
> sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
- 關(guān)閉服務(wù)
> sh /opt/RocketMQ/bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh /opt/RocketMQ/bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
生產(chǎn)者南吮、消費者 Demo
- 生產(chǎn)者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//聲明并初始化一個producer
//需要一個producer group名字作為構(gòu)造方法的參數(shù)琳彩,這里為producer1
DefaultMQProducer producer = new DefaultMQProducer("producer1");
//設(shè)置NameServer地址,此處應(yīng)改為實際NameServer地址,多個地址之間用部凑;分隔
//NameServer的地址必須有露乏,但是也可以通過環(huán)境變量的方式設(shè)置,不一定非得寫死在代碼里
producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
//調(diào)用start()方法啟動一個producer實例
producer.start();
//發(fā)送10條消息到Topic為TopicTest涂邀,tag為TagA瘟仿,消息內(nèi)容為“Hello RocketMQ”拼接上i的值
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest",// topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
);
//調(diào)用producer的send()方法發(fā)送消息
//這里調(diào)用的是同步的方式,所以會有返回結(jié)果
SendResult sendResult = producer.send(msg);
//打印返回結(jié)果比勉,可以看到消息發(fā)送的狀態(tài)以及一些相關(guān)信息
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
//發(fā)送完消息之后劳较,調(diào)用shutdown()方法關(guān)閉producer
producer.shutdown();
}
}
- 消費者
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//聲明并初始化一個consumer
//需要一個consumer group名字作為構(gòu)造方法的參數(shù),這里為consumer1
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
//同樣也要設(shè)置NameServer地址
consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
//這里設(shè)置的是一個consumer的消費策略
//CONSUME_FROM_LAST_OFFSET 默認(rèn)策略浩聋,從該隊列最尾開始消費观蜗,即跳過歷史消息
//CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
//CONSUME_FROM_TIMESTAMP 從某個時間點開始消費赡勘,和setConsumeTimestamp()配合使用嫂便,默認(rèn)是半個小時以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//設(shè)置consumer所訂閱的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
//設(shè)置一個Listener闸与,主要進(jìn)行消息的邏輯處理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
//返回消費狀態(tài)
//CONSUME_SUCCESS 消費成功
//RECONSUME_LATER 消費失敗毙替,需要稍后重新消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//調(diào)用start()方法啟動consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
本篇到此完結(jié),第一次寫文章践樱,表達(dá)不清楚的地方還請各位小伙伴海涵厂画。如有問題,可以多多交流拷邢。如果后面有時間的話袱院,我也會分享更多的干貨,包括
producer、consumer 的 API 詳解忽洛、如何與 spring boot 整合腻惠、生產(chǎn)環(huán)境部署的一些注意事項,以及學(xué)習(xí) RocketMQ 這一路踩過的坑等等欲虚。
最后集灌,如果覺得寫的還過得去,點個喜歡或者小小打賞一下都是一種肯定复哆,謝謝大家的支持欣喧。
ps:偶然的機(jī)會看到自己寫的文章出現(xiàn)在一個個人網(wǎng)站,當(dāng)我通過QQ聯(lián)系那個人詢問這件事時梯找,還被直接拉黑唆阿,真的很郁悶。在此锈锤,我想說驯鳖,想要用我的作品可以,但一定要征得我同意牙咏,這是對一個作者最基本的尊重臼隔。
最新更新文章如下,包括修改的內(nèi)容都會重新整合:
- 3分鐘快速入門RocketMQ(上)
- 3分鐘快速入門RocketMQ(下)
- 必知必會的RocketMQ消息類型
- RocketMQ的消息發(fā)送方式(20180121更新)
- 細(xì)談RocketMQ的消費模式(20180126更新)
- 使用RocketMQ的小細(xì)節(jié)(上)(20180205更新)
- 使用RocketMQ的小細(xì)節(jié)(下)(20180301更新)