rabbitmq 整合 spring boot 完成消息可靠投遞

一.實(shí)現(xiàn)思路

1.生產(chǎn)者發(fā)起調(diào)用
2.消費(fèi)者消費(fèi)消息
3.定時(shí)任務(wù)定時(shí)拉取投遞失敗的消息重新投遞
4.各種異常清空的測試驗(yàn)證
5.使用動(dòng)態(tài)代理實(shí)現(xiàn)消費(fèi)端幕等性校驗(yàn)和消息確認(rèn)

二.項(xiàng)目搭建

1.pom

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.rabbitmq配置

spring:
  rabbitmq:
    port: 5672
    username: liming
    host: 114.116.18.120
    password: liming
    virtual-host: /
    ##開啟回調(diào)
    publisher-returns: true
    publisher-confirms: true
    ##設(shè)置手動(dòng)確認(rèn)
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 100

3.表結(jié)構(gòu)

CREATE TABLE `msg_log` (
  `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一標(biāo)識',
  `msg` text COMMENT '消息體, json格式化',
  `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交換機(jī)',
  `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由鍵',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '狀態(tài): 0投遞中 1投遞成功 2投遞失敗 3已消費(fèi)',
  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重試次數(shù)',
  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重試時(shí)間',
  `create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建時(shí)間',
  `update_time` datetime DEFAULT NULL COMMENT '更新時(shí)間',
  PRIMARY KEY (`msg_id`),
  UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投遞日志';

4.rabbitConfig

@Configuration
public class RabbitConfig{

    @Bean
    public Queue queue1(){
        return new Queue("hello.queue1", true);
    }

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
    }

}

5.生產(chǎn)者生產(chǎn)消息

@Slf4j
@Component
public class TopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendList(String name){
        //設(shè)置消息消費(fèi)成功回調(diào)操作
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->{
            if(ack){
                log.info("消息發(fā)送成功胸蛛!");
                String msgId = correlationData.getId();
                msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
            }else{
                log.info("消息發(fā)送到Exchange失敗, {}, cause: {}", correlationData, cause);
            }
        });
        // 觸發(fā)setReturnCallback回調(diào)必須設(shè)置mandatory=true, 否則Exchange沒有找到Queue就會(huì)丟棄掉消息, 而不會(huì)觸發(fā)回調(diào)
        rabbitTemplate.setMandatory(true);
        // 消息是否從Exchange路由到Queue, 注意: 這是一個(gè)失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會(huì)回調(diào)這個(gè)方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息從Exchange路由到Queue失敗: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });
        
//創(chuàng)建消息日志
        String msgId = UUID.randomUUID().toString()磕道;
        MsgLog msgLog = new MsgLog(msgId, name, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);
        
        msgLogMapper.insert(msgLog);// 消息入庫
        Message message = new Message(name.getBytes(), new MessageProperties());
        //設(shè)置消息唯一id 發(fā)送消息
        CorrelationData correlationData = new CorrelationData(msgId );
        rabbitTemplate.convertAndSend("topicExchange", "key.2", message, correlationData);

    }

6.消費(fèi)者 消費(fèi)消息

@Component
@Slf4j
public class TopicConsumer {
 
    @Autowired
    private MsgLogService msgLogService;
 
    @RabbitListener(queues = "hello.queue1")
    public void consume(Message message, Channel channel) throws IOException {
        String name = new String(message.getBody())愤炸;
        log.info("收到消息: {}", name);
         
        MsgLog msgLog = msgLogService.selectByMsgId(msgId);
        if (null == msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {// 消費(fèi)冪等性
            log.info("重復(fù)消費(fèi), msgId: {}", msgId);
            return;
        }
 
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();
 
        boolean success = mailUtil.send(mail);
        if (success) {
            msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);
            channel.basicAck(tag, false);// 消費(fèi)確認(rèn)
        } else {
            channel.basicNack(tag, false, true);
        }
    }
 
}

7.定時(shí)任務(wù) 處理消費(fèi)失敗的消息


@Component
@Slf4j
public class ResendMsg {
 
    @Autowired
    private MsgLogService msgLogService;
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    // 最大投遞次數(shù)
    private static final int MAX_TRY_COUNT = 3;
 
    /**
     * 每30s拉取投遞失敗的消息, 重新投遞
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void resend() {
        log.info("開始執(zhí)行定時(shí)任務(wù)(重新投遞消息)");
 
        List<MsgLog> msgLogs = msgLogService.selectTimeoutMsg();
        msgLogs.forEach(msgLog -> {
            String msgId = msgLog.getMsgId();
            if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
                msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
                log.info("超過最大重試次數(shù), 消息投遞失敗, msgId: {}", msgId);
            } else {
                msgLogService.updateTryCount(msgId, msgLog.getNextTryTime());// 投遞次數(shù)+1
 
                CorrelationData correlationData = new CorrelationData(msgId);
                rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投遞
 
                log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投遞消息");
            }
        });
 
        log.info("定時(shí)任務(wù)執(zhí)行結(jié)束(重新投遞消息)");
    }
 
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鳍悠,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子旺入,更是在濱河造成了極大的恐慌哲泊,老刑警劉巖埃碱,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異荔仁,居然都是意外死亡伍宦,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進(jìn)店門乏梁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來次洼,“玉大人,你說我怎么就攤上這事遇骑÷艋伲” “怎么了?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵落萎,是天一觀的道長亥啦。 經(jīng)常有香客問我,道長练链,這世上最難降的妖魔是什么翔脱? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮兑宇,結(jié)果婚禮上碍侦,老公的妹妹穿的比我還像新娘。我一直安慰自己隶糕,他們只是感情好瓷产,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著枚驻,像睡著了一般濒旦。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上再登,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天尔邓,我揣著相機(jī)與錄音晾剖,去河邊找鬼。 笑死梯嗽,一個(gè)胖子當(dāng)著我的面吹牛齿尽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播灯节,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼循头,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了炎疆?” 一聲冷哼從身側(cè)響起卡骂,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎形入,沒想到半個(gè)月后全跨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡亿遂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年浓若,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片崩掘。...
    茶點(diǎn)故事閱讀 40,505評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡七嫌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出苞慢,到底是詐尸還是另有隱情诵原,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布挽放,位于F島的核電站绍赛,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏辑畦。R本人自食惡果不足惜吗蚌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望纯出。 院中可真熱鬧蚯妇,春花似錦、人聲如沸暂筝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽焕襟。三九已至陨收,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間鸵赖,已是汗流浹背务漩。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工拄衰, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人饵骨。 一個(gè)月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓翘悉,卻偏偏與公主長得像,于是被迫代替她去往敵國和親居触。 傳聞我的和親對象是個(gè)殘疾皇子镐确,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評論 2 359