項目使用技術(shù)
springboot、dubbo妹窖、zookeeper眉孩、定時任務(wù)、消息中間件MQ
一技健、項目結(jié)構(gòu)
maven父子工程:
父工程:consis
子工程:api-service写穴、order、product雌贱、message
api-service:該項目主要是提供接口調(diào)用的啊送,還包含實體類、枚舉等一些通用內(nèi)容
order:該項目是專門處理訂單相關(guān)操作的系統(tǒng)
product:該項目是專門處理產(chǎn)品相關(guān)操作的系統(tǒng)
message:該項目是提供消息服務(wù)的系統(tǒng)欣孤,好包括定時任務(wù)
它們的依賴關(guān)系如下圖:
根據(jù)上一篇的原理分別介紹下各個系統(tǒng)的實現(xiàn)
二馋没、order訂單系統(tǒng)
核心代碼:
@Override
@Transactional
public void add(Orders order) {
String messageBody = JSONObject.toJSONString( order );
//添加消息到數(shù)據(jù)庫
String messageId = transactionMessageService.savePreparMessage(order.getMessageId(), messageBody, Constant.ORDER_QUEUE_NAME );
log.info(">>> 預(yù)發(fā)送消息,消息編號:{}", messageId);
boolean flag = false;
boolean success = false;
try{
Orders orders = orderDao.saveAndFlush( order );
//int i = 1/0 ;
log.info(">>> 插入訂單,訂單編號:{}", orders.getId());
flag = true;
}catch (Exception e){
transactionMessageService.delete( messageId );
log.info(">>> 業(yè)務(wù)執(zhí)行異常刪除消息,消息編號:{}", messageId, e);
throw new RuntimeException( ">>> 創(chuàng)建訂單失敗" );
}finally {
if(flag){
try {
transactionMessageService.confirmAndSend( messageId );
success = true;
log.info(">>> 確認并且發(fā)送消息到實時消息中間件,消息編號:{}", messageId);
}catch (Exception e){
log.error(">>> 消息確認異常,消息編號:{}", messageId, e);
if(!success){
transactionMessageService.delete( messageId );
throw new RuntimeException( ">>> 確認消息異常,創(chuàng)建訂單失敗" );
}
}
}
}
}
- 插入訂單表之前降传,首先創(chuàng)建預(yù)發(fā)送消息篷朵,保存到事務(wù)消息表中,此時消息狀態(tài)為:未發(fā)送
- 插入訂單婆排,如果插入訂單失敗則將事務(wù)消息表中預(yù)發(fā)送消息刪除
- 插入訂單成功后声旺,修改消息表預(yù)發(fā)送消息狀態(tài)為發(fā)送中,并發(fā)送消息至mq
- 如果發(fā)送消息失敗段只,則訂單回滾并刪除事務(wù)消息表消息
三腮猖、message消息系統(tǒng)
核心代碼一:
@Override
public void sendMessageToMessageQueue(String queueName,final String messageBody) {
jmsTemplate.convertAndSend( queueName,messageBody );
log.info(">>> 發(fā)送消息到mq 隊列:{},消息內(nèi)容:{}", queueName, messageBody);
}
- 主要是activemq生產(chǎn)者講消息發(fā)送至MQ消息中間件
核心代碼二:
/**
* 定時重發(fā)消息(每分鐘)
*/
@Scheduled(cron = "0 */1 * * * ?")
public void handler(){
//查詢transaction_message表中已發(fā)送但未被刪除的消息
List<TransactionMessage> list = transactionMessageService.queryRetryList( Constant.MESSAGE_UNDEAD, maxTimeOut, Constant.MESSAGE_SENDING );
if(list!=null && list.size() > 0){
for (TransactionMessage message:list){
try {
transactionMessageService.retry( message.getMessageId() );
} catch (Exception e) {
log.warn(">>> 消息不存在,可能已經(jīng)被消費,消息編號:{}", message.getMessageId());
}
}
}
}
/**
* 定時通知工作人員(每隔5分鐘)
*/
@Scheduled(cron = "0 */5 * * * ?")
public void advance(){
List<Long> messages = transactionMessageService.queryDeadList();
log.warn(">>> 共有:{}條消息需要人工處理", messages.size());
String ids = JSONObject.toJSONString( messages );
//發(fā)郵件或者是發(fā)送短信通知工作人員處理
}
- 定時重發(fā)消息
- 定時將死亡的消息通知給工作人員,進行人工補償操作
四赞枕、product產(chǎn)品系統(tǒng)
核心代碼:
@Transactional
@JmsListener( destination = Constant.ORDER_QUEUE_NAME)
public void receiveQueue(String msg){
boolean flag = false;
Orders orders = JSONObject.parseObject( msg, Orders.class );
log.info(">>> 接收到mq消息隊列澈缺,消息編號:{} ,消息內(nèi)容:{}", orders.getMessageId(), msg);
TransactionMessage transactionMessage = transactionMessageService.findByMessageId( orders.getMessageId() );
try {
//保證冪等性
if(transactionMessage!=null){
List<OrderDetail> list = orders.getList();
for(OrderDetail detail : list){
Product product = productService.findById( detail.getId() );
Long skuNum = product.getProductSku() - detail.getNum();
if(skuNum >= 0){
product.setProductSku( skuNum );
productService.update( product );
}else {
throw new Exception( ">>> 庫存不足,修改庫存失斊捍础!" );
}
}
//int i = 1 /0 ;
flag = true;
}
}catch (Exception e){
e.printStackTrace();
throw new RuntimeException( e );
}finally {
if(flag){
transactionMessageService.delete( orders.getMessageId() );
DbLog dbLog = dbLogService.findByMesageId( orders.getMessageId() );
if(dbLog!=null){
dbLog.setState( "1" );//已處理成功
dbLogService.update( dbLog );
}
log.info(">>> 業(yè)務(wù)執(zhí)行成功刪除消息! messageId:{}", orders.getMessageId());
}
}
}
- 從mq消息中間件中監(jiān)聽并消費消息姐赡,將json消息轉(zhuǎn)為訂單對象
- 根據(jù)消息編號查詢該消息是否已被消費莱预,保證冪等性
- 如果消息未被消費(即存在此消息),則產(chǎn)品表扣減庫存雏吭;如果已經(jīng)消費(不存在此消息)锁施,則不做處理
- 產(chǎn)品表扣減庫存成功,則刪除此消息杖们,如果待處理消息日志表中有此消息悉抵,則更改狀態(tài)為1,表示已處理摘完;扣減失敗姥饰,則不做處理
該項目源碼已上傳至github和碼云,鏈接如下孝治,希望喜歡的朋友都能給個star支持一下列粪!謝謝~
github鏈接:
https://github.com/wanglinyong/consis
碼云鏈接:
https://gitee.com/wanglinyong/consis