RocketMq Client管理

系列

開篇

  • 這篇文章主要分析RocketMq針對(duì)Client的管理,Client包括consumer和producer兩類。
  • consumer的管理主要包括consumer的注冊(cè)玉凯、clientId的變更古毛、consumer的rebalance岁疼。
  • consumer通過ConsumerManager來管理consumer。
  • producer的管理主要是負(fù)責(zé)producer的注冊(cè)诗祸。
  • producer通過ProducerManager來管理producer屹培。
  • 核心點(diǎn)在于producer和consumer通過心跳數(shù)據(jù)來向broker完成注冊(cè)動(dòng)作,通過定時(shí)任務(wù)ClientHousekeepingService來下線斷開連接的client沟绪。

ConsumerManager

public class ConsumerManager {

    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
    private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        // 1刮便、針對(duì)原來不存在的consumerGroupInfo會(huì)添加對(duì)應(yīng)的consumerGroupInfo
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) {
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }

        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        boolean r2 = consumerGroupInfo.updateSubscription(subList);
        // 針對(duì)consumerId變更事件會(huì)執(zhí)行通知操作
        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }

        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

        return r1 || r2;
    }

    // scanNotActiveChannel負(fù)責(zé)刪除已經(jīng)下線的client對(duì)應(yīng)的channel對(duì)象
    public void scanNotActiveChannel() {
        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConsumerGroupInfo> next = it.next();
            String group = next.getKey();
            ConsumerGroupInfo consumerGroupInfo = next.getValue();
            ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
                consumerGroupInfo.getChannelInfoTable();

            Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
            while (itChannel.hasNext()) {
                Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
                ClientChannelInfo clientChannelInfo = nextChannel.getValue();
                long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
                if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                    log.warn(
                        "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
                        RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
                    RemotingUtil.closeChannel(clientChannelInfo.getChannel());
                    itChannel.remove();
                }
            }
        }
    }
}
  • ConsumerManager的consumerTable來保存consumeGroup和對(duì)應(yīng)的信息。
  • registerConsumer方法負(fù)責(zé)注冊(cè)consumer信息绽慈,針對(duì)consumer本身的id變更會(huì)通知所有的consumer重新立即執(zhí)行rebalance恨旱。
  • scanNotActiveChannel負(fù)責(zé)掃描斷開連接的consumer并從consumerGroupInfo中刪除。


public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
    private final BrokerController brokerController;

    public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    @Override
    public void handle(ConsumerGroupEvent event, String group, Object... args) {
        if (event == null) {
            return;
        }
        switch (event) {
            case CHANGE:
                if (args == null || args.length < 1) {
                    return;
                }
                // 針對(duì)CHANGE事件會(huì)通知該consumerGroup下的所有client進(jìn)行rebalance
                List<Channel> channels = (List<Channel>) args[0];
                if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
                    for (Channel chl : channels) {
                        this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
                    }
                }
                break;
            case UNREGISTER:
                this.brokerController.getConsumerFilterManager().unRegister(group);
                break;
            case REGISTER:
                if (args == null || args.length < 1) {
                    return;
                }
                Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
                this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
                break;
            default:
                throw new RuntimeException("Unknown event " + event);
        }
    }
}
  • 針對(duì)consumer的clientId發(fā)生變更的場景坝疼,會(huì)通知該consumerGroup下的所有client立即進(jìn)行rebalance動(dòng)作搜贤。


public class ClientRemotingProcessor implements NettyRequestProcessor {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mqClientFactory;

    public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                return this.checkTransactionState(ctx, request);
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
                return this.notifyConsumerIdsChanged(ctx, request);
            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
                return this.resetOffset(ctx, request);
            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
                return this.getConsumeStatus(ctx, request);

            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                return this.getConsumerRunningInfo(ctx, request);

            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);

            case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
                return this.receiveReplyMessage(ctx, request);
            default:
                break;
        }
        return null;
    }

    public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        try {
            final NotifyConsumerIdsChangedRequestHeader requestHeader =
                (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);

            this.mqClientFactory.rebalanceImmediately();
        } catch (Exception e) {
            log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
        }
        return null;
    }
}
  • ClientRemotingProcessor針對(duì)clientId變更事件會(huì)通過rebalanceImmediately立即執(zhí)行rebalance。


ProducerManager

public class ProducerManager {
 
    private final Lock groupChannelLock = new ReentrantLock();
    private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
        new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
    private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
    private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();

    public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
        try {
            ClientChannelInfo clientChannelInfoFound = null;

            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
                    if (null == channelTable) {
                        channelTable = new HashMap<>();
                        this.groupChannelTable.put(group, channelTable);
                    }

                    clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
                    if (null == clientChannelInfoFound) {
                        channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
                        clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
                    }
                } finally {
                    this.groupChannelLock.unlock();
                }

                if (clientChannelInfoFound != null) {
                    clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
                }
            } 
        } catch (InterruptedException e) {
        }
    }


    public void scanNotActiveChannel() {
        try {
            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
                        .entrySet()) {
                        final String group = entry.getKey();
                        final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();

                        Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Channel, ClientChannelInfo> item = it.next();
                            // final Integer id = item.getKey();
                            final ClientChannelInfo info = item.getValue();

                            long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
                            if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                                it.remove();
                                clientChannelTable.remove(info.getClientId());

                                RemotingUtil.closeChannel(info.getChannel());
                            }
                        }
                    }
                } finally {
                    this.groupChannelLock.unlock();
                }
            } 
        } catch (InterruptedException e) {
        }
    }
}
  • groupChannelTable負(fù)責(zé)保存producerGroup和對(duì)應(yīng)的ClientChannelInfo钝凶。
  • registerProducer負(fù)責(zé)注冊(cè)producer到producerGroup當(dāng)中仪芒。
  • scanNotActiveChannel負(fù)責(zé)掃描已下線的producer并從producerGroup刪除。


ClientHousekeepingService

public class ClientHousekeepingService implements ChannelEventListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;

    private ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));

    public ClientHousekeepingService(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public void start() {

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    ClientHousekeepingService.this.scanExceptionChannel();
                } catch (Throwable e) {
                    log.error("Error occurred when scan not active client channels.", e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
    }

    private void scanExceptionChannel() {
        this.brokerController.getProducerManager().scanNotActiveChannel();
        this.brokerController.getConsumerManager().scanNotActiveChannel();
        this.brokerController.getFilterServerManager().scanNotActiveChannel();
    }
}
  • ClientHousekeepingService負(fù)責(zé)定時(shí)掃描異常的client耕陷,包括producer掂名、consumer等。


ClientManageProcessor

public class ClientManageProcessor implements NettyRequestProcessor {

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            heartbeatData.getClientID(),
            request.getLanguage(),
            request.getVersion()
        );
        // 注冊(cè)consumer的數(shù)據(jù)
        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                    data.getGroupName());
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = 0;
                if (data.isUnitMode()) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                }
                String newTopic = MixAll.getRetryTopic(data.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    subscriptionGroupConfig.getRetryQueueNums(),
                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }

            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );

        // 注冊(cè)producer數(shù)據(jù)
        for (ProducerData data : heartbeatData.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                clientChannelInfo);
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
}
  • ClientManageProcessor內(nèi)部通過heartBeat心跳包來實(shí)現(xiàn)producer和consumer的注冊(cè)操作啃炸。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末铆隘,一起剝皮案震驚了整個(gè)濱河市卓舵,隨后出現(xiàn)的幾起案子南用,更是在濱河造成了極大的恐慌,老刑警劉巖掏湾,帶你破解...
    沈念sama閱讀 222,104評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件裹虫,死亡現(xiàn)場離奇詭異,居然都是意外死亡融击,警方通過查閱死者的電腦和手機(jī)筑公,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來尊浪,“玉大人匣屡,你說我怎么就攤上這事∧吹樱” “怎么了捣作?”我有些...
    開封第一講書人閱讀 168,697評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長鹅士。 經(jīng)常有香客問我券躁,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評(píng)論 1 298
  • 正文 為了忘掉前任也拜,我火速辦了婚禮以舒,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘慢哈。我一直安慰自己蔓钟,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評(píng)論 6 397
  • 文/花漫 我一把揭開白布岸军。 她就那樣靜靜地躺著奋刽,像睡著了一般。 火紅的嫁衣襯著肌膚如雪艰赞。 梳的紋絲不亂的頭發(fā)上佣谐,一...
    開封第一講書人閱讀 52,441評(píng)論 1 310
  • 那天,我揣著相機(jī)與錄音方妖,去河邊找鬼狭魂。 笑死,一個(gè)胖子當(dāng)著我的面吹牛党觅,可吹牛的內(nèi)容都是我干的雌澄。 我是一名探鬼主播,決...
    沈念sama閱讀 40,992評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼杯瞻,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼镐牺!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起魁莉,我...
    開封第一講書人閱讀 39,899評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤睬涧,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后旗唁,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體畦浓,經(jīng)...
    沈念sama閱讀 46,457評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評(píng)論 3 341
  • 正文 我和宋清朗相戀三年检疫,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了讶请。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,664評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡屎媳,死狀恐怖夺溢,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情烛谊,我是刑警寧澤风响,帶...
    沈念sama閱讀 36,346評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站晒来,受9級(jí)特大地震影響钞诡,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評(píng)論 3 334
  • 文/蒙蒙 一荧降、第九天 我趴在偏房一處隱蔽的房頂上張望接箫。 院中可真熱鬧,春花似錦朵诫、人聲如沸辛友。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽废累。三九已至,卻和暖如春脱盲,著一層夾襖步出監(jiān)牢的瞬間邑滨,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評(píng)論 1 272
  • 我被黑心中介騙來泰國打工钱反, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留掖看,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,081評(píng)論 3 377
  • 正文 我出身青樓面哥,卻偏偏與公主長得像哎壳,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子尚卫,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評(píng)論 2 359