概覽
定時消息是指消息發(fā)送到Broker后决瞳,并不立即被消費者消費而是要等到特定的時間后才能被消費搂誉。
broker有配置項messageDelayLevel芦拿,默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”支示,18個level∠窘茫可以配置自定義messageDelayLevel主之。注意择吊,messageDelayLevel是broker的屬性,不屬于某個topic槽奕。發(fā)消息時几睛,設(shè)置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況:
- level == 0史翘,消息為非延遲消息
- 1<=level<=maxLevel枉长,消息延遲特定時間,例如level==1琼讽,延遲1s
- level > maxLevel必峰,則level== maxLevel,例如level==20钻蹬,延遲2h
所有延遲消息都使用同一主題SCHEDULE_TOPIC_XXXX吼蚁,這一主題下一個消息延遲級別,一個消息隊列MessageQueue问欠,一個定時任務(wù)TimerTask進(jìn)行處理肝匆。如果消息的delayLevel大于0,代表延遲消息顺献,首先將它的原來topic旗国、queueId存入到消息屬性中,然后改變消息的topic為SCHEDULE_TOPIC_XXXX注整、queueId為delayLevel-1這一隊列中能曾;然后存入到commitlog中。TimerTask執(zhí)行定時任務(wù)肿轨,從延遲隊列取出這個消息寿冕,根據(jù)topic,queueId獲取consumequeue椒袍,從consumequeue中獲取commitlog的message驼唱,再將message的REAL_TOPIC、REAL_QID屬性進(jìn)行topic驹暑、queueId還原玫恳,再次存入到commitlog文件中,等待消費者消費优俘。
加載
延遲消費隊列的消費進(jìn)度定時會存儲到 ${ROCKET HOME}rocketmq/store/config/delayOffset.json文件中纽窟,所以在項目啟動時,需要加載歷史消費記錄兼吓;還要完成消息延遲延遲消息級別和延遲時間的delayLevelTable數(shù)據(jù)的構(gòu)造;
delayOffset.json
鍵值對為queueId:offset
{
"offsetTable":{12:0,6:0,13:0,5:1,18:0,3:22}
}
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
public boolean load() {
boolean result = super.load();
// delayLevelTable數(shù)據(jù)的構(gòu)造
result = result && this.parseDelayLevel();
return result;
}
啟動定時服務(wù)
- 為每一個延遲隊列創(chuàng)建一個調(diào)度任務(wù),每一個調(diào)度任務(wù)對應(yīng)SCHEDULE_TOPIC_XXXX主題下的一個消息消費隊列森枪;
- 啟動定時任務(wù)视搏,每隔10s持久化延遲消息隊列消息進(jìn)度审孽。
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
// 定時任務(wù),每一個定時任務(wù)啟動時浑娜,首先延遲1s佑力;以后每次調(diào)度時,首先延遲0.1s筋遭,然后再執(zhí)行定時調(diào)度時間
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// Broker端默認(rèn)10s持久化一次消息進(jìn)度打颤,存儲文件名:${RocketMQ_ HOME}/store/config/consumerOffset.json
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
DeliverDelayedMessageTimerTask
DeliverDelayedMessageTimerTask繼承TimerTask是延遲消息定時任務(wù),一個延遲隊列一個延時TimeTask任務(wù)漓滔。TimeTask定時時間機(jī)制為延遲1秒執(zhí)行第一次编饺,以后每0.1秒執(zhí)行一次DeliverDelayedMessageTimerTask任務(wù);也就是executeOnTimeup()方法响驴,處理一個延遲級別的消息隊列透且,根據(jù)延遲級別消息隊列的消費offset值,取出在這個延遲級別消息隊列offset后面的message豁鲤,進(jìn)行延遲時間判斷秽誊,如果第一個Message的延遲時間不滿足,那就繼續(xù)提交這個DeliverDelayedMessageTimerTask定時任務(wù)琳骡,其他后面消息也不滿足延遲時間锅论,直接return;如果第一個Message到了延遲時間楣号,還原真實的Topic最易、QueueId將消息再次放入commitlog中,進(jìn)入真實的topic的隊列竖席。如果過程中出現(xiàn)了異常情況耘纱,也是重新提交這個DeliverDelayedMessageTimerTask定時任務(wù)。
@Override
public void run() {
try {
if (isStarted()) {
//
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
public void executeOnTimeup() {
// 查找SCHEDULE_TOPIC_XXXX下延遲消息的ConsumeQueue
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// 先初始化失敗調(diào)度offset為最開始o(jì)ffset毕荐,后面再更新
long failScheduleOffset = offset;
if (cq != null) {
// 第一次獲取時offset初始值為0束析;獲取這個offset之后的所有message值
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
// ConsumeQueueExt 存儲ConsumeQueue的擴(kuò)展屬性;CqExtUnit為擴(kuò)展屬性byte字節(jié)大小
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// 計算出一個message的offset憎亚,消息大小员寇,tagHashCode
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 消息物理偏移量
long offsetPy = bufferCQ.getByteBuffer().getLong();
// 消息大小
int sizePy = bufferCQ.getByteBuffer().getInt();
// tagHashCode
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
// 計算是否到達(dá)定期時間
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
// 下一條消息在consumequeue中的offset大小,每個消息在consumequeue中存儲大小為20
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 計算消息是否到期剩余時間
long countdown = deliverTimestamp - now;
// 到達(dá)消息指定延遲時間
if (countdown <= 0) {
// 根據(jù)物理offset第美、消息大小從commitlog獲取消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
// 獲取到消息
if (msgExt != null) {
try {
// 還原成為原來的消息蝶锋,還原topic,queueId
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
// 重新存儲消息到commitlog文件中
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
// 存儲成功什往,繼續(xù)下一條消息
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
// 失敗記錄log
// 再次執(zhí)行延時任務(wù)扳缕,更新不同延時級別對應(yīng)的消費消息offset
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
// 未到消息指定的延遲時間,繼續(xù)構(gòu)建一個延時任務(wù)放入定時任務(wù)中,更新等待時間
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
// 再次創(chuàng)建延時等待任務(wù)躯舔,等待下次延遲隊列中的數(shù)據(jù)驴剔,進(jìn)行處理。更新延遲隊列消費的offset
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
// 釋放獲取到的內(nèi)存消息
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
//未找到有效的消息粥庄,更新延遲隊列定時拉取進(jìn)度
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
//未從commitlog中獲取到延時消息丧失,創(chuàng)建延遲任務(wù),等待commitlog中消息的到來惜互。
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}