提出疑問
第一次pullMessage時(shí)是不是根據(jù)offset去獲取呢
offset從遠(yuǎn)程獲取到還是存在本地呢
consumer端在啟動(dòng)時(shí)會(huì)開啟負(fù)載均衡服務(wù)RebalanceService
,一系列調(diào)用后會(huì)執(zhí)行RebalanceImpl.doRebalance()
方法赋荆,內(nèi)部調(diào)用方法rebalanceByTopic()
执隧,這里主要是為了找到可供消費(fèi)的隊(duì)列唠摹,如果存在闻书,則獲取這個(gè)隊(duì)列可供消費(fèi)的消息偏移量
// RebalanceImpl
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// 獲取nextOffset
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 構(gòu)造請求發(fā)送至PullService,不斷的拉取消息
this.dispatchPullRequest(pullRequestList);
return changed;
}
可以看到開始的偏移量是從computePullFromWhere(mq)
獲取到的熊锭。主要看一下PushImpl的內(nèi)容
// RebalancePushImpl 部分代碼
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
return result;
}
可以看到是從offsetStore中獲取的橘荠。
在DefaultMQPushConsumerImpl的start方法中可知默認(rèn)的集群模式下使用的是RemoteBrokerOffsetStore。
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
去查看RemoteBrokerOffsetStore類代碼矾削,type為READ_FROM_STORE
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
case READ_FROM_STORE: {
try {
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
}
// No offset in broker
catch (MQBrokerException e) {
return -1;
}
//Other exceptions
catch (Exception e) {
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
}
}
default:
break;
}
}
return -1;
}
看一下fetchConsumeOffsetFromBroker
方法
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
可以發(fā)現(xiàn)是去broker端查詢offset壤玫。
這里要注意,提到的offset對應(yīng)的是ConsumeQueue的偏移量哼凯,ConsumeQueue中存儲(chǔ)的為定長20字節(jié)的數(shù)據(jù)欲间。根據(jù)offset找到要消費(fèi)的消息在CommitLog中的offset和消息長度,再去CommitLog中獲取真正的消息断部。
第一次如果找到消息猎贴,broker會(huì)返回下次要消費(fèi)的消息偏移量,在DefaultMQPushConsumerImpl.pullMessage
方法中會(huì)將該偏移量設(shè)置到原request蝴光,下次直接使用該偏移量拉取即可她渴。
// DefaultMQPushConsumerImpl.pullMessage pullCallback部分代碼
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());