在Consumer消費(fèi)的時(shí)候總有幾個(gè)疑問:
- 消費(fèi)完成后育八,這個(gè)消費(fèi)進(jìn)度存在哪里
- 消費(fèi)完成后补箍,還沒保存消費(fèi)進(jìn)度就掛了改执,會(huì)不會(huì)導(dǎo)致重復(fù)消費(fèi)
Consumer
消費(fèi)進(jìn)度保存
消費(fèi)完成后啸蜜,會(huì)返回一個(gè)ConsumeConcurrentlyStatus.CONSUME_SUCCESS告訴MQ消費(fèi)成功,以MessageListener的consumeMessage為入口分析辈挂。
消費(fèi)的時(shí)候衬横,是以ConsumeRequest類為Runnable對(duì)象,在線程池中進(jìn)行處理的终蒂,即ConsumeRequest的run方法會(huì)處理這個(gè)狀態(tài)
@Override
public void run() {
//....
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
// 如果這個(gè)ProcessQueue廢棄了蜂林,則不處理
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
}
在消費(fèi)完成后,將status交給processConsumeResult處理拇泣,代碼如下
public void processConsumeResult(//
final ConsumeConcurrentlyStatus status, //
final ConsumeConcurrentlyContext context, //
final ConsumeRequest consumeRequest//
) {
//....消費(fèi)成功或者失敗的處理
// 將這批消息從ProcessQueue中移除噪叙,代表消費(fèi)完畢,并返回當(dāng)前ProcessQueue中的消息最小的offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// 更新消費(fèi)進(jìn)度
this.defaultMQPushConsumerImpl.getOffsetStore()
.updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
在分析ProcessQueue的時(shí)候霉翔,說過removeMessage返回有兩種情況:
- 如果移除這批消息之后已經(jīng)沒有消息了睁蕾,那么返回ProcessQueue中最大的offset+1
- 如果還有消息,那么返回treeMap中最小的key债朵,即未消費(fèi)的消息中最小的offset
getOffsetStore返回RemoteBrokerOffsetStore子眶,看下其實(shí)現(xiàn)
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
// 通過MessageQueue獲取本地的對(duì)應(yīng)的消費(fèi)進(jìn)度
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
//increaseOnly 為false則直接覆蓋
//increaseOnly為true則會(huì)判斷更新的值比老的值大才會(huì)進(jìn)行更新
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
這里的increaseOnly參數(shù)根據(jù)不同的情況傳入不同的值,有些情況下會(huì)出現(xiàn)并發(fā)修改的情況序芦,那么需要傳入true臭杰,內(nèi)部會(huì)進(jìn)行CAS的操作,能保證正確的賦值芝加,而一些場景下硅卢,只需要進(jìn)行直接覆蓋或者說沒有并發(fā)修改的問題那么傳入false就行了。
消費(fèi)進(jìn)度持久化
offsetTable是一個(gè)Map藏杖,其保存了消費(fèi)進(jìn)度将塑,這只一個(gè)內(nèi)存的結(jié)構(gòu),在Consumer啟動(dòng)的時(shí)候蝌麸,會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù)將本地的數(shù)據(jù)同步到broker点寥,每persistConsumerOffsetInterval(默認(rèn)為5)秒進(jìn)行一次操作
// mqs為需要持久化的隊(duì)列集合
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
if (mqs != null && !mqs.isEmpty()) {
// 遍歷本地的消費(fèi)進(jìn)度
for(Map.Entry<MessageQueue, AtomicLong> entry:this.offsetTable.entrySet()){
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
// 如果該隊(duì)列在需要持久化的隊(duì)列中
if (mqs.contains(mq)) {
try {
// 將消費(fèi)進(jìn)度發(fā)送到broker
this.updateConsumeOffsetToBroker(mq, offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {//廢棄的消費(fèi)進(jìn)度
unusedMQ.add(mq);
}
}
}
}
// 如果有廢棄的MQ,則將其消費(fèi)進(jìn)度廢棄
if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
this.offsetTable.remove(mq);
}
}
}
傳入的是當(dāng)前Consumer分配的MessageQueue列表来吩,rebalance之后敢辩,可能分配的MessageQueue已經(jīng)變化,所以offsetTable里有些消費(fèi)進(jìn)度的隊(duì)列時(shí)不需要的弟疆,所以將它的消費(fèi)進(jìn)度廢棄
updateConsumeOffsetToBroker方法就是簡單的網(wǎng)絡(luò)請(qǐng)求戚长,將offset發(fā)送給Broker
消費(fèi)進(jìn)度提交
除了定時(shí)提交消費(fèi)進(jìn)度之外,在拉取消息的時(shí)候怠苔,會(huì)順便將本地的消費(fèi)進(jìn)度一起傳到broker同廉,例如查看拉取消息的方法DefaultMQPushConsumerImpl#pullMessage中的一段代碼
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// 集群消費(fèi)模式
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
// 通過offsetStore獲取當(dāng)前消費(fèi)進(jìn)度
// ReadOffsetType.READ_FROM_MEMORY表示從本地獲取(即offsetTable)
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {//
// 傳給Broker,讓其判斷是否需要保存消費(fèi)進(jìn)度
commitOffsetEnable = true;
}
}
// 構(gòu)造一些標(biāo)志位,這里主要看commitOffsetEnable值
// 將commitOffsetEnable放到一個(gè)int類型的值中迫肖,讓broker判斷是否需要保存消費(fèi)進(jìn)度
int sysFlag = PullSysFlag.buildSysFlag(//
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
//....
// 通過拉取消息請(qǐng)求锅劝,將commitOffsetValue和sysFlag傳給broker
this.pullAPIWrapper.pullKernelImpl(//
pullRequest.getMessageQueue(), // 1
subExpression, // 2
subscriptionData.getSubVersion(), // 3
pullRequest.getNextOffset(), // 4
this.defaultMQPushConsumer.getPullBatchSize(), // 5
sysFlag, // 6
commitOffsetValue, // 7
BrokerSuspendMaxTimeMillis, // 8
ConsumerTimeoutMillisWhenSuspend, // 9
CommunicationMode.ASYNC, // 10
pullCallback// 11
);
具體broker對(duì)消費(fèi)進(jìn)度的處理看后面分析
Broker
消費(fèi)進(jìn)度保存
RocketMQ的網(wǎng)絡(luò)請(qǐng)求都有一個(gè)RequestCode,更新消費(fèi)進(jìn)度的Code為UPDATE_CONSUMER_OFFSET蟆湖,通過查到其使用的地方故爵,找到對(duì)應(yīng)的Processor為ClientManageProcessor,其processRequest處理對(duì)應(yīng)的請(qǐng)求
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT:
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT:
return this.unregisterClient(ctx, request);
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
case RequestCode.UPDATE_CONSUMER_OFFSET:
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
default:
break;
}
return null;
}
更新消費(fèi)進(jìn)度的方法為updateConsumerOffset隅津,里面解析了請(qǐng)求體之后又調(diào)用了ConsumerOffsetManager.commitOffset方法
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
}
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}",
clientHost, key, queueId, offset, storeOffset);
}
}
}
邏輯也很簡單就不多說了诬垂,有意思的是,Broker的保存消費(fèi)進(jìn)度的結(jié)構(gòu)和Consumer類似伦仍,Broker多了一個(gè)維度剥纷,因?yàn)锽roker接收的是所有消費(fèi)者的進(jìn)度,而Consumer保存的是自己的
在Consumer的消費(fèi)進(jìn)度上報(bào)到Broker之后呢铆,Broker只是保存到內(nèi)存晦鞋,這并不可靠,大概也能猜出棺克,和Consumer一樣悠垛,也有一個(gè)定時(shí)任務(wù)將消費(fèi)進(jìn)度持久化。這時(shí)娜谊,先看下ConsumerOffsetManager這個(gè)類的繼承關(guān)系确买,他的父類是ConfigManager,這個(gè)東西很重要纱皆,是幾個(gè)重要配置信息持久化類湾趾,看下其繼承關(guān)系:
分別是訂閱關(guān)系管理,消費(fèi)進(jìn)度管理派草,Topic信息管理搀缠,和延遲隊(duì)列信息管理,這4個(gè)配置信息都需要通過ConfigManager去持久化和加載近迁,看下ConfigManager的幾個(gè)方法
public abstract class ConfigManager {
// 將對(duì)象轉(zhuǎn)換成json串
public abstract String encode();
//將文件里內(nèi)容(json格式)的轉(zhuǎn)換成對(duì)象
public boolean load() {
String fileName = null;
// 獲取文件地址
fileName = this.configFilePath();
// 將文件里的內(nèi)容讀取出來
String jsonString = MixAll.file2String(fileName);
// json轉(zhuǎn)換成指定對(duì)象的數(shù)據(jù)
this.decode(jsonString);
}
// 配置文件地址
public abstract String configFilePath();
// 與load類似
private boolean loadBak() {
String fileName = null;
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName + ".bak");
this.decode(jsonString);
return true;
}
// json轉(zhuǎn)換成指定對(duì)象的數(shù)據(jù)
public abstract void decode(final String jsonString);
// 將對(duì)象里的數(shù)據(jù)轉(zhuǎn)換成json并持久化到configFilePath()文件中
public synchronized void persist() {
String jsonString = this.encode(true);
String fileName = this.configFilePath();
MixAll.string2File(jsonString, fileName);
}
public abstract String encode(final boolean prettyFormat);
那么ConsumerOffsetManager會(huì)實(shí)現(xiàn)encode和decode方法并在某個(gè)地方定時(shí)調(diào)用persist方法艺普,查看其使用的地方,找到BrokerController的initialize方法鉴竭,有段定時(shí)任務(wù)如下:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
可以看到歧譬,每flushConsumerOffsetInterval(默認(rèn)5000)毫秒會(huì)進(jìn)行一次持久化
拉取消息的時(shí)候保存消費(fèi)進(jìn)度
拉取消息的Code為RequestCode.PULL_MESSAGE,對(duì)應(yīng)的Processor為PullMessageProcessor搏存,找到其中消費(fèi)進(jìn)度處理的地方
// 上面說的consumer傳過來的commitOffsetEnable
// 當(dāng)Consumer本地消費(fèi)進(jìn)度大于0的時(shí)候這個(gè)參數(shù)為true
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.
// brokerAllowSuspend在處理消息請(qǐng)求的時(shí)候?yàn)閠rue瑰步,hold請(qǐng)求自己處理是false
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
// Master才需要保存進(jìn)度,slave只是同步broker的消息
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(
RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getCommitOffset());//consumer傳上來的offset
}
總的來說:
當(dāng)broker為master的時(shí)候璧眠,且Consumer消費(fèi)進(jìn)度大于0則在拉取消息的時(shí)候順便將消費(fèi)進(jìn)度保存到broker
問題分析
重復(fù)消費(fèi)問題
在ProcessQueue的removeMessage的第二種情況有個(gè)問題缩焦,假設(shè)有如下情況:
批量拉取了4條消息ABCD兵钮,分別對(duì)應(yīng)的offset為400|401|402|403,此時(shí)consumeBatchSize(批量消費(fèi)數(shù)量舌界,默認(rèn)為1,即一條一條消費(fèi))泰演,那么會(huì)分4個(gè)線程去消費(fèi)這幾個(gè)消息呻拌,出現(xiàn)下面消費(fèi)次序
消費(fèi)D -> removeMessage -> 返回400(情況2)
消費(fèi)C -> removeMessage -> 返回400(情況2)
消費(fèi)B -> removeMessage -> 返回400(情況2)
消費(fèi)A -> removeMessage -> 返回404(情況1)
在消費(fèi)A之前,本地消費(fèi)進(jìn)度持久化到Broker之后睦焕,應(yīng)用宕機(jī)了藐握,那么此時(shí)Broker保存的是offset=400(準(zhǔn)確來說,在消費(fèi)完A且保存消費(fèi)進(jìn)度到broker之前垃喊,offset都是400)猾普。那么會(huì)有什么問題呢?
先假設(shè)消費(fèi)完DCB且消費(fèi)進(jìn)度上傳完成宕機(jī)本谜,然后重啟應(yīng)用初家,這時(shí)候會(huì)先從broker獲取應(yīng)該從哪里消費(fèi)(),因?yàn)镈CB消費(fèi)完成后都是保存400這個(gè)消費(fèi)進(jìn)度乌助,那么返回的是400溜在,這時(shí)候consumer會(huì)請(qǐng)求offset為400的消費(fèi),到這里他托,已經(jīng)重復(fù)消費(fèi)了DCB掖肋。
消費(fèi)進(jìn)度保存在哪里
- consumer保存在內(nèi)存,定時(shí)上傳broker
- broker保存在內(nèi)存赏参,定時(shí)刷新到磁盤文件
注:以上沒有特別聲明的都是并發(fā)消費(fèi)模式