SpringBoot——整合RocketMQ

一、RocketMQ 的前世今生

RocketMQ 是阿里巴巴開(kāi)源的分布式消息系統(tǒng)赖瞒,基于高可用分布式集群技術(shù)女揭,提供低延時(shí)的、高可靠的消息發(fā)布與訂閱服務(wù)栏饮,是一個(gè)統(tǒng)一的消息引擎吧兔,輕量級(jí)的數(shù)據(jù)處理平臺(tái)。起源于阿里巴巴 2001 年的五彩石項(xiàng)目袍嬉, Notify 在這期間應(yīng)運(yùn)而生境蔼,用于交易核心消息的流轉(zhuǎn) 。

2010 年, B2B 開(kāi)始大規(guī)模使用 ActiveMQ 作為消息內(nèi)核欧穴。

2011 年民逼,隨著阿里業(yè)務(wù)的快速發(fā)展,急需一款支持順序消息涮帘,擁有海量消息堆積能力的消息中間件拼苍, MetaQ 1.0 因此誕生。

2012 年调缨, MetaQ已經(jīng)發(fā)展到了3.0版本疮鲫,RocketMQ 正是基于 MetaQ 3.0 開(kāi)發(fā)的分布式消息傳遞中間件,專(zhuān)為萬(wàn)億級(jí)的消息處理而設(shè)計(jì)弦叶,具有高吞吐量俊犯,低延遲,海量積累和有序消息伤哺。它是阿里巴巴和眾多大型互聯(lián)網(wǎng)業(yè)務(wù)場(chǎng)景的雙十一購(gòu)物狂歡的現(xiàn)成工具燕侠。由于這些優(yōu)點(diǎn),它吸引了越來(lái)越多的應(yīng)用程序進(jìn)行訪問(wèn)立莉。同年绢彤,阿里巴巴正式開(kāi)源了 RocketMQ 的第一個(gè)版本。

2015 年蜓耻,RocketMQ 見(jiàn)證了消息傳遞的多項(xiàng)重量級(jí)功能茫舶,包括事務(wù)消息、SQL過(guò)濾器刹淌、消息追溯、調(diào)度消息疹启、多站點(diǎn)高可用性等柠衅,以滿(mǎn)足阿里巴巴日益豐富的業(yè)務(wù)場(chǎng)景皮仁。它還取代了阿里巴巴自主研發(fā)的另一款 MQ 產(chǎn)品 Notify菲宴,成為阿里巴巴首選的消息中間件趋急。

2016 年喝峦,RocketMQ 在阿里云上開(kāi)發(fā)了首個(gè)全托管服務(wù)呜达,幫助大量數(shù)字化轉(zhuǎn)型企業(yè)構(gòu)建現(xiàn)代應(yīng)用,并開(kāi)始體驗(yàn)大規(guī)模的云計(jì)算實(shí)踐挤忙。同年册烈,RocketMQ 被捐贈(zèng)給Apache基金會(huì)婿禽,并加入孵化器項(xiàng)目扭倾,旨在在未來(lái)為更多的開(kāi)發(fā)者服務(wù)膛壹。

2016 年模聋,RocketMQ 榮獲中國(guó)最受歡迎開(kāi)源軟件獎(jiǎng)撬槽。

2017 年此改,從 Apache 基金會(huì)畢業(yè)后共啃,RocketMQ被指定為頂級(jí)項(xiàng)目(TLP)移剪。

2018 年纵苛,RocketMQ 榮獲中國(guó)最受歡迎開(kāi)源軟件獎(jiǎng)攻人。

二怀吻、RocketMQ 的基本概念

2.1 消息模型(Message Model)

RocketMQ主要由 Producer蓬坡、Broker屑咳、Consumer 三部分組成兆龙,其中Producer 負(fù)責(zé)生產(chǎn)消息详瑞,Consumer 負(fù)責(zé)消費(fèi)消息坝橡,Broker 負(fù)責(zé)存儲(chǔ)消息计寇。Broker 在實(shí)際部署過(guò)程中對(duì)應(yīng)一臺(tái)服務(wù)器番宁,每個(gè) Broker 可以存儲(chǔ)多個(gè)Topic的消息蝶押,每個(gè)Topic的消息也可以分片存儲(chǔ)于不同的 Broker棋电。Message Queue 用于存儲(chǔ)消息的物理地址赶盔,每個(gè)Topic中的消息地址存儲(chǔ)于多個(gè) Message Queue 中于未。ConsumerGroup 由多個(gè)Consumer 實(shí)例構(gòu)成烘浦。

2.2 消息生產(chǎn)者(Producer)

負(fù)責(zé)生產(chǎn)消息谎倔,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)生產(chǎn)消息片习。一個(gè)消息生產(chǎn)者會(huì)把業(yè)務(wù)應(yīng)用系統(tǒng)里產(chǎn)生的消息發(fā)送到 broker 服務(wù)器状知。RocketMQ 提供多種發(fā)送方式饥悴,同步發(fā)送西设、異步發(fā)送贷揽、順序發(fā)送禽绪、單向發(fā)送印屁。同步和異步方式均需要 Broker 返回確認(rèn)信息雄人,單向發(fā)送不需要础钠。

2.3 消息消費(fèi)者(Consumer)

負(fù)責(zé)消費(fèi)消息珍坊,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)阵漏。一個(gè)消息消費(fèi)者會(huì)從 Broker 服務(wù)器拉取消息、并將其提供給應(yīng)用程序叹洲。從用戶(hù)應(yīng)用的角度而言提供了兩種消費(fèi)形式:拉取式消費(fèi)运提、推動(dòng)式消費(fèi)民泵。

2.4 主題(Topic)

表示一類(lèi)消息的集合栈妆,每個(gè)主題包含若干條消息鳞尔,每條消息只能屬于一個(gè)主題寥假,是RocketMQ 進(jìn)行消息訂閱的基本單位拾给。

2.5 代理服務(wù)器(Broker Server)

消息中轉(zhuǎn)角色蒋得,負(fù)責(zé)存儲(chǔ)消息额衙、轉(zhuǎn)發(fā)消息窍侧。代理服務(wù)器在 RocketMQ 系統(tǒng)中負(fù)責(zé)接收從生產(chǎn)者發(fā)送來(lái)的消息并存儲(chǔ)伟件、同時(shí)為消費(fèi)者的拉取請(qǐng)求作準(zhǔn)備。代理服務(wù)器也存儲(chǔ)消息相關(guān)的元數(shù)據(jù)咧织,包括消費(fèi)者組习绢、消費(fèi)進(jìn)度偏移和主題和隊(duì)列消息等闪萄。

2.6 名字服務(wù)(Name Server)

名稱(chēng)服務(wù)充當(dāng)路由消息的提供者桃煎。生產(chǎn)者或消費(fèi)者能夠通過(guò)名字服務(wù)查找各主題相應(yīng)的Broker IP 列表。多個(gè) Namesrv 實(shí)例組成集群葫辐,但相互獨(dú)立耿战,沒(méi)有信息交換剂陡。

2.7 拉取式消費(fèi)(Pull Consumer)

Consumer消費(fèi)的一種類(lèi)型,應(yīng)用通常主動(dòng)調(diào)用Consumer的拉消息方法從Broker服務(wù)器拉消息晕鹊、主動(dòng)權(quán)由應(yīng)用控制溅话。一旦獲取了批量消息飞几,應(yīng)用就會(huì)啟動(dòng)消費(fèi)過(guò)程屑墨。

2.8 推動(dòng)式消費(fèi)(Push Consumer)

Consumer消費(fèi)的一種類(lèi)型,該模式下Broker收到數(shù)據(jù)后會(huì)主動(dòng)推送給消費(fèi)端程腹,該消費(fèi)模式一般實(shí)時(shí)性較高寸潦。

2.9 生產(chǎn)者組(Producer Group)

同一類(lèi)Producer的集合见转,這類(lèi)Producer發(fā)送同一類(lèi)消息且發(fā)送邏輯一致吏砂。如果發(fā)送的是事務(wù)消息且原始生產(chǎn)者在發(fā)送之后崩潰狐血,則Broker服務(wù)器會(huì)聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實(shí)例以提交或回溯消費(fèi)匈织。

2.10 消費(fèi)者組(Consumer Group)

同一類(lèi)Consumer的集合,這類(lèi)Consumer通常消費(fèi)同一類(lèi)消息且消費(fèi)邏輯一致乡小。消費(fèi)者組使得在消息消費(fèi)方面劲件,實(shí)現(xiàn)負(fù)載均衡和容錯(cuò)的目標(biāo)變得非常容易。要注意的是牵辣,消費(fèi)者組的消費(fèi)者實(shí)例必須訂閱完全相同的Topic纬向。RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。

2.11 集群消費(fèi)(Clustering)

集群消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例平均分?jǐn)傁ⅰ?/p>

2.12 廣播消費(fèi)(Broadcasting)

廣播消費(fèi)模式下师脂,相同Consumer Group的每個(gè)Consumer實(shí)例都接收全量的消息吃警。

2.13 普通順序消息(Normal Ordered Message)

普通順序消費(fèi)模式下拌消,消費(fèi)者通過(guò)同一個(gè)消費(fèi)隊(duì)列收到的消息是有順序的墩崩,不同消息隊(duì)列收到的消息則可能是無(wú)順序的泰鸡。

2.14 嚴(yán)格順序消息(Strictly Ordered Message)

嚴(yán)格順序消息模式下饰迹,消費(fèi)者收到的所有消息均是有順序的啊鸭。

2.15 消息(Message)

消息系統(tǒng)所傳輸信息的物理載體赠制,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位烟号,每條消息必須屬于一個(gè)主題汪拥。RocketMQ中每個(gè)消息擁有唯一的Message ID,且可以攜帶具有業(yè)務(wù)標(biāo)識(shí)的Key脯燃。系統(tǒng)提供了通過(guò)Message ID和Key查詢(xún)消息的功能。

2.16 標(biāo)簽(Tag)

為消息設(shè)置的標(biāo)志坟募,用于同一主題下區(qū)分不同類(lèi)型的消息懈糯。來(lái)自同一業(yè)務(wù)單元的消息她紫,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽贿讹。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性民褂,并優(yōu)化RocketMQ提供的查詢(xún)系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯哭廉,實(shí)現(xiàn)更好的擴(kuò)展性。

2.17 DefaultMQPushConsumer和DefaultMQPullConsumer

Push的方式是 Server端接收到消息后,主動(dòng)把消息推送給 Client端赎离,主動(dòng)權(quán)在Server端,實(shí)時(shí)性高荣病。用 Push方式主動(dòng)推送有很多弊 端:首先是加大 Server 端的 工作量个盆,進(jìn)而影響 Server 的性能;其次柴梆,Client 的處理能力各不相同绍在, Client 的狀態(tài)不受 Server 控制,

Pull方式是 Client端循環(huán)地從 Server端拉取消息溜宽,主動(dòng)權(quán)在 Client手里, 自己拉取到一定量消息后涡扼,處理妥當(dāng)了再接著取汤善。Pull 方式的問(wèn)題是循環(huán)拉取 消息的間隔不好設(shè)定红淡,間隔太短就處在一個(gè) “忙等”的狀態(tài)不狮,浪費(fèi)資源; Pull 的時(shí)間間隔太長(zhǎng) Server 端有消息到來(lái)時(shí) 有可能沒(méi)有被及時(shí)處理。

三在旱、SpringBoot整合RocketMQ

3.1 添加rocketmq-spring-boot-starter等相關(guān)依賴(lài)

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

3.2 添加配置

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    #必須指定group
    group: test-group

3.3 同步消息

同步消息 API

//發(fā)送普通同步消息-Object
syncSend(String destination, Object payload);

//發(fā)送普通同步消息-Message
syncSend(String destination, Message<?> message);

//發(fā)送批量普通同步消息
syncSend(String destination, Collection<T> messages);

//發(fā)送普通同步消息-Object摇零,并設(shè)置發(fā)送超時(shí)時(shí)間
syncSend(String destination, Object payload, long timeout);

//發(fā)送普通同步消息-Message,并設(shè)置發(fā)送超時(shí)時(shí)間
syncSend(String destination, Message<?> message, long timeout);

//發(fā)送批量普通同步消息驻仅,并設(shè)置發(fā)送超時(shí)時(shí)間
syncSend(String destination, Collection<T> messages, long timeout);

//發(fā)送普通同步延遲消息,并設(shè)置超時(shí)登渣,這個(gè)下文會(huì)演示
syncSend(String destination, Message<?> message, long timeout, int delayLevel);
/**
 * 普通發(fā)送
 * @param topic     消息主題
 * @param msg       消息體
 * @param <T>       消息泛型
 */
public <T> void send(String topic, T msg) {
    rocketMQTemplate.convertAndSend(topic + ":tag1", msg);
    //rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(msg).build()); // 等價(jià)于上面一行
}


/**
 * 發(fā)送帶tag的消息噪服,直接在topic后面加上":tag"
 *
 * @param topic     消息主題
 * @param tag       消息tag
 * @param msg       消息體
 * @param <T>       消息泛型
 * @return
 */
public <T> SendResult sendTagMsg(String topic, String tag, T msg) {
    topic = topic + ":" + tag;
    return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msg).build());
}


/**
 * 發(fā)送同步消息(阻塞當(dāng)前線(xiàn)程,等待broker響應(yīng)發(fā)送結(jié)果胜茧,這樣不太容易丟失消息)
 * sendResult為返回的發(fā)送結(jié)果
 */
public <T> SendResult sendMsg(String topic, T msg) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
    log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
    return sendResult;
}

這里存在兩種消息體粘优,一種是Object的,另一種是Message<?>的形式的,其實(shí)我們發(fā)送Object的時(shí)候雹顺,底層是有幫我們做轉(zhuǎn)換的丹墨,其實(shí)和我們?cè)谏蠈诱{(diào)用。

MessageBuilder.withPayload("hello world test1").build()

3.4 異步消息

異步消息 API

//發(fā)送普通異步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback);

//發(fā)送普通異步消息-Message
asyncSend(String destination, Message<?> message, SendCallback sendCallback);

//發(fā)送普通異步消息-Object嬉愧,并設(shè)置發(fā)送超時(shí)時(shí)間
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout);

//發(fā)送普通異步消息-Message贩挣,并設(shè)置發(fā)送超時(shí)時(shí)間
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout);

//發(fā)送普通異步延遲消息,并設(shè)置超時(shí)英染,這個(gè)下文會(huì)演示
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
        int delayLevel);
/**
 * 發(fā)送異步消息
 * 發(fā)送異步消息(通過(guò)線(xiàn)程池執(zhí)行發(fā)送到broker的消息任務(wù)揽惹,執(zhí)行完后回調(diào):在SendCallback中可處理相關(guān)成功失敗時(shí)的邏輯)
 * (適合對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景)
 * @param topic     消息Topic
 * @param msg       消息實(shí)體
 *
 */
public <T> void asyncSend(String topic, T msg) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    asyncSend(topic, message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("topic:{}消息---發(fā)送MQ成功---", topic);
        }

        @Override
        public void onException(Throwable throwable) {
            log.error("topic:{}消息---發(fā)送MQ失敗 ex:{}---", topic, throwable.getMessage());
        }
    });
}


/**
 * 發(fā)送異步消息
 * 發(fā)送異步消息(通過(guò)線(xiàn)程池執(zhí)行發(fā)送到broker的消息任務(wù),執(zhí)行完后回調(diào):在SendCallback中可處理相關(guān)成功失敗時(shí)的邏輯)
 * (適合對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景)
 * @param topic        消息Topic
 * @param message      消息實(shí)體
 * @param sendCallback 回調(diào)函數(shù)
 */
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {
    rocketMQTemplate.asyncSend(topic, message, sendCallback);
}


/**
 * 發(fā)送異步消息
 *
 * @param topic         消息Topic
 * @param message       消息實(shí)體
 * @param sendCallback  回調(diào)函數(shù)
 * @param timeout       超時(shí)時(shí)間
 */
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
    rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}

3.5 單向消息

/**
 * 單向消息
 * 特點(diǎn)為只負(fù)責(zé)發(fā)送消息四康,不等待服務(wù)器回應(yīng)且沒(méi)有回調(diào)函數(shù)觸發(fā)搪搏,即只發(fā)送請(qǐng)求不等待應(yīng)答
 * 此方式發(fā)送消息的過(guò)程耗時(shí)非常短,一般在微秒級(jí)別
 * 應(yīng)用場(chǎng)景:適用于某些耗時(shí)非常短闪金,但對(duì)可靠性要求并不高的場(chǎng)景疯溺,例如日志收集
 * @param topic     消息主題
 * @param msg       消息體
 * @param <T>       消息泛型
 */
public <T> void sendOneWayMsg(String topic, T msg) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    rocketMQTemplate.sendOneWay(topic, message);
}

3.6 批量消息

/**
 * 發(fā)送批量消息
 *
 * @param topic     消息主題
 * @param msgList   消息體集合
 * @param <T>       消息泛型
 * @return
 */
public <T> SendResult asyncSendBatch(String topic, List<T> msgList) {
    List<Message<T>> messageList = msgList.stream()
            .map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());
    return rocketMQTemplate.syncSend(topic, messageList);
}

3.7 延遲消息

同步延遲消息

/**
 * 同步延遲消息
 * rocketMQ的延遲消息發(fā)送其實(shí)是已發(fā)送就已經(jīng)到broker端了,然后消費(fèi)端會(huì)延遲收到消息哎垦。
 * RocketMQ 目前只支持固定精度的定時(shí)消息囱嫩。
 * 固定等級(jí):1到18分別對(duì)應(yīng)1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
 * 延遲的底層方法是用定時(shí)任務(wù)實(shí)現(xiàn)的。
 * 發(fā)送延時(shí)消息(delayLevel的值就為0漏设,因?yàn)椴谎訒r(shí))
 *
 * @param topic         消息主題
 * @param msg           消息體
 * @param timeout       發(fā)送超時(shí)時(shí)間
 * @param delayLevel    延遲級(jí)別  1到18
 * @param <T>           消息泛型
 */
public <T> void sendDelay(String topic, T msg, long timeout, int delayLevel) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    rocketMQTemplate.syncSend(topic, message, timeout, delayLevel);
}

異步延遲消息

/**
 * 發(fā)送異步延遲消息
 *
 * @param topic        消息Topic
 * @param message      消息實(shí)體
 * @param sendCallback 回調(diào)函數(shù)
 * @param timeout      超時(shí)時(shí)間
 * @param delayLevel   延遲消息的級(jí)別
 */
public void asyncSendDelay(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
    rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
}


/**
 * 發(fā)送異步延遲消息
 *
 * @param topic        消息Topic
 * @param message      消息實(shí)體
 * @param timeout      超時(shí)時(shí)間
 * @param delayLevel   延遲消息的級(jí)別
 */
public void asyncSendDelay(String topic, Message<?> message, long timeout, int delayLevel) {
    rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("topic:{}消息---發(fā)送MQ成功---", topic);
        }

        @Override
        public void onException(Throwable throwable) {
            log.error("topic:{}消息---發(fā)送MQ失敗 ex:{}---", topic, throwable.getMessage());
        }
    }, timeout, delayLevel);
}

3.8 順序消息

RocketMQ順序消息墨闲,這里使用rocketmq-spring-boot-starter發(fā)送順序消息就比較方便了,不像使用rocket-client那樣郑口,需要手動(dòng)獲取RocketMQ中當(dāng)前topic的隊(duì)列個(gè)數(shù)然后再通過(guò)hashKey值鸳碧,mqs.size()取模,得到一個(gè)索引值犬性,這里底層都幫我們做好了處理瞻离!

/**
 * 發(fā)送順序消息
 *
 * @param topic     消息主題
 * @param msg       消息體
 * @param hashKey   確定消息發(fā)送到哪個(gè)隊(duì)列中
 * @param <T>       消息泛型
 */
public <T> void syncSendOrderly(String topic, T msg, String hashKey) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    log.info("發(fā)送順序消息,topic:{}, hashKey:{}", topic, hashKey);
    rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}


/**
 * 發(fā)送順序消息
 *
 * @param topic     消息主題
 * @param msg       消息體
 * @param hashKey   確定消息發(fā)送到哪個(gè)隊(duì)列中
 * @param timeout   超時(shí)時(shí)間
 */
public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    log.info("發(fā)送順序消息乒裆,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
    rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}

3.9 消費(fèi)者

/**
 * @Author: huangyibo
 * @Date: 2022/7/2 22:22
 * @Description:
 * topic需要和生產(chǎn)者的topic一致套利,consumerGroup屬性是必須指定的,內(nèi)容可以隨意
 * selectorExpression的意思指的就是tag鹤耍,默認(rèn)為“*”肉迫,不設(shè)置的話(huà)會(huì)監(jiān)聽(tīng)所有消息
 */

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "Con_Group_One",
        topic = "RLT_TEST_TOPIC",
        selectorExpression = "tag1")
public class RocketMqConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("監(jiān)聽(tīng)到消息:message:{}", message);
    }
}

四、事務(wù)消息

事務(wù)消息是RocketMQ提供的非常重要的一個(gè)特性惰蜜,在4.x版本之后開(kāi)源昂拂,可以利用事務(wù)消息輕松地實(shí)現(xiàn)分布式事務(wù)。

RocketMQ在其消息定義的基礎(chǔ)上抛猖,對(duì)事務(wù)消息擴(kuò)展了兩個(gè)相關(guān)的概念:

Half(Prepare) Message——半消息(預(yù)處理消息)

半消息是一種特殊的消息類(lèi)型格侯,該狀態(tài)的消息暫時(shí)不能被Consumer消費(fèi)鼻听。當(dāng)一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒(méi)有接收到Producer發(fā)出的二次確認(rèn)時(shí)联四,該事務(wù)消息就處于"暫時(shí)不可被消費(fèi)"狀態(tài)撑碴,該狀態(tài)的事務(wù)消息被稱(chēng)為半消息。

Message Status Check——消息狀態(tài)回查

由于網(wǎng)絡(luò)抖動(dòng)朝墩、Producer重啟等原因醉拓,可能導(dǎo)致Producer向Broker發(fā)送的二次確認(rèn)消息沒(méi)有成功送達(dá)。如果Broker檢測(cè)到某條事務(wù)消息長(zhǎng)時(shí)間處于半消息狀態(tài)收苏,則會(huì)主動(dòng)向Producer端發(fā)起回查操作亿卤,查詢(xún)?cè)撌聞?wù)消息在Producer端的事務(wù)狀態(tài)(Commit 或 Rollback)÷拱裕可以看出排吴,Message Status Check主要用來(lái)解決分布式事務(wù)中的超時(shí)問(wèn)題。

執(zhí)行流程:

  • 1懦鼠、應(yīng)用模塊遇到要發(fā)送事務(wù)消息的場(chǎng)景時(shí)钻哩,先發(fā)送prepare消息給MQ。
  • 2肛冶、prepare消息發(fā)送成功后街氢,應(yīng)用模塊執(zhí)行數(shù)據(jù)庫(kù)事務(wù)(本地事務(wù))。
  • 3睦袖、根據(jù)數(shù)據(jù)庫(kù)事務(wù)執(zhí)行的結(jié)果珊肃,再返回Commit或Rollback給MQ。
  • 4馅笙、如果是Commit近范,MQ把消息下發(fā)給Consumer端,如果是Rollback延蟹,直接刪掉prepare消息。
  • 5叶堆、第3步的執(zhí)行結(jié)果如果沒(méi)響應(yīng)阱飘,或是超時(shí)的,啟動(dòng)定時(shí)任務(wù)回查事務(wù)狀態(tài)(最多重試15次虱颗,超過(guò)了默認(rèn)丟棄此消息)沥匈,處理結(jié)果同第4步。
  • 6忘渔、MQ消費(fèi)的成功機(jī)制由MQ自己保證高帖。

具體實(shí)例:

通過(guò)rocketMQTemplate的sendMessageInTransaction方法發(fā)送事務(wù)消息

/**
 * 發(fā)送事務(wù)消息
 *
 * @param txProducerGroup   事務(wù)消息的生產(chǎn)者組名稱(chēng)
 * @param topic             事務(wù)消息主題
 * @param tag               事務(wù)消息tag
 * @param msg               事務(wù)消息體
 * @param arg               事務(wù)消息監(jiān)聽(tīng)器回查參數(shù)
 * @param <T>               事務(wù)消息泛型
 */
public <T> void sendTransaction(String txProducerGroup, String topic, String tag, T msg, T arg){
    if(!StringUtils.isEmpty(tag)){
        topic = topic + ":" + tag;
    }
    String transactionId = UUID.randomUUID().toString();
    Message<T> message = MessageBuilder.withPayload(msg)
            //header也有大用處
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .setHeader("share_id", msg.getId())
            .build();
    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(txProducerGroup, topic, message, arg);
    if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)
            && result.getSendStatus().equals(SendStatus.SEND_OK)){
        log.info("事物消息發(fā)送成功");
    }
    log.info("事物消息發(fā)送結(jié)果:{}", result);
}

定義本地事務(wù)處理類(lèi),實(shí)現(xiàn)RocketMQLocalTransactionListener接口畦粮,以及加上@RocketMQTransactionListener注解散址,這個(gè)類(lèi)似方法的調(diào)用是異步的乖阵;

executeLocalTransaction方法,當(dāng)我們處理完業(yè)務(wù)后预麸,可以根據(jù)業(yè)務(wù)處理情況瞪浸,返回事務(wù)執(zhí)行狀態(tài),有bollback, commit or unknown三種吏祸,分別是回滾事務(wù)对蒲,提交事務(wù)和未知;根據(jù)事務(wù)消息執(zhí)行流程贡翘,如果返回bollback蹈矮,則直接丟棄消息;如果是返回commit鸣驱,則消費(fèi)消息泛鸟;如果是unknow,則繼續(xù)等待丐巫,然后調(diào)用checkLocalTransaction方法谈况,最多重試15次,超過(guò)了默認(rèn)丟棄此消息递胧;

checkLocalTransaction方法碑韵,是當(dāng)MQ Server未得到MQ發(fā)送方應(yīng)答,或者超時(shí)的情況缎脾,或者應(yīng)答是unknown的情況祝闻,調(diào)用此方法進(jìn)行檢查確認(rèn),返回值和上面的方法一樣遗菠;

/**
 * @Author: huangyibo
 * @Date: 2022/7/2 23:06
 * @Description: 事物消息Producer事務(wù)監(jiān)聽(tīng)器
 */

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Autowired
    private ShareService shareService;

    @Autowired
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * 發(fā)送prepare消息成功此方法被回調(diào)联喘,該方法用于執(zhí)行本地事務(wù)
     * @param message   回傳的消息,利用transactionId即可獲取到該消息的唯一Id
     * @param arg       調(diào)用send方法時(shí)傳遞的參數(shù)辙纬,當(dāng)send時(shí)候若有額外的參數(shù)可以傳遞到send方法中豁遭,這里能獲取到
     * @return          返回事務(wù)狀態(tài),COMMIT:提交  ROLLBACK:回滾  UNKNOW:回調(diào)
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        MessageHeaders headers = message.getHeaders();
        String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.parseInt((String)headers.get("share_id"));
        try {
            shareService.auditBYIdWithRocketMqLog(shareId,(ShareAuditDTO)auditDTO,transactionId);

            //本地事物成功贺拣,執(zhí)行commit
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("本地事物執(zhí)行異常蓖谢,e={}",e);
            //本地事物失敗,執(zhí)行rollback
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    //mq回調(diào)檢查本地事務(wù)執(zhí)行情況
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);

        RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog
                .builder().transactionId(transactionId).build());
        if(rocketmqTransactionLog == null){
            log.error("如果本地事物日志沒(méi)有記錄譬涡,transactionId={}",transactionId);
            //本地事物失敗闪幽,執(zhí)行rollback
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        //如果本地事物日志有記錄,執(zhí)行commit
        return RocketMQLocalTransactionState.COMMIT;
    }
}

事務(wù)消息消費(fèi)者和普通消息一致

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "transaction-group",
        topic = "transaction-str",
        consumeMode = ConsumeMode.ORDERLY)
public class TransactionConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("監(jiān)聽(tīng)到消息:message:{}", message);
    }

}

完整消息發(fā)送api

@Component
@Slf4j
public class RocketMqProducer {

    /**
     * rocketmq模板注入
     */
    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    /**
     * 普通發(fā)送
     * @param topic     消息主題
     * @param msg       消息體
     * @param <T>       消息泛型
     */
    public <T> void send(String topic, T msg) {
        rocketMQTemplate.convertAndSend(topic + ":tag1", msg);
        //rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(msg).build()); // 等價(jià)于上面一行
    }


    /**
     * 發(fā)送帶tag的消息涡匀,直接在topic后面加上":tag"
     *
     * @param topic     消息主題
     * @param tag       消息tag
     * @param msg       消息體
     * @param <T>       消息泛型
     * @return
     */
    public <T> SendResult sendTagMsg(String topic, String tag, T msg) {
        topic = topic + ":" + tag;
        return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msg).build());
    }


    /**
     * 發(fā)送同步消息(阻塞當(dāng)前線(xiàn)程盯腌,等待broker響應(yīng)發(fā)送結(jié)果,這樣不太容易丟失消息)
     * sendResult為返回的發(fā)送結(jié)果
     */
    public <T> SendResult sendMsg(String topic, T msg) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
        log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
        return sendResult;
    }


    /**
     * 發(fā)送異步消息
     * 發(fā)送異步消息(通過(guò)線(xiàn)程池執(zhí)行發(fā)送到broker的消息任務(wù)陨瘩,執(zhí)行完后回調(diào):在SendCallback中可處理相關(guān)成功失敗時(shí)的邏輯)
     * (適合對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景)
     * @param topic     消息Topic
     * @param msg       消息實(shí)體
     *
     */
    public <T> void asyncSend(String topic, T msg) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("topic:{}消息---發(fā)送MQ成功---", topic);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("topic:{}消息---發(fā)送MQ失敗 ex:{}---", topic, throwable.getMessage());
            }
        });
    }


    /**
     * 發(fā)送異步消息
     * 發(fā)送異步消息(通過(guò)線(xiàn)程池執(zhí)行發(fā)送到broker的消息任務(wù)腕够,執(zhí)行完后回調(diào):在SendCallback中可處理相關(guān)成功失敗時(shí)的邏輯)
     * (適合對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景)
     * @param topic        消息Topic
     * @param message      消息實(shí)體
     * @param sendCallback 回調(diào)函數(shù)
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback);
    }


    /**
     * 發(fā)送異步消息
     *
     * @param topic         消息Topic
     * @param message       消息實(shí)體
     * @param sendCallback  回調(diào)函數(shù)
     * @param timeout       超時(shí)時(shí)間
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
    }


    /**
     * 同步延遲消息
     * rocketMQ的延遲消息發(fā)送其實(shí)是已發(fā)送就已經(jīng)到broker端了级乍,然后消費(fèi)端會(huì)延遲收到消息。
     * RocketMQ 目前只支持固定精度的定時(shí)消息燕少。
     * 固定等級(jí):1到18分別對(duì)應(yīng)1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 延遲的底層方法是用定時(shí)任務(wù)實(shí)現(xiàn)的卡者。
     * 發(fā)送延時(shí)消息(delayLevel的值就為0,因?yàn)椴谎訒r(shí))
     *
     * @param topic         消息主題
     * @param msg           消息體
     * @param timeout       發(fā)送超時(shí)時(shí)間
     * @param delayLevel    延遲級(jí)別  1到18
     * @param <T>           消息泛型
     */
    public <T> void sendDelay(String topic, T msg, long timeout, int delayLevel) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.syncSend(topic, message, timeout, delayLevel);
    }


    /**
     * 發(fā)送異步延遲消息
     *
     * @param topic        消息Topic
     * @param message      消息實(shí)體
     * @param sendCallback 回調(diào)函數(shù)
     * @param timeout      超時(shí)時(shí)間
     * @param delayLevel   延遲消息的級(jí)別
     */
    public void asyncSendDelay(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
    }


    /**
     * 發(fā)送異步延遲消息
     *
     * @param topic        消息Topic
     * @param message      消息實(shí)體
     * @param timeout      超時(shí)時(shí)間
     * @param delayLevel   延遲消息的級(jí)別
     */
    public void asyncSendDelay(String topic, Message<?> message, long timeout, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("topic:{}消息---發(fā)送MQ成功---", topic);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("topic:{}消息---發(fā)送MQ失敗 ex:{}---", topic, throwable.getMessage());
            }
        }, timeout, delayLevel);
    }


    /**
     * 單向消息
     * 特點(diǎn)為只負(fù)責(zé)發(fā)送消息客们,不等待服務(wù)器回應(yīng)且沒(méi)有回調(diào)函數(shù)觸發(fā)崇决,即只發(fā)送請(qǐng)求不等待應(yīng)答
     * 此方式發(fā)送消息的過(guò)程耗時(shí)非常短,一般在微秒級(jí)別
     * 應(yīng)用場(chǎng)景:適用于某些耗時(shí)非常短底挫,但對(duì)可靠性要求并不高的場(chǎng)景恒傻,例如日志收集
     * @param topic     消息主題
     * @param msg       消息體
     * @param <T>       消息泛型
     */
    public <T> void sendOneWayMsg(String topic, T msg) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.sendOneWay(topic, message);
    }


    /**
     * 發(fā)送順序消息
     *
     * @param topic     消息主題
     * @param msg       消息體
     * @param hashKey   確定消息發(fā)送到哪個(gè)隊(duì)列中
     * @param <T>       消息泛型
     */
    public <T> void syncSendOrderly(String topic, T msg, String hashKey) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        log.info("發(fā)送順序消息,topic:{}, hashKey:{}", topic, hashKey);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
    }


    /**
     * 發(fā)送順序消息
     *
     * @param topic     消息主題
     * @param msg       消息體
     * @param hashKey   確定消息發(fā)送到哪個(gè)隊(duì)列中
     * @param timeout   超時(shí)時(shí)間
     */
    public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        log.info("發(fā)送順序消息建邓,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
    }


    /**
     * 發(fā)送批量消息
     *
     * @param topic     消息主題
     * @param msgList   消息體集合
     * @param <T>       消息泛型
     * @return
     */
    public <T> SendResult asyncSendBatch(String topic, List<T> msgList) {
        List<Message<T>> messageList = msgList.stream()
                .map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());
        return rocketMQTemplate.syncSend(topic, messageList);
    }

    /**
     * 發(fā)送事務(wù)消息
     *
     * @param txProducerGroup   事務(wù)消息的生產(chǎn)者組名稱(chēng)
     * @param topic             事務(wù)消息主題
     * @param tag               事務(wù)消息tag
     * @param msg               事務(wù)消息體
     * @param arg               事務(wù)消息監(jiān)聽(tīng)器回查參數(shù)
     * @param <T>               事務(wù)消息泛型
     */
    public <T> void sendTransaction(String txProducerGroup, String topic, String tag, T msg, T arg){
        if(!StringUtils.isEmpty(tag)){
            topic = topic + ":" + tag;
        }
        String transactionId = UUID.randomUUID().toString();
        Message<T> message = MessageBuilder.withPayload(msg)
                //header也有大用處
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .setHeader("share_id", msg.getId())
                .build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(txProducerGroup, topic, message, arg);
        if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)
                && result.getSendStatus().equals(SendStatus.SEND_OK)){
            log.info("事物消息發(fā)送成功");
        }
        log.info("事物消息發(fā)送結(jié)果:{}", result);
    }
}

上面寫(xiě)的這幾個(gè)消息發(fā)送方法盈厘,你應(yīng)該注意到了: 前兩個(gè)方法的參數(shù) topic 和其它的不一樣
其實(shí)這是 rocketmq 和 springboot 整合后設(shè)置 Tag 的方式(Tag:用于區(qū)分過(guò)濾同一主題下的不同業(yè)務(wù)類(lèi)型的消息,非常實(shí)用)
在項(xiàng)目里往mq寫(xiě)入消息時(shí)官边,最好每條消息都帶上tag沸手,用于消費(fèi)時(shí)根據(jù)業(yè)務(wù)過(guò)濾

在 rocketmq-spring-boot-starter 中,Tag 的設(shè)置方式: 在 topic后面加上 “:tagName”

另外注簿,從上面的截圖中可以看到“key”的設(shè)置方式契吉,發(fā)送消息時(shí)在header中設(shè)置:

MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.KEYS, "key1")

總結(jié):

  • 1、實(shí)際運(yùn)用中一些配置不要像我上面一樣寫(xiě)在代碼里诡渴,寫(xiě)在配置文件里或統(tǒng)一配置捐晶。
  • 2、消息發(fā)送成功與失敗可以根據(jù)sendResult判斷妄辩,消息消費(fèi)成功與否其實(shí)源碼內(nèi)部已做了處理惑灵,只要不出現(xiàn)異常,就是消費(fèi)成功眼耀,如果你業(yè)務(wù)代碼邏輯有問(wèn)題那另說(shuō)英支。
  • 3、實(shí)際生產(chǎn)中還要注意重復(fù)消費(fèi)問(wèn)題哮伟,這里我提供一個(gè)方法:在數(shù)據(jù)庫(kù)加一個(gè)去重表潭辈,給表里的一個(gè)字段如key添加唯一索引,消費(fèi)前先入庫(kù)澈吨,正常則往下執(zhí)行你的業(yè)務(wù)邏輯,入庫(kù)失敗了表明該消息已消費(fèi)過(guò)寄摆,不能往下走了谅辣。
  • 4、其實(shí)rocketmq還有一個(gè)很重要的特性:事務(wù)婶恼,其它mq可是不支持的桑阶,利用事務(wù)可以做很多事柏副,如跟錢(qián)相關(guān)的業(yè)務(wù)、分布式事務(wù)蚣录,不過(guò)事務(wù)的實(shí)現(xiàn)過(guò)程要麻煩點(diǎn)割择。
  • 5、上面就是RocketMQ與Springboot的整合萎河,整合了使用起來(lái)還是比較簡(jiǎn)單的

參考:
https://blog.csdn.net/CSDN877425287/article/details/121964142

https://blog.csdn.net/qq_36737803/article/details/112261352

https://blog.csdn.net/caoli201314/article/details/120248361

https://blog.csdn.net/Ting1king/article/details/119324359

https://blog.csdn.net/u012069313/article/details/122403509

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子作彤,更是在濱河造成了極大的恐慌灯帮,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,544評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件擎椰,死亡現(xiàn)場(chǎng)離奇詭異支子,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)达舒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)值朋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人巩搏,你說(shuō)我怎么就攤上這事昨登。” “怎么了塔猾?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,764評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵篙骡,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我丈甸,道長(zhǎng)糯俗,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,193評(píng)論 1 292
  • 正文 為了忘掉前任睦擂,我火速辦了婚禮得湘,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘顿仇。我一直安慰自己淘正,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,216評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布臼闻。 她就那樣靜靜地躺著鸿吆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪述呐。 梳的紋絲不亂的頭發(fā)上惩淳,一...
    開(kāi)封第一講書(shū)人閱讀 51,182評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音乓搬,去河邊找鬼思犁。 笑死代虾,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的激蹲。 我是一名探鬼主播棉磨,決...
    沈念sama閱讀 40,063評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼学辱!你這毒婦竟也來(lái)了乘瓤?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,917評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤项郊,失蹤者是張志新(化名)和其女友劉穎馅扣,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體着降,經(jīng)...
    沈念sama閱讀 45,329評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡差油,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,543評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了任洞。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蓄喇。...
    茶點(diǎn)故事閱讀 39,722評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖交掏,靈堂內(nèi)的尸體忽然破棺而出妆偏,到底是詐尸還是另有隱情,我是刑警寧澤盅弛,帶...
    沈念sama閱讀 35,425評(píng)論 5 343
  • 正文 年R本政府宣布钱骂,位于F島的核電站,受9級(jí)特大地震影響挪鹏,放射性物質(zhì)發(fā)生泄漏见秽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,019評(píng)論 3 326
  • 文/蒙蒙 一讨盒、第九天 我趴在偏房一處隱蔽的房頂上張望解取。 院中可真熱鬧,春花似錦返顺、人聲如沸禀苦。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,671評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)振乏。三九已至,卻和暖如春秉扑,著一層夾襖步出監(jiān)牢的瞬間昆码,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,825評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留赋咽,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,729評(píng)論 2 368
  • 正文 我出身青樓吨娜,卻偏偏與公主長(zhǎng)得像脓匿,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子宦赠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,614評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • 1陪毡、RocketMQ概念 Apache RocketMQ作為阿里開(kāi)源的一款高性能、高吞吐量的分布式消息中間件勾扭。支持...
    JBryan閱讀 3,907評(píng)論 0 0
  • RocketMQ由四部分組成 Name Server 可集群部署毡琉,節(jié)點(diǎn)之間無(wú)任何信息同步。提供輕量級(jí)的服務(wù)發(fā)現(xiàn)和路...
    Zhao_e9c5閱讀 1,085評(píng)論 0 1
  • 簡(jiǎn)介 rocketMq消息隊(duì)列以前是阿里開(kāi)發(fā)的后來(lái)捐贈(zèng)給Apache開(kāi)源妙色,先進(jìn)先出桅滋,可以用來(lái)推送消息,也可以一對(duì)多...
    haiyong6閱讀 1,261評(píng)論 0 6
  • 一. 概述 參考開(kāi)源項(xiàng)目https://github.com/xkcoding/spring-boot-demo[...
    任未然閱讀 1,440評(píng)論 0 2
  • 轉(zhuǎn)載原文:https://www.cnblogs.com/wuzhenzhao/p/11498735.html ...
    Zhao_e9c5閱讀 697評(píng)論 0 1