分布式事務(wù)(二):基于可靠消息的分布式事務(wù)

項目使用技術(shù)

springboot、dubbo妹窖、zookeeper眉孩、定時任務(wù)、消息中間件MQ

一技健、項目結(jié)構(gòu)

maven父子工程:

父工程:consis

子工程:api-service写穴、order、product雌贱、message

api-service:該項目主要是提供接口調(diào)用的啊送,還包含實體類、枚舉等一些通用內(nèi)容

order:該項目是專門處理訂單相關(guān)操作的系統(tǒng)

product:該項目是專門處理產(chǎn)品相關(guān)操作的系統(tǒng)

message:該項目是提供消息服務(wù)的系統(tǒng)欣孤,好包括定時任務(wù)

它們的依賴關(guān)系如下圖:

根據(jù)上一篇的原理分別介紹下各個系統(tǒng)的實現(xiàn)

二馋没、order訂單系統(tǒng)

核心代碼:

@Override
@Transactional
public void add(Orders order) {
    String messageBody = JSONObject.toJSONString( order );
    //添加消息到數(shù)據(jù)庫
    String messageId = transactionMessageService.savePreparMessage(order.getMessageId(), messageBody, Constant.ORDER_QUEUE_NAME );
    log.info(">>> 預(yù)發(fā)送消息,消息編號:{}", messageId);
    boolean flag = false;
    boolean success = false;
    try{

        Orders orders = orderDao.saveAndFlush( order );
        //int i = 1/0 ;
        log.info(">>> 插入訂單,訂單編號:{}", orders.getId());
        flag = true;
    }catch (Exception e){
        transactionMessageService.delete( messageId );
        log.info(">>> 業(yè)務(wù)執(zhí)行異常刪除消息,消息編號:{}", messageId, e);
        throw new RuntimeException( ">>> 創(chuàng)建訂單失敗" );
    }finally {
        if(flag){
            try {
                transactionMessageService.confirmAndSend( messageId );
                success = true;
                log.info(">>> 確認并且發(fā)送消息到實時消息中間件,消息編號:{}", messageId);

            }catch (Exception e){
                log.error(">>> 消息確認異常,消息編號:{}", messageId, e);
                if(!success){
                    transactionMessageService.delete( messageId );
                    throw new RuntimeException( ">>> 確認消息異常,創(chuàng)建訂單失敗" );
                }
            }
        }
    }
}

  • 插入訂單表之前降传,首先創(chuàng)建預(yù)發(fā)送消息篷朵,保存到事務(wù)消息表中,此時消息狀態(tài)為:未發(fā)送
  • 插入訂單婆排,如果插入訂單失敗則將事務(wù)消息表中預(yù)發(fā)送消息刪除
  • 插入訂單成功后声旺,修改消息表預(yù)發(fā)送消息狀態(tài)為發(fā)送中,并發(fā)送消息至mq
  • 如果發(fā)送消息失敗段只,則訂單回滾并刪除事務(wù)消息表消息

三腮猖、message消息系統(tǒng)

核心代碼一:

@Override
public void sendMessageToMessageQueue(String queueName,final String messageBody) {

    jmsTemplate.convertAndSend( queueName,messageBody );

    log.info(">>> 發(fā)送消息到mq 隊列:{},消息內(nèi)容:{}", queueName, messageBody);
}

  • 主要是activemq生產(chǎn)者講消息發(fā)送至MQ消息中間件

核心代碼二:

/**
 * 定時重發(fā)消息(每分鐘)
 */
@Scheduled(cron = "0 */1 * * * ?")
public void    handler(){
    //查詢transaction_message表中已發(fā)送但未被刪除的消息
    List<TransactionMessage> list = transactionMessageService.queryRetryList( Constant.MESSAGE_UNDEAD, maxTimeOut, Constant.MESSAGE_SENDING );
    if(list!=null && list.size() > 0){
        for (TransactionMessage message:list){
            try {
                transactionMessageService.retry( message.getMessageId() );
            } catch (Exception e) {
                log.warn(">>> 消息不存在,可能已經(jīng)被消費,消息編號:{}", message.getMessageId());
            }
        }
    }
}

/**
 * 定時通知工作人員(每隔5分鐘)
 */
@Scheduled(cron = "0 */5 * * * ?")
public void    advance(){
    List<Long> messages = transactionMessageService.queryDeadList();
    log.warn(">>> 共有:{}條消息需要人工處理", messages.size());
    String ids = JSONObject.toJSONString( messages );
    //發(fā)郵件或者是發(fā)送短信通知工作人員處理

}

  • 定時重發(fā)消息
  • 定時將死亡的消息通知給工作人員,進行人工補償操作

四赞枕、product產(chǎn)品系統(tǒng)

核心代碼:

@Transactional
@JmsListener( destination = Constant.ORDER_QUEUE_NAME)
public void    receiveQueue(String msg){
    boolean flag = false;
    Orders orders = JSONObject.parseObject( msg, Orders.class );
    log.info(">>> 接收到mq消息隊列澈缺,消息編號:{} ,消息內(nèi)容:{}", orders.getMessageId(), msg);

    TransactionMessage transactionMessage = transactionMessageService.findByMessageId( orders.getMessageId() );
    try {
        //保證冪等性
        if(transactionMessage!=null){
            List<OrderDetail> list = orders.getList();
            for(OrderDetail detail : list){
                Product product = productService.findById( detail.getId() );
                Long skuNum = product.getProductSku() - detail.getNum();
                if(skuNum >= 0){
                    product.setProductSku( skuNum );
                    productService.update( product );
                }else {
                    throw new Exception( ">>> 庫存不足,修改庫存失斊捍础!" );
                }

            }
            //int i = 1 /0 ;
            flag = true;
        }

    }catch (Exception e){
        e.printStackTrace();
        throw new RuntimeException( e );
    }finally {
        if(flag){
            transactionMessageService.delete( orders.getMessageId() );
            DbLog dbLog = dbLogService.findByMesageId( orders.getMessageId() );
            if(dbLog!=null){
                dbLog.setState( "1" );//已處理成功
                dbLogService.update( dbLog );
            }
            log.info(">>> 業(yè)務(wù)執(zhí)行成功刪除消息! messageId:{}", orders.getMessageId());
        }
    }

}

  • 從mq消息中間件中監(jiān)聽并消費消息姐赡,將json消息轉(zhuǎn)為訂單對象
  • 根據(jù)消息編號查詢該消息是否已被消費莱预,保證冪等性
  • 如果消息未被消費(即存在此消息),則產(chǎn)品表扣減庫存雏吭;如果已經(jīng)消費(不存在此消息)锁施,則不做處理
  • 產(chǎn)品表扣減庫存成功,則刪除此消息杖们,如果待處理消息日志表中有此消息悉抵,則更改狀態(tài)為1,表示已處理摘完;扣減失敗姥饰,則不做處理

該項目源碼已上傳至github和碼云,鏈接如下孝治,希望喜歡的朋友都能給個star支持一下列粪!謝謝~

github鏈接:
https://github.com/wanglinyong/consis

碼云鏈接:
https://gitee.com/wanglinyong/consis

原文出處:

分布式事務(wù)(二):基于可靠消息的分布式事務(wù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市谈飒,隨后出現(xiàn)的幾起案子岂座,更是在濱河造成了極大的恐慌,老刑警劉巖杭措,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件费什,死亡現(xiàn)場離奇詭異,居然都是意外死亡手素,警方通過查閱死者的電腦和手機鸳址,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來泉懦,“玉大人稿黍,你說我怎么就攤上這事”懒ǎ” “怎么了巡球?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長琢锋。 經(jīng)常有香客問我辕漂,道長,這世上最難降的妖魔是什么吴超? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任钉嘹,我火速辦了婚禮,結(jié)果婚禮上鲸阻,老公的妹妹穿的比我還像新娘跋涣。我一直安慰自己缨睡,他們只是感情好,可當我...
    茶點故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布陈辱。 她就那樣靜靜地躺著奖年,像睡著了一般。 火紅的嫁衣襯著肌膚如雪沛贪。 梳的紋絲不亂的頭發(fā)上陋守,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天,我揣著相機與錄音利赋,去河邊找鬼水评。 笑死,一個胖子當著我的面吹牛媚送,可吹牛的內(nèi)容都是我干的中燥。 我是一名探鬼主播,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼塘偎,長吁一口氣:“原來是場噩夢啊……” “哼疗涉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起吟秩,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤咱扣,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后涵防,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體偏窝,經(jīng)...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年武学,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片伦意。...
    茶點故事閱讀 39,834評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡火窒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出驮肉,到底是詐尸還是另有隱情熏矿,我是刑警寧澤,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布离钝,位于F島的核電站票编,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏卵渴。R本人自食惡果不足惜慧域,卻給世界環(huán)境...
    茶點故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望浪读。 院中可真熱鬧昔榴,春花似錦辛藻、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至仰禽,卻和暖如春氮墨,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背吐葵。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工规揪, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人折联。 一個月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓粒褒,卻偏偏與公主長得像,于是被迫代替她去往敵國和親诚镰。 傳聞我的和親對象是個殘疾皇子奕坟,可洞房花燭夜當晚...
    茶點故事閱讀 44,779評論 2 354