spring cloud stream使用筆記

一练般、背景

以前我們在spring boot中構(gòu)建一個(gè)消息驅(qū)動(dòng)的微服務(wù)應(yīng)用跋涣,通常會(huì)使用rabbitMQ或是kafka來做消息中間件蝗砾,應(yīng)用中均需代碼實(shí)現(xiàn)具體消息中間件的通信細(xì)節(jié)汇荐。此時(shí)如果再更換一個(gè)新的消息中間件日熬,這會(huì)我們又需新增這些通信代碼棍厌,寫起來會(huì)比較繁瑣,而stream出現(xiàn)就是為了簡化這一過程。

二耘纱、簡介


它是一個(gè)構(gòu)建消息驅(qū)動(dòng)的微服務(wù)應(yīng)用的框架敬肚。通過一些抽象出來的基礎(chǔ)概念,來簡化消息中間件的使用束析。我們可以看下官網(wǎng)上的處理模型圖:

image
  • 關(guān)鍵概念

Inputs 接收消息的通道
Output 發(fā)送消息的通道
Binder 可理解為一個(gè)抽象的中間件艳馒,應(yīng)用通過在spring cloud stream中所注入的inputs,outputs通道來跟外界消息通信,而這些通道又是通過具體中間件的Binder實(shí)現(xiàn)來連接到消息隊(duì)列的服務(wù)器上员寇。有了Binder弄慰,甚至可以不改一行代碼,就切換中間件的類型蝶锋。目前Binder實(shí)現(xiàn)支持的具體中間件類型為:rabbitMQ 和 kfaka這倆

當(dāng)然MQ中的消費(fèi)組group 和 分區(qū) partion的概念他也有曹动,跟kafka里面的概念是一樣的。
Group:消費(fèi)組牲览,一個(gè)消息到達(dá)一個(gè)消費(fèi)組后墓陈,只能被這個(gè)消費(fèi)組的其中一個(gè)實(shí)例消費(fèi)掉;
Partion:消息分區(qū)第献,一個(gè)非常大的topic可以分布到多個(gè)broker(即服務(wù)器)上

三贡必、使用步驟

以下以rabbitMQ為具體中間件作為示例:

1. 在pom.xml中添加依賴
 <dependency>
       <groupid >org.springframework.cloud</groupid>
       <artifactid>spring-cloud-starter-stream-rabbit</artifactid>
</dependency>
2.自定義通道的創(chuàng)建

自定義通道的創(chuàng)建有兩種方式,一種是提前在代碼里定義好的庸毫,另一種是在運(yùn)行時(shí)通過讀取完通道名來創(chuàng)建的仔拟。

2.1 方式一:提前定義好的通道
  • 定義生產(chǎn)者
    我們定義一個(gè)生產(chǎn)者類SampleSource,這類要完成2件事:
  • 2.1.1 自定義發(fā)送通道
  • 2.1.2 完成發(fā)送消息的功能
@EnableBinding(SampleSource.MultiOutputSource.class)
public class SampleSource {
   
       //自定義發(fā)送通道
    public interface MultiOutputSource {
        String OUTPUT1 = "output1";

        String OUTPUT2 = "output2";

        @Output(OUTPUT1)
        MessageChannel output1();

        @Output(OUTPUT2)
        MessageChannel output2();
    }
}

注意:要加上@EnableBinding 綁定通道才能夠發(fā)出消息到mq的服務(wù)器飒赃。

  • 方式二:動(dòng)態(tài)創(chuàng)建通道
@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @PathVariable("target") target,
           @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, target, contentType);
    }

    private void sendMessage(String body, String target, Object contentType) {
        resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }
}

這是官網(wǎng)的示例代碼利花,可以看到關(guān)鍵代碼是這兩句

@Autowired
private BinderAwareChannelResolver resolver;
//中間省略代碼...
  resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));

resolveDestination里面的處理就是,先查找傳入的target通道名载佳,看下有沒創(chuàng)建過炒事,如果沒有就會(huì)默認(rèn)創(chuàng)建一個(gè)。

  • 生產(chǎn)者實(shí)現(xiàn)發(fā)送消息的函數(shù)
    在SampleSource這類里添加sendMessage函數(shù)
public class SampleSource{
 @Bean
    @InboundChannelAdapter(value = MultiOutputSource.OUTPUT1, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public synchronized MessageSource<String> messageSource1() {
        return new MessageSource<String>() {
            public Message<String> receive() {
                String message = "FromSource1";
                System.out.println("******************");
                System.out.println("From Source1");
                System.out.println("******************");
                System.out.println("Sending value: " + message);
                return new GenericMessage(message);
            }
        };
    }

    @Bean
    @InboundChannelAdapter(value = MultiOutputSource.OUTPUT2, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public synchronized MessageSource<String> timerMessageSource() {
        return new MessageSource<String>() {
            public Message<String> receive() {
                String message = "FromSource2";
                System.out.println("******************");
                System.out.println("From Source2");
                System.out.println("******************");
                System.out.println("Sending value: " + message);
                return new GenericMessage(message);
            }
        };
    }

}

對應(yīng)的配置application.yml:

spring:
  cloud.stream.bindings:
    output1:
      contentType: application/json #約定消息的內(nèi)容編碼格式
    output2:
      contentType: application/json #約定消息的內(nèi)容編碼格式

  rabbitmq:
    host: 127.0.0.1 
    port: 5672
    username: sa
    password: 123456
3. 消費(fèi)者類的實(shí)現(xiàn)

消費(fèi)者類要做的事也是相似的:
3.1.自定義接收通道
3.2消費(fèi)消息的功能實(shí)現(xiàn)

@EnableBinding(SampleSink.MultiInputSink.class)
public class SampleSink {

    @StreamListener(MultiInputSink.INPUT1)
    public synchronized void receive1(String message) {
        System.out.println("******************");
        System.out.println("At Sink1");
        System.out.println("******************");
        System.out.println("Received message " + message);
    }

    @StreamListener(MultiInputSink.INPUT2)
    public synchronized void receive2(String message) {
        System.out.println("******************");
        System.out.println("At Sink2");
        System.out.println("******************");
        System.out.println("Received message " + message);
    }

    public interface MultiInputSink {
        String INPUT1 = "input1";

        String INPUT2 = "input2";

        @Input(INPUT1)
        SubscribableChannel input1();

        @Input(INPUT2)
        SubscribableChannel input2();
    }
}

對應(yīng)的配置 application.yml (當(dāng)然用application.propertities)

spring:
  cloud.stream:
    bindings:
      input1:
        group: inputGroup #加上group是為了持久化
      input2:
        group: inputGroup2
rabbit:
      host: 127.0.0.1 
      port: 5672
      username: sa
      password: 123456
4.死信隊(duì)列設(shè)置

問題列表:

  • 消息在什么條件下進(jìn)入死信隊(duì)列蔫慧?發(fā)送失敗后挠乳,如何設(shè)置重試次數(shù) or TTL?

  • 進(jìn)入死信隊(duì)列之后姑躲,若又需要該消息重新回到原隊(duì)列進(jìn)行處理睡扬,該怎么辦


轉(zhuǎn)入死信隊(duì)列的條件:

配置死信隊(duì)列及消息消費(fèi)失敗重試次數(shù)(application.yml):

image
  • 配置消息消費(fèi)重試次數(shù)

兩種方式
1) 如果允許重試一定次數(shù):如上圖配置所示,設(shè)置max_attempt 黍析,大于1即可
2)如果不允許重發(fā)卖怜,消費(fèi)失敗了就進(jìn)入死信隊(duì)列,在配置中添加requeueRejected設(shè)為true

spring:
  cloud.stream:
    bindings:
      input1:
        group: inputGroup1
    rabbit:
      bindings:
        input1:
          consumer:
            autoBindDlq: true #啟用死信隊(duì)列阐枣,默認(rèn)會(huì)生成一個(gè)DLX EXCHANGE马靠,當(dāng)消息重復(fù)消費(fèi)失敗后
            dlqDeadLetterExchange: input-deadLetter.DLX  #如果該列聲明牍戚,那么deadLetterExchange也要聲明,這個(gè)保持一致
            deadLetterExchange: input-deadLetter.DLX #與dlqDeadLetterExchange保持一致
            requeueRejected: true
      host: 127.0.0.1   
      port: 5672
      username: sa
      password: 123456
  • 在代碼中將某消息轉(zhuǎn)入死信隊(duì)列虑粥,另可見官網(wǎng)示例

5如孝、消息的格式:

(1)消息頭,包含了以下字段:

image

翻看源碼娩贷,MessageHeader是個(gè)Map結(jié)構(gòu)第晰,左邊是字段名,右邊是字段內(nèi)容彬祖∽率荩可以在創(chuàng)建MessageHeader的時(shí)候傳入已經(jīng)初始化的Map,注意我們可以在這指定body的contentType储笑。contentType能填什么內(nèi)容甜熔,查找下表即可(官網(wǎng)上找到的):

image

(2)至于消息體(payLoad),它支持自定義結(jié)構(gòu)突倍,格式自定腔稀。


6、如何保證消息的可靠性羽历?

一般是通過具體的消息中間件來保證焊虏。

配置組就可以來保證消息可靠性。見官網(wǎng)描述:消息的持久性

設(shè)置持久性的屬性:durableSubscription

image

四秕磷、相關(guān)鏈接

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末诵闭,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子澎嚣,更是在濱河造成了極大的恐慌疏尿,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件易桃,死亡現(xiàn)場離奇詭異褥琐,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)颈抚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進(jìn)店門踩衩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人贩汉,你說我怎么就攤上這事∶啵” “怎么了匹舞?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長线脚。 經(jīng)常有香客問我赐稽,道長叫榕,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任姊舵,我火速辦了婚禮晰绎,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘括丁。我一直安慰自己荞下,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布史飞。 她就那樣靜靜地躺著尖昏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪构资。 梳的紋絲不亂的頭發(fā)上抽诉,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天,我揣著相機(jī)與錄音吐绵,去河邊找鬼迹淌。 笑死,一個(gè)胖子當(dāng)著我的面吹牛己单,可吹牛的內(nèi)容都是我干的巍沙。 我是一名探鬼主播,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼荷鼠,長吁一口氣:“原來是場噩夢啊……” “哼句携!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起允乐,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤矮嫉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后牍疏,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蠢笋,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年鳞陨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了昨寞。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,809評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡厦滤,死狀恐怖援岩,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情掏导,我是刑警寧澤享怀,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站趟咆,受9級(jí)特大地震影響添瓷,放射性物質(zhì)發(fā)生泄漏梅屉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一鳞贷、第九天 我趴在偏房一處隱蔽的房頂上張望坯汤。 院中可真熱鬧,春花似錦搀愧、人聲如沸惰聂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽庶近。三九已至,卻和暖如春眷蚓,著一層夾襖步出監(jiān)牢的瞬間鼻种,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工沙热, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留叉钥,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親哆料。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容