Series
- RocketMq broker configuration file
- RocketMq broker startup flow
- RocketMq broker CommitLog overview
- RocketMq broker consumeQueue overview
- RocketMq broker retry and dead-letter queues
- RocketMq broker delayed messages
- RocketMq IndexService overview
- RocketMq read/write separation mechanism
- RocketMq Client management
- RocketMq broker expired file deletion
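Introduction
This article analyzes how RocketMq resets, by timestamp, the consume offset of a topic under a given consumeGroup.
The reset is triggered in order from admin to broker to consumer: admin builds the parameters and notifies the broker, the broker looks up the concrete consumeQueue offsets, and the broker then notifies the consumers to reset their offsets.
The broker finds the consumeQueue offset that matches the timestamp and tells the consumer to persist it; the new offset ultimately ends up in the consume offsets stored on the broker.
The reset essentially executes on the consumer side: the consumer stores the new consume offset, and a scheduled task then notifies the broker to update its copy.
During the reset the consumer marks the affected ProcessQueue as Dropped, which stops the ConsumeRequest task from running and halts message pulling for that queue. It then updates the consume offset locally and reports the new value to the broker. Finally, a fresh rebalance starts consumption again from the new offset.
Reset command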
public class ResetOffsetByTimeCommand implements SubCommand {
@Override
public String commandName() {
return "resetOffsetByTime";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("g", "group", true, "set the consumer group");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("t", "topic", true, "set the topic");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("s", "timestamp", true, "set the timestamp[now|currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("f", "force", true, "set the force rollback by timestamp switch[true|false]");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("c", "cplus", false, "reset c++ client offset");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String group = commandLine.getOptionValue("g").trim();
String topic = commandLine.getOptionValue("t").trim();
String timeStampStr = commandLine.getOptionValue("s").trim();
long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : 0;
try {
if (timestamp == 0) {
timestamp = Long.parseLong(timeStampStr);
}
} catch (NumberFormatException e) {
timestamp = UtilAll.parseDate(timeStampStr, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
}
boolean force = true;
if (commandLine.hasOption('f')) {
force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
}
boolean isC = false;
if (commandLine.hasOption('c')) {
isC = true;
}
defaultMQAdminExt.start();
Map<MessageQueue, Long> offsetTable;
try {
// Delegate to defaultMQAdminExt#resetOffsetByTimestamp
offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
} catch (MQClientException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt, group, topic, timestamp, force, timeStampStr);
return;
}
throw e;
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
- The code above shows the options accepted by ResetOffsetByTimeCommand; the core call is DefaultMQAdminExt#resetOffsetByTimestamp, which performs the reset.
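The three accepted forms of the -s option (now, epoch milliseconds, or a formatted date) can be illustrated with a small standalone sketch. The class name ResetTimestampParser and the nowMillis parameter are hypothetical and exist only for this example; the date pattern is taken from the option's help text. It simplifies the command's logic slightly (the original first checks for "now", then treats 0 as "not yet parsed").

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical helper, not part of RocketMQ: resolves the -s argument to epoch millis.
class ResetTimestampParser {
    // Pattern as printed in the -s option help text.
    static final String PATTERN = "yyyy-MM-dd#HH:mm:ss:SSS";

    static long parse(String raw, long nowMillis) {
        String s = raw.trim();
        if (s.equals("now")) {
            return nowMillis;                  // "now" means the current time
        }
        try {
            return Long.parseLong(s);          // plain epoch milliseconds
        } catch (NumberFormatException notANumber) {
            try {
                SimpleDateFormat fmt = new SimpleDateFormat(PATTERN);
                fmt.setLenient(false);         // reject impossible dates
                return fmt.parse(s).getTime(); // formatted date, local time zone
            } catch (ParseException e) {
                throw new IllegalArgumentException("unsupported timestamp: " + raw, e);
            }
        }
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; the real command reads System.currentTimeMillis() directly.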
admin
public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
boolean isC)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// Delegate the offset reset to defaultMQAdminExtImpl#resetOffsetByTimestamp
return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, timestamp, isForce, isC);
}
}
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
boolean isC)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
// 1. Fetch the TopicRouteData for the topic, and from it the BrokerData list
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
if (brokerDatas != null) {
// 2. Iterate over the brokers that host the topic
for (BrokerData brokerData : brokerDatas) {
String addr = brokerData.selectBrokerAddr();
if (addr != null) {
// 3. Call the broker remotely to perform the reset
Map<MessageQueue, Long> offsetTable =
this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
timeoutMillis, isC);
if (offsetTable != null) {
allOffsetTable.putAll(offsetTable);
}
}
}
}
return allOffsetTable;
}
}
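- Fetch the TopicRouteData for the topic, and from it the BrokerData of every broker involved.
- Iterate over the brokers that host the topic and call each one remotely to perform the reset; the per-broker results are merged into one offset table.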
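The fan-out and merge described in the two points above can be sketched without any RocketMQ dependency. Everything here is an assumption made for illustration: ResetFanOutSketch is a hypothetical class, queues are identified by plain strings instead of MessageQueue, and the remote call is replaced by a Function argument.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: one reset call per broker address, results merged into one table.
class ResetFanOutSketch {
    static Map<String, Long> resetOnAllBrokers(List<String> brokerAddrs,
                                               Function<String, Map<String, Long>> resetOnBroker) {
        Map<String, Long> allOffsets = new HashMap<>();
        for (String addr : brokerAddrs) {
            if (addr == null) {
                continue;                       // no selectable address for this broker
            }
            Map<String, Long> table = resetOnBroker.apply(addr);
            if (table != null) {
                allOffsets.putAll(table);       // queue keys are unique per broker
            }
        }
        return allOffsets;
    }
}
```

Because each broker only reports its own queues, putAll never overwrites an entry from another broker in this model.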
public class MQClientAPIImpl {
public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
throws RemotingException, MQClientException, InterruptedException {
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timestamp);
requestHeader.setForce(isForce);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
if (isC) {
request.setLanguage(LanguageCode.CPP);
}
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
return body.getOffsetTable();
}
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
}
- The RequestCode of the remote call is INVOKE_BROKER_TO_RESET_OFFSET.
- Searching for this RequestCode is a convenient way to locate the broker-side processor that handles the request.
broker
public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand resetOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
// Decode the request header
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
boolean isC = false;
LanguageCode language = request.getLanguage();
switch (language) {
case CPP:
isC = true;
break;
}
// Reset the offset through Broker2Client#resetOffset
return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp(), requestHeader.isForce(), isC);
}
}
public class Broker2Client {
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
boolean isC) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
// Look up the TopicConfig of the topic
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
return response;
}
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
// Iterate over all write queues and compute the target offset of each MessageQueue
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setTopic(topic);
mq.setQueueId(i);
// Query the group's current consume offset (consumerOffset) for this queue
long consumerOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
if (-1 == consumerOffset) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("THe consumer group <%s> not exist", group));
return response;
}
// Find the consumeQueue offset that corresponds to the timestamp
long timeStampOffset;
if (timeStamp == -1) {
// No timestamp given (-1): use the maximum offset of the consumeQueue
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
// Otherwise look up the consumeQueue offset by timestamp
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
}
if (timeStampOffset < 0) {
log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
timeStampOffset = 0;
}
if (isForce || timeStampOffset < consumerOffset) {
offsetTable.put(mq, timeStampOffset);
} else {
offsetTable.put(mq, consumerOffset);
}
}
// The request sent to consumers uses RequestCode RESET_CONSUMER_CLIENT_OFFSET
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
if (isC) {
// c++ language
ResetOffsetBodyForC body = new ResetOffsetBodyForC();
List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
body.setOffsetTable(offsetList);
request.setBody(body.encode());
} else {
// other language
ResetOffsetBody body = new ResetOffsetBody();
body.setOffsetTable(offsetTable);
request.setBody(body.encode());
}
// Fetch every consumer in the consumeGroup and notify each one of the reset
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
int version = entry.getValue().getVersion();
if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
try {
this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
} catch (Exception e) {
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the client does not support this feature. version="
+ MQVersion.getVersionDesc(version));
return response;
}
}
} else {
String errorInfo =
String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
requestHeader.getGroup(),
requestHeader.getTopic(),
requestHeader.getTimestamp());
log.error(errorInfo);
response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
response.setRemark(errorInfo);
return response;
}
response.setCode(ResponseCode.SUCCESS);
ResetOffsetBody resBody = new ResetOffsetBody();
resBody.setOffsetTable(offsetTable);
response.setBody(resBody.encode());
return response;
}
}
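- Broker2Client#resetOffset drives the broker-side part of the offset reset.
- It fetches the TopicConfig of the topic to learn how many write queues the topic has.
- It iterates over the write queues and, for each queue of the topic under the consumeGroup, looks up the offset that matches the timestamp. Unless force is set, the looked-up offset is used only when it is smaller than the group's current consume offset.
- It fetches all clients of the consumeGroup and sends each one the new offsets.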
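The per-queue choice between the timestamp offset and the current consume offset is easy to misread, so here is a minimal sketch of just that rule. ResetOffsetRule and chooseOffset are hypothetical names; the logic mirrors the isForce || timeStampOffset < consumerOffset branch and the clamp to 0 shown in the listing above.

```java
// Hypothetical helper isolating the broker's per-queue decision.
class ResetOffsetRule {
    static long chooseOffset(long timestampOffset, long consumerOffset, boolean force) {
        if (timestampOffset < 0) {
            timestampOffset = 0;               // invalid lookup result is clamped to 0
        }
        // Without force the offset may only move backward, never forward.
        return (force || timestampOffset < consumerOffset) ? timestampOffset : consumerOffset;
    }
}
```

With force=false and a timestamp that lies ahead of the group's progress, the queue therefore keeps its current offset; the command passes force=true by default, so skipping forward is the default behavior.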
public class DefaultMessageStore implements MessageStore {
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
// First find the ConsumeQueue for the topic and queueId
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
// Search that ConsumeQueue by timestamp and return the matching offset
return logic.getOffsetInQueueByTime(timestamp);
}
return 0;
}
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
}
- The ConsumeQueue is located by topic and queueId (and created on demand if it does not exist yet); the timestamp lookup is then delegated to it.
public class ConsumeQueue {
public long getOffsetInQueueByTime(final long timestamp) {
// Pick the ConsumeQueue file whose last-modified time is later than the timestamp
MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
if (mappedFile != null) {
long offset = 0;
int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
long leftIndexValue = -1L, rightIndexValue = -1L;
// Minimum physical offset still present in the commitLog
long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
if (null != sbr) {
ByteBuffer byteBuffer = sbr.getByteBuffer();
high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
try {
// Binary search for the entry that corresponds to the timestamp
while (high >= low) {
// midOffset is the unit-aligned midpoint; read its commitLog phyOffset and message size
midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
byteBuffer.position(midOffset);
long phyOffset = byteBuffer.getLong();
int size = byteBuffer.getInt();
// Compare the midpoint's physical offset with minPhysicOffset.
// If it is smaller, the message is no longer in the commitLog, so continue to the right
if (phyOffset < minPhysicOffset) {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
continue;
}
// Use the commitLog offset and message size recorded in the entry to read the message's store time
long storeTime =
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
// Compare storeTime with the requested timestamp
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
// Exact match: midOffset is the target position in the consumeQueue file
targetOffset = midOffset;
break;
} else if (storeTime > timestamp) {
// The message was stored after the timestamp: search the left half
high = midOffset - CQ_STORE_UNIT_SIZE;
rightOffset = midOffset;
rightIndexValue = storeTime;
} else {
// The message was stored before the timestamp: search the right half
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
leftIndexValue = storeTime;
}
}
if (targetOffset != -1) {
offset = targetOffset;
} else {
if (leftIndexValue == -1) {
offset = rightOffset;
} else if (rightIndexValue == -1) {
offset = leftOffset;
} else {
offset =
Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
- rightIndexValue) ? rightOffset : leftOffset;
}
}
return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
} finally {
sbr.release();
}
}
}
return 0;
}
}
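- A binary search over the fixed-size consumeQueue entries locates the offset for the reset timestamp. If no entry has exactly that store time, the neighbor whose store time is closer to the timestamp is chosen.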
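To make the search and its closest-neighbor fallback easier to follow, here is a simplified sketch over an in-memory array. It is a model built on assumptions, not the ConsumeQueue implementation: ClosestTimestampSearch is a hypothetical class, entries are array indexes rather than 20-byte units in a mapped file, store times are given directly instead of being read from the commitLog, and the minPhysicOffset check is left out.

```java
// Hypothetical model: storeTimes[i] is the store time of the i-th queue entry, ascending.
class ClosestTimestampSearch {
    static int indexByTime(long[] storeTimes, long timestamp) {
        int low = 0;
        int high = storeTimes.length - 1;
        int left = -1;   // last probed entry with store time < timestamp
        int right = -1;  // last probed entry with store time > timestamp
        while (low <= high) {
            int mid = (low + high) >>> 1;
            long storeTime = storeTimes[mid];
            if (storeTime == timestamp) {
                return mid;                      // exact match
            } else if (storeTime > timestamp) {
                right = mid;
                high = mid - 1;
            } else {
                left = mid;
                low = mid + 1;
            }
        }
        if (left == -1) {
            return right;                        // everything is newer (or the array is empty: -1)
        }
        if (right == -1) {
            return left;                         // everything is older
        }
        // Pick the neighbor closer in time; a tie goes to the older entry.
        return (timestamp - storeTimes[left] > storeTimes[right] - timestamp) ? right : left;
    }
}
```

For store times {100, 200, 300, 400}, a timestamp of 240 resolves to index 1 and 260 to index 2, which matches the Math.abs comparison in the listing above.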
consumer
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand resetOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
// 1. Decode the ResetOffsetRequestHeader
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
// 2. Decode the offsetTable that carries the new offsets
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
if (request.getBody() != null) {
ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
offsetTable = body.getOffsetTable();
}
// 3. Reset the client's consume offsets from the offsetTable
this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
return null;
}
}
- ClientRemotingProcessor#resetOffset handles the reset-offset command that the broker sends to the client.
public class MQClientInstance {
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
DefaultMQPushConsumerImpl consumer = null;
try {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
consumer = (DefaultMQPushConsumerImpl) impl;
} else {
log.info("[reset-offset] consumer dose not exist. group={}", group);
return;
}
consumer.suspend();
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
MessageQueue mq = entry.getKey();
// Mark every ProcessQueue of the topic that appears in offsetTable as dropped
if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
ProcessQueue pq = entry.getValue();
pq.setDropped(true);
pq.clear();
}
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
}
Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
while (iterator.hasNext()) {
MessageQueue mq = iterator.next();
Long offset = offsetTable.get(mq);
if (topic.equals(mq.getTopic()) && offset != null) {
try {
consumer.updateConsumeOffset(mq, offset);
consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
iterator.remove();
} catch (Exception e) {
log.warn("reset offset failed. group={}, {}", group, mq, e);
}
}
}
} finally {
if (consumer != null) {
consumer.resume();
}
}
}
}
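- The consumer matches every MessageQueue named in the reset and marks the corresponding ProcessQueue as Dropped.
- After a 10-second pause, the consumer stores the new consume offset for each of those MessageQueues.
- The consumer removes the affected MessageQueue entries from processQueueTable.
- Once a ProcessQueue is Dropped, its ConsumeRequest tasks stop running and no more messages are pulled for that queue.
- During the next periodic rebalance the consumer re-creates the removed queues and starts pulling messages again from the reset offset.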
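The sequence in the points above (drop, store the new offset, remove, then re-create on rebalance) can be sketched with a toy model. All names are hypothetical and the model is heavily simplified: queues are keyed by an int id, the 10-second pause and the suspend/resume calls are omitted, and "consuming" just advances a counter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the consumer-side reset; not RocketMQ code.
class DroppedFlagSketch {
    static class Queue {
        volatile boolean dropped;              // checked by tasks before they do any work
    }

    final Map<Integer, Queue> processQueues = new ConcurrentHashMap<>();
    final Map<Integer, Long> offsetStore = new ConcurrentHashMap<>();

    // Stands in for a consume task: refuses to run on a missing or dropped queue.
    boolean consume(int queueId) {
        Queue q = processQueues.get(queueId);
        if (q == null || q.dropped) {
            return false;
        }
        offsetStore.merge(queueId, 1L, Long::sum);
        return true;
    }

    void reset(Map<Integer, Long> newOffsets) {
        for (Map.Entry<Integer, Long> e : newOffsets.entrySet()) {
            Queue q = processQueues.get(e.getKey());
            if (q == null) {
                continue;                      // queue not assigned to this consumer
            }
            q.dropped = true;                            // 1. block in-flight work
            offsetStore.put(e.getKey(), e.getValue());   // 2. store the new offset
            processQueues.remove(e.getKey());            // 3. forget the old queue state
        }
    }

    // Stands in for rebalance: re-creates the queue and resumes from the stored offset.
    long rebalance(int queueId) {
        processQueues.putIfAbsent(queueId, new Queue());
        return offsetStore.getOrDefault(queueId, 0L);
    }
}
```

Marking the queue as dropped before touching the offset is the important ordering: a task that still holds the old queue object sees the flag and cannot overwrite the freshly stored offset.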