RocketMQ—Producer(三)發(fā)送方式和消息類型

一:發(fā)送方式講解

RocketMQ版提供三種方式來發(fā)送消息:同步(Sync)發(fā)送市框、異步(Async)發(fā)送和單向(Oneway)發(fā)送霞扬。 我們會(huì)介紹每種發(fā)送方式的原理、應(yīng)用場(chǎng)景枫振、代碼差異喻圃,以及三種發(fā)送方式的對(duì)比。

1.1 同步發(fā)送

CommunicationMode#SYNC

原理:

同步發(fā)送是指發(fā)送者向MQ執(zhí)行發(fā)送消息API時(shí)粪滤,同步等待斧拍,直到消息服務(wù)器返回發(fā)送結(jié)果 。

image.png

應(yīng)用場(chǎng)景:

此種方式應(yīng)用場(chǎng)景非常廣泛杖小,例如重要通知郵件肆汹、報(bào)名短信通知、營(yíng)銷短信系統(tǒng)等予权。

同步發(fā)送接口介紹:

MQProducer#send
// 同步-發(fā)送消息
SendResult send(final Message msg) 
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

備注:

同步發(fā)送是指消息發(fā)送方發(fā)出一條消息后昂勉,會(huì)在收到服務(wù)端同步響應(yīng)之后才發(fā)下一條消息的通訊方式。1.2 異步發(fā)送

CommunicationMode#ASYNC

原理:

發(fā)送者向MQ執(zhí)行發(fā)送消息API時(shí)扫腺,指定消息發(fā)送成功后的回掉函數(shù)岗照,然后調(diào)用消息發(fā)送API后,立即返回笆环,消息發(fā)送者線程不阻塞攒至,直到運(yùn)行結(jié)束,消息發(fā)送成功或失敗的回調(diào)任務(wù)在一個(gè)新的線程中執(zhí)行 躁劣。

image.png

應(yīng)用場(chǎng)景:

異步發(fā)送一般用于鏈路耗時(shí)較長(zhǎng)迫吐,對(duì)響應(yīng)時(shí)間較為敏感的業(yè)務(wù)場(chǎng)景,例如账忘,您視頻上傳后通知啟動(dòng)轉(zhuǎn)碼服務(wù)志膀,轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。

1.2.1 異步發(fā)送接口介紹

MQProducer#send
//異步 發(fā)送消息闪萄, sendCallback參數(shù)是消息發(fā)送成功后的回調(diào)方法 梧却。
void send(final Message msg, final SendCallback sendCallback) 
    throws MQClientException, RemotingException, InterruptedException;

1.2.2 異步相關(guān)核心屬性構(gòu)造器介紹

DefaultMQProducerImpl:異步相關(guān)核心屬性構(gòu)造器介紹

 
 //異步發(fā)送隊(duì)列,默認(rèn)長(zhǎng)度:5w
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
//默認(rèn)異步發(fā)送線程池
private final ExecutorService defaultAsyncSenderExecutor;
//可以自定義的-異步發(fā)送消息線程池
private ExecutorService asyncSenderExecutor;
//構(gòu)造器
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;
    //有界隊(duì)列,長(zhǎng)度:5w
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.asyncSenderThreadPoolQueue,
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
}

謹(jǐn)記:

如未設(shè)置自定義線程池:

asyncSenderExecutor

將會(huì)使用默認(rèn)線程池:

defaultAsyncSenderExecutor

默認(rèn)線程池任務(wù)隊(duì)列默認(rèn):5w.如隊(duì)列任務(wù)超出5w败去,線程池拒絕策略默認(rèn)為:拒絕策略放航,可能會(huì)有丟失消息發(fā)送的風(fēng)險(xiǎn)。

擴(kuò)展:

異步發(fā)送netty網(wǎng)絡(luò)發(fā)送模塊使用了Semaphore圆裕,如遇性能調(diào)優(yōu)或問題排查广鳍,別忘了><浮!赊时!

備注:

異步發(fā)送是指發(fā)送方發(fā)出一條消息后吨铸,不等服務(wù)端返回響應(yīng),接著發(fā)送下一條消息的通訊方式祖秒。消息隊(duì)列RocketMQ版的異步發(fā)送诞吱,需要您實(shí)現(xiàn)異步發(fā)送回調(diào)的以下接口:SendCallback消息發(fā)送方在發(fā)送了一條消息后,不需要等待服務(wù)端響應(yīng)即可發(fā)送第二條消息竭缝。發(fā)送方通過回調(diào)接口接收服務(wù)端響應(yīng)房维,并處理響應(yīng)結(jié)果。

1.3 單向發(fā)送

CommunicationMode#ONEWAY

原理:

消息發(fā)送者向 MQ 執(zhí)行發(fā)送消息 API 時(shí)抬纸,直接返回咙俩,不等待消息服務(wù)器的結(jié)果, 也不注冊(cè)回調(diào)函數(shù)湿故,簡(jiǎn)單地說阿趁,就是只管發(fā),不在乎消息是否成功存儲(chǔ)在消息服務(wù)器上 坛猪。

image.png

應(yīng)用場(chǎng)景:

適用于某些耗時(shí)非常短脖阵,但對(duì)可靠性要求并不高的場(chǎng)景,例如日志收集砚哆。單向發(fā)送接口介紹:

MQProducer#sendOneway
//單向消息 發(fā)送独撇,就是不在乎發(fā)送結(jié)果,消息發(fā)送出去后該方法立 即返回 躁锁。
void sendOneway(final Message msg) 
    throws MQClientException, RemotingException, InterruptedException;

備注:

發(fā)送方只負(fù)責(zé)發(fā)送消息,不等待服務(wù)端返回響應(yīng)且沒有回調(diào)函數(shù)觸發(fā)卵史,即只發(fā)送請(qǐng)求不等待應(yīng)答战转。此方式發(fā)送消息的過程耗時(shí)非常短,一般在微秒級(jí)別以躯。

三種發(fā)送方式的對(duì)比

下表概括了三者的特點(diǎn)和主要區(qū)別:

image.png

二:消息類型講解

目前開源版本RocketMq生產(chǎn)端支持發(fā)送的消息類型為:普通消息槐秧、批量消息、延時(shí)消息忧设、事物消息(開源版本定時(shí)消息和順序消息目前不支持刁标,順序消息可變向?qū)崿F(xiàn)) 。

我們將簡(jiǎn)單介紹消息和使用這些消息類型的注意事項(xiàng)址晕;我們先分析消息相關(guān)的類圖關(guān)系:

image.png

分析:

  • Message作為消息的頂層對(duì)象膀懈,在生產(chǎn)端可表示各種消息;

  • MessageBatch表示批量消息谨垃;

  • MessageExt以及它的子類其實(shí)都是在Broker端存儲(chǔ)或查詢使用启搂,后續(xù)可仔細(xì)分析哈硼控;

Message 核心屬性和方法分析:


public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主題 topic
    private String topic;
    //消息-Flag一些特殊的消息標(biāo)記,int類型胳赌。標(biāo)記的含義定義在 MessageSysFlag 中
    private int flag;
    /**
     * 擴(kuò)展屬性
     *  TAGS: 消息TAG牢撼,用于消息過濾
     *  KEYS: Message 索引鍵, 多個(gè)用空格隔開疑苫, RocketMQ 可以根據(jù)這些 key 快速檢索到消息 熏版。
     *  WAIT: 消息發(fā)送時(shí)是否等消息存儲(chǔ)完成后再返回
     *  DELAY: 消息延遲級(jí)別,用于定時(shí)消息或消息重試 捍掺。
     */
    private Map<String, String> properties;
    //消息體
    private byte[] body;
    // 事務(wù)Id
    private String transactionId;
    ...省略...

    // 消息延遲級(jí)別纳决,用于定時(shí)消息或消息重試 。
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
}

備注:
延遲級(jí)別對(duì)應(yīng)時(shí)間的是下面的常量:

MessageConst.PROPERTY_DELAY_TIME_LEVEL


//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//DELAY: 消息延遲級(jí)別乡小,用于定時(shí)消息或消息重試 阔加。--properties屬性
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

MessageBath 核心屬性和方法分析:

/**
 * 批量消息,實(shí)現(xiàn)了 Iterable 迭代接口
 * 批量消息發(fā)送是將 同一主題 的多條消息一起 打包發(fā)送到消息服務(wù)端满钟,減少網(wǎng)絡(luò)調(diào)用次數(shù)胜榔,提高網(wǎng)絡(luò)傳輸效率 。
 * 當(dāng)然湃番,并不是在同一批次中發(fā)送的消息數(shù)量越多性能就越好夭织,其判斷依據(jù)是單條消息的長(zhǎng)度,如果單條消息內(nèi)容比較長(zhǎng)吠撮, 則打包多條消息發(fā)送會(huì)影響其他 線程發(fā)送消息的響應(yīng)時(shí)間 尊惰,
 * 并且單批次消息發(fā)送總長(zhǎng)度不能超過 DefaultMQProducer#maxMessageSize。批量消息 發(fā)送要解決的是 如何將這些消息 編碼以便服務(wù)端能夠正確解碼出每條 消息的消息內(nèi)容 泥兰。
 */
public class MessageBatch extends Message implements Iterable<Message> {

    private static final long serialVersionUID = 621335151046335557L;
    private final List<Message> messages;

    private MessageBatch(List<Message> messages) {
        this.messages = messages;
    }
    /**
     * 消息編碼
     * @return
     */
    public byte[] encode() {
        return MessageDecoder.encodeMessages(messages);
    }
    public Iterator<Message> iterator() {
        return messages.iterator();
    }
    /**
     * 消息轉(zhuǎn)換弄屡, messages -> MessageBatch
     * 1>批量消息 不支持 延時(shí)消息
     * 2>消息主題  必須一致
     * 3>消息WAIT 必須一致
     * @param messages
     * @return
     */
    public static MessageBatch generateFromList(Collection<Message> messages) {
        assert messages != null;
        assert messages.size() > 0;
        List<Message> messageList = new ArrayList<Message>(messages.size());
        Message first = null;
        for (Message message : messages) {
            if (message.getDelayTimeLevel() > 0) { //批量消息 不支持 延時(shí)消息
                throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching");
            }
            if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { //不支持重試topic(%RETRY%)
                throw new UnsupportedOperationException("Retry Group is not supported for batching");
            }
            if (first == null) {
                first = message;
            } else {
                if (!first.getTopic().equals(message.getTopic())) {
                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
                }
                if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                    throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
                }
            }
            messageList.add(message);
        }
        MessageBatch messageBatch = new MessageBatch(messageList);
        messageBatch.setTopic(first.getTopic());
        messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
        return messageBatch;
    }
}

小結(jié):

  1. 普通消息:消息隊(duì)列RocketMQ版中無特性的消息,沒有其他特性屬性鞋诗,就是普通的Message對(duì)象, 區(qū)別于有特性的定時(shí)和延時(shí)消息膀捷、順序消息和事務(wù)消息;

  2. 延時(shí)消息:生產(chǎn)者對(duì)指定消息進(jìn)行延時(shí)投遞削彬,如果客戶端發(fā)送延時(shí)消息Message中的properties屬性必須包含DELAY屬性key全庸;

  3. 批量消息:其實(shí)就是Message的集合,多了一些驗(yàn)證融痛;

  4. 事務(wù)消息:后續(xù)單獨(dú)講解壶笼。

三:結(jié)論

本文簡(jiǎn)單講解了生產(chǎn)端消息發(fā)送方式的區(qū)別,開源版本消息類型的區(qū)別雁刷,知識(shí)點(diǎn)小但很重要覆劈,建議就是源碼都懂了,那些高大上理論概念是不是很簡(jiǎn)單了?


程序員的核心競(jìng)爭(zhēng)力其實(shí)還是技術(shù)墩崩,因此對(duì)技術(shù)還是要不斷的學(xué)習(xí)氓英,關(guān)注 “IT 巔峰技術(shù)” 公眾號(hào) ,該公眾號(hào)內(nèi)容定位:中高級(jí)開發(fā)鹦筹、架構(gòu)師铝阐、中層管理人員等中高端崗位服務(wù)的,除了技術(shù)交流外還有很多架構(gòu)思想和實(shí)戰(zhàn)案例铐拐。

作者是 《 消息中間件 RocketMQ 技術(shù)內(nèi)幕》 一書作者徘键,同時(shí)也是 “RocketMQ 上海社區(qū)”聯(lián)合創(chuàng)始人,曾就職于拼多多遍蟋、德邦等公司吹害,現(xiàn)任上市快遞公司架構(gòu)負(fù)責(zé)人,主要負(fù)責(zé)開發(fā)框架的搭建虚青、中間件相關(guān)技術(shù)的二次開發(fā)和運(yùn)維管理它呀、混合云及基礎(chǔ)服務(wù)平臺(tái)的建設(shè)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末棒厘,一起剝皮案震驚了整個(gè)濱河市纵穿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌奢人,老刑警劉巖谓媒,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異何乎,居然都是意外死亡句惯,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門支救,熙熙樓的掌柜王于貴愁眉苦臉地迎上來抢野,“玉大人,你說我怎么就攤上這事搂妻∶杀#” “怎么了?”我有些...
    開封第一講書人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵欲主,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我逝嚎,道長(zhǎng)扁瓢,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任补君,我火速辦了婚禮引几,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己伟桅,他們只是感情好敞掘,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著楣铁,像睡著了一般玖雁。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上盖腕,一...
    開封第一講書人閱讀 49,816評(píng)論 1 290
  • 那天赫冬,我揣著相機(jī)與錄音,去河邊找鬼溃列。 笑死劲厌,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的听隐。 我是一名探鬼主播补鼻,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼雅任!你這毒婦竟也來了风范?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤椿访,失蹤者是張志新(化名)和其女友劉穎乌企,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體成玫,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡加酵,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了哭当。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片猪腕。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖钦勘,靈堂內(nèi)的尸體忽然破棺而出陋葡,到底是詐尸還是另有隱情,我是刑警寧澤彻采,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布腐缤,位于F島的核電站,受9級(jí)特大地震影響肛响,放射性物質(zhì)發(fā)生泄漏岭粤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一特笋、第九天 我趴在偏房一處隱蔽的房頂上張望剃浇。 院中可真熱鬧,春花似錦、人聲如沸虎囚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽淘讥。三九已至圃伶,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間适揉,已是汗流浹背留攒。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留嫉嘀,地道東北人炼邀。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像剪侮,于是被迫代替她去往敵國和親拭宁。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容