特性(features)
來源 : 官方文檔
訂閱與發(fā)布
消息的發(fā)布是指某個(gè)生產(chǎn)者向某個(gè)topic發(fā)送消息;消息的訂閱是指某個(gè)消費(fèi)者關(guān)注了某個(gè)topic中帶有某些tag的消息雳旅,進(jìn)而從該topic消費(fèi)數(shù)據(jù)。
消息順序
消息有序指的是一類消息消費(fèi)時(shí),能按照發(fā)送的順序來消費(fèi)责嚷。例如:一個(gè)訂單產(chǎn)生了三條消息分別是訂單創(chuàng)建、訂單付款、訂單完成啃勉。消費(fèi)時(shí)要按照這個(gè)順序消費(fèi)才能有意義,但是同時(shí)訂單之間是可以并行消費(fèi)的双妨。RocketMQ可以嚴(yán)格的保證消息有序淮阐。
順序消息分為全局順序消息與分區(qū)順序消息,全局順序是指某個(gè)Topic下的所有消息都要保證順序刁品;部分順序消息只要保證每一組消息被順序消費(fèi)即可泣特。
- 全局順序
對(duì)于指定的一個(gè) Topic,所有消息按照嚴(yán)格的先入先出(FIFO)的順序進(jìn)行發(fā)布和消費(fèi)挑随。
適用場(chǎng)景:性能要求不高状您,所有的消息嚴(yán)格按照 FIFO 原則進(jìn)行消息發(fā)布和消費(fèi)的場(chǎng)景 - 分區(qū)順序
對(duì)于指定的一個(gè) Topic,所有消息根據(jù) sharding key 進(jìn)行區(qū)塊分區(qū)镀裤。 同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的 FIFO 順序進(jìn)行發(fā)布和消費(fèi)竞阐。 Sharding key 是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的 Key 是完全不同的概念暑劝。
適用場(chǎng)景:性能要求高骆莹,以 sharding key 作為分區(qū)字段,在同一個(gè)區(qū)塊中嚴(yán)格的按照 FIFO 原則進(jìn)行消息發(fā)布和消費(fèi)的場(chǎng)景担猛。
消息過濾
RocketMQ的消費(fèi)者可以根據(jù)Tag進(jìn)行消息過濾幕垦,也支持自定義屬性過濾。消息過濾目前是在Broker端實(shí)現(xiàn)的傅联,優(yōu)點(diǎn)是減少了對(duì)于Consumer無用消息的網(wǎng)絡(luò)傳輸先改,缺點(diǎn)是增加了Broker的負(fù)擔(dān)、而且實(shí)現(xiàn)相對(duì)復(fù)雜蒸走。
消息可靠性
RocketMQ支持消息的高可靠仇奶,影響消息可靠性的幾種情況:
- Broker正常關(guān)閉
- Broker異常Crash
- OS Crash
- 機(jī)器掉電,但是能立即恢復(fù)供電情況
- 機(jī)器無法開機(jī)(可能是cpu比驻、主板该溯、內(nèi)存等關(guān)鍵設(shè)備損壞)
- 磁盤設(shè)備損壞
1)、2)别惦、3)狈茉、4) 四種情況都屬于硬件資源可立即恢復(fù)情況,RocketMQ在這四種情況下能保證消息不丟掸掸,或者丟失少量數(shù)據(jù)(依賴刷盤方式是同步還是異步)氯庆。
5)蹭秋、6)屬于單點(diǎn)故障,且無法恢復(fù)堤撵,一旦發(fā)生仁讨,在此單點(diǎn)上的消息全部丟失。RocketMQ在這兩種情況下粒督,通過異步復(fù)制陪竿,可保證99%的消息不丟,但是仍然會(huì)有極少量的消息可能丟失屠橄。通過同步雙寫技術(shù)可以完全避免單點(diǎn),同步雙寫勢(shì)必會(huì)影響性能闰挡,適合對(duì)消息可靠性要求極高的場(chǎng)合锐墙,例如與Money相關(guān)的應(yīng)用。注:RocketMQ從3.0版本開始支持同步雙寫长酗。
至少一次
至少一次(At least Once)指每個(gè)消息必須投遞一次溪北。Consumer先Pull消息到本地,消費(fèi)完成后夺脾,才向服務(wù)器返回ack之拨,如果沒有消費(fèi)一定不會(huì)ack消息,所以RocketMQ可以很好的支持此特性咧叭。
回溯消費(fèi)
回溯消費(fèi)是指Consumer已經(jīng)消費(fèi)成功的消息蚀乔,由于業(yè)務(wù)上需求需要重新消費(fèi),要支持此功能菲茬,Broker在向Consumer投遞成功消息后吉挣,消息仍然需要保留。并且重新消費(fèi)一般是按照時(shí)間維度婉弹,例如由于Consumer系統(tǒng)故障睬魂,恢復(fù)后需要重新消費(fèi)1小時(shí)前的數(shù)據(jù),那么Broker要提供一種機(jī)制镀赌,可以按照時(shí)間維度來回退消費(fèi)進(jìn)度氯哮。RocketMQ支持按照時(shí)間回溯消費(fèi),時(shí)間維度精確到毫秒商佛。
事務(wù)消息
RocketMQ事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中喉钢,要么同時(shí)成功,要么同時(shí)失敗威彰。RocketMQ的事務(wù)消息提供類似 X/Open XA 的分布事務(wù)功能出牧,通過事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
定時(shí)消息
定時(shí)消息(延遲隊(duì)列)是指消息發(fā)送到broker后歇盼,不會(huì)立即被消費(fèi)舔痕,等待特定時(shí)間投遞給真正的topic。
broker有配置項(xiàng)messageDelayLevel,默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”伯复,18個(gè)level慨代。可以配置自定義messageDelayLevel啸如。注意侍匙,messageDelayLevel是broker的屬性,不屬于某個(gè)topic叮雳。發(fā)消息時(shí)想暗,設(shè)置delayLevel等級(jí)即可:msg.setDelayLevel(level)。level有以下三種情況:
- level == 0帘不,消息為非延遲消息
- 1<=level<=maxLevel说莫,消息延遲特定時(shí)間,例如level==1寞焙,延遲1s
- level > maxLevel储狭,則level== maxLevel,例如level==20捣郊,延遲2h
定時(shí)消息會(huì)暫存在名為SCHEDULE_TOPIC_XXXX的topic中辽狈,并根據(jù)delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1呛牲,即一個(gè)queue只存相同延遲的消息刮萌,保證具有相同發(fā)送延遲的消息能夠順序消費(fèi)。broker會(huì)調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX侈净,將消息寫入真實(shí)的topic尊勿。
需要注意的是油猫,定時(shí)消息會(huì)在第一次寫入和調(diào)度寫入真實(shí)topic時(shí)都會(huì)計(jì)數(shù)浅浮,因此發(fā)送數(shù)量、tps都會(huì)變高往扔。
消息重試
Consumer消費(fèi)消息失敗后旋膳,要提供一種重試機(jī)制澎语,令消息再消費(fèi)一次。Consumer消費(fèi)消息失敗通逞榘茫可以認(rèn)為有以下幾種情況:
- 由于消息本身的原因擅羞,例如反序列化失敗,消息數(shù)據(jù)本身無法處理(例如話費(fèi)充值义图,當(dāng)前消息的手機(jī)號(hào)被注銷减俏,無法充值)等。這種錯(cuò)誤通常需要跳過這條消息碱工,再消費(fèi)其它消息娃承,而這條失敗的消息即使立刻重試消費(fèi)奏夫,99%也不成功,所以最好提供一種定時(shí)重試機(jī)制历筝,即過10秒后再重試酗昼。
- 由于依賴的下游應(yīng)用服務(wù)不可用,例如db連接不可用梳猪,外系統(tǒng)網(wǎng)絡(luò)不可達(dá)等麻削。遇到這種錯(cuò)誤,即使跳過當(dāng)前失敗的消息春弥,消費(fèi)其他消息同樣也會(huì)報(bào)錯(cuò)呛哟。這種情況建議應(yīng)用sleep 30s,再消費(fèi)下一條消息惕稻,這樣可以減輕Broker重試消息的壓力竖共。
RocketMQ會(huì)為每個(gè)消費(fèi)組都設(shè)置一個(gè)Topic名稱為“%RETRY%+consumerGroup”的重試隊(duì)列(這里需要注意的是,這個(gè)Topic的重試隊(duì)列是針對(duì)消費(fèi)組俺祠,而不是針對(duì)每個(gè)Topic設(shè)置的),用于暫時(shí)保存因?yàn)楦鞣N異常而導(dǎo)致Consumer端無法消費(fèi)的消息借帘≈┰考慮到異常恢復(fù)起來需要一些時(shí)間肺然,會(huì)為重試隊(duì)列設(shè)置多個(gè)重試級(jí)別蔫缸,每個(gè)重試級(jí)別都有與之對(duì)應(yīng)的重新投遞延時(shí),重試次數(shù)越多投遞延時(shí)就越大际起。RocketMQ對(duì)于重試消息的處理是先保存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊(duì)列中拾碌,后臺(tái)定時(shí)任務(wù)按照對(duì)應(yīng)的時(shí)間進(jìn)行Delay后重新保存至“%RETRY%+consumerGroup”的重試隊(duì)列中。
消息重投
生產(chǎn)者在發(fā)送消息時(shí)街望,同步消息失敗會(huì)重投校翔,異步消息有重試,oneway沒有任何保證灾前。消息重投保證消息盡可能發(fā)送成功防症、不丟失,但可能會(huì)造成消息重復(fù)哎甲,消息重復(fù)在RocketMQ中是無法避免的問題蔫敲。消息重復(fù)在一般情況下不會(huì)發(fā)生,當(dāng)出現(xiàn)消息量大炭玫、網(wǎng)絡(luò)抖動(dòng)奈嘿,消息重復(fù)就會(huì)是大概率事件。另外吞加,生產(chǎn)者主動(dòng)重發(fā)裙犹、consumer負(fù)載變化也會(huì)導(dǎo)致重復(fù)消息尽狠。如下方法可以設(shè)置消息重試策略:
- retryTimesWhenSendFailed:同步發(fā)送失敗重投次數(shù),默認(rèn)為2伯诬,因此生產(chǎn)者會(huì)最多嘗試發(fā)送retryTimesWhenSendFailed + 1次晚唇。不會(huì)選擇上次失敗的broker,嘗試向其他broker發(fā)送盗似,最大程度保證消息不丟哩陕。超過重投次數(shù),拋出異常赫舒,由客戶端保證消息不丟悍及。當(dāng)出現(xiàn)RemotingException、MQClientException和部分MQBrokerException時(shí)會(huì)重投接癌。
- retryTimesWhenSendAsyncFailed:異步發(fā)送失敗重試次數(shù)心赶,異步重試不會(huì)選擇其他broker,僅在同一個(gè)broker上做重試缺猛,不保證消息不丟缨叫。
- retryAnotherBrokerWhenNotStoreOK:消息刷盤(主或備)超時(shí)或slave不可用(返回狀態(tài)非SEND_OK),是否嘗試發(fā)送到其他broker荔燎,默認(rèn)false耻姥。十分重要消息可以開啟。
流量控制
生產(chǎn)者流控有咨,因?yàn)閎roker處理能力達(dá)到瓶頸琐簇;消費(fèi)者流控,因?yàn)橄M(fèi)能力達(dá)到瓶頸座享。
生產(chǎn)者流控:
- commitLog文件被鎖時(shí)間超過osPageCacheBusyTimeOutMills時(shí)婉商,參數(shù)默認(rèn)為1000ms,返回流控渣叛。
- 如果開啟transientStorePoolEnable == true丈秩,且broker為異步刷盤的主機(jī),且transientStorePool中資源不足诗箍,拒絕當(dāng)前send請(qǐng)求癣籽,返回流控。
- broker每隔10ms檢查send請(qǐng)求隊(duì)列頭部請(qǐng)求的等待時(shí)間滤祖,如果超過waitTimeMillsInSendQueue筷狼,默認(rèn)200ms,拒絕當(dāng)前send請(qǐng)求匠童,返回流控埂材。
- broker通過拒絕send 請(qǐng)求方式實(shí)現(xiàn)流量控制。
注意汤求,生產(chǎn)者流控俏险,不會(huì)嘗試消息重投严拒。
消費(fèi)者流控:
- 消費(fèi)者本地緩存消息數(shù)超過pullThresholdForQueue時(shí),默認(rèn)1000竖独。
- 消費(fèi)者本地緩存消息大小超過pullThresholdSizeForQueue時(shí)裤唠,默認(rèn)100MB。
- 消費(fèi)者本地緩存消息跨度超過consumeConcurrentlyMaxSpan時(shí)莹痢,默認(rèn)2000种蘸。
消費(fèi)者流控的結(jié)果是降低拉取頻率。
死信隊(duì)列
死信隊(duì)列用于處理無法被正常消費(fèi)的消息竞膳。當(dāng)一條消息初次消費(fèi)失敗航瞭,消息隊(duì)列會(huì)自動(dòng)進(jìn)行消息重試;達(dá)到最大重試次數(shù)后坦辟,若消費(fèi)依然失敗刊侯,則表明消費(fèi)者在正常情況下無法正確地消費(fèi)該消息,此時(shí)锉走,消息隊(duì)列 不會(huì)立刻將消息丟棄滨彻,而是將其發(fā)送到該消費(fèi)者對(duì)應(yīng)的特殊隊(duì)列中。
RocketMQ將這種正常情況下無法被消費(fèi)的消息稱為死信消息(Dead-Letter Message)挪蹭,將存儲(chǔ)死信消息的特殊隊(duì)列稱為死信隊(duì)列(Dead-Letter Queue)疮绷。在RocketMQ中,可以通過使用console控制臺(tái)對(duì)死信隊(duì)列中的消息進(jìn)行重發(fā)來使得消費(fèi)者實(shí)例再次進(jìn)行消費(fèi)嚣潜。