定時消息機(jī)制

概覽

定時消息是指消息發(fā)送到Broker后决瞳,并不立即被消費者消費而是要等到特定的時間后才能被消費搂誉。

broker有配置項messageDelayLevel芦拿,默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”支示,18個level∠窘茫可以配置自定義messageDelayLevel主之。注意择吊,messageDelayLevel是broker的屬性,不屬于某個topic槽奕。發(fā)消息時几睛,設(shè)置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況:

  • level == 0史翘,消息為非延遲消息
  • 1<=level<=maxLevel枉长,消息延遲特定時間,例如level==1琼讽,延遲1s
  • level > maxLevel必峰,則level== maxLevel,例如level==20钻蹬,延遲2h

所有延遲消息都使用同一主題SCHEDULE_TOPIC_XXXX吼蚁,這一主題下一個消息延遲級別,一個消息隊列MessageQueue问欠,一個定時任務(wù)TimerTask進(jìn)行處理肝匆。如果消息的delayLevel大于0,代表延遲消息顺献,首先將它的原來topic旗国、queueId存入到消息屬性中,然后改變消息的topic為SCHEDULE_TOPIC_XXXX注整、queueId為delayLevel-1這一隊列中能曾;然后存入到commitlog中。TimerTask執(zhí)行定時任務(wù)肿轨,從延遲隊列取出這個消息寿冕,根據(jù)topic,queueId獲取consumequeue椒袍,從consumequeue中獲取commitlog的message驼唱,再將message的REAL_TOPIC、REAL_QID屬性進(jìn)行topic驹暑、queueId還原玫恳,再次存入到commitlog文件中,等待消費者消費优俘。

加載

延遲消費隊列的消費進(jìn)度定時會存儲到 ${ROCKET HOME}rocketmq/store/config/delayOffset.json文件中纽窟,所以在項目啟動時,需要加載歷史消費記錄兼吓;還要完成消息延遲延遲消息級別和延遲時間的delayLevelTable數(shù)據(jù)的構(gòu)造;

delayOffset.json
鍵值對為queueId:offset
{
    "offsetTable":{12:0,6:0,13:0,5:1,18:0,3:22}
}

// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);
public boolean load() {
    boolean result = super.load();
    // delayLevelTable數(shù)據(jù)的構(gòu)造
    result = result && this.parseDelayLevel();
    return result;
}

啟動定時服務(wù)

  1. 為每一個延遲隊列創(chuàng)建一個調(diào)度任務(wù),每一個調(diào)度任務(wù)對應(yīng)SCHEDULE_TOPIC_XXXX主題下的一個消息消費隊列森枪;
  2. 啟動定時任務(wù)视搏,每隔10s持久化延遲消息隊列消息進(jìn)度审孽。
public void start() {
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                // 定時任務(wù),每一個定時任務(wù)啟動時浑娜,首先延遲1s佑力;以后每次調(diào)度時,首先延遲0.1s筋遭,然后再執(zhí)行定時調(diào)度時間
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        // Broker端默認(rèn)10s持久化一次消息進(jìn)度打颤,存儲文件名:${RocketMQ_ HOME}/store/config/consumerOffset.json
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}
DeliverDelayedMessageTimerTask

DeliverDelayedMessageTimerTask繼承TimerTask是延遲消息定時任務(wù),一個延遲隊列一個延時TimeTask任務(wù)漓滔。TimeTask定時時間機(jī)制為延遲1秒執(zhí)行第一次编饺,以后每0.1秒執(zhí)行一次DeliverDelayedMessageTimerTask任務(wù);也就是executeOnTimeup()方法响驴,處理一個延遲級別的消息隊列透且,根據(jù)延遲級別消息隊列的消費offset值,取出在這個延遲級別消息隊列offset后面的message豁鲤,進(jìn)行延遲時間判斷秽誊,如果第一個Message的延遲時間不滿足,那就繼續(xù)提交這個DeliverDelayedMessageTimerTask定時任務(wù)琳骡,其他后面消息也不滿足延遲時間锅论,直接return;如果第一個Message到了延遲時間楣号,還原真實的Topic最易、QueueId將消息再次放入commitlog中,進(jìn)入真實的topic的隊列竖席。如果過程中出現(xiàn)了異常情況耘纱,也是重新提交這個DeliverDelayedMessageTimerTask定時任務(wù)。

@Override
public void run() {
    try {
        if (isStarted()) {
            // 
            this.executeOnTimeup();
        }
    } catch (Exception e) {
        // XXX: warn and notify me
        log.error("ScheduleMessageService, executeOnTimeup exception", e);
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
            this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
}

public void executeOnTimeup() {
    // 查找SCHEDULE_TOPIC_XXXX下延遲消息的ConsumeQueue
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));
    // 先初始化失敗調(diào)度offset為最開始o(jì)ffset毕荐,后面再更新
    long failScheduleOffset = offset;

    if (cq != null) {
        // 第一次獲取時offset初始值為0束析;獲取這個offset之后的所有message值
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                // ConsumeQueueExt 存儲ConsumeQueue的擴(kuò)展屬性;CqExtUnit為擴(kuò)展屬性byte字節(jié)大小
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // 計算出一個message的offset憎亚,消息大小员寇,tagHashCode
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 消息物理偏移量
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    // 消息大小
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // tagHashCode
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    // 計算是否到達(dá)定期時間
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    // 下一條消息在consumequeue中的offset大小,每個消息在consumequeue中存儲大小為20
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // 計算消息是否到期剩余時間
                    long countdown = deliverTimestamp - now;
                    // 到達(dá)消息指定延遲時間
                    if (countdown <= 0) {
                        // 根據(jù)物理offset第美、消息大小從commitlog獲取消息
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);
                        // 獲取到消息
                        if (msgExt != null) {
                            try {
                                // 還原成為原來的消息蝶锋,還原topic,queueId
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                            msgInner.getTopic(), msgInner);
                                    continue;
                                }
                                // 重新存儲消息到commitlog文件中
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore
                                        .putMessage(msgInner);
                                // 存儲成功什往,繼續(xù)下一條消息
                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    // 失敗記錄log
                                    // 再次執(zhí)行延時任務(wù)扳缕,更新不同延時級別對應(yīng)的消費消息offset
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        // 未到消息指定的延遲時間,繼續(xù)構(gòu)建一個延時任務(wù)放入定時任務(wù)中,更新等待時間
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                // 再次創(chuàng)建延時等待任務(wù)躯舔,等待下次延遲隊列中的數(shù)據(jù)驴剔,進(jìn)行處理。更新延遲隊列消費的offset
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {
                // 釋放獲取到的內(nèi)存消息
                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {
            //未找到有效的消息粥庄,更新延遲隊列定時拉取進(jìn)度
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)
    //未從commitlog中獲取到延時消息丧失,創(chuàng)建延遲任務(wù),等待commitlog中消息的到來惜互。
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末布讹,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子训堆,更是在濱河造成了極大的恐慌描验,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蔫慧,死亡現(xiàn)場離奇詭異挠乳,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)姑躲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進(jìn)店門睡扬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人黍析,你說我怎么就攤上這事卖怜。” “怎么了阐枣?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵马靠,是天一觀的道長。 經(jīng)常有香客問我蔼两,道長甩鳄,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任额划,我火速辦了婚禮妙啃,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘俊戳。我一直安慰自己揖赴,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布抑胎。 她就那樣靜靜地躺著燥滑,像睡著了一般。 火紅的嫁衣襯著肌膚如雪阿逃。 梳的紋絲不亂的頭發(fā)上铭拧,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天赃蛛,我揣著相機(jī)與錄音,去河邊找鬼羽历。 笑死焊虏,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的秕磷。 我是一名探鬼主播,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼炼团,長吁一口氣:“原來是場噩夢啊……” “哼澎嚣!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起瘟芝,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤易桃,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后锌俱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體晤郑,經(jīng)...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年贸宏,在試婚紗的時候發(fā)現(xiàn)自己被綠了造寝。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,852評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡吭练,死狀恐怖诫龙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鲫咽,我是刑警寧澤签赃,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站分尸,受9級特大地震影響锦聊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜箩绍,卻給世界環(huán)境...
    茶點故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一孔庭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧伶选,春花似錦史飞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至陨簇,卻和暖如春吐绵,著一層夾襖步出監(jiān)牢的瞬間迹淌,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工己单, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留唉窃,地道東北人。 一個月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓纹笼,卻偏偏與公主長得像纹份,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子廷痘,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,851評論 2 361

推薦閱讀更多精彩內(nèi)容