1、NameServer的作用
Name Server 是專為 RocketMQ 設(shè)計(jì)的輕量級(jí)名稱服務(wù),具有簡(jiǎn)單缀拭、可集群橫吐擴(kuò)展、無(wú)狀態(tài)填帽,節(jié)點(diǎn)之間互不通信等特點(diǎn)蛛淋。
整個(gè)Rocketmq集群的工作原理如下圖所示:
任何Producer、Consumer篡腌、Broker與所有NameServer通信褐荷,向NameServer請(qǐng)求或者發(fā)送數(shù)據(jù)。而且都是單向的嘹悼,Producer和Consumer請(qǐng)求數(shù)據(jù)叛甫,Broker發(fā)送數(shù)據(jù)。正是因?yàn)檫@種單向的通信杨伙,RocketMQ水平擴(kuò)容變得很容易其监。
Broker集群
Broker用于接收生產(chǎn)者發(fā)送消息,或者消費(fèi)者消費(fèi)消息的請(qǐng)求限匣。一個(gè)Broker集群由多組Master/Slave組成抖苦,Master可寫(xiě)可讀,Slave只可以讀米死,Master將寫(xiě)入的數(shù)據(jù)同步給Slave锌历。每個(gè)Broker節(jié)點(diǎn),在啟動(dòng)時(shí)峦筒,都會(huì)遍歷NameServer列表究西,與每個(gè)NameServer建立長(zhǎng)連接,注冊(cè)自己的信息物喷,之后定時(shí)上報(bào)卤材。
Producer集群
消息的生產(chǎn)者,通過(guò)NameServer集群獲得Topic的路由信息脯丝,包括Topic下面有哪些Queue,這些Queue分布在哪些Broker上等伏伐。Producer只會(huì)將消息發(fā)送到Master節(jié)點(diǎn)上宠进,因此只需要與Master節(jié)點(diǎn)建立連接。
Consumer集群
消息的消費(fèi)者藐翎,通過(guò)NameServer集群獲得Topic的路由信息材蹬,連接到對(duì)應(yīng)的Broker上消費(fèi)消息实幕。注意,由于Master和Slave都可以讀取消息堤器,因此Consumer會(huì)與Master和Slave都建立連接昆庇。
2遗锣、NameServer類結(jié)構(gòu)
- NamesrvStartup: NameServer的啟動(dòng)類锦秒;
- NamesrvController: NameServer的核心控制類幅狮;
- KVConfigManager: 讀取或變更NameServer的配置屬性螺垢,加載NamesrvConfig中配置的配置文件到內(nèi)存拯田;
- KVConfigSerializeWrapper: NameServer配置信息序列化包裝類叶沛;
- RouteInfoManager: NameServer數(shù)據(jù)的載體瘪弓,記錄Broker劫映,Topic等信息乓旗;
- DefaultRequestProcessor: NameServer處理請(qǐng)求的請(qǐng)求類府蛇,負(fù)責(zé)處理所有與NameServer交互的請(qǐng)求;
- BrokerHousekeepingService: BrokerHouseKeepingService實(shí)現(xiàn)ChannelEventListener接口屿愚,可以說(shuō)是通道在發(fā)送異常時(shí)的回調(diào)方法(Nameserver與Broker的連接通道在關(guān)閉汇跨、通道發(fā)送異常、通道空閑時(shí))妆距;
- NamesrvConfig: NamesrvConfig,主要指定nameserver的相關(guān)配置目錄屬性穷遂;
- NettyRemotingServer: Netty服務(wù)類;
(1)NameServer啟動(dòng)流程
NameServer的啟動(dòng)是由NamesrvStartup完成的毅厚,啟動(dòng)過(guò)程如下:
- 獲取并解析配置參數(shù)塞颁,包括NamesrvConfig和NettyServerConfig;
- 調(diào)用NamesrvController.initialize()初始化NamesrvController吸耿;若初始化失敗祠锣,則直接關(guān)閉NamesrvController;
- 然后調(diào)用NamesrvController.start()方法來(lái)開(kāi)啟NameServer服務(wù)咽安;
- 注冊(cè)ShutdownHookThread服務(wù)伴网。在JVM退出之前,調(diào)用NamesrvController.shutdown()來(lái)進(jìn)行關(guān)閉服務(wù)妆棒,釋放資源澡腾;
public class NamesrvStartup {
private static InternalLogger log;
private static Properties properties = null;
private static CommandLine commandLine = null;
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 創(chuàng)建NamesrvController
NamesrvController controller = createNamesrvController(args);
// 啟動(dòng)NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
// 構(gòu)建命令行
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// nameServer配置參數(shù)
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// netty server 配置參數(shù)
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
// 命令行參數(shù)是否包含配置文件
if (commandLine.hasOption('c')) {
// 獲取配置文件路徑
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 是否打印參數(shù)
if (commandLine.hasOption('p')) {
// 都不打印
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
System.exit(0);
}
// 設(shè)置命令行的參數(shù),優(yōu)先級(jí)高(會(huì)覆蓋掉配置文件的配置項(xiàng))
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 未設(shè)置 rocketMQ home
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 配置Logger
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 控制臺(tái)打印參數(shù)
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 創(chuàng)建 NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// 注冊(cè)配置參數(shù)糕珊,防止丟失
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化NamesrvController
boolean initResult = controller.initialize();
// 初始化失敗
if (!initResult) {
// 關(guān)閉NamesrvController
controller.shutdown();
// 關(guān)閉JVM
System.exit(-3);
}
// 注冊(cè)關(guān)閉鉤子方法:當(dāng)JVM關(guān)閉的時(shí)候动分,先關(guān)閉NamesrvController
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
// 關(guān)閉NamesrvController
controller.shutdown();
return null;
}
}));
// 啟動(dòng)NamesrvController
controller.start();
return controller;
}
public static void shutdown(final NamesrvController controller) {
controller.shutdown();
}
public static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Name server config properties file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "printConfigItem", false, "Print all config item");
opt.setRequired(false);
options.addOption(opt);
return options;
}
}
調(diào)用NamesrvController.initialize()初始化NamesrvController
public boolean initialize() {
// 加載KV配置
this.kvConfigManager.load();
// 初始化通信層
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 初始化線程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
// 增加定時(shí)任務(wù)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
//
// @Override
// public void run() {
// NamesrvController.this.routeInfoManager.printAllPeriodically();
// }
// }, 1, 5, TimeUnit.MINUTES);
return true;
}
加載KV配置,創(chuàng)建NettyServer網(wǎng)絡(luò)處理對(duì)象红选,然后開(kāi)啟兩個(gè)定時(shí)任務(wù)澜公,此類定時(shí)任務(wù)統(tǒng)稱為心跳檢測(cè)
- NameServer每隔10秒掃描一次Broker,移除處于不激活狀態(tài)的Broker
- NameServer每隔10分鐘打印一次KV配置
3喇肋、NameServer如何保證數(shù)據(jù)的最終一致
NameServer作為一個(gè)名稱服務(wù)坟乾,需要提供服務(wù)注冊(cè)迹辐、服務(wù)剔除、服務(wù)發(fā)現(xiàn)這些基本功能甚侣,但是NameServer節(jié)點(diǎn)之間并不通信明吩,在某個(gè)時(shí)刻各個(gè)節(jié)點(diǎn)數(shù)據(jù)可能不一致的情況下,如何保證客戶端可以最終拿到正確的數(shù)據(jù)殷费。下面分別從路由元信息印荔、路由注冊(cè)、路由剔除宗兼,路由發(fā)現(xiàn)四個(gè)角度進(jìn)行介紹躏鱼。
路由元信息
NameServer路由實(shí)現(xiàn)類: RoutelnfoManager
RouteInfoManager作為NameServer數(shù)據(jù)的載體,記錄Broker殷绍、Topic染苛、QueueData等信息。
Broker在啟動(dòng)時(shí)會(huì)將Broker信息主到、Topic信息茶行、QueueData信息注冊(cè)到所有的NameServer上,并和所有NameServer節(jié)點(diǎn)保持長(zhǎng)連接登钥,之后也會(huì)定時(shí)注冊(cè)信息畔师;
Producer、Consumer也會(huì)和其中一個(gè)NameServer節(jié)點(diǎn)保持長(zhǎng)連接牧牢,定時(shí)從NameServer中獲取Topic路由信息看锉;
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
- topicQueueTable: Topic 消息隊(duì)列路由信息,消息發(fā)送時(shí)根據(jù)路由表進(jìn)行負(fù) 載均衡 塔鳍。
- brokerAddrTable : Broker 基礎(chǔ)信息伯铣, 包含 brokerName、 所屬集群名稱 轮纫、 主備 Broker
地址腔寡。 - clusterAddrTable: Broker 集群信息,存儲(chǔ)集群中所有 Broker 名稱 掌唾。
- brokerLiveTable: Broker 狀態(tài)信息 放前。 NameServer 每次 收到心跳包時(shí)會(huì) 替換該信 息 。
- filterServerTable : Broker上的 FilterServer列表糯彬,用于類模式消息過(guò)濾凭语,
RocketMQ 基于訂閱發(fā)布機(jī)制 , 一個(gè) Topic 擁有 多 個(gè)消息隊(duì) 列 撩扒,一 個(gè) Broker 為每一主 題默 認(rèn)創(chuàng)建 4 個(gè)讀隊(duì)列 4 個(gè)寫(xiě)隊(duì)列 似扔。 多個(gè) Broker 組成 一個(gè)集群 , BrokerName 由相同的多臺(tái) Broker 組 成 Master-Slave 架構(gòu) , brokerId 為 0 代表 Master虫几, 大于 0 表示 Slave。 BrokerLivelnfo 中 的 lastUpdateTimestamp 存儲(chǔ)上次收到 Broker 心跳包的時(shí)間 挽拔。
路由注冊(cè)
對(duì)于Zookeeper辆脸、Etcd這樣強(qiáng)一致性組件,數(shù)據(jù)只要寫(xiě)到主節(jié)點(diǎn)螃诅,內(nèi)部會(huì)通過(guò)狀態(tài)機(jī)將數(shù)據(jù)復(fù)制到其他節(jié)點(diǎn)啡氢,Zookeeper使用的是Zab協(xié)議,etcd使用的是raft協(xié)議术裸。
但是NameServer節(jié)點(diǎn)之間是互不通信的倘是,無(wú)法進(jìn)行數(shù)據(jù)復(fù)制。RocketMQ采取的策略是袭艺,在Broker節(jié)點(diǎn)在啟動(dòng)的時(shí)候搀崭,輪訓(xùn)NameServer列表,與每個(gè)NameServer節(jié)點(diǎn)建立長(zhǎng)連接猾编,發(fā)起注冊(cè)請(qǐng)求瘤睹。NameServer內(nèi)部會(huì)維護(hù)一個(gè)Broker表,用來(lái)動(dòng)態(tài)存儲(chǔ)Broker的信息答倡。
同時(shí)轰传,Broker節(jié)點(diǎn)為了證明自己是存活的,會(huì)將最新的信息上報(bào)給NameServer瘪撇,然后每隔30秒向NameServer發(fā)送心跳包获茬,心跳包中包含 BrokerId、Broker地址倔既、Broker名稱恕曲、Broker所屬集群名稱等等,然后NameServer接收到心跳包后叉存,會(huì)更新時(shí)間戳码俩,記錄這個(gè)Broker的最新存活時(shí)間。
NameServer在處理心跳包的時(shí)候歼捏,存在多個(gè)Broker同時(shí)操作一張Broker表稿存,為了防止并發(fā)修改Broker表導(dǎo)致不安全,路由注冊(cè)操作引入了ReadWriteLock讀寫(xiě)鎖瞳秽,這個(gè)設(shè)計(jì)亮點(diǎn)允許多個(gè)消息生產(chǎn)者并發(fā)讀瓣履,保證了消息發(fā)送時(shí)的高并發(fā),但是同一時(shí)刻N(yùn)ameServer只能處理一個(gè)Broker心跳包练俐,多個(gè)心跳包串行處理袖迎。這也是讀寫(xiě)鎖的經(jīng)典使用場(chǎng)景,即讀多寫(xiě)少。
Broker端心跳包發(fā)送
Broker端心跳包發(fā)送( BrokerController#start)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false);
}
catch (Exception e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
該方法主要是遍歷 NameServer列表燕锥, Broker消息服務(wù)器依次向 NameServer發(fā)送心跳包辜贵。
public RegisterBrokerResult registerBrokerAll(//
final String clusterName,// 1
final String brokerAddr,// 2
final String brokerName,// 3
final long brokerId,// 4
final String haServerAddr,// 5
final TopicConfigSerializeWrapper topicConfigWrapper,// 6
final List<String> filterServerList,// 7
final boolean oneway// 8
) {
RegisterBrokerResult registerBrokerResult = null;
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
for (String namesrvAddr : nameServerAddressList) {
try {
RegisterBrokerResult result =
this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
haServerAddr, topicConfigWrapper, filterServerList, oneway);
if (result != null) {
registerBrokerResult = result;
}
log.info("register broker to name server {} OK", namesrvAddr);
}
catch (Exception e) {
log.warn("registerBroker Exception, " + namesrvAddr, e);
}
}
}
NameServer端處理心跳包
Step1:路由 注冊(cè)需要加寫(xiě)鎖 ,防止并發(fā)修改 RoutelnfoManager 中的路由 表 归形。 Broker 所屬 集群是否存在托慨, 如果不存在,則創(chuàng) 建暇榴,然 后將 broker 名加入到集群 合中厚棵。
Step2 :維護(hù) BrokerData信息,首先從 brokerAddrTable根據(jù) BrokerName嘗試獲取 Broker信息蔼紧,如果不存在婆硬, 則新建 BrokerData并放入到 brokerAddrTable, registerFirst設(shè) 置為 true;如果存在 , 直接替換原先的奸例, registerFirst設(shè)置為 false彬犯,表示非第一次注冊(cè) 。
Step3 :如果Broker為Master查吊,并且BrokerTopic配置信息發(fā)生變化或者是初次注冊(cè)躏嚎, 則需要?jiǎng)?chuàng)建或更新 Topic路由元數(shù)據(jù),填充 topicQueueTable菩貌, 其實(shí)就是為默認(rèn)主題自動(dòng)注 冊(cè)路由信息卢佣,其中包含 MixAII.DEFAULT TOPIC 的路由信息。 當(dāng)消息生產(chǎn)者發(fā)送主題時(shí)箭阶, 如果該主題未創(chuàng)建并且BrokerConfig的autoCreateTopicEnable為true時(shí)虚茶, 將返回MixAII. DEFAULT TOPIC的路由信息。
路由剔除
正常情況下仇参,如果Broker關(guān)閉嘹叫,則會(huì)與NameServer斷開(kāi)長(zhǎng)連接,Netty的通道關(guān)閉監(jiān)聽(tīng)器會(huì)監(jiān)聽(tīng)到連接斷開(kāi)事件诈乒,然后會(huì)將這個(gè)Broker信息剔除掉罩扇。
Broker 每隔 30s 向 NameServer 發(fā)送一個(gè)心跳包,心跳包中包含 BrokerId怕磨、Broker地址喂饥、Broker名稱、 Broker所屬集群名稱肠鲫、Broker關(guān)聯(lián)的 FilterServer列表员帮。 但是如果 Broker若機(jī) , NameServer無(wú)法收到心跳包导饲,此時(shí) NameServer如何來(lái)剔除這些失 效的 Broker 呢? Name Server會(huì)每隔 IOs 掃描 brokerLiveTable狀態(tài)表捞高,如果 BrokerLive 的 lastUpdateTimestamp 的時(shí)間戳距當(dāng)前時(shí)間超過(guò) 120s氯材,則認(rèn)為 Broker失效,移除該 Broker, 關(guān)閉與Broker連接硝岗,并同時(shí)更新topicQueueTable氢哮、 brokerAddrTable、 brokerLiveTable型檀、 filterServerTable命浴。
RocktMQ 有兩個(gè)觸發(fā)點(diǎn)來(lái)觸發(fā)路由刪除 。
- NameServer定時(shí)掃描 brokerLiveTable檢測(cè)上次心跳包與 當(dāng)前系統(tǒng)時(shí)間的時(shí)間差贱除, 如果時(shí)間戳大于 120s,則需要移除該 Broker 信息 媳溺。
- Broker在正常被關(guān)閉的情況下月幌,會(huì)執(zhí)行 unregisterBroker指令。
路由發(fā)現(xiàn)
RocketMQ 路由發(fā)現(xiàn)是非實(shí)時(shí)的悬蔽,當(dāng) Topic 路由出現(xiàn)變化后扯躺, NameServer不主動(dòng)推送給客戶端 , 而 是由客戶端定時(shí)拉取主題最新的路由 蝎困。
com.alibaba.rocketmq.filtersrv.processor.DefaultRequestProcessor#pullMessageForward
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request)
throws Exception {
final RemotingCommand response =
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
// 由于異步返回录语,所以必須要設(shè)置
response.setOpaque(request.getOpaque());
DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
final FilterClassInfo findFilterClass =
this.filtersrvController.getFilterClassManager().findFilterClass(
requestHeader.getConsumerGroup(), requestHeader.getTopic());
if (null == findFilterClass) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Find Filter class failed, not registered");
return response;
}
if (null == findFilterClass.getMessageFilter()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Find Filter class failed, registered but no class");
return response;
}
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
// 構(gòu)造從Broker拉消息的參數(shù)
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
mq.setBrokerName(this.filtersrvController.getBrokerName());
long offset = requestHeader.getQueueOffset();
int maxNums = requestHeader.getMaxMsgNums();
final PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
responseHeader.setMaxOffset(pullResult.getMaxOffset());
responseHeader.setMinOffset(pullResult.getMinOffset());
responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
response.setRemark(null);
switch (pullResult.getPullStatus()) {
case FOUND:
response.setCode(ResponseCode.SUCCESS);
List<MessageExt> msgListOK = new ArrayList<MessageExt>();
try {
for (MessageExt msg : pullResult.getMsgFoundList()) {
boolean match = findFilterClass.getMessageFilter().match(msg);
if (match) {
msgListOK.add(msg);
}
}
// 有消息返回
if (!msgListOK.isEmpty()) {
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
response, msgListOK);
return;
}
// 全部都被過(guò)濾掉了
else {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
}
}
// 只要拋異常,就終止過(guò)濾禾乘,并返回客戶端異常
catch (Throwable e) {
final String error =
String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
requestHeader.getConsumerGroup(), requestHeader.getTopic());
log.error(error, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
response, null);
return;
}
break;
case NO_MATCHED_MSG:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_NEW_MSG:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_ILLEGAL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
break;
default:
break;
}
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response,
null);
}
@Override
public void onException(Throwable e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response,
null);
return;
}
};
pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
return null;
}
- 調(diào)用 RouterlnfoManager 的方法澎埠,從路由 表 topicQueueTable、 brokerAddrTable始藕、 filterServerTable中分別填充TopicRouteData中的List<Queu巳Data>蒲稳、List<BrokerData>和 filterServer 地址表 。
- 如果找到主題對(duì)應(yīng)的路由信息并且該主題為順序消息伍派,則從 NameServer KVconfig 中獲取關(guān)于順序消息相關(guān) 的配置填充路由信息 江耀。
如果找不到路由信息 CODE 則使用 TOPIC NOT_EXISTS ,表示沒(méi)有找到對(duì)應(yīng)的路由 诉植。