The official documentation defines consumers as follows:
Similar to previously mentioned producer group, consumers of the exactly same role are grouped together and named Consumer Group.
Consumer Group is a great concept with which achieving goals of load-balance and fault-tolerance, in terms of message consuming, is super easy.
Warning: consumer instances of a consumer group must have exactly the same topic subscription(s).
Roughly: consumers that play exactly the same role are grouped together, the grouping exists to provide load balancing and failover for message consumption, and the warning says that consumer instances in the same group must have exactly the same topic subscriptions.
The comment on the consumerGroup field in the client source code says much the same:
Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve load balance. It's required and needs to be globally unique.
From the definitions above it is clear that consumers in the same group must subscribe to the same topic. But a subscription carries a tag in addition to the topic: does the tag matter too? I could not find any material on this, so I ran a test myself.
instanceName | groupName | Topic | Tag |
---|---|---|---|
A | GroupA | TopicA | TagA |
B | GroupA | TopicA | TagB |
Test goal: determine whether, when consumers in the same consumer group subscribe to the same topic with different tags, consumption and load balancing are affected.
Test plan:
1. Create a producer and send 100 messages;
2. Create two consumers:
instanceName | groupName | Topic | Tag |
---|---|---|---|
A | GroupA | TopicA | TagA |
B | GroupA | TopicA | TagB |
3. Observe which messages get consumed and how the queues are allocated.
Sending the messages

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SimpleProducer {
    public static void sendSync() throws Exception {
        // Create the producer directly; going through
        // MQClientManager/MQClientInstance is internal client API.
        DefaultMQProducer producer = new DefaultMQProducer("GroupA");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicA", "TagA",
                    ("Hello mq" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.println("send " + i + " , result:" + sendResult.getMsgId());
        }
        producer.shutdown();
    }
}
```
Consumer

```java
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class SimpleConsumer {
    public static void pushConsume(final String instanceName, final String group,
                                   final String topic, final String tag) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(topic, tag);
        consumer.setInstanceName(instanceName);
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("[" + instanceName + "," + group + "," + topic + "," + tag
                            + "] consume: " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}
```
Test

```java
public class TagTest {
    public static void main(String[] args) throws Exception {
        try {
            SimpleProducer.sendSync();
        } catch (Exception e) {
            e.printStackTrace();
        }
        Thread t2 = new Thread(() -> {
            try {
                SimpleConsumer.pushConsume("A", "GroupA", "TopicA", "TagA");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        t2.start();
        Thread t3 = new Thread(() -> {
            try {
                SimpleConsumer.pushConsume("B", "GroupA", "TopicA", "TagB");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        t3.start();
        t2.join();
        t3.join();
    }
}
```
Result:

```
....
send 97 , result:AC1100013B2118B4AAC2808264CC0061
send 98 , result:AC1100013B2118B4AAC2808264CD0062
send 99 , result:AC1100013B2118B4AAC2808264CE0063
[A,GroupA,TopicA,TagA] consume: Hello mq1
[A,GroupA,TopicA,TagA] consume: Hello mq2
[A,GroupA,TopicA,TagA] consume: Hello mq5
....
```
Check the queue distribution:

Consumer A

```
./bin/mqadmin consumerStatus -n "localhost:9876" -g "GroupA" -i "172.17.0.1@A"
#Consumer MQ Detail#
#Topic        #Broker Name    #QID    #ProcessQueueInfo
%RETRY%GroupA mo-x 0 ProcessQueueInfo [commitOffset=0, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231130503, droped=false, lastPullTimestamp=20180625231132580, lastConsumeTimestamp=20180625231129554]
TopicA mo-x 0 ProcessQueueInfo [commitOffset=50, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231130503, droped=true, lastPullTimestamp=20180625231132552, lastConsumeTimestamp=20180625231129547]
TopicA mo-x 1 ProcessQueueInfo [commitOffset=50, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231130503, droped=true, lastPullTimestamp=20180625231132572, lastConsumeTimestamp=20180625231129549]
```
Consumer B

```
./bin/mqadmin consumerStatus -n "localhost:9876" -g "GroupA" -i "172.17.0.1@B"
#Consumer MQ Detail#
#Topic        #Broker Name    #QID    #ProcessQueueInfo
TopicA mo-x 2 ProcessQueueInfo [commitOffset=25, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231150500, droped=false, lastPullTimestamp=20180625231209016, lastConsumeTimestamp=20180625231149571]
TopicA mo-x 3 ProcessQueueInfo [commitOffset=25, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231150500, droped=false, lastPullTimestamp=20180625231209016, lastConsumeTimestamp=20180625231149565]
```
```
./bin/mqadmin consumerProgress -n localhost:9876 -g "GroupA"
#Topic        #Broker Name  #QID  #Broker Offset  #Consumer Offset  #Client IP   #Diff  #LastTime
%RETRY%GroupA mo-x          0     0               0                 172.17.0.1   0      1970-01-01 08:00:00
TopicA        mo-x          0     25              25                172.17.0.1   0      2018-06-25 23:11:29
TopicA        mo-x          1     25              25                172.17.0.1   0      2018-06-25 23:11:29
TopicA        mo-x          2     25              25                172.17.0.1   0      2018-06-25 23:11:29
TopicA        mo-x          3     25              25                172.17.0.1   0      2018-06-25 23:11:29
```
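These numbers can be reproduced with a deliberately simplified local model (plain Java, no broker needed; the class name `TagFilterSimulation` is mine). It assumes each consumer's queues are filtered by that consumer's own tag expression; the real broker behavior is messier, since the group's subscription can be overwritten by whichever instance registers last, but the net effect on this test is the same:

```java
import java.util.Arrays;

public class TagFilterSimulation {
    static final int QUEUES = 4;
    static int[] offsets = new int[QUEUES];   // consume offset per queue
    static int delivered = 0;                 // messages actually handed to a listener

    static void run() {
        // 100 messages, all tagged TagA, spread round-robin over 4 queues (25 each)
        int[] backlog = new int[QUEUES];
        for (int i = 0; i < 100; i++) backlog[i % QUEUES]++;

        // Rebalance result from the test: queues 0,1 -> consumer A (subscribed TagA),
        // queues 2,3 -> consumer B (subscribed TagB)
        String[] filter = {"TagA", "TagA", "TagB", "TagB"};

        for (int q = 0; q < QUEUES; q++) {
            for (int m = 0; m < backlog[q]; m++) {
                offsets[q]++;                      // the offset advances even when the tag is filtered out
                if (filter[q].equals("TagA")) {    // every message in this test carries TagA
                    delivered++;
                }
            }
        }
    }

    public static void main(String[] args) {
        run();
        System.out.println("delivered=" + delivered);               // 50: only A's two queues match
        System.out.println("offsets=" + Arrays.toString(offsets));  // [25, 25, 25, 25]
    }
}
```

Half the messages vanish: B's two queues skip every TagA message, yet their offsets still reach 25, which matches the consumerProgress output above where all four queues show Diff 0.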
Summary of the test results:
1. The producer sent 100 TagA messages to TopicA.
2. Consumer A and consumer B are both in GroupA and both subscribe to TopicA.
3. Consumer A subscribes with TagA; consumer B subscribes with TagB.
4. Consumer A received only part of the messages.
5. Consumer A was allocated two of GroupA's TopicA queues.
6. Consumer B was allocated the other two of GroupA's TopicA queues.
Conclusion:
Tags do affect consumers in the same group that subscribe to the same topic. When the tags differ, consumption becomes inconsistent: in this test, TagA messages landed in queues assigned to the TagB consumer, whose offsets advanced past them, so those messages were never delivered to any listener. The underlying reason is that the broker keeps a single subscription per consumer group, so instances with different tag expressions overwrite each other. In short, consumers in one group must use identical subscriptions, tag expression included, exactly as the documentation warns.
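One common way out, sketched below under the assumption that client-side filtering is acceptable for both workloads, is to give every instance in the group the identical subscription expression and pick out the wanted tag inside the listener. The class name `UniformSubscriptionConsumer` and the `accepts` helper are mine, not RocketMQ API:

```java
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class UniformSubscriptionConsumer {

    // Client-side filter; kept as a plain-String helper so it is easy to test.
    static boolean accepts(String wantedTag, String msgTag) {
        return wantedTag.equals(msgTag);
    }

    public static void consume(final String instanceName, final String wantedTag) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setInstanceName(instanceName);
        // Every instance in GroupA uses the exact same expression,
        // so the group's subscription on the broker never flip-flops.
        consumer.subscribe("TopicA", "TagA || TagB");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                if (accepts(wantedTag, msg.getTags())) {
                    System.out.println("[" + instanceName + "] consume: " + new String(msg.getBody()));
                }
                // messages with other tags are acknowledged and dropped here
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}
```

The alternative, if the two tags really are independent workloads, is to put each consumer in its own consumer group: groups are independent, so each can keep its own tag expression without interfering with the other.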