rocketMq消息體
properties擴展中存了什么呢
tag: 消息tag宠漩,用于消息過濾
keys:message索引鍵恨闪,多個空格隔開凳忙,rocketMq可以根據(jù)這些key快速檢索到消息。具體怎么快速檢索划滋,在消息存儲章節(jié)細聊扑媚。
waitStoreMsgOk: 消息發(fā)送時是否等消息存儲完成后再返回
delayTimeLevel: 消息延遲級別腰湾,用于定時消息和消息重試
producer啟動過程
啟動過程就是新建一個MQClientInstance實例。整個JVM實例中只存在一個MQClientInstance實例疆股。
clientId為 本機ip + JVM線程id费坊。
MQClientInstance作用是:與nameServer交互。網(wǎng)絡(luò)請求旬痹,心跳檢測附井。
發(fā)送消息過程
1、獲取路由信息
路由信息其實就是消息隊列列表
topic路由信息會緩存在producer中两残,以一個list變量的形式存在在內(nèi)存中永毅。
如果本地沒有topic路由信息,向nameServer發(fā)送請求人弓,獲取路由信息沼死,更新本地路由表。
所以問題了崔赌,本地有了意蛀,這個路由表什么時候更新呢?
起一個定時任務(wù)峰鄙,每隔30s從nameServer獲取topic路由表,更新本地路由太雨。
producer會跟topic涉及的所有broker建立長連接吟榴,沒隔30秒發(fā)送一個心跳。
broker端也會每10秒掃描一次注冊的producer囊扳,如果2分鐘沒有心跳吩翻,則斷開連接。
2锥咸、按照負載均衡策略狭瞎,選擇路由
2.1 默認投遞方式:輪訓
- 對queue list進行排序
- 獲取一個全局自增的計數(shù)變量。獲取一次自增一次搏予。
- 用這個變量對隊列size取模
- 模到了幾熊锭,就選擇哪個隊列。因為變量是自增的,所以模的值也是根據(jù)隊列size自增的碗殷,也就是輪訓的精绎。
- 當自增值增加到int最大值后,該值重置為0
2.2 默認投遞方式增強:輪訓算法和延遲最小策略
默認的投遞方式比較簡單锌妻,但是也暴露了一個問題代乃,就是有些queue可能由于自身數(shù)量積壓等原因,可能投遞的過程比較長仿粹,就盡量不要選擇這樣的queue了搁吓。rocketMq在每次發(fā)送一個MQ消息后,都會統(tǒng)計一下消息投遞的時間延遲吭历,根據(jù)這個時間延遲堕仔,可以知道往哪些queue投遞的速度快。在這種場景下毒涧,會優(yōu)先使用消息投遞最小延遲策略贮预。
2.3 順序消息的投遞方式
如果不用默認方式,可以自己選擇MessageQueueSelector契讲。
recketMq也提供了集中選擇器實現(xiàn)仿吞,當然也可以自己實現(xiàn)。
生產(chǎn)者在消息投遞的過程中捡偏,使用了 MessageQueueSelector 作為隊列選擇的策略接口唤冈,其定義如下:
package org.apache.rocketmq.client.producer;
import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
public interface MessageQueueSelector {
/**
* 根據(jù)消息體和參數(shù),從一批消息隊列中挑選出一個合適的消息隊列
* @param mqs 待選擇的MQ隊列選擇列表
* @param msg 待發(fā)送的消息體
* @param arg 附加參數(shù)
* @return 選擇后的隊列
*/
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
投遞策略 | 策略實現(xiàn)類 | 說明 |
---|---|---|
隨機分配策略 | SelectMessageQueueByRandom | 使用了簡單的隨機數(shù)選擇算法 |
基于Hash分配策略 | SelectMessageQueueByHash | 根據(jù)附加參數(shù)的Hash值银伟,按照消息隊列列表的大小取余數(shù)你虹,得到消息隊列的index |
基于機器機房位置分配策略 | SelectMessageQueueByMachineRoom | 開源的版本沒有具體的實現(xiàn),基本的目的應(yīng)該是機器的就近原則分配 |
hash代碼實現(xiàn)
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
3彤避、根據(jù)選擇出的路由傅物,發(fā)送消息到broker
發(fā)送消息的三種方式
可靠同步發(fā)送
發(fā)送者執(zhí)行發(fā)送消息api時,同步等待琉预,直到消息服務(wù)器返回發(fā)送結(jié)果董饰。可靠異步發(fā)送
發(fā)送者執(zhí)行發(fā)送消息api時,指定消息發(fā)送成功后的回調(diào)函數(shù)圆米,然后立即返回卒暂,線程不阻塞,消息發(fā)送成功或失敗娄帖,在回調(diào)函數(shù)(一個新的線程)中執(zhí)行也祠。單向發(fā)送
發(fā)送者執(zhí)行發(fā)送消息api時,直接返回近速,也沒有回調(diào)函數(shù)诈嘿。簡單的說堪旧,就是只管發(fā),不在乎消息是否成功存儲在消息服務(wù)器上永淌。
怎么控制是使用哪種方式崎场??
在producer調(diào)用send函數(shù)的時候遂蛀,有不同的send函數(shù)谭跨。
如何保證消息一定發(fā)送成功
rocketMq有重試機制。調(diào)用api的時候李滴,會返回成功螃宙。如果返回不成功,則進行下一次投遞所坯,往下一個queue投谆扎。直到server端返回了成功。
如果在設(shè)置的重置次數(shù)用完了芹助,還沒成功堂湖。那就是真失敗了。這個時候用戶端就會感知了状土,會拋異常給用戶端无蜂。