在上一章節(jié)中,我們講解了RocketMQ的基本介紹春缕,作為MQ最重要的就是消息的使用了盗胀,今天我們就來帶大家如何玩轉MQ的消息。
消息中間件锄贼,英文Message Queue票灰,簡稱MQ。它沒有標準定義,一般認為:消息中間件屬于分布式系統(tǒng)中一個子系統(tǒng)屑迂,關注于數據的發(fā)送和接收浸策,利用高效可靠的異步消息傳遞機制對分布式系統(tǒng)中的其余各個子系統(tǒng)進行集成。
高效: 對于消息的處理處理速度快惹盼,RocketMQ可以達到單機10萬+的并發(fā)庸汗。
可靠: 一般消息中間件都會有消息持久化機制和其他的機制確保消息不丟失。
異步: 指發(fā)送完一個請求手报,不需要等待返回夫晌,隨時可以再發(fā)送下一個請求,既不需要等待昧诱。
消息中間件不生產消息晓淀,只是消息的搬運工。
首先Message包含的內容主要有幾個方面組成:id(MQ自動生成)盏档、Topic凶掰、tag、proerties蜈亩、內容懦窘。
消息的發(fā)送分為:
- 普通消息
- 順序消息
- 延時消息
- 批量消息
- 分布式消息
普通消息
普通消息的發(fā)送方式主要有三種:發(fā)送同步消息、發(fā)送異步消息稚配、單向發(fā)送
我們可以先使用 RocketMQ
提供的原生客戶端的API畅涂,在 SpringBoot、SpringCloudStream
也進行了集成道川,但本質上這些也是基于原生API的封裝午衰,所以我們只需要掌握原生API的時候,其他的也就無師自通了冒萄。
想要使用 RocketMQ
中的API臊岸,就需要先導入對應的客戶端依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
消息發(fā)送者的步驟分為:
- 創(chuàng)建消息生產者 producer,執(zhí)行生產者組名
- 指定Nameserver地址
- 啟動producer
- 創(chuàng)建消息對象尊流,指定Topic帅戒、Tag和消息體
- 發(fā)送消息
- 關閉生產者producer
消息消費者的步驟分為:
- 創(chuàng)建消費者 Consumer,指定消費者組名
- 指定Nameserver地址
- 訂閱主題Topic和Tag
- 設置回調函數崖技,處理消息
- 啟動消費者consumer
發(fā)送同步消息
發(fā)送同步消息是說消息發(fā)送方發(fā)出數據后逻住,同步等待,一直等收到接收方發(fā)回響應之后才發(fā)下一個請求迎献。這種可靠性同步地發(fā)送方式使用的比較廣泛瞎访,比如:重要的消息通知,短信通知忿晕。
流程如下所示:
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* 同步發(fā)送
*/
public class SyncProducer {
public static void main(String[] args) throws Exception{
// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 設置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//producer.setSendLatencyFaultEnable(true);
// 啟動Producer實例
producer.start();
for (int i = 0; i < 10; i++) {
// 創(chuàng)建消息装诡,并指定Topic银受,Tag和消息體
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 發(fā)送消息到一個Broker
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//如果不再發(fā)送消息,關閉Producer實例鸦采。
producer.shutdown();
}
}
響應結果如下所示:
msgId: 消息的全局唯一標識(RocketMQ的ID生成是使用機器IP和消息偏移量的組成)宾巍,由消息隊列 MQ 系統(tǒng)自動生成,唯一標識某條消息渔伯。
sendStatus: 發(fā)送的標識:成功顶霞,失敗等
queueId: queueId是Topic的分區(qū);Producer發(fā)送具體一條消息的時锣吼,對應選擇的該Topic下的某一個Queue的標識ID选浑。
queueOffset: Message queue是無限長的數組。一條消息進來下標就會漲1,而這個數組的下標就是queueOffset玄叠,queueOffset是從0開始遞增古徒。
在上面代表的是四個queue,而maxOffset代表我們發(fā)送消息的數量读恃,之前發(fā)送過消息隧膘,所以大家現在看到的數量是17、18...這種寺惫,當你在運行一次發(fā)送消息時疹吃,就會看到十條消息會分布在不同機器上
發(fā)送異步消息
異步消息通常用在對響應時間敏感的業(yè)務場景,即發(fā)送端不能容忍長時間地等待Broker的響應西雀。消息發(fā)送方在發(fā)送了一條消息后萨驶,不等接收方發(fā)回響應,接著進行第二條消息發(fā)送艇肴。發(fā)送方通過回調接口的方式接收服務器響應腔呜,并對響應結果進行處理。
流程如下:
package com.muxiaonong.normal;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* 異步發(fā)送--生產者
*/
public class AsyncProducer {
public static void main(String[] args) throws Exception{
// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 設置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 啟動Producer實例
producer.start();
for (int i = 0; i < 10; i++) {
final int index = i;
// 創(chuàng)建消息豆挽,并指定Topic育谬,Tag和消息體
Message msg = new Message("TopicTest", "TagA", "OrderID888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收異步返回結果的回調
producer.send(msg, new SendCallback() {
//發(fā)送成功
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n", sendResult);
}
//發(fā)送異常
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
Thread.sleep(10000);
// 如果不再發(fā)送消息,關閉Producer實例帮哈。
producer.shutdown();
}
}
發(fā)送成功報文:
我們在dashbord下看到已經成功拿到消息了
單向發(fā)送
這種方式不需要我們特別關心發(fā)送結果的場景,比如日志發(fā)送锰镀、單向發(fā)送特點是發(fā)送方只需要負責發(fā)送消息娘侍,不需要等待服務器回應且沒有回調函數觸發(fā),發(fā)送請求不需要等待應答泳炉,只管發(fā)憾筏,這種放松方式過程耗時很短,一般在微妙級別花鹅。
流程如下:
package com.muxiaonong.normal;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* 單向發(fā)送
*/
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 實例化消息生產者Producer對象
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 設置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 啟動Producer實例
producer.start();
for (int i = 0; i < 10; i++) {
// 創(chuàng)建消息氧腰,并指定Topic,Tag和消息體
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 發(fā)送單向消息,沒有任何返回結果
producer.sendOneway(msg);
}
// 如果不再發(fā)送消息古拴,關閉Producer實例箩帚。
producer.shutdown();
}
}
返回報文:
這種發(fā)送方式,我們客戶端不會感受到發(fā)送結果黄痪,發(fā)送完成之后紧帕,我們并不知道到底有沒有發(fā)送成功,我們只能在 top status 中去查看
普通消息發(fā)送對比:
發(fā)送方式 | 發(fā)送TPS | 可靠性 | 結果反饋 | 使用場景 |
---|---|---|---|---|
同步消息發(fā)送 | 快 | 不丟失 | 有 | 重要通知(郵件桅打、短信通知是嗜、)等 |
異步消息發(fā)送 | 快 | 不丟失 | 有 | 用戶文件上傳自動解析服務,完成后通知其結果 |
單向發(fā)送 | 超快 | 可能丟失 | 無 | 適用于 耗時非常短挺尾,但是對于可靠性要求不高的場景鹅搪,比如日志收集 |
消息的消費方式
普通消息的消費方式主要有三種:集群消費、廣播消費
一遭铺、集群消費模式
集群消費方式下丽柿,一個分組(Group) 下的多個消費者共同消費隊列消息,每一個消費者出來處理的消息不一樣掂僵,一個Consumer Group 中的各個Consumer 實例分攤去消費消息航厚,一條消息只會投遞到一個Consumer Group 下的一個實例,如果一個Topic有三個隊列锰蓬,其中一個 Consumer Group 有三個實例幔睬,那么每個實例只會消費其中一個隊列,集群消費模式是消費者默認的消費方式芹扭。
實例代碼:
package com.muxiaonong.normal.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* 集群消費模式
*/
public class BalanceConsumer {
public static void main(String[] args) throws Exception {
// 實例化消費者,指定組名: TopicTest 10條消息 group_consumer 麻顶, lijin 8(2)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 訂閱Topic
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
//consumer.setConsumeFromWhere();
//集群模式消費
consumer.setMessageModel(MessageModel.CLUSTERING);
//取消
consumer.unsubscribe("TopicTest");
//再次訂閱Topic即可
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
// 注冊回調函數,處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
Thread.sleep(1000);
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動消息者
consumer.start();
//注銷Consumer
//consumer.shutdown();
System.out.printf("Consumer Started.%n");
}
}
我們啟動兩個實例對象舱卡,分別為BalanceConsumer2和BalanceConsumer
辅肾,我們再去生產者生產十條消息后,我們再去看consumer轮锥,分別均攤了這十條消息
二矫钓、廣播消費模式
廣播消費模式中消息將對一個Consumer Group下的各個Consumer實例都投遞一遍。即使這些 Consumer屬于同一個Consumer Group舍杜,消息也會被Consumer Group 中的每個Consumer都消費一次新娜。因為一個消費組下的每個消費者實例都獲取到了topic下面的每個Message Queue去拉取消費。所以消息會投遞到每個消費者實例既绩。每一個消費者下面的消費實例概龄,都會去拿到我們Topic下的每一條消息,但是這種消費進度的保存饲握,不會放在broker里面私杜,而是持久化到我們的本地實例
流程圖如下:
具體代碼
package com.muxiaonong.normal.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* 廣播消費模式
*/
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
// 實例化消費者,指定組名: TopicTest 10條消息 group_consumer 蚕键, lijin 8(2)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 訂閱Topic
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
//consumer.setConsumeFromWhere();
//廣播模式消費
consumer.setMessageModel(MessageModel.BROADCASTING);
//取消
consumer.unsubscribe("TopicTest");
//再次訂閱Topic即可
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
// 注冊回調函數,處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
Thread.sleep(1000);
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動消息者
consumer.start();
//注銷Consumer
//consumer.shutdown();
System.out.printf("Consumer Started.%n");
}
}
我們先啟動 BroadcastConsumer和BroadcastConsumer2
衰粹,生產十條消息以后锣光,我們會看到不管是哪個消費者,都會接收到十條消息寄猩,這個就是廣播消費模式
消息消費的權衡
負載均衡模式: 消費端集群化部署嫉晶,每條消息只需要被處理一次,由于消費進度在服務端維護田篇,可靠性更高替废。
集群消費模式下,不能保證每一次失敗重投的消息路由到同一臺機器上泊柬,因此處理消息時不應該做任何確定性假設椎镣。每一條消息都只會被分發(fā)到一臺機器上處理,如果需要被集群下的每一臺機器都處理兽赁,只能使用廣播模式状答。
廣播模式: 每條消息都需要被相同邏輯的多臺機器處理,消費進度在客戶端維護刀崖,出現重復的概率稍大于集群模式惊科。
廣播模式下,消息隊列 RocketMQ 保證每條消息至少被每臺客戶端消費一次亮钦,但是并不會對消費失敗的消息進行失敗重投馆截,因此需要關注消費失敗的情況,客戶端每一次重啟都會從最新消息消費蜂莉±ⅲ客戶端在被停止期間發(fā)送至服務端的消息會被自動跳過,這一點是需要注意的地方
每條消息都會被大量的客戶端重復處理映穗,因此推薦盡可能使用集群模式窖张。目前僅 Java 客戶端支持廣播模式,不支持順序消息且服務端不維護消費進度蚁滋,所以消息隊列 RocketMQ 控制臺不支持消息堆積查詢宿接、消息堆積報警和訂閱關系查詢功能。
順序消息
順序消息指的是可以按照消息的發(fā)送順序來消費(FIFO)辕录。RocketMQ
可以嚴格的保證消息有序澄阳,可以分為 分區(qū)有序 或者 全局有序。
生產消息時在默認的情況下消息發(fā)送會采取 Round Robin
輪詢方式把消息發(fā)送到不同的 queue ( 分區(qū)隊列)踏拜;而消費消息的時候從多個 queue 上拉取消息,這種情況發(fā)送和消費是不能保證順序低剔。但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個 queue 中速梗,消費的時候只從這個 queue 上依次拉取肮塞,則就保證了順序。當發(fā)送和消費參與的 queue 只有一個姻锁,則是全局有序枕赵;如果多個 queue 參與,則為分區(qū)有序位隶,即相對每個 queue 拷窜,消息都是有序的。
全局有序
全局有序主要控制在于創(chuàng)建Topic指定只有一個隊列涧黄,同步確保生產者與消費者都只有一個實例進行即可
分區(qū)有序
在電商業(yè)務場景中篮昧,訂單的流程是:創(chuàng)建、付款笋妥、推送懊昨、完成。 在加入 RocketMQ
后春宣,一個訂單會分別產生對于這個訂單的創(chuàng)建酵颁、付款、推送月帝、完成等消息躏惋,如果我們把所有消息全部送入到 RocketMQ
中的一個主題中,如何實現針對一個訂單的消息順序性呢嚷辅!如下圖:
要完成分區(qū)有序性簿姨,在生產者環(huán)節(jié)使用自定義的消息隊列選擇策略,確保訂單號尾數相同的消息會被先后發(fā)送到同一個隊列中(案例中主題有3個隊列潦蝇,生產環(huán)境中可設定成10個滿足全部尾數的需求)款熬,然后再消費端開啟負載均衡模式,最終確保一個消費者拿到的消息對于一個訂單來說是有序的攘乒。
/** @Author 牧小農
* @Description // 訂單消息生產
* @Date 16:47 2022/8/20
* @Param
* @return
**/
public class OrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 訂單列表
List<Order> orderList = new OrderProducer().buildOrders();
for (int i = 0; i < orderList.size(); i++) {
String body = orderList.get(i).toString();
Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根據訂單id選擇發(fā)送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//訂單id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* 訂單
*/
private static class Order {
private long orderId;
private String desc;
.....
}
/**
* 生成模擬訂單數據 3個訂單 每個訂單4個狀態(tài)
* 每個訂單 創(chuàng)建->付款->推送->完成
*/
private List<Order> buildOrders() {
List<Order> orderList = new ArrayList<Order>();
Order orderDemo = new Order();
orderDemo.setOrderId(001);
orderDemo.setDesc("創(chuàng)建");
orderList.add(orderDemo);
//...............
return orderList;
}
}
訂單消費者
/** @Author 牧小農
* @Description // 訂單消息消費
* @Date 16:46 2022/8/20
* @Param
* @return
**/
public class OrderConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer2");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("PartOrder", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每個queue有唯一的consume線程來消費, 訂單對每個queue(分區(qū))有序
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ",queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模擬業(yè)務邏輯處理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e) {
e.printStackTrace();
//一會再處理這批消息贤牛,而不是放到重試隊列里
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
消息生產者
消息消費者:
我們可以看到消息按照順序進行了消費。使用順序消息:首先要保證消息是有序進入MQ的则酝,消息放入MQ之前殉簸,對id等關鍵字進行取模,放入指定 messageQueue
沽讹,同時 consumer
消費消息失敗時般卑,不能返回 reconsume——later
,這樣會導致亂序爽雄,所以應該返回 suspend_current_queue_a_moment
,意思是先等一會蝠检,一會兒再處理這批消息,而不是放到重試隊列里挚瘟。
延時消息
Producer 將消息發(fā)送到消息隊列 RocketMQ
服務端叹谁,但并不期望這條消息立馬投遞(被消費者消費)饲梭,而是延遲一定時間后才投遞到 Consumer 進行消費,該消息即延時消息焰檩。
消息生產和消費有時間窗口要求的場景下憔涉,比如在電商交易中超時未支付關閉訂單的場景,在訂單創(chuàng)建時向 RocketMQ
發(fā)送一條延時消息析苫。這條消息將會在30分鐘以后投遞給消費者兜叨,消費者收到此消息后需要判斷對應的訂單是否已完成支付。如支付未完成衩侥,則取消訂單国旷、釋放庫存。如已完成支付則忽略顿乒。
Apache RocketMQ
目前只支持固定精度(MQ自己規(guī)定的時間段)的定時消息议街,因為如果要支持任意的時間精度,在 Broker 層面璧榄,必須要做消息排序特漩,如果再涉及到持久化,消息排序不可避免的產生巨大性能開銷骨杂。(RocketMQ
的商業(yè)版本 Aliware MQ
提供了任意時刻的定時消息功能涂身,Apache的 RocketMQ
并沒有,阿里并沒有開源)
Apache RocketMQ
發(fā)送延時消息是設置在每一個消息體上的,在創(chuàng)建消息時設定一個延時時間長度搓蚪,消息將從當前發(fā)送時間點開始延遲固定時間之后才開始投遞蛤售。
RocketMQ
延時消息的延遲時長不支持隨意時長的延遲,是通過特定的延遲等級來指定的妒潭。默認支持18個等級的延遲消息悴能,延時等級定義在 RocketMQ
服務端的 MessageStoreConfig
類中。
具體如下所示:
Level | 延遲時間 | Level | 延遲時間 |
---|---|---|---|
1 | 1S | 10 | 6m |
2 | 5S | 11 | 7m |
3 | 10S | 12 | 8m |
4 | 30S | 13 | 9m |
5 | 1m | 14 | 10m |
6 | 2m | 15 | 20m |
7 | 3m | 16 | 30m |
8 | 4m | 17 | 1h |
9 | 5m | 18 | 2h |
延時消息生產者:
/** @Author 牧小農
* @Description // 延時消息-生產者
* @Date 10:00 2022/8/21
* @Param
* @return
**/
public class ScheduledProducer {
public static void main(String[] args) throws Exception {
// 實例化一個生產者來產生延時消息
DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
// 設置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 啟動Producer實例
producer.start();
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
// 設置延時等級4,這個消息將在10s之后投遞給消費者(詳看delayTimeLevel)
// delayTimeLevel:(1~18個等級)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
message.setDelayTimeLevel(3);
// 發(fā)送消息
producer.send(message);
}
// 關閉生產者
producer.shutdown();
}
}
延時消息消費者:
/** @Author 牧小農
* @Description // 延時消息-消費者
* @Date 10:00 2022/8/21
* @Param
* @return
**/
public class ScheduledConsumer {
public static void main(String[] args) throws Exception {
// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 訂閱Topics
consumer.subscribe("ScheduledTopic", "*");
// 注冊消息監(jiān)聽者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (message.getStoreTimestamp()-message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者
consumer.start();
}
}
當我們生產消息后雳灾,查看消費者信息漠酿,延時10秒后,消息才發(fā)送完成后谎亩,之后進行了消息的消費
批量消息
批量消息發(fā)送: 能顯著提高傳遞小消息的性能炒嘲。限制是這些批量消息有相同的 topic贬养,相同的 waitStoreMsgOK闸拿,而且不能是延時消息。此外秋泄,這一批消息的總大小不應超過 4MB阱持。批量消息是一個 Collection
集合夭拌,所以送入消息只要是集合就行。
批量接收消息: 能提高傳遞小消息的性能,同時與順序消息配合的情況下啼止,還能根據業(yè)務主鍵對順序消息進行去重(是否可去重道逗,需要業(yè)務來決定),減少消費者對消息的處理献烦。
如果我們需要發(fā)送10萬元素的數組,怎么快速發(fā)送完卖词?這里可以使用批量發(fā)送巩那,同時每一批控制在1M左右確保不超過消息大小限制。批量切分發(fā)送.
批量消息生產者:
/** @Author 牧小農
* @Description // 批量消息-生產者 list不要超過4m
* @Date 10:38 2022/8/21
* @Param
* @return
**/
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
// 設置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 啟動Producer實例
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 2".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 3".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID004", "Hello world 4".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID005", "Hello world 5".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID006", "Hello world 6".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
producer.shutdown();
e.printStackTrace();
}
// 如果不再發(fā)送消息此蜈,關閉Producer實例即横。
producer.shutdown();
}
}
批量消息消費者
/** @Author 牧小農
* @Description // 批量消息-消費者
* @Date 10:38 2022/8/21
* @Param
* @return
**/
public class BatchComuser {
public static void main(String[] args) throws Exception {
// 實例化消息生產者,指定組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchComsuer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 訂閱Topic
consumer.subscribe("BatchTest", "*");
//負載均衡模式消費
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注冊回調函數,處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
這樣我們就實現了批量消息的發(fā)送裆赵,如果我們消息超過了东囚,4M的時候,這個時候可以考慮消息的分割战授,具體代碼如下:
public class ListSplitter implements Iterator<List<Message>> {
private int sizeLimit = 1000 * 1000;//1M
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) { this.messages = messages; }
@Override
public boolean hasNext() { return currIndex < messages.size(); }
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的開銷20字節(jié)
if (tmpSize > sizeLimit) {
if (nextIndex - currIndex == 0) {//單個消息超過了最大的限制(1M)页藻,否則會阻塞進程
nextIndex++; //假如下一個子列表沒有元素,則添加這個子列表然后退出循環(huán),否則退出循環(huán)
}
break;
}
if (tmpSize + totalSize > sizeLimit) { break; }
else { totalSize += tmpSize; }
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
消息的過濾
效率的過濾主要分為兩種: Tag過濾和SQL語法過濾
在實際的開發(fā)應用中,對于一類消息盡可能使用一個Topic進行存儲植兰,但在消費時需要選擇想要的消息份帐,這時可以使用 RocketMQ
的消息過濾功能,具體實現是利用消息的Tag和Key楣导。
Key 一般用于消息在業(yè)務層面的唯一標識废境。對發(fā)送的消息設置好 Key,根據這個 Key 來查找消息筒繁。比如消息異常噩凹,消息丟失,進行查找會很方便毡咏。RocketMQ
會創(chuàng)建專門的索引文件驮宴,用來存儲 Key與消息的映射,由于底層實現是 Hash 索引血当,應盡量使 Key唯一幻赚,避免潛在的哈希沖突。
Tag: 可以理解為是二級分類臊旭。以電商交易平臺為例落恼,訂單消息和支付消息屬于不同業(yè)務類型的消息,分別創(chuàng)建 OrderTopic 和PayTopic
离熏,其中訂單消息根據不同的商品品類以不同的 Tag 再進行細分佳谦,如手機類、家電類滋戳、男裝類钻蔑、女裝類啥刻、化妝品類,最后它們都被各個不同的系統(tǒng)所接收咪笑。通過合理的使用 Topic 和 Tag可帽,可以讓業(yè)務結構清晰,更可以提高效率窗怒。
Key和Tag的主要差別是使用場景不同映跟,Key主要用于通過命令行命令查詢消息,而Tag用于在消息端的代碼中扬虚,用來進行服務端消息過濾努隙。
Tag過濾
使用Tag過濾的方式是在消息生產時傳入感興趣的Tag標簽,然后在消費端就可以根據Tag來選擇您想要的消息辜昵。具體的操作是在創(chuàng)建Message的時候添加荸镊,一個Message只能有一個Tag。
使用案例:
/** @Author 牧小農
* @Description // tag過濾-生產者
* @Date 10:51 2022/8/21
* @Param
* @return
**/
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 設定三種標簽
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消費者
/** @Author 牧小農
* @Description // tag過濾-消費者
* @Date 10:51 2022/8/21
* @Param
* @return
**/
public class TagFilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterComsumer");
//指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
//只有TagA 或者TagB 的消息
consumer.subscribe("TagFilterTest", "TagA || TagB");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : "
+ msgPro +" ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
我們生成了 TagA\b\c 三條消息堪置,但是消費者只想接收 TagA或B躬存, 那么我們可以在消費者端進行消息過濾
Tag過濾的形式非常簡單。
|| 代表或 * 代表所有
因此Tag過濾對于復雜的場景可能不能進行覆蓋晋柱。在這種情況下优构,可以使用SQL表達式篩選消息。
SQL語法
SQL基本語法:
- 數值比較: >雁竞,>=钦椭,<,<=碑诉,BETWEEN彪腔,=
- 字符比較: =,<>进栽,IN
- 非空比較: IS NULL 或者 IS NOT NULL
- 邏輯符號: AND德挣,OR,NOT
- 常量支持類型為: 數值(123快毛,3.1415)格嗅、字符('abc')單引號包裹起來、NULL唠帝、布爾值(TRUE 或 FALSE)
Sql過濾需要 Broker
開啟這項功能屯掖,需要修改Broker.conf配置文件。加入enablePropertyFilter=true 然后重啟Broker服務襟衰。
消息生產者贴铜,發(fā)送消息時加入消息屬性,通過 putUserProperty
來設置消息的屬性,生產者發(fā)送10條消息绍坝,
生產者:
/** @Author 牧小農
* @Description // sql過濾 -消息生產者(加入消息屬性)
* @Date 11:04 2022/8/21
* @Param
* @return
**/
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 設置SQL過濾的屬性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消費者:
/** @Author 牧小農
* @Description // sql過濾-消費者
* @Date 11:04 2022/8/21
* @Param
* @return
**/
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("SqlFilterTest",
//bySql:通過 SQL過濾
// 1. TAGS不為空且TAGS 在('TagA', 'TagB')
// 2. 同時 a 不等于空并且a在0-3之間
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro +" ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消費結果:按照Tag和SQL過濾消費3條消息徘意。
第一個消息是TagA ,消息的屬性(a)是3
第一個消息是TagB ,消息的屬性(a)是1
第一個消息是TagA ,消息的屬性(a)是0
注意哦!
公眾號后臺回復:rocketMQ 獲取案例源碼
到這里有關于RocketMQ基本消息的講解轩褐,就結束了椎咧,雖然不舍,但是可以關注我灾挨,我們下期見邑退。你是不怕被打嗎?(手動狗頭)
在上面的消息類型講述中劳澄,可以滿足絕大部分業(yè)務場景,同學們可以根據自己實際的業(yè)務場景蜈七,去選擇合適的消息類型方式進行學習和了解秒拔,關注我,后續(xù)精彩內容第一時間收到飒硅,下期砂缩,小農會帶大家了解關于分布式事務消息和 Request-Reply
消息以及后續(xù)RocketMQ集群架構方面的知識。本篇點贊過百三娩,就是中暑庵芭,也出下篇。
我是牧小農怕什么無窮雀监,進一步有進一步的歡喜双吆,大家加油!
關注我会前,下期更精彩好乐。