rabbitmq 可靠性投遞(四)之實(shí)現(xiàn)可靠性投遞

前言

在之前介紹了可靠性投遞方案和項(xiàng)目搭建::https://juejin.im/post/5c3f43dae51d45731470a18c

唯一ID:https://juejin.im/post/5c3eb6e7518825253b5ea545

前期的一些準(zhǔn)備:https://juejin.im/post/5c3f4bafe51d4551ec60b521

先給出完整代碼:
https://github.com/hmilyos/common.git 
https://github.com/hmilyos/snowFlakeDemo.git
https://github.com/hmilyos/rabbitmq-common.git       available 分支

先來回顧一下我們可靠性投遞的流程


image

流程的示意圖如上所示,比如我下單成功了必盖,這時(shí)進(jìn)行 step1韧献,對我的業(yè)務(wù)數(shù)據(jù)進(jìn)行入庫钦椭,業(yè)務(wù)數(shù)據(jù)入庫完畢(這里要特別注意一定要保證業(yè)務(wù)數(shù)據(jù)入庫)再對要發(fā)送的消息進(jìn)行入庫氨鹏,圖中采用了兩個(gè)數(shù)據(jù)庫集歇,可以根據(jù)實(shí)際業(yè)務(wù)場景來確定是否采用兩個(gè)數(shù)據(jù)庫暴构,如果采用了兩個(gè)數(shù)據(jù)庫盗飒,有人可能就像到了采用分布式事務(wù)來保證數(shù)據(jù)的一致性烈拒,但是在大型互聯(lián)網(wǎng)中圆裕,基本很少采用事務(wù)广鳍,都是采用補(bǔ)償機(jī)制。
對業(yè)務(wù)數(shù)據(jù)和消息入庫完畢就進(jìn)入 setp2吓妆,發(fā)送消息到 MQ 服務(wù)上赊时,按照正常的流程就是消費(fèi)者監(jiān)聽到該消息,就根據(jù)唯一 id 修改該消息的狀態(tài)為已消費(fèi)耿战,并給一個(gè)確認(rèn)應(yīng)答 ack 到 Listener蛋叼。如果出現(xiàn)意外情況,消費(fèi)者未接收到或者 Listener 接收確認(rèn)時(shí)發(fā)生網(wǎng)絡(luò)閃斷剂陡,接收不到狈涮,這時(shí)候就需要用到我們的分布式定時(shí)任務(wù)來從 msg 數(shù)據(jù)庫抓取那些超時(shí)了還未被消費(fèi)的消息,重新發(fā)送一遍鸭栖。重試機(jī)制里面要設(shè)置重試次數(shù)限制歌馍,因?yàn)橐恍┩獠康脑驅(qū)е乱恢卑l(fā)送失敗的,不能重試太多次晕鹊,要不然會拖垮整個(gè)服務(wù)松却。例如重試三次還是失敗的,就把消息的 status 設(shè)置成 發(fā)送失敗溅话,然后通過補(bǔ)償機(jī)制晓锻,人工去處理。實(shí)際生產(chǎn)中飞几,這種情況還是比較少的砚哆,但是你不能沒有這個(gè)補(bǔ)償機(jī)制,要不然就做不到可靠性了屑墨。

下面就讓我們用代碼來實(shí)現(xiàn)這個(gè)方案吧躁锁。

1. 簡單的創(chuàng)建訂單接口
@RestController
public class OrderController {

    @Autowired
    private IMessageService messageService;

    @GetMapping("/createOrder")
    public ServerResponse createOrder(long userId){
        return messageService.createOrder(userId);
    }
}
2. IMessageService 接口以及實(shí)現(xiàn)類
public interface IMessageService {

    @Transactional
    ServerResponse createOrder(long userId);
    
}
@Service
public class MessageServiceImpl implements IMessageService {

    private final  static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);

    @Autowired
    private ISnowFlakeService snowFlakeService;

    @Autowired
    private MessageMapper messageMapper;
    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    @Override
    public ServerResponse createOrder(long userId) {
//        首先是針對業(yè)務(wù)邏輯,進(jìn)行下單的業(yè)務(wù)卵史,保存到數(shù)據(jù)庫后
//        業(yè)務(wù)落庫后战转,再對消息進(jìn)行落庫,
        long msgId = snowFlakeService.getSnowFlakeID();
        Message message = new Message(msgId, TypeEnum.CREATE_ORDER.getCode(), userId + "創(chuàng)建訂單:" + msgId,
                0, MSGStatusEnum.SENDING.getCode(), DateUtils.addMinutes(new Date(), Constants.TRY_TIMEOUT));
        int row = messageMapper.insertSelective(message);
        if (row == 0){
            throw new CustomException(500, "消息入庫異常");
        }
//        消息落庫后就可以發(fā)送消息了
        try {
            rabbitOrderSender.sendOrder(message);
        } catch (Exception e) {
//          因?yàn)闃I(yè)務(wù)已經(jīng)落庫了
//          所以 即使發(fā)送失敗也不影響以躯,因?yàn)榭煽啃酝哆f槐秧,我回去再次嘗試發(fā)送消息
            log.error("sendOrder mq msg error: ", e);
            messageMapper.updataNextRetryTimeForNow(message.getMessageId());
        }
        return ServerResponse.createBySuccess();
    }

}

注意了,我這里是直接拿消息的實(shí)體當(dāng)做業(yè)務(wù)去落庫了忧设,實(shí)際上應(yīng)該是先對訂單實(shí)體落庫色鸳,然后再對消息實(shí)體落庫,最后發(fā)送消息见转!

3. 發(fā)送消息的具體實(shí)現(xiàn) RabbitOrderSender

具體代碼如下,注釋也寫得很詳細(xì)了:


@Component
public class RabbitOrderSender {

    private static final Logger log = LoggerFactory.getLogger(RabbitOrderSender.class);

    //自動(dòng)注入RabbitTemplate模板類
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MessageMapper messageMapper;

    @Value("${order.rabbitmq.listener.order.exchange.name}")
    private String exchangeName;

    @Value("${order.rabbitmq.send.create.key}")
    private String routingKey;

    //發(fā)送消息方法調(diào)用: 構(gòu)建自定義對象消息
    public void sendOrder(Message message) throws Exception {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        CorrelationData correlationData = new CorrelationData(message.getMessageId() + "");
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);
//        throw new CustomException("--test--");
    }

    //回調(diào)函數(shù): confirm確認(rèn)
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("correlationData: {}", correlationData);
            String messageId = correlationData.getId();
            if(ack){
                //如果confirm返回成功 則進(jìn)行更新
                messageMapper.changeMessageStatus(Long.parseLong(messageId), MSGStatusEnum.SEND_SUCCESS.getCode());
            } else {
                //失敗則進(jìn)行具體的后續(xù)操作:重試 或者補(bǔ)償?shù)仁侄?                log.error("消息發(fā)送失敗蒜哀,需要進(jìn)行異常處理...");
                messageMapper.updataNextRetryTimeForNow(Long.parseLong(messageId));
            }
        }
    };

    //回調(diào)函數(shù): return返回斩箫, 這里是預(yù)防消息不可達(dá)的情況吏砂,比如 MQ 里面沒有對應(yīng)的 exchange、queue 等情況乘客,
//    如果消息真的不可達(dá)狐血,那么就要根據(jù)你實(shí)際的業(yè)務(wù)去做對應(yīng)處理,比如是直接落庫易核,記錄補(bǔ)償匈织,還是放到死信隊(duì)列里面,之后再進(jìn)行落庫
//    這里脫開實(shí)際業(yè)務(wù)場景牡直,不大好描述
    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
                                    String replyText, String exchange, String routingKey) {
            log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
                    exchange, routingKey, replyCode, replyText);
        }
    };
}
4. 消費(fèi)端代碼 RabbitOrderReceiver
@Component
public class RabbitOrderReceiver {

    private static final Logger log = LoggerFactory.getLogger(RabbitOrderReceiver.class);

    @Autowired
    private MessageMapper messageMapper;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${order.rabbitmq.listener.order.queue.name}",
                    durable="${order.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${order.rabbitmq.listener.order.exchange.name}",
                    durable="${order.rabbitmq.listener.order.exchange.durable}",
                    type= "${order.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "${order.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${order.rabbitmq.listener.order.key}"
    )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload com.hmily.rabbitmq.rabbitmqcommon.entity.Message msg,
                               Channel channel,
                               @Headers Map<String, Object> headers) throws Exception {
        log.info("-----------------RabbitOrderReceiver---------------------");
        log.info("消費(fèi)端 order msg: {} ",  msg.toString());
        msg.setStatus(MSGStatusEnum.PROCESSING_IN.getCode());
        int row = messageMapper.updateByPrimaryKeySelective(msg);
        if (row != 0) {
            Thread.sleep(2000L);
            Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            //手工ACK
            channel.basicAck(deliveryTag, false);
//            接著去執(zhí)行你對應(yīng)的業(yè)務(wù)邏輯缀匕,
//            注意,這是可靠性投遞碰逸,執(zhí)行業(yè)務(wù)邏輯一定要做冪等性
        }

    }
}
注意:我這里并沒有做冪等性去重乡小,實(shí)際業(yè)務(wù)時(shí)必須做冪等性!饵史!

6.我們的定時(shí)重試機(jī)制 SendMessageTask
@Component
public class SendMessageTask {
    private static final Logger log = LoggerFactory.getLogger(SendMessageTask.class);

    @Autowired
    private MessageMapper messageMapper;
    @Autowired
    private IMessageFailedService messageFailedService;

    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    @Scheduled(initialDelay = 3000, fixedDelay = 10000)
    public void reSend(){
        log.info("---------------定時(shí)任務(wù)開始---------------");
        List<Message> msgs = messageMapper.getNotProcessingInByType(TypeEnum.CREATE_ORDER.getCode(), null, 
                new int[]{MSGStatusEnum.SENDING.getCode()});
        msgs.forEach(msg -> {
            if (msg.getTryCount() >= Constants.MAX_TRY_COUNT) {
//              如果重試次數(shù)大于最大重試次數(shù)就不再重試满钟,記錄失敗
                msg.setStatus(MSGStatusEnum.SEND_FAILURE.getCode());
                msg.setUpdateTime(new Date());
                messageMapper.updateByPrimaryKeySelective(msg);
                MessageFailed failed = new MessageFailed(msg.getMessageId(), "消息發(fā)送失敗", "已達(dá)到最大重試次數(shù),但是還是發(fā)送失敗");
                messageFailedService.add(failed);
            } else {
//              未達(dá)到最大重試次數(shù)胳喷,可以進(jìn)行重發(fā)消息
//              先改一下消息記錄湃番,保存好再發(fā)送消息
                msg.setNextRetry(DateUtils.addMinutes(new Date(), Constants.TRY_TIMEOUT));
                int row = messageMapper.updateTryCount(msg);
                try {
                    rabbitOrderSender.sendOrder(msg);
                } catch (Exception e) {
                    log.error("sendOrder mq msg error: ", e);
                    messageMapper.updataNextRetryTimeForNow(msg.getMessageId());
                }
            }
        });
    }
}

至此,我們的可靠性投遞就完成了吭露。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吠撮,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子奴饮,更是在濱河造成了極大的恐慌纬向,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,686評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件戴卜,死亡現(xiàn)場離奇詭異逾条,居然都是意外死亡轻庆,警方通過查閱死者的電腦和手機(jī)区端,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,668評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來卦洽,“玉大人江锨,你說我怎么就攤上這事吃警。” “怎么了啄育?”我有些...
    開封第一講書人閱讀 158,160評論 0 348
  • 文/不壞的土叔 我叫張陵酌心,是天一觀的道長。 經(jīng)常有香客問我挑豌,道長安券,這世上最難降的妖魔是什么墩崩? 我笑而不...
    開封第一講書人閱讀 56,736評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮侯勉,結(jié)果婚禮上鹦筹,老公的妹妹穿的比我還像新娘。我一直安慰自己址貌,他們只是感情好铐拐,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,847評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著练对,像睡著了一般遍蟋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上锹淌,一...
    開封第一講書人閱讀 50,043評論 1 291
  • 那天匿值,我揣著相機(jī)與錄音,去河邊找鬼赂摆。 笑死挟憔,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的烟号。 我是一名探鬼主播绊谭,決...
    沈念sama閱讀 39,129評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼汪拥!你這毒婦竟也來了达传?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,872評論 0 268
  • 序言:老撾萬榮一對情侶失蹤迫筑,失蹤者是張志新(化名)和其女友劉穎宪赶,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體脯燃,經(jīng)...
    沈念sama閱讀 44,318評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡搂妻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,645評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了辕棚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片欲主。...
    茶點(diǎn)故事閱讀 38,777評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖逝嚎,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情补君,我是刑警寧澤挽铁,帶...
    沈念sama閱讀 34,470評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響够掠,放射性物質(zhì)發(fā)生泄漏疯潭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,126評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望遵绰。 院中可真熱鬧增淹,春花似錦虑润、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,861評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至渗柿,卻和暖如春朵栖,著一層夾襖步出監(jiān)牢的瞬間陨溅,已是汗流浹背门扇。 一陣腳步聲響...
    開封第一講書人閱讀 32,095評論 1 267
  • 我被黑心中介騙來泰國打工霸奕, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留质帅,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,589評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像剪侮,于是被迫代替她去往敵國和親洛退。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,687評論 2 351

推薦閱讀更多精彩內(nèi)容