背景
rocketmq支持順序消費,是很多業(yè)務中要用的一個場景慨代,我就好奇他是怎么實現(xiàn)的,需要了解背后的原理啸如,是怎么支持順序消費的侍匙,這樣有問題的時候我們才能快速的定位問題,這是一個合格的架構師必備的能力叮雳。
分配MessageQueue
rocketmq 在啟動消費時想暗,會對topic的mq進行reblance,如果是新分配的message queue帘不,如果是順序消費说莫,即isorder為true。則需要先對該
message queue 獲取分布式鎖寞焙,獲取成功才能真正開始消費储狭,代碼入心:
boolean allMQLocked = true;
List<PullRequest> pullRequestList = new ArrayList<>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//新分配的message queue 如果是順序消費,需要先獲取鎖捣郊,獲取成功
//則創(chuàng)建messagequeue 開始拉起數(shù)據辽狈,否則不能消費給mq。
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
//如果獲取失敗呛牲,則不消費這個mq刮萌。
allMQLocked = false;
continue;
}
//如果是順序消費,只有獲取成功娘扩,才開始消費的準備工作尊勿。
this.removeDirtyOffset(mq);
ProcessQueue pq = createProcessQueue(topic);
pq.setLocked(true);
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
獲取鎖
獲取鎖的代碼不需要看,我們只需要關心下請求參數(shù)即可畜侦,因為關鍵實現(xiàn)在broker端:
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
順序消費獲取鎖的代碼可用看出,需要告訴broker端三個參數(shù):
- consumer group 消費分組躯保。
- 客戶端id旋膳,即consumer的標識
- mq,即message queue 是對那個queue的順序消費途事。
請求類型是LOCK_BATCH_MQ验懊,broker server 會用默認的processor來處理這個請擅羞。如果沒有獲取到鎖,則lockedMq是空的义图,沒有直减俏,則返回false,所以接下來碱工,我們看下服務端是怎么做的娃承,來保證這個順序消費。
Broker鎖實現(xiàn)
broker server 處理LOCK_BATCH_MQ
的請求時通過defaultRequestProcessorPair來負責處理怕篷,defaultRequestProcessorPair是AdminBrokerProcessor,實現(xiàn)邏輯在lockBatchMQ方法历筝,代碼如下:
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
Set<MessageQueue> lockOKMQSet = new HashSet<>();
//根據group和mq,嘗試對沒有被其他consumer鎖定會加鎖廊谓,只有沒有枷鎖的messagequeue梳猪,或者其他的鎖已經過期了,才能上鎖蒸痹。
//selfLockOKMQSet 是成功獲取鎖的message queue
Set<MessageQueue> selfLockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
requestBody.getConsumerGroup(),
requestBody.getMqSet(),
requestBody.getClientId());
//看是否要請求其他的server春弥,客戶端發(fā)起的時false,broker發(fā)起的是true
if (requestBody.isOnlyThisBroker() || !brokerController.getBrokerConfig().isLockInStrictMode()) {
lockOKMQSet = selfLockOKMQSet;
} else {
//設置OnlyThisBroker為true叠荠,讓其他的server接到請求時不再請求其他的server了
requestBody.setOnlyThisBroker(true);
//獲取副本數(shù)
int replicaSize = this.brokerController.getMessageStoreConfig().getTotalReplicas();
//計算過半quorum
int quorum = replicaSize / 2 + 1;
if (quorum <= 1) {
//如果就一個匿沛,則不需要再請求其他的broker server
lockOKMQSet = selfLockOKMQSet;
} else {
//有多個副本,對所有broker嘗試加鎖蝙叛。
final ConcurrentMap<MessageQueue, Integer> mqLockMap = new ConcurrentHashMap<>();
//先對本地加鎖的mq 標記為1
for (MessageQueue mq : selfLockOKMQSet) {
if (!mqLockMap.containsKey(mq)) {
mqLockMap.put(mq, 0);
}
mqLockMap.put(mq, mqLockMap.get(mq) + 1);
}
BrokerMemberGroup memberGroup = this.brokerController.getBrokerMemberGroup();
if (memberGroup != null) {
Map<Long, String> addrMap = new HashMap<>(memberGroup.getBrokerAddrs());
addrMap.remove(this.brokerController.getBrokerConfig().getBrokerId());
final CountDownLatch countDownLatch = new CountDownLatch(addrMap.size());
requestBody.setMqSet(selfLockOKMQSet);
requestBody.setOnlyThisBroker(true);
for (Long brokerId : addrMap.keySet()) {
try {
this.brokerController.getBrokerOuterAPI().lockBatchMQAsync(addrMap.get(brokerId),
requestBody, 1000, new LockCallback() {
@Override
public void onSuccess(Set<MessageQueue> lockOKMQSet) {
for (MessageQueue mq : lockOKMQSet) {
if (!mqLockMap.containsKey(mq)) {
mqLockMap.put(mq, 0);
}
//加鎖成功俺祠,對加鎖次數(shù)加1
mqLockMap.put(mq, mqLockMap.get(mq) + 1);
}
countDownLatch.countDown();
}
@Override
public void onException(Throwable e) {
LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
countDownLatch.countDown();
}
});
} catch (Exception e) {
LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
countDownLatch.countDown();
}
}
try {
countDownLatch.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("lockBatchMQ exception on {}, {}", this.brokerController.getBrokerConfig().getBrokerName(), e);
}
}
//計算哪些mq是成功實現(xiàn)過半加鎖的,返回給客戶端
for (MessageQueue mq : mqLockMap.keySet()) {
if (mqLockMap.get(mq) >= quorum) {
lockOKMQSet.add(mq);
}
}
}
}
上面的代碼挺多借帘,主要是實現(xiàn)了兩個關鍵點蜘渣,分別是對本地mq 加鎖,和對其他的broker server 獲取鎖肺然,計算加鎖成功的broker server是否過半蔫缸,過半則成功,否則失敗际起。
- 對本地message queue 加鎖
看本broker server 的message queue 嘗試獲取鎖拾碌,能加鎖成功的條件是沒有加鎖的mq,或者已經加鎖了街望,但是已經過期了校翔,其他的都是被其他的客戶端鎖定中,關鍵代碼如下:
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (groupValue != null) {
LockEntry lockEntry = groupValue.get(mq);
if (lockEntry != null) {
//檢查clientid和是否過期
boolean locked = lockEntry.isLocked(clientId);
if (locked) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
}
return locked;
}
}
return false;
}
- 分布式鎖
分布式我們都知道需要通過zk灾前,redis防症,consul等實現(xiàn),但是rocketmq并沒有這樣做,個人理解是rocketmq 不想因為這個問題要依賴其他的外部組件蔫敲,因為依賴一個組件你還要對依賴組件的穩(wěn)定性饲嗽,所以自己巧妙的實現(xiàn)了對所有broker server message queue 加鎖時,應用了leader選舉的思想奈嘿,因為broker肯定是集群部署貌虾,不同的客戶端同時發(fā)起順序消費時,很有可能鏈接的不同的broker server裙犹,如果只對單broker server判斷獲取鎖成功是有問題的尽狠,通過對所有的broker server都獲取鎖,如果有一半以上獲取鎖成功伯诬,則肯定是只有一個客戶端能獲取到鎖晚唇,類似leader選舉的思路,是值得學習的地方盗似。
定期刷新鎖
順序消費的這個鎖也是一個鎖租約的機制哩陕,到了時間不續(xù)租,就釋放了赫舒,所以broker分布式鎖除了兩看consumer的客戶端id悍及,還有一個時間的限制,如果客戶端出現(xiàn)問題接癌,沒有主動更新鎖的時間心赶,則會被其他的客戶端獲取到鎖,續(xù)租也有可能是和其他的客戶端并發(fā)的缺猛,所以就有可能鎖續(xù)租失敗缨叫,失敗了就不能消費這個message queue了,所以在消費的時候需要檢查是否持有鎖荔燎,更新是通過一個定時任務更新的耻姥,時間周期為20秒一次,通過rocketmq.client.rebalance.lockInterval
變量控制有咨。
還有一個值得注意的是琐簇,一個topic有多個message queue,兩個客戶端同時發(fā)起順序消費時座享,在獲取分布式鎖時婉商,有可能兩個分別獲得部分mq的鎖,rocketmq的順序是保證在mq級別的渣叛。
分發(fā)消息
獲取到對應message queue的鎖后丈秩,就可以創(chuàng)建pullRequest請求到隊列messageRequestQueue
中,這時候拉消息的線程就會被換醒淳衙,去拉消息癣籽,拉到消息后挽唉,會把消息緩存在一個treeMap中,這個和并發(fā)消費是一樣的筷狼,添加到treeMap中,返回結果判斷是否需要提交新的ConsumeRequest task匠童,如果前面的消費任務已經消費完了埂材,則會返回true,即需要提交新的ConsumeRequest汤求,代碼如下:
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
順序消費在分發(fā)的時候俏险,不像并發(fā)消費一樣,默認一個請求提交一個ConsumeRequest task到線程執(zhí)行扬绪,來實現(xiàn)并發(fā)消費竖独。
順序消費如果沒有入在消費的判斷,在把消息加入到processQueue時會判斷有沒有線程在消費挤牛,如果有莹痢,則不能提交消費任務,只有沒有線程消費的時候墓赴,才創(chuàng)建一個ConsumeRequest task到線程池執(zhí)行, 因為有提交一個任務后竞膳,會不斷的從processQueue 的treemap 里獲取message,如果獲取不到了诫硕,才把consuming的標記設置為false坦辟,下次拉到消息時,就重新提交一個新的ConsumeRequest章办。
ConsumeRequest 的run 方法如下:
public void run() {
.....
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
//consumeBatchSize 默認是1锉走,從tree map里取出一批消息,默認是一條消息
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
//.... hook partion
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//這里需要加鎖藕届,一定是等前面一條消息處理完后挪蹭,才能繼續(xù)消費下一條消息。
this.processQueue.getConsumeLock().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
//執(zhí)行業(yè)務的消費代碼
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
UtilAll.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
} finally {
this.processQueue.getConsumeLock().unlock();
}
//去掉部分代碼
long consumeRT = System.currentTimeMillis() - beginTimestamp;
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
代碼有點多翰舌,省掉了部分非關鍵的代碼嚣潜,ConsumeRequest 的run 方法主要干了如下幾件事情:
- 首先獲取鎖,這個鎖是以message queue為單位的椅贱,就是為每個message queue 創(chuàng)建了一個object懂算,通過對synchronized 對object 加鎖,防止并發(fā)執(zhí)行庇麦。
- 檢查processqueue 是否還被鎖住计技,就是前面說的,會定期更新鎖山橄,即續(xù)租成功垮媒,就還是locked,如果失敗,則不能消費睡雇。
- 檢查消費的時間萌衬,如果持續(xù)消費超過了1分鐘,說明消費有瓶頸它抱,則等10毫秒再繼續(xù)消費秕豫。
- 取消息,從msgTreeMap里獲取消息观蓄,默認是一次獲取1條混移,這里還有對這條消息做了一個暫存,存在consumingMsgOrderlyTreeMap里面侮穿,是用來消費成功后歌径,做commit offset的。
5.獲取 processqueue的consumer lock亲茅,拿到鎖后回铛,即開始執(zhí)行業(yè)務的消費代碼,這里的鎖不是很理解芯急,順序消費的task 同時只有一個線程在運行勺届,前面已經對message queue加了一個大鎖。
6.執(zhí)行業(yè)務的消費代碼娶耍,獲取消費結果免姿。
7.處理消費結果,如果成功的情況下榕酒,會更新本地的offset胚膊,這里不更新到broker server端,還是統(tǒng)一通過定時任務上報給broker server的想鹰。
總結時刻
本文對rocketmq 的順序消費模式的代碼擼了一遍紊婉,讓我們了解了順序消費背后的原理和邏輯,即是怎么保證客戶端能順序消費消息的辑舷,主要有下幾點:
- 順序消費時group級別對message queue保證有順序喻犁。
- 開始消費message queue前需要獲取分布式鎖,這里和選舉leader一樣的思路何缓,通過對集群的broker都獲取鎖肢础,有一半獲取成功就說明加鎖成功。
- 順序消費時拉到消息后碌廓,只提交一個ConsumeRequest任務传轰,甚至有可能不提交,如果前面一個還在消費的情況下谷婆,通過一個ConsumeRequest來循環(huán)從msgTree里獲取慨蛙,默認一次取一條消息辽聊,來執(zhí)行業(yè)務的消費代碼,也就是單線程在執(zhí)行期贫,雖然是線程池跟匆。
- 每消費完一條消息,更新一次消費的offset唯灵。
注:目前看機會中贾铝,關注基礎架構,中間件埠帕,高并發(fā)系統(tǒng)建設和治理。