rocketmq 消費消息大致有以下幾種場景類型
亂序消費,消息被亂序的發(fā)送的隊列茁影,消費者在消費各個隊列時是并行消費宙帝,所以不能保證消息的有序性
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TestTopic", "*");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20170422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
順序消息,發(fā)送到同一個隊列的消息需要保證有序消費
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TestTopic", "*");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20170422221800");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeOrderlyContext context){
for(MessageExt messageExt : msgs){
System.out.println(new String(messageExt.getBody());
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
順序消息用的比較多的是訂單系統(tǒng)募闲,訂單狀態(tài)之間的扭轉(zhuǎn)需要保證有序步脓,所以通常同一個訂單ID發(fā)送相關(guān)的狀態(tài)消息需要保證有序
集群消費和廣播消費的區(qū)別
集群消費,同一個消費組浩螺,均勻的消費該topic下的消息靴患,該消費組下所有的消費者消費的總消息等于該topic下的消息
廣播消費,同一個消費組下的每個消費者都消費該topic下的所有消息