消息從producer發(fā)送到了broker之后互广,消息的訂閱者就可以訂閱消費消息科侈。
roketmq消息拉取方式有兩種方式:推(push)和拉(pull)冗栗,推其實也只是對pull的一層封裝泄朴,本質(zhì)還是拉的方式罢吃。
rocketmq消息消費模式:集群消費和廣播消費,不同的消費方式區(qū)別還是有點大的宴偿,消息隊列分配湘捎、消息ack機制和消息消費進度管理上都有區(qū)別。
-
消息消費方式:并行消費和順序消費窄刘。并行消費是ConsumeMessageConcurrentlyService進行處理窥妇,順序消費是ConsumeMessageOrderlyService進行處理,順序消費rocketmq只支持單隊列的順序娩践。
我們以Pull拉取方式來進行分析:
啟動一個consumer實例來進行消費活翩,需要設(shè)置以下信息:
consumeGroup:設(shè)置消費者所在消費組
consumeFromWhere:設(shè)置消費偏移量,CONSUME_FROM_FIRST_OFFSET:從最開始消息消費翻伺,CONSUME_FROM_TIMESTAMP:從某個時間戳開始消費消息
subscribe:設(shè)置訂閱信息材泄,主要是訂閱的topic以及tag信息
messageListener:設(shè)置消息消費業(yè)務(wù)處理邏輯,實際處理消息的業(yè)務(wù)邏輯
以上這些信息都是后續(xù)消息拉取消費的基本信息很有作用
public static void main(String[] args) throws InterruptedException, MQClientException {
//創(chuàng)建consumer實例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
//設(shè)置消費偏移量
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//設(shè)置consumer訂閱topic和tag信息
consumer.subscribe("TopicTest", "*");
//設(shè)置consumer的消息消費業(yè)務(wù)邏輯MessageListener
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
接下來看下DefaultMQPushConsumer:這是消息消費的具體實例
消息消費實例在創(chuàng)建的時候吨岭,設(shè)置了consumeGrou信息和負(fù)載均衡策略拉宗,默認(rèn)的負(fù)載均衡策略是平均分配的策略,這里rocketmq用到了策略模式
//創(chuàng)建實例的構(gòu)造函數(shù)
public DefaultMQPushConsumer(final String consumerGroup) {
//這里設(shè)置了consumer的consumeGroup信息和消息消費的負(fù)載均衡策略辣辫,默認(rèn)平均分配策略
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
?
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
//設(shè)置consumerGroup
this.consumerGroup = consumerGroup;
//設(shè)置消費負(fù)載均衡策略
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
//創(chuàng)建消息消費內(nèi)部對象實例旦事,具體的消息拉取消費委托給這個類
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
這里列舉下DefaultMQPushConsumer的數(shù)據(jù)結(jié)構(gòu),簡單認(rèn)識下:
//內(nèi)部實現(xiàn)急灭,這里大多數(shù)功能都是委托給它來處理
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
?
//消費組名稱
private String consumerGroup;
?
//消息消費模式
private MessageModel messageModel = MessageModel.CLUSTERING;
?
//消費偏移量
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
?
//消費時間戳
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
?
//負(fù)載均衡策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
?
//消費訂閱信息姐浮,以topic為維度
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
?
//消息消費業(yè)務(wù)邏輯
private MessageListener messageListener;
?
//消費進度管理器,集群消費和廣播消費來確定管理器類型
private OffsetStore offsetStore;
?
//消息消費線程最小數(shù)
private int consumeThreadMin = 20;
?
//消息消費線程最大數(shù)
private int consumeThreadMax = 64;
?
//動態(tài)調(diào)整線程池個數(shù)閾值
private long adjustThreadPoolNumsThreshold = 100000;
?
//并發(fā)最大消息2000個
private int consumeConcurrentlyMaxSpan = 2000;
?
//本地拉取消息堆積個數(shù)葬馋,用于消息流量控制
private int pullThresholdForQueue = 1000;
?
//本地拉取消息堆積大小卖鲤,用于消息流量控制
private int pullThresholdSizeForQueue = 100;
?
private int pullThresholdForTopic = -1;
private int pullThresholdSizeForTopic = -1;
private long pullInterval = 0;
?
//消息消費批次數(shù)
private int consumeMessageBatchMaxSize = 1;
?
//消息批量拉取數(shù)
private int pullBatchSize = 32;
private boolean postSubscriptionWhenPull = false;
private boolean unitMode = false;
?
//消息消費最大次數(shù)肾扰,-1表示16次,用延遲消息的方式來實現(xiàn)重新消費
private int maxReconsumeTimes = -1;
private long suspendCurrentQueueTimeMillis = 1000;
?
//消費超時時間15分鐘
private long consumeTimeout = 15;
?
//這個是消息鏈路追蹤用的
private TraceDispatcher traceDispatcher = null;
消息拉取消費的源頭始于DefaultMQPushConsumerImpl.start()扫尖,這個是案發(fā)第一現(xiàn)場:
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
//消費者一些校驗
this.checkConfig();
//將defaultMQPushConsumer中的訂閱信息拷貝到rebalanceImpl中
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//創(chuàng)建或獲取MQClientInstance白对,IP@InstanceName作為key,所以一個應(yīng)用中他只有一個MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
?
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
?
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
//消息廣播模式换怖,采用客戶端本地管理消息消費進度
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
//消息集群模式,采用broker遠(yuǎn)端管理消息消費進度
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
//加載消息隊列消費進度蟀瞧,以messageQueue為維度沉颂,廣播模式從本地文件讀取,集群模式從個空實現(xiàn)
this.offsetStore.load();
//設(shè)置消息消費服務(wù)悦污,并行消費和順序消費是不同的
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
//消息消費服務(wù)線程啟動铸屉,沒有消息的話線程會阻塞
this.consumeMessageService.start();
//注冊消息消費類,以consumerGroup作為key切端,所以一個group中彻坛,必須消息訂閱信息都是相同,不然會出現(xiàn)訂閱信息覆蓋踏枣,導(dǎo)致最終消息丟失
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//MQClientInstance啟動昌屉,這個是重頭戲,開啟了netty客戶端茵瀑,拉消息線程间驮,消息消費的負(fù)載均衡,啟動定時任務(wù)等
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//更新消費者訂閱主題的路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
//發(fā)送心跳信息
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//開啟消息隊列負(fù)載均衡
this.mQClientFactory.rebalanceImmediately();
}
這里主要是做了下面這些事情:
將defaultMQPushConsumer中的訂閱消息拷貝到rebalanceImpl中
創(chuàng)建或獲取MQClientInstance马昨,他是以IP@InstanceName作為keyMQClientInstance作為value竞帽,放在map中,一個應(yīng)用中他只有一個實例
設(shè)置PullAPIWrapper鸿捧,這個是拉取和處理消息的包裝器
設(shè)置消息消費進度管理器屹篓,集群消費RemoteBrokerOffsetStore,廣播消費LocalFileOffsetStore
加載消息隊列消費進度this.offsetStore.load()匙奴,LocalFileOffsetStore從本地文件進行加載堆巧,RemoteBrokerOffsetStore是一個空的實現(xiàn),其實最后是從broker獲取消息消費進度
設(shè)置消息消費服務(wù)饥脑,并行消費是ConsumeMessageConcurrentlyService恳邀,順序消費是ConsumeMessageOrderlyService
啟動消息消費服務(wù)線程,this.consumeMessageService.start()灶轰,沒有消息過來時谣沸,線程是阻塞的
注冊消息消費類,以consumerGroup作為key笋颤,所以一個group中乳附,必須消息訂閱信息都是相同内地,不然會出現(xiàn)訂閱信息覆蓋,導(dǎo)致最終消息丟失
MQClientInstance線程啟動赋除,這個和DefaultMQPushConsumerImpl是一樣的重頭戲阱缓,它里面開啟 了Netty客戶端線程,拉取消息線程举农,消息消費的負(fù)載均衡荆针,啟動定時任務(wù)等。這里稍后分析颁糟,重中之重航背。
更新主體訂閱信息
發(fā)送心跳信息給所有的broker
喚醒消息隊列負(fù)載均衡線程
訂閱信息的拷貝:訂閱信息的拷貝主要是為了后面的消息拉取過濾和消息消費隊列的負(fù)載均衡
把consumer的訂閱信息,按topic的維度進行封裝棱貌,因為一個consumer可能會訂閱多個topic主題信息玖媚。封裝的信息主要是topic、subString婚脱、tag和tag的hashCode信息
集群消費模式下今魔,為這個consumerGroup構(gòu)建一個重試主題信息,topic為%RETRY%+consumerGroup障贸,訂閱信息是*所有的
private void copySubscription() throws MQClientException {
try {
//獲取consumer的消息訂閱信息错森,一個consumer可能訂閱多個topic信息
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
//組裝訂閱信息,主要是topic惹想,consumerGroup问词,substring,tag和tag的hashcode
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
//將這些訂閱信息存放到rebalanceImpl中嘀粱,以topic的維度
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
?
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
?
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
//集群模式下激挪,為一個consumerGroup構(gòu)建一個重試主體訂閱信息,topic為%RETRY%+consumerGroup锋叨,訂閱信息是*垄分,所有的重試消息
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
MQClientInstance啟動流程:這里的東西都很關(guān)鍵,他是整個JVM共用的娃磺,多個consumer實例都是使用的一個MQClientInstance
rocketmq網(wǎng)絡(luò)通信是基于netty的薄湿,rocketmq基于netty實現(xiàn)了一個私有化協(xié)議,所以需要開啟一個netty客戶端線程
啟動定時任務(wù)偷卧,主要是獲取nameServer地址豺瘤,更新主題路由信息,給所有broker發(fā)送心跳信息听诸,持久化消息消息消費進度信息
開啟拉取消息服務(wù)線程坐求,這個線程會不停地從消息拉取請求隊列里面拿出請求,然后拉取消息進行消費
開啟消息隊列負(fù)載均衡線程
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
//開啟客戶端通信晌梨,netty網(wǎng)絡(luò)通信客戶端
this.mQClientAPIImpl.start();
//開啟定時任務(wù)桥嗤,獲取nameServer地址须妻,更新主體路由信息,給所有broker發(fā)送心跳信息泛领,持久化消息消息消費進度信息
this.startScheduledTask();
//開啟拉取消息服務(wù)線程荒吏,從消息拉取請求隊列里面拿出請求拉取消息
this.pullMessageService.start();
//進行消息隊列負(fù)載均衡處理
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
客戶端向集群中所有的broker上報心跳信息:這里上報的消息主要是當(dāng)前客戶端的consumer信息和producer信息
consumer信息:groupName、消費偏移量渊鞋、主題消息訂閱信息
producer信息:producer group name
當(dāng)前客戶端往集群中的所有broker發(fā)送心跳信息绰更,如果當(dāng)前客戶端只有producer信息,就只往master broker發(fā)送心跳信息锡宋,這樣節(jié)省網(wǎng)絡(luò)開銷提升性能动知。定時任務(wù)會每30秒向所有broker發(fā)送心跳信息,同步consumer和producer信息员辩。
private void sendHeartbeatToAllBroker() {
//準(zhǔn)備當(dāng)前客戶端的consumer和producer的信息
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer");
return;
}
if (!this.brokerAddrTable.isEmpty()) {
//發(fā)送心跳次數(shù)統(tǒng)計信息
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
//往集群中的所有broker發(fā)送心跳信息
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
//只有producer的信息的話,就不向slave broker發(fā)送心跳信息
if (id != MixAll.MASTER_ID)
continue;
}
try {
//采用netty客戶端鸵鸥,向broker發(fā)送心跳信息
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
}
}
}
}
}
}
Consumer消費負(fù)載均衡:rocketmq的負(fù)載均衡都是在客戶端進行實現(xiàn)的奠滑,producer發(fā)送的時候輪詢主體隊列進行發(fā)送消息,consumer消費消息的時候按消息隊列和消費者進行負(fù)載均衡妒穴。
public class RebalanceService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
?
log.info(this.getServiceName() + " service end");
}
}
?
public class MQClientInstance {
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
}
RebalanceService負(fù)載均衡線程不停滴進行消費的負(fù)載均衡處理宋税,按照每個consumerGroup進行doRebalance。然后每個consumer按每個topic進行rebalance讼油,我們以集群消費模式去進行分析:
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//按主題進行負(fù)載均衡處理
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
//刪除非當(dāng)前consumer訂閱的消息隊列
this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
//獲取主體的隊列路由信息杰赛,因為定時任務(wù)會不停滴從nameServer獲取主體路由信息
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//任意挑選一個broker來獲取主體所有的消費者,因為consumer定時向集群中的所有broker發(fā)送了心跳信息矮台,里面攜帶了消費者訂閱信息乏屯,所以可以任意選一個broker
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//排序
Collections.sort(mqAll);
Collections.sort(cidAll);
//獲取負(fù)載均衡策略,rocketmq默認(rèn)的是AllocateMessageQueueAveragely平均分配
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//根據(jù)當(dāng)前負(fù)載均衡策略瘦赫,依據(jù)主題隊列信息辰晕,所有的消費者信息,當(dāng)前消費者來獲取當(dāng)前消費者的消費隊列
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//更新消息隊列确虱,移除非當(dāng)前消費者訂閱隊列含友,新加入隊列創(chuàng)建拉取消息請求
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
廣播和集群的負(fù)載均衡不同,主要不同是廣播是需要處理主題下的所有消息隊列校辩,集群是需要處理根據(jù)負(fù)載均衡策略產(chǎn)生的隊列窘问。
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
//這里就把消息拉取請求添加到了PullMessageServic的pullRequestQueue隊列中
this.dispatchPullRequest(pullRequestList);
return changed;
}
移除不屬于當(dāng)前consumer的MessageQueue,因為負(fù)載均衡進行了隊列分配宜咒,所以當(dāng)前consumer只消費分配給自己的消息隊列MessageQueue惠赫。
當(dāng)前consumer消費的消息隊列都是存放在processQueueTable中,ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable荧呐,每個消息對對對應(yīng)一個處理隊列汉形。負(fù)載均衡新分配的隊列會生成PullRequest消息拉取請求computePullFromWhere計算消息消費偏移量纸镊。這里常見消息拉取請求,把PullRequest添加到了PullMessageService中的pullRequestQueue中概疆,這樣就喚醒了PullMessageService消息拉取線程逗威。
????PullMessageService是具體處理消息拉取請求的,他是繼承自ServiceThread岔冀,也可看成一個拉取消息的線程凯旭,在后臺不停地從阻塞隊列里拿消息拉取請求,然后從broker拉取消息消費使套。
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//從阻塞隊列獲取消息拉取請求
PullRequest pullRequest = this.pullRequestQueue.take();
//拉取消息
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
//具體消息的拉取罐呼,這個是交給DefaultMQPushConsumerImpl來處理的
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
//...
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (!this.consumeOrderly) {
//...
} else {
//這里是處理順序消息的,處理拉取請求之前先要獲取消費處理隊列鎖
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
//...
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//處理拉取到的消息
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
//設(shè)置下一次拉取請求的消息偏移量 pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
//...
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
???? PullMessageService在處理消息拉取請求的時候做了這些事情:
- 獲取當(dāng)前消息消費處理隊列侦高,判斷當(dāng)前待處理隊列緩存未處理消息個數(shù)和消息大小嫉柴,這個是進行消息消費流量控制的,防止客戶端處理不過來繼續(xù)拉取消息把客戶端搞掛了奉呛。未處理消息個數(shù)超過1000個或消息大小超過100M就延遲拉取计螺。
- 順序消費的時候,拉取消息前需要獲取消息消費處理隊列的鎖瞧壮。這個是順序消費里面的內(nèi)容登馒,后續(xù)順序消息里面再講。其實順序消息處理就是做什么之前先去獲取鎖咆槽,創(chuàng)建消息拉取請求之前獲取鎖陈轿,執(zhí)行消息拉取請求的時候先去獲取鎖,拉取到的消息消費之前先去獲取鎖秦忿。
- 這里把消息消費封裝成PullCallback麦射,用Netty的ChannelFutureListener異步回調(diào),待消息拉取成功來回調(diào)PullCallback進行消息消費處理小渊。
- 處理通過Netty拉取到的消息pullAPIWrapper.processPullResult法褥,這里會把拉取到的消息用消費者訂閱信息的tag進行過濾下,以為在broker端是用tag的hashcode進行過濾的酬屉,所以這里要用tag整串進行匹配下半等。
- 把拉取到的消息放到消息消費處理隊列中processQueue.putMessage(pullResult.getMsgFoundList()),供后續(xù)消費
- 提交消息消費請求到ConsumeMessageService中
- 重新創(chuàng)建消息拉取請求呐萨,broker端會返回消息下一個拉取偏移量杀饵。這里拿到這個下一個消息偏移量重新構(gòu)建消息拉取請求,繼續(xù)拉取消息谬擦。這里就是Push方式摄悯,他是封裝的Pull方式牺蹄,不停地構(gòu)建拉取請求拉取消息舅锄。
????消息消費最后是把拉取到的消息封裝成ConsumeRequest提交給ConsumeMessageService進行處理的,ConsumeMessageService對于并行消費和順序消費分別有對應(yīng)不同的實現(xiàn)類ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService话肖,在它們里面是有個消費處理線程池,ConsumeRequest也是一個任務(wù)葡幸,提交到線程池中分配線程進行處理最筒。
???先來看下ConsumeMessageOrderlyService:
public void run() {
//消息消費前獲取MessageQueue鎖
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
//廣播消息直接進來了,集群消息需要先鎖定ProcessQueue才行
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
try {
//獲取ProcessQueue的重入鎖
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
//調(diào)用業(yè)務(wù)消費消息
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
//處理消息消費結(jié)果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
- 獲取MessageQueue隊列的鎖蔚叨,獲取到鎖之后才能進去消費床蜘,沒有獲取到鎖,延遲再進行消費
- 獲取消息消費處理隊列ProcessQueue的鎖蔑水,這里是可重入鎖
- 調(diào)用消息業(yè)務(wù)處理邏輯
- 處理消息消費結(jié)果
????ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService的處理流程沒有太大的區(qū)別邢锯,只是并行消費不需要獲取MessageQueue和ProcessQueue 的鎖。
?????接下來看下消息消費處理結(jié)果:
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
- 消息消費成功的時候ackIndex會被設(shè)置為consumeRequest.getMsgs().size() - 1搀别,但是消費處理失敗的時候丹擎,ackIndex會被設(shè)置為-1,這樣這一批消息都會被從頭重新消費一次歇父,所以這里存在重復(fù)消費的問題
- 集群模式下消息消費失敗會進行重試鸥鹉,會把這個消息發(fā)送到broker,broker會為每個消費組設(shè)置一個retry topic庶骄,這個消費組的consumer在構(gòu)建消息訂閱信息的時候,就已經(jīng)為這個消費者構(gòu)建了重試隊列的訂閱信息践磅。如果發(fā)送重試消息失敗单刁,會把這個消息消費請求提交到本地,延遲再進行消費府适。
- 更新MessageQueue的消息消費偏移量羔飞,不同的消費進度管理方式處理方式不同,但是這里都是先更新到本地緩存中檐春, 后續(xù)有定時任務(wù)會進行消費進度的持久化逻淌。