I. NameServer startup
Source entry point: NamesrvStartup#main
1. NamesrvController controller = createNamesrvController(args);
- Check the command-line arguments
- Create the core configuration objects: NamesrvConfig and NettyServerConfig
- Parse the -c and -p options
- Check the ROCKETMQ_HOME environment variable
- final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); creates the controller
- controller.getConfiguration().registerConfig(properties); registers all configuration properties
2. start(controller);
- controller.initialize(); performs initialization
○ this.kvConfigManager.load(); loads the KV configuration
○ this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); creates the Netty server object that handles network traffic
○ this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); creates the worker thread pool for the Netty server
○ this.registerProcessor(); registers the NameServer's processor with the RemotingServer
○ NamesrvController.this.routeInfoManager.scanNotActiveBroker() starts a scheduled task that removes inactive brokers
○ NamesrvController.this.kvConfigManager.printAllPeriodically() periodically prints the KV configuration
- Runtime.getRuntime().addShutdownHook registers a shutdown hook that releases resources when the service stops
- controller.start(); starts the controller
The NameServer has two main roles:
1. Maintain the service address information of the brokers and keep it up to date
2. Provide producers and consumers with the list of available brokers
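The scheduled removal of inactive brokers can be pictured with a small table of heartbeat timestamps. The following is only a sketch under stated assumptions: the class name BrokerLiveTableSketch, its methods, and the 120-second expiry window are hypothetical and are not taken from the RocketMQ source.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the NameServer's view of live brokers.
class BrokerLiveTableSketch {
    // Assumed expiry window; the notes above do not state the real value.
    static final long EXPIRE_MILLIS = 120_000L;

    // broker address -> timestamp (ms) of the last heartbeat received
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Called whenever a broker registers or sends a heartbeat.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Removes every broker whose last heartbeat is older than the window.
    // Returns the number of brokers removed.
    int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > EXPIRE_MILLIS) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    public static void main(String[] args) {
        BrokerLiveTableSketch table = new BrokerLiveTableSketch();
        table.onHeartbeat("192.168.0.1:10911", 0L);
        table.onHeartbeat("192.168.0.2:10911", 100_000L);
        int removed = table.scanNotActive(130_000L);
        System.out.println("removed=" + removed
            + ", second broker alive=" + table.isAlive("192.168.0.2:10911"));
    }
}
```

In a real server the scan would be driven by a scheduled executor; passing the current time in explicitly keeps the sketch deterministic.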
II. Broker startup
Source entry point: BrokerStartup#main
1. createBrokerController(args)
- Build the four core configuration objects: BrokerConfig, NettyServerConfig, NettyClientConfig, MessageStoreConfig
- For BrokerConfig, only the -c option is parsed
- Check the ROCKETMQ_HOME environment variable
- RemotingUtil.string2SocketAddress(addr) splits the namesrvAddr value into individual addresses
- messageStoreConfig.getBrokerRole() determines master or slave through the BrokerId: the master ID is 0, and every broker node in a DLedger cluster has ID -1
- Parse the -p and -m options and add the parsed values to the four core configuration objects
- BrokerController controller = new BrokerController creates the BrokerController and passes in the four core configuration objects
- controller.getConfiguration().registerConfig(properties); re-registers (updates) the configuration
- controller.initialize(); initializes the controller
○ Load the configuration files from disk: topicConfigManager, consumerOffsetManager, subscriptionGroupManager, consumerFilterManager
○ this.messageStore = new DefaultMessageStore() builds the message store component
○ this.messageStore.load() loads the files on disk
○ this.remotingServer = new NettyRemotingServer builds the Netty network component
○ this.fastRemotingServer = new NettyRemotingServer; the fastRemotingServer does much the same job as remotingServer and handles requests on the VIP port
○ Several thread pools are initialized after that
○ this.registerProcessor(); the broker registers its request processors
- Runtime.getRuntime().addShutdownHook registers a shutdown hook
2. start(BrokerController controller)
- this.messageStore.start(); this service is started mainly so that writes to the CommitLog are dispatched to the ConsumeQueue and the IndexFile
- Start the two Netty servers: remotingServer and fastRemotingServer
- this.fileWatchService.start(); file watch service
- this.brokerOuterAPI.start(); brokerOuterAPI can be thought of as a Netty client, the component that sends outgoing requests such as heartbeats
- this.pullRequestHoldService.start(); service that holds long-polling pull requests
- this.filterServerManager.start(); filtering through a filter server
- BrokerController.this.registerBrokerAll() is the broker's core heartbeat registration task; its main job is to register the broker with the Namesrv
Core roles of the broker:
1. As a client, it sends heartbeats to the NameServer and initiates transaction status checks
2. As a server, it stores messages and responds to requests from consumers
III. Netty service registration framework
IV. Broker heartbeat registration process
Source entry point: BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister())
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
// This is the key part: first decide whether registration is needed, then call doRegisterBrokerAll to perform the actual registration.
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
// The core of broker registration
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
// Register the broker with every NameServer
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
// After registration, save the master and HA addresses returned by the NameServer
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
// CopyOnWriteArrayList is used because results are added from several threads
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
// Get the addresses of all NameServers
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// The CountDownLatch makes this method return only after registration has finished on every NameServer, or after the timeout passed to await() elapses.
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
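The fan-out pattern used by registerBrokerAll (one task per NameServer, a thread-safe result list, and a latch with a timeout) can be isolated in a few lines. This is a minimal sketch, not the RocketMQ implementation: FanOutRegisterSketch, registerAll, and the String-based results are hypothetical names chosen for illustration. Unlike the excerpt above, the sketch restores the interrupt flag instead of swallowing InterruptedException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical illustration of the "register with every server, then wait" pattern.
class FanOutRegisterSketch {
    static List<String> registerAll(List<String> addrs,
                                    Function<String, String> registerOne,
                                    ExecutorService executor,
                                    long timeoutMillis) {
        // Written to from several worker threads, so it must be thread-safe.
        final List<String> results = new CopyOnWriteArrayList<>();
        final CountDownLatch latch = new CountDownLatch(addrs.size());
        for (final String addr : addrs) {
            executor.execute(() -> {
                try {
                    String result = registerOne.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // A failure on one server must not prevent the others from completing.
                } finally {
                    latch.countDown(); // always count down, success or failure
                }
            });
        }
        try {
            // Wait for all tasks, but never longer than the timeout.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> results = registerAll(
            Arrays.asList("ns1:9876", "ns2:9876"), addr -> "registered at " + addr, pool, 2000L);
        pool.shutdown();
        System.out.println(results);
    }
}
```

Counting down in the finally block is what guarantees the caller is released even when a registration throws.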
How the NameServer handles the request:
// Core code with which the NameServer handles requests
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER: // Handles the broker registration request. The version defaults to the current framework version
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request); // current version
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
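The switch above routes each request by its code. The same idea, combined with the registerProcessor() step seen during startup, can be sketched as a lookup table with a default handler. Everything here is an assumption made for illustration: ProcessorTableSketch, its methods, and the numeric codes are hypothetical, and handlers are reduced to String functions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical request-code dispatch table with a fallback processor.
class ProcessorTableSketch {
    // Illustrative codes only; not the values used by RocketMQ.
    static final int CODE_REGISTER = 1;
    static final int CODE_GET_ROUTE = 2;

    private final Map<Integer, Function<String, String>> table = new HashMap<>();
    private Function<String, String> defaultProcessor = body -> "UNSUPPORTED";

    // Binds a handler to one request code.
    void register(int code, Function<String, String> processor) {
        table.put(code, processor);
    }

    // Handler used when no specific processor is bound to the code.
    void registerDefault(Function<String, String> processor) {
        this.defaultProcessor = processor;
    }

    // Looks up the handler for the code and runs it on the request body.
    String dispatch(int code, String body) {
        return table.getOrDefault(code, defaultProcessor).apply(body);
    }

    public static void main(String[] args) {
        ProcessorTableSketch server = new ProcessorTableSketch();
        server.register(CODE_REGISTER, body -> "registered " + body);
        server.register(CODE_GET_ROUTE, body -> "route for " + body);
        System.out.println(server.dispatch(CODE_REGISTER, "broker-a"));
        System.out.println(server.dispatch(99, "anything"));
    }
}
```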
In effect, this registers the broker information in the route info:
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
try {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
} catch (Exception e) {
throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
}
} else {
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
}
// routeInfoManager is the core component that manages routing information.
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
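The checksum step at the top of registerBrokerWithFilterServer pairs with setBodyCrc32 on the broker side: the sender computes a CRC32 over the encoded body, and the receiver recomputes it and rejects the request on a mismatch. The sketch below shows that round trip with java.util.zip.CRC32; the class BodyChecksumSketch and its methods are hypothetical, and it keeps the full 32-bit value as a long, which may differ from how the real helper stores it.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical sender/receiver checksum round trip.
class BodyChecksumSketch {
    // Computes the CRC32 of the whole body.
    static long crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return crc.getValue();
    }

    // Receiver side: recompute and compare with the value carried in the header.
    static boolean matches(byte[] body, long expectedCrc) {
        return crc32(body) == expectedCrc;
    }

    public static void main(String[] args) {
        byte[] body = "topic-config-payload".getBytes(StandardCharsets.UTF_8);
        long headerCrc = crc32(body); // what the sender would put in the header
        System.out.println("intact body accepted: " + matches(body, headerCrc));

        byte[] tampered = body.clone();
        tampered[0] ^= 0x01; // flip one bit to simulate corruption
        System.out.println("tampered body accepted: " + matches(tampered, headerCrc));
    }
}
```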
V. Producer sending messages
Source entry point: DefaultMQProducer#start
1. this.defaultMQProducerImpl.start(); starts the producer side
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// The initial state is CREATE_JUST
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
// Change the instanceName to the current process ID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
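// The MQ client factory is the core of the client. For a transactional producer, the service registration of the transactional message sender is completed in here
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// Register this producer with the MQ client factory instance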
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// Start the instance: all client components are started by mQClientFactory
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// Send a heartbeat to all brokers
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.startScheduledTask();
}
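The start method above follows a small state-machine pattern: the state is set to START_FAILED before any work is done and only flips to RUNNING when every step succeeds, so a second call or a call after a failed start is rejected. A minimal sketch of that guard follows; LifecycleSketch and its Runnable-based startup hook are hypothetical and stand in for the real startup steps.

```java
// Hypothetical lifecycle guard modelled on the serviceState switch above.
class LifecycleSketch {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;

    synchronized void start(Runnable startupSteps) {
        switch (state) {
            case CREATE_JUST:
                // Assume failure first; if startupSteps throws, the state stays START_FAILED.
                state = State.START_FAILED;
                startupSteps.run();
                state = State.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all refuse to start again.
                throw new IllegalStateException("service state not OK: " + state);
        }
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        LifecycleSketch service = new LifecycleSketch();
        service.start(() -> System.out.println("starting client components"));
        System.out.println("state after start: " + service.state());
        try {
            service.start(() -> { });
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```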
VI. Consumer consuming messages
Consumer entry point: DefaultMQPushConsumer#start
this.defaultMQPushConsumerImpl.start();
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// The client instance factory; producers are started through this same factory.
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// Load-balancing (rebalance) strategy
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// As this shows, the most fundamental difference between broadcasting mode and clustering mode is where the offset is stored.
switch (this.defaultMQPushConsumer.getMessageModel()) {
// Broadcasting mode stores the offset locally on the consumer
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
// Clustering mode stores the offset remotely on the broker
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
// An orderly listener creates a ConsumeMessageOrderlyService
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
// A concurrent listener creates a ConsumeMessageConcurrentlyService
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// Register the consumer. As with the producer, the client only needs to register itself; it is started later together with mQClientFactory.
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
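The rebalanceImpl configured in the start method is handed an AllocateMessageQueueStrategy, which decides which queues each consumer in a group reads. One simple way to divide queues is an even split in contiguous blocks. The sketch below shows that idea only; AverageAllocationSketch and its index-based signature are hypothetical, and the code is not claimed to match any strategy shipped with RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical even split of queue indexes across the consumers of one group.
class AverageAllocationSketch {
    // Returns the queue indexes assigned to the consumer at position consumerIndex.
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (consumerCount <= 0 || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid consumer position");
        }
        int base = queueCount / consumerCount;   // every consumer gets at least this many
        int extra = queueCount % consumerCount;  // the first 'extra' consumers get one more
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            assigned.add(start + i);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // 8 queues shared by 3 consumers
        for (int c = 0; c < 3; c++) {
            System.out.println("consumer " + c + " -> " + allocate(8, 3, c));
        }
    }
}
```

Because every consumer computes its own share from the same sorted inputs, no coordination message is needed as long as all members see the same queue and consumer lists.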
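1. Consumption modes on the consumer side:
● Clustering mode: each consumer in the group is assigned different messages
● Broadcasting mode: every message is delivered to all consumers
2. Offset storage:
● Broadcasting mode: this.offsetStore = new LocalFileOffsetStore(); the offset is stored locally on each consumer
● Clustering mode: this.offsetStore = new RemoteBrokerOffsetStore(); the offset is stored on the broker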
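The practical consequence of the two storage locations is that clustering consumers share progress while broadcasting consumers each track their own. The sketch below models that with in-memory maps. All names (OffsetStoreSketch, MapOffsetStore, BROKER_SIDE) are hypothetical, and a static map stands in for broker-side storage.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "local per consumer" versus "shared on the broker" offsets.
class OffsetStoreSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    interface OffsetStore {
        void update(String queue, long offset);
        long read(String queue); // returns -1 when no offset has been stored
    }

    static class MapOffsetStore implements OffsetStore {
        private final Map<String, Long> offsets;
        MapOffsetStore(Map<String, Long> offsets) { this.offsets = offsets; }
        public void update(String queue, long offset) { offsets.put(queue, offset); }
        public long read(String queue) { return offsets.getOrDefault(queue, -1L); }
    }

    // Stand-in for storage on the broker, shared by every consumer of the group.
    static final Map<String, Long> BROKER_SIDE = new HashMap<>();

    static OffsetStore create(MessageModel model) {
        switch (model) {
            case BROADCASTING:
                return new MapOffsetStore(new HashMap<>()); // private to this consumer
            case CLUSTERING:
                return new MapOffsetStore(BROKER_SIDE);     // shared through the broker
            default:
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }

    public static void main(String[] args) {
        OffsetStore c1 = create(MessageModel.CLUSTERING);
        OffsetStore c2 = create(MessageModel.CLUSTERING);
        c1.update("queue-0", 42L);
        System.out.println("clustering peer sees: " + c2.read("queue-0"));

        OffsetStore b1 = create(MessageModel.BROADCASTING);
        OffsetStore b2 = create(MessageModel.BROADCASTING);
        b1.update("queue-0", 7L);
        System.out.println("broadcasting peer sees: " + b2.read("queue-0"));
    }
}
```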