一、概述
一個最簡單的Consumer的啟動代碼如下:
public static void main(String[] args) throws Exception{
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c1");
// Specify name server addresses.
consumer.setNamesrvAddr("10.1.11.155:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("qqq", "TagA||TagB");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
msgs.forEach(m-> System.out.println(new String(m.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
最重要的有幾個步驟:
- new DefaultMQPushConsumer("c1");
- consumer.subscribe("qqq", "TagA||TagB");
- registerMessageListener()
-
start()
下面就這幾個方法進(jìn)行深入
二问拘、實例化一個DefaultMQPushConsumer
通過下面的代碼就能夠?qū)嵗粋€DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c1");
這個默認(rèn)構(gòu)造方法其實調(diào)用了下面的構(gòu)造方法
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
一共三個參數(shù):
- consumerGroup
消費(fèi)者組 - rpcHook
遠(yuǎn)程調(diào)用的hook赦抖,我看Rocket的德行一般就在RPC調(diào)用之前,針對訪問控制(ACL)做了一個前置 - allocateMessageQueueStrategy
隊列負(fù)載均衡策略宦赠,默認(rèn)會用 AllocateMessageQueueAveragely(平均算法)
系統(tǒng)提供了6種策略:后續(xù)研究下- AllocateMachineRoomNearby
- AllocateMessageQueueAveragely
- AllocateMessageQueueAveragelyByCircle
- AllocateMessageQueueByConfig
- AllocateMessageQueueByMachineRoom
- AllocateMessageQueueConsistentHash
三陪毡、定義監(jiān)聽的主題及subExpression
代碼如下:
consumer.subscribe("qqq", "TagA||TagB");
解析一下topic及subExpression米母,生成一個SubscriptionData實體(包含了topic、subExpression的value和hash值)毡琉,并放到map中
四铁瞒、注冊監(jiān)聽處理器 registerMessageListener
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// msgs.forEach(m-> System.out.println(new String(m.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
新建了MessageListenerConcurrently接口的匿名類,并實現(xiàn)了consumeMessage()方法桅滋,注意慧耍,這里MessageListenerConcurrently是一個接口,繼承自MessageListener丐谋,同樣繼承自MessageListener的還有MessageListenerOrderly(順序消費(fèi)監(jiān)聽器)
五芍碧、啟動 start()
這個是核心的核心了
1、this.checkConfig();
檢查group等參數(shù)是否合法
2号俐、this.copySubscription();
復(fù)制監(jiān)聽規(guī)則泌豆,其實就是把之前生成的SubscriptionData復(fù)制一份,交給RebalancePushImpl(這個應(yīng)該是負(fù)載均衡消費(fèi)的核心類)吏饿,除了復(fù)制監(jiān)聽規(guī)則踪危,還會把以下信息復(fù)制過去:
- setConsumerGroup 復(fù)制組名稱
- setMessageModel 復(fù)制消費(fèi)模式:廣播/集群
- setAllocateMessageQueueStrategy 復(fù)制負(fù)載均衡算法
- setmQClientFactory(在下面3中初始化) 復(fù)制mqClient
3、實例化一個MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
4猪落、指定offsetStore
如果是集群模式贞远,則會實例化一個LocalFileOffsetStore
如果是廣播模式,則會實例化一個RemoteBrokerOffsetStore
都會調(diào)用load的方法笨忌,但是RemoteBrokerOffsetStore是一個空實現(xiàn)
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
5蓝仲、實例化ConsumeMessageService
主要有兩種:并發(fā)消費(fèi)service及順序消費(fèi)service
- 并發(fā)消費(fèi) ConsumeMessageOrderlyService.start();
start() 會啟動一個定時任務(wù),延遲15分鐘蜜唾,每15分鐘執(zhí)行一次杂曲,清理失效的消息處理線程 - 順序消費(fèi) ConsumeMessageConcurrentlyService
start() 會啟動一個定時任務(wù),延遲1秒袁余,每20秒執(zhí)行一次 lockMQPeriodically()方法擎勘,這個方法應(yīng)該是順序消費(fèi)的精髓,可以深入研讀一下
6颖榜、啟動客戶端 mQClientFactory.start();
producer啟動的時候也會調(diào)用這個方法
上面1 ~ 5 的步驟棚饵,大部分是為了這個start()做準(zhǔn)備,這個start做了很多事情掩完,如下:
-
this.mQClientAPIImpl.fetchNameServerAddr();
如果沒有指定namesrv的地址噪漾,則會通過RPC獲取,這個最終是通過調(diào)用一個外部的URL連接來獲取的且蓬,這個鏈接可以自己定義欣硼,官方也推薦這種方法,最底層也最靈活恶阴。 -
this.mQClientAPIImpl.start();
啟動netty客戶端诈胜,可以和broker及namesrv通訊的通道 -
this.startScheduledTask();
啟動一批自動任務(wù)-
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
定期獲取namesrv的地址豹障,前面在啟動的時候已經(jīng)獲取一次了 -
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
定期從namesrv拉取meta信息及topic等路由信息 -
cleanOfflineBroker()、sendHeartbeatToAllBrokerWithLock()
定期檢查下線的broker及發(fā)送心跳 -
MQClientInstance.this.persistAllConsumerOffset();
定期固化消費(fèi)者的offset -
MQClientInstance.this.adjustThreadPool();
定期調(diào)整處理線程池的大小焦匈,這個4.4版本好像是個空實現(xiàn)血公,沒用
-
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
-
this.pullMessageService.start();
啟動客戶端主動pull的服務(wù) -
this.rebalanceService.start();
啟動負(fù)載均衡服務(wù)
最后還會調(diào)用一次getDefaultMQProducerImpl().start(false),沒錯缓熟,就是producer的start累魔,但是參數(shù)是false,不會最終調(diào)用producer的start够滑,就把配置刷過去了垦写。
7、其他方法
- this.updateTopicSubscribeInfoWhenSubscriptionChanged();
- this.mQClientFactory.checkClientInBroker();
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-
this.mQClientFactory.rebalanceImmediately();
這幾個方法后續(xù)研究下
到此為止彰触,consumer的啟動流程完畢