順序消息的實(shí)現(xiàn)
順序消息進(jìn)行消費(fèi)時(shí),若是第一次消費(fèi)失敗烈钞,可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT
,下一次會(huì)繼續(xù)消費(fèi)此消息。
順序消息的消費(fèi)失敗時(shí)的重試邏輯遇西,具體代碼在ProccessQueue中,順序消費(fèi)時(shí)手動(dòng)從processQueue中取消息严嗜,內(nèi)部是從msgTreeMap中取出消息后粱檀,將消息添加到consumingMsgOrderlyTreeMap中,若是消費(fèi)成功漫玄,將該消息從consumingMsgOrderlyTreeMap中刪除即可茄蚯。若是消費(fèi)失敗,執(zhí)行makeMessageToConsumeAgain方法睦优,將這些消息再放回msgTreeMap渗常。
順序消費(fèi)時(shí)有回滾和重試的邏輯,但是新版本不建議使用汗盘≈宓猓回滾和重試的邏輯和上面相同,回滾時(shí)將消息重新放回treeMap隐孽,提交時(shí)不用操作treeMap癌椿,但是需要根據(jù)consumingMsgOrderlyTreeMap找到當(dāng)前消費(fèi)的offset,從下一個(gè)繼續(xù)消費(fèi)菱阵。
順序消息消費(fèi)時(shí)使用同一個(gè)線(xiàn)程踢俄,可以看一下ConsumeMessageOrderlyService
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(), // 迷惑性代碼...
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
因?yàn)閝ueue的長(zhǎng)度是Integer.MAX_VALUE,因此在進(jìn)行消費(fèi)時(shí)使用的是一個(gè)線(xiàn)程晴及,并且有序執(zhí)行都办。
順序消息的消費(fèi)使用同一個(gè)線(xiàn)程是在ConsumeMessageOrderlyService.ConsumeRequest和ProcessQueue中實(shí)現(xiàn)的。
// ProcessQueue
private volatile boolean consuming = false;
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
for (MessageExt msg : msgs) {
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);
// 如果有消息可以進(jìn)行消費(fèi)虑稼,并且當(dāng)前queue沒(méi)有消費(fèi)琳钉,則將dispatchToConsume和consuming置為true
if (!msgTreeMap.isEmpty() && !this.consuming) {
dispatchToConsume = true;
this.consuming = true;
}
if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1);
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {
this.msgAccCnt = accTotal;
}
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
return dispatchToConsume;
}
// ConsumeMessageOrderlyService
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) { // putMessage返回true時(shí),才將request提交到線(xiàn)程池
// 如果已經(jīng)開(kāi)始對(duì)該queue進(jìn)行消費(fèi)了蛛倦,就不會(huì)再次提交任務(wù)
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
// 提交給線(xiàn)程池的任務(wù)
// 主要代碼
class ConsumeRequest implements Runnable {
@Override
public void run() {
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
// 如果可以繼續(xù)消費(fèi)歌懒,直接在當(dāng)前線(xiàn)程中輪詢(xún)消費(fèi)該P(yáng)rocessQueue即可
for (boolean continueConsume = true; continueConsume; ) {
// 在consumerImpl中的pullMessage方法中持續(xù)給ProcessQueue添加消息
// 手動(dòng)從ProcessQueue中取消息
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
if (!msgs.isEmpty()) {
try {
this.processQueue.getLockConsume().lock();
//消費(fèi)消息
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
} finally {
this.processQueue.getLockConsume().unlock();
}
// 處理消費(fèi)結(jié)果,若是成功繼續(xù)消費(fèi)
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
}
}
看代碼可以發(fā)現(xiàn)胰蝠,如果順序消息消費(fèi)失敗的話(huà)歼培,即消費(fèi)返回SUSPEND_CURRENT_QUEUE_A_MONENT
時(shí)震蒋,當(dāng)前線(xiàn)程會(huì)停止消費(fèi),在processConsumeResult時(shí)躲庄,會(huì)提交新的任務(wù)到線(xiàn)程池查剖,在新的線(xiàn)程中繼續(xù)消費(fèi)該消息。
核心邏輯是保證一個(gè)ProcessQueue只在一個(gè)線(xiàn)程中輪詢(xún)消費(fèi)消息噪窘。
發(fā)送順序消息時(shí)會(huì)添加一個(gè)隊(duì)列選擇器笋庄,將需要有序的消息發(fā)送到同一個(gè)隊(duì)列。消費(fèi)端拉取特定queue的數(shù)據(jù)時(shí)天生有序倔监,在消費(fèi)時(shí)使用同一個(gè)線(xiàn)程進(jìn)行消費(fèi)直砂,因此就實(shí)現(xiàn)了順序消息。
事務(wù)消息
二階段提交加補(bǔ)償機(jī)制
第一階段提交消息到broker浩习,broker將topic修改為RMQ_SYS_TRANS_HALF_TOPIC
静暂,存入對(duì)consumer不可見(jiàn)的topic/queue。如果此階段寫(xiě)入成功谱秽,執(zhí)行transactionListener.executeLocalTransaction()
洽蛀。
第二階段,根據(jù)本地事務(wù)的執(zhí)行結(jié)果提交或者回滾第一階段提交至broker的消息疟赊,這里使用的是OneWay方法郊供,可靠性低,可能出現(xiàn)失敗或者超時(shí)的情況近哟。
broker端處理RequestCode.END_TRANSACTION
的請(qǐng)求驮审,如果是commit,則將原來(lái)的消息取出吉执,更改為正確的topic/queue疯淫,并進(jìn)行落盤(pán),然后添加Op狀態(tài)鼠证。如果是rollback峡竣,則直接添加Op狀態(tài)即可靠抑。
添加Op狀態(tài)是將消息添加到Op隊(duì)列中量九,Op隊(duì)列是為了補(bǔ)償邏輯時(shí)減少判斷。
補(bǔ)償邏輯:
BrokerController啟動(dòng)時(shí)會(huì)啟動(dòng)TransactionMessageCheckService颂碧,默認(rèn)每隔60s檢查一次HALF_TOPIC下所有的queue中的消息荠列,檢查步驟如下
- 先判斷當(dāng)前queue和對(duì)應(yīng)的opQueue是否添加過(guò)消息,如果沒(méi)有载城,遍歷下一個(gè)queue肌似,若有,進(jìn)行下一步判斷
- 獲取對(duì)應(yīng)的opQueue中的消息诉瓦,若是沒(méi)有消息川队,遍歷下一個(gè)queue力细,若有,進(jìn)行下一步判斷
- 遍歷當(dāng)前queue
- 如果當(dāng)前偏移量已經(jīng)添加了oP狀態(tài)固额,直接遍歷至下一個(gè)偏移量眠蚂,否則進(jìn)行下一步判斷
- 獲取當(dāng)前消息,若為null斗躏,遍歷下一個(gè)偏移量逝慧,若不為null,進(jìn)行下一步判斷
- 若當(dāng)前消息需要舍棄或者跳過(guò)啄糙,遍歷下一個(gè)偏移量笛臣,否則進(jìn)行下一步判斷
- 判斷當(dāng)前消息是否需要check,若暫時(shí)不需要隧饼,重新走判斷流程
- 若是需要check沈堡,broker端給producer發(fā)送
CHECK_TRANSACTION_STATE
消息,producer端接收到消息后燕雁,執(zhí)行TransactionListener.checkLocalTransaction
踱蛀,將check結(jié)果回發(fā)給broker。