RocketMQ——消息發(fā)送

概述

RocketMQ消息發(fā)送方式:同步(sync)、異步(async)枣购、單向(oneway)

消息Message簡介

org.apache.rocketmq.common.message.Message

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;//消息主題
    private int flag;//消息flag颓芭,RocketMQ不做處理
    private Map<String, String> properties;//擴展屬性
    private byte[] body;//消息體
    private String transactionId;//事務消息事務id

其中擴展屬性包括以下四個:

  • tag:消息tag,用于消息過濾
  • keys:Message索引鍵,多個用空格隔開基跑,RocketMQ根據(jù)這些key快速檢索消息
  • waitStoreMsgOK:消息發(fā)送時瞧挤,是否等待消息存儲完成后再返回锡宋。
  • delayTimeLevel:消息延遲級別,用于延遲消息或消息重試

生產(chǎn)者啟動流程

org.apache.rocketmq.client.producer.DefaultMQProducer

核心屬性如下

屬性名 含義
producerGroup 生產(chǎn)組
createTopicKey 默認topic
defaultTopicQueueNums 默認主題在每一個broker隊列的數(shù)量
sendMsgTimeout 消息發(fā)送超時時間 3秒
compressMsgBodyOverHowmuch 消息體超過該值則壓縮
retryTimesWhenSendFailed 同步消息重試次數(shù)特恬,2次执俩。包含發(fā)送一次,一個消息最終為三次
retryTimesWhenSendAsyncFailed 異步發(fā)送消息重試次數(shù)癌刽,默認2次
retryAnotherBrokerWhenNotStoreOK 消息重試選擇另一個broker時役首,是否不等待存儲結果就返回尝丐,默認為false
maxMessageSize 4M

消息生產(chǎn)者啟動流程

org.apache.rocketmq.client.producer.DefaultMQProducer#start
啟動流程如下圖:


1.jpg

消息發(fā)送流程

2.jpg

核心流程我們可以分為以下步驟查看:

DefaultMQProducerImpl#sendDefaultImpl

這個方法作為消息發(fā)送的核心方法,主要處理以下:

  1. tryToFindTopicPublishInfo 尋找消息的路由信息
  2. selectOneMessageQueue 選擇消息隊列需要發(fā)送的隊列
  3. sendKernelImpl 發(fā)送消息
  4. updateFaultItem 更新發(fā)送失敗的broker信息
  5. 發(fā)送消息出現(xiàn)異常時衡奥,處理各個異常信息
  6. 返回最終的結果

this.tryToFindTopicPublishInfo(msg.getTopic())

消息發(fā)送的核心方法爹袁,方法查找對應的消息路由信息,為消息發(fā)送做準備失息,該方法返回TopicPublishInfo。
TopicPublishInfo 屬性如下:

屬性
private boolean orderTopic = false; 是否為順序消息
private List<MessageQueue> messageQueueList 主題隊列的消息隊列
private volatile ThreadLocalIndex sendWhichQueue 沒選擇一次消息隊列乏屯,該值會增加1
private TopicRouteData topicRouteData topicRouteData

TopicRouteData 屬性如下

屬性
private String orderTopicConf 順序消息配置
private List<QueueData> queueDatas topic隊列元數(shù)據(jù)根时,包含brokerName,readQueueNums辰晕,writeQueueNums蛤迎,perm,topicSynFlag
List<BrokerData> brokerDatas topic分布的broker元數(shù)據(jù)含友,包含 cluster替裆,brokerName,brokerAddrs
HashMap<String/* brokerAddr /, List<String>/ Filter Server */> filterServerTable broker上的過濾服務器地址列表

DefaultMQProducerImpl#tryToFindTopicPublishInfo

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //本地緩存獲取  TopicPublishInfo
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        //如果沒有緩存窘问,或者緩存失效則從NameServer獲取
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

最終調用 MQClientInstance#updateTopicRouteInfoFromNameServer

  1. 從NameServer獲取TopicRouteData 數(shù)據(jù)
    簡化后代碼
 TopicRouteData topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
  1. 判斷TopicRouteData是否改變過辆童,改變過則更新
  2. 需要更新則更新對應的brokerAddrTable、topicRouteTable

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

獲取對應要發(fā)送的消息隊列惠赫。一個MessageQueue 包含topic把鉴、brokerName、queueId儿咱。

  1. 默認sendLatencyFaultEnable 為false庭砍,則直接調用tpInfo.selectOneMessageQueue(lastBrokerName)
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

獲取增量index,取余 然后返回對應的MessageQueue混埠。
如果上次已經(jīng)發(fā)送消息怠缸,存在lastBrokerName,則需要獲取非lastBrokerName的broker钳宪。

  1. 如果故障延遲機制啟用揭北,即sendLatencyFaultEnable 為true。
    -1. 如果啟用故障延遲吏颖,則通過latencyFaultTolerance.isAvailable判斷對應的broker是否可用
    -2. 如果可用則會維護latencyFaultTolerance搔体。

DefaultMQProducerImpl#sendKernelImpl

 /**
     * 
     * @param msg 待發(fā)送消息
     * @param mq 消息將發(fā)送到該隊列
     * @param communicationMode 消息發(fā)送模式 sync/async/oneway
     * @param sendCallback 消息回調函數(shù)
     * @param topicPublishInfo 主題路由信息
     * @param timeout 消息超時時間
     */
    private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout)
  1. 獲取broker地址
  2. 消息設置全局id,消息體超過4k侦高,則壓縮并設置sysFlag 為MessageSysFlag.COMPRESSED_FLAG嫉柴。事務消息則設置sysFlag為MessageSysFlag.TRANSACTION_PREPARED_TYPE。
  3. 注冊消息hook函數(shù)
  4. 構建消息SendMessageRequestHeader
  5. MQClientAPIImpl#sendMessage
  6. NettyRemotingClient#invokeSync
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末奉呛,一起剝皮案震驚了整個濱河市计螺,隨后出現(xiàn)的幾起案子夯尽,更是在濱河造成了極大的恐慌,老刑警劉巖登馒,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件匙握,死亡現(xiàn)場離奇詭異,居然都是意外死亡陈轿,警方通過查閱死者的電腦和手機圈纺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來麦射,“玉大人蛾娶,你說我怎么就攤上這事∏鼻铮” “怎么了蛔琅?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長峻呛。 經(jīng)常有香客問我罗售,道長,這世上最難降的妖魔是什么钩述? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任乒融,我火速辦了婚禮威蕉,結果婚禮上先蒋,老公的妹妹穿的比我還像新娘揩尸。我一直安慰自己,他們只是感情好方面,可當我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布话肖。 她就那樣靜靜地躺著,像睡著了一般葡幸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上贺氓,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天蔚叨,我揣著相機與錄音,去河邊找鬼辙培。 笑死蔑水,一個胖子當著我的面吹牛,可吹牛的內容都是我干的扬蕊。 我是一名探鬼主播搀别,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼尾抑!你這毒婦竟也來了歇父?” 一聲冷哼從身側響起蒂培,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎榜苫,沒想到半個月后护戳,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡垂睬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年媳荒,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片驹饺。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡钳枕,死狀恐怖,靈堂內的尸體忽然破棺而出赏壹,到底是詐尸還是另有隱情鱼炒,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布卡儒,位于F島的核電站田柔,受9級特大地震影響,放射性物質發(fā)生泄漏骨望。R本人自食惡果不足惜硬爆,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望擎鸠。 院中可真熱鬧缀磕,春花似錦、人聲如沸劣光。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽绢涡。三九已至牲剃,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間雄可,已是汗流浹背凿傅。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留数苫,地道東北人聪舒。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像虐急,于是被迫代替她去往敵國和親箱残。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內容