分布式事務(wù)設(shè)計(jì) -- 本地消息表扔傅,設(shè)計(jì)與代碼

分布式事務(wù)設(shè)計(jì)

場(chǎng)景

在業(yè)務(wù)中有一處需要用戶(hù)為訂單付款秉犹,該業(yè)務(wù)會(huì)修改用戶(hù)庫(kù)的balance(用戶(hù)余額表)活箕,扣減用戶(hù)的余額褥蚯,然后會(huì)修改訂單庫(kù)的order(訂單表)和enterprise(企業(yè)余額表)国夜,將訂單狀態(tài)設(shè)置為已被支付鹏控,并增加企業(yè)的余額威始。這里就同時(shí)修改多個(gè)數(shù)據(jù)庫(kù)枢纠,涉及到了分布式事務(wù)的問(wèn)題。我最終是使用了RocketMQ的事務(wù)消息黎棠,并從外圍解決了消息回查的問(wèn)題晋渺。

他人思路

在設(shè)計(jì)我的解決方案前嘗試搜索了一下別人的實(shí)現(xiàn) 傳送門(mén)。他的解決方案是在producer和consumer方設(shè)置了兩個(gè)scheduler脓斩,感覺(jué)是有些復(fù)雜的木西。我是在其基礎(chǔ)上進(jìn)行了簡(jiǎn)化,并解決了一些其他問(wèn)題随静,使得整個(gè)解決方案比較完整和邏輯自洽八千。

我的設(shè)計(jì)

A和B是兩個(gè)Service,A執(zhí)行本地事務(wù),B執(zhí)行遠(yuǎn)程事務(wù)叼丑。A會(huì)調(diào)用B的遠(yuǎn)程服務(wù)关翎,完成整個(gè)業(yè)務(wù)。就本項(xiàng)目而言鸠信,A就是用戶(hù)模塊的AccountService纵寝,B就是訂單模塊的OrderService。A和B都有一張表星立,存儲(chǔ)著消息數(shù)據(jù)爽茴。從MQ的視角看來(lái),A是消息的Producer绰垂,B是消息的Consumer室奏。

A(本地事務(wù)執(zhí)行方,MQProducer)

  1. db
    producer_msg(msgId,body,message_status,create_time,update_time,send_times,topic) msgId這里為orderId

  2. mq
    作為producer時(shí),注冊(cè)Topic account:當(dāng)執(zhí)行本地事務(wù)時(shí)同時(shí)插入producer_msg劲装,默認(rèn)status都是未被消費(fèi)胧沫。如果本地事務(wù)執(zhí)行失敗,那么直接回滾占业,不插入绒怨。當(dāng)消息發(fā)送失敗時(shí),我們已經(jīng)在producer_msg插入了記錄谦疾,可以進(jìn)行回查南蹂。

  3. scheduler
    A需要同步B的數(shù)據(jù)庫(kù),使得兩個(gè)數(shù)據(jù)庫(kù)數(shù)據(jù)一致念恍,不同的即為確認(rèn)信息發(fā)送失敗的六剥。
    消息狀態(tài)有未被消費(fèi)、已被消費(fèi)峰伙、消費(fèi)失敗疗疟、超過(guò)消費(fèi)失敗的重試次數(shù)、超過(guò)確認(rèn)消息發(fā)送失敗的重試次數(shù)和已被回滾词爬。
    A和B數(shù)據(jù)庫(kù)同步維護(hù)所有消息秃嗜,只是A數(shù)據(jù)庫(kù)保存內(nèi)容更多,比如會(huì)保存消息的body顿膨。
    如果消息已經(jīng)是超過(guò)重試次數(shù)或已被消費(fèi)锅锨,那么A不會(huì)再去考慮它。
    A的Scheduler會(huì)遍歷A數(shù)據(jù)庫(kù)恋沃,找出未被消費(fèi)和消費(fèi)失敗的id且創(chuàng)建時(shí)間距離當(dāng)前時(shí)間超過(guò)1min必搞,發(fā)送給B。
    B會(huì)遍歷這些id

for(id in ids){
    如果 id 不存在囊咏,說(shuō)明確認(rèn)消息發(fā)送失敗恕洲,
    如果 id 存在塔橡,則將該id對(duì)應(yīng)的status一并返回,map.put(id,status)
} 

A 接收到map后霜第,keySet取得所有id葛家,拿發(fā)送過(guò)去的id減去這些id(差集),就是確認(rèn)消息發(fā)送失敗的消息泌类,進(jìn)行重新發(fā)送癞谒;遍歷map,將本地?cái)?shù)據(jù)庫(kù)同步為B數(shù)據(jù)庫(kù)刃榨。

這個(gè)方法可能會(huì)出現(xiàn)消息重復(fù)弹砚,因?yàn)锳剛發(fā)送消息,B該沒(méi)有處理枢希,A的Scheduler就去查詢(xún)了桌吃,當(dāng)然消息都沒(méi)有被消費(fèi),因?yàn)锳會(huì)重發(fā)剛才的消息苞轿,但是B有做消息去重茅诱,所以不會(huì)影響。

B(遠(yuǎn)程事務(wù)執(zhí)行方呕屎,MQConsumer)

  1. db
    consumer_msg(msgId,create_time. message_status,topic) msgId這里是orderId

  2. mq
    作為consumer让簿,注冊(cè)Topic account:
    當(dāng)接收到消息后敬察,查詢(xún)是否被執(zhí)行過(guò)秀睛,如果沒(méi)有被消費(fèi)過(guò)(id未找到)或者消費(fèi)失敗了(這里解決了消息重復(fù)消費(fèi)的問(wèn)題),則執(zhí)行遠(yuǎn)程事務(wù)后插入/更新consumer_ msg(status為已被消費(fèi))莲祸,已被消費(fèi)則跳過(guò)蹂安。
    遠(yuǎn)程事務(wù)執(zhí)行失敗時(shí),插入/更新consumer_ msg(status為消費(fèi)失斎裰摹)
    超過(guò)重試消費(fèi)次數(shù)的消息也更新consumer_ msg田盈,status為超過(guò)消費(fèi)的重試次數(shù)。
    B這里就維護(hù)它所接收的消息的狀態(tài)缴阎。

消息表

在producer這一方設(shè)計(jì)了producer_transaction_message表允瞧。


這里寫(xiě)圖片描述
  • msgId是消息唯一id,可以采用業(yè)務(wù)上的id來(lái)實(shí)現(xiàn)蛮拔,比如訂單id述暂。
  • body是消息體,比如訂單對(duì)象的序列化結(jié)果建炫。
  • message_status是消息狀態(tài)
  • update_time是最后更新記錄時(shí)間
  • create_time是消息創(chuàng)建時(shí)間
  • send_times是確認(rèn)消息重復(fù)發(fā)送次數(shù)
  • topic是消息主題畦韭,這里均為account

在consumer這一方設(shè)計(jì)了consumer_transaction_message表。


這里寫(xiě)圖片描述

看得出來(lái)是producer的表的部分列肛跌,其含義也是相同的艺配。

分布式事務(wù)實(shí)現(xiàn)代碼

Producer方

MQProducerConfig(配置MQProducer)

@Configuration
@Slf4j
@Getter
public class MQProducerConfig {
    @Value("${spring.rocketmq.group-name}")
    private String groupName;
    @Value("${spring.rocketmq.namesrv-addr}")
    private String namesrvAddr;
    @Value("${spring.rocketmq.topic}")
    private String topic;
    @Value("${spring.rocketmq.confirm-message-faiure-retry-times}")  
    private Integer retryTimes;
    public static final Integer CHECK_GAP = 1; 

    @Bean
    public MQProducer mqProducer() throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer(groupName);
        producer.setNamesrvAddr(namesrvAddr); 
        producer.setTransactionCheckListener(new TransactionCheckListener() {
            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
                // doNothing
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                producer.shutdown();
            }
        }));
        producer.start();
        log.info("producer started!");
        return producer;
    }
}

AccountLocalTransactionExecutor(執(zhí)行本地事務(wù))

@Component
@Slf4j
public class AccountLocalTransactionExecutor implements LocalTransactionExecuter {
    @Autowired
    private PayService payService;
    @Autowired
    private ProducerTransactionMessageService messageService;

    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
        try {
            String paymentPassword = (String) arg;
            OrderDO order = ProtoStuffUtil.deserialize(msg.getBody(), OrderDO.class);
            if (order.getOrderStatus() != OrderStatus.UNPAID) {
                log.info("{} 訂單狀態(tài)不為unpaid", order.getId());
                throw new OrderStateIllegalException(order.getOrderStatus().toString());
            }
            // 本地事務(wù)察郁,減少用戶(hù)賬戶(hù)余額
            // 拋出異常時(shí)會(huì)進(jìn)行回滾,下面構(gòu)造消息存儲(chǔ)到數(shù)據(jù)庫(kù)也不會(huì)被執(zhí)行
            payService.decreaseAccount(order.getUser().getId(), order.getTotalPrice(), paymentPassword);
            // 保存消息至數(shù)據(jù)庫(kù)
            ProducerTransactionMessageDO messageDO = ProducerTransactionMessageDO.builder()
                    .id(order.getId())
                    .body(msg.getBody())
                    .createTime(LocalDateTime.now())
                    .updateTime(LocalDateTime.now())
                    .messageStatus(MessageStatus.UNCONSUMED)
                    .topic(msg.getTopic())
                    .sendTimes(0)
                    .build();
            messageService.save(messageDO);
            // 成功通知MQ消息變更 該消息變?yōu)椋?lt;確認(rèn)發(fā)送>
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            e.printStackTrace();
            log.info("本地事務(wù)執(zhí)行失敗转唉,直接回滾!");
            // 失敗則不通知MQ 該消息一直處于:<暫緩發(fā)送>
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

AccountServiceImpl(Producer支付業(yè)務(wù)入口)

@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
    @Autowired
    private MQProducerConfig config;
    @Autowired
    private MQProducer producer;
    @Autowired
    private AccountLocalTransactionExecutor executor;
    @Autowired
    private ProducerTransactionMessageService messageService;
    @Autowired
    private PayService payService;

    @Override
    public void commit(OrderDO order, String paymentPassword) {
        Message message = new Message();
        message.setTopic(config.getTopic());
        message.setBody(ProtoStuffUtil.serialize(order));
        TransactionSendResult result = null;
        try {
            result = this.producer.sendMessageInTransaction(message, executor, paymentPassword);
            log.info("事務(wù)消息發(fā)送結(jié)果:{}", result);
            log.info("TransactionState:{} ", result.getLocalTransactionState());
            // 因?yàn)闊o(wú)法獲得executor中拋出的異常皮钠,只能模糊地返回訂單支付失敗信息。
            // TODO 想辦法從executor中找到原生異常
        } catch (Exception e) {
            log.info("AccountService拋出異常...");
            e.printStackTrace();
        }
        if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
            throw new OrderPaymentException(order.getId());
        }
    }

    @Transactional
    @Override
    public void rollback(ProducerTransactionMessageDO message) {
        OrderDO order = ProtoStuffUtil.deserialize(message.getBody(), OrderDO.class);
        message.setMessageStatus(MessageStatus.ROLLBACK);
        message.setUpdateTime(LocalDateTime.now());
        messageService.update(message);
        payService.increaseAccount(order.getUser().getId(), order.getTotalPrice());
    }
}

TransactionCheckScheduler(消息回查)

@Component
public class TransactionCheckScheduler {
    @Autowired
    private ProducerTransactionMessageService messageService;

    /**
     * 每分鐘執(zhí)行一次事務(wù)回查
     */
    @Scheduled(fixedRate = 60 * 1000)
    public void checkTransactionMessage(){
        messageService.check();
    }
}
ProducerTransactionMessageServiceImpl(Producer消息服務(wù)提供者)
@Slf4j
public class ProducerTransactionMessageServiceImpl implements ProducerTransactionMessageService {
    @Autowired
    private MQProducer producer;
    @Autowired
    private MQProducerConfig config;
    @Autowired
    private ProductTransactionMessageDOMapper mapper;
    @Autowired
    private ConsumerTransactionMessageService consumerTransactionMessageService;

    @Transactional
    @Override
    public void save(ProducerTransactionMessageDO message) {
        mapper.insert(message);
    }

    @Transactional
    @Override
    public void check() {
        List<Long> all = mapper.findMessageIdsByStatusCreatedAfter(Arrays.asList(MessageStatus.UNCONSUMED, MessageStatus.CONSUME_FAILED), MQProducerConfig.CHECK_GAP);
        Map<Long, MessageStatus> statusMap = consumerTransactionMessageService.findConsumerMessageStatuses(all);
        for (Map.Entry<Long, MessageStatus> entry : statusMap.entrySet()) {
            mapper.updateByPrimaryKeySelective(ProducerTransactionMessageDO.builder().id(entry.getKey()).messageStatus(entry.getValue()).updateTime(LocalDateTime.now()).build());
        }
        all.removeAll(statusMap.keySet());
        // 此時(shí)all為確認(rèn)消息發(fā)送失敗的
        this.reSend(mapper.selectBatchByPrimaryKeys(all));
    }

    @Transactional
    @Override
    public void reSend(List<ProducerTransactionMessageDO> messages) {
        for (ProducerTransactionMessageDO messageDO : messages) {
            if (messageDO.getSendTimes() == config.getRetryTimes()) {
                messageDO.setUpdateTime(LocalDateTime.now());
                messageDO.setMessageStatus(MessageStatus.OVER_CONFIRM_RETRY_TIME);
                mapper.updateByPrimaryKeySelective(messageDO);
                continue;
            }
            Message message = new Message();
            message.setTopic(config.getTopic());
            message.setBody(messageDO.getBody());
            try {
                SendResult result = producer.send(message);
                messageDO.setSendTimes(messageDO.getSendTimes() + 1);
                messageDO.setUpdateTime(LocalDateTime.now());
                mapper.updateByPrimaryKeySelective(messageDO);
                log.info("發(fā)送重試消息完畢,Message:{},result:{}", message, result);
            } catch (Exception e) {
                e.printStackTrace();
                log.info("發(fā)送重試消息時(shí)失敗! Message:{}", message);
            }
        }
    }

    @Transactional
    @Override
    public void delete(Long id) {
        mapper.deleteByPrimaryKey(id);
    }

    @Transactional(readOnly = true)
    @Override
    public List<ProducerTransactionMessageDO> findByIds(List<Long> ids) {
        return mapper.selectBatchByPrimaryKeys(ids);
    }

    @Transactional(readOnly = true)
    @Override
    public PageInfo<ProducerTransactionMessageDO> findByQueryDTO(MessageQueryConditionDTO dto) {
        return mapper.findByCondition(dto, dto.getPageNum(), dto.getPageSize()).toPageInfo();
    }

    @Override
    public void update(ProducerTransactionMessageDO message) {
        mapper.updateByPrimaryKeySelective(message);
    }

}

Consumer

MQConsumerConfig(配置MQConsumer)

@Configuration
@Slf4j
@Getter
public class MQConsumerConfig {
    private DefaultMQPushConsumer consumer;

    @Value("${spring.rocketmq.group-name}")
    private String groupName;
    @Value("${spring.rocketmq.namesrv-addr}")
    private String namesrvAddr;
    @Value("${spring.rocketmq.topic}")
    private String topic;
    @Autowired
    private AccountMessageListener accountMessageListener;
    @Value("${spring.rocketmq.consume-failure-retry-times}")
    private Integer retryTimes;

    @PostConstruct
    public void init() throws MQClientException {
        this.consumer = new DefaultMQPushConsumer(groupName);
        this.consumer.setNamesrvAddr(namesrvAddr);
        // 啟動(dòng)后從隊(duì)列頭部開(kāi)始消費(fèi)
        this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        this.consumer.subscribe(topic, "*");
        this.consumer.registerMessageListener(accountMessageListener);
        this.consumer.start();
        log.info("consumer started!");
    }
}

AccountMessageListener(消息接收方)

@Component
@Slf4j
public class AccountMessageListener implements MessageListenerConcurrently {
    @Autowired
    private OrderService orderService;
    @Autowired
    @Qualifier("consumerTransactionMessageService")
    private ConsumerTransactionMessageService messageService;
    @Autowired
    private MQConsumerConfig config;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        log.info("接收到消息數(shù)量為:{}", msgs.size());
        for (MessageExt msg : msgs) {
            ConsumerTransactionMessageDO messageDO = null;
            OrderDO order = null;
            try {
                String topic = msg.getTopic();
                String keys = msg.getKeys();
                order = ProtoStuffUtil.deserialize(msg.getBody(), OrderDO.class);
                log.info("消費(fèi)者接收到消息:topic: {}, keys:{} , order: {}", topic, keys, order);
                // 如果已經(jīng)被消費(fèi)過(guò)并且消費(fèi)成功赠法,那么不再重復(fù)消費(fèi)(未被消費(fèi)->id不存在或消費(fèi)失敗或超過(guò)重試次數(shù)的都會(huì)繼續(xù)消費(fèi))
                if(messageService.isMessageConsumedSuccessfully(order.getId())){
                    continue;
                }
                messageDO = ConsumerTransactionMessageDO.builder()
                        .id(order.getId())
                        .createTime(LocalDateTime.now())
                        .topic(msg.getTopic())
                        .build();
                // 業(yè)務(wù)邏輯處理
                orderService.finishOrder(order);
                // 如果業(yè)務(wù)邏輯拋出異常鳞芙,那么會(huì)跳過(guò)插入CONSUMED
                messageDO.setMessageStatus(MessageStatus.CONSUMED);
                // 如果是未被消費(fèi),第一次就消費(fèi)成功了期虾,則插入
                // 如果是超過(guò)重試次數(shù)原朝,又人工設(shè)置重試,則更新?tīng)顟B(tài)為已被消費(fèi)
                messageService.insertOrUpdate(messageDO);
            } catch (Exception e) {
                e.printStackTrace();
                // 重試次數(shù)達(dá)到最大重試次數(shù) 
                if (msg.getReconsumeTimes() == config.getRetryTimes()) {
                    log.info("客戶(hù)端重試三次,需要人工處理");
                    messageService.update(
                            ConsumerTransactionMessageDO.builder()
                                    .id(order.getId())
                                    .messageStatus(MessageStatus.OVER_CONSUME_RETRY_TIME).build()
                    );
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else {
                    log.info("消費(fèi)失敗镶苞,進(jìn)行重試喳坠,當(dāng)前重試次數(shù)為: {}", msg.getReconsumeTimes());
                    messageDO.setMessageStatus(MessageStatus.CONSUME_FAILED);
                    // 如果第一次消費(fèi)失敗,那么插入
                    // 如果之前消費(fèi)失敗茂蚓,繼續(xù)重試壕鹉,那么doNothing
                    // 如果之前是超過(guò)重試次數(shù),人工設(shè)置重試聋涨,那么將狀態(tài)改為消費(fèi)失敗
                    messageService.insertOrUpdate(messageDO);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

ConsumerTransactionMessageServiceImpl(Consumer消息服務(wù)提供者)

public class ConsumerTransactionMessageServiceImpl implements ConsumerTransactionMessageService {
    @Autowired
    private ConsumerTransactionMessageDOMapper mapper;

    @Transactional(readOnly = true)
    @Override
    public Map<Long, MessageStatus> findConsumerMessageStatuses(List<Long> ids) {
        Map<Long, MessageStatus> result = new HashMap<>();
        for (Long id : ids) {
            MessageStatus status = mapper.findStatusById(id);
            if (status != null) {
                result.put(id, status);
            }
        }
        return result;
    }

    @Transactional(readOnly = true)
    @Override
    public ConsumerTransactionMessageDO selectByPrimaryKey(Long id) {
        return mapper.selectByPrimaryKey(id);
    }

    @Transactional
    @Override
    public void insert(ConsumerTransactionMessageDO record) {
        mapper.insert(record);
    }

    @Override
    public void insertOrUpdate(ConsumerTransactionMessageDO record) {
        ConsumerTransactionMessageDO recordInDB = mapper.selectByPrimaryKey(record.getId());
        if (recordInDB == null) {
            mapper.insert(record);
        } else {
            recordInDB.setMessageStatus(record.getMessageStatus());
            mapper.updateByPrimaryKeySelective(recordInDB);
        }
    }

    @Transactional
    @Override
    public void insertIfNotExists(ConsumerTransactionMessageDO record) {
        if (mapper.selectByPrimaryKey(record.getId()) == null) {
            mapper.insert(record);
        }
    }

    @Transactional
    @Override
    public void update(ConsumerTransactionMessageDO record) {
        mapper.updateByPrimaryKeySelective(record);
    }

    @Transactional(readOnly = true)
    @Override
    public boolean isMessageConsumedSuccessfully(Long id) {
        MessageStatus status = mapper.findStatusById(id);
        return status == MessageStatus.CONSUMED;
    }
}

消息管理

尚需提供一個(gè)消息的監(jiān)控平臺(tái)晾浴,可以搜索和查看消息的狀態(tài),尤其是需要人工處理的死信牍白,可以回滾本地事務(wù)或重新發(fā)送脊凰。
界面類(lèi)似于下圖:


這里寫(xiě)圖片描述

當(dāng)前僅開(kāi)發(fā)了消息管理系統(tǒng)的數(shù)據(jù)接口,尚未開(kāi)發(fā)其客戶(hù)端茂腥。

@RestController
@RequestMapping("/message_console")
public class MessageConsoleController {
    @Autowired
    private ProducerTransactionMessageService messageService;
    @Autowired
    private AccountService accountService;

    @RequestMapping(value = "/query", method = RequestMethod.POST)
    public PageInfo<ProducerTransactionMessageDO> findByQueryDTO(@RequestBody MessageQueryConditionDTO queryDTO) {
        if (queryDTO.getPageNum() == null || queryDTO.getPageNum() <= 0) {
            queryDTO.setPageNum(Integer.valueOf(PageProperties.DEFAULT_PAGE_NUM));
        }
        if (queryDTO.getPageSize() == null || queryDTO.getPageSize() <= 0) {
            queryDTO.setPageSize(Integer.valueOf(PageProperties.DEFAULT_PAGE_SIZE));
        }
        return messageService.findByQueryDTO(queryDTO);
    }

    @RequestMapping(value = "/reSend", method = RequestMethod.POST)
    public void reSend(@RequestBody MessageIdDTO dto) {
        List<ProducerTransactionMessageDO> messages = messageService.findByIds(dto.getIds());
        for (ProducerTransactionMessageDO messageDO : messages) {
            messageDO.setMessageStatus(MessageStatus.UNCONSUMED);
            messageDO.setSendTimes(0);
        }
        messageService.reSend(messages);
    }

    @RequestMapping(value = "/rollback", method = RequestMethod.POST)
    public void rollback(@RequestBody MessageIdDTO dto) {
        for (ProducerTransactionMessageDO message : messageService.findByIds(dto.getIds())) {
            accountService.rollback(message);
        }
    }
}

總結(jié)

自上次開(kāi)發(fā)完SpringBootSOASkeleton之后狸涌,就一直希望能完成一個(gè)數(shù)據(jù)庫(kù)按業(yè)務(wù)分庫(kù)和分布式事務(wù)的項(xiàng)目。大概花了兩周最岗,大概嘗試了TCC和可靠消息最終一致兩種方法帕胆,最終解決了分布式事務(wù)的問(wèn)題。
TCC是我首先采用的技術(shù)般渡,使用了Github開(kāi)源的ByteTCC懒豹,但花了很多時(shí)間沒(méi)有跑通,另外用起來(lái)非常復(fù)雜驯用,對(duì)業(yè)務(wù)邏輯侵入非常大脸秽,最后是放棄了,但也留下來(lái)基于ByTeTCC的完成度比較高的代碼晨汹,最后以Git的一個(gè)tag結(jié)束了它的生命周期豹储。


這里寫(xiě)圖片描述

然后我考慮使用MQ,尤其是原本對(duì)事務(wù)消息有所支持的RocketMQ來(lái)實(shí)現(xiàn)分布式事務(wù)淘这。因?yàn)橄⒒夭榈墓δ鼙婚幐畎郏秩ラ喿x了其源碼和他人考慮的解決方案去實(shí)現(xiàn)它巩剖。就目前這個(gè)解決方案而言,自我感覺(jué)是比較完善的钠怯,既不是非常復(fù)雜佳魔, 又解決了RocketMQ原來(lái)存在的很多問(wèn)題。但因?yàn)檫€是一個(gè)學(xué)生晦炊,對(duì)分布式比較缺乏經(jīng)驗(yàn)鞠鲜,如果大家能發(fā)現(xiàn)其中存在的問(wèn)題,也希望在博客下評(píng)論或Github提issue断国。

全部代碼已經(jīng)放到Github上贤姆,按照《Linux集群搭建》配置的環(huán)境下,代碼是可以跑通的稳衬,只是確認(rèn)消息發(fā)送失敗這種場(chǎng)景很難模擬出來(lái)霞捡,這也是有待觀察的。

參考資料

大規(guī)模SOA系統(tǒng)中的分布事務(wù)處事-程立
支付寶架構(gòu)與技術(shù)
RocketMQ用戶(hù)指南v3.2.4
高并發(fā)下的冪等策略分析
RocketMQ源碼解析
分布式開(kāi)放消息系統(tǒng)(RocketMQ)的原理與實(shí)踐

https://blog.csdn.net/songxinjianqwe/article/details/78923482

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末薄疚,一起剝皮案震驚了整個(gè)濱河市碧信,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌街夭,老刑警劉巖砰碴,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異板丽,居然都是意外死亡呈枉,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)檐什,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)碴卧,“玉大人,你說(shuō)我怎么就攤上這事乃正。” “怎么了婶博?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵瓮具,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我凡人,道長(zhǎng)名党,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任挠轴,我火速辦了婚禮传睹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘岸晦。我一直安慰自己欧啤,他們只是感情好睛藻,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著邢隧,像睡著了一般店印。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上倒慧,一...
    開(kāi)封第一講書(shū)人閱讀 52,255評(píng)論 1 308
  • 那天按摘,我揣著相機(jī)與錄音,去河邊找鬼纫谅。 笑死炫贤,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的付秕。 我是一名探鬼主播照激,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼盹牧!你這毒婦竟也來(lái)了俩垃?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤汰寓,失蹤者是張志新(化名)和其女友劉穎口柳,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體有滑,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡跃闹,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了毛好。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片望艺。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖肌访,靈堂內(nèi)的尸體忽然破棺而出找默,到底是詐尸還是另有隱情,我是刑警寧澤吼驶,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布惩激,位于F島的核電站,受9級(jí)特大地震影響蟹演,放射性物質(zhì)發(fā)生泄漏风钻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一酒请、第九天 我趴在偏房一處隱蔽的房頂上張望骡技。 院中可真熱鬧,春花似錦羞反、人聲如沸布朦。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)喝滞。三九已至阁将,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間右遭,已是汗流浹背做盅。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留窘哈,地道東北人吹榴。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像滚婉,于是被迫代替她去往敵國(guó)和親图筹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容