RocketMQ系列(三):producer

rocketMq消息體

消息體.png

properties擴展中存了什么呢

  • tag: 消息tag宠漩,用于消息過濾

  • keys:message索引鍵恨闪,多個空格隔開凳忙,rocketMq可以根據(jù)這些key快速檢索到消息。具體怎么快速檢索划滋,在消息存儲章節(jié)細聊扑媚。

  • waitStoreMsgOk: 消息發(fā)送時是否等消息存儲完成后再返回

  • delayTimeLevel: 消息延遲級別腰湾,用于定時消息和消息重試

producer啟動過程

啟動過程就是新建一個MQClientInstance實例。整個JVM實例中只存在一個MQClientInstance實例疆股。

clientId為 本機ip + JVM線程id费坊。
MQClientInstance作用是:與nameServer交互。網(wǎng)絡(luò)請求旬痹,心跳檢測附井。

發(fā)送消息過程

1、獲取路由信息

路由信息其實就是消息隊列列表
topic路由信息會緩存在producer中两残,以一個list變量的形式存在在內(nèi)存中永毅。
如果本地沒有topic路由信息,向nameServer發(fā)送請求人弓,獲取路由信息沼死,更新本地路由表。

所以問題了崔赌,本地有了意蛀,這個路由表什么時候更新呢?
起一個定時任務(wù)峰鄙,每隔30s從nameServer獲取topic路由表,更新本地路由太雨。
producer會跟topic涉及的所有broker建立長連接吟榴,沒隔30秒發(fā)送一個心跳。
broker端也會每10秒掃描一次注冊的producer囊扳,如果2分鐘沒有心跳吩翻,則斷開連接。

2锥咸、按照負載均衡策略狭瞎,選擇路由

2.1 默認投遞方式:輪訓

  • 對queue list進行排序
  • 獲取一個全局自增的計數(shù)變量。獲取一次自增一次搏予。
  • 用這個變量對隊列size取模
  • 模到了幾熊锭,就選擇哪個隊列。因為變量是自增的,所以模的值也是根據(jù)隊列size自增的碗殷,也就是輪訓的精绎。
  • 當自增值增加到int最大值后,該值重置為0

2.2 默認投遞方式增強:輪訓算法和延遲最小策略

默認的投遞方式比較簡單锌妻,但是也暴露了一個問題代乃,就是有些queue可能由于自身數(shù)量積壓等原因,可能投遞的過程比較長仿粹,就盡量不要選擇這樣的queue了搁吓。rocketMq在每次發(fā)送一個MQ消息后,都會統(tǒng)計一下消息投遞的時間延遲吭历,根據(jù)這個時間延遲堕仔,可以知道往哪些queue投遞的速度快。在這種場景下毒涧,會優(yōu)先使用消息投遞最小延遲策略贮预。

2.3 順序消息的投遞方式

如果不用默認方式,可以自己選擇MessageQueueSelector契讲。
recketMq也提供了集中選擇器實現(xiàn)仿吞,當然也可以自己實現(xiàn)。

生產(chǎn)者在消息投遞的過程中捡偏,使用了 MessageQueueSelector 作為隊列選擇的策略接口唤冈,其定義如下:

package org.apache.rocketmq.client.producer;

import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public interface MessageQueueSelector {
        /**
         * 根據(jù)消息體和參數(shù),從一批消息隊列中挑選出一個合適的消息隊列
         * @param mqs  待選擇的MQ隊列選擇列表
         * @param msg  待發(fā)送的消息體
         * @param arg  附加參數(shù)
         * @return  選擇后的隊列
         */
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
投遞策略 策略實現(xiàn)類 說明
隨機分配策略 SelectMessageQueueByRandom 使用了簡單的隨機數(shù)選擇算法
基于Hash分配策略 SelectMessageQueueByHash 根據(jù)附加參數(shù)的Hash值银伟,按照消息隊列列表的大小取余數(shù)你虹,得到消息隊列的index
基于機器機房位置分配策略 SelectMessageQueueByMachineRoom 開源的版本沒有具體的實現(xiàn),基本的目的應(yīng)該是機器的就近原則分配

hash代碼實現(xiàn)

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

3彤避、根據(jù)選擇出的路由傅物,發(fā)送消息到broker

發(fā)送消息的三種方式

  • 可靠同步發(fā)送
    發(fā)送者執(zhí)行發(fā)送消息api時,同步等待琉预,直到消息服務(wù)器返回發(fā)送結(jié)果董饰。

  • 可靠異步發(fā)送
    發(fā)送者執(zhí)行發(fā)送消息api時,指定消息發(fā)送成功后的回調(diào)函數(shù)圆米,然后立即返回卒暂,線程不阻塞,消息發(fā)送成功或失敗娄帖,在回調(diào)函數(shù)(一個新的線程)中執(zhí)行也祠。

  • 單向發(fā)送
    發(fā)送者執(zhí)行發(fā)送消息api時,直接返回近速,也沒有回調(diào)函數(shù)诈嘿。簡單的說堪旧,就是只管發(fā),不在乎消息是否成功存儲在消息服務(wù)器上永淌。

怎么控制是使用哪種方式崎场??
在producer調(diào)用send函數(shù)的時候遂蛀,有不同的send函數(shù)谭跨。

如何保證消息一定發(fā)送成功

rocketMq有重試機制。調(diào)用api的時候李滴,會返回成功螃宙。如果返回不成功,則進行下一次投遞所坯,往下一個queue投谆扎。直到server端返回了成功。
如果在設(shè)置的重置次數(shù)用完了芹助,還沒成功堂湖。那就是真失敗了。這個時候用戶端就會感知了状土,會拋異常給用戶端无蜂。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蒙谓,隨后出現(xiàn)的幾起案子斥季,更是在濱河造成了極大的恐慌,老刑警劉巖累驮,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件酣倾,死亡現(xiàn)場離奇詭異,居然都是意外死亡谤专,警方通過查閱死者的電腦和手機躁锡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來置侍,“玉大人映之,你說我怎么就攤上這事∈澹” “怎么了惕医?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵耕漱,是天一觀的道長算色。 經(jīng)常有香客問我,道長螟够,這世上最難降的妖魔是什么灾梦? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任峡钓,我火速辦了婚禮,結(jié)果婚禮上若河,老公的妹妹穿的比我還像新娘能岩。我一直安慰自己,他們只是感情好萧福,可當我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布拉鹃。 她就那樣靜靜地躺著,像睡著了一般鲫忍。 火紅的嫁衣襯著肌膚如雪膏燕。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天悟民,我揣著相機與錄音坝辫,去河邊找鬼。 笑死射亏,一個胖子當著我的面吹牛近忙,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播智润,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼及舍,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了做鹰?” 一聲冷哼從身側(cè)響起击纬,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎钾麸,沒想到半個月后更振,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡饭尝,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年肯腕,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片钥平。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡实撒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出涉瘾,到底是詐尸還是另有隱情知态,我是刑警寧澤,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布立叛,位于F島的核電站负敏,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏秘蛇。R本人自食惡果不足惜其做,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一顶考、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧妖泄,春花似錦驹沿、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至罚渐,卻和暖如春梭域,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背搅轿。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工病涨, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人璧坟。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓既穆,卻偏偏與公主長得像,于是被迫代替她去往敵國和親雀鹃。 傳聞我的和親對象是個殘疾皇子幻工,可洞房花燭夜當晚...
    茶點故事閱讀 45,691評論 2 361