RocketMQ 提供了兩種順序級別:
- 普通順序消息:Producer 將相關聯(lián)的消息發(fā)送到相同的消息隊列妻导。
- 完全嚴格順序:在【普通順序消息】的基礎上,Consumer 嚴格順序消費怀各。
官方文檔是這么描述的:
消息有序倔韭,指的是一類消息消費時,能按照發(fā)送的順序來消費瓢对。例如:一個訂單產(chǎn)生了三條消息分別是訂單創(chuàng)建寿酌、訂單付款、訂單完成硕蛹。消費時要按照這個順序消費才能有意義醇疼,但是同時訂單之間是可以并行消費的。RocketMQ 可以嚴格的保證消息有序法焰。
順序消息分為全局順序消息與分區(qū)順序消息秧荆,全局順序是指某個 Topic 下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可埃仪。
- 全局順序:對于指定的一個 Topic乙濒,所有消息按照嚴格的先入先出(FIFO)的順序進行發(fā)布和消費。適用場景:性能要求不高卵蛉,所有的消息嚴格按照 FIFO 原則進行消息發(fā)布和消費的場景
- 分區(qū)順序:對于指定的一個 Topic颁股,所有消息根據(jù) Sharding key 進行區(qū)塊分區(qū)。 同一個分區(qū)內(nèi)的消息按照嚴格的 FIFO 順序進行發(fā)布和消費傻丝。Sharding key 是順序消息中用來區(qū)分不同分區(qū)的關鍵字段甘有,和普通消息的 Key 是完全不同的概念。適用場景:性能要求高葡缰,以 Sharding key 作為分區(qū)字段亏掀,在同一個區(qū)塊中嚴格的按照 FIFO 原則進行消息發(fā)布和消費的場景。
注意泛释,分區(qū)順序就是普通順序消息滤愕,全局順序就是完全嚴格順序。
下面胁澳,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例该互。
6.1 搭建生產(chǎn)者
來演示發(fā)送順序消息。
6.1.1 配置文件
修改前面項目的 [application.yml
]配置文件韭畸,添加 partition-key-expression
配置項宇智,設置 Producer 發(fā)送順序消息的 Sharding key。完整配置如下:
spring:
application:
name: stream-rocketmq-producer-application
cloud:
# Spring Cloud Stream 配置項胰丁,對應 BindingServiceProperties 類
stream:
# Binding 配置項随橘,對應 BindingProperties Map
bindings:
erbadagang-output:
destination: ERBADAGANG-TOPIC-01 # 目的地。這里使用 RocketMQ Topic
content-type: application/json # 內(nèi)容格式锦庸。這里使用 JSON
# Producer 配置項机蔗,對應 ProducerProperties 類
producer:
partition-key-expression: payload['id'] # 分區(qū) key 表達式。該表達式基于 Spring EL,從消息中獲得分區(qū) key萝嘁。
# Spring Cloud Stream RocketMQ 配置項
rocketmq:
# RocketMQ Binder 配置項梆掸,對應 RocketMQBinderConfigurationProperties 類
binder:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Map
bindings:
erbadagang-output:
# RocketMQ Producer 配置項牙言,對應 RocketMQProducerProperties 類
producer:
group: test # 生產(chǎn)者分組
sync: true # 是否同步發(fā)送消息酸钦,默認為 false 異步。
server:
port: 18080
① partition-key-expression
配置項咱枉,該表達式基于 Spring EL卑硫,從消息中獲得 Sharding key。
這里蚕断,我們設置該配置項為 payload['id']
欢伏,表示從 Spring Message 的 payload 的 id
。稍后我們發(fā)送的消息的 payload 為 Demo01Message亿乳,那么 id
就是 Demo01Message.id
硝拧。
如果我們想從消息的 headers 中獲得 Sharding key,可以設置為 headers['partitionKey']
风皿。
② Spring Cloud Stream 使用 PartitionHandler 進行 Sharding key 的獲得與計算河爹,最終 Sharding key 的結果為 key.hashCode() % partitionCount
。
在獲取到 Sharding key 之后桐款,Spring Cloud Alibaba Stream RocketMQ 提供的 PartitionMessageQueueSelector 選擇消息發(fā)送的隊列咸这。
我們以發(fā)送一條 id
為 1 的 Demo01Message 消息為示例,最終會發(fā)送到對應 RocketMQ Topic 的隊列為 1魔眨。計算過程如下:
// 第一步媳维,PartitionHandler 使用 `partition-key-expression` 表達式,從 Message 中獲得 Sharding key
key => 1
// 第二步遏暴,PartitionHandler 計算最終的 Sharding key
// 默認情況下侄刽,每個 RocketMQ Topic 的隊列總數(shù)是 4。
key => key.hashCode() % partitionCount = 1.hashCode() % 4 = 1 % 4 = 1
// 第三步朋凉,PartitionMessageQueueSelector 獲得對應 RocketMQ Topic 的隊列
隊列 => queues.get(key) = queues.get(1)
這樣州丹,我們就能保證相同 Sharding Key 的消息,發(fā)送到相同的對應 RocketMQ Topic 的隊列中杂彭。當前墓毒,前提是該 Topic 的隊列總數(shù)不能變噢,不然計算的 Sharding Key 會發(fā)生變化亲怠。
6.1.2 Demo01Controller
增加發(fā)送 3 條順序消息的 HTTP 接口所计。代碼如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.controller;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.Demo01Message;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.MySource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Random;
@RestController
@RequestMapping("/demo01")
public class Demo01Controller {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private MySource mySource;//<1>
@GetMapping("/send_orderly")
public boolean sendOrderly() {
// 發(fā)送 3 條相同 id 的消息
int id = new Random().nextInt();
for (int i = 0; i < 3; i++) {
// 創(chuàng)建 Message
Demo01Message message = new Demo01Message().setId(id);
// 創(chuàng)建 Spring Message 對象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// 發(fā)送消息
mySource.erbadagangOutput().send(springMessage);
}
return true;
}
}
每次發(fā)送的 3 條消息使用相同的 id,配合上我們使用它作為 Sharding key团秽,就可以發(fā)送對應 Topic 的相同隊列中主胧。
另外叭首,發(fā)送的雖然是順序消息,但是和發(fā)送普通消息的代碼是一模一樣的踪栋。
6.2 搭建消費者
演示順序消費消息焙格。
8.2.1 配置文件
修改 [application.yml
]配置文件,添加 orderly
配置項己英,設置 Consumer 順序消費消息间螟。完整配置如下:
spring:
application:
name: erbadagang-consumer-application
cloud:
# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類
stream:
# Binding 配置項损肛,對應 BindingProperties Map
bindings:
erbadagang-input:
destination: ERBADAGANG-TOPIC-01 # 目的地。這里使用 RocketMQ Topic
content-type: application/json # 內(nèi)容格式荣瑟。這里使用 JSON
group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消費者分組,命名規(guī)則:組名+topic名
# Spring Cloud Stream RocketMQ 配置項
rocketmq:
# RocketMQ Binder 配置項治拿,對應 RocketMQBinderConfigurationProperties 類
binder:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Map
bindings:
erbadagang-input:
# RocketMQ Consumer 配置項笆焰,對應 RocketMQConsumerProperties 類
consumer:
enabled: true # 是否開啟消費劫谅,默認為 true
broadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費
orderly: true # 是否順序消費嚷掠,默認為 false 并發(fā)消費捏检。
server:
port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者
6.2.2 Demo01Consumer
[Demo01Consumer]類不皆,在消費消息時贯城,打印出消息所在隊列編號和線程編號,這樣我們通過隊列編號可以判斷消息是否順序發(fā)送霹娄,通過線程編號可以判斷消息是否順序消費能犯。代碼如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
public class Demo01Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@StreamListener(MySink.ERBADAGANG_INPUT)
public void onMessage(Message<?> message) {
logger.info("[onMessage][線程編號:{} 消息內(nèi)容:{}]", Thread.currentThread().getId(), message);
}
}
6.3 簡單測試
① 執(zhí)行 ConsumerApplication,啟動消費者的實例犬耻。
② 執(zhí)行 ProducerApplication踩晶,啟動生產(chǎn)者的實例。
之后枕磁,請求 http://127.0.0.1:18080/demo01/send_orderly 接口渡蜻,發(fā)送順序消息。IDEA 控制臺輸出日志如下:
2020-08-06 17:31:17.892 INFO 16556 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][線程編號:70 消息內(nèi)容:GenericMessage [payload={"id":-1387755989}, headers={rocketmq_QUEUE_ID=1, rocketmq_RECONSUME_TIMES=0, scst_partition=1, rocketmq_BORN_TIMESTAMP=1596706229166, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=C0A82B7C341418B4AAC21D818BAE0001, rocketmq_SYS_FLAG=0, id=6386b54a-4e37-b884-3010-485308229a10, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, timestamp=1596706277892}]]
2020-08-06 17:31:17.892 INFO 16556 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][線程編號:70 消息內(nèi)容:GenericMessage [payload={"id":-1387755989}, headers={rocketmq_QUEUE_ID=1, rocketmq_RECONSUME_TIMES=0, scst_partition=1, rocketmq_BORN_TIMESTAMP=1596706229244, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=C0A82B7C341418B4AAC21D818BFC0002, rocketmq_SYS_FLAG=0, id=d9dd1b0d-2565-7f52-3825-786574b2fc1b, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, timestamp=1596706277892}]]
id 為 -1387755989 的消息被發(fā)送到 RocketMQ 消息隊列編號為 rocketmq_QUEUE_ID=1计济,并且在線程編號為 70 的線程中消費茸苇。
底線
本文源代碼使用 Apache License 2.0開源許可協(xié)議,這里是本文源碼Gitee地址峭咒,可通過命令git clone+地址
下載代碼到本地税弃,也可直接點擊鏈接通過瀏覽器方式查看源代碼。