1. 分布式事務(wù)消息介紹
簡單的說,就是一次大的操作由不同的小操作組成,這些小的操作分布在不同的服務(wù)器上履羞,且屬于不同的應(yīng)用茄靠,分布式事務(wù)需要保證這些小操作要么全部成功茂契,要么全部失敗。
本質(zhì)上來說慨绳,分布式事務(wù)就是為了保證不同數(shù)據(jù)庫的數(shù)據(jù)一致性掉冶。
2. RocketMQ4.X分布式事務(wù)消息架構(gòu)講解
RocketMQ事務(wù)消息:
RocketMQ提供分布式事務(wù)功能真竖,通過RocketMQ事務(wù)消息能達到分布式事務(wù)的最終一致性半消息HalfMessage:
暫不能投遞的消息(暫不能消費),Producer已經(jīng)將消息成功發(fā)送Broker端厌小,但是服務(wù)端未收到生產(chǎn)者對消息的二次確認恢共,此時該消息被標記成"暫不能投遞狀態(tài)",處于該種狀態(tài)下的消息即半消息消息回查:
由于網(wǎng)絡(luò)閃斷璧亚、生產(chǎn)者應(yīng)用重啟等原因讨韭,導(dǎo)致某條事務(wù)消息的二次確認丟失,消息隊列RocketMQ服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于“半消息”時癣蟋,需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit或是Rollback)透硝,該過程即消息回查。-
整體交互流程:
1. Producer向broker端發(fā)送消息
2. 服務(wù)端將消息持久化成功之后疯搅,向發(fā)送方ACK確認消息已經(jīng)發(fā)送成功濒生,此時消息為半消息
3. 發(fā)送方開始執(zhí)行本地事務(wù)邏輯
4. 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(Commit或者Rollback),服務(wù)端收到Commit狀態(tài)則將半消息標記為可投遞幔欧,訂閱方最終將收到該消息甜攀;服務(wù)端收到Rollback狀態(tài)則刪除半消息,訂閱方將不會接受該消息
5. 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下琐馆,上述步驟4提交的二次確認最終未到達服務(wù)端规阀,經(jīng)過固定時間后服務(wù)端將對該消息發(fā)起消息回查
6. 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果
7. 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認瘦麸,服務(wù)端仍按照4對半消息進行操作 RocketMQ事務(wù)消息的狀態(tài):
1. COMMIT_MESSAGE: 提交事務(wù)消息谁撼,消費者可以消費此消息
2. ROLLBACK_MESSAGE:回滾事務(wù)消息,消息會在broker中刪除滋饲,消費者不能消費
3. UNKNOW:Broker需要回查確認消息的狀態(tài)關(guān)于事務(wù)消息的消費:
事務(wù)消息consumer端的消費方式和普通消息是一樣的厉碟,RocketMQ能保證消息能被consumer收到(消息重試機制,最后也存在consumer消費失敗的情況屠缭,這種情況出現(xiàn)的概率極低箍鼓,而且消費端消費失敗使用之前博客中講解的失敗重試機制)
3. 代碼實現(xiàn)
3.1 Producer代碼
package com.pj.boot.jms;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
@Component
public class TransacationProducer {
private String producerGroup = "trac_producer_group";
// 事務(wù)監(jiān)聽器,執(zhí)行本地事務(wù)
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = null;
// 創(chuàng)建自定義線程池
/**
* @param corePoolSize 池中所保存的核心線程數(shù)
* @param maximumPoolSize 池中允許的最大線程池
* @param keepActiveTime 非核心線程空閑等待新任務(wù)的最長時間
* @param timeunit keepActiveTime參數(shù)的時間單位
* @param blockingqueue 隊列任務(wù)
*/
private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory()
{
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
public TransacationProducer(){
producer = new TransactionMQProducer(producerGroup);
//指定NameServer地址呵曹,多個地址以 ; 隔開
//如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
start();
}
public TransactionMQProducer getProducer(){
return this.producer;
}
/**
* 對象在使用之前必須要調(diào)用一次款咖,只能初始化一次
*/
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 一般在應(yīng)用上下文,使用上下文監(jiān)聽器奄喂,進行關(guān)閉
*/
public void shutdown(){
this.producer.shutdown();
}
}
class TransactionListenerImpl implements TransactionListener {
/**
* 半消息發(fā)送成功觸發(fā)此方法來執(zhí)行本地事務(wù)
* @param message 消息
* @param o 發(fā)送消息時傳遞的參數(shù)
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("====executeLocalTransaction=======");
String body = new String(message.getBody());
String key = message.getKeys();
String transactionId = message.getTransactionId();
System.out.println("transactionId="+transactionId+", key="+key+", body="+body);
// 執(zhí)行本地事務(wù)begin TODO
// 執(zhí)行本地事務(wù)end TODO
int status = Integer.parseInt(o.toString());
//二次確認消息铐殃,然后消費者可以消費
if(status == 1){
return LocalTransactionState.COMMIT_MESSAGE;
}
//回滾消息,broker端會刪除半消息
if(status == 2){
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//broker端會進行回查消息跨新,再或者什么都不響應(yīng)
if(status == 3){
return LocalTransactionState.UNKNOW;
}
return null;
}
/**
* 當(dāng)沒有響應(yīng)時準備(半)消息富腊。broker將發(fā)送檢查消息來檢查事務(wù)狀態(tài),并將調(diào)用此方法來獲取本地事務(wù)狀態(tài)域帐。broker回查本地事務(wù)
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("====checkLocalTransaction=======");
String body = new String(messageExt.getBody());
String key = messageExt.getKeys();
String transactionId = messageExt.getTransactionId();
System.out.println("transactionId="+transactionId+", key="+key+", body="+body);
//要么commit 要么rollback
//可以根據(jù)key去檢查本地事務(wù)消息是否完成
return LocalTransactionState.COMMIT_MESSAGE;
}
}
3.2 Consumer代碼
package com.pj.boot.jms;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.List;
@Component
public class PayConsumer {
private DefaultMQPushConsumer consumer;
private String consumerGroup = "pay_consumer_group";
public PayConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(JmsConfig.TOPIC, "*");
consumer.registerMessageListener( new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
Message msg = msgs.get(0);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
String topic = msg.getTopic();
String body = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
String keys = msg.getKeys();
System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
// 告訴broker消息消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
consumer.start();
System.out.println("consumer start ...");
}
}
3.3 發(fā)送消息
// 生產(chǎn)時建議再加一個key值
Message message = new Message(JmsConfig.TOPIC,tag, ("hello xdclass rocketmq = "+tag).getBytes() );
/**
* 發(fā)送半消息
* 第一個參數(shù):消息
* 第二個參數(shù):param赘被,消息回查時會使用到
*/
SendResult sendResult = transacationProducer.getProducer().sendMessageInTransaction(message, otherParam);
3.4 注意
TransactionMQProducer的groupName要唯一是整,不能和普通的producer一樣