寫一篇RocketMQ卷文讓自己冷靜一下

image

不吃(燒烤)不喝(奶茶可樂)看了好久才概括出這么一點點東西虚青,希望大佬們能夠有耐心看一看峡蟋,遇到說的不對的地方似忧,也歡迎在評論區(qū)或者私信與我交流

另外完整版的代碼注釋项钮,我在我的github上也添加了班眯,感興趣的小伙伴也可以點擊這個鏈接去看一波 github地址

覺得我講的有那么一點點道理,對你有那么一丟丟的幫助的烁巫,也可以給我一波點贊關(guān)注666喲~

dddd.jpeg

廢話不多說署隘,下面開始我的表演~

RocketMQ全局流程圖

RocketMQ主要流程圖.png

上來就是這么一大張圖片,相信大家肯定完全不想看下去亚隙。(那么我為什么還要放在一開始呢磁餐?主要是為了能夠讓大家有一個全局的印象,然后后續(xù)復(fù)習(xí)的時候也可以根據(jù)這個流程圖去具體復(fù)習(xí))

那么恃鞋,下面我們就針對一些問題來具體描述RocketMQ的工作流程 此處內(nèi)容會不斷補充崖媚,也歡迎大家把遇到的問題在評論區(qū)留下來

消息消費邏輯

消息消費可以分為三大模塊

  • Rebalance
  • 拉取消息
  • 消費消息

Rebalance

Rebalance邏輯.png
// RebalanceImpl
public void doRebalance(final boolean isOrder) {
  Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  if (subTable != null) {
    // 遍歷每個主題的隊列
    // subTable 會在 DefaultMQPushConsumerImpl 的 subscribe 和 unsubscribe 時修改
    for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
      final String topic = entry.getKey();
      try {
        // 對隊列進(jìn)行重新負(fù)載
        this.rebalanceByTopic(topic, isOrder);
      } catch (Throwable e) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("rebalanceByTopic Exception", e);
        }
      }
    }
  }

  this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
  switch (messageModel) {
    case BROADCASTING: {
      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
      if (mqSet != null) {
        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
        if (changed) {
          this.messageQueueChanged(topic, mqSet, mqSet);
          log.info("messageQueueChanged {} {} {} {}",
                   consumerGroup,
                   topic,
                   mqSet,
                   mqSet);
        }
      } else {
        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
      }
      break;
    }
    case CLUSTERING: {
      // topicSubscribeInfoTable topic訂閱信息緩存表
      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
      // 發(fā)送請求到broker獲取topic下該消費組內(nèi)當(dāng)前所有的消費者客戶端id
      List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
      if (null == mqSet) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
        }
      }

      if (null == cidAll) {
        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
      }

      if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
        mqAll.addAll(mqSet);

        // 排序保證了同一個消費組內(nèi)消費者看到的視圖保持一致亦歉,確保同一個消費隊列不會被多個消費者分配
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        // 分配算法 (盡量使用前兩種)
        // 默認(rèn)有5種 1)平均分配 2)平均輪詢分配 3)一致性hash
        // 4)根據(jù)配置 為每一個消費者配置固定的消息隊列 5)根據(jù)broker部署機(jī)房名,對每個消費者負(fù)責(zé)不同的broker上的隊列
        // 但是如果消費者數(shù)目大于消息隊列數(shù)量畅哑,則會有些消費者無法消費消息
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

        // 當(dāng)前消費者分配到的隊列
        List<MessageQueue> allocateResult = null;
        try {
          allocateResult = strategy.allocate(
            this.consumerGroup,
            this.mQClientFactory.getClientId(),
            mqAll,
            cidAll);
        } catch (Throwable e) {
          log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                    e);
          return;
        }

        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
        if (allocateResult != null) {
          allocateResultSet.addAll(allocateResult);
        }

        // 更新消息消費隊列肴楷,如果是新增的消息消費隊列,則會創(chuàng)建一個消息拉取請求并立即執(zhí)行拉取
        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        if (changed) {
          log.info(
            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
            allocateResultSet.size(), allocateResultSet);
          this.messageQueueChanged(topic, mqSet, allocateResultSet);
        }
      }
      break;
    }
    default:
      break;
  }
}

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                   final boolean isOrder) {
  boolean changed = false;

  Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
  while (it.hasNext()) {
    Entry<MessageQueue, ProcessQueue> next = it.next();
    MessageQueue mq = next.getKey();
    ProcessQueue pq = next.getValue();

    if (mq.getTopic().equals(topic)) {
      // 當(dāng)前分配到的隊列中不包含原先的隊列(說明當(dāng)前隊列被分配給了其他消費者)
      if (!mqSet.contains(mq)) {
        // 丟棄 processQueue
        pq.setDropped(true);
        // 移除當(dāng)前消息隊列
        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
          it.remove();
          changed = true;
          log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
        }
      } else if (pq.isPullExpired()) {
        switch (this.consumeType()) {
          case CONSUME_ACTIVELY:
            break;
          case CONSUME_PASSIVELY:
            pq.setDropped(true);
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
              it.remove();
              changed = true;
              log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                        consumerGroup, mq);
            }
            break;
          default:
            break;
        }
      }
    }
  }

  List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
  for (MessageQueue mq : mqSet) {
    // 消息消費隊列緩存中不存在當(dāng)前隊列 本次分配新增的隊列
    if (!this.processQueueTable.containsKey(mq)) {
      // 向broker發(fā)起鎖定隊列請求 (向broker端請求鎖定MessageQueue,同時在本地鎖定對應(yīng)的ProcessQueue)
      if (isOrder && !this.lock(mq)) {
        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
        // 加鎖失敗荠呐,跳過赛蔫,等待下一次隊列重新負(fù)載時再嘗試加鎖
        continue;
      }

      // 從內(nèi)存中移除該消息隊列的消費進(jìn)度
      this.removeDirtyOffset(mq);
      ProcessQueue pq = new ProcessQueue();

      long nextOffset = -1L;
      try {
        nextOffset = this.computePullFromWhereWithException(mq);
      } catch (Exception e) {
        log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
        continue;
      }

      if (nextOffset >= 0) {
        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
        if (pre != null) {
          log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
        } else {
          // 首次添加,構(gòu)建拉取消息的請求
          log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
          PullRequest pullRequest = new PullRequest();
          pullRequest.setConsumerGroup(consumerGroup);
          pullRequest.setNextOffset(nextOffset);
          pullRequest.setMessageQueue(mq);
          pullRequest.setProcessQueue(pq);
          pullRequestList.add(pullRequest);
          changed = true;
        }
      } else {
        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
      }
    }
  }

  // 立即拉取消息(對新增的隊列)
  this.dispatchPullRequest(pullRequestList);

  return changed;
}

由流程圖和代碼泥张,我們可以得知呵恢,集群模式下消息負(fù)載主要有以下幾個步驟:

  1. 從Broker獲取訂閱當(dāng)前Topic的消費者列表
  2. 根據(jù)具體的策略進(jìn)行負(fù)載均衡
  3. 對當(dāng)前消費者分配到的隊列進(jìn)行處理
    1. 原來有,現(xiàn)在沒有:丟棄對應(yīng)的消息處理隊列(ProcessQueue)
    2. 原來沒有媚创,現(xiàn)在有:添加消息處理隊列(ProcessQueue)渗钉,如果是第一次新增,還會創(chuàng)建一個消息拉取請求

拉取消息

拉取消息邏輯.png

拉取消息的代碼太多了钞钙,我就不再這里貼出來了鳄橘。

我在這里說一下大致流程,然后有幾個需要注意的地方

流程:在我們Rebalance第一次添加負(fù)責(zé)的隊列和后續(xù)拉取消息后芒炼,都會再提交一個拉取請求到拉取請求隊列(pullRequestQueue)中瘫怜,然后有一個線程不停的去里面獲取拉取請求,去執(zhí)行拉取的操作

這里說一個RocketMQ消費者這邊設(shè)計的一個亮點

它將拉取消息本刽,消費消息通過兩個任務(wù)隊列的方式進(jìn)行解耦鲸湃,然后每一個模塊僅需要負(fù)責(zé)它自己的功能。(雖然大佬們覺得很常見子寓,但是當(dāng)時我看的時候還是感覺妙呀~)

另外還有一點需要注意的是:拉取消息的時候broker和consumer都會對消息進(jìn)行過濾暗挑,只不過broker是根據(jù)tag的hash進(jìn)行過濾的,而consumer是根據(jù)具體的tag字符串匹配過濾的别瞭。這也是有的時候窿祥,明明拉取到了消息,但是卻沒有需要消費的消息產(chǎn)生的原因

既然說到了消息過濾蝙寨,這邊先簡單提一下RocketMQ消息過濾的幾種方式

  • 表達(dá)式過濾
    • tag
    • SQL92
  • 類過濾

消費消息

消息消費邏輯.png

這邊也先說幾個注意點吧,后面再單獨出篇文章嗤瞎。

(一)順序消費和非順序消費消費失敗的處理

(二)消費失敗偏移量的更新:只有當(dāng)前這批消息全部消費成功后墙歪,才會將偏移量更新成為這批消息最后一條的偏移量

(三)廣播消息失敗不會重試,僅打印失敗日志

補充:為什么同一個消費組下消費者的訂閱信息要相同

首先贝奇,先說一下什么叫做同一個消費組下消費者的訂閱信息要相同

即:在相同的GroupId下虹菲,每一個消費者他們的訂閱內(nèi)容(Topic+Tag)要保持一致,否則會導(dǎo)致消息無法被正常消費

參考文檔:阿里云:訂閱關(guān)系一致

Rebalance細(xì)節(jié).png

我們在看待這個問題的時候掉瞳,可以把它分為兩類情況考慮

  • topic不一致
  • tag不一致

(一)topic不一致的問題

首先先說一個場景毕源,消費者A監(jiān)聽了TopicA浪漠,消費者B監(jiān)聽了TopicB,但是消費者A和消費者B同屬一個groupTest

在Rebalance階段霎褐,消費者A對TopicA進(jìn)行負(fù)載均衡時址愿,會去查詢groupTest下的所有消費者信息。獲取到了消費者A和消費者B冻璃。此時就會將TopicA的隊列對消費者A和消費者B進(jìn)行負(fù)載均衡(例如消費者A分配到了1234四個隊列响谓,消費者B分配到了5678四個隊列)。此時消費者B沒有針對TopicA的處理邏輯省艳,就會導(dǎo)致推送到5678這幾個隊列里面的消息沒有辦法得到處理娘纷。

(二)tag不一致的問題

隨著消費者A,消費者B負(fù)載均衡的不斷進(jìn)行跋炕,會不斷把最新的訂閱信息(消息過濾規(guī)則)上報給broker赖晶。broker就會不斷的覆蓋更新,導(dǎo)致tag信息不停地變化辐烂,而tag的變化在消費者拉取消息時broker的過濾就會產(chǎn)生影響嬉探,會導(dǎo)致一些本來要被消費者拉取到的消息被broker過濾掉

消費者總結(jié)

講了這么多的消費者的內(nèi)容,出現(xiàn)了好多名詞棉圈,也把消費者的一些比較核心的內(nèi)容逐個講了一遍涩堤。

那么,在這里分瘾,我們將消費者這個模塊里面的所有東西胎围,在進(jìn)行一個完整的串聯(lián)。然后消費者這一方面的介紹就要告一段落了

消費者業(yè)務(wù)串聯(lián)流程圖

延時隊列是如何工作的

RocketMQ延時隊列工作流程圖.png

由流程圖中我們不難看出德召,RocketMQ對延時消息的處理白魂,是交由Timer去完成的(相關(guān)類ScheduleMessageService)。在Timer的任務(wù)隊列中讀取需要處理的延遲任務(wù)上岗,將消息從延遲隊列轉(zhuǎn)發(fā)到具體的業(yè)務(wù)隊列中

此處補充一點:此處提到的Timer為java工具類包(java.util.Timer)下的一個定時任務(wù)工具福荸。它主要由兩個部分:TaskQueue queue(任務(wù)隊列)和TimerThread thread(工作線程)。這邊我把它簡單的類比為一個單線程的工作線程池

另外在ScheduleMessageService中使用到了Timer的兩個方法肴掷,我在這里先單獨列出來下

  • this.timer.schedule :在任務(wù)執(zhí)行成功后敬锐,再加上對應(yīng)的周期,然后再執(zhí)行
  • this.timer.scheduleAtFixedRate :每隔指定時間就執(zhí)行一次呆瞻,與任務(wù)執(zhí)行時間無關(guān)

話不多少台夺,貼上源碼(源碼雖然枯燥,但希望可以耐心的看完)

// ScheduleMessageService
public void start() {
  if (started.compareAndSet(false, true)) {
    super.load();
    this.timer = new Timer("ScheduleMessageTimerThread", true);
    // 根據(jù)延時隊列創(chuàng)建對應(yīng)的定時任務(wù)
    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
      Integer level = entry.getKey();
      Long timeDelay = entry.getValue();
      Long offset = this.offsetTable.get(level);
      if (null == offset) {
        offset = 0L;
      }

      if (timeDelay != null) {
        // 第一次痴脾,延遲一秒執(zhí)行任務(wù)颤介,后續(xù)根據(jù)對應(yīng)延時時間來執(zhí)行
        // 延時級別和消息隊列id對應(yīng)關(guān)系 : 消息隊列id = 延時級別 - 1
        // shedule 在任務(wù)執(zhí)行成功后,再加上對應(yīng)的周期,然后再執(zhí)行
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
      }
    }

    // scheduleAtFixedRate 每隔指定時間就執(zhí)行一次滚朵,與任務(wù)執(zhí)行時間無關(guān)
    this.timer.scheduleAtFixedRate(new TimerTask() {

      @Override
      public void run() {
        try {
          if (started.get()) {
            // 每個十秒持久化一次延遲隊列的處理進(jìn)度
            ScheduleMessageService.this.persist();
          }
        } catch (Throwable e) {
          log.error("scheduleAtFixedRate flush exception", e);
        }
      }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
  }
}
// DeliverDelayedMessageTimerTask
@Override
public void run() {
  try {
    if (isStarted()) {
      this.executeOnTimeup();
    }
  } catch (Exception e) {
    // XXX: warn and notify me
    log.error("ScheduleMessageService, executeOnTimeup exception", e);
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
      this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
  }
}

public void executeOnTimeup() {
  // 根據(jù) 延時隊列topic 和 延時隊列id 查找消費隊列
  ConsumeQueue cq =
    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                                                                     delayLevel2QueueId(delayLevel));

  long failScheduleOffset = offset;

  if (cq != null) {
    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
    if (bufferCQ != null) {
      try {
        long nextOffset = offset;
        int i = 0;
        // 遍歷ConsumeQueue冤灾,每一個標(biāo)準(zhǔn)的ConsumeQueue條目為20字節(jié)
        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
          long offsetPy = bufferCQ.getByteBuffer().getLong();
          int sizePy = bufferCQ.getByteBuffer().getInt();
          long tagsCode = bufferCQ.getByteBuffer().getLong();

          if (cq.isExtAddr(tagsCode)) {
            if (cq.getExt(tagsCode, cqExtUnit)) {
              tagsCode = cqExtUnit.getTagsCode();
            } else {
              //can't find ext content.So re compute tags code.
              log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                        tagsCode, offsetPy, sizePy);
              long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
              tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
            }
          }

          long now = System.currentTimeMillis();
          long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

          nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

          // > 0 未到消息消費時間
          long countdown = deliverTimestamp - now;

          if (countdown <= 0) {
            MessageExt msgExt =
              ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
              offsetPy, sizePy);

            if (msgExt != null) {
              try {
                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                  log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                            msgInner.getTopic(), msgInner);
                  continue;
                }
                // 放到對應(yīng)的 %RETRY%+gid 重試topic下進(jìn)行消費(轉(zhuǎn)發(fā)消息)
                PutMessageResult putMessageResult =
                  ScheduleMessageService.this.writeMessageStore
                  .putMessage(msgInner);

                if (putMessageResult != null
                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                  if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
                                                                                                            putMessageResult.getAppendMessageResult().getWroteBytes());
                    ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
                  }
                  continue;
                } else {
                  // XXX: warn and notify me
                  log.error(
                    "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                    msgExt.getTopic(), msgExt.getMsgId());
                  ScheduleMessageService.this.timer.schedule(
                    new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                       nextOffset), DELAY_FOR_A_PERIOD);
                  ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                           nextOffset);
                  return;
                }
              } catch (Exception e) {
                /*
                                         * XXX: warn and notify me
                                         */
                log.error(
                  "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
              }
            }
          } else {
            // 會將下次任務(wù)執(zhí)行時間設(shè)置為countdown 即 消息的延時轉(zhuǎn)發(fā)時間-當(dāng)前時間
            ScheduleMessageService.this.timer.schedule(
              new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
              countdown);
            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
            return;
          }
        } // end of for

        // 更新延時隊列拉取任務(wù)進(jìn)度
        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
          this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
        return;
      } finally {

        bufferCQ.release();
      }
    } // end of if (bufferCQ != null)
    else {
      // 消費隊列不存在,默認(rèn)為沒有需要消費的任務(wù)辕近,跳過本次消費

      long cqMinOffset = cq.getMinOffsetInQueue();
      long cqMaxOffset = cq.getMaxOffsetInQueue();
      if (offset < cqMinOffset) {
        // 下次拉取任務(wù)進(jìn)度更新
        failScheduleOffset = cqMinOffset;
        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                  offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
      }

      if (offset > cqMaxOffset) {
        failScheduleOffset = cqMaxOffset;
        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                  offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
      }
    }
  } // end of if (cq != null)

  // 根據(jù)延時等級創(chuàng)建一個任務(wù)
  ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                                                failScheduleOffset), DELAY_FOR_A_WHILE);
}

我的博客傳送門

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末韵吨,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子亏推,更是在濱河造成了極大的恐慌学赛,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件吞杭,死亡現(xiàn)場離奇詭異盏浇,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)芽狗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進(jìn)店門绢掰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人童擎,你說我怎么就攤上這事滴劲。” “怎么了顾复?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵班挖,是天一觀的道長。 經(jīng)常有香客問我芯砸,道長萧芙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任假丧,我火速辦了婚禮双揪,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘包帚。我一直安慰自己渔期,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布渴邦。 她就那樣靜靜地躺著疯趟,像睡著了一般。 火紅的嫁衣襯著肌膚如雪几莽。 梳的紋絲不亂的頭發(fā)上迅办,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼。 笑死纤垂,一個胖子當(dāng)著我的面吹牛矾策,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播峭沦,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼贾虽,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了吼鱼?” 一聲冷哼從身側(cè)響起蓬豁,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎菇肃,沒想到半個月后地粪,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡琐谤,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年蟆技,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片斗忌。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡质礼,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出织阳,到底是詐尸還是另有隱情眶蕉,我是刑警寧澤,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布唧躲,位于F島的核電站造挽,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏惊窖。R本人自食惡果不足惜刽宪,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望界酒。 院中可真熱鬧圣拄,春花似錦、人聲如沸毁欣。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凭疮。三九已至饭耳,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間执解,已是汗流浹背寞肖。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人新蟆。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓觅赊,卻偏偏與公主長得像,于是被迫代替她去往敵國和親琼稻。 傳聞我的和親對象是個殘疾皇子吮螺,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內(nèi)容