一、RocketMQ 的前世今生
RocketMQ 是阿里巴巴開(kāi)源的分布式消息系統(tǒng)赖瞒,基于高可用分布式集群技術(shù)女揭,提供低延時(shí)的、高可靠的消息發(fā)布與訂閱服務(wù)栏饮,是一個(gè)統(tǒng)一的消息引擎吧兔,輕量級(jí)的數(shù)據(jù)處理平臺(tái)。起源于阿里巴巴 2001 年的五彩石項(xiàng)目袍嬉, Notify 在這期間應(yīng)運(yùn)而生境蔼,用于交易核心消息的流轉(zhuǎn) 。
2010 年, B2B 開(kāi)始大規(guī)模使用 ActiveMQ 作為消息內(nèi)核欧穴。
2011 年民逼,隨著阿里業(yè)務(wù)的快速發(fā)展,急需一款支持順序消息涮帘,擁有海量消息堆積能力的消息中間件拼苍, MetaQ 1.0 因此誕生。
2012 年调缨, MetaQ已經(jīng)發(fā)展到了3.0版本疮鲫,RocketMQ 正是基于 MetaQ 3.0 開(kāi)發(fā)的分布式消息傳遞中間件,專(zhuān)為萬(wàn)億級(jí)的消息處理而設(shè)計(jì)弦叶,具有高吞吐量俊犯,低延遲,海量積累和有序消息伤哺。它是阿里巴巴和眾多大型互聯(lián)網(wǎng)業(yè)務(wù)場(chǎng)景的雙十一購(gòu)物狂歡的現(xiàn)成工具燕侠。由于這些優(yōu)點(diǎn),它吸引了越來(lái)越多的應(yīng)用程序進(jìn)行訪問(wèn)立莉。同年绢彤,阿里巴巴正式開(kāi)源了 RocketMQ 的第一個(gè)版本。
2015 年蜓耻,RocketMQ 見(jiàn)證了消息傳遞的多項(xiàng)重量級(jí)功能茫舶,包括事務(wù)消息、SQL過(guò)濾器刹淌、消息追溯、調(diào)度消息疹启、多站點(diǎn)高可用性等柠衅,以滿(mǎn)足阿里巴巴日益豐富的業(yè)務(wù)場(chǎng)景皮仁。它還取代了阿里巴巴自主研發(fā)的另一款 MQ 產(chǎn)品 Notify菲宴,成為阿里巴巴首選的消息中間件趋急。
2016 年喝峦,RocketMQ 在阿里云上開(kāi)發(fā)了首個(gè)全托管服務(wù)呜达,幫助大量數(shù)字化轉(zhuǎn)型企業(yè)構(gòu)建現(xiàn)代應(yīng)用,并開(kāi)始體驗(yàn)大規(guī)模的云計(jì)算實(shí)踐挤忙。同年册烈,RocketMQ 被捐贈(zèng)給Apache基金會(huì)婿禽,并加入孵化器項(xiàng)目扭倾,旨在在未來(lái)為更多的開(kāi)發(fā)者服務(wù)膛壹。
2016 年模聋,RocketMQ 榮獲中國(guó)最受歡迎開(kāi)源軟件獎(jiǎng)撬槽。
2017 年此改,從 Apache 基金會(huì)畢業(yè)后共啃,RocketMQ被指定為頂級(jí)項(xiàng)目(TLP)移剪。
2018 年纵苛,RocketMQ 榮獲中國(guó)最受歡迎開(kāi)源軟件獎(jiǎng)攻人。
二怀吻、RocketMQ 的基本概念
2.1 消息模型(Message Model)
RocketMQ主要由 Producer蓬坡、Broker屑咳、Consumer 三部分組成兆龙,其中Producer 負(fù)責(zé)生產(chǎn)消息详瑞,Consumer 負(fù)責(zé)消費(fèi)消息坝橡,Broker 負(fù)責(zé)存儲(chǔ)消息计寇。Broker 在實(shí)際部署過(guò)程中對(duì)應(yīng)一臺(tái)服務(wù)器番宁,每個(gè) Broker 可以存儲(chǔ)多個(gè)Topic的消息蝶押,每個(gè)Topic的消息也可以分片存儲(chǔ)于不同的 Broker棋电。Message Queue 用于存儲(chǔ)消息的物理地址赶盔,每個(gè)Topic中的消息地址存儲(chǔ)于多個(gè) Message Queue 中于未。ConsumerGroup 由多個(gè)Consumer 實(shí)例構(gòu)成烘浦。
2.2 消息生產(chǎn)者(Producer)
負(fù)責(zé)生產(chǎn)消息谎倔,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)生產(chǎn)消息片习。一個(gè)消息生產(chǎn)者會(huì)把業(yè)務(wù)應(yīng)用系統(tǒng)里產(chǎn)生的消息發(fā)送到 broker 服務(wù)器状知。RocketMQ 提供多種發(fā)送方式饥悴,同步發(fā)送西设、異步發(fā)送贷揽、順序發(fā)送禽绪、單向發(fā)送印屁。同步和異步方式均需要 Broker 返回確認(rèn)信息雄人,單向發(fā)送不需要础钠。
2.3 消息消費(fèi)者(Consumer)
負(fù)責(zé)消費(fèi)消息珍坊,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)阵漏。一個(gè)消息消費(fèi)者會(huì)從 Broker 服務(wù)器拉取消息、并將其提供給應(yīng)用程序叹洲。從用戶(hù)應(yīng)用的角度而言提供了兩種消費(fèi)形式:拉取式消費(fèi)运提、推動(dòng)式消費(fèi)民泵。
2.4 主題(Topic)
表示一類(lèi)消息的集合栈妆,每個(gè)主題包含若干條消息鳞尔,每條消息只能屬于一個(gè)主題寥假,是RocketMQ 進(jìn)行消息訂閱的基本單位拾给。
2.5 代理服務(wù)器(Broker Server)
消息中轉(zhuǎn)角色蒋得,負(fù)責(zé)存儲(chǔ)消息额衙、轉(zhuǎn)發(fā)消息窍侧。代理服務(wù)器在 RocketMQ 系統(tǒng)中負(fù)責(zé)接收從生產(chǎn)者發(fā)送來(lái)的消息并存儲(chǔ)伟件、同時(shí)為消費(fèi)者的拉取請(qǐng)求作準(zhǔn)備。代理服務(wù)器也存儲(chǔ)消息相關(guān)的元數(shù)據(jù)咧织,包括消費(fèi)者組习绢、消費(fèi)進(jìn)度偏移和主題和隊(duì)列消息等闪萄。
2.6 名字服務(wù)(Name Server)
名稱(chēng)服務(wù)充當(dāng)路由消息的提供者桃煎。生產(chǎn)者或消費(fèi)者能夠通過(guò)名字服務(wù)查找各主題相應(yīng)的Broker IP 列表。多個(gè) Namesrv 實(shí)例組成集群葫辐,但相互獨(dú)立耿战,沒(méi)有信息交換剂陡。
2.7 拉取式消費(fèi)(Pull Consumer)
Consumer消費(fèi)的一種類(lèi)型,應(yīng)用通常主動(dòng)調(diào)用Consumer的拉消息方法從Broker服務(wù)器拉消息晕鹊、主動(dòng)權(quán)由應(yīng)用控制溅话。一旦獲取了批量消息飞几,應(yīng)用就會(huì)啟動(dòng)消費(fèi)過(guò)程屑墨。
2.8 推動(dòng)式消費(fèi)(Push Consumer)
Consumer消費(fèi)的一種類(lèi)型,該模式下Broker收到數(shù)據(jù)后會(huì)主動(dòng)推送給消費(fèi)端程腹,該消費(fèi)模式一般實(shí)時(shí)性較高寸潦。
2.9 生產(chǎn)者組(Producer Group)
同一類(lèi)Producer的集合见转,這類(lèi)Producer發(fā)送同一類(lèi)消息且發(fā)送邏輯一致吏砂。如果發(fā)送的是事務(wù)消息且原始生產(chǎn)者在發(fā)送之后崩潰狐血,則Broker服務(wù)器會(huì)聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實(shí)例以提交或回溯消費(fèi)匈织。
2.10 消費(fèi)者組(Consumer Group)
同一類(lèi)Consumer的集合,這類(lèi)Consumer通常消費(fèi)同一類(lèi)消息且消費(fèi)邏輯一致乡小。消費(fèi)者組使得在消息消費(fèi)方面劲件,實(shí)現(xiàn)負(fù)載均衡和容錯(cuò)的目標(biāo)變得非常容易。要注意的是牵辣,消費(fèi)者組的消費(fèi)者實(shí)例必須訂閱完全相同的Topic纬向。RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。
2.11 集群消費(fèi)(Clustering)
集群消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例平均分?jǐn)傁ⅰ?/p>
2.12 廣播消費(fèi)(Broadcasting)
廣播消費(fèi)模式下师脂,相同Consumer Group的每個(gè)Consumer實(shí)例都接收全量的消息吃警。
2.13 普通順序消息(Normal Ordered Message)
普通順序消費(fèi)模式下拌消,消費(fèi)者通過(guò)同一個(gè)消費(fèi)隊(duì)列收到的消息是有順序的墩崩,不同消息隊(duì)列收到的消息則可能是無(wú)順序的泰鸡。
2.14 嚴(yán)格順序消息(Strictly Ordered Message)
嚴(yán)格順序消息模式下饰迹,消費(fèi)者收到的所有消息均是有順序的啊鸭。
2.15 消息(Message)
消息系統(tǒng)所傳輸信息的物理載體赠制,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位烟号,每條消息必須屬于一個(gè)主題汪拥。RocketMQ中每個(gè)消息擁有唯一的Message ID,且可以攜帶具有業(yè)務(wù)標(biāo)識(shí)的Key脯燃。系統(tǒng)提供了通過(guò)Message ID和Key查詢(xún)消息的功能。
2.16 標(biāo)簽(Tag)
為消息設(shè)置的標(biāo)志坟募,用于同一主題下區(qū)分不同類(lèi)型的消息懈糯。來(lái)自同一業(yè)務(wù)單元的消息她紫,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽贿讹。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性民褂,并優(yōu)化RocketMQ提供的查詢(xún)系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯哭廉,實(shí)現(xiàn)更好的擴(kuò)展性。
2.17 DefaultMQPushConsumer和DefaultMQPullConsumer
Push的方式是 Server端接收到消息后,主動(dòng)把消息推送給 Client端赎离,主動(dòng)權(quán)在Server端,實(shí)時(shí)性高荣病。用 Push方式主動(dòng)推送有很多弊 端:首先是加大 Server 端的 工作量个盆,進(jìn)而影響 Server 的性能;其次柴梆,Client 的處理能力各不相同绍在, Client 的狀態(tài)不受 Server 控制,
Pull方式是 Client端循環(huán)地從 Server端拉取消息溜宽,主動(dòng)權(quán)在 Client手里, 自己拉取到一定量消息后涡扼,處理妥當(dāng)了再接著取汤善。Pull 方式的問(wèn)題是循環(huán)拉取 消息的間隔不好設(shè)定红淡,間隔太短就處在一個(gè) “忙等”的狀態(tài)不狮,浪費(fèi)資源; Pull 的時(shí)間間隔太長(zhǎng) Server 端有消息到來(lái)時(shí) 有可能沒(méi)有被及時(shí)處理。
三在旱、SpringBoot整合RocketMQ
3.1 添加rocketmq-spring-boot-starter等相關(guān)依賴(lài)
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
3.2 添加配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
#必須指定group
group: test-group
3.3 同步消息
同步消息 API
//發(fā)送普通同步消息-Object
syncSend(String destination, Object payload);
//發(fā)送普通同步消息-Message
syncSend(String destination, Message<?> message);
//發(fā)送批量普通同步消息
syncSend(String destination, Collection<T> messages);
//發(fā)送普通同步消息-Object摇零,并設(shè)置發(fā)送超時(shí)時(shí)間
syncSend(String destination, Object payload, long timeout);
//發(fā)送普通同步消息-Message,并設(shè)置發(fā)送超時(shí)時(shí)間
syncSend(String destination, Message<?> message, long timeout);
//發(fā)送批量普通同步消息驻仅,并設(shè)置發(fā)送超時(shí)時(shí)間
syncSend(String destination, Collection<T> messages, long timeout);
//發(fā)送普通同步延遲消息,并設(shè)置超時(shí)登渣,這個(gè)下文會(huì)演示
syncSend(String destination, Message<?> message, long timeout, int delayLevel);
/**
* 普通發(fā)送
* @param topic 消息主題
* @param msg 消息體
* @param <T> 消息泛型
*/
public <T> void send(String topic, T msg) {
rocketMQTemplate.convertAndSend(topic + ":tag1", msg);
//rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(msg).build()); // 等價(jià)于上面一行
}
/**
* 發(fā)送帶tag的消息噪服,直接在topic后面加上":tag"
*
* @param topic 消息主題
* @param tag 消息tag
* @param msg 消息體
* @param <T> 消息泛型
* @return
*/
public <T> SendResult sendTagMsg(String topic, String tag, T msg) {
topic = topic + ":" + tag;
return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msg).build());
}
/**
* 發(fā)送同步消息(阻塞當(dāng)前線(xiàn)程,等待broker響應(yīng)發(fā)送結(jié)果胜茧,這樣不太容易丟失消息)
* sendResult為返回的發(fā)送結(jié)果
*/
public <T> SendResult sendMsg(String topic, T msg) {
Message<T> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
return sendResult;
}
這里存在兩種消息體粘优,一種是Object的,另一種是Message<?>的形式的,其實(shí)我們發(fā)送Object的時(shí)候雹顺,底層是有幫我們做轉(zhuǎn)換的丹墨,其實(shí)和我們?cè)谏蠈诱{(diào)用。
MessageBuilder.withPayload("hello world test1").build()
3.4 異步消息
異步消息 API
//發(fā)送普通異步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback);
//發(fā)送普通異步消息-Message
asyncSend(String destination, Message<?> message, SendCallback sendCallback);
//發(fā)送普通異步消息-Object嬉愧,并設(shè)置發(fā)送超時(shí)時(shí)間
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout);
//發(fā)送普通異步消息-Message贩挣,并設(shè)置發(fā)送超時(shí)時(shí)間
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout);
//發(fā)送普通異步延遲消息,并設(shè)置超時(shí)英染,這個(gè)下文會(huì)演示
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
int delayLevel);
/**
* 發(fā)送異步消息
* 發(fā)送異步消息(通過(guò)線(xiàn)程池執(zhí)行發(fā)送到broker的消息任務(wù)揽惹,執(zhí)行完后回調(diào):在SendCallback中可處理相關(guān)成功失敗時(shí)的邏輯)
* (適合對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景)
* @param topic 消息Topic
* @param msg 消息實(shí)體
*
*/
public <T> void asyncSend(String topic, T msg) {
Message<T> message = MessageBuilder.withPayload(msg).build();
asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息---發(fā)送MQ成功---", topic);
}
@Override
public void onException(Throwable throwable) {
log.error("topic:{}消息---發(fā)送MQ失敗 ex:{}---", topic, throwable.getMessage());
}
});
}
/**
* 發(fā)送異步消息
* 發(fā)送異步消息(通過(guò)線(xiàn)程池執(zhí)行發(fā)送到broker的消息任務(wù),執(zhí)行完后回調(diào):在SendCallback中可處理相關(guān)成功失敗時(shí)的邏輯)
* (適合對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景)
* @param topic 消息Topic
* @param message 消息實(shí)體
* @param sendCallback 回調(diào)函數(shù)
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {
rocketMQTemplate.asyncSend(topic, message, sendCallback);
}
/**
* 發(fā)送異步消息
*
* @param topic 消息Topic
* @param message 消息實(shí)體
* @param sendCallback 回調(diào)函數(shù)
* @param timeout 超時(shí)時(shí)間
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}
3.5 單向消息
/**
* 單向消息
* 特點(diǎn)為只負(fù)責(zé)發(fā)送消息四康,不等待服務(wù)器回應(yīng)且沒(méi)有回調(diào)函數(shù)觸發(fā)搪搏,即只發(fā)送請(qǐng)求不等待應(yīng)答
* 此方式發(fā)送消息的過(guò)程耗時(shí)非常短,一般在微秒級(jí)別
* 應(yīng)用場(chǎng)景:適用于某些耗時(shí)非常短闪金,但對(duì)可靠性要求并不高的場(chǎng)景疯溺,例如日志收集
* @param topic 消息主題
* @param msg 消息體
* @param <T> 消息泛型
*/
public <T> void sendOneWayMsg(String topic, T msg) {
Message<T> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.sendOneWay(topic, message);
}
3.6 批量消息
/**
* 發(fā)送批量消息
*
* @param topic 消息主題
* @param msgList 消息體集合
* @param <T> 消息泛型
* @return
*/
public <T> SendResult asyncSendBatch(String topic, List<T> msgList) {
List<Message<T>> messageList = msgList.stream()
.map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());
return rocketMQTemplate.syncSend(topic, messageList);
}
3.7 延遲消息
同步延遲消息
/**
* 同步延遲消息
* rocketMQ的延遲消息發(fā)送其實(shí)是已發(fā)送就已經(jīng)到broker端了,然后消費(fèi)端會(huì)延遲收到消息哎垦。
* RocketMQ 目前只支持固定精度的定時(shí)消息囱嫩。
* 固定等級(jí):1到18分別對(duì)應(yīng)1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 延遲的底層方法是用定時(shí)任務(wù)實(shí)現(xiàn)的。
* 發(fā)送延時(shí)消息(delayLevel的值就為0漏设,因?yàn)椴谎訒r(shí))
*
* @param topic 消息主題
* @param msg 消息體
* @param timeout 發(fā)送超時(shí)時(shí)間
* @param delayLevel 延遲級(jí)別 1到18
* @param <T> 消息泛型
*/
public <T> void sendDelay(String topic, T msg, long timeout, int delayLevel) {
Message<T> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.syncSend(topic, message, timeout, delayLevel);
}
異步延遲消息
/**
* 發(fā)送異步延遲消息
*
* @param topic 消息Topic
* @param message 消息實(shí)體
* @param sendCallback 回調(diào)函數(shù)
* @param timeout 超時(shí)時(shí)間
* @param delayLevel 延遲消息的級(jí)別
*/
public void asyncSendDelay(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
}
/**
* 發(fā)送異步延遲消息
*
* @param topic 消息Topic
* @param message 消息實(shí)體
* @param timeout 超時(shí)時(shí)間
* @param delayLevel 延遲消息的級(jí)別
*/
public void asyncSendDelay(String topic, Message<?> message, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息---發(fā)送MQ成功---", topic);
}
@Override
public void onException(Throwable throwable) {
log.error("topic:{}消息---發(fā)送MQ失敗 ex:{}---", topic, throwable.getMessage());
}
}, timeout, delayLevel);
}
3.8 順序消息
RocketMQ順序消息墨闲,這里使用rocketmq-spring-boot-starter發(fā)送順序消息就比較方便了,不像使用rocket-client那樣郑口,需要手動(dòng)獲取RocketMQ中當(dāng)前topic的隊(duì)列個(gè)數(shù)然后再通過(guò)hashKey值鸳碧,mqs.size()取模,得到一個(gè)索引值犬性,這里底層都幫我們做好了處理瞻离!
/**
* 發(fā)送順序消息
*
* @param topic 消息主題
* @param msg 消息體
* @param hashKey 確定消息發(fā)送到哪個(gè)隊(duì)列中
* @param <T> 消息泛型
*/
public <T> void syncSendOrderly(String topic, T msg, String hashKey) {
Message<T> message = MessageBuilder.withPayload(msg).build();
log.info("發(fā)送順序消息,topic:{}, hashKey:{}", topic, hashKey);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
/**
* 發(fā)送順序消息
*
* @param topic 消息主題
* @param msg 消息體
* @param hashKey 確定消息發(fā)送到哪個(gè)隊(duì)列中
* @param timeout 超時(shí)時(shí)間
*/
public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
Message<T> message = MessageBuilder.withPayload(msg).build();
log.info("發(fā)送順序消息乒裆,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}
3.9 消費(fèi)者
/**
* @Author: huangyibo
* @Date: 2022/7/2 22:22
* @Description:
* topic需要和生產(chǎn)者的topic一致套利,consumerGroup屬性是必須指定的,內(nèi)容可以隨意
* selectorExpression的意思指的就是tag鹤耍,默認(rèn)為“*”肉迫,不設(shè)置的話(huà)會(huì)監(jiān)聽(tīng)所有消息
*/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "Con_Group_One",
topic = "RLT_TEST_TOPIC",
selectorExpression = "tag1")
public class RocketMqConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("監(jiān)聽(tīng)到消息:message:{}", message);
}
}
四、事務(wù)消息
事務(wù)消息是RocketMQ提供的非常重要的一個(gè)特性惰蜜,在4.x版本之后開(kāi)源昂拂,可以利用事務(wù)消息輕松地實(shí)現(xiàn)分布式事務(wù)。
RocketMQ在其消息定義的基礎(chǔ)上抛猖,對(duì)事務(wù)消息擴(kuò)展了兩個(gè)相關(guān)的概念:
Half(Prepare) Message——半消息(預(yù)處理消息)
半消息是一種特殊的消息類(lèi)型格侯,該狀態(tài)的消息暫時(shí)不能被Consumer消費(fèi)鼻听。當(dāng)一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒(méi)有接收到Producer發(fā)出的二次確認(rèn)時(shí)联四,該事務(wù)消息就處于"暫時(shí)不可被消費(fèi)"狀態(tài)撑碴,該狀態(tài)的事務(wù)消息被稱(chēng)為半消息。
Message Status Check——消息狀態(tài)回查
由于網(wǎng)絡(luò)抖動(dòng)朝墩、Producer重啟等原因醉拓,可能導(dǎo)致Producer向Broker發(fā)送的二次確認(rèn)消息沒(méi)有成功送達(dá)。如果Broker檢測(cè)到某條事務(wù)消息長(zhǎng)時(shí)間處于半消息狀態(tài)收苏,則會(huì)主動(dòng)向Producer端發(fā)起回查操作亿卤,查詢(xún)?cè)撌聞?wù)消息在Producer端的事務(wù)狀態(tài)(Commit 或 Rollback)÷拱裕可以看出排吴,Message Status Check主要用來(lái)解決分布式事務(wù)中的超時(shí)問(wèn)題。
執(zhí)行流程:
- 1懦鼠、應(yīng)用模塊遇到要發(fā)送事務(wù)消息的場(chǎng)景時(shí)钻哩,先發(fā)送prepare消息給MQ。
- 2肛冶、prepare消息發(fā)送成功后街氢,應(yīng)用模塊執(zhí)行數(shù)據(jù)庫(kù)事務(wù)(本地事務(wù))。
- 3睦袖、根據(jù)數(shù)據(jù)庫(kù)事務(wù)執(zhí)行的結(jié)果珊肃,再返回Commit或Rollback給MQ。
- 4馅笙、如果是Commit近范,MQ把消息下發(fā)給Consumer端,如果是Rollback延蟹,直接刪掉prepare消息。
- 5叶堆、第3步的執(zhí)行結(jié)果如果沒(méi)響應(yīng)阱飘,或是超時(shí)的,啟動(dòng)定時(shí)任務(wù)回查事務(wù)狀態(tài)(最多重試15次虱颗,超過(guò)了默認(rèn)丟棄此消息)沥匈,處理結(jié)果同第4步。
- 6忘渔、MQ消費(fèi)的成功機(jī)制由MQ自己保證高帖。
具體實(shí)例:
通過(guò)rocketMQTemplate的sendMessageInTransaction方法發(fā)送事務(wù)消息
/**
* 發(fā)送事務(wù)消息
*
* @param txProducerGroup 事務(wù)消息的生產(chǎn)者組名稱(chēng)
* @param topic 事務(wù)消息主題
* @param tag 事務(wù)消息tag
* @param msg 事務(wù)消息體
* @param arg 事務(wù)消息監(jiān)聽(tīng)器回查參數(shù)
* @param <T> 事務(wù)消息泛型
*/
public <T> void sendTransaction(String txProducerGroup, String topic, String tag, T msg, T arg){
if(!StringUtils.isEmpty(tag)){
topic = topic + ":" + tag;
}
String transactionId = UUID.randomUUID().toString();
Message<T> message = MessageBuilder.withPayload(msg)
//header也有大用處
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("share_id", msg.getId())
.build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(txProducerGroup, topic, message, arg);
if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)
&& result.getSendStatus().equals(SendStatus.SEND_OK)){
log.info("事物消息發(fā)送成功");
}
log.info("事物消息發(fā)送結(jié)果:{}", result);
}
定義本地事務(wù)處理類(lèi),實(shí)現(xiàn)RocketMQLocalTransactionListener接口畦粮,以及加上@RocketMQTransactionListener注解散址,這個(gè)類(lèi)似方法的調(diào)用是異步的乖阵;
executeLocalTransaction方法,當(dāng)我們處理完業(yè)務(wù)后预麸,可以根據(jù)業(yè)務(wù)處理情況瞪浸,返回事務(wù)執(zhí)行狀態(tài),有bollback, commit or unknown三種吏祸,分別是回滾事務(wù)对蒲,提交事務(wù)和未知;根據(jù)事務(wù)消息執(zhí)行流程贡翘,如果返回bollback蹈矮,則直接丟棄消息;如果是返回commit鸣驱,則消費(fèi)消息泛鸟;如果是unknow,則繼續(xù)等待丐巫,然后調(diào)用checkLocalTransaction方法谈况,最多重試15次,超過(guò)了默認(rèn)丟棄此消息递胧;
checkLocalTransaction方法碑韵,是當(dāng)MQ Server未得到MQ發(fā)送方應(yīng)答,或者超時(shí)的情況缎脾,或者應(yīng)答是unknown的情況祝闻,調(diào)用此方法進(jìn)行檢查確認(rèn),返回值和上面的方法一樣遗菠;
/**
* @Author: huangyibo
* @Date: 2022/7/2 23:06
* @Description: 事物消息Producer事務(wù)監(jiān)聽(tīng)器
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private ShareService shareService;
@Autowired
private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
* 發(fā)送prepare消息成功此方法被回調(diào)联喘,該方法用于執(zhí)行本地事務(wù)
* @param message 回傳的消息,利用transactionId即可獲取到該消息的唯一Id
* @param arg 調(diào)用send方法時(shí)傳遞的參數(shù)辙纬,當(dāng)send時(shí)候若有額外的參數(shù)可以傳遞到send方法中豁遭,這里能獲取到
* @return 返回事務(wù)狀態(tài),COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調(diào)
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
MessageHeaders headers = message.getHeaders();
String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = Integer.parseInt((String)headers.get("share_id"));
try {
shareService.auditBYIdWithRocketMqLog(shareId,(ShareAuditDTO)auditDTO,transactionId);
//本地事物成功贺拣,執(zhí)行commit
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事物執(zhí)行異常蓖谢,e={}",e);
//本地事物失敗,執(zhí)行rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
}
//mq回調(diào)檢查本地事務(wù)執(zhí)行情況
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);
RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog
.builder().transactionId(transactionId).build());
if(rocketmqTransactionLog == null){
log.error("如果本地事物日志沒(méi)有記錄譬涡,transactionId={}",transactionId);
//本地事物失敗闪幽,執(zhí)行rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
//如果本地事物日志有記錄,執(zhí)行commit
return RocketMQLocalTransactionState.COMMIT;
}
}
事務(wù)消息消費(fèi)者和普通消息一致
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "transaction-group",
topic = "transaction-str",
consumeMode = ConsumeMode.ORDERLY)
public class TransactionConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("監(jiān)聽(tīng)到消息:message:{}", message);
}
}
完整消息發(fā)送api
@Component
@Slf4j
public class RocketMqProducer {
/**
* rocketmq模板注入
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 普通發(fā)送
* @param topic 消息主題
* @param msg 消息體
* @param <T> 消息泛型
*/
public <T> void send(String topic, T msg) {
rocketMQTemplate.convertAndSend(topic + ":tag1", msg);
//rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(msg).build()); // 等價(jià)于上面一行
}
/**
* 發(fā)送帶tag的消息涡匀,直接在topic后面加上":tag"
*
* @param topic 消息主題
* @param tag 消息tag
* @param msg 消息體
* @param <T> 消息泛型
* @return
*/
public <T> SendResult sendTagMsg(String topic, String tag, T msg) {
topic = topic + ":" + tag;
return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msg).build());
}
/**
* 發(fā)送同步消息(阻塞當(dāng)前線(xiàn)程盯腌,等待broker響應(yīng)發(fā)送結(jié)果,這樣不太容易丟失消息)
* sendResult為返回的發(fā)送結(jié)果
*/
public <T> SendResult sendMsg(String topic, T msg) {
Message<T> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
return sendResult;
}
/**
* 發(fā)送異步消息
* 發(fā)送異步消息(通過(guò)線(xiàn)程池執(zhí)行發(fā)送到broker的消息任務(wù)陨瘩,執(zhí)行完后回調(diào):在SendCallback中可處理相關(guān)成功失敗時(shí)的邏輯)
* (適合對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景)
* @param topic 消息Topic
* @param msg 消息實(shí)體
*
*/
public <T> void asyncSend(String topic, T msg) {
Message<T> message = MessageBuilder.withPayload(msg).build();
asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息---發(fā)送MQ成功---", topic);
}
@Override
public void onException(Throwable throwable) {
log.error("topic:{}消息---發(fā)送MQ失敗 ex:{}---", topic, throwable.getMessage());
}
});
}
/**
* 發(fā)送異步消息
* 發(fā)送異步消息(通過(guò)線(xiàn)程池執(zhí)行發(fā)送到broker的消息任務(wù)腕够,執(zhí)行完后回調(diào):在SendCallback中可處理相關(guān)成功失敗時(shí)的邏輯)
* (適合對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景)
* @param topic 消息Topic
* @param message 消息實(shí)體
* @param sendCallback 回調(diào)函數(shù)
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {
rocketMQTemplate.asyncSend(topic, message, sendCallback);
}
/**
* 發(fā)送異步消息
*
* @param topic 消息Topic
* @param message 消息實(shí)體
* @param sendCallback 回調(diào)函數(shù)
* @param timeout 超時(shí)時(shí)間
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}
/**
* 同步延遲消息
* rocketMQ的延遲消息發(fā)送其實(shí)是已發(fā)送就已經(jīng)到broker端了级乍,然后消費(fèi)端會(huì)延遲收到消息。
* RocketMQ 目前只支持固定精度的定時(shí)消息燕少。
* 固定等級(jí):1到18分別對(duì)應(yīng)1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 延遲的底層方法是用定時(shí)任務(wù)實(shí)現(xiàn)的卡者。
* 發(fā)送延時(shí)消息(delayLevel的值就為0,因?yàn)椴谎訒r(shí))
*
* @param topic 消息主題
* @param msg 消息體
* @param timeout 發(fā)送超時(shí)時(shí)間
* @param delayLevel 延遲級(jí)別 1到18
* @param <T> 消息泛型
*/
public <T> void sendDelay(String topic, T msg, long timeout, int delayLevel) {
Message<T> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.syncSend(topic, message, timeout, delayLevel);
}
/**
* 發(fā)送異步延遲消息
*
* @param topic 消息Topic
* @param message 消息實(shí)體
* @param sendCallback 回調(diào)函數(shù)
* @param timeout 超時(shí)時(shí)間
* @param delayLevel 延遲消息的級(jí)別
*/
public void asyncSendDelay(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
}
/**
* 發(fā)送異步延遲消息
*
* @param topic 消息Topic
* @param message 消息實(shí)體
* @param timeout 超時(shí)時(shí)間
* @param delayLevel 延遲消息的級(jí)別
*/
public void asyncSendDelay(String topic, Message<?> message, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息---發(fā)送MQ成功---", topic);
}
@Override
public void onException(Throwable throwable) {
log.error("topic:{}消息---發(fā)送MQ失敗 ex:{}---", topic, throwable.getMessage());
}
}, timeout, delayLevel);
}
/**
* 單向消息
* 特點(diǎn)為只負(fù)責(zé)發(fā)送消息客们,不等待服務(wù)器回應(yīng)且沒(méi)有回調(diào)函數(shù)觸發(fā)崇决,即只發(fā)送請(qǐng)求不等待應(yīng)答
* 此方式發(fā)送消息的過(guò)程耗時(shí)非常短,一般在微秒級(jí)別
* 應(yīng)用場(chǎng)景:適用于某些耗時(shí)非常短底挫,但對(duì)可靠性要求并不高的場(chǎng)景恒傻,例如日志收集
* @param topic 消息主題
* @param msg 消息體
* @param <T> 消息泛型
*/
public <T> void sendOneWayMsg(String topic, T msg) {
Message<T> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.sendOneWay(topic, message);
}
/**
* 發(fā)送順序消息
*
* @param topic 消息主題
* @param msg 消息體
* @param hashKey 確定消息發(fā)送到哪個(gè)隊(duì)列中
* @param <T> 消息泛型
*/
public <T> void syncSendOrderly(String topic, T msg, String hashKey) {
Message<T> message = MessageBuilder.withPayload(msg).build();
log.info("發(fā)送順序消息,topic:{}, hashKey:{}", topic, hashKey);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
/**
* 發(fā)送順序消息
*
* @param topic 消息主題
* @param msg 消息體
* @param hashKey 確定消息發(fā)送到哪個(gè)隊(duì)列中
* @param timeout 超時(shí)時(shí)間
*/
public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
Message<T> message = MessageBuilder.withPayload(msg).build();
log.info("發(fā)送順序消息建邓,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}
/**
* 發(fā)送批量消息
*
* @param topic 消息主題
* @param msgList 消息體集合
* @param <T> 消息泛型
* @return
*/
public <T> SendResult asyncSendBatch(String topic, List<T> msgList) {
List<Message<T>> messageList = msgList.stream()
.map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());
return rocketMQTemplate.syncSend(topic, messageList);
}
/**
* 發(fā)送事務(wù)消息
*
* @param txProducerGroup 事務(wù)消息的生產(chǎn)者組名稱(chēng)
* @param topic 事務(wù)消息主題
* @param tag 事務(wù)消息tag
* @param msg 事務(wù)消息體
* @param arg 事務(wù)消息監(jiān)聽(tīng)器回查參數(shù)
* @param <T> 事務(wù)消息泛型
*/
public <T> void sendTransaction(String txProducerGroup, String topic, String tag, T msg, T arg){
if(!StringUtils.isEmpty(tag)){
topic = topic + ":" + tag;
}
String transactionId = UUID.randomUUID().toString();
Message<T> message = MessageBuilder.withPayload(msg)
//header也有大用處
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("share_id", msg.getId())
.build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(txProducerGroup, topic, message, arg);
if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)
&& result.getSendStatus().equals(SendStatus.SEND_OK)){
log.info("事物消息發(fā)送成功");
}
log.info("事物消息發(fā)送結(jié)果:{}", result);
}
}
上面寫(xiě)的這幾個(gè)消息發(fā)送方法盈厘,你應(yīng)該注意到了: 前兩個(gè)方法的參數(shù) topic 和其它的不一樣
其實(shí)這是 rocketmq 和 springboot 整合后設(shè)置 Tag 的方式(Tag:用于區(qū)分過(guò)濾同一主題下的不同業(yè)務(wù)類(lèi)型的消息,非常實(shí)用)
在項(xiàng)目里往mq寫(xiě)入消息時(shí)官边,最好每條消息都帶上tag沸手,用于消費(fèi)時(shí)根據(jù)業(yè)務(wù)過(guò)濾
在 rocketmq-spring-boot-starter 中,Tag 的設(shè)置方式: 在 topic后面加上 “:tagName”
另外注簿,從上面的截圖中可以看到“key”的設(shè)置方式契吉,發(fā)送消息時(shí)在header中設(shè)置:
MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.KEYS, "key1")
總結(jié):
- 1、實(shí)際運(yùn)用中一些配置不要像我上面一樣寫(xiě)在代碼里诡渴,寫(xiě)在配置文件里或統(tǒng)一配置捐晶。
- 2、消息發(fā)送成功與失敗可以根據(jù)sendResult判斷妄辩,消息消費(fèi)成功與否其實(shí)源碼內(nèi)部已做了處理惑灵,只要不出現(xiàn)異常,就是消費(fèi)成功眼耀,如果你業(yè)務(wù)代碼邏輯有問(wèn)題那另說(shuō)英支。
- 3、實(shí)際生產(chǎn)中還要注意重復(fù)消費(fèi)問(wèn)題哮伟,這里我提供一個(gè)方法:在數(shù)據(jù)庫(kù)加一個(gè)去重表潭辈,給表里的一個(gè)字段如key添加唯一索引,消費(fèi)前先入庫(kù)澈吨,正常則往下執(zhí)行你的業(yè)務(wù)邏輯,入庫(kù)失敗了表明該消息已消費(fèi)過(guò)寄摆,不能往下走了谅辣。
- 4、其實(shí)rocketmq還有一個(gè)很重要的特性:事務(wù)婶恼,其它mq可是不支持的桑阶,利用事務(wù)可以做很多事柏副,如跟錢(qián)相關(guān)的業(yè)務(wù)、分布式事務(wù)蚣录,不過(guò)事務(wù)的實(shí)現(xiàn)過(guò)程要麻煩點(diǎn)割择。
- 5、上面就是RocketMQ與Springboot的整合萎河,整合了使用起來(lái)還是比較簡(jiǎn)單的
參考:
https://blog.csdn.net/CSDN877425287/article/details/121964142
https://blog.csdn.net/qq_36737803/article/details/112261352
https://blog.csdn.net/caoli201314/article/details/120248361