消息發(fā)送方式
同步發(fā)送消息
同步發(fā)送消息是指赔嚎,Producer發(fā)送一條消息后旭贬,會(huì)在收到MQ返回的ack后才發(fā)送下一條消息凯傲,該方式的消息可靠性最高,但是消息發(fā)送效率太低
public class SyncProducer {
public static void main(String[] args) throws Exception{
//創(chuàng)建一個(gè)producer, 參數(shù)為producer group
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定namesever地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 設(shè)置當(dāng)發(fā)送失敗時(shí)重試發(fā)送的次數(shù)磕诊,默認(rèn)兩次
producer.setRetryTimesWhenSendFailed(3);
// 設(shè)置發(fā)送超時(shí)時(shí)間
producer.setSendMsgTimeout(5000);
// 開(kāi)啟生產(chǎn)者
producer.start();
// 發(fā)送消息
for(int i =0;i<10;i++){
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("topic","tag",body);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
// 關(guān)閉生產(chǎn)者
producer.shutdown();
}
}
異步發(fā)送消息
異步發(fā)送消息是指削彬,Producer發(fā)出消息后無(wú)需等待MQ返回ack,直接發(fā)送下一條消息,該方式的消息可靠性可以得到保障秀仲,消息發(fā)送效率也可以
public class AsyncProducer {
public static void main(String[] args) throws Exception{
//創(chuàng)建一個(gè)producer, 參數(shù)為producer group
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定namesever地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 設(shè)置發(fā)送超時(shí)時(shí)間
producer.setSendMsgTimeout(5000);
// 開(kāi)啟生產(chǎn)者
producer.start();
for(int i=0;i<10;i++){
byte[] body = ("Hi," + i).getBytes();
try{
Message msg = new Message("async-topic","async-tag",body);
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}catch (Exception e){
e.printStackTrace();
}
}
// 因?yàn)槭钱惒降娜谕矗孕枰骶€程休眠一會(huì)等待異步任務(wù)
TimeUnit.SECONDS.sleep(3);
producer.shutdown();
}
}
單向發(fā)送消息
單向發(fā)送消息是指,Producer僅發(fā)負(fù)責(zé)發(fā)送消息神僵,不等待雁刷,不處理MQ的ack,該發(fā)送方式MQ也不返回ack,該方式消息發(fā)送效率最高,但是消息可靠性差保礼。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//創(chuàng)建一個(gè)producer, 參數(shù)為producer group
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定namesever地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 設(shè)置發(fā)送超時(shí)時(shí)間
producer.setSendMsgTimeout(5000);
// 開(kāi)啟生產(chǎn)者
producer.start();
// 發(fā)送消息
for(int i =0;i<10;i++){
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("oneway-topic","oneway-tag",body);
// 方法沒(méi)有返回值
producer.sendOneway(msg);
}
// 關(guān)閉生產(chǎn)者
producer.shutdown();
}
}
消息消費(fèi)
public class SomeConsumer {
public static void main(String[] args) throws Exception{
// 定義一個(gè)push消費(fèi)者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("someTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 一但broker中有了其訂閱的消息就會(huì)觸發(fā)該方法的執(zhí)行
// 方法返回值為當(dāng)前consumer消費(fèi)的狀態(tài)
// 這里雖然為一個(gè)列表沛励,但是每次默認(rèn)只能消費(fèi)一條消息,通過(guò) consumer.getConsumeMessageBatchMaxSize();可以得到默認(rèn)值炮障,也可以改成批量消費(fèi)
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 逐條消費(fèi)消息
for(MessageExt msg:list){
System.out.println(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
Consumer的pullBatchSize屬性與consumeMessageBatchMaxSize屬性是否設(shè)置越大越好目派,當(dāng)然不是
- pullBatchSize值設(shè)置的越大,Consumer每拉取一次需要的時(shí)間就會(huì)越長(zhǎng)胁赢,且在網(wǎng)絡(luò)上傳輸問(wèn)題的可能性就越高企蹭,若在拉取過(guò)程中出現(xiàn)問(wèn)題,那么本批次所有的消息都需要全部重新拉取智末。
- consumerMessageBatchMaxSize值設(shè)置的越大谅摄,Consumer的消息并發(fā)消費(fèi)能力越低,且這批被消費(fèi)的消息具有相同的消費(fèi)結(jié)果系馆,因?yàn)閏onsumerMessageBatchSize指定的一批消息只會(huì)使用一個(gè)線程進(jìn)行處理送漠,且在處理過(guò)程中只要有一個(gè)消息處理異常,則這批消息需要全部重新再次消費(fèi)處理由蘑。
有序性分類
根據(jù)有序范圍的不同闽寡,Rocketmq可以嚴(yán)格的保證消息的有序性:分區(qū)有序性與全局有序性。
-
當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè)時(shí)所保證的有序性是整個(gè)Topic中的消息順序尼酿,稱為全局有序爷狈。
image.png -
如果有多個(gè)queue參與,其僅可保證在該queue分區(qū)隊(duì)列上的消息順序谓媒,則稱為分區(qū)有序淆院。
image.png
public class SyncProducer {
public static void main(String[] args) throws Exception{
//創(chuàng)建一個(gè)producer, 參數(shù)為producer group
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定namesever地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 開(kāi)啟生產(chǎn)者
producer.start();
// 發(fā)送消息
for(int i =0;i<10;i++){
Integer orderId = i;
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("topic","tag",body);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
int index = id % list.size();
return list.get(index);
}
},orderId);
System.out.println(sendResult);
}
// 關(guān)閉生產(chǎn)者
producer.shutdown();
}
}
延遲消息
當(dāng)消息寫(xiě)入到Broker后,在指定的時(shí)長(zhǎng)后才可以被消費(fèi)處理,稱為延遲消息
采用rocketmq的延遲消息可以實(shí)現(xiàn)定時(shí)任務(wù)的功能土辩,而不用使用定時(shí)器支救,典型的應(yīng)用場(chǎng)景是,電商交易中超時(shí)未支付關(guān)閉訂單的場(chǎng)景
事務(wù)消息
代碼舉例
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 回調(diào)方法
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
/**
* 消息回查方法:
* 1拷淘、回調(diào)操作返回UNKNOW
* 2各墨、TC沒(méi)有收到TM的最終全局事務(wù)確認(rèn)指令
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
事務(wù)消息場(chǎng)景舉例
工行用戶A向建行用戶B轉(zhuǎn)賬1萬(wàn)元
問(wèn)題點(diǎn):這里 1,2,3 沒(méi)有實(shí)現(xiàn)原子性,那么A賬號(hào)沒(méi)有扣款成功启涯,但是消息已經(jīng)發(fā)送成功了贬堵,這時(shí)候就會(huì)導(dǎo)致B的賬號(hào)增加了1萬(wàn)元,就會(huì)出現(xiàn)問(wèn)題结洼,這時(shí)候就需要事務(wù)消息來(lái)解決這個(gè)問(wèn)題黎做。
該分布式事務(wù)的解決方案是依賴于XA模式的,上圖中的第三步與TC向Broker發(fā)送預(yù)提交消息松忍,這里的預(yù)提交消息(半事務(wù)消息)就是消費(fèi)者還不能消費(fèi)的消息蒸殿。當(dāng)執(zhí)行到圖中第9步驟的時(shí)候,才會(huì)真正的寫(xiě)入消息到Broker中鸣峭,簡(jiǎn)單理解TC就是管理各個(gè)分支事務(wù)的狀態(tài)宏所,這里可以看到工行系統(tǒng)熄浓,Broker系統(tǒng)是兩個(gè)分支事務(wù)离福。TM是事務(wù)管理者,一般由Producer擔(dān)任贱纠。