概述
rocketmq原理想必大家都有了解了,網(wǎng)上也有很多博客和資料講述的很詳細(xì)支竹。本章主要是想講一講rocketmq中topic字逗、group、tag之間的關(guān)系醒陆。
我一般喜歡帶著問題去查看源碼從而驗(yàn)證問題的結(jié)論瀑构。所以先來看看我的問題:
1、在consumer訂閱消息中中允許topic刨摩、tag相同寺晌、group不同的消費(fèi)者同時(shí)消費(fèi)消息嗎?
2澡刹、在consumer訂閱消息中允許group呻征、tag相同、topic不同的消費(fèi)者同時(shí)消費(fèi)消息嗎罢浇?
3陆赋、在consumer訂閱消息中允許group、topic相同嚷闭、tag不同的消費(fèi)者同時(shí)消費(fèi)消息嗎攒岛?
想要知道上面的問題“蹋看看rocketmq中是如何實(shí)現(xiàn)訂閱關(guān)系的吧灾锯。
1、訂閱關(guān)系核心管理類方法:ConsumerManager#registerConsumer
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
//1嗅榕、獲取consumer組信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
//2顺饮、更新消費(fèi)客戶端ip信息
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
//3、更新消費(fèi)端訂閱的topic凌那、tag等信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
說明:consumerTable是一個(gè)Map類型變量兼雄,存放的是所有消費(fèi)組信息,key存放的是groupName案怯,value存放的是ConsumerGroupInfo組信息君旦,我們再繼續(xù)往下看consumerGroupInfo.updateSubscription方法澎办,該方法是更新消費(fèi)組信息的核心方法嘲碱。
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
//循環(huán)consumer訂閱信息
for (SubscriptionData sub : subList) {
//根據(jù)topic獲取訂閱信息
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
//如果訂閱信息不存在金砍,則直接新增
if (old == null) {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true;
log.info("subscription changed, add new topic, group: {} {}",
this.groupName,
sub.toString());
}
}
//如果訂閱信息存在,判斷新的訂閱信息版本高于老的訂閱信息(一般都是高于)
else if (sub.getSubVersion() > old.getSubVersion()) {
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}",
this.groupName,
old.toString(),
sub.toString()
);
}
//新的訂閱信息覆蓋老的訂閱信息麦锯,這里可以看出來恕稠,同一個(gè)group和topic的情況下,tag不同扶欣,
//也會被覆蓋掉鹅巍,所以問題3的答案有了
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
//這里循環(huán)判斷subscriptionTable 與本次注冊進(jìn)來的subList比較
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
String oldTopic = next.getKey();
boolean exist = false;
for (SubscriptionData sub : subList) {
//判斷當(dāng)前subscriptionTable中所有topic訂閱信息是否都在新注冊的列表中
if (sub.getTopic().equals(oldTopic)) {
exist = true;
break;
}
}
if (!exist) {
log.warn("subscription changed, group: {} remove topic {} {}",
this.groupName,
oldTopic,
next.getValue().toString()
);
//如果不存在,則刪除group對應(yīng)的topic訂閱信息料祠,
it.remove();
updated = true;
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
return updated;
}
現(xiàn)在我們來解答前面提的三個(gè)問題:
1骆捧、在consumer訂閱消息中中允許topic、tag相同髓绽、group不同的消費(fèi)者同時(shí)消費(fèi)消息嗎敛苇?
答:可以。因?yàn)樵贑onsumerManage#consumerTable 中是以groupName為key的顺呕,每個(gè)groupName對應(yīng)的ConsumerGroupInfo相互隔離的枫攀。
2、在consumer訂閱消息中允許group株茶、tag相同来涨、topic不同的消費(fèi)者同時(shí)消費(fèi)消息嗎?
答:不可以启盛。如下場景:
現(xiàn)有消費(fèi)客戶端consumer1蹦掐,consumer2,topic1僵闯,topic2笤闯,group:group1,tag:tag1
consumer1訂閱topic1棍厂,group1颗味,tag1的訂閱信息
consumer2訂閱topic2,group1牺弹,tag1的訂閱信息
步驟1:consumer1注冊consumerGroupInfo信息調(diào)用updateSubscription方法更新subscriptionTableMap信息
步驟2:consumer2注冊consumerGroupInfo信息調(diào)用updateSubscription方法時(shí)浦马,如上述源碼所示,因?yàn)閏onsumer2只訂閱了topic2张漂,所以consumer1訂閱的topic1訂閱信息會被刪除掉晶默。
3、在consumer訂閱消息中允許group航攒、topic相同磺陡、tag不同的消費(fèi)者同時(shí)消費(fèi)消息嗎?
答:不可以,如下場景:
現(xiàn)有消費(fèi)客戶端consumer1币他,consumer2坞靶,topic1,group:group1蝴悉,tag:tag1彰阴,tag2
步驟1:consumer1訂閱信息為:topic1,group1拍冠,tag1
步驟2:consumer2訂閱信息為:topic1尿这,group1,tag2庆杜,此時(shí)會更新訂閱信息的時(shí)候會拿consumer2的訂閱信息覆蓋掉consumer1的訂閱信息射众,具體代碼請參考ConsumerGroupInfo#updateSubscription方法
那么。晃财。责球。product中的group是用來干嘛的?ConsumerManage中的consumerGroupInfo信息從哪來的拓劝?
請聽下回分解雏逾。rocketmq源碼系列(4)-consumer啟動過程的那些事