在實(shí)際開發(fā)中讯壶,經(jīng)常需要排查一條消息是否成功發(fā)送到底層MQ中想际,或者查看MQ中消息的內(nèi)容,以及如何將消息發(fā)送給指定的/所有的消費(fèi)者組重新消費(fèi)荧恍。本文對RocketMQ提供到的查詢機(jī)制和背后原理進(jìn)行深入的介紹胜卤。文章主要包括4個部分:
消息查詢介紹:介紹消息查詢中使用到的Message Key 、Unique Key、Message Id 的區(qū)別
消息查詢工具:分別介紹命令行工具、管理平臺据沈、客戶端API這三種工具的詳細(xì)用法,以及如何讓消費(fèi)者重新消費(fèi)特定的消息饺蔑。
核心實(shí)現(xiàn)原理:介紹Message Key & Unique Key與Message Id的實(shí)現(xiàn)機(jī)制上區(qū)別,Unique Key在Exactly Once語義下的作用嗜诀,以及為什么Message Id查詢效率更高猾警。
索引機(jī)制:介紹Message Key & Unique Key底層使用的哈希索引機(jī)制
1 消息查詢介紹
RocketMQ提供了3種消息查詢方式:???????????
按照Message Key 查詢:消息的key是業(yè)務(wù)開發(fā)同學(xué)在發(fā)送消息之前自行指定的,通常會把具有業(yè)務(wù)含義隆敢,區(qū)分度高的字段作為消息的key发皿,如用戶id,訂單id等拂蝎。
按照Unique Key查詢:除了業(yè)務(wù)開發(fā)同學(xué)明確的指定消息中的key穴墅,RocketMQ生產(chǎn)者客戶端在發(fā)送發(fā)送消息之前,會自動生成一個UNIQ_KEY温自,設(shè)置到消息的屬性中玄货,從邏輯上唯一代表一條消息。
按照Message Id 查詢:Message Id 是消息發(fā)送后悼泌,在Broker端生成的松捉,其包含了Broker的地址,和在CommitLog中的偏移信息馆里,并會將Message Id作為發(fā)送結(jié)果的一部分進(jìn)行返回隘世。Message Id中屬于精確匹配,可以唯一定位一條消息鸠踪,不需要使用哈希索引機(jī)制丙者,查詢效率更高。
RocketMQ有意弱化Unique Key與Message Id的區(qū)別营密,對外都稱之為Message Id械媒。在通過RocketMQ的命令行工具或管理平臺進(jìn)行查詢時,二者可以通用卵贱。在根據(jù)Unique Key進(jìn)行查詢時滥沫,本身是有可能查詢到多條消息的,但是查詢工具會進(jìn)行過濾键俱,只會返回一條消息兰绣。種種情況導(dǎo)致很多RocketMQ的用戶,并未能很好對二者進(jìn)行區(qū)分编振。
業(yè)務(wù)開發(fā)同學(xué)在使用RocketMQ時缀辩,應(yīng)該養(yǎng)成良好的習(xí)慣臭埋,在發(fā)送/消費(fèi)消息時,將這些信息記錄下來臀玄,通常是記錄到日志文件中瓢阴,以便在出現(xiàn)問題時進(jìn)行排查。
以生產(chǎn)者在發(fā)送消息為例健无,通常由以下3步組成:
//1?構(gòu)建消息對象Message
Message?msg?=?new?Message();
msg.setTopic("TopicA");
msg.setKeys("Key1");
msg.setBody("message?body".getBytes());
try{
????//2?發(fā)送消息
????SendResult?result?=?producer.send(msg);
????//3?打印發(fā)送結(jié)果
????System.out.println(result);
}catch?(Exception?e){
????e.printStackTrace();
}
第1步:構(gòu)建消息
構(gòu)建消息對象Message荣恐,在這里我們通過setKeys方法設(shè)置消息的key,如果有多個key可以使用空格" "進(jìn)行分割
第2步:發(fā)送消息
發(fā)送消息累贤,會返回一個SendResult對象表示消息發(fā)送結(jié)果叠穆。
第3步:打印發(fā)送結(jié)果
結(jié)果中包含Unique Key和Message Id,如下所示:
SendResult?[
sendStatus=SEND_OK,?
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
messageQueue=MessageQueue?[topic=TopicA,?brokerName=broker-a,?queueId=0],?
queueOffset=0]
其中:
sendStatus:表示消息發(fā)送結(jié)果的狀態(tài)???????
msgId:注意這里的命名雖然是msgId臼膏,但實(shí)際上其是Unique Key
offsetMsgId:Broker返回的Message ID 硼被。在后文中,未進(jìn)行特殊說明的情況下渗磅,Message ID總是表示offsetMsgId嚷硫。
messageQueue:消息發(fā)送到了哪個的隊(duì)列,如上圖顯示發(fā)送到broker-a的第0個的隊(duì)列?
queueOffset:消息在隊(duì)列中的偏移量始鱼,每次發(fā)送到一個隊(duì)列時仔掸,offset+1?
事實(shí)上,用戶主動設(shè)置的Key以及客戶端自動生成的Unique Key医清,最終都會設(shè)置到Message對象的properties屬性中嘉汰,如下圖所示:
?其中:
KEYS:表示用戶通過setKeys方法設(shè)置的消息key,
UNIQ_KEY:表示消息發(fā)送之前由RocketMQ客戶端自動生成的Unique Key状勤。細(xì)心的讀者發(fā)現(xiàn)了其值與上述打印SendResult結(jié)果中的msgId字段的值是一樣的鞋怀,這驗(yàn)證了前面所說的msgId表示的實(shí)際上就是Unique Key的說法。
在了解如何主動設(shè)置Key持搜,以及如何獲取RocketMQ自動生成的Unique Key和Message Id后密似,就可以利用一些工具來進(jìn)行查詢。
2 消息查詢工具
RocketMQ提供了3種方式來根據(jù)Message Key葫盼、Unique Key残腌、Message Id來查詢消息,包括:
命令行工具:主要是運(yùn)維同學(xué)使用
管理平臺:運(yùn)維和開發(fā)同學(xué)都可以使用
客戶端API:主要是開發(fā)同學(xué)使用
這些工具除了可以查詢某條消息的內(nèi)容贫导,還支持將查詢到的歷史消息讓消費(fèi)者重新進(jìn)行消費(fèi)抛猫,下面分別進(jìn)行講述。
2.1 命令行工具
RocketMQ自帶的mqadmin命令行工具提供了一些子命令孩灯,用于查詢消息闺金,如下:
$?sh?bin/mqadmin?
The?most?commonly?used?mqadmin?commands?are:?
...
???queryMsgById?????????按照Message?Id查詢消息
???queryMsgByKey????????按照Key查詢消息?
???queryMsgByUniqueKey??按照UNIQ_KEY查詢消息
...
此外,還有一個queryMsgByOffset子命令峰档,不在本文講述范疇內(nèi)
2.1.1 按照Message Key查詢
mqadmin工具的queryMsgByKey子命令提供了根據(jù)key進(jìn)行查詢消息的功能败匹。注意寨昙,由于一個key可能對應(yīng)多條消息,查詢結(jié)果只會展示出這些消息對應(yīng)的Unique Key掀亩,需要根據(jù)Unique Key再次進(jìn)行查詢舔哪。
queryMsgByKey子命令使用方法如下所示:
$?sh?bin/mqadmin?queryMsgByKey?-h
usage:?mqadmin?queryMsgByKey?[-h]?-k?<arg>?[-n?<arg>]?-t?<arg>
?-h,--help????????????????打印幫助信息
?-k,--msgKey?<arg>????????指定消息的key,必須提供
?-n,--namesrvAddr?<arg>???指定nameserver地址
?-t,--topic?<arg>?????????指定topic槽棍,必須提供
例如捉蚤,要查詢在TopicA中,key為Key1的消息
$?sh?bin/mqadmin?queryMsgByKey?-k?Key1?-t?TopicA?-n?localhost:9876
#Message?ID???????????????????????????#QID?????????#Offset
C0A80103515618B4AAC2429A6E970000?????????0???????????????0
C0A80103511B18B4AAC24296D2CB0000?????????0???????????????0
C0A8010354C418B4AAC242A281360000?????????1???????????????0
C0A8010354C718B4AAC242A2B5340000?????????1???????????????1
這里炼七,我們看到輸出結(jié)果中包含了4條記錄外里。其中:
Message ID列:這里這一列的名字顯示有問題,實(shí)際上其代表的是Unique Key
QID列:表示隊(duì)列的ID特石,注意在RocketMQ中唯一地位一個隊(duì)列需要topic+brokerName+queueId。這里只顯示了queueId鳖链,其實(shí)并不能知道在哪個Broker上姆蘸。
Offset:消息在在隊(duì)列中的偏移量
在查詢到Unique Key之后,我們就可以使用另外一個命令:queryMsgByUniqueKey芙委,來查詢消息的具體內(nèi)容逞敷。
2.1.2 按照Unique Key查詢
mqadmin工具的queryMsgByUniqueKey的子命令有2個功能:
根據(jù)Unique Key查詢消息,并展示結(jié)果
讓消費(fèi)者重新消費(fèi)Unique Key對應(yīng)的消息?
我們將分別進(jìn)行講述灌侣。queryMsgByUniqueKey子命令的使用方式如下:
$?sh?bin/mqadmin?queryMsgByUniqueKey?-h
usage:?mqadmin?queryMsgByUniqueKey?[-d?<arg>]?[-g?<arg>]?[-h]?-i?<arg>?[-n?<arg>]?-t?<arg>
?-d,--clientId?<arg>????????消費(fèi)者?client?id
?-g,--consumerGroup?<arg>???消費(fèi)者組名稱
?-h,--help??????????????????打印幫助信息
?-i,--msgId?<arg>???????????消息的Unique?Key推捐,或者M(jìn)essage?Id
?-n,--namesrvAddr?<arg>?????NameServer地址
?-t,--topic?<arg>???????????消息所屬的Topic,必須提供
這里對-i 參數(shù)進(jìn)行下特殊說明侧啼,其即可接受Unique Key牛柒,即SendResult中的msgId字段;也可以接受Message Id痊乾,即SendResult中的offsetMsgId字段皮壁。
根據(jù)Unique Key查詢消息:
通過-i 參數(shù)指定Unique Key,通過-t 參數(shù)指定topic哪审,如:
$?sh?bin/mqadmin?queryMsgByUniqueKey?-i?C0A80103511B18B4AAC24296D2CB0000?-t?TopicA?-n?localhost:9876
Topic:???????????????TopicA
Tags:????????????????[null]
Keys:????????????????[Key1]
Queue?ID:????????????0
Queue?Offset:????????0
CommitLog?Offset:????507625
Reconsume?Times:?????0
Born?Timestamp:??????2019-12-13?22:19:40,619
Store?Timestamp:?????2019-12-13?22:19:40,631
Born?Host:???????????192.168.1.3:53974
Store?Host:??????????192.168.1.3:10911
System?Flag:?????????0
Properties:??????????{KEYS=Key1,?UNIQ_KEY=C0A80103511B18B4AAC24296D2CB0000,?WAIT=true}
Message?Body?Path:???/tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000
對于消息體的內(nèi)容蛾魄,會存儲到Message Body Path字段指定到的路徑中∈遥可通過cat命令查看(僅適用于消息體是字符串):
$?cat?/tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000
message?body
指定消費(fèi)者重新消費(fèi):
queryMsgByUniqueKey子命令還接收另外兩個參數(shù):-g參數(shù)用于指定消費(fèi)者組名稱滴须,-d參數(shù)指定消費(fèi)者client id。指定了這兩個參數(shù)之后叽奥,消息將由消費(fèi)者直接消費(fèi)扔水,而不是打印在控制臺上。
首先朝氓,通過consumerStatus命令铭污,查詢出消費(fèi)者組下的client id信息恋日,如:
$?sh?bin/mqadmin?consumerStatus?-g?group_X?-n?localhost:9876
001??192.168.1.3@26868????V4_5_0????1576300822831/192.168.1.3@26868
Same?subscription?in?the?same?group?of?consumer
Rebalance?OK
這里顯示了消費(fèi)者組group_X下面只有一個消費(fèi)者,client id為192.168.1.3@26868嘹狞。
接著我們可以在queryMsgByUniqueKey子命令中岂膳,添加-g和-d參數(shù),如下所示:
$?sh?bin/mqadmin?queryMsgByUniqueKey?\
-g?group_X?\
-d?192.168.1.3@26868?\
-t?TopicA?\
-i?C0A80103511B18B4AAC24296D2CB0000?\
-n?localhost:9876
ConsumeMessageDirectlyResult?[
order=false,?
autoCommit=true,?
consumeResult=CR_SUCCESS,?
remark=null,?
spentTimeMills=1]
可以看到磅网,這里并沒有打印出消息內(nèi)容谈截,取而代之的是消息消費(fèi)的結(jié)果。
在內(nèi)部涧偷,主要是分為3個步驟來完成讓指定消費(fèi)者來消費(fèi)這條消息簸喂,如下圖所示:
?第1步:
命令行工具給所有Broker發(fā)起QUERY_MESSAGE請求查詢消息,因?yàn)椴⒉恢繳NIQ_KEY這條消息在哪個Broker上燎潮,且最多只會返回一條消息喻鳄,如果超過1條其他會過濾掉;如果查詢不到就直接報(bào)錯确封。
第2步:
根據(jù)消息中包含了Store Host信息除呵,也就是消息存儲在哪個Broker上,接來下命令行工具會直接給這個Broker發(fā)起CONSUME_MESSAGE_DIRECTLY請求爪喘,這個請求會攜帶msgId颜曾,group和client id的信息
第3步:
Broker接收到這個請求,查詢出消息內(nèi)容后秉剑,主動給消費(fèi)者發(fā)送CONSUME_MESSAGE_DIRECTLY通知請求泛豪,注意雖然與第2步使用了同一個請求碼,但不同的是這個請求中包含了消息體的內(nèi)容侦鹏,消費(fèi)者可直接處理诡曙。注意:這里并不是將消息重新發(fā)送到Topic中,否則訂閱這個Topic的所有消費(fèi)者組略水,都會重新消費(fèi)這條消息岗仑。
2.1.3 根據(jù)Message Id進(jìn)行查詢
前面講解生產(chǎn)者發(fā)送消息后,返回的SendResult對象包含一個offsetMsgId字段聚请,這也就是我們常規(guī)意義上所說的Message Id荠雕,我們也可以根據(jù)這個字段來查詢消息。
根據(jù)Message Id查詢使用queryMsgById子命令驶赏,這個命令有3個作用:
根據(jù)Message Id查詢消息
通知指定消費(fèi)者重新消費(fèi)這條消息炸卑,與queryMsgByUniqueKey類似,這里不再介紹
將消息重新發(fā)送到Topic中煤傍,所有消費(fèi)者組都將重新消費(fèi)?
queryMsgById子命令用法如下所示:
$?sh?bin/mqadmin?queryMsgById?-h
usage:?mqadmin?queryMsgById?[-d?<arg>]?[-g?<arg>]?[-h]?-i?<arg>?[-n?<arg>]?[-s?<arg>]?[-u?<arg>]
?-d,--clientId?<arg>????????消費(fèi)者id
?-g,--consumerGroup?<arg>???消費(fèi)者組名稱
?-h,--help??????????????????打印幫助信息
?-i,--msgId?<arg>???????????Message?Id
?-n,--namesrvAddr?<arg>?????Name?server?地址
?-s,--sendMessage?<arg>?????重新發(fā)送消息
?-u,--unitName?<arg>????????unit?name
參數(shù)說明如下:
-d和-g參數(shù):類似于queryMsgById命令盖文,用于將消息發(fā)送給某個消費(fèi)者進(jìn)行重新消費(fèi)
-i 參數(shù):指定Message Id,即SendResult對象的offsetMsgId字段蚯姆,多個值使用逗號","分割五续。
-s參數(shù):是否重新發(fā)送消息到Topic洒敏。如果同時指定了-d和-g參數(shù),-s參數(shù)不生效疙驾。
根據(jù)Message Id查詢消息:
下圖根據(jù)SendResult的offsetMsgId字段凶伙,作為-i參數(shù),來查詢一條消息:
$?sh?bin/mqadmin?queryMsgById?-i?C0A8010300002A9F000000000007BEE9?-n?localhost:9876
OffsetID:????????????C0A8010300002A9F000000000007BEE9
OffsetID:????????????C0A8010300002A9F000000000007BEE9
Topic:???????????????TopicA
Tags:????????????????[null]
Keys:????????????????[Key1]
Queue?ID:????????????0
Queue?Offset:????????0
CommitLog?Offset:????507625
Reconsume?Times:?????0
Born?Timestamp:??????2019-12-13?22:19:40,619
Store?Timestamp:?????2019-12-13?22:19:40,631
Born?Host:???????????192.168.1.3:53974
Store?Host:??????????192.168.1.3:10911
System?Flag:?????????0
Properties:??????????{KEYS=Key1,?UNIQ_KEY=C0A80103511B18B4AAC24296D2CB0000,?WAIT=true}
Message?Body?Path:???/tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000
與queryMsgByUniqueKey子命令輸出基本類似它碎,主要是在輸出開頭多出了OffsetID字段函荣,即offsetMsgId。需要注意的是扳肛,queryMsgById不能接受Unqiue Key作為查詢參數(shù)傻挂。
重新發(fā)送消息到topic:
在指定-s參數(shù)后,消息將重新發(fā)送到topic挖息,如下(輸出進(jìn)行了格式化):
$?sh?bin/mqadmin?queryMsgById?-i?C0A8010300002A9F000000000007BEE9?-n?localhost:9876?-s?true
prepare?resend?msg.?originalMsgId=C0A8010300002A9F000000000007BEE9
SendResult?[
sendStatus=SEND_OK,?
msgId=C0A80103511B18B4AAC24296D2CB0000,?
offsetMsgId=C0A80103000078BF000000000004D923,?
messageQueue=MessageQueue?[topic=TopicA,?brokerName=broker-b,?queueId=1],?
queueOffset=1]
可以看到金拒,這里因?yàn)橄⑹侵匦掳l(fā)送到了Topic中,因此與我們之前使用生產(chǎn)者發(fā)送消息一樣套腹,輸出的是一個SendResult绪抛。在這種情況下沉迹,訂閱這個Topic的所有消費(fèi)者組都會重新消費(fèi)到這條消息。
在實(shí)際開發(fā)中害驹,如果多個消費(fèi)者組訂閱了某個Topic的消息鞭呕,如果所有的消費(fèi)者都希望重新消費(fèi),那么就應(yīng)該使用-s參數(shù)宛官。如果只是某個消費(fèi)者希望重新消費(fèi)葫松,那么應(yīng)該指定-g和-d參數(shù)。
另外底洗,我們看到發(fā)送前打印的originalMsgId和發(fā)送后SendResult中的offsetMsgId值并不一樣腋么,這是因?yàn)橄l(fā)送到Topic重新進(jìn)行了存儲,因此值不相同亥揖。這也是為什么我們說Message Id可以唯一對應(yīng)一條消息的原因珊擂。
而輸出的SendResult結(jié)果中的msgId,即Unique Key费变,并沒有發(fā)生變化摧扇,因此盡管名字是Unique Key,但是實(shí)際上還是有可能對應(yīng)多條消息的挚歧。而前面根據(jù)queryMsgByUniqueKey查詢之所以只有一條消息扛稽,實(shí)際上是進(jìn)行了過濾。
2.2 管理平臺
RocketMQ提供的命令行工具滑负,雖然功能強(qiáng)大在张,一般是運(yùn)維同學(xué)使用較多用含。通過RocketMQ提供的管理平臺進(jìn)來行消息查詢,則對業(yè)務(wù)開發(fā)同學(xué)更加友好帮匾。在管理平臺的消息一欄啄骇,有3個TAB,分別用于:根據(jù)Topic時間范圍查詢辟狈、Message Key查詢肠缔、Message Id查詢,下面分別進(jìn)行介紹哼转。
根據(jù)Topic時間范圍查詢:
按 Topic 查詢屬于范圍查詢明未,不推薦使用,因?yàn)闀r間范圍內(nèi)消息很多壹蔓,不具備區(qū)分度趟妥。查詢時,盡可能設(shè)置最為精確的時間區(qū)間佣蓉,以便縮小查詢范圍披摄,提高速度。最多返回2000條數(shù)據(jù)勇凭。
?根據(jù)Message Key查詢:
按 Message Key 查詢屬于模糊查詢疚膊,僅適用于沒有記錄 Message ID 但是設(shè)置了具有區(qū)分度的 Message Key的情況。 目前虾标,根據(jù)Message Key查詢寓盗,有一個很大局限性:不能指定時間范圍,且最多返回64條數(shù)據(jù)璧函。如果用戶指定的key重復(fù)率比較高的話傀蚌,就有可能搜不到。
?根據(jù)Message Id查詢:
按 Message ID 查詢屬于精確查詢蘸吓,速度快善炫,精確匹配,只會返回一條結(jié)果库继,推薦使用箩艺。在這里,傳入U(xiǎn)nique Key宪萄,offsetMsgId都可以舅桩。
查看消息詳情:
在按照Topic 時間范圍查詢,按照Message Key查詢雨膨,結(jié)果列表有一個Message Detail按鈕擂涛,點(diǎn)擊可以看到消息詳情:包括消息key、tag、生成時間撒妈,消息體內(nèi)容等恢暖。在詳情頁面,也可以將消息直接發(fā)送給某個消費(fèi)者組進(jìn)行重新消費(fèi)狰右。
需要注意的是杰捂,在消息體展示的時候,只能將消息體轉(zhuǎn)換成字符串進(jìn)行展示棋蚌,如果消息的內(nèi)容是protobuf嫁佳、thrift、hessian編碼的谷暮,那么將顯示一堆亂碼蒿往。
如果公司內(nèi)部有統(tǒng)一的IDL/Schema管理平臺,則可以解決這個問題湿弦,通過為每個Topic關(guān)聯(lián)一個IDL瓤漏,在消息展示時,可以根據(jù)IDL反序列化后在進(jìn)行展示颊埃。
2.3 客戶端API
除了通過命令行工具和管理平臺蔬充,還可以通過客戶端API的方式來進(jìn)行查詢,這其實(shí)是最本質(zhì)的方式班利,命令行工具和管理平臺的查詢功能都是基于此實(shí)現(xiàn)饥漫。
在org.apache.rocketmq.client.MQAdmin接口中,定義了以下幾個方法用于消息查詢:
//msgId參數(shù):僅接收SendResult中的offsetMsgId罗标,返回單條消息
MessageExt?viewMessage(final?String?msgId)
//msgId參數(shù):傳入SendResult中的offsetMsgId庸队、msgId都可以,返回單條消息
MessageExt?viewMessage(String?topic,String?msgId)
//在指定topic下馒稍,根據(jù)key進(jìn)行查詢皿哨,并指定最大返回條數(shù)浅侨,以及開始和結(jié)束時間
QueryResult?queryMessage(final?String?topic,?final?String?key,?
?????????????????????????final?int?maxNum,?final?long?begin,final?long?end)
對于MQAdmin接口纽谒,可能部分同學(xué)比較陌生。不過我們常用的DefaultMQProducer如输、DefaultMQPushConsumer等鼓黔,都實(shí)現(xiàn)了此接口,因此都具備消息查詢的能力不见,如下所示:
對于命令行工具澳化,底層實(shí)際上是基于MQAdminExt接口的實(shí)現(xiàn)來完成的。
細(xì)心的讀者會問稳吮,相同的查詢功能在在多處實(shí)現(xiàn)是不是太麻煩了缎谷?事實(shí)上,這只是對外暴露的接口灶似,在內(nèi)部列林,實(shí)際上都是基于MQAdminImpl這個類來完成的瑞你。
viewMessage方法:
兩種viewMessage方法重載形式,都只會返回單條消息希痴。下面以生產(chǎn)者搜索為例者甲,講解如何使用API進(jìn)行查詢:
//初始化Producer
DefaultMQProducer?producer?=?new?DefaultMQProducer();
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
//根據(jù)UniqueKey查詢
String?uniqueKey?=?"C0A8010354C418B4AAC242A281360000";
MessageExt?msg?=?producer.viewMessage("TopicA",?uniqueKey);
//打印結(jié)果:這里僅輸出Unique?Key與offsetMsgId
MessageClientExt?msgExt=?(MessageClientExt)?msg;
System.out.println("Unique?Key:"+msgExt.getMsgId()//即UNIQUE_KEY
????????+"\noffsetMsgId:"+msgExt.getOffsetMsgId());
輸出結(jié)果如下:
Unique?Key:C0A8010354C418B4AAC242A281360000
offsetMsgId:C0A8010300002A9F000000000007BF94
如果我們把offsetMsgId當(dāng)做方法參數(shù)傳入,也可以查詢到相同的結(jié)果砌创。這是因?yàn)槁哺祝诜椒▋?nèi)部實(shí)際上是分兩步進(jìn)行查詢的:
先把參數(shù)當(dāng)做offsetMsgId,即Message Id進(jìn)行查詢
如果失敗嫩实,再嘗試當(dāng)做Unique Key進(jìn)行查詢刽辙。
源碼如下所示:
DefaultMQProducer#viewMessage(String,String)
@Override
public?MessageExt?viewMessage(String?topic,?String?msgId)?{//省略異常聲明
????try?{
????????//1?嘗試當(dāng)做offsetMsgId進(jìn)行查詢
????????MessageId?oldMsgId?=?MessageDecoder.decodeMessageId(msgId);
????????return?this.viewMessage(msgId);
????}?catch?(Exception?e)?{
????????//查詢失敗直接忽略
????}
????//2?嘗試當(dāng)做UNIQ_KEY進(jìn)行查詢
????return?this.defaultMQProducerImpl.queryMessageByUniqKey(topic,?msgId);
}
前面提到,Unique Key只是從邏輯上代表一條消息舶赔,實(shí)際上在Broker端可能存儲了多條扫倡,因此在當(dāng)做Unique Key進(jìn)行查詢時,會進(jìn)行過濾竟纳,只取其中一條撵溃。源碼如下所示:
MQAdminImpl#queryMessageByUniqKey
public?MessageExt?queryMessageByUniqKey(String?topic,String?uniqKey)?{
????//根據(jù)uniqKey進(jìn)行查詢
????QueryResult?qr?=?this.queryMessage(topic,?uniqKey,?32,
????????????MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime()?-?1000,
????????????Long.MAX_VALUE,
????????????true);
????//對查詢結(jié)果進(jìn)行過濾,最多只取一條
????if?(qr?!=?null?&&?qr.getMessageList()?!=?null?
????????????????????????????&&?qr.getMessageList().size()?>?0)?{
????????return?qr.getMessageList().get(0);
????}?else?{
????????return?null;
????}
}
我們也可以通過另外只接收一個參數(shù)的viewMessage方法進(jìn)行查詢锥累,但是需要注意的是缘挑,參數(shù)只能是offsetMsgId,不能是Unique Key桶略。
String?offsetMsgId?=?"C0A8010300002A9F000000000007BF94";
producer.viewMessage(offsetMsgId);
queryMessage方法:
其是根據(jù)消息Key進(jìn)行查詢语淘,這里不再介紹API如何使用。則與前面兩種viewMessage方法重載不同际歼,其返回的是一個QueryResult對象惶翻,包含了多條消息。
主要是注意這個方法接收時間范圍參數(shù)鹅心,相比較于管理平臺更加靈活吕粗。管理平臺按照消息Key查詢,默認(rèn)最多返回64條消息旭愧,且不能支持指定時間范圍颅筋,如果消息Key重復(fù)度較高,那么可能有些消息搜索不到输枯。如果是在指定時間范圍內(nèi)返回64條消息议泵,如果沒有發(fā)現(xiàn)想找到的消息,再選擇其他時間范圍桃熄,則可以規(guī)避這個問題先口。
3 實(shí)現(xiàn)原理
? ??Unqiue Key & Message Key都需要利用RocketMQ的哈希索引機(jī)制來完成消息查詢,由于建立索引有一定的開銷,因此Broker端提供了相關(guān)配置項(xiàng)來控制是否開啟索引碉京。關(guān)于RocketMQ索引機(jī)制將在后面的文章進(jìn)行詳細(xì)的介紹桩引。
? ? Message Id是在Broker端生成的,其包含了Broker地址和commit Log offset信息收夸,可以精確匹配一條消息坑匠,查詢消息更好。下面分別介紹?Unqiue Key & Message Id的生成和作用卧惜。
3.1 Unique Key的生成與作用
3.1.1 Unique Key生成
Unique Key是生產(chǎn)者發(fā)送消息之前厘灼,由RocketMQ 客戶端自動生成的,具體來說咽瓷,RocketMQ發(fā)送消息之前设凹,最終都要通過以下方法:
DefaultMQProducerImpl#sendKernelImpl
private?SendResult?sendKernelImpl(final?Message?msg,
??????????????????????????????????final?MessageQueue?mq,
??????????????????????????????????final?CommunicationMode?communicationMode,
??????????????????????????????????final?SendCallback?sendCallback,
??????????????????????????????????final?TopicPublishInfo?topicPublishInfo,
??????????????????????????????????final?long?timeout)??{//省略異常聲明
?????//...略
????????try?{
????????????//如果不是批量消息,則生成Unique?Key
????????????if?(!(msg?instanceof?MessageBatch))?{
????????????????MessageClientIDSetter.setUniqID(msg);
????????????}
?????//...略
如上所示茅姜,如果不是批量消息闪朱,會通過MessageClientIDSetter的setUniqID方法為消息設(shè)置Unique key,該方法實(shí)現(xiàn)如下所示:
MessageClientIDSetter#setUniqID
public?static?void?setUniqID(final?Message?msg)?{
????//?Unique?Key不為空的情況下钻洒,才進(jìn)行設(shè)置
????if?(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,)?==?null)?{
????????msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,,?createUniqID());
????}
}
如果消息的Unique Key屬性為null奋姿,就通過createUniqID()方法為消息創(chuàng)建一個新的Unique Key,并設(shè)置到消息屬性中素标。之所以要判斷Unique Key是否為null與其作用有關(guān)称诗。
3.1.2 Unique Key作用
了解Unique Key的作用對于我們理解消息重復(fù)的原因有很大的幫助。RocketMQ并不保證消息投遞過程中的Exactly Once語義头遭,即消息只會被精確消費(fèi)一次寓免,需要消費(fèi)者自己做冪等。而通常導(dǎo)致消息重復(fù)消費(fèi)的原因计维,主要包括:
生產(chǎn)者發(fā)送時消息重復(fù):RocketMQ對于無序消息發(fā)送失敗袜香,默認(rèn)會重試2次。對于有序消息和普通有序消息為什么不進(jìn)行重試鲫惶,可參考:RocketMQ NameServer詳解?
消費(fèi)者Rebalance時消息重復(fù):這里不做介紹蜈首,可參考RocketMQ Rebalance機(jī)制詳解?
導(dǎo)致生產(chǎn)者發(fā)送重復(fù)消息的原因可能是:一條消息已被成功發(fā)送到服務(wù)端并完成持久化,由于網(wǎng)絡(luò)超時此時出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶端宕機(jī)剑按,導(dǎo)致服務(wù)端對客戶端應(yīng)答失敗疾就,此時生產(chǎn)者將再次嘗試發(fā)送消息澜术。
在重試發(fā)送時艺蝴,sendKernelImpl會被重復(fù)調(diào)用,意味著setUniqID方法會被重復(fù)調(diào)用鸟废,不過由于setUniqID方法實(shí)現(xiàn)中進(jìn)行判空處理猜敢,因此重復(fù)設(shè)置Unique Key。在這種情況下,消費(fèi)者后續(xù)會收到兩條內(nèi)容相同并且 Unique Key 也相同的消息(offsetMsgId不同缩擂,因?yàn)閷roker來說存儲了多次)鼠冕。
那么消費(fèi)者如何判斷,消費(fèi)重復(fù)是因?yàn)橹貜?fù)發(fā)送還是Rebalance導(dǎo)致的重復(fù)消費(fèi)呢胯盯?
消費(fèi)者實(shí)現(xiàn)MessageListener接口監(jiān)聽到的消息類型是MessageExt懈费,可以將其強(qiáng)制轉(zhuǎn)換為MessageClientExt,之后調(diào)用getMsgId方法獲取Unique Key博脑,調(diào)用getOffsetMsgId獲得Message Id憎乙。如果多消息的Unique Key相同,但是offsetMsgId不同叉趣,則有可能是因?yàn)橹貜?fù)發(fā)送導(dǎo)致泞边。
3.1.3 批量發(fā)送模式下的Unique Key
DefaultMQProducer提供了批量發(fā)送消息的接口:
public?SendResult?send(Collection<Message>?msgs)
在內(nèi)部,這批消息首先會被構(gòu)建成一個MessageBatch對象疗杉。在前面sendKernelImpl方法中我們也看到了阵谚,對于MessageBatch對象,并不會設(shè)置Unique Key烟具。這是因?yàn)樵趯⑴肯⑥D(zhuǎn)換成MessageBatch時梢什,已經(jīng)設(shè)置過了。
可能有一部分同學(xué)會誤以為一個批量消息中每條消息Unique Key是相同的朝聋,其實(shí)不然绳矩,每條消息Unique Key都不同。
這里通過一個批量發(fā)送案例進(jìn)行說明:
//構(gòu)建批量消息
ArrayList<Message>?msgs?=?new?ArrayList<>();
Message?msg1?=?new?Message("Topic_S",("message3").getBytes());
Message?msg2?=?new?Message("Topic_S",("message4").getBytes());
msgs.add(msg1);
msgs.add(msg2);
//發(fā)送
SendResult?result?=?producer.send(msgs);
//打印
System.out.println(result);
輸出如下所示:
SendResult?[sendStatus=SEND_OK,
msgId=C0A80103583618B4AAC24CDC29F10000,C0A80103583618B4AAC24CDC29F10001,
offsetMsgId=C0A80103000051AF00000000000B05BD,C0A80103000051AF00000000000B065B,
messageQueue=MessageQueue?[topic=Topic_S,?brokerName=broker-c,?queueId=2],
queueOffset=3]
可以看到玖翅,此時輸出的msgId(即Unique Key)和offsetMsgId都會包含多個值翼馆。客戶端給批量消息中每條消息設(shè)置不同的Unqiue Key金度,可以參考DefaultMQProducer#batch方法源碼:
private?MessageBatch?batch(Collection<Message>?msgs)?throws?MQClientException?{
????MessageBatch?msgBatch;
????try?{
????????//1?將消息集合轉(zhuǎn)換為MessageBatch
????????msgBatch?=?MessageBatch.generateFromList(msgs);
????????//2?迭代每個消息应媚,逐一設(shè)置Unique?Key
????????for?(Message?message?:?msgBatch)?{
????????????Validators.checkMessage(message,?this);
????????????MessageClientIDSetter.setUniqID(message);
????????}
????????//3?設(shè)置批量消息的消息體
????????msgBatch.setBody(msgBatch.encode());
????}?catch?(Exception?e)?{
????????throw?new?MQClientException("Failed?to?initiate?the?MessageBatch",?e);
????}
????return?msgBatch;
}
3.2 Message Id生成
SendResult中的offsetMsgId,即常規(guī)意義上我們所說的Message Id是在Broker端生成的猜极,用于唯一標(biāo)識一條消息中姜,在根據(jù)Message Id查詢的情況下,最多只能查詢到一條消息跟伏。Message Id總共 16 字節(jié)丢胚,包含消息存儲主機(jī)地址,消息 Commit Log offset受扳。如下圖所示:
?RocketMQ內(nèi)部通過一個MessageId對象進(jìn)行表示:
public?class?MessageId?{
????private?SocketAddress?address;?//broker地址
????private?long?offset;?//commit?log?offset
并提供了一個MessageDecoder對象來創(chuàng)建或者解碼MessageId携龟。
public?static?String?createMessageId(final?ByteBuffer?input,
?????????????????????????????????????final?ByteBuffer?addr,?final?long?offset)
public?static?MessageId?decodeMessageId(final?String?msgId)
Broker端在順序存儲消息時,首先會通過createMessageId方法創(chuàng)建msgId勘高。源碼如下所示:
CommitLog.DefaultAppendMessageCallback#doAppend
public?AppendMessageResult?doAppend(final?long?fileFromOffset,?final?ByteBuffer?byteBuffer,?
??????????????????????final?int?maxBlank,final?MessageExtBrokerInner?msgInner)?{
????//1?PHY?OFFSET:即Commit?Log?Offset?或者稱之為msgOffsetId
????long?wroteOffset?=?fileFromOffset?+?byteBuffer.position();
????//2?hostHolder用于維護(hù)broker地址信息
????this.resetByteBuffer(hostHolder,?8);
????//3?創(chuàng)建msgOffsetId
????String?msgId?=?MessageDecoder.createMessageId(this.msgIdMemory,?
????????????????????????msgInner.getStoreHostBytes(hostHolder),?wroteOffset);
而客戶端在根據(jù)msgId向Broker查詢消息時峡蟋,首先會將通過MessageDecoder的decodeMessageId方法坟桅,之后直接向這個broker進(jìn)行查詢指定位置的消息。
參見:MQAdminImpl#viewMessage
public?MessageExt?viewMessage(String?msgId)?{//省略異常聲明
????//1?根據(jù)msgId解碼成MessageId對象
????MessageId?messageId?=?null;
????try?{
????????messageId?=?MessageDecoder.decodeMessageId(msgId);
????}?catch?(Exception?e)?{
????????throw?new?MQClientException(ResponseCode.NO_MESSAGE,?
??????????????????????"query?message?by?id?finished,?but?no?message.");
????}
????//2?根據(jù)MessageId中的Broker地址和commit?log?offset信息進(jìn)行查詢
????return?this.mQClientFactory.getMQClientAPIImpl().viewMessage(
????????????RemotingUtil.socketAddress2String(messageId.getAddress()),
????????????messageId.getOffset(),?
????????????timeoutMillis);
}
由于根據(jù)Message Id進(jìn)行查詢蕊蝗,實(shí)際上是直接從特定Broker的CommitLog中的指定位置進(jìn)行查詢的仅乓,屬于精確匹配,并不像用戶設(shè)置的key蓬戚,或者Unique Key那么樣夸楣,需要使用到哈希索引機(jī)制,因此效率很高子漩。
4 總結(jié)
RocketMQ提供了3種消息查詢方式:Message Key & Unique Key & Message Id
RocketMQ提供了3種消息查詢工具:命令行裕偿、管理平臺、客戶端API痛单,且支持將查詢到讓特定/所有消費(fèi)者組重新消費(fèi)
RocketMQ有意對用戶屏蔽Unique Key & Message Id區(qū)別嘿棘,很多地方二者可以通用
Message Key & Unique Key 需要使用到哈希索引機(jī)制,有額外的索引維護(hù)成本
Message Id由Broker和commit log offset組成旭绒,屬于精確匹配鸟妙,查詢效率更好
免費(fèi)學(xué)習(xí)視頻歡迎關(guān)注云圖智聯(lián):https://e.yuntuzhilian.com/?