RocketMQ集群消費時隊列分配
何時需要消息隊列
業務解耦
最終一致性
廣播
錯峰流控
RocketMQ的核心概念
詳情見于文檔
PushConsumer調用負載均衡的方法
/**
 * Asks the rebalance implementation to redistribute queues for this consumer.
 *
 * <p>Nothing happens when no rebalance implementation is set or when the consumer is paused.
 * Otherwise the call is delegated, passing along whether this consumer consumes orderly.
 */
public void doRebalance() {
    if (this.rebalanceImpl == null || this.pause) {
        return;
    }
    this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
可以看出,真正執行負載均衡工作的是rebalanceImpl這個成員變量,在RebalanceImpl類中代碼為
/**
 * Rebalances every subscribed topic, then drops process queues of topics that are no longer
 * subscribed.
 *
 * <p>A failure while rebalancing one topic is caught so the remaining topics are still
 * processed; it is logged as a warning unless the topic name starts with the retry-group prefix.
 *
 * @param isOrder forwarded unchanged to {@code rebalanceByTopic}
 */
public void doRebalance(final boolean isOrder) {
    final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
    if (subscriptions != null) {
        for (final String topic : subscriptions.keySet()) {
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Exception e) {
                final boolean retryTopic = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
                if (!retryTopic) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    // Runs even when there is no subscription table at all.
    this.truncateMessageQueueNotMyTopic();
}
第一步獲取訂閱關係。訂閱關係是在consumer.start()中從consumer中複製過來的,它以topic為key,以由tags組成的SubscriptionData為value。
第二步遍歷訂閱關係,調用rebalanceByTopic(topic, isOrder)方法,根據topic和isOrder進行負載均衡。
遍歷結束後,調用truncateMessageQueueNotMyTopic()方法,去除不屬於當前consumer的topic對應的消息隊列。
分析rebalanceByTopic方法
/**
 * Recomputes which message queues of {@code topic} are assigned to this consumer.
 *
 * <p>Only the {@code CLUSTERING} message model is handled in this excerpt; any other model
 * falls through to the no-op default branch.
 *
 * @param topic   topic whose queues are redistributed
 * @param isOrder forwarded unchanged to {@code updateProcessQueueTableInRebalance}
 */
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case CLUSTERING: {
            // All message queues recorded for the topic in the subscribe-info table.
            final Set<MessageQueue> topicQueues = this.topicSubscribeInfoTable.get(topic);
            // Ids of the consumers found for this topic and consumer group.
            // NOTE(review): the original author notes that inconsistent subscriptions inside one
            // consumer group lead to consumption failures - confirm against the broker behavior.
            final List<String> consumerIds = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

            if (topicQueues == null && !topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            if (consumerIds == null) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }
            if (topicQueues == null || consumerIds == null) {
                break;
            }

            // Both inputs are sorted before allocation; presumably so that every consumer in the
            // group feeds the strategy the same ordering - confirm.
            final List<MessageQueue> sortedQueues = new ArrayList<MessageQueue>(topicQueues);
            Collections.sort(sortedQueues);
            Collections.sort(consumerIds);

            // Ask the configured strategy which queues belong to this client.
            final AllocateMessageQueueStrategy allocator = this.allocateMessageQueueStrategy;
            final List<MessageQueue> assigned;
            try {
                assigned = allocator.allocate(
                    this.consumerGroup,
                    this.mQClientFactory.getClientId(),
                    sortedQueues,
                    consumerIds);
            } catch (Throwable e) {
                log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", allocator.getName(),
                    e);
                return;
            }

            // A null allocation is treated the same as an empty one.
            final Set<MessageQueue> assignedSet = assigned == null
                ? new HashSet<MessageQueue>()
                : new HashSet<MessageQueue>(assigned);

            // Sync the process-queue table with the new assignment. Per the original author's
            // note, this removes queues that no longer belong to this consumer or whose last
            // pull exceeds the maximum interval - the helper is not visible here; confirm.
            if (this.updateProcessQueueTableInRebalance(topic, assignedSet, isOrder)) {
                log.info(
                    "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                    allocator.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), topicQueues.size(), consumerIds.size(),
                    assignedSet.size(), assignedSet);
                this.messageQueueChanged(topic, topicQueues, assignedSet);
            }
            break;
        }
        default:
            break;
    }
}
默認情況下,使用平均分配消息隊列的策略。
分析truncateMessageQueueNotMyTopic方法
/**
 * Drops every process queue whose topic is no longer present in the subscription table.
 *
 * <p>Each removed process queue is marked as dropped and the removal is logged.
 */
private void truncateMessageQueueNotMyTopic() {
    // Subscription table as returned by getSubscriptionInner().
    final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();

    // NOTE(review): entries are removed while the key set is being iterated; this relies on
    // processQueueTable tolerating modification during iteration (e.g. a concurrent map).
    // Its declared type is not visible here - confirm.
    for (MessageQueue queue : this.processQueueTable.keySet()) {
        if (subscriptions.containsKey(queue.getTopic())) {
            continue;
        }
        final ProcessQueue removed = this.processQueueTable.remove(queue);
        if (removed == null) {
            continue;
        }
        removed.setDropped(true);
        log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, queue);
    }
}
什么時候負載均衡
在consumer啟動的過程中,rebalanceImpl會從consumer處複製訂閱關係
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
//復(fù)制訂閱關(guān)系
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(//
mQClientFactory, //
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
//啟動消費消息的服務(wù)
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//啟動工廠方法
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "http://
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//當訂閱關(guān)系發(fā)生變化省楷掉,更新訂閱關(guān)系
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//立即負載均衡
this.mQClientFactory.rebalanceImmediately();
}
工廠方法啟動邏輯
/**
 * Starts the client factory, at most once.
 *
 * <p>Only the {@code CREATE_JUST} state performs any work. The state is set to
 * {@code START_FAILED} up front and becomes {@code RUNNING} after all services have been
 * started. The whole method runs while holding this instance's monitor.
 *
 * @throws MQClientException if a previous start attempt left the factory in {@code START_FAILED}
 */
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                // No name server address configured: fetch one through the client API.
                if (this.clientConfig.getNamesrvAddr() == null) {
                    this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                }

                // Request-response channel.
                this.mQClientAPIImpl.start();
                // Scheduled tasks.
                this.startScheduledTask();
                // Pull service.
                this.pullMessageService.start();
                // Rebalance service.
                this.rebalanceService.start();
                // Internal default producer (labelled "push service" by the original author).
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case SHUTDOWN_ALREADY:
            default:
                // Nothing to do in these states.
                break;
        }
    }
}
rebalanceService是一個線程任務類,在線程任務中定時調用factory執行負載均衡
/**
 * Service loop: until the stop flag is observed, waits for {@code WaitInterval} and then asks
 * the client factory to rebalance. Start and end of the loop are logged.
 */
public void run() {
    log.info(this.getServiceName() + " service started");

    while (true) {
        if (this.isStoped()) {
            break;
        }
        this.waitForRunning(WaitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}
factory執行負載均衡,其實就是遍歷factory中的所有consumer,調用doRebalance()方法
/**
 * Invokes {@code doRebalance()} on every non-null consumer held in the consumer table.
 *
 * <p>An exception from one consumer is logged and does not stop the remaining consumers from
 * being rebalanced.
 */
public void doRebalance() {
    for (MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer == null) {
            continue;
        }
        try {
            consumer.doRebalance();
        } catch (Exception e) {
            log.error("doRebalance exception", e);
        }
    }
}
AllocateMessageQueueStrategy 隊列分配策略
默認情況下PushConsumer的AllocateMessageQueueStrategy
/**
 * Creates a push consumer with default settings by delegating to the three-argument
 * constructor: the consumer group is {@code MixAll.DEFAULT_CONSUMER_GROUP}, the second argument
 * is {@code null} (presumably "no RPC hook" - confirm against that constructor), and the queue
 * allocation strategy is a new {@code AllocateMessageQueueAveragely}.
 */
public DefaultMQPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}