RocketMQ閱讀筆記之消息發(fā)送

上篇文章中我們可以了解到NameServer需要等Broker失效至少120s才能將該Broker從路由表中移除高镐,那如果在Broker故障期間,消息生產(chǎn)者Producer根據(jù)獲取到的路由信息可能包含已經(jīng)宕機的Broker,會導(dǎo)致消息發(fā)送失敗畸冲,在接下來的消息發(fā)送階段會解決這個問題嫉髓。

初識消息有關(guān)類

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;  
    # 消息所屬主題
    private String topic;
 # 消息Flag
 private int flag;
 # 擴展屬性
 private Map<String, String> properties;
 # 消息體
 private byte[] body;
 private String transactionId;

其中,Message擴展屬性主要包括下面幾個:

  • tag : 消息Tag,用于消息過濾
  • keys :Message索引建邑闲,多個用空格隔開算行,RocketMQ可以根據(jù)這些key快速檢索到消息
  • waitStoreMsgOK : 消息發(fā)送時是否等到消息存儲完成后再返回
  • delayTimeLevel : 消息延遲級別,用于定時消息或消息重試
public class DefaultMQProducer extends ClientConfig implements MQProducer {

    private final InternalLogger log = ClientLogger.getLog();    
    /**
 * Wrapping internal implementations for virtually all methods presented in this class. */  
  protected final transient DefaultMQProducerImpl defaultMQProducerImpl;   
  # 生產(chǎn)者所屬組监憎,消息服務(wù)器在回查事務(wù)狀態(tài)時會隨機選擇該組中的任何一個生產(chǎn)者發(fā)起事務(wù)回查請求
  private String producerGroup;   
  private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;    
  # 默認(rèn)主題在每一個Broker隊列的數(shù)量
  private volatile int defaultTopicQueueNums = 4;    
  # 發(fā)送消息默認(rèn)超時時間纱意,默認(rèn)3秒
  private int sendMsgTimeout = 3000;
  # 消息體超過該值則啟用壓縮,默認(rèn)4K     
  private int compressMsgBodyOverHowmuch = 1024 * 4;    
  # 同步方式發(fā)送消息重試次數(shù)鲸阔,默認(rèn)為2偷霉,總共執(zhí)行3次
  private int retryTimesWhenSendFailed = 2;
  # 異步方式發(fā)送消息重試次數(shù),默認(rèn)為2    
  private int retryTimesWhenSendAsyncFailed = 2; 
  # 允許發(fā)送的最大消息長度   
  private int maxMessageSize = 1024 * 1024 * 4; // 4M    
  private TraceDispatcher traceDispatcher = null;
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
 private String topic;
 private String brokerName;
 private int queueId;
public class TopicPublishInfo {
    private boolean orderTopic = false;
 private boolean haveTopicRouterInfo = false;
 private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
 private TopicRouteData topicRouteData;
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

發(fā)送信息總體過程

  1. 發(fā)送消息的入口 DefaultMQProducerImpl#send() 褐筛,默認(rèn)消息發(fā)送以同步方式發(fā)送类少,默認(rèn)超時時間為3s。
 public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
    }
 public void send(Message msg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        try {
            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
        } catch (MQBrokerException e) {
            throw new MQClientException("unknownn exception", e);
        }
    }

  1. 調(diào)用sendDefaultImpl渔扎,形參為下:
 private SendResult sendDefaultImpl(//
        Message msg, //
        final CommunicationMode communicationMode, //
        final SendCallback sendCallback, //
        final long timeout//
    ) 

其中硫狞,CommunicationMode表示消息發(fā)送的方式,同步晃痴、異步和單向残吩。

public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

然后會驗證服務(wù)服務(wù)是否可用,消息是否符合規(guī)范倘核,具體的驗證就不解釋了泣侮。
然后記錄當(dāng)前時間,后面會判斷是否timeout紧唱。

long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;

根據(jù)要發(fā)送消息的topic,尋找該topic的路由信息活尊。

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

當(dāng)前類有一個屬性,記錄所有topic的路由信息和消息隊列信息漏益。

private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();

首先從topicPublishInfoTable中查找該topic的topicPublishInfoTable信息蛹锰,
如果不存在當(dāng)前topic的信息或者當(dāng)前topicPublishInfoTable不可用,則先新創(chuàng)建一個TopicPublishInfo()绰疤,并放入到topicPublishInfoTable中铜犬,然后向NameServer查詢該topic的路由信息,此時會調(diào)用MQClientInstance的updateTopicRouteInfoFromNameServer(topic)方法。

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 緩存中獲取 Topic發(fā)布信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // 當(dāng)無或者可用的 Topic信息時翎苫,從Namesrv獲取一次 并且緩存
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());//如果key存在的情況下权埠,在putIfAbsent下不會修改
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); //進行調(diào)用獲取規(guī)則存下來
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
private final Lock lockNamesrv = new ReentrantLock();

MQClientInstance#updateTopicRouteInfoFromNameServer方法,LOCK_TIMEOUT_MILLIS默認(rèn)是3秒榨了,在此處用到了ReentrantLock.tryLock煎谍。
該鎖的方法的簡單解釋:
假如線程A和線程B使用同一個鎖Lock,此時線程A首先獲取鎖Lock.lock(),并且始終持有不釋放,如果此時B要去獲取鎖龙屉,調(diào)用tryLock(3000, mils),則說明在3秒內(nèi)如果線程A釋放鎖呐粘,會獲取到鎖并返回true,否則3秒過后會獲取不到鎖并返回false。

  try {
           if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {

如果isDefault為true,則使用默認(rèn)主題去查詢转捕,如果查詢到路由信息作岖,則替換路由信息中讀寫隊列個數(shù)為消息生產(chǎn)者默認(rèn)的隊列個數(shù)。

TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
        1000 * 3);//獲取topic規(guī)則
    if (topicRouteData != null) {
        for (QueueData data : topicRouteData.getQueueDatas()) {
            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
            data.setReadQueueNums(queueNums);
            data.setWriteQueueNums(queueNums);
        }
    }
}

如果isDefault為false,則使用參數(shù)topic去查詢五芝,如果未查詢到路由信息痘儡,則返回false,表示路由信息未變化捌蚊。

else {
    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    
}

如果路由信息找到传藏,與本地緩存中的路由信息進行對比,判斷路由信息是否發(fā)生了變化余指,如果沒有發(fā)生變化醉途,則直接返回fasle矾瑰。

if (topicRouteData != null) {
    TopicRouteData old = this.topicRouteTable.get(topic);
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    if (!changed) {
        changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {
        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    }

如果發(fā)生了變化,先對topicRouteData進行復(fù)制隘擎,然后根據(jù)獲得的topicRouteData信息對brokerAddrTable進行更新殴穴,

if (changed) {
    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    }

根據(jù)topicRouteData中的List<\queueData>轉(zhuǎn)化成topicPublishInfo的List<\MessageQueue>列表。具體的是在topicRouteData2TopicPublishInfo中實現(xiàn)的货葬。

TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
    Entry<String, MQProducerInner> entry = it.next();
    MQProducerInner impl = entry.getValue();
    if (impl != null) {
        impl.updateTopicPublishInfo(topic, publishInfo);//更新topic的PublishInfo
    }
}

循環(huán)遍歷路由信息的QueueData信息采幌,如果隊列沒有寫權(quán)限,則繼續(xù)遍歷下一個QueueData,根據(jù)brokerName找到brokerData信息震桶,找不到或沒有找到Master節(jié)點休傍,則遍歷下一個QueueData,根據(jù)寫隊列個數(shù),根據(jù)topic+序號創(chuàng)建MessageQueue,填充topicPublishInfo的List<\QueueMessage>,此時尼夺,完成了消息發(fā)送的路由查找尊残。

  //topicRouteData轉(zhuǎn)換為TopicPublishInfo
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
        TopicPublishInfo info = new TopicPublishInfo();
        info.setTopicRouteData(route);

        //有序
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
            String[] brokers = route.getOrderTopicConf().split(";");
            for (String broker : brokers) {
                String[] item = broker.split(":");
                int nums = Integer.parseInt(item[1]);
                for (int i = 0; i < nums; i++) {
                    MessageQueue mq = new MessageQueue(topic, item[0], i);
                    info.getMessageQueueList().add(mq);
                }
            }

            info.setOrderTopic(true);
        }
        //無序
        else {
            List<QueueData> qds = route.getQueueDatas();
            Collections.sort(qds);//按照brokerName升序進行排序的
            for (QueueData qd : qds) {
                if (PermName.isWriteable(qd.getPerm())) {
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }

                    if (null == brokerData) {
                        continue;
                    }

                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }

                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);//由于brokerName是排序的,TopicPublishInfo里面的messageQueueList就是有序的了從小到大
                    }
                }
            }

            info.setOrderTopic(false);
        }

        return info;
    }

獲取到路由信息之后淤堵,如果該路由信息可用寝衫,則先計算嘗試的次數(shù),如果發(fā)送模式是sync,則是3次拐邪,其他情況下是1次慰毅。
之后記錄上次發(fā)送失敗的broker名稱,在第一次發(fā)送的時候扎阶,lastBrokerName為null,然后根據(jù)消息隊列選擇策略選擇消息隊列汹胃。

if (topicPublishInfo != null && topicPublishInfo.ok()) {
    MessageQueue mq = null;
    Exception exception = null;
    SendResult sendResult = null;
    //發(fā)送模式是sync 會有3次其他1次
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName(); //第一次的確是null 但是如果第二次呢婶芭? 所以這里存在的意義
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);//選擇一個queue
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        //調(diào)用sendKernelImpl發(fā)送消息  發(fā)送消息核心
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        //更新Broker可用信息
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                           // 如下異常continue,進行發(fā)送消息重試
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

消息隊列選擇策略着饥。
有兩種策略犀农, sendLatencyFaultEnable=false, 默認(rèn)不啟用Broker故障延遲機制。
sendLatencyFaultEnable=true,啟用Broker故障延遲機制宰掉。
先介紹默認(rèn)的消息隊列選擇策略呵哨,調(diào)用TopicPublishInfo#selectOneMessageQueue
當(dāng)?shù)谝淮握{(diào)用時,lastBrokerName為null,對sendWhichQueue本地線程變量進行加1轨奄,并與當(dāng)前路由表中消息隊列個數(shù)取模孟害,返回該位置的MessageQueue。
如果該消息發(fā)送失敗挪拟,則可能會進行重試發(fā)送挨务,此時,lastBrokerName不是null,會記錄上次信息發(fā)送失敗的BrokerName,之后獲取sendWhichQueue本地線程變量進行加1玉组,并與當(dāng)前路由表中消息隊列個數(shù)取模谎柄,獲取該位置上的MessageQueue,如果獲取的該信息的BrokerName與上一次發(fā)送失敗的lastBrokerName不相同,則返回該信息球切,否則再遍歷下一個消息谷誓,直到第一個不與lastBrokerName相同的消息返回。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {//第一次進入就是空的
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
 public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

接下來講解啟用Broker故障延遲機制的消息選擇策略
前面部分和上述介紹的差不多吨凑,根據(jù)index先獲取當(dāng)前位置的消息捍歪,然后判斷該消息隊列是否可用,通過isAvailable方法判斷鸵钝。如果該消息隊列可用糙臼,在上次發(fā)送,或者這次發(fā)送的消息隊列的broker姓名與上次發(fā)送失敗的broker姓名一致恩商,則返回該消息隊列变逃。
如果所有的broker都預(yù)計不可用,隨機選擇一個不可用的broker,再從路由信息中選擇下一個消息隊列怠堪,將該消息隊列的broker重置為上面隨機選擇的broker,并重置queueId,并返回該消息隊列揽乱。但是,如果該隨機選擇的broker內(nèi)已經(jīng)沒有要發(fā)送的消息隊列時粟矿,則需要將該broker從latencyFaultTolerance中移除凰棉,并利用默認(rèn)的選擇機制選擇一個消息隊列。

 //延遲故障容錯陌粹,維護每個Broker的發(fā)送消息的延遲
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
if (this.sendLatencyFaultEnable) { //發(fā)送消息延遲容錯開關(guān)
    try {
        int index = tpInfo.getSendWhichQueue().getAndIncrement();
        for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
            int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
            if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                    return mq;
            }
        }

        final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
        int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
        if (writeQueueNums > 0) {
            final MessageQueue mq = tpInfo.selectOneMessageQueue();
            if (notBestBroker != null) {
                mq.setBrokerName(notBestBroker);
                mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
            }
            return mq;
        } else {
            latencyFaultTolerance.remove(notBestBroker);
        }
    } catch (Exception e) {
        log.eror("Error occurred when selecting message queue", e);
    }

    return tpInfo.selectOneMessageQueue();
}

上面部分是根據(jù)不同策略選擇消息隊列的具體解釋撒犀,獲取到消息隊列之后,返回到發(fā)送消息的最外層方法endDefaultImpl中,執(zhí)行消息發(fā)送步驟或舞。

消息發(fā)送

tmpmq是選擇獲取到的消息隊列荆姆。
brokersSent是存儲消息發(fā)送的broker,由上面可知,如果發(fā)送方式是同步映凳,則該數(shù)組長度為3胆筒,其他方式下長度為1。然后記錄當(dāng)前時間魏宽,然后執(zhí)行sendKernelImpl方法進行發(fā)送消息腐泻。之后决乎,獲取發(fā)送完之后的時間队询,執(zhí)行updateFaultItem方法來更新Broker異常信息,一個broker會對應(yīng)一個faultItem构诚。
之后蚌斩,根據(jù)消息發(fā)送的方式,如果是同步的范嘱,如果此次消息沒有成功送膳,則可以再進行嘗試,如果是異步或者單向丑蛤,則執(zhí)行結(jié)束叠聋。如果期間發(fā)送了異常,則會調(diào)用updateFaultItem方法來更新Broker異常信息受裹。
接下來詳細(xì)介紹sendKernelImpl方法和updateFaultItem方法碌补。

MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName)
if (tmpmq != null) {
    mq = tmpmq;
    brokersSent[times] = mq.getBrokerName();
    try {
        beginTimestampPrev = System.currentTimeMillis();
        //調(diào)用sendKernelImpl發(fā)送消息  發(fā)送消息核心
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
        endTimestamp = System.currentTimeMillis();
        //更新Broker可用信息
        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
        switch (communicationMode) {
            case ASYNC:
                return null;
            case ONEWAY:
                return null;
            case SYNC:
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                        continue;
                    }
                }

                return sendResult;
            default:
                break;
        }
    } 

DefaultMQProducerImpl.sendKernelImpl

該方法的形參有:

  • Message msg : 待發(fā)送的消息
  • MessageQueue mq : 消息將發(fā)送到該消息隊列上
  • CommunicationMode commuicationMode : 消息發(fā)送模式,SYNC棉饶、ASYNC厦章、ONEWAy
  • SendCallback sendCallback :異步消息回調(diào)函數(shù)
  • TopicPublishInfo topicPublishInfo : 主題路由信息
  • long timeout:消息發(fā)送超時時間
  1. 根據(jù)MessageQueue獲取Broker的網(wǎng)絡(luò)地址,如果MQClientInstance的brokerAddrTable未緩存該Broker的信息,則從NameServer主動更新一下topic的路由信息照藻,如果路由更新后還是找不到Broker信息袜啃,則拋出MQClientException,提示Broker不存在幸缕。
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }
  1. 為消息分配全局唯一ID,如果消息體默認(rèn)超過4K,會對消息體采用zip壓縮群发,并設(shè)置消息的系統(tǒng)標(biāo)記為MessageSysFlag.COMPRESED_FLAG。如果是事務(wù)Prepared消息发乔,則設(shè)置消息的系統(tǒng)標(biāo)記為MessageSysFlag.TRANSACTION_PREPARED_TYPE熟妓。
 //for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);//設(shè)置設(shè)置UNIQ_id,所以當(dāng)看見msgId的時候為什么解析不一樣了懂了吧
}

int sysFlag = 0; //又是根據(jù)位來進行每位是啥的判斷
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);//根據(jù)事務(wù)屬性key獲取值看是否是事務(wù)消息
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

  1. 如果注冊了消息發(fā)送鉤子函數(shù)列疗,則執(zhí)行消息發(fā)送之前的增強邏輯滑蚯。
if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
  1. 構(gòu)建消息發(fā)送請求包。主要包含下列重要信息:
  • 生產(chǎn)者組、主題名稱告材、默認(rèn)創(chuàng)建主題Key坤次、該主題在單個Broker默認(rèn)隊列數(shù)、隊列ID
  • 消息系統(tǒng)標(biāo)記斥赋、消息發(fā)送時間缰猴、消息標(biāo)記、消息擴展屬性疤剑、消息重試次數(shù)滑绒、是否是批量信息。
 //構(gòu)建SendMessageRequestHeader
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTopic(msg.getTopic());
    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setSysFlag(sysFlag);
    //生成消息時間戳
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    requestHeader.setFlag(msg.getFlag());
    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
    requestHeader.setReconsumeTimes(0);
    requestHeader.setUnitMode(this.isUnitMode());
    requestHeader.setBatch(msg instanceof MessageBatch);
    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
        if (reconsumeTimes != null) {
            requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
        }

        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
        if (maxReconsumeTimes != null) {
            requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
        }
    }
  1. 按照消息發(fā)送方式隘膘,同步疑故、異步、單向方式進行網(wǎng)絡(luò)傳輸弯菊。
SendResult sendResult = null;
switch (communicationMode) {
    case ASYNC:
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//異步發(fā)送消息
            brokerAddr, // 1
            mq.getBrokerName(), // 2
            msg, // 3
            requestHeader, // 4
            timeout, // 5
            communicationMode, // 6
            sendCallback, // 7
            topicPublishInfo, // 8
            this.mQClientFactory, // 9
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
            context, //
            this);
        break;
    case ONEWAY:
    case SYNC:
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//同步以及廣播發(fā)送消息
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}

if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context); //發(fā)送消息后邏輯
}

return sendResult;
  1. 之后就是按照不同的方式進行發(fā)送纵势。在發(fā)送之前會檢查消息發(fā)送是否合理,檢查該Broker是否有寫權(quán)限管钳,該Topic是否可以進行消息發(fā)送钦铁,在NameServer端存儲主題的配置信息,除此之外才漆,開始檢查隊列牛曹,如果隊列不合法,返回錯誤碼
  2. 如果消息重試次數(shù)超過允許的最大重試次數(shù)醇滥,消息將進入到DLD延遲隊列黎比。延遲隊列主題:%DLQ%+消費組名,
  3. 調(diào)用DefaultMessageStore.putMessage進行消息存儲腺办。

DefaultMQProducerImpl.updateFaultItem

由上面可知焰手,在執(zhí)行消息發(fā)送完之后和出現(xiàn)發(fā)送異常的時候,會調(diào)用該方法對broker進行異常更新怀喉。
形參解釋:

  • brokerName : broker名稱
  • currentLatency : 本次消息發(fā)送延遲時間currentLatency
  • isolation : 是否隔離书妻,如果為true,則使用默認(rèn)時長30s來計算BroKer故障規(guī)避時長;如果為false,則使用本次消息發(fā)送延遲時間來計算Broker故障規(guī)避時長躬拢。
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }
  1. 利用computeNotAvailableDuration() 方法計算規(guī)避時長躲履。
    從latencyMax數(shù)組尾部開始查找,找到第一個比currentLatency小的下標(biāo)聊闯,然后從notAvailableDuration數(shù)組中獲取需要規(guī)避的時長工猜。
//延遲級別數(shù)組
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

//不可用時長數(shù)組
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }

    return 0;
}
  1. 根據(jù)broker名稱從緩存表faultItemTable中獲取FaultItem,如果找到則更新FaultItem,否則創(chuàng)建FaultItem。
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);


@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

一個broker對應(yīng)一個faultItem,記錄broker名稱菱蔬、消息發(fā)送時長和broker恢復(fù)正常時間篷帅。

 class FaultItem implements Comparable<FaultItem> {
        private final String name;
        private volatile long currentLatency;
        private volatile long startTimestamp;

批量消息發(fā)送

批量消息發(fā)送就是將同一主題的多條信息一起打包發(fā)送到消息服務(wù)端史侣,減少網(wǎng)絡(luò)調(diào)用次數(shù)。
單挑信息發(fā)送時魏身,消息體的內(nèi)容將保存在body中惊橱。批量消息發(fā)送,需要將多條消息體的內(nèi)容采用固定格式存儲在body中箭昵。
在消息發(fā)送端税朴,調(diào)用batch方法,將一批消息封裝成MessageBatch對象家制,之后的處理流程與上面的基本一致正林,只需要將該集合的每一條消息的消息體body聚合成一個byte[]數(shù)值,在消息服務(wù)端能夠從該byte[]數(shù)值中正確解析消息即可颤殴。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末觅廓,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子诅病,更是在濱河造成了極大的恐慌哪亿,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,470評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件贤笆,死亡現(xiàn)場離奇詭異,居然都是意外死亡讨阻,警方通過查閱死者的電腦和手機芥永,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來钝吮,“玉大人埋涧,你說我怎么就攤上這事∑媸荩” “怎么了棘催?”我有些...
    開封第一講書人閱讀 162,577評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長耳标。 經(jīng)常有香客問我,道長次坡,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,176評論 1 292
  • 正文 為了忘掉前任宋距,我火速辦了婚禮,結(jié)果婚禮上症脂,老公的妹妹穿的比我還像新娘淫僻。我一直安慰自己壶唤,他們只是感情好嘁傀,可當(dāng)我...
    茶點故事閱讀 67,189評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著视粮,像睡著了一般细办。 火紅的嫁衣襯著肌膚如雪笑撞。 梳的紋絲不亂的頭發(fā)上钓觉,一...
    開封第一講書人閱讀 51,155評論 1 299
  • 那天,我揣著相機與錄音瓤狐,去河邊找鬼。 笑死础锐,一個胖子當(dāng)著我的面吹牛荧缘,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播信姓,決...
    沈念sama閱讀 40,041評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼绸罗,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了菊值?” 一聲冷哼從身側(cè)響起系洛,我...
    開封第一講書人閱讀 38,903評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎定页,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體典徊,經(jīng)...
    沈念sama閱讀 45,319評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,539評論 2 332
  • 正文 我和宋清朗相戀三年羡铲,在試婚紗的時候發(fā)現(xiàn)自己被綠了儡毕。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,703評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡雷恃,死狀恐怖费坊,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情讨越,我是刑警寧澤永毅,帶...
    沈念sama閱讀 35,417評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站节猿,受9級特大地震影響漫雕,放射性物質(zhì)發(fā)生泄漏峰鄙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,013評論 3 325
  • 文/蒙蒙 一魁蒜、第九天 我趴在偏房一處隱蔽的房頂上張望吩翻。 院中可真熱鬧,春花似錦细移、人聲如沸熊锭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽代乃。三九已至,卻和暖如春搁吓,著一層夾襖步出監(jiān)牢的瞬間擎浴,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評論 1 269
  • 我被黑心中介騙來泰國打工贝室, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留仿吞,地道東北人。 一個月前我還...
    沈念sama閱讀 47,711評論 2 368
  • 正文 我出身青樓峡迷,卻偏偏與公主長得像你虹,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子夯辖,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,601評論 2 353

推薦閱讀更多精彩內(nèi)容