一:發(fā)送方式講解
RocketMQ版提供三種方式來發(fā)送消息:同步(Sync)發(fā)送市框、異步(Async)發(fā)送和單向(Oneway)發(fā)送霞扬。 我們會(huì)介紹每種發(fā)送方式的原理、應(yīng)用場(chǎng)景枫振、代碼差異喻圃,以及三種發(fā)送方式的對(duì)比。
1.1 同步發(fā)送
CommunicationMode#SYNC
原理:
同步發(fā)送是指發(fā)送者向MQ執(zhí)行發(fā)送消息API時(shí)粪滤,同步等待斧拍,直到消息服務(wù)器返回發(fā)送結(jié)果 。
應(yīng)用場(chǎng)景:
此種方式應(yīng)用場(chǎng)景非常廣泛杖小,例如重要通知郵件肆汹、報(bào)名短信通知、營(yíng)銷短信系統(tǒng)等予权。
同步發(fā)送接口介紹:
MQProducer#send
// 同步-發(fā)送消息
SendResult send(final Message msg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
備注:
同步發(fā)送是指消息發(fā)送方發(fā)出一條消息后昂勉,會(huì)在收到服務(wù)端同步響應(yīng)之后才發(fā)下一條消息的通訊方式。1.2 異步發(fā)送
CommunicationMode#ASYNC
原理:
發(fā)送者向MQ執(zhí)行發(fā)送消息API時(shí)扫腺,指定消息發(fā)送成功后的回掉函數(shù)岗照,然后調(diào)用消息發(fā)送API后,立即返回笆环,消息發(fā)送者線程不阻塞攒至,直到運(yùn)行結(jié)束,消息發(fā)送成功或失敗的回調(diào)任務(wù)在一個(gè)新的線程中執(zhí)行 躁劣。
應(yīng)用場(chǎng)景:
異步發(fā)送一般用于鏈路耗時(shí)較長(zhǎng)迫吐,對(duì)響應(yīng)時(shí)間較為敏感的業(yè)務(wù)場(chǎng)景,例如账忘,您視頻上傳后通知啟動(dòng)轉(zhuǎn)碼服務(wù)志膀,轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。
1.2.1 異步發(fā)送接口介紹
MQProducer#send
//異步 發(fā)送消息闪萄, sendCallback參數(shù)是消息發(fā)送成功后的回調(diào)方法 梧却。
void send(final Message msg, final SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException;
1.2.2 異步相關(guān)核心屬性構(gòu)造器介紹
DefaultMQProducerImpl:異步相關(guān)核心屬性構(gòu)造器介紹
//異步發(fā)送隊(duì)列,默認(rèn)長(zhǎng)度:5w
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
//默認(rèn)異步發(fā)送線程池
private final ExecutorService defaultAsyncSenderExecutor;
//可以自定義的-異步發(fā)送消息線程池
private ExecutorService asyncSenderExecutor;
//構(gòu)造器
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
//有界隊(duì)列,長(zhǎng)度:5w
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
謹(jǐn)記:
如未設(shè)置自定義線程池:
asyncSenderExecutor
將會(huì)使用默認(rèn)線程池:
defaultAsyncSenderExecutor
默認(rèn)線程池任務(wù)隊(duì)列默認(rèn):5w.如隊(duì)列任務(wù)超出5w败去,線程池拒絕策略默認(rèn)為:拒絕策略放航,可能會(huì)有丟失消息發(fā)送的風(fēng)險(xiǎn)。
擴(kuò)展:
異步發(fā)送netty網(wǎng)絡(luò)發(fā)送模塊使用了Semaphore圆裕,如遇性能調(diào)優(yōu)或問題排查广鳍,別忘了><浮!赊时!
備注:
異步發(fā)送是指發(fā)送方發(fā)出一條消息后吨铸,不等服務(wù)端返回響應(yīng),接著發(fā)送下一條消息的通訊方式祖秒。消息隊(duì)列RocketMQ版的異步發(fā)送诞吱,需要您實(shí)現(xiàn)異步發(fā)送回調(diào)的以下接口:SendCallback消息發(fā)送方在發(fā)送了一條消息后,不需要等待服務(wù)端響應(yīng)即可發(fā)送第二條消息竭缝。發(fā)送方通過回調(diào)接口接收服務(wù)端響應(yīng)房维,并處理響應(yīng)結(jié)果。
1.3 單向發(fā)送
CommunicationMode#ONEWAY
原理:
消息發(fā)送者向 MQ 執(zhí)行發(fā)送消息 API 時(shí)抬纸,直接返回咙俩,不等待消息服務(wù)器的結(jié)果, 也不注冊(cè)回調(diào)函數(shù)湿故,簡(jiǎn)單地說阿趁,就是只管發(fā),不在乎消息是否成功存儲(chǔ)在消息服務(wù)器上 坛猪。
應(yīng)用場(chǎng)景:
適用于某些耗時(shí)非常短脖阵,但對(duì)可靠性要求并不高的場(chǎng)景,例如日志收集砚哆。單向發(fā)送接口介紹:
MQProducer#sendOneway
//單向消息 發(fā)送独撇,就是不在乎發(fā)送結(jié)果,消息發(fā)送出去后該方法立 即返回 躁锁。
void sendOneway(final Message msg)
throws MQClientException, RemotingException, InterruptedException;
備注:
發(fā)送方只負(fù)責(zé)發(fā)送消息,不等待服務(wù)端返回響應(yīng)且沒有回調(diào)函數(shù)觸發(fā)卵史,即只發(fā)送請(qǐng)求不等待應(yīng)答战转。此方式發(fā)送消息的過程耗時(shí)非常短,一般在微秒級(jí)別以躯。
三種發(fā)送方式的對(duì)比
下表概括了三者的特點(diǎn)和主要區(qū)別:
二:消息類型講解
目前開源版本RocketMq生產(chǎn)端支持發(fā)送的消息類型為:普通消息槐秧、批量消息、延時(shí)消息忧设、事物消息(開源版本定時(shí)消息和順序消息目前不支持刁标,順序消息可變向?qū)崿F(xiàn)) 。
我們將簡(jiǎn)單介紹消息和使用這些消息類型的注意事項(xiàng)址晕;我們先分析消息相關(guān)的類圖關(guān)系:
分析:
Message作為消息的頂層對(duì)象膀懈,在生產(chǎn)端可表示各種消息;
MessageBatch表示批量消息谨垃;
MessageExt以及它的子類其實(shí)都是在Broker端存儲(chǔ)或查詢使用启搂,后續(xù)可仔細(xì)分析哈硼控;
Message 核心屬性和方法分析:
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
//主題 topic
private String topic;
//消息-Flag一些特殊的消息標(biāo)記,int類型胳赌。標(biāo)記的含義定義在 MessageSysFlag 中
private int flag;
/**
* 擴(kuò)展屬性
* TAGS: 消息TAG牢撼,用于消息過濾
* KEYS: Message 索引鍵, 多個(gè)用空格隔開疑苫, RocketMQ 可以根據(jù)這些 key 快速檢索到消息 熏版。
* WAIT: 消息發(fā)送時(shí)是否等消息存儲(chǔ)完成后再返回
* DELAY: 消息延遲級(jí)別,用于定時(shí)消息或消息重試 捍掺。
*/
private Map<String, String> properties;
//消息體
private byte[] body;
// 事務(wù)Id
private String transactionId;
...省略...
// 消息延遲級(jí)別纳决,用于定時(shí)消息或消息重試 。
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
}
備注:
延遲級(jí)別對(duì)應(yīng)時(shí)間的是下面的常量:
MessageConst.PROPERTY_DELAY_TIME_LEVEL
//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//DELAY: 消息延遲級(jí)別乡小,用于定時(shí)消息或消息重試 阔加。--properties屬性
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
MessageBath 核心屬性和方法分析:
/**
* 批量消息,實(shí)現(xiàn)了 Iterable 迭代接口
* 批量消息發(fā)送是將 同一主題 的多條消息一起 打包發(fā)送到消息服務(wù)端满钟,減少網(wǎng)絡(luò)調(diào)用次數(shù)胜榔,提高網(wǎng)絡(luò)傳輸效率 。
* 當(dāng)然湃番,并不是在同一批次中發(fā)送的消息數(shù)量越多性能就越好夭织,其判斷依據(jù)是單條消息的長(zhǎng)度,如果單條消息內(nèi)容比較長(zhǎng)吠撮, 則打包多條消息發(fā)送會(huì)影響其他 線程發(fā)送消息的響應(yīng)時(shí)間 尊惰,
* 并且單批次消息發(fā)送總長(zhǎng)度不能超過 DefaultMQProducer#maxMessageSize。批量消息 發(fā)送要解決的是 如何將這些消息 編碼以便服務(wù)端能夠正確解碼出每條 消息的消息內(nèi)容 泥兰。
*/
public class MessageBatch extends Message implements Iterable<Message> {
private static final long serialVersionUID = 621335151046335557L;
private final List<Message> messages;
private MessageBatch(List<Message> messages) {
this.messages = messages;
}
/**
* 消息編碼
* @return
*/
public byte[] encode() {
return MessageDecoder.encodeMessages(messages);
}
public Iterator<Message> iterator() {
return messages.iterator();
}
/**
* 消息轉(zhuǎn)換弄屡, messages -> MessageBatch
* 1>批量消息 不支持 延時(shí)消息
* 2>消息主題 必須一致
* 3>消息WAIT 必須一致
* @param messages
* @return
*/
public static MessageBatch generateFromList(Collection<Message> messages) {
assert messages != null;
assert messages.size() > 0;
List<Message> messageList = new ArrayList<Message>(messages.size());
Message first = null;
for (Message message : messages) {
if (message.getDelayTimeLevel() > 0) { //批量消息 不支持 延時(shí)消息
throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching");
}
if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { //不支持重試topic(%RETRY%)
throw new UnsupportedOperationException("Retry Group is not supported for batching");
}
if (first == null) {
first = message;
} else {
if (!first.getTopic().equals(message.getTopic())) {
throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
}
if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
}
}
messageList.add(message);
}
MessageBatch messageBatch = new MessageBatch(messageList);
messageBatch.setTopic(first.getTopic());
messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
return messageBatch;
}
}
小結(jié):
普通消息:消息隊(duì)列RocketMQ版中無特性的消息,沒有其他特性屬性鞋诗,就是普通的Message對(duì)象, 區(qū)別于有特性的定時(shí)和延時(shí)消息膀捷、順序消息和事務(wù)消息;
延時(shí)消息:生產(chǎn)者對(duì)指定消息進(jìn)行延時(shí)投遞削彬,如果客戶端發(fā)送延時(shí)消息Message中的properties屬性必須包含DELAY屬性key全庸;
批量消息:其實(shí)就是Message的集合,多了一些驗(yàn)證融痛;
事務(wù)消息:后續(xù)單獨(dú)講解壶笼。
三:結(jié)論
本文簡(jiǎn)單講解了生產(chǎn)端消息發(fā)送方式的區(qū)別,開源版本消息類型的區(qū)別雁刷,知識(shí)點(diǎn)小但很重要覆劈,建議就是源碼都懂了,那些高大上理論概念是不是很簡(jiǎn)單了?
程序員的核心競(jìng)爭(zhēng)力其實(shí)還是技術(shù)墩崩,因此對(duì)技術(shù)還是要不斷的學(xué)習(xí)氓英,關(guān)注 “IT 巔峰技術(shù)” 公眾號(hào) ,該公眾號(hào)內(nèi)容定位:中高級(jí)開發(fā)鹦筹、架構(gòu)師铝阐、中層管理人員等中高端崗位服務(wù)的,除了技術(shù)交流外還有很多架構(gòu)思想和實(shí)戰(zhàn)案例铐拐。
作者是 《 消息中間件 RocketMQ 技術(shù)內(nèi)幕》 一書作者徘键,同時(shí)也是 “RocketMQ 上海社區(qū)”聯(lián)合創(chuàng)始人,曾就職于拼多多遍蟋、德邦等公司吹害,現(xiàn)任上市快遞公司架構(gòu)負(fù)責(zé)人,主要負(fù)責(zé)開發(fā)框架的搭建虚青、中間件相關(guān)技術(shù)的二次開發(fā)和運(yùn)維管理它呀、混合云及基礎(chǔ)服務(wù)平臺(tái)的建設(shè)。