RocketMQ官網(wǎng)
一鹿霸、MQ介紹
1.1 為什么要用MQ
消息隊(duì)列是一種先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)
其應(yīng)用場(chǎng)景主要包含以下3個(gè)方面
1.1.1 應(yīng)用解耦
系統(tǒng)的耦合性越高,容錯(cuò)性就越低备闲。以電商應(yīng)用為例虚茶,用戶創(chuàng)建訂單后,如果耦合度調(diào)用庫(kù)存系統(tǒng)丧鸯、物流系統(tǒng)蛤铜、支付系統(tǒng),任何一個(gè)子系統(tǒng)出了故障或因?yàn)樯?jí)等原因暫時(shí)不可用,都會(huì)造成下單操作異常围肥,影響用戶使用體驗(yàn)剿干。
使用消息隊(duì)列解耦,系統(tǒng)的容錯(cuò)性就提高了穆刻。比如物流系統(tǒng)發(fā)生故障置尔,需要幾分鐘才能來(lái)修復(fù),在這段時(shí)間內(nèi)氢伟,物流系統(tǒng)要處理的數(shù)據(jù)被緩存到消息隊(duì)列中榜轿,用戶的下單操作正常完成。當(dāng)物流系統(tǒng)恢復(fù)后腐芍,補(bǔ)充處理存在消息隊(duì)列中的訂單消息即可差导,終端系統(tǒng)感知不到物流系統(tǒng)發(fā)生過(guò)幾分鐘故障。
1.1.2 流量削峰
應(yīng)用系統(tǒng)如果遇到系統(tǒng)請(qǐng)求流量的瞬間猛增猪勇,有可能會(huì)將系統(tǒng)壓垮设褐。有了消息隊(duì)列可以將大量請(qǐng)求緩存起來(lái),分散到很長(zhǎng)一段時(shí)間處理泣刹,這樣可以大大提高系統(tǒng)的穩(wěn)定性和用戶體驗(yàn)助析。
一般情況,為了保證系統(tǒng)的穩(wěn)定性椅您,如果系統(tǒng)負(fù)載超過(guò)閥值外冀,就會(huì)阻止用戶請(qǐng)求,這會(huì)影響用戶體驗(yàn)掀泳,而如果使用消息隊(duì)列將請(qǐng)求緩存起來(lái)雪隧,等待系統(tǒng)處理完畢后通知用戶下單完畢,這樣總比不能下單體驗(yàn)要好员舵。
另外脑沿,出于經(jīng)濟(jì)考慮目的,業(yè)務(wù)系統(tǒng)正常時(shí)段的QPS如果是1000马僻,流量最高峰是10000庄拇,為了應(yīng)對(duì)流量高峰配置高性能的服務(wù)器顯然不劃算,這時(shí)可以使用消息隊(duì)列對(duì)峰值流量削峰韭邓。
1.1.3 數(shù)據(jù)分發(fā)
通過(guò)消息隊(duì)列可以讓數(shù)據(jù)在多個(gè)系統(tǒng)之間更加流通措近。數(shù)據(jù)的生產(chǎn)方不需要關(guān)心誰(shuí)來(lái)使用數(shù)據(jù),只需要將數(shù)據(jù)發(fā)送到消息隊(duì)列女淑,數(shù)據(jù)使用放直接在消息隊(duì)列中獲取數(shù)據(jù)即可瞭郑。
1.2 MQ的優(yōu)點(diǎn)和缺點(diǎn)
1.2.1 優(yōu)點(diǎn)
解耦、削峰鸭你、數(shù)據(jù)分發(fā)
1.2.2 缺點(diǎn)
系統(tǒng)的可用性降低:系統(tǒng)引入的外部依賴越多凰浮,系統(tǒng)穩(wěn)定性越差我抠。一旦MQ宕機(jī),就會(huì)對(duì)業(yè)務(wù)造成影響袜茧。如何保證MQ的高可用?
系統(tǒng)復(fù)雜度提高:MQ的加入大大增加了系統(tǒng)的復(fù)雜度瓣窄,以前系統(tǒng)間是同步的遠(yuǎn)程調(diào)用笛厦,現(xiàn)在是通過(guò)MQ進(jìn)行異步調(diào)用。如何保證消息沒(méi)有被重復(fù)消費(fèi)(冪等性)俺夕?怎么處理消息丟失情況裳凸?怎么保證消息傳遞的順序性?
一致性問(wèn)題:A系統(tǒng)處理完業(yè)務(wù)劝贸,通過(guò)MQ給B姨谷、C、D三個(gè)系統(tǒng)發(fā)送數(shù)據(jù)映九,如果B系統(tǒng)梦湘、C系統(tǒng)處理成功,D系統(tǒng)處理失敗件甥。如何保證消息數(shù)據(jù)處理的一致性捌议?
1.3 各種MQ產(chǎn)品的比較
常見(jiàn)的MQ產(chǎn)品包括:Kafka、ActiveMQ引有、RabbitMQ瓣颅、RocketMQ
二、Ubuntu安裝RocketMQ
2.1 RocketMQ環(huán)境要求
1) 64bit OS,linux/Unix/Max
2) 64bit JDK 1.8+
3) Maven 3.2.x
4) Git
2.2 Ubuntu安裝openjdk8
先檢查是否已安裝JDK
java -version
如果未安裝JDK譬正,需要先安裝JDK
# 1宫补、更新軟件包
sudo apt-get update
# 2、安裝openjdk
sudo apt-get install openjdk-8-jdk
# 3曾我、查看安裝版本號(hào)
java -version
4粉怕、配置JAVA環(huán)境
依次運(yùn)行下述代碼找到j(luò)ava的位置
[root@xxx]# which java
/usr/bin/java
[root@xxx]# ls -lrt /usr/bin/java
lrwxrwxrwx. 1 root root 22 7月 23 14:43 /usr/bin/java -> /etc/alternatives/java
[root@xxx]# ls -lrt /etc/alternatives/java
lrwxrwxrwx. 1 root root 73 7月 23 14:43 /etc/alternatives/java -> /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
找到j(luò)ava路徑后,去掉最后的/jre/bin/java您单,在profile文件中進(jìn)行修改
vim /etc/profile
在末尾追加
JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH
使配置生效
source /etc/profile
2.3 Ubuntu安裝maven
先檢查是否已安裝maven
mvn -v
如果未安裝maven斋荞,需要先安裝maven
# 1、安裝maven
sudo apt-get install maven
# 2虐秦、查看版本號(hào)
mvn -v
2.4 Ubuntu安裝git
先檢查是否已安裝git
git --version
如果未安裝git平酿,需要先安裝git
# 1、安裝git
sudo apt-get install git
# 2悦陋、查看版本號(hào)
git --version
2.5 下載RocketMQ并構(gòu)建
2.5.1 創(chuàng)建data目錄蜈彼,并進(jìn)入該目錄
mkdir /data
cd /data
2.5.2 下載并構(gòu)建
git clone https://github.com/apache/incubator-rocketmq.git
cd incubator-rocketmq
mvn -Prelease-all -DskipTests clean install -U
2.5.3 配置環(huán)境變量
# 修改配置文件
vim /etc/profile
# 在結(jié)尾處添加:
export ROCKETMQ_HOME=/data/incubator-rocketmq/distribution/target/rocketmq-4.9.3-SNAPSHOT/rocketmq-4.9.3-SNAPSHOT
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
# 使配置生效
source /etc/profile
注意:ROCKETMQ_HOME要配置成你自己的目錄
2.5.4 修改JAVA_HOME 路徑
修改兩個(gè)文件:runbroker.sh和runserver.sh
我的路徑是/data/incubator-rocketmq/distribution/target/rocketmq-4.9.3-SNAPSHOT/rocketmq-4.9.3-SNAPSHOT/bin/runbroker.sh
修改JAVA_HOME路徑
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
2.5.5 啟動(dòng)NameServer
RocketMQ NameServer 默認(rèn)端口9876
# 進(jìn)入RocketMQ目錄
cd /data/incubator-rocketmq/distribution/target/rocketmq-4.9.3-SNAPSHOT/rocketmq-4.9.3-SNAPSHOT
# 啟動(dòng)NameServer
nohup sh bin/mqnamesrv > /data/mqnamesrv.log 2>&1 &
# 查看是否啟動(dòng)成功
netstat -tunlp|grep 9876
# 查看日志
tail -f /data/mqnamesrv.log
root@iZm5eetszs07500os8erolZ:~# cd /data/incubator-rocketmq/distribution/target/rocketmq-4.9.3-SNAPSHOT/rocketmq-4.9.3-SNAPSHOT
root@iZm5eetszs07500os8erolZ:/data/incubator-rocketmq/distribution/target/rocketmq-4.9.3-SNAPSHOT/rocketmq-4.9.3-SNAPSHOT# nohup sh bin/mqnamesrv > /data/mqnamesrv.log 2>&1 &
[1] 2814
root@iZm5eetszs07500os8erolZ:/data/incubator-rocketmq/distribution/target/rocketmq-4.9.3-SNAPSHOT/rocketmq-4.9.3-SNAPSHOT# netstat -tunlp|grep 9876
tcp 0 0 0.0.0.0:9876 0.0.0.0:* LISTEN 2830/java
root@iZm5eetszs07500os8erolZ:/data/incubator-rocketmq/distribution/target/rocketmq-4.9.3-SNAPSHOT/rocketmq-4.9.3-SNAPSHOT# tail -f /data/mqnamesrv.log
nohup: ignoring input
OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
備注:官網(wǎng)的啟動(dòng)命令是nohup sh bin/mqnamesrv &
,但會(huì)提示nohup: ignoring input and appending output to 'nohup.out'
俺驶,所以將命令換成了nohup sh bin/mqnamesrv > /data/mqnamesrv.log 2>&1 &
2.5.6 修改虛擬機(jī)內(nèi)存
RocketMQ默認(rèn)使用的虛擬機(jī)內(nèi)存較大幸逆,啟動(dòng)Broker如果內(nèi)存不足會(huì)失敗棍辕,需要編輯如下兩個(gè)配置文件,修改JVM內(nèi)存大小
vi runbroker.sh
vi runserver.sh
2.5.7 啟動(dòng)Broker
三还绘、RocketMQ簡(jiǎn)介
RocketMQ是阿里巴巴2016年開(kāi)源的MQ中間件楚昭,使用Java語(yǔ)言開(kāi)發(fā),在阿里內(nèi)部拍顷,RocketMQ承接了例如雙11等高并發(fā)場(chǎng)景的消息流轉(zhuǎn)抚太,能夠處理萬(wàn)億級(jí)別的消息。
四昔案、RocketMQ基本概念
4.1 消息模型(Message Model)
RocketMQ主要由 Producer尿贫、Broker、Consumer 三部分組成踏揣,其中Producer 負(fù)責(zé)生產(chǎn)消息庆亡,Consumer 負(fù)責(zé)消費(fèi)消息,Broker 負(fù)責(zé)存儲(chǔ)消息捞稿。Broker 在實(shí)際部署過(guò)程中對(duì)應(yīng)一臺(tái)服務(wù)器又谋,每個(gè) Broker 可以存儲(chǔ)多個(gè)Topic的消息,每個(gè)Topic的消息也可以分片存儲(chǔ)于不同的 Broker括享。Message Queue 用于存儲(chǔ)消息的物理地址搂根,每個(gè)Topic中的消息地址存儲(chǔ)于多個(gè) Message Queue 中。ConsumerGroup 由多個(gè)Consumer 實(shí)例構(gòu)成铃辖。
4.2 消息生產(chǎn)者(Producer)
負(fù)責(zé)生產(chǎn)消息剩愧,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)生產(chǎn)消息。一個(gè)消息生產(chǎn)者會(huì)把業(yè)務(wù)應(yīng)用系統(tǒng)里產(chǎn)生的消息發(fā)送到broker服務(wù)器娇斩。RocketMQ提供多種發(fā)送方式仁卷,同步發(fā)送、異步發(fā)送犬第、順序發(fā)送锦积、單向發(fā)送。同步和異步方式均需要Broker返回確認(rèn)信息歉嗓,單向發(fā)送不需要丰介。
4.3 消息消費(fèi)者(Consumer)
負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)鉴分。一個(gè)消息消費(fèi)者會(huì)從Broker服務(wù)器拉取消息哮幢、并將其提供給應(yīng)用程序。從用戶應(yīng)用的角度而言提供了兩種消費(fèi)形式:拉取式消費(fèi)志珍、推動(dòng)式消費(fèi)橙垢。
4.4 主題(Topic)
表示一類消息的集合,每個(gè)主題包含若干條消息伦糯,每條消息只能屬于一個(gè)主題柜某,是RocketMQ進(jìn)行消息訂閱的基本單位嗽元。
4.5 代理服務(wù)器(Broker Server)
消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息喂击、轉(zhuǎn)發(fā)消息剂癌。代理服務(wù)器在RocketMQ系統(tǒng)中負(fù)責(zé)接收從生產(chǎn)者發(fā)送來(lái)的消息并存儲(chǔ)、同時(shí)為消費(fèi)者的拉取請(qǐng)求作準(zhǔn)備惭等。代理服務(wù)器也存儲(chǔ)消息相關(guān)的元數(shù)據(jù)珍手,包括消費(fèi)者組、消費(fèi)進(jìn)度偏移和主題和隊(duì)列消息等辞做。
4.6 名字服務(wù)(Name Server)
名稱服務(wù)充當(dāng)路由消息的提供者。生產(chǎn)者或消費(fèi)者能夠通過(guò)名字服務(wù)查找各主題相應(yīng)的Broker IP列表寡具。多個(gè)Namesrv實(shí)例組成集群秤茅,但相互獨(dú)立,沒(méi)有信息交換童叠。
4.7 拉取式消費(fèi)(Pull Consumer)
Consumer消費(fèi)的一種類型框喳,應(yīng)用通常主動(dòng)調(diào)用Consumer的拉消息方法從Broker服務(wù)器拉消息、主動(dòng)權(quán)由應(yīng)用控制厦坛。一旦獲取了批量消息五垮,應(yīng)用就會(huì)啟動(dòng)消費(fèi)過(guò)程。
4.8 推動(dòng)式消費(fèi)(Push Consumer)
Consumer消費(fèi)的一種類型杜秸,該模式下Broker收到數(shù)據(jù)后會(huì)主動(dòng)推送給消費(fèi)端放仗,該消費(fèi)模式一般實(shí)時(shí)性較高。
4.9 生產(chǎn)者組(Producer Group)
同一類Producer的集合撬碟,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致诞挨。如果發(fā)送的是事務(wù)消息且原始生產(chǎn)者在發(fā)送之后崩潰,則Broker服務(wù)器會(huì)聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實(shí)例以提交或回溯消費(fèi)呢蛤。
4.10 消費(fèi)者組(Consumer Group)
同一類Consumer的集合惶傻,這類Consumer通常消費(fèi)同一類消息且消費(fèi)邏輯一致。消費(fèi)者組使得在消息消費(fèi)方面其障,實(shí)現(xiàn)負(fù)載均衡和容錯(cuò)的目標(biāo)變得非常容易银室。要注意的是,消費(fèi)者組的消費(fèi)者實(shí)例必須訂閱完全相同的Topic励翼。RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)蜈敢。
4.11 集群消費(fèi)(Clustering)
集群消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例平均分?jǐn)傁ⅰ?/p>
4.12 廣播消費(fèi)(Broadcasting)
廣播消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例都接收全量的消息抚笔。
4.13 普通順序消息(Normal Ordered Message)
普通順序消費(fèi)模式下扶认,消費(fèi)者通過(guò)同一個(gè)消息隊(duì)列( Topic 分區(qū),稱作 Message Queue) 收到的消息是有順序的殊橙,不同消息隊(duì)列收到的消息則可能是無(wú)順序的辐宾。
4.14 嚴(yán)格順序消息(Strictly Ordered Message)
嚴(yán)格順序消息模式下狱从,消費(fèi)者收到的所有消息均是有順序的。
4.15 消息(Message)
消息系統(tǒng)所傳輸信息的物理載體叠纹,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位季研,每條消息必須屬于一個(gè)主題。RocketMQ中每個(gè)消息擁有唯一的Message ID誉察,且可以攜帶具有業(yè)務(wù)標(biāo)識(shí)的Key与涡。系統(tǒng)提供了通過(guò)Message ID和Key查詢消息的功能。
4.16 標(biāo)簽(Tag)
為消息設(shè)置的標(biāo)志持偏,用于同一主題下區(qū)分不同類型的消息驼卖。來(lái)自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽鸿秆。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性酌畜,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯卿叽,實(shí)現(xiàn)更好的擴(kuò)展性桥胞。
五、RocketMQ特性
5.1 訂閱與發(fā)布
消息的發(fā)布是指某個(gè)生產(chǎn)者向某個(gè)topic發(fā)送消息考婴;消息的訂閱是指某個(gè)消費(fèi)者關(guān)注了某個(gè)topic中帶有某些tag的消息贩虾,進(jìn)而從該topic消費(fèi)數(shù)據(jù)。
5.2 消息順序
消息有序指的是一類消息消費(fèi)時(shí)沥阱,能按照發(fā)送的順序來(lái)消費(fèi)缎罢。例如:一個(gè)訂單產(chǎn)生了三條消息分別是訂單創(chuàng)建、訂單付款喳钟、訂單完成屁使。消費(fèi)時(shí)要按照這個(gè)順序消費(fèi)才能有意義,但是同時(shí)訂單之間是可以并行消費(fèi)的奔则。RocketMQ可以嚴(yán)格的保證消息有序蛮寂。
順序消息分為全局順序消息與分區(qū)順序消息,全局順序是指某個(gè)Topic下的所有消息都要保證順序易茬;部分順序消息只要保證每一組消息被順序消費(fèi)即可酬蹋。
全局順序 對(duì)于指定的一個(gè) Topic,所有消息按照嚴(yán)格的先入先出(FIFO)的順序進(jìn)行發(fā)布和消費(fèi)抽莱。 適用場(chǎng)景:性能要求不高范抓,所有的消息嚴(yán)格按照 FIFO 原則進(jìn)行消息發(fā)布和消費(fèi)的場(chǎng)景。
分區(qū)順序 對(duì)于指定的一個(gè) Topic食铐,所有消息根據(jù) sharding key 進(jìn)行區(qū)塊分區(qū)匕垫。 同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的 FIFO 順序進(jìn)行發(fā)布和消費(fèi)。 Sharding key 是順序消息中用來(lái)區(qū)分不同分區(qū)的關(guān)鍵字段虐呻,和普通消息的 Key 是完全不同的概念象泵。 適用場(chǎng)景:性能要求高寞秃,以 sharding key 作為分區(qū)字段,在同一個(gè)區(qū)塊中嚴(yán)格的按照 FIFO 原則進(jìn)行消息發(fā)布和消費(fèi)的場(chǎng)景偶惠。
5.3 消息過(guò)濾
RocketMQ的消費(fèi)者可以根據(jù)Tag進(jìn)行消息過(guò)濾春寿,也支持自定義屬性過(guò)濾。消息過(guò)濾目前是在Broker端實(shí)現(xiàn)的忽孽,優(yōu)點(diǎn)是減少了對(duì)于Consumer無(wú)用消息的網(wǎng)絡(luò)傳輸绑改,缺點(diǎn)是增加了Broker的負(fù)擔(dān)、而且實(shí)現(xiàn)相對(duì)復(fù)雜兄一。
5.4 消息可靠性
RocketMQ支持消息的高可靠厘线,影響消息可靠性的幾種情況:
1、Broker非正常關(guān)閉
2出革、Broker異常Crash
3皆的、OS Crash
4、機(jī)器掉電蹋盆,但是能立即恢復(fù)供電情況
5、機(jī)器無(wú)法開(kāi)機(jī)(可能是cpu硝全、主板栖雾、內(nèi)存等關(guān)鍵設(shè)備損壞)
磁盤(pán)設(shè)備損壞。
1)伟众、2)析藕、3)、4) 四種情況都屬于硬件資源可立即恢復(fù)情況凳厢,RocketMQ在這四種情況下能保證消息不丟账胧,或者丟失少量數(shù)據(jù)(依賴刷盤(pán)方式是同步還是異步)。
5)先紫、6)屬于單點(diǎn)故障治泥,且無(wú)法恢復(fù),一旦發(fā)生遮精,在此單點(diǎn)上的消息全部丟失居夹。RocketMQ在這兩種情況下,通過(guò)異步復(fù)制本冲,可保證99%的消息不丟准脂,但是仍然會(huì)有極少量的消息可能丟失。通過(guò)同步雙寫(xiě)技術(shù)可以完全避免單點(diǎn)檬洞,同步雙寫(xiě)勢(shì)必會(huì)影響性能狸膏,適合對(duì)消息可靠性要求極高的場(chǎng)合,例如與Money相關(guān)的應(yīng)用添怔。注:RocketMQ從3.0版本開(kāi)始支持同步雙寫(xiě)湾戳。
5.5 至少一次
至少一次(At least Once)指每個(gè)消息必須投遞一次贤旷。Consumer先Pull消息到本地,消費(fèi)完成后院塞,才向服務(wù)器返回ack遮晚,如果沒(méi)有消費(fèi)一定不會(huì)ack消息,所以RocketMQ可以很好的支持此特性拦止。
5.6 回溯消費(fèi)
回溯消費(fèi)是指Consumer已經(jīng)消費(fèi)成功的消息县遣,由于業(yè)務(wù)上需求需要重新消費(fèi),要支持此功能汹族,Broker在向Consumer投遞成功消息后萧求,消息仍然需要保留。并且重新消費(fèi)一般是按照時(shí)間維度顶瞒,例如由于Consumer系統(tǒng)故障夸政,恢復(fù)后需要重新消費(fèi)1小時(shí)前的數(shù)據(jù),那么Broker要提供一種機(jī)制榴徐,可以按照時(shí)間維度來(lái)回退消費(fèi)進(jìn)度守问。RocketMQ支持按照時(shí)間回溯消費(fèi),時(shí)間維度精確到毫秒坑资。
5.7 事務(wù)消息
RocketMQ事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中耗帕,要么同時(shí)成功,要么同時(shí)失敗袱贮。RocketMQ的事務(wù)消息提供類似 X/Open XA 的分布事務(wù)功能仿便,通過(guò)事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
5.8 定時(shí)(延時(shí))消息
定時(shí)消息(延遲隊(duì)列)是指消息發(fā)送到broker后攒巍,不會(huì)立即被消費(fèi)嗽仪,等待特定時(shí)間投遞給真正的topic。 broker有配置項(xiàng)messageDelayLevel柒莉,默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”闻坚,18個(gè)level〕1可以配置自定義messageDelayLevel鲤氢。注意,messageDelayLevel是broker的屬性西潘,不屬于某個(gè)topic卷玉。發(fā)消息時(shí),設(shè)置delayLevel等級(jí)即可:msg.setDelayLevel(level)喷市。level有以下三種情況:
level == 0相种,消息為非延遲消息
1<=level<=maxLevel,消息延遲特定時(shí)間,例如level==1寝并,延遲1s
level > maxLevel箫措,則level== maxLevel,例如level==20衬潦,延遲2h斤蔓,即最大延時(shí)時(shí)間為2小時(shí)。
定時(shí)消息會(huì)暫存在名為SCHEDULE_TOPIC_XXXX的topic中镀岛,并根據(jù)delayTimeLevel存入特定的queue弦牡,queueId = delayTimeLevel – 1,即一個(gè)queue只存相同延遲的消息漂羊,保證具有相同發(fā)送延遲的消息能夠順序消費(fèi)驾锰。broker會(huì)調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX,將消息寫(xiě)入真實(shí)的topic走越。
需要注意的是椭豫,定時(shí)消息會(huì)在第一次寫(xiě)入和調(diào)度寫(xiě)入真實(shí)topic時(shí)都會(huì)計(jì)數(shù),因此發(fā)送數(shù)量旨指、tps都會(huì)變高赏酥。
5.8.2 延時(shí)消息的使用場(chǎng)景
比如電商里,提交了一個(gè)訂單就可以發(fā)送一個(gè)延時(shí)消息谆构,1h后去檢查這個(gè)訂單的狀態(tài)今缚,如果還是未付款就取消訂單釋放庫(kù)存。
擴(kuò)展:Rabbitmq本身是沒(méi)有延遲隊(duì)列的低淡,只能通過(guò)Rabbitmq本身的特性來(lái)實(shí)現(xiàn),想要Rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列瞬项,需要使用Rabbitmq的死信交換機(jī)(Exchage)和消息存活時(shí)間TTL(Time To Live)
5.8.3 延時(shí)消息的使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
現(xiàn)在RocketMq并不支持任意時(shí)間的延時(shí)蔗蹋,需要設(shè)置幾個(gè)固定的延時(shí)等級(jí)(商業(yè)版本支持自定義時(shí)間),從1s到2h分別對(duì)應(yīng)著等級(jí)1到18 消息消費(fèi)失敗會(huì)進(jìn)入延時(shí)消息隊(duì)列囱淋,消息發(fā)送時(shí)間與設(shè)置的延時(shí)等級(jí)和重試次數(shù)有關(guān)猪杭,詳見(jiàn)代碼SendMessageProcessor.java
5.9 消息重試
Consumer消費(fèi)消息失敗后,要提供一種重試機(jī)制妥衣,令消息再消費(fèi)一次。Consumer消費(fèi)消息失敗通乘笆郑可以認(rèn)為有以下幾種情況:
- 由于消息本身的原因,例如反序列化失敗芦倒,消息數(shù)據(jù)本身無(wú)法處理(例如話費(fèi)充值艺挪,當(dāng)前消息的手機(jī)號(hào)被注銷,無(wú)法充值)等兵扬。這種錯(cuò)誤通常需要跳過(guò)這條消息麻裳,再消費(fèi)其它消息口蝠,而這條失敗的消息即使立刻重試消費(fèi),99%也不成功津坑,所以最好提供一種定時(shí)重試機(jī)制妙蔗,即過(guò)10秒后再重試。
- 由于依賴的下游應(yīng)用服務(wù)不可用疆瑰,例如db連接不可用眉反,外系統(tǒng)網(wǎng)絡(luò)不可達(dá)等。遇到這種錯(cuò)誤乃摹,即使跳過(guò)當(dāng)前失敗的消息禁漓,消費(fèi)其他消息同樣也會(huì)報(bào)錯(cuò)。這種情況建議應(yīng)用sleep 30s孵睬,再消費(fèi)下一條消息播歼,這樣可以減輕Broker重試消息的壓力。
RocketMQ會(huì)為每個(gè)消費(fèi)組都設(shè)置一個(gè)Topic名稱為“%RETRY%+consumerGroup”的重試隊(duì)列(這里需要注意的是掰读,這個(gè)Topic的重試隊(duì)列是針對(duì)消費(fèi)組秘狞,而不是針對(duì)每個(gè)Topic設(shè)置的),用于暫時(shí)保存因?yàn)楦鞣N異常而導(dǎo)致Consumer端無(wú)法消費(fèi)的消息蹈集∷甘裕考慮到異常恢復(fù)起來(lái)需要一些時(shí)間拢肆,會(huì)為重試隊(duì)列設(shè)置多個(gè)重試級(jí)別减响,每個(gè)重試級(jí)別都有與之對(duì)應(yīng)的重新投遞延時(shí),重試次數(shù)越多投遞延時(shí)就越大郭怪。RocketMQ對(duì)于重試消息的處理是先保存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊(duì)列中支示,后臺(tái)定時(shí)任務(wù)按照對(duì)應(yīng)的時(shí)間進(jìn)行Delay后重新保存至“%RETRY%+consumerGroup”的重試隊(duì)列中。
RocketMQ消息重試機(jī)制:采用時(shí)間衰減的方式鄙才,使用了自身定時(shí)消費(fèi)的能力颂鸿。首次在10秒后重試消費(fèi),如果消費(fèi)成功則不再重試攒庵,如果消費(fèi)失敗則繼續(xù)重試消費(fèi)嘴纺,第二次載30s后重試消費(fèi),以此類推浓冒,每次重試的間隔時(shí)間都會(huì)加長(zhǎng)栽渴,直到超過(guò)最大重試次數(shù)(默認(rèn)16次),則寫(xiě)入死信隊(duì)列不再重試稳懒,重試消費(fèi)過(guò)程中的間隔時(shí)間使用了定時(shí)消費(fèi)熔萧,重試的消息數(shù)據(jù)并非直接寫(xiě)入重試隊(duì)列,而是先寫(xiě)入定時(shí)消費(fèi)隊(duì)列,再通過(guò)定時(shí)消息的功能轉(zhuǎn)發(fā)到重試隊(duì)列佛致。(1小時(shí)后重試第15次贮缕,2小時(shí)后重試第16次)
5.10 消息重投
生產(chǎn)者在發(fā)送消息時(shí),同步消息失敗會(huì)重投俺榆,異步消息有重試感昼,oneway沒(méi)有任何保證。消息重投保證消息盡可能發(fā)送成功罐脊、不丟失定嗓,但可能會(huì)造成消息重復(fù),消息重復(fù)在RocketMQ中是無(wú)法避免的問(wèn)題萍桌。消息重復(fù)在一般情況下不會(huì)發(fā)生宵溅,當(dāng)出現(xiàn)消息量大、網(wǎng)絡(luò)抖動(dòng)上炎,消息重復(fù)就會(huì)是大概率事件恃逻。另外,生產(chǎn)者主動(dòng)重發(fā)藕施、consumer負(fù)載變化也會(huì)導(dǎo)致重復(fù)消息寇损。如下方法可以設(shè)置消息重試策略:
retryTimesWhenSendFailed:同步發(fā)送失敗重投次數(shù),默認(rèn)為2裳食,因此生產(chǎn)者會(huì)最多嘗試發(fā)送retryTimesWhenSendFailed + 1次矛市。不會(huì)選擇上次失敗的broker,嘗試向其他broker發(fā)送诲祸,最大程度保證消息不丟浊吏。超過(guò)重投次數(shù),拋出異常救氯,由客戶端保證消息不丟卿捎。當(dāng)出現(xiàn)RemotingException、MQClientException和部分MQBrokerException時(shí)會(huì)重投径密。
retryTimesWhenSendAsyncFailed:異步發(fā)送失敗重試次數(shù),異步重試不會(huì)選擇其他broker躺孝,僅在同一個(gè)broker上做重試享扔,不保證消息不丟。
retryAnotherBrokerWhenNotStoreOK:消息刷盤(pán)(主或備)超時(shí)或slave不可用(返回狀態(tài)非SEND_OK)植袍,是否嘗試發(fā)送到其他broker惧眠,默認(rèn)false。十分重要消息可以開(kāi)啟于个。
5.11 流量控制
生產(chǎn)者流控氛魁,因?yàn)閎roker處理能力達(dá)到瓶頸;消費(fèi)者流控,因?yàn)橄M(fèi)能力達(dá)到瓶頸秀存。
生產(chǎn)者流控:
commitLog文件被鎖時(shí)間超過(guò)osPageCacheBusyTimeOutMills時(shí)捶码,參數(shù)默認(rèn)為1000ms,返回流控或链。
如果開(kāi)啟transientStorePoolEnable == true惫恼,且broker為異步刷盤(pán)的主機(jī),且transientStorePool中資源不足澳盐,拒絕當(dāng)前send請(qǐng)求祈纯,返回流控。
broker每隔10ms檢查send請(qǐng)求隊(duì)列頭部請(qǐng)求的等待時(shí)間叼耙,如果超過(guò)waitTimeMillsInSendQueue腕窥,默認(rèn)200ms,拒絕當(dāng)前send請(qǐng)求筛婉,返回流控簇爆。
broker通過(guò)拒絕send 請(qǐng)求方式實(shí)現(xiàn)流量控制。
注意倾贰,生產(chǎn)者流控冕碟,不會(huì)嘗試消息重投。
消費(fèi)者流控:
- 消費(fèi)者本地緩存消息數(shù)超過(guò)pullThresholdForQueue時(shí)匆浙,默認(rèn)1000安寺。
- 消費(fèi)者本地緩存消息大小超過(guò)pullThresholdSizeForQueue時(shí),默認(rèn)100MB首尼。
- 消費(fèi)者本地緩存消息跨度超過(guò)consumeConcurrentlyMaxSpan時(shí)挑庶,默認(rèn)2000。
消費(fèi)者流控的結(jié)果是降低拉取頻率软能。
5.12 死信隊(duì)列
死信隊(duì)列用于處理無(wú)法被正常消費(fèi)的消息迎捺。當(dāng)一條消息初次消費(fèi)失敗,消息隊(duì)列會(huì)自動(dòng)進(jìn)行消息重試查排;達(dá)到最大重試次數(shù)后凳枝,若消費(fèi)依然失敗,則表明消費(fèi)者在正常情況下無(wú)法正確地消費(fèi)該消息跋核,此時(shí)岖瑰,消息隊(duì)列 不會(huì)立刻將消息丟棄疟位,而是將其發(fā)送到該消費(fèi)者對(duì)應(yīng)的特殊隊(duì)列中阀坏。
RocketMQ將這種正常情況下無(wú)法被消費(fèi)的消息稱為死信消息(Dead-Letter Message),將存儲(chǔ)死信消息的特殊隊(duì)列稱為死信隊(duì)列(Dead-Letter Queue)琳拭。在RocketMQ中刻伊,可以通過(guò)使用console控制臺(tái)對(duì)死信隊(duì)列中的消息進(jìn)行重發(fā)來(lái)使得消費(fèi)者實(shí)例再次進(jìn)行消費(fèi)露戒。
六椒功、RocketMQ最佳實(shí)踐
6.1 生產(chǎn)者
6.1.1 Tags的使用
一個(gè)應(yīng)用盡可能用一個(gè)Topic,而消息子類型則可以用tags來(lái)標(biāo)識(shí)智什。tags可以由應(yīng)用自由設(shè)置动漾,只有生產(chǎn)者在發(fā)送消息設(shè)置了tags,消費(fèi)方在訂閱消息時(shí)才可以利用tags通過(guò)broker做消息過(guò)濾
6.1.2 Keys的使用
每個(gè)消息在業(yè)務(wù)層面的唯一標(biāo)識(shí)碼要設(shè)置到keys字段撩鹿,方便將來(lái)定位消息丟失問(wèn)題谦炬。服務(wù)器會(huì)為每個(gè)消息創(chuàng)建索引(哈希索引),應(yīng)用可以通過(guò)topic节沦、key來(lái)查詢這條消息內(nèi)容键思,以及消息被誰(shuí)消費(fèi)。由于是哈希索引甫贯,請(qǐng)務(wù)必保證key盡可能唯一吼鳞,這樣可以避免潛在的哈希沖突。
6.1.3 消息發(fā)送失敗處理方式
Producer的send方法本身支持內(nèi)部重試叫搁,重試邏輯如下:
至多重試2次赔桌。
如果同步模式發(fā)送失敗,則輪轉(zhuǎn)到下一個(gè)Broker渴逻,如果異步模式發(fā)送失敗疾党,則只會(huì)在當(dāng)前Broker進(jìn)行重試。這個(gè)方法的總耗時(shí)時(shí)間不超過(guò)sendMsgTimeout設(shè)置的值惨奕,默認(rèn)10s雪位。
如果本身向broker發(fā)送消息產(chǎn)生超時(shí)異常,就不會(huì)再重試梨撞。
以上策略也是在一定程度上保證了消息可以發(fā)送成功雹洗。如果業(yè)務(wù)對(duì)消息可靠性要求比較高,建議應(yīng)用增加相應(yīng)的重試邏輯:比如調(diào)用send同步方法發(fā)送失敗時(shí)卧波,則嘗試將消息存儲(chǔ)到db时肿,然后由后臺(tái)線程定時(shí)重試,確保消息一定到達(dá)Broker港粱。
上述db重試方式為什么沒(méi)有集成到MQ客戶端內(nèi)部做螃成,而是要求應(yīng)用自己去完成,主要基于以下幾點(diǎn)考慮:首先查坪,MQ的客戶端設(shè)計(jì)為無(wú)狀態(tài)模式寸宏,方便任意的水平擴(kuò)展,且對(duì)機(jī)器資源的消耗僅僅是cpu咪惠、內(nèi)存、網(wǎng)絡(luò)淋淀。其次遥昧,如果MQ客戶端內(nèi)部集成一個(gè)KV存儲(chǔ)模塊覆醇,那么數(shù)據(jù)只有同步落盤(pán)才能較可靠,而同步落盤(pán)本身性能開(kāi)銷較大炭臭,所以通常會(huì)采用異步落盤(pán)永脓,又由于應(yīng)用關(guān)閉過(guò)程不受MQ運(yùn)維人員控制,可能經(jīng)常會(huì)發(fā)生 kill -9 這樣暴力方式關(guān)閉鞋仍,造成數(shù)據(jù)沒(méi)有及時(shí)落盤(pán)而丟失常摧。第三,Producer所在機(jī)器的可靠性較低威创,一般為虛擬機(jī)落午,不適合存儲(chǔ)重要數(shù)據(jù)。綜上肚豺,建議重試過(guò)程交由應(yīng)用來(lái)控制溃斋。
6.1.4 選擇oneway形式發(fā)送
通常消息的發(fā)送是這樣一個(gè)過(guò)程:
- 客戶端發(fā)送請(qǐng)求到服務(wù)器
- 服務(wù)器處理請(qǐng)求
- 服務(wù)器向客戶端返回應(yīng)答
所以,一次消息發(fā)送的耗時(shí)時(shí)間是上述三個(gè)步驟的總和吸申,而某些場(chǎng)景要求耗時(shí)非常短梗劫,但是對(duì)可靠性要求并不高,例如日志收集類應(yīng)用截碴,此類應(yīng)用可以采用oneway形式調(diào)用梳侨,oneway形式只發(fā)送請(qǐng)求不等待應(yīng)答,而發(fā)送請(qǐng)求在客戶端實(shí)現(xiàn)層面僅僅是一個(gè)操作系統(tǒng)系統(tǒng)調(diào)用的開(kāi)銷日丹,即將數(shù)據(jù)寫(xiě)入客戶端的socket緩沖區(qū)走哺,此過(guò)程耗時(shí)通常在微秒級(jí)。
6.2 消費(fèi)者
6.2.1 消費(fèi)過(guò)程冪等
RocketMQ無(wú)法避免消息重復(fù)(Exactly-Once)聚凹,所以如果業(yè)務(wù)對(duì)消費(fèi)重復(fù)非常敏感割坠,務(wù)必要在業(yè)務(wù)層面進(jìn)行去重處理《恃溃可以借助關(guān)系數(shù)據(jù)庫(kù)進(jìn)行去重彼哼。首先需要確定消息的唯一鍵,可以是msgId湘今,也可以是消息內(nèi)容中的唯一標(biāo)識(shí)字段敢朱,例如訂單Id等。在消費(fèi)之前判斷唯一鍵是否在關(guān)系數(shù)據(jù)庫(kù)中存在摩瞎。如果不存在則插入拴签,并消費(fèi),否則跳過(guò)旗们。(實(shí)際過(guò)程要考慮原子性問(wèn)題蚓哩,判斷是否存在可以嘗試插入,如果報(bào)主鍵沖突上渴,則插入失敗岸梨,直接跳過(guò))
msgId一定是全局唯一標(biāo)識(shí)符喜颁,但是實(shí)際使用中,可能會(huì)存在相同的消息有兩個(gè)不同msgId的情況(消費(fèi)者主動(dòng)重發(fā)曹阔、因客戶端重投機(jī)制導(dǎo)致的重復(fù)等)半开,這種情況就需要使業(yè)務(wù)字段進(jìn)行重復(fù)消費(fèi)。
6.2.2 消費(fèi)速度慢的處理方式
1赃份、提高消費(fèi)并行度
絕大部分消息消費(fèi)行為都屬于 IO 密集型寂拆,即可能是操作數(shù)據(jù)庫(kù),或者調(diào)用 RPC抓韩,這類消費(fèi)行為的消費(fèi)速度在于后端數(shù)據(jù)庫(kù)或者外系統(tǒng)的吞吐量纠永,通過(guò)增加消費(fèi)并行度,可以提高總的消費(fèi)吞吐量园蝠,但是并行度增加到一定程度渺蒿,反而會(huì)下降。所以彪薛,應(yīng)用必須要設(shè)置合理的并行度茂装。 如下有幾種修改消費(fèi)并行度的方法:
- 同一個(gè) ConsumerGroup 下,通過(guò)增加 Consumer 實(shí)例數(shù)量來(lái)提高并行度(需要注意的是超過(guò)訂閱隊(duì)列數(shù)的 Consumer 實(shí)例無(wú)效)善延∩偬可以通過(guò)加機(jī)器,或者在已有機(jī)器啟動(dòng)多個(gè)進(jìn)程的方式易遣。
- 提高單個(gè) Consumer 的消費(fèi)并行線程彼妻,通過(guò)修改參數(shù) consumeThreadMin、consumeThreadMax實(shí)現(xiàn)豆茫。
2侨歉、批量方式消費(fèi)
某些業(yè)務(wù)流程如果支持批量方式消費(fèi),則可以很大程度上提高消費(fèi)吞吐量揩魂,例如訂單扣款類應(yīng)用幽邓,一次處理一個(gè)訂單耗時(shí) 1 s,一次處理 10 個(gè)訂單可能也只耗時(shí) 2 s火脉,這樣即可大幅度提高消費(fèi)的吞吐量牵舵,通過(guò)設(shè)置 consumer的 consumeMessageBatchMaxSize 返個(gè)參數(shù),默認(rèn)是 1倦挂,即一次只消費(fèi)一條消息畸颅,例如設(shè)置為 N,那么每次消費(fèi)的消息數(shù)小于等于 N方援。
3没炒、跳過(guò)非重要消息
發(fā)生消息堆積時(shí),如果消費(fèi)速度一直追不上發(fā)送速度犯戏,如果業(yè)務(wù)對(duì)數(shù)據(jù)要求不高的話送火,可以選擇丟棄不重要的消息祖很。例如,當(dāng)某個(gè)隊(duì)列的消息數(shù)堆積到100000條以上漾脂,則嘗試丟棄部分或全部消息,這樣就可以快速追上發(fā)送消息的速度胚鸯。
4骨稿、優(yōu)化每條消息消費(fèi)過(guò)程
舉例如下,某條消息的消費(fèi)過(guò)程如下:
1姜钳、根據(jù)消息從 DB 查詢【數(shù)據(jù) 1】
2坦冠、根據(jù)消息從 DB 查詢【數(shù)據(jù) 2】
3、復(fù)雜的業(yè)務(wù)計(jì)算
4哥桥、向 DB 插入【數(shù)據(jù) 3】
5辙浑、向 DB 插入【數(shù)據(jù) 4】
這條消息的消費(fèi)過(guò)程中有4次與 DB的 交互,如果按照每次 5ms 計(jì)算拟糕,那么總共耗時(shí) 20ms判呕,假設(shè)業(yè)務(wù)計(jì)算耗時(shí) 5ms,那么總過(guò)耗時(shí) 25ms送滞,所以如果能把 4 次 DB 交互優(yōu)化為 2 次侠草,那么總耗時(shí)就可以優(yōu)化到 15ms,即總體性能提高了 40%犁嗅。所以應(yīng)用如果對(duì)時(shí)延敏感的話边涕,可以把DB部署在SSD硬盤(pán),相比于SCSI磁盤(pán)褂微,前者的RT會(huì)小很多功蜓。
6.3 消費(fèi)打印日志
如果消息量較少,建議在消費(fèi)入口方法打印消息宠蚂,消費(fèi)耗時(shí)等式撼,方便后續(xù)排查問(wèn)題。
七肥矢、RocketMQ樣例
八端衰、Rocketmq-console控制臺(tái)
- 下載開(kāi)源的rocketmq-externals項(xiàng)目進(jìn)行部署
https://github.com/apache/rocketmq-externals
2、控制臺(tái)界面如下