RocketMQ 與 Spring Cloud Stream整合(六、順序消息)

RocketMQ 提供了兩種順序級別:

  • 普通順序消息:Producer 將相關聯(lián)的消息發(fā)送到相同的消息隊列妻导。
  • 完全嚴格順序:在【普通順序消息】的基礎上,Consumer 嚴格順序消費怀各。

官方文檔是這么描述的:

消息有序倔韭,指的是一類消息消費時,能按照發(fā)送的順序來消費瓢对。例如:一個訂單產(chǎn)生了三條消息分別是訂單創(chuàng)建寿酌、訂單付款、訂單完成硕蛹。消費時要按照這個順序消費才能有意義醇疼,但是同時訂單之間是可以并行消費的。RocketMQ 可以嚴格的保證消息有序法焰。

順序消息分為全局順序消息與分區(qū)順序消息秧荆,全局順序是指某個 Topic 下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可埃仪。

  • 全局順序:對于指定的一個 Topic乙濒,所有消息按照嚴格的先入先出(FIFO)的順序進行發(fā)布和消費。適用場景:性能要求不高卵蛉,所有的消息嚴格按照 FIFO 原則進行消息發(fā)布和消費的場景
  • 分區(qū)順序:對于指定的一個 Topic颁股,所有消息根據(jù) Sharding key 進行區(qū)塊分區(qū)。 同一個分區(qū)內(nèi)的消息按照嚴格的 FIFO 順序進行發(fā)布和消費傻丝。Sharding key 是順序消息中用來區(qū)分不同分區(qū)的關鍵字段甘有,和普通消息的 Key 是完全不同的概念。適用場景:性能要求高葡缰,以 Sharding key 作為分區(qū)字段亏掀,在同一個區(qū)塊中嚴格的按照 FIFO 原則進行消息發(fā)布和消費的場景。

注意泛释,分區(qū)順序就是普通順序消息滤愕,全局順序就是完全嚴格順序。

下面胁澳,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例该互。

6.1 搭建生產(chǎn)者

來演示發(fā)送順序消息。

6.1.1 配置文件

修改前面項目的 [application.yml]配置文件韭畸,添加 partition-key-expression 配置項宇智,設置 Producer 發(fā)送順序消息的 Sharding key。完整配置如下:

spring:
  application:
    name: stream-rocketmq-producer-application
  cloud:
    # Spring Cloud Stream 配置項胰丁,對應 BindingServiceProperties 類
    stream:
      # Binding 配置項随橘,對應 BindingProperties Map
      bindings:
        erbadagang-output:
          destination: ERBADAGANG-TOPIC-01 # 目的地。這里使用 RocketMQ Topic
          content-type: application/json # 內(nèi)容格式锦庸。這里使用 JSON

          # Producer 配置項机蔗,對應 ProducerProperties 類
          producer:
            partition-key-expression: payload['id'] # 分區(qū) key 表達式。該表達式基于 Spring EL,從消息中獲得分區(qū) key萝嘁。

      # Spring Cloud Stream RocketMQ 配置項
      rocketmq:
        # RocketMQ Binder 配置項梆掸,對應 RocketMQBinderConfigurationProperties 類
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Map
        bindings:
          erbadagang-output:
            # RocketMQ Producer 配置項牙言,對應 RocketMQProducerProperties 類
            producer:
              group: test # 生產(chǎn)者分組
              sync: true # 是否同步發(fā)送消息酸钦,默認為 false 異步。

server:
  port: 18080

partition-key-expression 配置項咱枉,該表達式基于 Spring EL卑硫,從消息中獲得 Sharding key。

這里蚕断,我們設置該配置項為 payload['id']欢伏,表示從 Spring Message 的 payload 的 id。稍后我們發(fā)送的消息的 payload 為 Demo01Message亿乳,那么 id 就是 Demo01Message.id硝拧。

如果我們想從消息的 headers 中獲得 Sharding key,可以設置為 headers['partitionKey']风皿。

② Spring Cloud Stream 使用 PartitionHandler 進行 Sharding key 的獲得與計算河爹,最終 Sharding key 的結果為 key.hashCode() % partitionCount

在獲取到 Sharding key 之后桐款,Spring Cloud Alibaba Stream RocketMQ 提供的 PartitionMessageQueueSelector 選擇消息發(fā)送的隊列咸这。

我們以發(fā)送一條 id 為 1 的 Demo01Message 消息為示例,最終會發(fā)送到對應 RocketMQ Topic 的隊列為 1魔眨。計算過程如下:

// 第一步媳维,PartitionHandler 使用 `partition-key-expression` 表達式,從 Message 中獲得 Sharding key
key => 1

// 第二步遏暴,PartitionHandler 計算最終的 Sharding key
// 默認情況下侄刽,每個 RocketMQ Topic 的隊列總數(shù)是 4。
key => key.hashCode() % partitionCount = 1.hashCode() % 4 = 1 % 4 = 1

// 第三步朋凉,PartitionMessageQueueSelector 獲得對應 RocketMQ Topic 的隊列
隊列 => queues.get(key) = queues.get(1)

這樣州丹,我們就能保證相同 Sharding Key 的消息,發(fā)送到相同的對應 RocketMQ Topic 的隊列中杂彭。當前墓毒,前提是該 Topic 的隊列總數(shù)不能變噢,不然計算的 Sharding Key 會發(fā)生變化亲怠。

6.1.2 Demo01Controller

增加發(fā)送 3 條順序消息的 HTTP 接口所计。代碼如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.controller;

import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.Demo01Message;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.MySource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private MySource mySource;//<1>

    @GetMapping("/send_orderly")
    public boolean sendOrderly() {
        // 發(fā)送 3 條相同 id 的消息
        int id = new Random().nextInt();
        for (int i = 0; i < 3; i++) {
            // 創(chuàng)建 Message
            Demo01Message message = new Demo01Message().setId(id);
            // 創(chuàng)建 Spring Message 對象
            Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                    .build();
            // 發(fā)送消息
            mySource.erbadagangOutput().send(springMessage);
        }
        return true;
    }

}

每次發(fā)送的 3 條消息使用相同的 id,配合上我們使用它作為 Sharding key团秽,就可以發(fā)送對應 Topic 的相同隊列中主胧。

另外叭首,發(fā)送的雖然是順序消息,但是和發(fā)送普通消息的代碼是一模一樣的踪栋。

6.2 搭建消費者

演示順序消費消息焙格。

8.2.1 配置文件

修改 [application.yml]配置文件,添加 orderly 配置項己英,設置 Consumer 順序消費消息间螟。完整配置如下:

spring:
  application:
    name: erbadagang-consumer-application
  cloud:
    # Spring Cloud Stream 配置項,對應 BindingServiceProperties 類
    stream:
      # Binding 配置項损肛,對應 BindingProperties Map
      bindings:
        erbadagang-input:
          destination: ERBADAGANG-TOPIC-01 # 目的地。這里使用 RocketMQ Topic
          content-type: application/json # 內(nèi)容格式荣瑟。這里使用 JSON
          group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消費者分組,命名規(guī)則:組名+topic名

      # Spring Cloud Stream RocketMQ 配置項
      rocketmq:
        # RocketMQ Binder 配置項治拿,對應 RocketMQBinderConfigurationProperties 類
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Map
        bindings:
          erbadagang-input:
            # RocketMQ Consumer 配置項笆焰,對應 RocketMQConsumerProperties 類
            consumer:
              enabled: true # 是否開啟消費劫谅,默認為 true
              broadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費
              orderly: true # 是否順序消費嚷掠,默認為 false 并發(fā)消費捏检。

server:
  port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

6.2.2 Demo01Consumer

[Demo01Consumer]類不皆,在消費消息時贯城,打印出消息所在隊列編號線程編號,這樣我們通過隊列編號可以判斷消息是否順序發(fā)送霹娄,通過線程編號可以判斷消息是否順序消費能犯。代碼如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class Demo01Consumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @StreamListener(MySink.ERBADAGANG_INPUT)
    public void onMessage(Message<?> message) {
        logger.info("[onMessage][線程編號:{} 消息內(nèi)容:{}]", Thread.currentThread().getId(), message);
    }

}

6.3 簡單測試

① 執(zhí)行 ConsumerApplication,啟動消費者的實例犬耻。

② 執(zhí)行 ProducerApplication踩晶,啟動生產(chǎn)者的實例。

之后枕磁,請求 http://127.0.0.1:18080/demo01/send_orderly 接口渡蜻,發(fā)送順序消息。IDEA 控制臺輸出日志如下:

2020-08-06 17:31:17.892  INFO 16556 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][線程編號:70 消息內(nèi)容:GenericMessage [payload={"id":-1387755989}, headers={rocketmq_QUEUE_ID=1, rocketmq_RECONSUME_TIMES=0, scst_partition=1, rocketmq_BORN_TIMESTAMP=1596706229166, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=C0A82B7C341418B4AAC21D818BAE0001, rocketmq_SYS_FLAG=0, id=6386b54a-4e37-b884-3010-485308229a10, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, timestamp=1596706277892}]]

2020-08-06 17:31:17.892  INFO 16556 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][線程編號:70 消息內(nèi)容:GenericMessage [payload={"id":-1387755989}, headers={rocketmq_QUEUE_ID=1, rocketmq_RECONSUME_TIMES=0, scst_partition=1, rocketmq_BORN_TIMESTAMP=1596706229244, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=C0A82B7C341418B4AAC21D818BFC0002, rocketmq_SYS_FLAG=0, id=d9dd1b0d-2565-7f52-3825-786574b2fc1b, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, timestamp=1596706277892}]]

id 為 -1387755989 的消息被發(fā)送到 RocketMQ 消息隊列編號為 rocketmq_QUEUE_ID=1计济,并且在線程編號為 70 的線程中消費茸苇。

底線


本文源代碼使用 Apache License 2.0開源許可協(xié)議,這里是本文源碼Gitee地址峭咒,可通過命令git clone+地址下載代碼到本地税弃,也可直接點擊鏈接通過瀏覽器方式查看源代碼。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末凑队,一起剝皮案震驚了整個濱河市则果,隨后出現(xiàn)的幾起案子幔翰,更是在濱河造成了極大的恐慌,老刑警劉巖西壮,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件遗增,死亡現(xiàn)場離奇詭異,居然都是意外死亡款青,警方通過查閱死者的電腦和手機做修,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來抡草,“玉大人饰及,你說我怎么就攤上這事】嫡穑” “怎么了燎含?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長腿短。 經(jīng)常有香客問我屏箍,道長,這世上最難降的妖魔是什么橘忱? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任赴魁,我火速辦了婚禮,結果婚禮上钝诚,老公的妹妹穿的比我還像新娘颖御。我一直安慰自己,他們只是感情好敲长,可當我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布郎嫁。 她就那樣靜靜地躺著,像睡著了一般祈噪。 火紅的嫁衣襯著肌膚如雪泽铛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天辑鲤,我揣著相機與錄音盔腔,去河邊找鬼。 笑死月褥,一個胖子當著我的面吹牛弛随,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播宁赤,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼舀透,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了决左?” 一聲冷哼從身側響起愕够,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤走贪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后惑芭,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體坠狡,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年遂跟,在試婚紗的時候發(fā)現(xiàn)自己被綠了逃沿。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡幻锁,死狀恐怖凯亮,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情越败,我是刑警寧澤触幼,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站究飞,受9級特大地震影響,放射性物質發(fā)生泄漏堂鲤。R本人自食惡果不足惜亿傅,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望瘟栖。 院中可真熱鬧葵擎,春花似錦、人聲如沸半哟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽寓涨。三九已至盯串,卻和暖如春戒良,著一層夾襖步出監(jiān)牢的瞬間几缭,已是汗流浹背年栓。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留食拜,地道東北人负甸。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓呻待,卻偏偏與公主長得像,于是被迫代替她去往敵國和親迫淹。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,543評論 2 349