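Ordered Messages
My local client was version 3.6.2, but the company server was running 3.2.6. Because of that mismatch my ordered-message tests kept failing. After I downgraded the client to 3.2.6, the test finally passed. So make sure the client and server versions match, otherwise you may run into strange errors.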
producer
package com.yunsheng.orderExample;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;

import java.util.List;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 10; i++) {
            // All ten messages share one order ID, so they all go to the same queue.
            int orderId = 0;
            // Create a message instance, specifying topic, tag, key and message body.
            Message msg = new Message("TopicOrder", "TagA", "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // arg is the orderId passed as the last argument of send().
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down the producer once all messages have been sent.
        producer.shutdown();
    }
}
Analysis:
To guarantee message order, every message in a group must be sent to the same queue. (By default a topic has 4 queues per broker.)
In the code above, orderId represents an order number.
A selector is passed to the send method. The selector takes orderId modulo the number of queues, which ensures that all messages with the same orderId land in the same queue.
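The modulo mapping can be tried out without a broker. The following is a minimal sketch, not part of the RocketMQ API: the class name QueueSelectSketch and the method selectIndex are made up for illustration, and it uses Math.floorMod instead of the `%` operator from the listing above, so that a negative ID would still map to a valid index.

```java
// Hypothetical helper for illustration only; not a RocketMQ class.
class QueueSelectSketch {

    // Maps an order ID to a queue index in the range [0, queueCount).
    // Math.floorMod is an assumption swapped in for `%`, which would
    // return a negative index for a negative orderId.
    static int selectIndex(int orderId, int queueCount) {
        return Math.floorMod(orderId, queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed queue count, matching the default mentioned above
        int[] orderIds = {0, 1, 5, 5, 9};
        for (int orderId : orderIds) {
            System.out.printf("orderId %d -> queue %d%n",
                    orderId, selectIndex(orderId, queueCount));
        }
    }
}
```

The same orderId always yields the same index as long as the number of queues does not change, which is what keeps the messages of one order together.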
consumer
package com.yunsheng.orderExample;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
        // Start from the earliest offset when this group has no stored offset.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicOrder", "TagA");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            Random random = new Random(10);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // Pass the values as arguments so the body is never treated as a format string.
                    System.out.printf("%s Receive New Messages: %s%n",
                            Thread.currentThread().getName(), new String(msg.getBody()));
                }
                try {
                    // Sleep for a random, bounded time to check that order is still preserved.
                    // nextInt() without a bound can return a negative value, which makes sleep() throw.
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
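Analysis:
The producer side above guarantees the order in which messages are stored, so the consumer side must in turn consume them in order. This is done with MessageListenerOrderly: a message can only be consumed after the messages before it have been consumed.
A sleep was added to the code to verify this.
Result: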
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ 1
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ 2
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ 3
ConsumeMessageThread_6 Receive New Messages: Hello RocketMQ 4
ConsumeMessageThread_5 Receive New Messages: Hello RocketMQ 5
ConsumeMessageThread_7 Receive New Messages: Hello RocketMQ 6
ConsumeMessageThread_8 Receive New Messages: Hello RocketMQ 7
ConsumeMessageThread_9 Receive New Messages: Hello RocketMQ 8
ConsumeMessageThread_10 Receive New Messages: Hello RocketMQ 9
As the thread names show, the messages were not handled by a single thread, yet they were still consumed in order.
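One way to picture how several threads can take turns while the order is preserved is a lock per queue: whichever thread holds the lock takes the next message and finishes it before any other thread may proceed. The sketch below illustrates that idea only. It is not RocketMQ's implementation, and the names OrderedConsumeSketch, consumeInOrder and queueLock are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not RocketMQ code.
class OrderedConsumeSketch {

    // Consumes all messages of one queue with a pool of worker threads and
    // returns them in the order in which they were actually processed.
    static List<String> consumeInOrder(List<String> messages, int threads)
            throws InterruptedException {
        final Queue<String> queue = new ConcurrentLinkedQueue<>(messages);
        final List<String> processed = new ArrayList<>();
        final Object queueLock = new Object(); // one lock per queue

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messages.size(); i++) {
            pool.execute(() -> {
                // Taking a message and processing it happen under the same lock,
                // so the next message cannot start before this one is done.
                synchronized (queueLock) {
                    String msg = queue.poll();
                    if (msg != null) {
                        processed.add(msg);
                    }
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        synchronized (queueLock) {
            return new ArrayList<>(processed);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> in = Arrays.asList(
                "Hello RocketMQ 0", "Hello RocketMQ 1", "Hello RocketMQ 2");
        System.out.println(consumeInOrder(in, 4));
    }
}
```

Which worker handles which message varies from run to run, but the returned list always matches the input order, which mirrors the mix of thread names in the result above.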