RocketMQ消息發(fā)送方式
RocketMQ支持3種消息發(fā)送方式: 同步(sync)庐舟、異步(async)滞造、單向(oneway)河胎。
同步: 發(fā)送者向MQ執(zhí)行發(fā)送消息API時,同步等待,直到消息服務器返回發(fā)送結果。
異步: 發(fā)送者向MQ執(zhí)行發(fā)送消息API時,指定消息發(fā)送成功后的回調函數(shù),然后調用消息發(fā)送API后,立即返回,消息發(fā)送者線程不阻塞,直到運行結束,消息發(fā)送成功或失敗的回調任務在一個新的線程中執(zhí)行幔嗦。
單向: 消息發(fā)送者向MQ執(zhí)行發(fā)送消息API時,直接返回,不等待消息服務器的結果,也不注冊回調函數(shù),簡單地說,就是只管發(fā),不在乎消息是否成功存儲在消息服務器上酿愧。
RocketMQ消息
RocketMQ消息封裝類是org.apache.rocketmq.common.message.Message。
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
public Message(String topic, byte[] body)
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK)
public Message(String topic, String tags, byte[] body)
public Message(String topic, String tags, String keys, byte[] body)
public void setKeys(String keys)
public void putUserProperty(final String name, final String value)
public String getUserProperty(final String name)
public int getDelayTimeLevel()
public void setDelayTimeLevel(int level)
Message的基礎屬性主要包括消息所屬主題topic邀泉、消息Flag(RocketMQ不做處理)嬉挡、擴展屬性、消息體汇恤。
RocketMQ定義的MessageFlag如下:
MessageSysFlag
public final static int COMPRESSED_FLAG = 0x1;
public final static int MULTI_TAGS_FLAG = 0x1 << 1;
public final static int TRANSACTION_NOT_TYPE = 0;
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
public static int getTransactionValue(final int flag)
public static int resetTransactionValue(final int flag, final int type)
public static int clearCompressedFlag(final int flag)
Message全屬性構造函數(shù):
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
this.topic = topic;
this.flag = flag;
this.body = body;
if (tags != null && tags.length() > 0)
this.setTags(tags);
if (keys != null && keys.length() > 0)
this.setKeys(keys);
this.setWaitStoreMsgOK(waitStoreMsgOK);
}
public void setKeys(String keys) {
this.putProperty(MessageConst.PROPERTY_KEYS, keys);
}
public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
}
Message擴展屬性主要包含下面幾個庞钢。
tag: 消息TAG,用于消息過濾。
keys: Message索引鍵,多個用空格隔開,RocketMQ可以根據(jù)這些key快速檢索到消息因谎。
waitStoreMsgOK: 消息發(fā)送時是否等消息存儲完成后再返回基括。
delayTimeLevel: 消息延遲級別,用于定時消息或消息重試。
這些擴展信息屬性存儲在Message的properties蓝角。
生產者啟動流程
消息生產者的代碼都在client模塊中,相對于RocketMQ來說,它就是客戶端,也是消息的提供者,我們在應用系統(tǒng)中初始化生產者的一個實例既可使用它來發(fā)消息阱穗。
DefaultMQProducer消息發(fā)送者
DefaultMQProducer是默認的消息生產者實現(xiàn)類,它實現(xiàn)MQAdmin的接口。
下面介紹DefaultMQProducer的主要方法使鹅。
1揪阶、void createTopic(String key,String newTopic,int queueNum,int topicSysFlag)創(chuàng)建主題。
key: 目前未實際作用,可以與newTopic相同患朱。
newTopic: 主題名稱鲁僚。
queueNum: 隊列數(shù)量。
topicSysFlag: 主題系統(tǒng)標簽,默認為0。
2冰沙、public long searchOffset(MessageQueue mq, long timestamp)
根據(jù)時間戳從隊列中查找其偏移量侨艾。
3、public long maxOffset(MessageQueue mq)
查找該消息隊列中最大的物理偏移量拓挥。
4唠梨、public long minOffset(MessageQueue mq)
查找該消息隊列中最小的物理偏移量。
5侥啤、public MessageExt viewMessage(String offsetMsgId)
根據(jù)消息偏移量查找消息当叭。
6、public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
根據(jù)條件查詢消息盖灸。
topic: 消息主題蚁鳖。
key: 消息索引字段。
maxNum: 本次最多取出消息條數(shù)赁炎。
begin: 開始時間醉箕。
end: 結束時間。
7徙垫、public MessageExt viewMessage(String topic,String msgId)
根據(jù)主題與消息ID查找消息讥裤。
8、public List<MessageQueue> fetchPublishMessageQueues(String topic)
查找主題下所有的消息隊列姻报。
9坞琴、public SendResult send(Message msg)
同步發(fā)送消息,具體發(fā)送到主題中的哪個消息對聯(lián)由負載均衡算法決定。
10逗抑、public SendResult send(Message msg,long timeout)
同步發(fā)送消息,如果發(fā)送超過timeout則拋出超時異常。
11寒亥、public void send(Message msg,SendCallback sendCallback)
異步發(fā)送消息,sendCallback參數(shù)是消息發(fā)送成功后的回調方法邮府。
12、public void send(Message msg, SendCallback sendCallback, long timeout)
異步發(fā)送消息,如果發(fā)送超過timeout指定的值,則拋出超時異常溉奕。
13褂傀、public void sendOneway(Message msg)
單向消息發(fā)送,就是不在乎發(fā)送結果,消息發(fā)送出去后該方法立即返回。
14加勤、public SendResult send(Message msg, MessageQueue mq)
同步方式發(fā)送消息,發(fā)送到指定消息隊列仙辟。
15、public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
異步方式發(fā)送消息,發(fā)送到指定消息隊列鳄梅。
16叠国、public void sendOneway(Message msg,MessageQueue mq)
單向方式發(fā)送消息,發(fā)送到指定的消息隊列。
17戴尸、public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
消息發(fā)送,指定消息選擇算法,覆蓋消息生產者默認的消息隊列負載粟焊。
18、public SendResult send(Collection<Message> msgs, MessageQueue messageQueue,long timeout)
同步批量發(fā)送。
DefaultMQProducer核心屬性
private String producerGroup;
private String createTopicKey = MixAll.DEFAULT_TOPIC;
private volatile int defaultTopicQueueNums = 4;
private int sendMsgTimeout = 3000;
private int compressMsgBodyOverHowmuch = 1024 * 4;
private int retryTimesWhenSendFailed = 2;
private int retryTimesWhenSendAsyncFailed = 2;
private boolean retryAnotherBrokerWhenNotStoreOK = false;
private int maxMessageSize = 1024 * 1024 * 4; // 4M
producerGroup:
生產者所屬組,消息服務器在回查事務狀態(tài)時會隨機選擇該組中任何一個生產者發(fā)起事務回查請求项棠。
createTopicKey:
默認topicKey悲雳。
defaultTopicQueueNums:
默認主題在每一個Broker隊列數(shù)量。
sendMsgTimeout:
發(fā)送消息默認超時時間,默認3s香追。
compressMsgBodyOverHowmuch:
消息體超過該值則啟用壓縮,默認4K合瓢。
retryTimesWhenSendFailed:
同步方式發(fā)送消息重試次數(shù),默認為2,總共執(zhí)行3次。
retryTimesWhenSendAsyncFailed:
異步方式發(fā)送消息重試次數(shù),默認為2透典。
retryAnotherBrokerWhenNotStoreOK:
消息重試時選擇另外一個Broker時,是否不等儲存結果就返回,默認為false晴楔。
maxMessageSize:
允許發(fā)送的最大消息長度,默認為4M,該值最大值為2^32-1。
消息生產者啟動流程
我們可以從DefaultMQProducerImpl的start方法來追蹤,具體細節(jié)如下掷匠。
Step1:
檢查producerGroup是否符合要求;并改變生產者的instanceName為進程ID滥崩。
DefaultMQProducerImpl#start
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
Step2:
創(chuàng)建MQClientInstance實例。整個JVM實例中只存在一個MQClientManager實例,維護一個MQClientInstance緩存表private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =new ConcurrentHashMap<String, MQClientInstance>(),也就是同一個clientId只會創(chuàng)建一個MQClientInstance讹语。
DefaultMQProducerImpl#start
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
MQClientManager#getAndCreateMQClientInstance
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
org.apache.rocketmq.client.ClientConfig#buildMQClientId
// 創(chuàng)建clientId的方法
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
clientId為客戶端IP+instance+(unitName可選),如果在同一臺服務器部署兩個應用程序,應用程序豈不是clientId相同,會造成混亂?
為了避免這個問題,如果instance為默認值DEFAULT的話,RocketMQ會自動將instance設置為進程ID,這樣避免了不同進程的相互影響,但同一個JVM中的不同消費者和不同生產者在啟動時獲取到的MQClientInstance實例都是同一個钙皮。根據(jù)后面的介紹,MQClientInstance封裝了RocketMQ網(wǎng)絡處理API,是消息生產者(Producer)、消息消費者(Consumer)與NameServer顽决、Broker打交道的網(wǎng)絡通道短条。
Step3:
向MQClientInstance注冊,將當前生產者加入到MQClientInstance管理中,方便后續(xù)調用網(wǎng)絡請求、進行心跳檢測等才菠。
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);
}
Step4:
啟動MQClientInstance,如果MQClientInstance已經啟動,則本次啟動不會真正執(zhí)行茸时。
消息發(fā)送基本流程
消息發(fā)送流程主要的步驟: 驗證消息、查找路由赋访、消息發(fā)送(包含異常處理機制)可都。
同步消息發(fā)送入口,代碼如下
// DefaultMQProducer#send
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
// DefaultMQProducerImpl#send
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
默認消息發(fā)送以同步方式發(fā)送,默認超時時間為3s。
消息長度驗證
消息發(fā)送之前,首先確保生產者處于運行狀態(tài),然后驗證消息是否符合相應的規(guī)范,具體的規(guī)范要求是主題名稱蚓耽、消息體不能為空渠牲、消息長度不能等于0且默認不能超過允許發(fā)送消息的最大長度4M(maxMessageSize = 1024 * 1024 * 4)。
查找主題路由信息
消息發(fā)送之前,首先需要獲取主題的路由信息,只有獲取了這些信息我們才能知道消息發(fā)送到具體的Broker節(jié)點步悠。
DefaultMQProducerImpl#tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
tryToFindTopicPublishInfo是查找主題的路由信息的方法签杈。如果生產者中緩存了topic的路由信息,如果該路由信息中包含了消息隊列,則直接返回該路由信息,如果沒有緩存或沒有消息隊列,則向NameServer查詢該Topic的路由信息。如果最終未找到路由信息,則拋出異常:無法找到主題相關路由信息異常鼎兽。
先看一下TopicPublishInfo
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
}
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
TopicPublishInfo的屬性:
orderTopic:
是否是順序消息
List<MessageQueue> messageQueueList:
該主題隊列的消息隊列
sendWhichQueue :
每選擇一次消息隊列,該值會自增1,如果Integer.MAX_VALUE,則重置為0,用于選擇消息隊列答姥。
List<QueueData> queueDatas:
topic隊列元數(shù)據(jù)。
List<BrokerData> brokerDatas:
topic分布的broker元數(shù)據(jù)谚咬。
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable:
broker上過濾服務器地址列表鹦付。
第一次發(fā)送消息時,本地沒有緩存topic的路由信息,查詢NameServer嘗試獲取,如果路由信息未找到,再次嘗試用默認主題DefaultMQProducerImpl#createTopicKey去查詢,如果BrokerConfig#autoCreateTopicEnable為true時,NameServer將返回路由信息,如果autoCreateTopicEnable為false將拋出無法找到路由異常。代碼MQClientInstance#updateTopicRouteInfoFromNameServer這個方法的功能是消息生產者更新和維護路由緩存,具體代碼如下择卦。
Step1:
如果isDefault為true,則使用默認主題去查詢,如果查詢到路由信息,則替換路由信息中讀寫隊列個數(shù),為消息生產者默認的隊列個數(shù)(defaultTopicQueueNums);如果isDefault為false,則使用參數(shù)topic去查詢;如果未查詢到路由信息,則返回false,表示路由信息未變化睁壁。
MQClientInstance#updateTopicRouteInfoFromNameServer
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
Step2:
如果路由信息未找到,與本地緩存中的路由信息進行對比,判斷路由信息是否發(fā)生了改變,如果未發(fā)生變化,則直接返回false背苦。
MQClientInstance#updateTopicRouteInfoFromNameServer
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
Step3:
更新MQClientInstance Broker地址緩存表。
MQClientInstance#updateTopicRouteInfoFromNameServer
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
Step4:
根據(jù)topicRouteData中的List<QueueData>轉換成topicPublishInfo的List<MessageQueue>列表潘明。其具體實現(xiàn)在topicRouteData2TopicPublishInfo,然后會更新該MQClientInstance所管轄的所有消息發(fā)送關于topic的路由信息行剂。
MQClientInstance#topicRouteData2TopicPublishInfo
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
循環(huán)遍歷路由信息的QueueData信息,如果隊列沒有寫權限,則繼續(xù)遍歷下一個QueueData;根據(jù)brokerName找到brokerData信息,找不到或沒有找到Master節(jié)點,則遍歷下一個QueueData;根據(jù)寫隊列個數(shù),根據(jù)topic+序號創(chuàng)建MessageQueue,填充topicPublishInfo的List<MessageQueue>。完成消息發(fā)送的路由查找钳降。
選擇消息隊列
根據(jù)路由信息選擇消息隊列,返回的消息隊列按照broker厚宰、序號排序。舉例說明,如果topicA在broker-a,broker-b上分別創(chuàng)建了4個隊列,那么返回的消息隊列:[{"brokerName":"broker-a","queueId":0},{"brokerName":"broker-a","queueId":1},{"brokerName":"broker-a","queueId":2},{"brokerName":"broker-a","queueId":3},{"brokerName":"broker-b","queueId":0},{"brokerName":"broker-a","queueId":1},{"brokerName":"broker-a","queueId":2},{"brokerName":"broker-a","queueId":3}],那么RocketMQ如何選擇消息隊列呢?
首先消息發(fā)送端采用重試機制,由retryTimesWhenSendFailed指定同步方式重試次數(shù),異步重試機制在收到消息發(fā)送結構后執(zhí)行回調之前進行重試遂填。由retryTimesWhenSendAsyncFailed指定,接下來就是循環(huán)執(zhí)行,選擇消息隊列铲觉、發(fā)送消息,發(fā)送成功則返回,收到異常則重試。選擇消息隊列有兩種方式吓坚。
1撵幽、sendLatencyFaultEnable = false,默認不啟用Broker故障延遲機制。
2礁击、sendLatencyFaultEnable = true,啟用Broker故障延遲機制盐杂。
1.默認機制
sendLatencyFaultEnable = false,調用TopicPublishInfo#selectOneMessageQueue
TopicPublishInfo#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
首先在一次消息發(fā)送過程中,可能會多次執(zhí)行算則消息隊列這個方法,lastBrokerName就是上一次選擇的執(zhí)行發(fā)送消息失敗的Broker。第一次執(zhí)行消息隊列選擇時,lastBrokerName為null,此時直接用sendWhichQueue自增再獲取值,與當前路由表中消息隊列個數(shù)取模,返回該位置的MessageQueue(selectOneMessageQueue()方法),如果消息發(fā)送再失敗的話,下次進行消息隊列選擇時規(guī)避上次MessageQueue所在的Broker,否則還是很有可能再次失敗哆窿。
該算法在一次消息發(fā)送過程中能成功規(guī)避故障的Broker,但如果Broker宕機,由于路由算法中的消息隊列是按Broker排序的,如果上一次根據(jù)路由算法選擇的是宕機的Broker的第一個隊列,name隨后的下次選擇是宕機Broker的第二個隊列,消息發(fā)送很有可能會失敗,再次引發(fā)重試,帶來不必要的性能損耗,那么有什么方法在一次消息發(fā)送失敗后,暫時將該Broker排除在消息隊列選擇范圍外呢?或許有朋友會問,Broker不可用后,路由信息中為什么還會有包含該Broker的路由信息呢?其實這不難解釋:首先,NameServer檢測Broker是否可用是有延遲的,最短一次心跳檢測間隔(10s);其次,NameServer不會檢測到Broker當即后馬上推送消息給消息生產者,而是消息生產者每個30s更新一次路由信息,所以消息生產者最快感知Broker最新的路由信息也需要30s链烈。如果引入一種機制,在Broker宕機期間,如果一次消息發(fā)送失敗后,可以將該Broker暫時排除在消息隊列的選擇范圍中。
2.Broker故障延遲機制
MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
首先對上述代碼進行解讀挚躯。
1强衡、根據(jù)對消息隊列進行輪詢獲取一個消息隊列。
2码荔、驗證該消息隊列是否可用,latencyFaultTolerance.isAvailable(mq.getBrokerName())是關鍵漩勤。
3、 如果返回的MessageQueue可用,移除lantencyFaultTolerance關于該topic條目,表明該Broker故障已經恢復缩搅。
Broker故障延遲機制核心類如下:
public interface LatencyFaultTolerance<T> {
void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
boolean isAvailable(final T name);
void remove(final T name);
T pickOneAtLeast();
}
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
}
class FaultItem implements Comparable<FaultItem> {
private final String name;
private volatile long currentLatency;
private volatile long startTimestamp;
}
public class MQFaultStrategy {
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
private boolean sendLatencyFaultEnable = false;
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
}
LatencyFaultTolerance: 延遲機制接口規(guī)范锯七。
1混卵、void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
更新失敗條目 。
name: brokerName
currentLatenmcy: 消息發(fā)送故障延遲時間雁竞。
notAvailableDuration: 不可用持續(xù)時長,在這個時間內,Broker將被規(guī)避碘菜。
2、boolean isAvailable(final T name);
判斷Broker是否可用势决。
name: broker名稱。
3、void remove(final T name)
移除Fault條目,意味著Broker重新參與路由計算筑累。
4、T pickOneAtLeast()
嘗試從規(guī)避的Broker中選擇一個可用的Broker,如果沒有找到,將返回null丝蹭。
FaultItem: 失敗條目(規(guī)避規(guī)則條目)慢宗。
final String name 條目唯一鍵,這里為brokerName。
private volatile long currentLatency 本次消息發(fā)送延遲。
private volatile long startTimeStamp 故障規(guī)避開始時間镜沽。
MQFaultStrategy: 消息失敗策略,延遲實現(xiàn)的門面類敏晤。
long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
latencyMax,根據(jù)currentLatency本次消息發(fā)送延遲,從latencyMax尾部向前匯總愛到第一個比currentLatency小的索引index,如果沒有找到,返回0。然后根據(jù)這個索引從notAvailableDuration數(shù)組中取出對應的時間,在這個時長內,Broker將設置為不可用缅茉。
DefaultMQProducerImpl#sendDefaultImpl
beginTimestampPrev = System.currentTimeMillis();
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
上述代碼如果發(fā)送過程中拋出了異常,調用DefaultMQProducerImpl#updateFaultItem,該方法則直接調用MQFaultStrategy#updateFaultItem方法,關注一下各個參數(shù)的含義嘴脾。
第一個參數(shù): broker名稱。
第二個參數(shù): 本次消息發(fā)送延遲時間 currentLatency蔬墩。
第三個參數(shù): isolation,是否隔離,該參數(shù)的含義如果為true,則使用默認時長30s來計算Broker故障規(guī)避時長,如果為false,則使用本次消息發(fā)送延遲時間來計算Broker故障規(guī)避時長译打。
MQFaultStrategy#updateFaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
如果isolation為true,則使用30s作為computeNotAvailableDuration方法的參數(shù);如果isolation為false,則使用本次消息發(fā)送時延作為computeNotAvailableDuration方法的參數(shù),那computeNotAvailableDuration的作用是計算因本次消息發(fā)送故障需要將Broker規(guī)避的時長,也就是接下來多久的時間內該Broker將不參與消息發(fā)送隊列負載。具體算法: 從latencyMax數(shù)組尾部開始尋找,找到第一個比currentLatency小的下標,然后從notAvailableDuration數(shù)組中獲取需要規(guī)避的時長,該方法最終調用LatencyFaultTolerance的updateFaultItem拇颅。
LatencyFaultToleranceImpl#updateFaultItem
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
根據(jù)broker名稱從緩存表中獲取FaultItem,如果找到則更新FaultItem,否則創(chuàng)建FaultItem奏司。這里有兩個關鍵點。
1樟插、currentLatency韵洋、startTimeStamp被volatile修飾。
2岸夯、startTimeStamp為當前系統(tǒng)時間加上需要規(guī)避的時長麻献。startTimeStamp是判斷broker當前是否可用的直接依據(jù),請看FaultItem#isAvailable方法。
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
消息發(fā)送
消息發(fā)送API核心入口: DefaultMQProducerImpl#sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout)
消息發(fā)送參數(shù)詳解猜扮。
1勉吻、Message msg: 待發(fā)送消息。
2旅赢、MessageQueue mq: 消息將發(fā)送到該消息隊列上齿桃。
3、CommunicationMode communicationMode: 消息發(fā)送模式,SYNC煮盼、ASYNC短纵、ONEWAY。
4僵控、SendCallback sendCallback: 異步消息回調函數(shù)香到。
5、TopicPublishInfo topicPublishInfo: 主題路由信息报破。
6悠就、long timeout: 消息發(fā)送超時時間。
Step1:
根據(jù)MessageQueue獲取Broker的網(wǎng)絡地址充易。如果MQClientInstance的brokerAddrTable未緩存該Broker的信息,則從NameServer主動更新一下topic的路由信息梗脾。如果路由更新后還是找不到Broker信息,則拋出MQClientException,提示Broker不存在。
DefaultMQProducerImpl#sendKernelImpl
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
Step2:
為消息分配全局唯一ID,如果消息體默認超過4K(compressMsgBodyOverHowmuch),會對消息體采用zip壓縮,并設置消息的系統(tǒng)標記為MessageSysFlag.COMPRESSED_FLAG盹靴。如果是事務Prrepared消息,則設置消息的系統(tǒng)標記為MessageSysFlag.TRANSACTION_PREPARED_TYPE炸茧。
DefaultMQProducerImpl#sendKernelImpl
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
int sysFlag = 0;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
Step3:
如果注冊了消息發(fā)送鉤子函數(shù),則執(zhí)行消息發(fā)送之前的增強邏輯瑞妇。通過DefaultMQProducerImpl#registerSendMessageHook注冊鉤子處理類,并且可以注冊多個。
DefaultMQProducerImpl#sendKernelImpl
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
SendMessageHook
public interface SendMessageHook {
String hookName();
void sendMessageBefore(final SendMessageContext context);
void sendMessageAfter(final SendMessageContext context);
}
Step4:
構建消息發(fā)送請求包梭冠。主要包含如下重要信息: 生產者組辕狰、主題名稱、默認創(chuàng)建主題Key妈嘹、該主題在單個Broker默認隊列數(shù)柳琢、隊列ID(隊列序號)、消息系統(tǒng)標題標記(MessageSysFlag)润脸、消息發(fā)送時間柬脸、消息標記(RocketMQ對消息中的flag不做任何處理,供應用程序使用)、消息擴展屬性毙驯、消息重試次數(shù)倒堕、是否是批量消息等。
DefaultMQProducerImpl#sendKernelImpl
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
Step5:
根據(jù)消息發(fā)送方式,同步爆价、異步垦巴、單向方式進行網(wǎng)絡傳輸。
MQClientAPIImpl#sendMessage
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
request.setBody(msg.getBody());
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
default:
assert false;
break;
}
return null;
}
Step6:
如果注冊了消息發(fā)送鉤子函數(shù),執(zhí)行after邏輯铭段。注意,就算消息發(fā)送過程中發(fā)生RemotingExceptuion骤宣、MQBrokerException、InterruptedException時該方法也會執(zhí)行序愚。
DefaultMQProducerImpl#sendKernelImpl
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
1.同步發(fā)送
MQ客戶端發(fā)送消息的入口是MQClientAPIImpl#sendMessage憔披。請求命令是RequestCode.SEND_MESSAGE,我們可以找到該命令的處理類:org.apache.rocketmq.broker.processor.SendMessageProcessor。入口方法在org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage爸吮。
Step1:
檢查消息發(fā)送是否合理,這里完成了以下幾件事情芬膝。
1、檢查該Broker是否有寫權限形娇。
2锰霜、檢查該Topic是否可以進行消息發(fā)送。主要針對默認主題,默認主題不能發(fā)送消息,僅僅供路由查找桐早。
3癣缅、在NameServer端存儲主題的配置信息,默認路徑: ${ROCKETMQ_HOME}/store/config/topic.json。下面是主題存儲信息哄酝。
order: 是否是順序消息;
perm: 權限碼;
readQueueNums: 讀隊列數(shù)量;
writerQueueNums: 寫隊列數(shù)量;
topicName: 主題名稱;
topicSysFlag: topic Flag;
topicFilterType: 主題過濾方式友存。
4、檢查隊列,如果隊列不合法,返回錯誤碼炫七。
AbstractSendMessageProcessor#msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending message is forbidden");
return response;
}
if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
} else {
topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
}
}
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
topicSysFlag);
}
}
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) {
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
return response;
}
Step2:
如果消息重試次數(shù)超過允許的最大重試次數(shù),消息將進入到DLQ延遲隊列。延遲隊列主題: %DLQ%+消費組名钾唬。
Step3:
調用DefaultMessageStore#putMessage進行消息存儲万哪。
2.異步發(fā)送
消息異步發(fā)送是指消息生產者調用發(fā)送的API后,無需阻塞等待消息服務器返回本次消息發(fā)送結果,只需要提供一個回調函數(shù),供消息發(fā)送客戶端在收到響應結果回調侠驯。異步方式相比同步方式,消息發(fā)送端的發(fā)送性能會顯著提高,但為了保護消息服務器的負載壓力,RocketMQ對消息發(fā)送的異步消息進行了并發(fā)控制,通過參數(shù)clientAsyncSemaphoreValue來控制,默認為65535。異步消息發(fā)送雖然也可以通過DefaultMQProducer#retryTimesWhenSendAsyncFailed屬性來控制消息重試次數(shù),但是重試的調用入口是在收到服務器響應包時進行的,如果出現(xiàn)網(wǎng)絡異常奕巍、網(wǎng)絡超時等將不會重試吟策。
3.單向發(fā)送
單向發(fā)送是值消息生產者調用消息發(fā)送的API后,無需等待消息服務器返回本次消息發(fā)送結果,并且無需提供回調函數(shù),表示消息發(fā)送鴨羹就不關心本次消息是否成功,其實現(xiàn)原理與異步消息發(fā)送相同,只是消息發(fā)送客戶端在收到響應結果后什么都不做而已,并且沒有重試機制。
有些內容摘錄在<<RocketMQ技術內幕>>