玩轉(zhuǎn)SpringCloud Stream

背景及痛點

現(xiàn)如今消息中間件(MQ)在互聯(lián)網(wǎng)項目中被廣泛的應用书蚪,特別是大數(shù)據(jù)行業(yè)應用的特別的多读存,現(xiàn)在市面上也流行這多個消息中間件框架让簿,比如ActiveMQ莲祸、RabbitMQRocketMQ缴阎、Kafka等,這些消息中間件各有各的優(yōu)劣贸典,但是想要解決的問題都基本相同。由于每個框架都有它自己的使用方式绳锅,這無疑是增加了開發(fā)者的學習成本以及添加相同的業(yè)務復雜度。框架的變更或者多個中間件的混合使用使得業(yè)務邏輯代碼中中間件的切換喳坠、項目的維護和開發(fā)都會變得更加繁瑣晾浴。

有沒有一種技術讓我們不再需要關注具體MQ的使用細節(jié),我們只需要專注業(yè)務邏輯的開發(fā)笙各,讓程序根據(jù)實際項目的使用自己去適配綁定数尿,自動在各種MQ內(nèi)切換呢何陆?springcloud stream便為此而生。

關于stream

我們用一句話來描述stream就是:屏蔽底層消息中間件的差異钠怯,降低切換版本鞠鲜,統(tǒng)一消息的編程模型

官方定義SpringCloud Stream是一個構(gòu)建消息驅(qū)動微服務的框架,應用通過inputs或者outputs來與SpringCloud Stream中的binder對象交互弄砍,我們通過配置來綁定消息中間件慨畸,而SpringCloud Streambinder對象負責與消息中間件交互,所以我們只需要搞清楚如何與SpringCloud Stream交互即可方便的使用消息中間件。

SpringCloud Stream通過Spring Integration來連接消息代理中間件以實現(xiàn)消息事件驅(qū)動凡人,它提供了個性化的自動化配置忠荞,引用了發(fā)布訂閱消費組兴使、分區(qū)的三個核心概念励幼,但是目前僅支持RabbitMQKafka

設計思想

在此之前

以前的架構(gòu)

生產(chǎn)者和消費者通過消息媒介(queue等)傳遞信息內(nèi)容(Message)啡莉,消息必須通過特定的通道(MessageChannel)毛萌,通過消息的發(fā)布與訂閱來決定消息的發(fā)送和消費(publish/subscrib)缤削。

引入中間件

現(xiàn)在假如我們用到了RabbitMQKafka民宿,由于這兩個消息中間件的架構(gòu)上的不同着绷,像RabbitMQExchange,而KafkatopichePartitions分區(qū)

引入中間件之后

(binder中锌云,input對于消費者荠医,output對應生產(chǎn)者。)

這些中間件的差異性導致我們實際項目開發(fā)給我們造成了一定的困擾桑涎,我們?nèi)绻昧藘蓚€消息隊列的其中一種,但是后面因為業(yè)務需求彬向,需要改用另外一種消息隊列進行遷移,這時候無疑就是一 個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統(tǒng)耦合了,這時候springcloud Stream給我們提供了一種解耦合的方式攻冷。

屏蔽底層差異

在沒有綁定器(Builder)這個概念的情況下娃胆,我們的SpringBoot應用要直接與消息中間件進行信息交互的時候,由于各消息中間件構(gòu)建的初衷不同,它們的實現(xiàn)細節(jié)上會有較大的差異性,通過定義綁定器作為中間件,完美地實現(xiàn)了應用程序與消息中間件細節(jié)之間的隔離等曼。通過向應用程序暴露統(tǒng)一的Channel通道里烦, 使得應用程序不需要再考慮各種不同的消息中間件實現(xiàn)。

通過定義綁定器Binder作為中間層禁谦,實現(xiàn)了應用程序與消息中間件細節(jié)之間的隔離胁黑。

處理架構(gòu)

Stream對消息中間件的進一步封裝可以做到代碼層面對中間件的無感知,甚至于動態(tài)的切換中間件(rabbitmq切換為kafka),使得微服務開發(fā)的高度解耦州泊,服務可以關注更多自己的業(yè)務流程丧蘸。

處理架構(gòu)

其遵循了發(fā)布-訂閱模式,主要使用的就是Topic主題進行廣播拥诡,RabbitMQ就是Exchange触趴,在Kafka中就是Topic

通過定義綁定器Binder作為中間層,實現(xiàn)了應用程序與消息中間件細節(jié)之間的隔離渴肉。

stream流程

stream流程
  • Binder:很方便的連接中間件冗懦,屏蔽差異
  • Channel:通道是隊列Queue的一種抽象,在消息通訊系統(tǒng)中就是實現(xiàn)存儲和轉(zhuǎn)發(fā)的媒介仇祭,通過對Channel對隊列進行配置
  • Source和Sink:簡單的可理解為參照對象是Spring Cloud Stream自身披蕉,從Stream發(fā)布消息就是輸出,接受消息就是輸入

常用api和注解

常用api和注解

使用示例

基本環(huán)境

  • 注冊中心:Eureka乌奇,可以是其他没讲。

  • 消息中間件:RabbitMQ

    rabbitmq:
      host: localhost
      port: 5672
      username: guest
      password: guest
    

生產(chǎn)端

依賴

 <!--stream rabbit -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    # stream 配置
    stream:
      binders: # 配置綁定的消息中間件的服務信息
        defaultRabbit: # 自定義的一個名稱,用來下面 bindings 綁定
          type: rabbit  # 消息組件的類型
          environment:  #相關環(huán)境配置礁苗,設置rabbitmq的環(huán)境
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        output:  # 通道名稱
          destination: testExchange # 定義要使用的Exchange的名稱
          content-type: application/json  # 設置消息類型爬凑,對象為json,文本是text/plain
          binder: defaultRabbit # 設置要綁定的服務的具體設置试伙,就是我們上面配置的defaultRabbit

eureka:
  client:
    #表示是否將自己注冊進EurekaServer默認為true
    register-with-eureka: true
    #是否從EurekaServer抓取已有的注冊消息嘁信,默認為true,單節(jié)點無所謂疏叨,集群必須設置為true才能配合ribbon使用負載均衡
    fetch-registry: true
    service-url:
      #單機版
      defaultZone: http://localhost:8080/eureka/
  instance:
    prefer-ip-address: true
    instance-id: sender01

定義接口

這里需要定義一個接口并實現(xiàn)它潘靖,方便其他業(yè)務調(diào)用。

public interface IMessageProvider {
    /**
     * 發(fā)送接口
     * @param msg
     * @return
     */
    public String send(String msg);
}

接口實現(xiàn)

接口實現(xiàn)中需要添加@EnableBinding注解蚤蔓,并引入Source.class,為什么引入Source.class呢卦溢?因為它是生產(chǎn)者,我們參考stream流程圖就可以知道

import com.martain.study.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder; 
import javax.annotation.Resource;

@EnableBinding(Source.class)
public class MessageProvider implements IMessageProvider {
    /**
     * 注入消息發(fā)送管道
     */
    @Resource
    private MessageChannel output;

    @Override
    public String send(String msg) {
        output.send(MessageBuilder.withPayload(msg).build());
        System.out.println("******send message:"+msg);
        return msg;
    }
}

定義測試controller


@RestController
public class TestController {

    @Autowired
    IMessageProvider messageProvider;

    @GetMapping("/sendMsg")
    public String sendMsg(){
        String msg = UUID.randomUUID().toString();
        return messageProvider.send(msg);
    }
 
}

啟動類

@SpringBootApplication
public class StreamProviderApplication8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamProviderApplication8801.class,args);
    }
} 

服務啟動之后秀又,多次請求/sendMsg单寂,發(fā)送了多條消息。

生產(chǎn)服務生產(chǎn)消息

消費端

依賴

   <!--stream rabbit -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--eureka client-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

配置文件

與生產(chǎn)者類似涮坐,只是bindings中的output改成了input

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consume
  cloud:
    # stream 配置
    stream:
      binders: # 配置綁定的消息中間件的服務信息
        defaultRabbit: # 自定義的一個名稱凄贩,用來下面 bindings 綁定
          type: rabbit  # 消息組件的類型
          environment:  #相關環(huán)境配置,設置rabbitmq的環(huán)境
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        input:  # 通道名稱
          destination: testExchange # 定義要使用的Exchange的名稱
          content-type: application/json  # 設置消息類型袱讹,對象為json疲扎,文本是text/plain
          binder: defaultRabbit # 設置要綁定的服務的具體設置,就是我們上面配置的defaultRabbit

eureka:
  client:
    #表示是否將自己注冊進EurekaServer默認為true
    register-with-eureka: true
    #是否從EurekaServer抓取已有的注冊消息捷雕,默認為true椒丧,單節(jié)點無所謂,集群必須設置為true才能配合ribbon使用負載均衡
    fetch-registry: true
    service-url:
      #單機版
      defaultZone: http://localhost:8080/eureka/
  instance:
    prefer-ip-address: true
    instance-id: recover01

接收服務

接收服務只需要再類名前添加@EnableBinding()注解救巷,并引入Sink.class類壶熏,而實際接收的方法中需要添加@StreamListener(Sink.INPUT)注解。

package com.martain.study.controller; 
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component; 

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    /**
     * 獲取本服務的端口
     */
    @Value("${server.port}")
    private String serverPort;
    /**
     * 這里表示監(jiān)聽sink的input
     * @param message
     */
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("**** recv msg :"+message.getPayload()+"   in port "+serverPort);
    }
}

啟動類

@SpringBootApplication
public class StreamConsumerApplication8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication8802.class,args);
    }
}

啟動生產(chǎn)服務后浦译,在啟動消費服務棒假,多次請求生產(chǎn)服務發(fā)送消息溯职,我們可以發(fā)現(xiàn)消費者能很快的消費這些消息。

消費者消費消息

消息分組

當我們有多個消費者時,這個時候生產(chǎn)者生產(chǎn)一條消息帽哑,會發(fā)現(xiàn)所有的消費者都會消費這個消息谜酒。比如在一些訂單系統(tǒng)的場景中,如果一個訂單被多個處理服務一起獲取到妻枕,就容易造成數(shù)據(jù)錯誤僻族,那我們?nèi)绾伪苊膺@種情況呢?這時我們就可以使用Stream的消息分組來解決重復消費問題屡谐。

如何實現(xiàn)Stream的消息分組呢述么?我們只要簡單的在yml文件中配置spring.cloud.stream.bindings.input.group即可。示例如下:

...
spring:
  application:
    name: cloud-stream-consume
  cloud:
    # stream 配置
    stream:
      binders: # 配置綁定的消息中間件的服務信息
        defaultRabbit: # 自定義的一個名稱愕掏,用來下面 bindings 綁定
          type: rabbit  # 消息組件的類型
          environment:  #相關環(huán)境配置度秘,設置rabbitmq的環(huán)境
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        input:  # 通道名稱
          destination: testExchange # 定義要使用的Exchange的名稱
          content-type: application/json  # 設置消息類型,對象為json亭珍,文本是text/plain
          binder: defaultRabbit # 設置要綁定的服務的具體設置敷钾,就是我們上面配置的defaultRabbit
          group: groupA # 配置分組
          
            ...

如果沒有設置該屬性,當消費服務啟動時肄梨,會有個隨機的組名阻荒。

如果我們將所有的消費服務的group熟悉都設置成一致的話,這些服務就會在同一個組里面众羡,從而能夠保證消息只被應用消費一次侨赡。

同一組的消費者是競爭關系,不可以重復消費粱侣。

消息持久化

當生產(chǎn)者在持續(xù)生產(chǎn)消息羊壹,消費服務突然掛了,使得擁有許多消息并沒有被消費齐婴,如果消費沒有配置分組的話油猫,消費服務重啟是無法消費未消費的消息的,如果配置了分組的話柠偶,當消費服務重啟之后可以自動去消費未消費的數(shù)據(jù)情妖。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市诱担,隨后出現(xiàn)的幾起案子毡证,更是在濱河造成了極大的恐慌,老刑警劉巖蔫仙,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件料睛,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機恤煞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進店門屎勘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人居扒,你說我怎么就攤上這事挑秉。” “怎么了苔货?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長立哑。 經(jīng)常有香客問我夜惭,道長,這世上最難降的妖魔是什么铛绰? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任诈茧,我火速辦了婚禮,結(jié)果婚禮上捂掰,老公的妹妹穿的比我還像新娘敢会。我一直安慰自己,他們只是感情好这嚣,可當我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布鸥昏。 她就那樣靜靜地躺著,像睡著了一般姐帚。 火紅的嫁衣襯著肌膚如雪吏垮。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天罐旗,我揣著相機與錄音膳汪,去河邊找鬼。 笑死九秀,一個胖子當著我的面吹牛遗嗽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播鼓蜒,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼痹换,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了友酱?” 一聲冷哼從身側(cè)響起晴音,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎缔杉,沒想到半個月后锤躁,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年系羞,在試婚紗的時候發(fā)現(xiàn)自己被綠了郭计。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡椒振,死狀恐怖昭伸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情澎迎,我是刑警寧澤庐杨,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站夹供,受9級特大地震影響灵份,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜哮洽,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一填渠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧鸟辅,春花似錦氛什、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽号阿。三九已至容诬,卻和暖如春伦泥,著一層夾襖步出監(jiān)牢的瞬間望忆,已是汗流浹背琐簇。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工柴钻, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留究西,地道東北人卦碾。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓饺饭,卻偏偏與公主長得像渤早,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子瘫俊,可洞房花燭夜當晚...
    茶點故事閱讀 44,955評論 2 355