mq事務(wù)實(shí)戰(zhàn)/mq延遲隊(duì)列探索

mq的應(yīng)用場(chǎng)景
1。解耦
2。削峰
3【寥常【延遲消息】
4笆呆∥樯【分布式事務(wù)】
5.日志收集
6.消息分發(fā)

采用分布式事務(wù)方式確保消息默認(rèn)發(fā)送到rocketmq上获三,
默認(rèn)開(kāi)啟了數(shù)據(jù)持久化位置:${user.home}/store
消費(fèi)的時(shí)候報(bào)錯(cuò)會(huì)自動(dòng)重試

【默認(rèn)重試間隔是多少】
無(wú)序消息
對(duì)于無(wú)序消息(通過(guò)實(shí)現(xiàn)MessageListenerConcurrently接口來(lái)消費(fèi))食绿,第一次重試間隔時(shí)間是 10 秒踩叭。第二次重試間隔時(shí)間是 30 秒磕潮,第三次重試間隔時(shí)間是 1 分鐘,第四次重試間隔時(shí)間是 2 分鐘容贝,第五次重試間隔時(shí)間是 3 分鐘自脯,第六次重試間隔時(shí)間是 4 分鐘,第七次重試間隔時(shí)間是 5 分鐘斤富,第八次重試間隔時(shí)間是 6 分鐘膏潮,第九次重試間隔時(shí)間是 7 分鐘,第十次重試間隔時(shí)間是 8 分鐘满力,第十一次重試間隔時(shí)間是 9 分鐘焕参,第十二次重試間隔時(shí)間是 10 分鐘,第十三次重試間隔時(shí)間是 20 分鐘油额,第十四次重試間隔時(shí)間是 30 分鐘叠纷,第十五次重試間隔時(shí)間是 1 小時(shí),第十六次重試間隔時(shí)間是 2 小時(shí)潦嘶。
順序消息
對(duì)于順序消息(通過(guò)實(shí)現(xiàn)MessageListenerOrderly接口來(lái)消費(fèi))讲岁,默認(rèn)的重試間隔時(shí)間是 1000 毫秒(1 秒)〕囊裕可以通過(guò)MessageListenerOrderly接口的SUSPEND_CURRENT_QUEUE_TIME_MILLIS屬性來(lái)設(shè)置具體的重試間隔時(shí)間缓艳。

【重試次數(shù)限制及死信隊(duì)列】
RocketMQ 會(huì)限制消息的重試次數(shù)。當(dāng)消息重試達(dá)到一定次數(shù)(默認(rèn) 16 次)后看峻,如果仍然無(wú)法成功消費(fèi)阶淘,消息會(huì)被發(fā)送到死信隊(duì)列(DLQ,Dead - Letter - Queue)互妓。
死信隊(duì)列中的消息可以通過(guò)專(zhuān)門(mén)的工具或者自定義的程序進(jìn)行后續(xù)處理溪窒。例如,開(kāi)發(fā)人員可以編寫(xiě)一個(gè)工具來(lái)定期掃描死信隊(duì)列中的消息冯勉,分析消費(fèi)失敗的原因澈蚌,可能是消息格式錯(cuò)誤、業(yè)務(wù)邏輯處理有嚴(yán)重問(wèn)題(如數(shù)據(jù)庫(kù)連接異常無(wú)法恢復(fù)等)灼狰,然后根據(jù)具體情況對(duì)消息進(jìn)行手動(dòng)修復(fù)或者調(diào)整業(yè)務(wù)邏輯后重新消費(fèi)宛瞄。

mq事務(wù)


import com.alibaba.fastjson.JSON;
import com.csw.RocketMQ.dao.TxLogDao;
import com.csw.domain.Order;
import com.csw.domain.TxLog;
import com.csw.orderAndMsg.dao.OrderDao;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;
import java.util.UUID;

@Service
public class OrderServiceImpl4 {

    @Autowired
    private OrderDao orderDao;

    @Autowired
    private TxLogDao txLogDao;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    //事務(wù)mq【第1步】
    public void createOrderBefore(Order order) {

        String txId = UUID.randomUUID().toString();
        String orderJson = JSON.toJSONString(order);

        //發(fā)送半事務(wù)消息
        rocketMQTemplate.sendMessageInTransaction(
                "tx_producer_group",
                "tx_topic",
                MessageBuilder.withPayload(orderJson).setHeader("txId", txId).build(),
                order
        );
    }

    //事務(wù)mq【第3步】
    @Transactional
    public void createOrder(String txId, Order order) {
        //保存訂單
        orderDao.save(order);

        TxLog txLog = new TxLog();
        txLog.setTxId(txId);
        txLog.setDate(new Date());

        //記錄事物日志
        txLogDao.save(txLog);
    }

 



import com.csw.RocketMQ.dao.TxLogDao;
import com.csw.domain.Order;
import com.csw.domain.TxLog;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * 事務(wù)消息接受
 * 普通消息接收在user里
 */
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderServiceImpl4 orderServiceImpl4;


    @Autowired
    private TxLogDao txLogDao;

    //執(zhí)行本地事物
    //事務(wù)mq【第2步】
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        String txId = (String) msg.getHeaders().get("txId");

        try {
            //本地事物
            Order order = (Order) arg;
            orderServiceImpl4.createOrder(txId,order);

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    //消息回查【如果一段時(shí)間沒(méi)有收到消息比如,報(bào)錯(cuò)還沒(méi)有來(lái)得及提交就宕機(jī)了交胚,網(wǎng)絡(luò)延遲了mq沒(méi)收到等】
    //事務(wù)mq【第4步】
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String txId = (String) msg.getHeaders().get("txId");

 TxLog txLog = null;
    int retryCount = 0;
    while (retryCount < 3) { // 最多重試3次
     txLog = txLogDao.findById(txId).get();
        if (txLog!= null) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            try {
                Thread.sleep(1000); // 等待1秒后重試
            } catch (InterruptedException e) {
                // 處理異常
            }
            retryCount++;
        }
    }

    return RocketMQLocalTransactionState.ROLLBACK;

    }
}



import com.csw.domain.Order;
import com.csw.domain.User;
import com.csw.sms.utils.SmsUtil;
import com.csw.user.dao.UserDao;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Random;

/**
 * 普通消息接受
 * 事務(wù)消息接收在order里
 */
@Slf4j
@Service("shopSmsService")
//consumerGroup-消費(fèi)者組名  topic-要消費(fèi)的主題
@RocketMQMessageListener(
        consumerGroup = "tx_producer_group", //消費(fèi)者組名
        topic = "tx_topic",//消費(fèi)主題
        consumeMode = ConsumeMode.CONCURRENTLY,//消費(fèi)模式,指定是否順序消費(fèi) CONCURRENTLY(同步,默認(rèn)) ORDERLY(順序)
        messageModel = MessageModel.CLUSTERING//消息模式 BROADCASTING(廣播)  CLUSTERING(集群,默認(rèn))在廣播模式下份汗,消費(fèi)失敗的消息會(huì)被丟棄盈电,而在集群模式下,【消費(fèi)失敗的消息會(huì)被重新入隊(duì)等待稍后消費(fèi)】
)
public class RocketSms implements RocketMQListener<Order> {

    @Autowired
    private UserDao userDao;

    //消費(fèi)邏輯
    @Override
    public void onMessage(Order order) {
        log.error("接收到了一個(gè)訂單信息{},接下來(lái)就可以發(fā)送短信通知了", order);
// 獲取消息體中的JSON字符串


        //根據(jù)uid 獲取手機(jī)號(hào)
        User user = userDao.findById(order.getUid()).get();

        //消費(fèi)失敗測(cè)試
        //int mm = 1 / 0;

        //生成驗(yàn)證碼 1-9 6
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            builder.append(new Random().nextInt(9) + 1);
        }
        String smsCode = builder.toString();

        Param param = new Param(smsCode);
        try {
            //發(fā)送短信 {"code":"123456"}
            SmsUtil smsUtil = new SmsUtil();
            //節(jié)省資源
            //smsUtil.send(user.getTelephone(),JSON.toJSONString(param),"yzm");
            log.error("短信發(fā)送成功");

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Param {
        private String code;
    }
}

延遲隊(duì)列

  /**
     * 發(fā)送延遲消息
     *
     * RocketMQ 默認(rèn)提供了 18 個(gè)延遲級(jí)別杯活,每個(gè)延遲級(jí)別對(duì)應(yīng)的延遲時(shí)間如下:
     * 1s匆帚、5s、10s旁钧、30s吸重、1m、2m歪今、3m嚎幸、4m、5m彤委、6m鞭铆、7m或衡、8m焦影、9m、10m封断、20m斯辰、30m、1h坡疼、2h彬呻。
     */
    public void sendDelayMessage() {
        // 構(gòu)建消息,這里以簡(jiǎn)單的字符串消息為例
        Message<String> message = MessageBuilder.withPayload("This is a delay message").build();
        //超時(shí)10秒鐘柄瑰,延遲級(jí)別是5等于1分鐘
        rocketMQTemplate.syncSend("topic-dely-test", message, 10 * 1000, 5);
    }


}
package com.csw.rocketMQ;

import com.csw.user.dao.UserDao;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 延遲隊(duì)列的組名可以隨便指定,但是不要指定已有的組名
*在
 */
@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = "topic-dely-test", topic = "topic-dely-test")
public class RocketDelaySms implements RocketMQListener<String> {

    @Autowired
    private UserDao userDao;

    //消費(fèi)邏輯
    @Override
    public void onMessage(String message) {
        log.error("接收到了一個(gè)訂單信息{},接下來(lái)就可以發(fā)送短信通知了", message);

    }


}

配置

【配置文件】
rocketmq:
  name-server: 192.168.147.131:9876   #rocketMQ服務(wù)的地址

【依賴(lài)】
<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
  <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
下載RocketMQ并解壓
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

 nohup ./bin/mqnamesrv &
 nohup bin/mqbroker -n localhost:9876 &



下載控制臺(tái)
# 在git上下載下面的工程 rocketmq-console-1.0.0
https://github.com/apache/rocketmq-externals/releases

# 修改配置文件 rocketmq-console\src\main\resources\application.properties
server.port=7777 #項(xiàng)目啟動(dòng)后的端口號(hào)
rocketmq.config.namesrvAddr=192.168.109.131:9876 #nameserv的地址闸氮,注意防火墻要開(kāi)啟
9876端口

# 進(jìn)入控制臺(tái)項(xiàng)目,將工程打成jar包
mvn clean package -Dmaven.test.skip=true

# 啟動(dòng)控制臺(tái)
java -jar target/rocketmq-console-ng-1.0.0.jar

瀏覽器訪(fǎng)問(wèn)
http://localhost:7777/#/consumer
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末教沾,一起剝皮案震驚了整個(gè)濱河市蒲跨,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌授翻,老刑警劉巖或悲,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異堪唐,居然都是意外死亡巡语,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén)淮菠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)男公,“玉大人,你說(shuō)我怎么就攤上這事合陵±砼欤” “怎么了逞力?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)糠爬。 經(jīng)常有香客問(wèn)我寇荧,道長(zhǎng),這世上最難降的妖魔是什么执隧? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任揩抡,我火速辦了婚禮,結(jié)果婚禮上镀琉,老公的妹妹穿的比我還像新娘峦嗤。我一直安慰自己,他們只是感情好屋摔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布烁设。 她就那樣靜靜地躺著,像睡著了一般钓试。 火紅的嫁衣襯著肌膚如雪装黑。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,443評(píng)論 1 302
  • 那天弓熏,我揣著相機(jī)與錄音恋谭,去河邊找鬼。 笑死挽鞠,一個(gè)胖子當(dāng)著我的面吹牛疚颊,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播信认,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼材义,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了嫁赏?” 一聲冷哼從身側(cè)響起其掂,我...
    開(kāi)封第一講書(shū)人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎橄教,沒(méi)想到半個(gè)月后清寇,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡护蝶,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年华烟,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片持灰。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡盔夜,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情喂链,我是刑警寧澤返十,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站椭微,受9級(jí)特大地震影響洞坑,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蝇率,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一迟杂、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧本慕,春花似錦排拷、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至藤违,卻和暖如春浪腐,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背纺弊。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工牛欢, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留骡男,地道東北人淆游。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像隔盛,于是被迫代替她去往敵國(guó)和親犹菱。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354