RocketMQ學(xué)習(xí)教程:09.深入理解NameServer【云圖智聯(lián)】

在實(shí)際開發(fā)中讯壶,經(jīng)常需要排查一條消息是否成功發(fā)送到底層MQ中想际,或者查看MQ中消息的內(nèi)容,以及如何將消息發(fā)送給指定的/所有的消費(fèi)者組重新消費(fèi)荧恍。本文對RocketMQ提供到的查詢機(jī)制和背后原理進(jìn)行深入的介紹胜卤。文章主要包括4個部分:

消息查詢介紹:介紹消息查詢中使用到的Message Key 、Unique Key、Message Id 的區(qū)別

消息查詢工具:分別介紹命令行工具、管理平臺据沈、客戶端API這三種工具的詳細(xì)用法,以及如何讓消費(fèi)者重新消費(fèi)特定的消息饺蔑。

核心實(shí)現(xiàn)原理:介紹Message Key & Unique Key與Message Id的實(shí)現(xiàn)機(jī)制上區(qū)別,Unique Key在Exactly Once語義下的作用嗜诀,以及為什么Message Id查詢效率更高猾警。

索引機(jī)制:介紹Message Key & Unique Key底層使用的哈希索引機(jī)制

1 消息查詢介紹

RocketMQ提供了3種消息查詢方式:???????????

按照Message Key 查詢:消息的key是業(yè)務(wù)開發(fā)同學(xué)在發(fā)送消息之前自行指定的,通常會把具有業(yè)務(wù)含義隆敢,區(qū)分度高的字段作為消息的key发皿,如用戶id,訂單id等拂蝎。

按照Unique Key查詢:除了業(yè)務(wù)開發(fā)同學(xué)明確的指定消息中的key穴墅,RocketMQ生產(chǎn)者客戶端在發(fā)送發(fā)送消息之前,會自動生成一個UNIQ_KEY温自,設(shè)置到消息的屬性中玄货,從邏輯上唯一代表一條消息。

按照Message Id 查詢:Message Id 是消息發(fā)送后悼泌,在Broker端生成的松捉,其包含了Broker的地址,和在CommitLog中的偏移信息馆里,并會將Message Id作為發(fā)送結(jié)果的一部分進(jìn)行返回隘世。Message Id中屬于精確匹配,可以唯一定位一條消息鸠踪,不需要使用哈希索引機(jī)制丙者,查詢效率更高。

RocketMQ有意弱化Unique Key與Message Id的區(qū)別营密,對外都稱之為Message Id械媒。在通過RocketMQ的命令行工具或管理平臺進(jìn)行查詢時,二者可以通用卵贱。在根據(jù)Unique Key進(jìn)行查詢時滥沫,本身是有可能查詢到多條消息的,但是查詢工具會進(jìn)行過濾键俱,只會返回一條消息兰绣。種種情況導(dǎo)致很多RocketMQ的用戶,并未能很好對二者進(jìn)行區(qū)分编振。

業(yè)務(wù)開發(fā)同學(xué)在使用RocketMQ時缀辩,應(yīng)該養(yǎng)成良好的習(xí)慣臭埋,在發(fā)送/消費(fèi)消息時,將這些信息記錄下來臀玄,通常是記錄到日志文件中瓢阴,以便在出現(xiàn)問題時進(jìn)行排查。

以生產(chǎn)者在發(fā)送消息為例健无,通常由以下3步組成:

//1?構(gòu)建消息對象Message

Message?msg?=?new?Message();

msg.setTopic("TopicA");

msg.setKeys("Key1");

msg.setBody("message?body".getBytes());

try{

????//2?發(fā)送消息

????SendResult?result?=?producer.send(msg);


????//3?打印發(fā)送結(jié)果

????System.out.println(result);

}catch?(Exception?e){

????e.printStackTrace();

}

第1步:構(gòu)建消息

構(gòu)建消息對象Message荣恐,在這里我們通過setKeys方法設(shè)置消息的key,如果有多個key可以使用空格" "進(jìn)行分割

第2步:發(fā)送消息

發(fā)送消息累贤,會返回一個SendResult對象表示消息發(fā)送結(jié)果叠穆。

第3步:打印發(fā)送結(jié)果

結(jié)果中包含Unique Key和Message Id,如下所示:

SendResult?[

sendStatus=SEND_OK,?

msgId=C0A801030D4B18B4AAC247DE4A0D0000,

offsetMsgId=C0A8010300002A9F000000000007BEE9,

messageQueue=MessageQueue?[topic=TopicA,?brokerName=broker-a,?queueId=0],?

queueOffset=0]

其中:

sendStatus:表示消息發(fā)送結(jié)果的狀態(tài)???????

msgId:注意這里的命名雖然是msgId臼膏,但實(shí)際上其是Unique Key

offsetMsgId:Broker返回的Message ID 硼被。在后文中,未進(jìn)行特殊說明的情況下渗磅,Message ID總是表示offsetMsgId嚷硫。

messageQueue:消息發(fā)送到了哪個的隊(duì)列,如上圖顯示發(fā)送到broker-a的第0個的隊(duì)列?

queueOffset:消息在隊(duì)列中的偏移量始鱼,每次發(fā)送到一個隊(duì)列時仔掸,offset+1?

事實(shí)上,用戶主動設(shè)置的Key以及客戶端自動生成的Unique Key医清,最終都會設(shè)置到Message對象的properties屬性中嘉汰,如下圖所示:

?其中:

KEYS:表示用戶通過setKeys方法設(shè)置的消息key,

UNIQ_KEY:表示消息發(fā)送之前由RocketMQ客戶端自動生成的Unique Key状勤。細(xì)心的讀者發(fā)現(xiàn)了其值與上述打印SendResult結(jié)果中的msgId字段的值是一樣的鞋怀,這驗(yàn)證了前面所說的msgId表示的實(shí)際上就是Unique Key的說法。

在了解如何主動設(shè)置Key持搜,以及如何獲取RocketMQ自動生成的Unique Key和Message Id后密似,就可以利用一些工具來進(jìn)行查詢。

2 消息查詢工具

RocketMQ提供了3種方式來根據(jù)Message Key葫盼、Unique Key残腌、Message Id來查詢消息,包括:

命令行工具:主要是運(yùn)維同學(xué)使用

管理平臺:運(yùn)維和開發(fā)同學(xué)都可以使用

客戶端API:主要是開發(fā)同學(xué)使用

這些工具除了可以查詢某條消息的內(nèi)容贫导,還支持將查詢到的歷史消息讓消費(fèi)者重新進(jìn)行消費(fèi)抛猫,下面分別進(jìn)行講述。

2.1 命令行工具

RocketMQ自帶的mqadmin命令行工具提供了一些子命令孩灯,用于查詢消息闺金,如下:

$?sh?bin/mqadmin?

The?most?commonly?used?mqadmin?commands?are:?

...

???queryMsgById?????????按照Message?Id查詢消息

???queryMsgByKey????????按照Key查詢消息?

???queryMsgByUniqueKey??按照UNIQ_KEY查詢消息

...

此外,還有一個queryMsgByOffset子命令峰档,不在本文講述范疇內(nèi)

2.1.1 按照Message Key查詢

mqadmin工具的queryMsgByKey子命令提供了根據(jù)key進(jìn)行查詢消息的功能败匹。注意寨昙,由于一個key可能對應(yīng)多條消息,查詢結(jié)果只會展示出這些消息對應(yīng)的Unique Key掀亩,需要根據(jù)Unique Key再次進(jìn)行查詢舔哪。

queryMsgByKey子命令使用方法如下所示:

$?sh?bin/mqadmin?queryMsgByKey?-h

usage:?mqadmin?queryMsgByKey?[-h]?-k?<arg>?[-n?<arg>]?-t?<arg>

?-h,--help????????????????打印幫助信息

?-k,--msgKey?<arg>????????指定消息的key,必須提供

?-n,--namesrvAddr?<arg>???指定nameserver地址

?-t,--topic?<arg>?????????指定topic槽棍,必須提供

例如捉蚤,要查詢在TopicA中,key為Key1的消息

$?sh?bin/mqadmin?queryMsgByKey?-k?Key1?-t?TopicA?-n?localhost:9876

#Message?ID???????????????????????????#QID?????????#Offset

C0A80103515618B4AAC2429A6E970000?????????0???????????????0

C0A80103511B18B4AAC24296D2CB0000?????????0???????????????0

C0A8010354C418B4AAC242A281360000?????????1???????????????0

C0A8010354C718B4AAC242A2B5340000?????????1???????????????1

這里炼七,我們看到輸出結(jié)果中包含了4條記錄外里。其中:

Message ID列:這里這一列的名字顯示有問題,實(shí)際上其代表的是Unique Key

QID列:表示隊(duì)列的ID特石,注意在RocketMQ中唯一地位一個隊(duì)列需要topic+brokerName+queueId。這里只顯示了queueId鳖链,其實(shí)并不能知道在哪個Broker上姆蘸。

Offset:消息在在隊(duì)列中的偏移量

在查詢到Unique Key之后,我們就可以使用另外一個命令:queryMsgByUniqueKey芙委,來查詢消息的具體內(nèi)容逞敷。

2.1.2 按照Unique Key查詢

mqadmin工具的queryMsgByUniqueKey的子命令有2個功能:

根據(jù)Unique Key查詢消息,并展示結(jié)果

讓消費(fèi)者重新消費(fèi)Unique Key對應(yīng)的消息?

我們將分別進(jìn)行講述灌侣。queryMsgByUniqueKey子命令的使用方式如下:

$?sh?bin/mqadmin?queryMsgByUniqueKey?-h

usage:?mqadmin?queryMsgByUniqueKey?[-d?<arg>]?[-g?<arg>]?[-h]?-i?<arg>?[-n?<arg>]?-t?<arg>

?-d,--clientId?<arg>????????消費(fèi)者?client?id

?-g,--consumerGroup?<arg>???消費(fèi)者組名稱

?-h,--help??????????????????打印幫助信息

?-i,--msgId?<arg>???????????消息的Unique?Key推捐,或者M(jìn)essage?Id

?-n,--namesrvAddr?<arg>?????NameServer地址

?-t,--topic?<arg>???????????消息所屬的Topic,必須提供

這里對-i 參數(shù)進(jìn)行下特殊說明侧啼,其即可接受Unique Key牛柒,即SendResult中的msgId字段;也可以接受Message Id痊乾,即SendResult中的offsetMsgId字段皮壁。

根據(jù)Unique Key查詢消息:

通過-i 參數(shù)指定Unique Key,通過-t 參數(shù)指定topic哪审,如:

$?sh?bin/mqadmin?queryMsgByUniqueKey?-i?C0A80103511B18B4AAC24296D2CB0000?-t?TopicA?-n?localhost:9876

Topic:???????????????TopicA

Tags:????????????????[null]

Keys:????????????????[Key1]

Queue?ID:????????????0

Queue?Offset:????????0

CommitLog?Offset:????507625

Reconsume?Times:?????0

Born?Timestamp:??????2019-12-13?22:19:40,619

Store?Timestamp:?????2019-12-13?22:19:40,631

Born?Host:???????????192.168.1.3:53974

Store?Host:??????????192.168.1.3:10911

System?Flag:?????????0

Properties:??????????{KEYS=Key1,?UNIQ_KEY=C0A80103511B18B4AAC24296D2CB0000,?WAIT=true}

Message?Body?Path:???/tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000

對于消息體的內(nèi)容蛾魄,會存儲到Message Body Path字段指定到的路徑中∈遥可通過cat命令查看(僅適用于消息體是字符串):

$?cat?/tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000

message?body

指定消費(fèi)者重新消費(fèi):

queryMsgByUniqueKey子命令還接收另外兩個參數(shù):-g參數(shù)用于指定消費(fèi)者組名稱滴须,-d參數(shù)指定消費(fèi)者client id。指定了這兩個參數(shù)之后叽奥,消息將由消費(fèi)者直接消費(fèi)扔水,而不是打印在控制臺上。

首先朝氓,通過consumerStatus命令铭污,查詢出消費(fèi)者組下的client id信息恋日,如:

$?sh?bin/mqadmin?consumerStatus?-g?group_X?-n?localhost:9876

001??192.168.1.3@26868????V4_5_0????1576300822831/192.168.1.3@26868

Same?subscription?in?the?same?group?of?consumer

Rebalance?OK

這里顯示了消費(fèi)者組group_X下面只有一個消費(fèi)者,client id為192.168.1.3@26868嘹狞。

接著我們可以在queryMsgByUniqueKey子命令中岂膳,添加-g和-d參數(shù),如下所示:

$?sh?bin/mqadmin?queryMsgByUniqueKey?\

-g?group_X?\

-d?192.168.1.3@26868?\

-t?TopicA?\

-i?C0A80103511B18B4AAC24296D2CB0000?\

-n?localhost:9876

ConsumeMessageDirectlyResult?[

order=false,?

autoCommit=true,?

consumeResult=CR_SUCCESS,?

remark=null,?

spentTimeMills=1]

可以看到磅网,這里并沒有打印出消息內(nèi)容谈截,取而代之的是消息消費(fèi)的結(jié)果。

在內(nèi)部涧偷,主要是分為3個步驟來完成讓指定消費(fèi)者來消費(fèi)這條消息簸喂,如下圖所示:

?第1步:

命令行工具給所有Broker發(fā)起QUERY_MESSAGE請求查詢消息,因?yàn)椴⒉恢繳NIQ_KEY這條消息在哪個Broker上燎潮,且最多只會返回一條消息喻鳄,如果超過1條其他會過濾掉;如果查詢不到就直接報(bào)錯确封。

第2步:

根據(jù)消息中包含了Store Host信息除呵,也就是消息存儲在哪個Broker上,接來下命令行工具會直接給這個Broker發(fā)起CONSUME_MESSAGE_DIRECTLY請求爪喘,這個請求會攜帶msgId颜曾,group和client id的信息

第3步:

Broker接收到這個請求,查詢出消息內(nèi)容后秉剑,主動給消費(fèi)者發(fā)送CONSUME_MESSAGE_DIRECTLY通知請求泛豪,注意雖然與第2步使用了同一個請求碼,但不同的是這個請求中包含了消息體的內(nèi)容侦鹏,消費(fèi)者可直接處理诡曙。注意:這里并不是將消息重新發(fā)送到Topic中,否則訂閱這個Topic的所有消費(fèi)者組略水,都會重新消費(fèi)這條消息岗仑。

2.1.3 根據(jù)Message Id進(jìn)行查詢

前面講解生產(chǎn)者發(fā)送消息后,返回的SendResult對象包含一個offsetMsgId字段聚请,這也就是我們常規(guī)意義上所說的Message Id荠雕,我們也可以根據(jù)這個字段來查詢消息。

根據(jù)Message Id查詢使用queryMsgById子命令驶赏,這個命令有3個作用:

根據(jù)Message Id查詢消息

通知指定消費(fèi)者重新消費(fèi)這條消息炸卑,與queryMsgByUniqueKey類似,這里不再介紹

將消息重新發(fā)送到Topic中煤傍,所有消費(fèi)者組都將重新消費(fèi)?

queryMsgById子命令用法如下所示:

$?sh?bin/mqadmin?queryMsgById?-h

usage:?mqadmin?queryMsgById?[-d?<arg>]?[-g?<arg>]?[-h]?-i?<arg>?[-n?<arg>]?[-s?<arg>]?[-u?<arg>]

?-d,--clientId?<arg>????????消費(fèi)者id

?-g,--consumerGroup?<arg>???消費(fèi)者組名稱

?-h,--help??????????????????打印幫助信息

?-i,--msgId?<arg>???????????Message?Id

?-n,--namesrvAddr?<arg>?????Name?server?地址

?-s,--sendMessage?<arg>?????重新發(fā)送消息

?-u,--unitName?<arg>????????unit?name

參數(shù)說明如下:

-d和-g參數(shù):類似于queryMsgById命令盖文,用于將消息發(fā)送給某個消費(fèi)者進(jìn)行重新消費(fèi)

-i 參數(shù):指定Message Id,即SendResult對象的offsetMsgId字段蚯姆,多個值使用逗號","分割五续。

-s參數(shù):是否重新發(fā)送消息到Topic洒敏。如果同時指定了-d和-g參數(shù),-s參數(shù)不生效疙驾。

根據(jù)Message Id查詢消息:

下圖根據(jù)SendResult的offsetMsgId字段凶伙,作為-i參數(shù),來查詢一條消息:

$?sh?bin/mqadmin?queryMsgById?-i?C0A8010300002A9F000000000007BEE9?-n?localhost:9876

OffsetID:????????????C0A8010300002A9F000000000007BEE9

OffsetID:????????????C0A8010300002A9F000000000007BEE9

Topic:???????????????TopicA

Tags:????????????????[null]

Keys:????????????????[Key1]

Queue?ID:????????????0

Queue?Offset:????????0

CommitLog?Offset:????507625

Reconsume?Times:?????0

Born?Timestamp:??????2019-12-13?22:19:40,619

Store?Timestamp:?????2019-12-13?22:19:40,631

Born?Host:???????????192.168.1.3:53974

Store?Host:??????????192.168.1.3:10911

System?Flag:?????????0

Properties:??????????{KEYS=Key1,?UNIQ_KEY=C0A80103511B18B4AAC24296D2CB0000,?WAIT=true}

Message?Body?Path:???/tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000

與queryMsgByUniqueKey子命令輸出基本類似它碎,主要是在輸出開頭多出了OffsetID字段函荣,即offsetMsgId。需要注意的是扳肛,queryMsgById不能接受Unqiue Key作為查詢參數(shù)傻挂。

重新發(fā)送消息到topic:

在指定-s參數(shù)后,消息將重新發(fā)送到topic挖息,如下(輸出進(jìn)行了格式化):

$?sh?bin/mqadmin?queryMsgById?-i?C0A8010300002A9F000000000007BEE9?-n?localhost:9876?-s?true

prepare?resend?msg.?originalMsgId=C0A8010300002A9F000000000007BEE9

SendResult?[

sendStatus=SEND_OK,?

msgId=C0A80103511B18B4AAC24296D2CB0000,?

offsetMsgId=C0A80103000078BF000000000004D923,?

messageQueue=MessageQueue?[topic=TopicA,?brokerName=broker-b,?queueId=1],?

queueOffset=1]

可以看到金拒,這里因?yàn)橄⑹侵匦掳l(fā)送到了Topic中,因此與我們之前使用生產(chǎn)者發(fā)送消息一樣套腹,輸出的是一個SendResult绪抛。在這種情況下沉迹,訂閱這個Topic的所有消費(fèi)者組都會重新消費(fèi)到這條消息。

在實(shí)際開發(fā)中害驹,如果多個消費(fèi)者組訂閱了某個Topic的消息鞭呕,如果所有的消費(fèi)者都希望重新消費(fèi),那么就應(yīng)該使用-s參數(shù)宛官。如果只是某個消費(fèi)者希望重新消費(fèi)葫松,那么應(yīng)該指定-g和-d參數(shù)。

另外底洗,我們看到發(fā)送前打印的originalMsgId和發(fā)送后SendResult中的offsetMsgId值并不一樣腋么,這是因?yàn)橄l(fā)送到Topic重新進(jìn)行了存儲,因此值不相同亥揖。這也是為什么我們說Message Id可以唯一對應(yīng)一條消息的原因珊擂。

而輸出的SendResult結(jié)果中的msgId,即Unique Key费变,并沒有發(fā)生變化摧扇,因此盡管名字是Unique Key,但是實(shí)際上還是有可能對應(yīng)多條消息的挚歧。而前面根據(jù)queryMsgByUniqueKey查詢之所以只有一條消息扛稽,實(shí)際上是進(jìn)行了過濾。

2.2 管理平臺

RocketMQ提供的命令行工具滑负,雖然功能強(qiáng)大在张,一般是運(yùn)維同學(xué)使用較多用含。通過RocketMQ提供的管理平臺進(jìn)來行消息查詢,則對業(yè)務(wù)開發(fā)同學(xué)更加友好帮匾。在管理平臺的消息一欄啄骇,有3個TAB,分別用于:根據(jù)Topic時間范圍查詢辟狈、Message Key查詢肠缔、Message Id查詢,下面分別進(jìn)行介紹哼转。

根據(jù)Topic時間范圍查詢:

按 Topic 查詢屬于范圍查詢明未,不推薦使用,因?yàn)闀r間范圍內(nèi)消息很多壹蔓,不具備區(qū)分度趟妥。查詢時,盡可能設(shè)置最為精確的時間區(qū)間佣蓉,以便縮小查詢范圍披摄,提高速度。最多返回2000條數(shù)據(jù)勇凭。

?根據(jù)Message Key查詢:

按 Message Key 查詢屬于模糊查詢疚膊,僅適用于沒有記錄 Message ID 但是設(shè)置了具有區(qū)分度的 Message Key的情況。 目前虾标,根據(jù)Message Key查詢寓盗,有一個很大局限性:不能指定時間范圍,且最多返回64條數(shù)據(jù)璧函。如果用戶指定的key重復(fù)率比較高的話傀蚌,就有可能搜不到。

?根據(jù)Message Id查詢:

按 Message ID 查詢屬于精確查詢蘸吓,速度快善炫,精確匹配,只會返回一條結(jié)果库继,推薦使用箩艺。在這里,傳入U(xiǎn)nique Key宪萄,offsetMsgId都可以舅桩。

查看消息詳情:

在按照Topic 時間范圍查詢,按照Message Key查詢雨膨,結(jié)果列表有一個Message Detail按鈕擂涛,點(diǎn)擊可以看到消息詳情:包括消息key、tag、生成時間撒妈,消息體內(nèi)容等恢暖。在詳情頁面,也可以將消息直接發(fā)送給某個消費(fèi)者組進(jìn)行重新消費(fèi)狰右。

需要注意的是杰捂,在消息體展示的時候,只能將消息體轉(zhuǎn)換成字符串進(jìn)行展示棋蚌,如果消息的內(nèi)容是protobuf嫁佳、thrift、hessian編碼的谷暮,那么將顯示一堆亂碼蒿往。

如果公司內(nèi)部有統(tǒng)一的IDL/Schema管理平臺,則可以解決這個問題湿弦,通過為每個Topic關(guān)聯(lián)一個IDL瓤漏,在消息展示時,可以根據(jù)IDL反序列化后在進(jìn)行展示颊埃。

2.3 客戶端API

除了通過命令行工具和管理平臺蔬充,還可以通過客戶端API的方式來進(jìn)行查詢,這其實(shí)是最本質(zhì)的方式班利,命令行工具和管理平臺的查詢功能都是基于此實(shí)現(xiàn)饥漫。

在org.apache.rocketmq.client.MQAdmin接口中,定義了以下幾個方法用于消息查詢:

//msgId參數(shù):僅接收SendResult中的offsetMsgId罗标,返回單條消息

MessageExt?viewMessage(final?String?msgId)

//msgId參數(shù):傳入SendResult中的offsetMsgId庸队、msgId都可以,返回單條消息

MessageExt?viewMessage(String?topic,String?msgId)

//在指定topic下馒稍,根據(jù)key進(jìn)行查詢皿哨,并指定最大返回條數(shù)浅侨,以及開始和結(jié)束時間

QueryResult?queryMessage(final?String?topic,?final?String?key,?

?????????????????????????final?int?maxNum,?final?long?begin,final?long?end)

對于MQAdmin接口纽谒,可能部分同學(xué)比較陌生。不過我們常用的DefaultMQProducer如输、DefaultMQPushConsumer等鼓黔,都實(shí)現(xiàn)了此接口,因此都具備消息查詢的能力不见,如下所示:

對于命令行工具澳化,底層實(shí)際上是基于MQAdminExt接口的實(shí)現(xiàn)來完成的。

細(xì)心的讀者會問稳吮,相同的查詢功能在在多處實(shí)現(xiàn)是不是太麻煩了缎谷?事實(shí)上,這只是對外暴露的接口灶似,在內(nèi)部列林,實(shí)際上都是基于MQAdminImpl這個類來完成的瑞你。

viewMessage方法:

兩種viewMessage方法重載形式,都只會返回單條消息希痴。下面以生產(chǎn)者搜索為例者甲,講解如何使用API進(jìn)行查詢:

//初始化Producer

DefaultMQProducer?producer?=?new?DefaultMQProducer();

producer.setNamesrvAddr("127.0.0.1:9876");

producer.start();

//根據(jù)UniqueKey查詢

String?uniqueKey?=?"C0A8010354C418B4AAC242A281360000";

MessageExt?msg?=?producer.viewMessage("TopicA",?uniqueKey);

//打印結(jié)果:這里僅輸出Unique?Key與offsetMsgId

MessageClientExt?msgExt=?(MessageClientExt)?msg;

System.out.println("Unique?Key:"+msgExt.getMsgId()//即UNIQUE_KEY

????????+"\noffsetMsgId:"+msgExt.getOffsetMsgId());

輸出結(jié)果如下:

Unique?Key:C0A8010354C418B4AAC242A281360000

offsetMsgId:C0A8010300002A9F000000000007BF94

如果我們把offsetMsgId當(dāng)做方法參數(shù)傳入,也可以查詢到相同的結(jié)果砌创。這是因?yàn)槁哺祝诜椒▋?nèi)部實(shí)際上是分兩步進(jìn)行查詢的:

先把參數(shù)當(dāng)做offsetMsgId,即Message Id進(jìn)行查詢

如果失敗嫩实,再嘗試當(dāng)做Unique Key進(jìn)行查詢刽辙。

源碼如下所示:

DefaultMQProducer#viewMessage(String,String)

@Override

public?MessageExt?viewMessage(String?topic,?String?msgId)?{//省略異常聲明

????try?{

????????//1?嘗試當(dāng)做offsetMsgId進(jìn)行查詢

????????MessageId?oldMsgId?=?MessageDecoder.decodeMessageId(msgId);

????????return?this.viewMessage(msgId);

????}?catch?(Exception?e)?{

????????//查詢失敗直接忽略

????}

????//2?嘗試當(dāng)做UNIQ_KEY進(jìn)行查詢

????return?this.defaultMQProducerImpl.queryMessageByUniqKey(topic,?msgId);

}

前面提到,Unique Key只是從邏輯上代表一條消息舶赔,實(shí)際上在Broker端可能存儲了多條扫倡,因此在當(dāng)做Unique Key進(jìn)行查詢時,會進(jìn)行過濾竟纳,只取其中一條撵溃。源碼如下所示:

MQAdminImpl#queryMessageByUniqKey

public?MessageExt?queryMessageByUniqKey(String?topic,String?uniqKey)?{

????//根據(jù)uniqKey進(jìn)行查詢

????QueryResult?qr?=?this.queryMessage(topic,?uniqKey,?32,

????????????MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime()?-?1000,

????????????Long.MAX_VALUE,

????????????true);


????//對查詢結(jié)果進(jìn)行過濾,最多只取一條

????if?(qr?!=?null?&&?qr.getMessageList()?!=?null?

????????????????????????????&&?qr.getMessageList().size()?>?0)?{

????????return?qr.getMessageList().get(0);

????}?else?{

????????return?null;

????}

}

我們也可以通過另外只接收一個參數(shù)的viewMessage方法進(jìn)行查詢锥累,但是需要注意的是缘挑,參數(shù)只能是offsetMsgId,不能是Unique Key桶略。

String?offsetMsgId?=?"C0A8010300002A9F000000000007BF94";

producer.viewMessage(offsetMsgId);

queryMessage方法:

其是根據(jù)消息Key進(jìn)行查詢语淘,這里不再介紹API如何使用。則與前面兩種viewMessage方法重載不同际歼,其返回的是一個QueryResult對象惶翻,包含了多條消息。

主要是注意這個方法接收時間范圍參數(shù)鹅心,相比較于管理平臺更加靈活吕粗。管理平臺按照消息Key查詢,默認(rèn)最多返回64條消息旭愧,且不能支持指定時間范圍颅筋,如果消息Key重復(fù)度較高,那么可能有些消息搜索不到输枯。如果是在指定時間范圍內(nèi)返回64條消息议泵,如果沒有發(fā)現(xiàn)想找到的消息,再選擇其他時間范圍桃熄,則可以規(guī)避這個問題先口。

3 實(shí)現(xiàn)原理

? ??Unqiue Key & Message Key都需要利用RocketMQ的哈希索引機(jī)制來完成消息查詢,由于建立索引有一定的開銷,因此Broker端提供了相關(guān)配置項(xiàng)來控制是否開啟索引碉京。關(guān)于RocketMQ索引機(jī)制將在后面的文章進(jìn)行詳細(xì)的介紹桩引。

? ? Message Id是在Broker端生成的,其包含了Broker地址和commit Log offset信息收夸,可以精確匹配一條消息坑匠,查詢消息更好。下面分別介紹?Unqiue Key & Message Id的生成和作用卧惜。

3.1 Unique Key的生成與作用

3.1.1 Unique Key生成

Unique Key是生產(chǎn)者發(fā)送消息之前厘灼,由RocketMQ 客戶端自動生成的,具體來說咽瓷,RocketMQ發(fā)送消息之前设凹,最終都要通過以下方法:

DefaultMQProducerImpl#sendKernelImpl

private?SendResult?sendKernelImpl(final?Message?msg,

??????????????????????????????????final?MessageQueue?mq,

??????????????????????????????????final?CommunicationMode?communicationMode,

??????????????????????????????????final?SendCallback?sendCallback,

??????????????????????????????????final?TopicPublishInfo?topicPublishInfo,

??????????????????????????????????final?long?timeout)??{//省略異常聲明

?????//...略


????????try?{

????????????//如果不是批量消息,則生成Unique?Key

????????????if?(!(msg?instanceof?MessageBatch))?{

????????????????MessageClientIDSetter.setUniqID(msg);

????????????}


?????//...略

如上所示茅姜,如果不是批量消息闪朱,會通過MessageClientIDSetter的setUniqID方法為消息設(shè)置Unique key,該方法實(shí)現(xiàn)如下所示:

MessageClientIDSetter#setUniqID

public?static?void?setUniqID(final?Message?msg)?{

????//?Unique?Key不為空的情況下钻洒,才進(jìn)行設(shè)置

????if?(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,)?==?null)?{

????????msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,,?createUniqID());

????}

}

如果消息的Unique Key屬性為null奋姿,就通過createUniqID()方法為消息創(chuàng)建一個新的Unique Key,并設(shè)置到消息屬性中素标。之所以要判斷Unique Key是否為null與其作用有關(guān)称诗。

3.1.2 Unique Key作用

了解Unique Key的作用對于我們理解消息重復(fù)的原因有很大的幫助。RocketMQ并不保證消息投遞過程中的Exactly Once語義头遭,即消息只會被精確消費(fèi)一次寓免,需要消費(fèi)者自己做冪等。而通常導(dǎo)致消息重復(fù)消費(fèi)的原因计维,主要包括:

生產(chǎn)者發(fā)送時消息重復(fù):RocketMQ對于無序消息發(fā)送失敗袜香,默認(rèn)會重試2次。對于有序消息和普通有序消息為什么不進(jìn)行重試鲫惶,可參考:RocketMQ NameServer詳解?

消費(fèi)者Rebalance時消息重復(fù):這里不做介紹蜈首,可參考RocketMQ Rebalance機(jī)制詳解?

導(dǎo)致生產(chǎn)者發(fā)送重復(fù)消息的原因可能是:一條消息已被成功發(fā)送到服務(wù)端并完成持久化,由于網(wǎng)絡(luò)超時此時出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶端宕機(jī)剑按,導(dǎo)致服務(wù)端對客戶端應(yīng)答失敗疾就,此時生產(chǎn)者將再次嘗試發(fā)送消息澜术。

在重試發(fā)送時艺蝴,sendKernelImpl會被重復(fù)調(diào)用,意味著setUniqID方法會被重復(fù)調(diào)用鸟废,不過由于setUniqID方法實(shí)現(xiàn)中進(jìn)行判空處理猜敢,因此重復(fù)設(shè)置Unique Key。在這種情況下,消費(fèi)者后續(xù)會收到兩條內(nèi)容相同并且 Unique Key 也相同的消息(offsetMsgId不同缩擂,因?yàn)閷roker來說存儲了多次)鼠冕。

那么消費(fèi)者如何判斷,消費(fèi)重復(fù)是因?yàn)橹貜?fù)發(fā)送還是Rebalance導(dǎo)致的重復(fù)消費(fèi)呢胯盯?

消費(fèi)者實(shí)現(xiàn)MessageListener接口監(jiān)聽到的消息類型是MessageExt懈费,可以將其強(qiáng)制轉(zhuǎn)換為MessageClientExt,之后調(diào)用getMsgId方法獲取Unique Key博脑,調(diào)用getOffsetMsgId獲得Message Id憎乙。如果多消息的Unique Key相同,但是offsetMsgId不同叉趣,則有可能是因?yàn)橹貜?fù)發(fā)送導(dǎo)致泞边。

3.1.3 批量發(fā)送模式下的Unique Key

DefaultMQProducer提供了批量發(fā)送消息的接口:

public?SendResult?send(Collection<Message>?msgs)

在內(nèi)部,這批消息首先會被構(gòu)建成一個MessageBatch對象疗杉。在前面sendKernelImpl方法中我們也看到了阵谚,對于MessageBatch對象,并不會設(shè)置Unique Key烟具。這是因?yàn)樵趯⑴肯⑥D(zhuǎn)換成MessageBatch時梢什,已經(jīng)設(shè)置過了。

可能有一部分同學(xué)會誤以為一個批量消息中每條消息Unique Key是相同的朝聋,其實(shí)不然绳矩,每條消息Unique Key都不同。

這里通過一個批量發(fā)送案例進(jìn)行說明:

//構(gòu)建批量消息

ArrayList<Message>?msgs?=?new?ArrayList<>();

Message?msg1?=?new?Message("Topic_S",("message3").getBytes());

Message?msg2?=?new?Message("Topic_S",("message4").getBytes());

msgs.add(msg1);

msgs.add(msg2);

//發(fā)送

SendResult?result?=?producer.send(msgs);

//打印

System.out.println(result);

輸出如下所示:

SendResult?[sendStatus=SEND_OK,

msgId=C0A80103583618B4AAC24CDC29F10000,C0A80103583618B4AAC24CDC29F10001,

offsetMsgId=C0A80103000051AF00000000000B05BD,C0A80103000051AF00000000000B065B,

messageQueue=MessageQueue?[topic=Topic_S,?brokerName=broker-c,?queueId=2],

queueOffset=3]

可以看到玖翅,此時輸出的msgId(即Unique Key)和offsetMsgId都會包含多個值翼馆。客戶端給批量消息中每條消息設(shè)置不同的Unqiue Key金度,可以參考DefaultMQProducer#batch方法源碼:

private?MessageBatch?batch(Collection<Message>?msgs)?throws?MQClientException?{

????MessageBatch?msgBatch;

????try?{

????????//1?將消息集合轉(zhuǎn)換為MessageBatch

????????msgBatch?=?MessageBatch.generateFromList(msgs);


????????//2?迭代每個消息应媚,逐一設(shè)置Unique?Key

????????for?(Message?message?:?msgBatch)?{

????????????Validators.checkMessage(message,?this);

????????????MessageClientIDSetter.setUniqID(message);

????????}

????????//3?設(shè)置批量消息的消息體

????????msgBatch.setBody(msgBatch.encode());

????}?catch?(Exception?e)?{

????????throw?new?MQClientException("Failed?to?initiate?the?MessageBatch",?e);

????}

????return?msgBatch;

}

3.2 Message Id生成

SendResult中的offsetMsgId,即常規(guī)意義上我們所說的Message Id是在Broker端生成的猜极,用于唯一標(biāo)識一條消息中姜,在根據(jù)Message Id查詢的情況下,最多只能查詢到一條消息跟伏。Message Id總共 16 字節(jié)丢胚,包含消息存儲主機(jī)地址,消息 Commit Log offset受扳。如下圖所示:

?RocketMQ內(nèi)部通過一個MessageId對象進(jìn)行表示:

public?class?MessageId?{

????private?SocketAddress?address;?//broker地址

????private?long?offset;?//commit?log?offset

并提供了一個MessageDecoder對象來創(chuàng)建或者解碼MessageId携龟。

public?static?String?createMessageId(final?ByteBuffer?input,

?????????????????????????????????????final?ByteBuffer?addr,?final?long?offset)

public?static?MessageId?decodeMessageId(final?String?msgId)

Broker端在順序存儲消息時,首先會通過createMessageId方法創(chuàng)建msgId勘高。源碼如下所示:

CommitLog.DefaultAppendMessageCallback#doAppend

public?AppendMessageResult?doAppend(final?long?fileFromOffset,?final?ByteBuffer?byteBuffer,?

??????????????????????final?int?maxBlank,final?MessageExtBrokerInner?msgInner)?{

????//1?PHY?OFFSET:即Commit?Log?Offset?或者稱之為msgOffsetId

????long?wroteOffset?=?fileFromOffset?+?byteBuffer.position();

????//2?hostHolder用于維護(hù)broker地址信息

????this.resetByteBuffer(hostHolder,?8);

????//3?創(chuàng)建msgOffsetId

????String?msgId?=?MessageDecoder.createMessageId(this.msgIdMemory,?

????????????????????????msgInner.getStoreHostBytes(hostHolder),?wroteOffset);

而客戶端在根據(jù)msgId向Broker查詢消息時峡蟋,首先會將通過MessageDecoder的decodeMessageId方法坟桅,之后直接向這個broker進(jìn)行查詢指定位置的消息。

參見:MQAdminImpl#viewMessage

public?MessageExt?viewMessage(String?msgId)?{//省略異常聲明

????//1?根據(jù)msgId解碼成MessageId對象

????MessageId?messageId?=?null;

????try?{

????????messageId?=?MessageDecoder.decodeMessageId(msgId);

????}?catch?(Exception?e)?{

????????throw?new?MQClientException(ResponseCode.NO_MESSAGE,?

??????????????????????"query?message?by?id?finished,?but?no?message.");

????}

????//2?根據(jù)MessageId中的Broker地址和commit?log?offset信息進(jìn)行查詢

????return?this.mQClientFactory.getMQClientAPIImpl().viewMessage(

????????????RemotingUtil.socketAddress2String(messageId.getAddress()),

????????????messageId.getOffset(),?

????????????timeoutMillis);

}

由于根據(jù)Message Id進(jìn)行查詢蕊蝗,實(shí)際上是直接從特定Broker的CommitLog中的指定位置進(jìn)行查詢的仅乓,屬于精確匹配,并不像用戶設(shè)置的key蓬戚,或者Unique Key那么樣夸楣,需要使用到哈希索引機(jī)制,因此效率很高子漩。

4 總結(jié)

RocketMQ提供了3種消息查詢方式:Message Key & Unique Key & Message Id

RocketMQ提供了3種消息查詢工具:命令行裕偿、管理平臺、客戶端API痛单,且支持將查詢到讓特定/所有消費(fèi)者組重新消費(fèi)

RocketMQ有意對用戶屏蔽Unique Key & Message Id區(qū)別嘿棘,很多地方二者可以通用

Message Key & Unique Key 需要使用到哈希索引機(jī)制,有額外的索引維護(hù)成本

Message Id由Broker和commit log offset組成旭绒,屬于精確匹配鸟妙,查詢效率更好

免費(fèi)學(xué)習(xí)視頻歡迎關(guān)注云圖智聯(lián):https://e.yuntuzhilian.com/?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市挥吵,隨后出現(xiàn)的幾起案子重父,更是在濱河造成了極大的恐慌,老刑警劉巖忽匈,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件房午,死亡現(xiàn)場離奇詭異,居然都是意外死亡丹允,警方通過查閱死者的電腦和手機(jī)郭厌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來雕蔽,“玉大人折柠,你說我怎么就攤上這事∨” “怎么了扇售?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長嚣艇。 經(jīng)常有香客問我承冰,道長,這世上最難降的妖魔是什么食零? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任困乒,我火速辦了婚禮,結(jié)果婚禮上慌洪,老公的妹妹穿的比我還像新娘顶燕。我一直安慰自己,他們只是感情好冈爹,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布涌攻。 她就那樣靜靜地躺著,像睡著了一般频伤。 火紅的嫁衣襯著肌膚如雪恳谎。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天憋肖,我揣著相機(jī)與錄音因痛,去河邊找鬼。 笑死岸更,一個胖子當(dāng)著我的面吹牛鸵膏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播怎炊,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼谭企,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了评肆?” 一聲冷哼從身側(cè)響起债查,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎瓜挽,沒想到半個月后盹廷,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡久橙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年俄占,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片淆衷。...
    茶點(diǎn)故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡颠放,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出吭敢,到底是詐尸還是另有隱情碰凶,我是刑警寧澤,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布鹿驼,位于F島的核電站枝秤,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏白魂。R本人自食惡果不足惜蔓搞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望凄鼻。 院中可真熱鬧腊瑟,春花似錦聚假、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至财松,卻和暖如春瘪贱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背辆毡。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工菜秦, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人舶掖。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓球昨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親眨攘。 傳聞我的和親對象是個殘疾皇子褪尝,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評論 2 353