系列
開篇
- 這篇文章主要分析RocketMq針對(duì)Client的管理,Client包括consumer和producer兩類。
- consumer的管理主要包括consumer的注冊(cè)玉凯、clientId的變更古毛、consumer的rebalance岁疼。
- consumer通過ConsumerManager來管理consumer。
- producer的管理主要是負(fù)責(zé)producer的注冊(cè)诗祸。
- producer通過ProducerManager來管理producer屹培。
-
核心點(diǎn)在于producer和consumer通過心跳數(shù)據(jù)來向broker完成注冊(cè)動(dòng)作,通過定時(shí)任務(wù)ClientHousekeepingService來下線斷開連接的client沟绪。
ConsumerManager
public class ConsumerManager {
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 1刮便、針對(duì)原來不存在的consumerGroupInfo會(huì)添加對(duì)應(yīng)的consumerGroupInfo
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 針對(duì)consumerId變更事件會(huì)執(zhí)行通知操作
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
// scanNotActiveChannel負(fù)責(zé)刪除已經(jīng)下線的client對(duì)應(yīng)的channel對(duì)象
public void scanNotActiveChannel() {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
String group = next.getKey();
ConsumerGroupInfo consumerGroupInfo = next.getValue();
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
while (itChannel.hasNext()) {
Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
ClientChannelInfo clientChannelInfo = nextChannel.getValue();
long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
RemotingUtil.closeChannel(clientChannelInfo.getChannel());
itChannel.remove();
}
}
}
}
}
- ConsumerManager的consumerTable來保存consumeGroup和對(duì)應(yīng)的信息。
- registerConsumer方法負(fù)責(zé)注冊(cè)consumer信息绽慈,針對(duì)consumer本身的id變更會(huì)通知所有的consumer重新立即執(zhí)行rebalance恨旱。
- scanNotActiveChannel負(fù)責(zé)掃描斷開連接的consumer并從consumerGroupInfo中刪除。
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
private final BrokerController brokerController;
public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:
if (args == null || args.length < 1) {
return;
}
// 針對(duì)CHANGE事件會(huì)通知該consumerGroup下的所有client進(jìn)行rebalance
List<Channel> channels = (List<Channel>) args[0];
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
for (Channel chl : channels) {
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
}
break;
case UNREGISTER:
this.brokerController.getConsumerFilterManager().unRegister(group);
break;
case REGISTER:
if (args == null || args.length < 1) {
return;
}
Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
break;
default:
throw new RuntimeException("Unknown event " + event);
}
}
}
- 針對(duì)consumer的clientId發(fā)生變更的場景坝疼,會(huì)通知該consumerGroup下的所有client立即進(jìn)行rebalance動(dòng)作搜贤。
public class ClientRemotingProcessor implements NettyRequestProcessor {
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
return this.notifyConsumerIdsChanged(ctx, request);
case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
return this.resetOffset(ctx, request);
case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
return this.getConsumeStatus(ctx, request);
case RequestCode.GET_CONSUMER_RUNNING_INFO:
return this.getConsumerRunningInfo(ctx, request);
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
return this.receiveReplyMessage(ctx, request);
default:
break;
}
return null;
}
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
try {
final NotifyConsumerIdsChangedRequestHeader requestHeader =
(NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
this.mqClientFactory.rebalanceImmediately();
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
}
return null;
}
}
- ClientRemotingProcessor針對(duì)clientId變更事件會(huì)通過rebalanceImmediately立即執(zhí)行rebalance。
ProducerManager
public class ProducerManager {
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
try {
ClientChannelInfo clientChannelInfoFound = null;
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null == channelTable) {
channelTable = new HashMap<>();
this.groupChannelTable.put(group, channelTable);
}
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
}
} finally {
this.groupChannelLock.unlock();
}
if (clientChannelInfoFound != null) {
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
}
}
public void scanNotActiveChannel() {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
final String group = entry.getKey();
final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, ClientChannelInfo> item = it.next();
// final Integer id = item.getKey();
final ClientChannelInfo info = item.getValue();
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
clientChannelTable.remove(info.getClientId());
RemotingUtil.closeChannel(info.getChannel());
}
}
}
} finally {
this.groupChannelLock.unlock();
}
}
} catch (InterruptedException e) {
}
}
}
- groupChannelTable負(fù)責(zé)保存producerGroup和對(duì)應(yīng)的ClientChannelInfo钝凶。
- registerProducer負(fù)責(zé)注冊(cè)producer到producerGroup當(dāng)中仪芒。
- scanNotActiveChannel負(fù)責(zé)掃描已下線的producer并從producerGroup刪除。
ClientHousekeepingService
public class ClientHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
public ClientHousekeepingService(final BrokerController brokerController) {
this.brokerController = brokerController;
}
public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
ClientHousekeepingService.this.scanExceptionChannel();
} catch (Throwable e) {
log.error("Error occurred when scan not active client channels.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
}
private void scanExceptionChannel() {
this.brokerController.getProducerManager().scanNotActiveChannel();
this.brokerController.getConsumerManager().scanNotActiveChannel();
this.brokerController.getFilterServerManager().scanNotActiveChannel();
}
}
- ClientHousekeepingService負(fù)責(zé)定時(shí)掃描異常的client耕陷,包括producer掂名、consumer等。
ClientManageProcessor
public class ClientManageProcessor implements NettyRequestProcessor {
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
);
// 注冊(cè)consumer的數(shù)據(jù)
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
data.getGroupName());
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
int topicSysFlag = 0;
if (data.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
String newTopic = MixAll.getRetryTopic(data.getGroupName());
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
// 注冊(cè)producer數(shù)據(jù)
for (ProducerData data : heartbeatData.getProducerDataSet()) {
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
clientChannelInfo);
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}
- ClientManageProcessor內(nèi)部通過heartBeat心跳包來實(shí)現(xiàn)producer和consumer的注冊(cè)操作啃炸。