一混稽、順序消息
順序消息(FIFO 消息)是消息隊列 RocketMQ 提供的一種嚴格按照順序來發(fā)布和消費的消息。順序發(fā)布和順序消費是指對于指定的一個 Topic,生 產(chǎn)者按照一定的先后順序發(fā)布消息荚坞;消費者按照既定的先后順序訂閱消息挑宠,即先發(fā)布的消息一定會先被客戶端接收到。
順序消息分為全局順序消息和分區(qū)順序消息颓影。
1.1各淀、全局順序消息
RocketMQ 在默認情況下不保證順序,要保證全局順序诡挂,需要把 Topic 的讀寫隊列數(shù)設(shè)置為 1碎浇,然后生產(chǎn)者和消費者的并發(fā)設(shè)置也是 1。所以這樣的話 高并發(fā)璃俗,高吞吐量的功能完全用不上奴璃。
1.1.1、適用場景
適用于性能要求不高城豁,所有的消息嚴格按照 FIFO 原則來發(fā)布和消費的場景苟穆。
1.1.2、示例
要確保全局順序消息唱星,需要先把 Topic 的讀寫隊列數(shù)設(shè)置為 1雳旅,然后生產(chǎn)者和消費者的并發(fā)設(shè)置也是 1。
mqadmin update Topic -t AllOrder -c DefaultCluster -r 1 -w 1 -n 127.0.0.1:9876
在證券處理中间聊,以人民幣兌換美元為 Topic攒盈,在價格相同的情況下,先出價者優(yōu)先處理哎榴,則可以按照 FIFO 的方式發(fā)布和消費全局順序消息型豁。
1.2、部分順序消息
對于指定的一個 Topic尚蝌,所有消息根據(jù) Sharding Key 進行區(qū)塊分區(qū)迎变。同一個分區(qū)內(nèi)的消息按照嚴格的 FIFO 順序進行發(fā)布和消費。Sharding Key 是順 序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段驼壶,和普通消息的 Key 是完全不同的概念氏豌。
二、延時消息
2.1热凹、概念介紹
延時消息:Producer 將消息發(fā)送到消息隊列 RocketMQ 服務(wù)端泵喘,但并不期望這條消息立馬投遞,而是延遲一定時間后才投遞到 Consumer 進行消費般妙, 該消息即延時消息纪铺。
2.2、適用場景
消息生產(chǎn)和消費有時間窗口要求:比如在電商交易中超時未支付關(guān)閉訂單的場景碟渺,在訂單創(chuàng)建時會發(fā)送一條延時消息鲜锚。這條消息將會在 30 分鐘以 后投遞給消費者,消費者收到此消息后需要判斷對應(yīng)的訂單是否已完成支付。 如支付未完成芜繁,則關(guān)閉訂單旺隙。如已完成支付則忽略。
2.3骏令、使用方式
Apache RocketMQ 目前只支持固定精度的定時消息蔬捷,因為如果要支持任意的時間精度,在 Broker 層面榔袋,必須要做消息排序周拐,如果再涉及到持久化, 那么消息排序要不可避免的產(chǎn)生巨大性能開銷凰兑。(阿里云 RocketMQ 提供了任意時刻的定時消息功能妥粟,Apache 的 RocketMQ 并沒有,阿里并沒有開源)
發(fā)送延時消息時需要設(shè)定一個延時時間長度,消息將從當(dāng)前發(fā)送時間點開始延遲固定時間之后才開始投遞吏够。
延遲消息是根據(jù)延遲隊列的 level 來的勾给,延遲隊列默認是
msg.setDelayTimeLevel(5)代表延遲一分鐘
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
是這 18 個等級(秒(s)、分(m)锅知、小時(h))锦秒,level 為 1,表示延遲 1 秒后消費喉镰,level 為 5 表示延遲 1 分鐘后消費,level 為 18 表示延遲 2 個 小時消費惭笑。生產(chǎn)消息跟普通的生產(chǎn)消息類似侣姆,只需要在消息上設(shè)置延遲隊列的 level 即可。消費消息跟普通的消費消息一致沉噩。
三捺宗、死信隊列
3.1、概念介紹
死信隊列用于處理無法被正常消費的消息川蒙。當(dāng)一條消息初次消費失敗蚜厉,消息隊列 MQ 會自動進行消息重試;達到最大重試次數(shù)后畜眨,若消費依然失敗昼牛, 則表明Consumer 在正常情況下無法正確地消費該消息。此時康聂,消息隊列MQ不會立刻將消息丟棄贰健,而是將這條消息發(fā)送到該 Consumer 對應(yīng)的特殊隊列中。
消息隊列 MQ 將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message)恬汁,將存儲死信消息的特殊隊列稱為死信隊列 (Dead-Letter Queue)伶椿。
3.2適用場景
3.2.1、死信消息的特性
不會再被消費者正常消費。 有效期與正常消息相同脊另,均為 3 天导狡,3 天后會被自動刪除。因此偎痛,請在死信消息產(chǎn)生后的 3 天內(nèi)及時處理旱捧。
3.2.2、死信隊列的特性
一個死信隊列對應(yīng)一個 Group ID看彼, 而不是對應(yīng)單個消費者實例廊佩。
如果一個 Group ID 未產(chǎn)生死信消息,消息隊列 MQ 不會為其創(chuàng)建相應(yīng)的死信隊列靖榕。
一個死信隊列包含了對應(yīng) Group ID 產(chǎn)生的所有死信消息标锄,不論該消息屬于哪個 Topic。
消息隊列 MQ 控制臺提供對死信消息的查詢的功能茁计。
一般控制臺直接查看死信消息會報錯料皇。
進入RocketMQ中服務(wù)器對應(yīng)的 RocketMQ 中的/bin 目錄,執(zhí)行以下腳本
sh mqadmin updateTopic -b 192.168.0.128:10911 -n 192.168.0.128:9876 -t %DLQ%group1 -p 6
四星压、消費冪等
為了防止消息重復(fù)消費導(dǎo)致業(yè)務(wù)處理異常践剂,消息隊列 MQ 的消費者在接收到消息后,有必要根據(jù)業(yè)務(wù)上的唯一 Key 對消息做冪等處理娜膘。本文介紹消息冪 等的概念逊脯、適用場景以及處理方法。
4.1竣贪、什么是消息冪等
當(dāng)出現(xiàn)消費者對某條消息重復(fù)消費的情況時军洼,重復(fù)消費的結(jié)果與消費一次的結(jié)果是相同的,并且多次消費并未對業(yè)務(wù)系統(tǒng)產(chǎn)生任何負面影響演怎,那么 這整個過程就實現(xiàn)可消息冪等匕争。
例如,在支付場景下爷耀,消費者消費扣款消息甘桑,對一筆訂單執(zhí)行扣款操作,扣款金額為 100 元歹叮。如果因網(wǎng)絡(luò)不穩(wěn)定等原因?qū)е驴劭钕⒅貜?fù)投遞跑杭,消 費者重復(fù)消費了該扣款消息,但最終的業(yè)務(wù)結(jié)果是只扣款一次盗胀,扣費 100 元艘蹋,且用戶的扣款記錄中對應(yīng)的訂單只有一條扣款流水,不會多次扣除費用票灰。 那么這次扣款操作是符合要求的女阀,整個消費過程實現(xiàn)了消費冪等宅荤。
4.2、需要處理的場景
在互聯(lián)網(wǎng)應(yīng)用中浸策,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下冯键,消息隊列 MQ 的消息有可能會出現(xiàn)重復(fù)。如果消息重復(fù)會影響您的業(yè)務(wù)處理庸汗,請對消息做冪等處理惫确。 消息重復(fù)的場景如下:
1. 發(fā)送時消息重復(fù)
當(dāng)一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶端宕機蚯舱,導(dǎo)致服務(wù)端對客戶端應(yīng)答失敗改化。 如果此時生產(chǎn)者意識到消 息發(fā)送失敗并嘗試再次發(fā)送消息,消費者后續(xù)會收到兩條內(nèi)容相同并且 Message ID 也相同的消息枉昏。
2. 投遞時消息重復(fù)
消息消費的場景下陈肛,消息已投遞到消費者并完成業(yè)務(wù)處理,當(dāng)客戶端給服務(wù)端反饋應(yīng)答的時候網(wǎng)絡(luò)閃斷兄裂。為了保證消息至少被消費一次句旱,消息隊列 MQ 的服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過的消息,消費者后續(xù)會收到兩條內(nèi)容相同并且 Message ID 也相同的消息晰奖。
3. 負載均衡時消息重復(fù)(包括但不限于網(wǎng)絡(luò)抖動谈撒、Broker 重啟以及消費者應(yīng)用重啟)
當(dāng)消息隊列 MQ 的 Broker 或客戶端重啟、擴容或縮容時匾南,會觸發(fā) Rebalance啃匿,此時消費者可能會收到重復(fù)消息。
4.3蛆楞、處理方法
因為 Message ID 有可能出現(xiàn)沖突(重復(fù))的情況立宜,所以真正安全的冪等處理,不建議以 Message ID 作為處理依據(jù)臊岸。最好的方式是以業(yè)務(wù)唯一標(biāo)識 作為冪等處理的關(guān)鍵依據(jù),而業(yè)務(wù)的唯一標(biāo)識可以通過消息 Key 設(shè)置尊流。
以支付場景為例帅戒,可以將消息的 Key 設(shè)置為訂單號,作為冪等處理的依據(jù)崖技。具體代碼示例如下:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
消費者收到消息時可以根據(jù)消息的 Key逻住,即訂單號來實現(xiàn)消息冪等:
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根據(jù)業(yè)務(wù)唯一標(biāo)識的 Key 做冪等處理
} });
五、消息過濾
5.1迎献、概念介紹
RocketMQ 分布式消息隊列的消息過濾方式有別于其它 MQ 中間件瞎访,是可以實現(xiàn)服務(wù)端的過濾。
5.2吁恍、表達式過濾
主要支持如下 2 種的過濾方式
(1) Tag 過濾方式:Consumer 端在訂閱消息時除了指定 Topic 還可以指定 TAG扒秸,如果一個消息有多個 TAG播演,可以用||分隔。其中伴奥,Consumer 端會將 這個訂閱請求構(gòu)建成一個 SubscriptionData写烤,發(fā)送一個 Pull 消息的請求給 Broker 端。Broker 端從 RocketMQ 的文件存儲層—Store 讀取數(shù)據(jù)之 前拾徙,會用這些數(shù)據(jù)先構(gòu)建一個 MessageFilter洲炊,然后傳給 Store。Store 從 ConsumeQueue 讀取到一條記錄后尼啡,會用它記錄的消息 tag hash 值去做過濾暂衡,由于在服務(wù)端只是根據(jù) hashcode 進行判斷,無法精確對 tag 原始字符串進行過濾崖瞭,故在消息消費端拉取到消息后狂巢,還需要對消息的 原始 tag 字符串進行比對,如果不同读恃,則丟棄該消息隧膘,不進行消息消費。
(2) SQL92 的過濾方式:這種方式的大致做法和上面的 Tag 過濾方式一樣寺惫,只是具體過濾過程不太一樣疹吃,真正的 SQL expression 的構(gòu)建和執(zhí)行由 rocketmq-filter 模塊負責(zé)的。具體使用見 http://rocketmq.apache.org/docs/filter-by-sql92-example/
注意如果開啟 SQL 過濾的話西雀,Broker 需要開啟參數(shù) enablePropertyFilter=true萨驶,然后服務(wù)器重啟生效
5.3、類過濾
新版本(>=4.3.0)已經(jīng)不支持(代碼中 FilterServerConsumer 新版本已經(jīng)不支持了)