Spring Cloud 學(xué)習(xí)筆記 - No.7 消息驅(qū)動(dòng) Stream

請(qǐng)先閱讀之前的內(nèi)容:

Spring Cloud Stream

Spring Cloud Stream 是一個(gè)用來(lái)為微服務(wù)應(yīng)用構(gòu)建消息驅(qū)動(dòng)能力的框架记罚,為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn)契沫,并且引入了發(fā)布-訂閱消費(fèi)組以及消息分區(qū)這三個(gè)核心概念。
簡(jiǎn)單的說(shuō)匙监,Spring Cloud Stream 本質(zhì)上就是整合了 Spring Boot 和 Spring Integration,實(shí)現(xiàn)了一套輕量級(jí)的消息驅(qū)動(dòng)的微服務(wù)框架不翩。
通過(guò)使用 Spring Cloud Stream齿诞,可以有效地簡(jiǎn)化開(kāi)發(fā)人員對(duì)消息中間件的使用復(fù)雜度,讓系統(tǒng)開(kāi)發(fā)人員可以有更多的精力關(guān)注于核心業(yè)務(wù)邏輯的處理献起。目前為止 Spring Cloud Stream 只支持下面兩個(gè)消息中間件的自動(dòng)化配置:

構(gòu)建一個(gè) Spring Cloud Stream 消費(fèi)者

我們利用之前創(chuàng)建的 eureka-consumer 項(xiàng)目洋访。
首先在 pom.xml 中添加如下的依賴:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

其中 spring-cloud-starter-stream-rabbit 是 Spring Cloud Stream 對(duì) RabbitMQ 支持的封裝镣陕,其中包含了對(duì) RabbitMQ 的自動(dòng)化配置等內(nèi)容。

隨后創(chuàng)建用于接收來(lái)自 RabbitMQ 消息的消費(fèi)者 SinkReceiver

@EnableBinding(Sink.class)
public class SinkReceiver {

    private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        logger.info("Received: " + payload);
    }

}
  • @EnableBinding 注解用來(lái)指定一個(gè)或多個(gè)定義了 @Input@Output 注解的接口姻政,以此實(shí)現(xiàn)對(duì)消息通道(Channel)的綁定呆抑。
    • 綁定了 Sink 接口,該接口是 Spring Cloud Stream 中默認(rèn)實(shí)現(xiàn)的對(duì)輸入消息通道綁定的定義
    • Spring Cloud Stream 還默認(rèn)實(shí)現(xiàn)了綁定輸出消息通道的 Source 接口
    • 還有結(jié)合了 SinkSourceProcessor 接口
  • @StreamListener 注解用來(lái)將被修飾的方法注冊(cè)為消息中間件上數(shù)據(jù)流的事件監(jiān)聽(tīng)器汁展,注解中的屬性值對(duì)應(yīng)了監(jiān)聽(tīng)的消息通道名鹊碍。

重啟項(xiàng)目,從日志中可以看到聲明了一個(gè)名為 input.anonymous.cWlqMyH9Tm--INXERE6nhQ 的隊(duì)列食绿,并通過(guò) RabbitMessageChannelBinder 將自己綁定為它的消費(fèi)者侈咕。

c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.cWlqMyH9Tm--INXERE6nhQ, bound to: input

這些信息我們也能在 RabbitMQ 的控制臺(tái)中發(fā)現(xiàn)它們:


RabbitMQ 的控制臺(tái)

點(diǎn)擊進(jìn)去,通過(guò) Publish Message 功能來(lái)發(fā)送一條消息到該隊(duì)列中:


通過(guò) Publish Message 功能來(lái)發(fā)送一條消息到該隊(duì)列中

從下面的日志可以看出 SinkReceiver 讀取了消息隊(duì)列中的內(nèi)容器紧,由于我們沒(méi)有對(duì)消息進(jìn)行序列化耀销,所以輸出的只是該對(duì)象的引用:

[m--INXERE6nhQ-1] com.example.SinkReceiver                 : Received: [B@beb7ce8

在上面的操作中,我們并沒(méi)有手動(dòng)去配置 RabbitMQ 的信息铲汪,比如 IP树姨,端口等等,這是基于 Spring Boot 的設(shè)計(jì)理念桥状,提供了對(duì) RabbitMQ 默認(rèn)的自動(dòng)化配置帽揪。當(dāng)然,我們可以手動(dòng)在 application.properties 文件中去配置辅斟,例如:

spring.cloud.stream.bindings.input.destination=my_destination

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

編寫消費(fèi)消息的單元測(cè)試用例

@RunWith(SpringRunner.class)
@EnableBinding(value = {SinkReceiverTests.SinkSender.class})
public class SinkReceiverTests {

    @Autowired
    private SinkSender sinkSender;

    @Test
    public void sinkSenderTester() {
        sinkSender.output().send(MessageBuilder.withPayload("Testing Message").build());
    }

    public interface SinkSender {

        String OUTPUT = "input";

        @Output(SinkSender.OUTPUT)
        MessageChannel output();

    }

}

在上面的單元測(cè)試中转晰,我們通過(guò) @Output(SinkSender.OUTPUT) 定義了一個(gè)輸出通過(guò),而該輸出通道的名稱為 input士飒,與前文中的 Sink 中定義的消費(fèi)通道同名查邢,所以這里的單元測(cè)試與前文的消費(fèi)者程序組成了一對(duì)生產(chǎn)者與消費(fèi)者。
運(yùn)行該單元測(cè)試酵幕,日志可以看出 SinkReceiver 讀取了消息隊(duì)列中的內(nèi)容:

[m--INXERE6nhQ-1] com.example.SinkReceiver                 : Received: [B@89040a9

Spring Cloud Stream 應(yīng)用模型

圖片引自:https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/

Spring Cloud Stream 應(yīng)用模型

綁定器

Spring Cloud Stream 構(gòu)建的應(yīng)用程序與消息中間件之間是通過(guò)綁定器: Binder 相關(guān)聯(lián)的扰藕,綁定器對(duì)于應(yīng)用程序而言起到了隔離作用,它使得不同消息中間件的實(shí)現(xiàn)細(xì)節(jié)對(duì)應(yīng)用程序來(lái)說(shuō)是透明的芳撒。
當(dāng)我們需要升級(jí)消息中間件邓深,或是更換其他消息中間件產(chǎn)品時(shí),我們要做的就是更換它們對(duì)應(yīng)的 Binder 綁定器而不需要修改任何Spring Boot的應(yīng)用邏輯笔刹。

所以對(duì)于每一個(gè) Spring Cloud Stream 的應(yīng)用程序來(lái)說(shuō)芥备,它不需要知曉消息中間件的通信細(xì)節(jié),它只需要知道 Binder 對(duì)應(yīng)用程序提供的概念去實(shí)現(xiàn)即可舌菜,而這個(gè)概念就是消息通道:Channel萌壳。

發(fā)布-訂閱模式

消息會(huì)通過(guò)共享的 Topic 主題進(jìn)行廣播,消息消費(fèi)者在訂閱的主題中收到它并觸發(fā)自身的業(yè)務(wù)邏輯處理。

這里所提到的 Topic 主題是 Spring Cloud Stream 中的一個(gè)抽象概念袱瓮,用來(lái)代表發(fā)布共享消息給消費(fèi)者的地方缤骨。
在不同的消息中間件中,Topic 可能對(duì)應(yīng)著不同的概念尺借,比如:在 RabbitMQ 中的它對(duì)應(yīng)了 Exchange绊起、而在 Kakfa 中則對(duì)應(yīng)了 Kafka 中的 Topic。

在上面的例子中褐望,應(yīng)用啟動(dòng)的時(shí)候勒庄,在 RabbitMQ 的 Exchange 中也創(chuàng)建了一個(gè)名為 input 的 Exchange交換器。例如我們分別以 3001 和 3002 兩個(gè)端口啟動(dòng) eureka-consumer 項(xiàng)目瘫里。
可以看到 Queues 中有兩個(gè) Queue:

Queues 中有兩個(gè) Queue

可以看到 Channels 中有兩個(gè) Channel:


Channels 中有兩個(gè) Channel

可以看出 Exchanges 中只有一個(gè)名稱為 input 的 Exchange实蔽,即 Topic 主題。但是點(diǎn)進(jìn)去谨读,可以看出這個(gè)名稱為 input 的 Exchange 有綁定了兩個(gè)消息隊(duì)列:

Exchanges 中只有一個(gè)名稱為 input 的 Exchange

這個(gè)名稱為 input 的 Exchange 有兩個(gè) Bindings

如果我們通過(guò) Exchange 頁(yè)面的 Publish Message 來(lái)發(fā)布消息局装,可以發(fā)現(xiàn)兩個(gè)啟動(dòng)的應(yīng)用程序都輸出了消息內(nèi)容。

圖片引自 http://blog.didispace.com/spring-cloud-starter-dalston-7-2/

發(fā)布-訂閱模式

相對(duì)于點(diǎn)對(duì)點(diǎn)隊(duì)列實(shí)現(xiàn)的消息通信來(lái)說(shuō)劳殖,Spring Cloud Stream 采用的發(fā)布-訂閱模式可以有效的降低消息生產(chǎn)者與消費(fèi)者之間的耦合铐尚,當(dāng)我們需要對(duì)同一類消息增加一種處理方式時(shí),只需要增加一個(gè)應(yīng)用程序并將輸入通道綁定到既有的 Topic 中就可以實(shí)現(xiàn)功能的擴(kuò)展哆姻,而不需要改變?cè)瓉?lái)已經(jīng)實(shí)現(xiàn)的任何內(nèi)容宣增。

消費(fèi)組

很多情況下,消息生產(chǎn)者發(fā)送消息給某個(gè)具體微服務(wù)時(shí)矛缨,只希望被消費(fèi)一次爹脾,但是上面我們啟動(dòng)兩個(gè)應(yīng)用(3001 和 3002 兩個(gè)端口),這個(gè)消息出現(xiàn)了被重復(fù)消費(fèi)兩次的情況箕昭。
為了解決這個(gè)問(wèn)題灵妨,在 Spring Cloud Stream 中提供了消費(fèi)組的概念。

如果在同一個(gè)主題上的應(yīng)用需要啟動(dòng)多個(gè)實(shí)例的時(shí)候落竹,我們可以通過(guò)spring.cloud.stream.bindings.<channelName>.group 屬性為應(yīng)用指定一個(gè)組名泌霍,這樣這個(gè)應(yīng)用的多個(gè)實(shí)例在接收到消息的時(shí)候,只會(huì)有一個(gè)成員真正的收到消息并進(jìn)行處理述召。

例如朱转,我們?cè)?eureka-consumer 項(xiàng)目的配置中增加:

spring.cloud.stream.bindings.input.group=eureka-consumer-input-group

重啟兩個(gè)端口的實(shí)例,隨后通過(guò) Exchange 頁(yè)面的 Publish Message 來(lái)發(fā)布消息桨武,可以發(fā)現(xiàn)只有一個(gè)啟動(dòng)的應(yīng)用程序都輸出了消息內(nèi)容肋拔。并且有時(shí)候是 3001 端口的實(shí)例處理,有時(shí)候是 3002 端口的實(shí)例處理呀酸。
也就是說(shuō),對(duì)于同一條消息琼梆,它多次到達(dá)之后可能是由不同的實(shí)例進(jìn)行消費(fèi)的性誉。

消息分區(qū)

在上面的實(shí)驗(yàn)中可以看到窿吩,消費(fèi)組并無(wú)法控制消息具體被哪個(gè)實(shí)例消費(fèi)。但是對(duì)于一些業(yè)務(wù)場(chǎng)景错览,就需要對(duì)于一些具有相同特征的消息每次都可以被同一個(gè)消費(fèi)實(shí)例處理纫雁。比如:一些用于監(jiān)控服務(wù),為了統(tǒng)計(jì)某段時(shí)間內(nèi)消息生產(chǎn)者發(fā)送的報(bào)告內(nèi)容倾哺,監(jiān)控服務(wù)需要在自身內(nèi)容聚合這些數(shù)據(jù)轧邪,那么消息生產(chǎn)者可以為消息增加一個(gè)固有的特征 ID 來(lái)進(jìn)行分區(qū),使得擁有這些 ID 的消息每次都能被發(fā)送到一個(gè)特定的實(shí)例上實(shí)現(xiàn)累計(jì)統(tǒng)計(jì)的效果羞海,否則這些數(shù)據(jù)就會(huì)分散到各個(gè)不同的節(jié)點(diǎn)導(dǎo)致監(jiān)控結(jié)果不一致的情況忌愚。

而消息分區(qū)概念的引入就是為了解決這樣的問(wèn)題:當(dāng)生產(chǎn)者將消息數(shù)據(jù)發(fā)送給多個(gè)消費(fèi)者實(shí)例時(shí),保證擁有共同特征的消息數(shù)據(jù)始終是由同一個(gè)消費(fèi)者實(shí)例接收和處理却邓。

例如硕糊,我們?cè)?eureka-consumer 項(xiàng)目的配置中增加:

spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
  • spring.cloud.stream.bindings.input.consumer.partitioned:通過(guò)該參數(shù)開(kāi)啟消費(fèi)者分區(qū)功能;
  • spring.cloud.stream.instanceCount:該參數(shù)指定了當(dāng)前消費(fèi)者的總實(shí)例數(shù)量腊徙;
  • spring.cloud.stream.instanceIndex:該參數(shù)設(shè)置當(dāng)前實(shí)例的索引號(hào)简十,從0開(kāi)始,最大值為spring.cloud.stream.instanceCount - 1撬腾。

Spring Cloud Stream VS Spring Cloud Bus

我們?cè)?Spring Cloud 學(xué)習(xí)筆記 - No.3 分布式配置 Config 中使用了 Spring Cloud Bus(結(jié)合了 RabbitMQ)螟蝙,那么 Stream 和 Bus 的區(qū)別是什么?

  • Spring Cloud Stream 構(gòu)建消息驅(qū)動(dòng)微服務(wù)
    • building highly scalable event-driven microservices connected with shared messaging systems.
  • Spring Cloud Bus 廣播(例如配置統(tǒng)一管理)和監(jiān)控
    • Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This can then be used to broadcast state changes (e.g. configuration changes) or other management instructions.

RabbitMQ 負(fù)載均衡

在上面的例子中民傻,我們始終只有一個(gè) RabbitMQ 實(shí)例胰默。在生產(chǎn)環(huán)境中,我們可能需要多個(gè) RabbitMQ 實(shí)例來(lái)實(shí)現(xiàn)高并發(fā)和高可用饰潜。
參見(jiàn):


引用:
程序猿DD Spring Cloud基礎(chǔ)教程
Spring Cloud構(gòu)建微服務(wù)架構(gòu):消息驅(qū)動(dòng)的微服務(wù)(入門)【Dalston版】
Spring Cloud構(gòu)建微服務(wù)架構(gòu):消息驅(qū)動(dòng)的微服務(wù)(核心概念)【Dalston版】
Spring Cloud Dalston中文文檔

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末初坠,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子彭雾,更是在濱河造成了極大的恐慌碟刺,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件薯酝,死亡現(xiàn)場(chǎng)離奇詭異半沽,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)吴菠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門者填,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人做葵,你說(shuō)我怎么就攤上這事占哟。” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵榨乎,是天一觀的道長(zhǎng)怎燥。 經(jīng)常有香客問(wèn)我,道長(zhǎng)蜜暑,這世上最難降的妖魔是什么铐姚? 我笑而不...
    開(kāi)封第一講書人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮肛捍,結(jié)果婚禮上隐绵,老公的妹妹穿的比我還像新娘。我一直安慰自己拙毫,他們只是感情好依许,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著恬偷,像睡著了一般悍手。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上袍患,一...
    開(kāi)封第一講書人閱讀 49,144評(píng)論 1 285
  • 那天坦康,我揣著相機(jī)與錄音,去河邊找鬼诡延。 笑死滞欠,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的肆良。 我是一名探鬼主播筛璧,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼惹恃!你這毒婦竟也來(lái)了夭谤?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤巫糙,失蹤者是張志新(化名)和其女友劉穎朗儒,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體参淹,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡醉锄,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了浙值。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片恳不。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖开呐,靈堂內(nèi)的尸體忽然破棺而出烟勋,到底是詐尸還是另有隱情规求,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布神妹,位于F島的核電站颓哮,受9級(jí)特大地震影響家妆,放射性物質(zhì)發(fā)生泄漏鸵荠。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一伤极、第九天 我趴在偏房一處隱蔽的房頂上張望蛹找。 院中可真熱鬧,春花似錦哨坪、人聲如沸庸疾。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)届慈。三九已至,卻和暖如春忿偷,著一層夾襖步出監(jiān)牢的瞬間金顿,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工鲤桥, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留揍拆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓茶凳,卻偏偏與公主長(zhǎng)得像嫂拴,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子贮喧,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • iOS上所有和音頻相關(guān)的都在CoreAudio里面筒狠。但是CoreAuido是個(gè)非常龐大的庫(kù),剛開(kāi)始入門多半會(huì)被其復(fù)...
    偶是星爺閱讀 1,561評(píng)論 0 1
  • 沒(méi)錯(cuò)箱沦,是“同姓戀”辩恼,不是“同性戀”哦! 現(xiàn)在社會(huì)的包容度高饱普,什么同性戀运挫,姐弟戀,試婚等都能微笑面對(duì)套耕∷粒可我老家那小村...
    月兒上山了閱讀 1,629評(píng)論 24 15
  • 每天喊著要改變命運(yùn),改變生活儡循,賺很多錢舶吗,過(guò)很貴的生活,然后拖延癥把這些美好的理想一拖再拖择膝。 為什么拖延誓琼,其中一點(diǎn)就...
    月兒林飛飛閱讀 377評(píng)論 0 0
  • 窗外追趕的雨滴嬉戲 凌亂了廊前盛開(kāi)的月季 那把黑色的油紙傘,掠過(guò)窗前 我看到頑皮的雨花在親吻明凈的玻璃窗葉 旖旎的...
    誰(shuí)言1990閱讀 620評(píng)論 3 26