上篇文章中我們可以了解到NameServer需要等Broker失效至少120s才能將該Broker從路由表中移除高镐,那如果在Broker故障期間,消息生產(chǎn)者Producer根據(jù)獲取到的路由信息可能包含已經(jīng)宕機的Broker,會導(dǎo)致消息發(fā)送失敗畸冲,在接下來的消息發(fā)送階段會解決這個問題嫉髓。
初識消息有關(guān)類
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
# 消息所屬主題
private String topic;
# 消息Flag
private int flag;
# 擴展屬性
private Map<String, String> properties;
# 消息體
private byte[] body;
private String transactionId;
其中,Message擴展屬性主要包括下面幾個:
- tag : 消息Tag,用于消息過濾
- keys :Message索引建邑闲,多個用空格隔開算行,RocketMQ可以根據(jù)這些key快速檢索到消息
- waitStoreMsgOK : 消息發(fā)送時是否等到消息存儲完成后再返回
- delayTimeLevel : 消息延遲級別,用于定時消息或消息重試
public class DefaultMQProducer extends ClientConfig implements MQProducer {
private final InternalLogger log = ClientLogger.getLog();
/**
* Wrapping internal implementations for virtually all methods presented in this class. */
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
# 生產(chǎn)者所屬組监憎,消息服務(wù)器在回查事務(wù)狀態(tài)時會隨機選擇該組中的任何一個生產(chǎn)者發(fā)起事務(wù)回查請求
private String producerGroup;
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
# 默認(rèn)主題在每一個Broker隊列的數(shù)量
private volatile int defaultTopicQueueNums = 4;
# 發(fā)送消息默認(rèn)超時時間纱意,默認(rèn)3秒
private int sendMsgTimeout = 3000;
# 消息體超過該值則啟用壓縮,默認(rèn)4K
private int compressMsgBodyOverHowmuch = 1024 * 4;
# 同步方式發(fā)送消息重試次數(shù)鲸阔,默認(rèn)為2偷霉,總共執(zhí)行3次
private int retryTimesWhenSendFailed = 2;
# 異步方式發(fā)送消息重試次數(shù),默認(rèn)為2
private int retryTimesWhenSendAsyncFailed = 2;
# 允許發(fā)送的最大消息長度
private int maxMessageSize = 1024 * 1024 * 4; // 4M
private TraceDispatcher traceDispatcher = null;
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
private static final long serialVersionUID = 6191200464116433425L;
private String topic;
private String brokerName;
private int queueId;
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
發(fā)送信息總體過程
- 發(fā)送消息的入口 DefaultMQProducerImpl#send() 褐筛,默認(rèn)消息發(fā)送以同步方式發(fā)送类少,默認(rèn)超時時間為3s。
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
try {
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
} catch (MQBrokerException e) {
throw new MQClientException("unknownn exception", e);
}
}
- 調(diào)用sendDefaultImpl渔扎,形參為下:
private SendResult sendDefaultImpl(//
Message msg, //
final CommunicationMode communicationMode, //
final SendCallback sendCallback, //
final long timeout//
)
其中硫狞,CommunicationMode表示消息發(fā)送的方式,同步晃痴、異步和單向残吩。
public enum CommunicationMode {
SYNC,
ASYNC,
ONEWAY,
}
然后會驗證服務(wù)服務(wù)是否可用,消息是否符合規(guī)范倘核,具體的驗證就不解釋了泣侮。
然后記錄當(dāng)前時間,后面會判斷是否timeout紧唱。
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
根據(jù)要發(fā)送消息的topic,尋找該topic的路由信息活尊。
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
當(dāng)前類有一個屬性,記錄所有topic的路由信息和消息隊列信息漏益。
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
首先從topicPublishInfoTable中查找該topic的topicPublishInfoTable信息蛹锰,
如果不存在當(dāng)前topic的信息或者當(dāng)前topicPublishInfoTable不可用,則先新創(chuàng)建一個TopicPublishInfo()绰疤,并放入到topicPublishInfoTable中铜犬,然后向NameServer查詢該topic的路由信息,此時會調(diào)用MQClientInstance的updateTopicRouteInfoFromNameServer(topic)方法。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 緩存中獲取 Topic發(fā)布信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 當(dāng)無或者可用的 Topic信息時翎苫,從Namesrv獲取一次 并且緩存
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());//如果key存在的情況下权埠,在putIfAbsent下不會修改
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); //進行調(diào)用獲取規(guī)則存下來
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
private final Lock lockNamesrv = new ReentrantLock();
MQClientInstance#updateTopicRouteInfoFromNameServer方法,LOCK_TIMEOUT_MILLIS默認(rèn)是3秒榨了,在此處用到了ReentrantLock.tryLock煎谍。
該鎖的方法的簡單解釋:
假如線程A和線程B使用同一個鎖Lock,此時線程A首先獲取鎖Lock.lock(),并且始終持有不釋放,如果此時B要去獲取鎖龙屉,調(diào)用tryLock(3000, mils),則說明在3秒內(nèi)如果線程A釋放鎖呐粘,會獲取到鎖并返回true,否則3秒過后會獲取不到鎖并返回false。
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
如果isDefault為true,則使用默認(rèn)主題去查詢转捕,如果查詢到路由信息作岖,則替換路由信息中讀寫隊列個數(shù)為消息生產(chǎn)者默認(rèn)的隊列個數(shù)。
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);//獲取topic規(guī)則
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
}
如果isDefault為false,則使用參數(shù)topic去查詢五芝,如果未查詢到路由信息痘儡,則返回false,表示路由信息未變化捌蚊。
else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
如果路由信息找到传藏,與本地緩存中的路由信息進行對比,判斷路由信息是否發(fā)生了變化余指,如果沒有發(fā)生變化醉途,則直接返回fasle矾瑰。
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
如果發(fā)生了變化,先對topicRouteData進行復(fù)制隘擎,然后根據(jù)獲得的topicRouteData信息對brokerAddrTable進行更新殴穴,
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
根據(jù)topicRouteData中的List<\queueData>轉(zhuǎn)化成topicPublishInfo的List<\MessageQueue>列表。具體的是在topicRouteData2TopicPublishInfo中實現(xiàn)的货葬。
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);//更新topic的PublishInfo
}
}
循環(huán)遍歷路由信息的QueueData信息采幌,如果隊列沒有寫權(quán)限,則繼續(xù)遍歷下一個QueueData,根據(jù)brokerName找到brokerData信息震桶,找不到或沒有找到Master節(jié)點休傍,則遍歷下一個QueueData,根據(jù)寫隊列個數(shù),根據(jù)topic+序號創(chuàng)建MessageQueue,填充topicPublishInfo的List<\QueueMessage>,此時尼夺,完成了消息發(fā)送的路由查找尊残。
//topicRouteData轉(zhuǎn)換為TopicPublishInfo
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
//有序
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}
info.setOrderTopic(true);
}
//無序
else {
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);//按照brokerName升序進行排序的
for (QueueData qd : qds) {
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);//由于brokerName是排序的,TopicPublishInfo里面的messageQueueList就是有序的了從小到大
}
}
}
info.setOrderTopic(false);
}
return info;
}
獲取到路由信息之后淤堵,如果該路由信息可用寝衫,則先計算嘗試的次數(shù),如果發(fā)送模式是sync,則是3次拐邪,其他情況下是1次慰毅。
之后記錄上次發(fā)送失敗的broker名稱,在第一次發(fā)送的時候扎阶,lastBrokerName為null,然后根據(jù)消息隊列選擇策略選擇消息隊列汹胃。
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
//發(fā)送模式是sync 會有3次其他1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName(); //第一次的確是null 但是如果第二次呢婶芭? 所以這里存在的意義
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);//選擇一個queue
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
//調(diào)用sendKernelImpl發(fā)送消息 發(fā)送消息核心
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
//更新Broker可用信息
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
// 如下異常continue,進行發(fā)送消息重試
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
消息隊列選擇策略着饥。
有兩種策略犀农, sendLatencyFaultEnable=false, 默認(rèn)不啟用Broker故障延遲機制。
sendLatencyFaultEnable=true,啟用Broker故障延遲機制宰掉。
先介紹默認(rèn)的消息隊列選擇策略呵哨,調(diào)用TopicPublishInfo#selectOneMessageQueue
當(dāng)?shù)谝淮握{(diào)用時,lastBrokerName為null,對sendWhichQueue本地線程變量進行加1轨奄,并與當(dāng)前路由表中消息隊列個數(shù)取模孟害,返回該位置的MessageQueue。
如果該消息發(fā)送失敗挪拟,則可能會進行重試發(fā)送挨务,此時,lastBrokerName不是null,會記錄上次信息發(fā)送失敗的BrokerName,之后獲取sendWhichQueue本地線程變量進行加1玉组,并與當(dāng)前路由表中消息隊列個數(shù)取模谎柄,獲取該位置上的MessageQueue,如果獲取的該信息的BrokerName與上一次發(fā)送失敗的lastBrokerName不相同,則返回該信息球切,否則再遍歷下一個消息谷誓,直到第一個不與lastBrokerName相同的消息返回。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {//第一次進入就是空的
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
接下來講解啟用Broker故障延遲機制的消息選擇策略
前面部分和上述介紹的差不多吨凑,根據(jù)index先獲取當(dāng)前位置的消息捍歪,然后判斷該消息隊列是否可用,通過isAvailable方法判斷鸵钝。如果該消息隊列可用糙臼,在上次發(fā)送,或者這次發(fā)送的消息隊列的broker姓名與上次發(fā)送失敗的broker姓名一致恩商,則返回該消息隊列变逃。
如果所有的broker都預(yù)計不可用,隨機選擇一個不可用的broker,再從路由信息中選擇下一個消息隊列怠堪,將該消息隊列的broker重置為上面隨機選擇的broker,并重置queueId,并返回該消息隊列揽乱。但是,如果該隨機選擇的broker內(nèi)已經(jīng)沒有要發(fā)送的消息隊列時粟矿,則需要將該broker從latencyFaultTolerance中移除凰棉,并利用默認(rèn)的選擇機制選擇一個消息隊列。
//延遲故障容錯陌粹,維護每個Broker的發(fā)送消息的延遲
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
if (this.sendLatencyFaultEnable) { //發(fā)送消息延遲容錯開關(guān)
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.eror("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
上面部分是根據(jù)不同策略選擇消息隊列的具體解釋撒犀,獲取到消息隊列之后,返回到發(fā)送消息的最外層方法endDefaultImpl中,執(zhí)行消息發(fā)送步驟或舞。
消息發(fā)送
tmpmq是選擇獲取到的消息隊列荆姆。
brokersSent是存儲消息發(fā)送的broker,由上面可知,如果發(fā)送方式是同步映凳,則該數(shù)組長度為3胆筒,其他方式下長度為1。然后記錄當(dāng)前時間魏宽,然后執(zhí)行sendKernelImpl方法進行發(fā)送消息腐泻。之后决乎,獲取發(fā)送完之后的時間队询,執(zhí)行updateFaultItem方法來更新Broker異常信息,一個broker會對應(yīng)一個faultItem构诚。
之后蚌斩,根據(jù)消息發(fā)送的方式,如果是同步的范嘱,如果此次消息沒有成功送膳,則可以再進行嘗試,如果是異步或者單向丑蛤,則執(zhí)行結(jié)束叠聋。如果期間發(fā)送了異常,則會調(diào)用updateFaultItem方法來更新Broker異常信息受裹。
接下來詳細(xì)介紹sendKernelImpl方法和updateFaultItem方法碌补。
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName)
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
//調(diào)用sendKernelImpl發(fā)送消息 發(fā)送消息核心
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
//更新Broker可用信息
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
}
DefaultMQProducerImpl.sendKernelImpl
該方法的形參有:
- Message msg : 待發(fā)送的消息
- MessageQueue mq : 消息將發(fā)送到該消息隊列上
- CommunicationMode commuicationMode : 消息發(fā)送模式,SYNC棉饶、ASYNC厦章、ONEWAy
- SendCallback sendCallback :異步消息回調(diào)函數(shù)
- TopicPublishInfo topicPublishInfo : 主題路由信息
- long timeout:消息發(fā)送超時時間
- 根據(jù)MessageQueue獲取Broker的網(wǎng)絡(luò)地址,如果MQClientInstance的brokerAddrTable未緩存該Broker的信息,則從NameServer主動更新一下topic的路由信息照藻,如果路由更新后還是找不到Broker信息袜啃,則拋出MQClientException,提示Broker不存在幸缕。
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
- 為消息分配全局唯一ID,如果消息體默認(rèn)超過4K,會對消息體采用zip壓縮群发,并設(shè)置消息的系統(tǒng)標(biāo)記為MessageSysFlag.COMPRESED_FLAG。如果是事務(wù)Prepared消息发乔,則設(shè)置消息的系統(tǒng)標(biāo)記為MessageSysFlag.TRANSACTION_PREPARED_TYPE熟妓。
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);//設(shè)置設(shè)置UNIQ_id,所以當(dāng)看見msgId的時候為什么解析不一樣了懂了吧
}
int sysFlag = 0; //又是根據(jù)位來進行每位是啥的判斷
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);//根據(jù)事務(wù)屬性key獲取值看是否是事務(wù)消息
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
- 如果注冊了消息發(fā)送鉤子函數(shù)列疗,則執(zhí)行消息發(fā)送之前的增強邏輯滑蚯。
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
- 構(gòu)建消息發(fā)送請求包。主要包含下列重要信息:
- 生產(chǎn)者組、主題名稱告材、默認(rèn)創(chuàng)建主題Key坤次、該主題在單個Broker默認(rèn)隊列數(shù)、隊列ID
- 消息系統(tǒng)標(biāo)記斥赋、消息發(fā)送時間缰猴、消息標(biāo)記、消息擴展屬性疤剑、消息重試次數(shù)滑绒、是否是批量信息。
//構(gòu)建SendMessageRequestHeader
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
//生成消息時間戳
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
- 按照消息發(fā)送方式隘膘,同步疑故、異步、單向方式進行網(wǎng)絡(luò)傳輸弯菊。
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//異步發(fā)送消息
brokerAddr, // 1
mq.getBrokerName(), // 2
msg, // 3
requestHeader, // 4
timeout, // 5
communicationMode, // 6
sendCallback, // 7
topicPublishInfo, // 8
this.mQClientFactory, // 9
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
context, //
this);
break;
case ONEWAY:
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//同步以及廣播發(fā)送消息
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context); //發(fā)送消息后邏輯
}
return sendResult;
- 之后就是按照不同的方式進行發(fā)送纵势。在發(fā)送之前會檢查消息發(fā)送是否合理,檢查該Broker是否有寫權(quán)限管钳,該Topic是否可以進行消息發(fā)送钦铁,在NameServer端存儲主題的配置信息,除此之外才漆,開始檢查隊列牛曹,如果隊列不合法,返回錯誤碼
- 如果消息重試次數(shù)超過允許的最大重試次數(shù)醇滥,消息將進入到DLD延遲隊列黎比。延遲隊列主題:%DLQ%+消費組名,
- 調(diào)用DefaultMessageStore.putMessage進行消息存儲腺办。
DefaultMQProducerImpl.updateFaultItem
由上面可知焰手,在執(zhí)行消息發(fā)送完之后和出現(xiàn)發(fā)送異常的時候,會調(diào)用該方法對broker進行異常更新怀喉。
形參解釋:
- brokerName : broker名稱
- currentLatency : 本次消息發(fā)送延遲時間currentLatency
- isolation : 是否隔離书妻,如果為true,則使用默認(rèn)時長30s來計算BroKer故障規(guī)避時長;如果為false,則使用本次消息發(fā)送延遲時間來計算Broker故障規(guī)避時長躬拢。
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
- 利用computeNotAvailableDuration() 方法計算規(guī)避時長躲履。
從latencyMax數(shù)組尾部開始查找,找到第一個比currentLatency小的下標(biāo)聊闯,然后從notAvailableDuration數(shù)組中獲取需要規(guī)避的時長工猜。
//延遲級別數(shù)組
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//不可用時長數(shù)組
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
- 根據(jù)broker名稱從緩存表faultItemTable中獲取FaultItem,如果找到則更新FaultItem,否則創(chuàng)建FaultItem。
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
一個broker對應(yīng)一個faultItem,記錄broker名稱菱蔬、消息發(fā)送時長和broker恢復(fù)正常時間篷帅。
class FaultItem implements Comparable<FaultItem> {
private final String name;
private volatile long currentLatency;
private volatile long startTimestamp;
批量消息發(fā)送
批量消息發(fā)送就是將同一主題的多條信息一起打包發(fā)送到消息服務(wù)端史侣,減少網(wǎng)絡(luò)調(diào)用次數(shù)。
單挑信息發(fā)送時魏身,消息體的內(nèi)容將保存在body中惊橱。批量消息發(fā)送,需要將多條消息體的內(nèi)容采用固定格式存儲在body中箭昵。
在消息發(fā)送端税朴,調(diào)用batch方法,將一批消息封裝成MessageBatch對象家制,之后的處理流程與上面的基本一致正林,只需要將該集合的每一條消息的消息體body聚合成一個byte[]數(shù)值,在消息服務(wù)端能夠從該byte[]數(shù)值中正確解析消息即可颤殴。