前言: 本文作者張?zhí)彀昃啵?jié)選自筆者與其合著的《Spring Cloud微服務(wù)架構(gòu)進(jìn)階》,即將在八月出版問(wèn)世代咸。本文將其中Spring Cloud Stream應(yīng)用與自定義Rocketmq Binder的內(nèi)容抽取出來(lái)蹈丸,主要介紹Spring Cloud Stream的相關(guān)概念,并概述相關(guān)的編程模型。
概述
Spring Cloud Stream 簡(jiǎn)介
Spring Cloud Stream 是一個(gè)用來(lái)為微服務(wù)應(yīng)用構(gòu)建消息驅(qū)動(dòng)能力的框架逻杖。它可以基于Spring Boot 來(lái)創(chuàng)建獨(dú)立的奋岁,可用于生產(chǎn)的Spring 應(yīng)用程序。他通過(guò)使用Spring Integration來(lái)連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)荸百。Spring Cloud Stream 為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn)闻伶,引用了發(fā)布-訂閱、消費(fèi)組够话、分區(qū)的三個(gè)核心概念蓝翰。Spring Cloud Stream目前僅支持RabbitMQ、Kafka女嘲。
消息隊(duì)列簡(jiǎn)介
消息隊(duì)列中間件是分布式系統(tǒng)中最為重要的組件之一畜份,主要解決應(yīng)用耦合,異步消息澡为,流量削鋒等問(wèn)題漂坏,是大型分布式系統(tǒng)不可缺少的中間件。消息隊(duì)列技術(shù)是分布式應(yīng)用間交換信息的一種技術(shù)媒至,消息可駐留在內(nèi)存或磁盤上顶别,隊(duì)列存儲(chǔ)消息直到它們被應(yīng)用程序讀走。通過(guò)消息隊(duì)列拒啰,應(yīng)用程序可以相對(duì)獨(dú)立地執(zhí)行驯绎,它們不需要知道彼此的位置,只需要處理從消息隊(duì)列發(fā)送來(lái)的消息和向消息隊(duì)列發(fā)送消息谋旦。
消息隊(duì)列的主要特點(diǎn)是異步處理和解耦剩失。其主要的使用場(chǎng)景就是將比較耗時(shí)而且不需要同步返回結(jié)果的操作作為消息放入消息隊(duì)列。同時(shí)由于使用了消息隊(duì)列册着,只要保證消息格式不變拴孤,消息的發(fā)送方和接受者并不需要彼此聯(lián)系,也不需要受對(duì)方的影響甲捏,即解耦演熟。
消息隊(duì)列的使用場(chǎng)景有:
- 跨系統(tǒng)的異步通信,需要異步交互的場(chǎng)景都可以使用消息隊(duì)列司顿。
- 消息驅(qū)動(dòng)的架構(gòu)(EDA)芒粹,系統(tǒng)分解為消息隊(duì)列,消息隊(duì)列制造者和消息隊(duì)列消費(fèi)者大溜,一個(gè)是處理流程可以根據(jù)需求拆分成多個(gè)階段化漆,每個(gè)階段之間通過(guò)隊(duì)列連接起來(lái)。
- 流量削鋒钦奋,它是消息隊(duì)列中的常用場(chǎng)景之一座云,一般在秒殺或團(tuán)搶活動(dòng)中使用廣泛疙赠。秒殺活動(dòng),一般會(huì)因?yàn)榱髁窟^(guò)大疙教,導(dǎo)致流量暴增棺聊,應(yīng)用掛掉伞租,為解決這個(gè)問(wèn)題贞谓,一般需要在應(yīng)用前端加入消息隊(duì)列,來(lái)緩和流量的暴增葵诈。
在軟件的正常功能開(kāi)發(fā)過(guò)程中裸弦,開(kāi)發(fā)人員并不需要去刻意的尋找消息隊(duì)列的使用場(chǎng)景,而是當(dāng)出現(xiàn)性能瓶頸時(shí)作喘,去查看業(yè)務(wù)邏輯是否存在可以異步處理的耗時(shí)操作理疙,如果存在的話便可以引入消息隊(duì)列來(lái)解決。否則盲目的使用消息隊(duì)列可能會(huì)增加維護(hù)和開(kāi)發(fā)的成本卻無(wú)法得到可觀的性能提升泞坦,那就得不償失了窖贤。
常見(jiàn)的消息隊(duì)列
目前業(yè)界有四款常用的消息隊(duì)列,它們分別是RabbitMQ贰锁、RocketMQ赃梧、ActiveMQ和Kafka。我們這里主要介紹前兩種豌熄。
RabbitMQ
RabbitMQ在2007年發(fā)布授嘀,是一個(gè)在AMQP(高級(jí)消息隊(duì)列協(xié)議)基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng)锣险,是當(dāng)前最流行的消息中間件之一蹄皱。
RabbitMQ的主要特性有:
- 可靠性: RabbitMQ提供了多種技術(shù)可以讓你在性能和可靠性之間進(jìn)行權(quán)衡。這些技術(shù)包括持久性機(jī)制芯肤、投遞確認(rèn)巷折、發(fā)布者證實(shí)和高可用性機(jī)制;
- 靈活的路由:消息在到達(dá)隊(duì)列前是通過(guò)交換機(jī)進(jìn)行路由的崖咨。RabbitMQ為典型的路由邏輯提供了多種內(nèi)置交換機(jī)類型锻拘。如果你有更復(fù)雜的路由需求,可以將這些交換機(jī)組合起來(lái)使用掩幢,你甚至可以實(shí)現(xiàn)自己的交換機(jī)類型逊拍,并且當(dāng)做RabbitMQ的插件來(lái)使用;
- 消息集群:在相同局域網(wǎng)中的多個(gè)RabbitMQ服務(wù)器可以聚合在一起际邻,作為一個(gè)獨(dú)立的邏輯代理來(lái)使用芯丧;
- 隊(duì)列高可用:隊(duì)列可以在集群中的機(jī)器上進(jìn)行鏡像,以確保在硬件問(wèn)題下還保證消息安全世曾;
- 多種協(xié)議的支持:RabbitMQ支持多種消息隊(duì)列協(xié)議缨恒;
- 多語(yǔ)言支持:RabbitMQ的服務(wù)器端用Erlang語(yǔ)言編寫谴咸,其客戶端支持基本所有編程語(yǔ)言;
- 管理界面: RabbitMQ有一個(gè)易用的用戶界面骗露,使得用戶可以監(jiān)控和管理消息Broker的許多方面岭佳;
- 跟蹤機(jī)制:如果消息異常,RabbitMQ提供消息跟蹤機(jī)制萧锉,使用者可以跟蹤發(fā)現(xiàn)異常珊随;
- 插件機(jī)制:提供了許多插件,來(lái)從多方面進(jìn)行擴(kuò)展柿隙,也可以編寫自己的插件叶洞;
RabbitMQ的優(yōu)點(diǎn)有:
- 由于erlang語(yǔ)言的特性,mq 性能較好禀崖,高并發(fā)衩辟;
- 健壯俩莽、穩(wěn)定牵触、易用皆愉、跨平臺(tái)闯参、支持多種語(yǔ)言崩侠、文檔齊全圈澈;
- 有消息確認(rèn)機(jī)制和持久化機(jī)制咱揍,可靠性高务嫡;
- 高度可定制的路由折晦;
- 管理界面較豐富钥星,在互聯(lián)網(wǎng)公司也有較大規(guī)模的應(yīng)用;
- 社區(qū)活躍度高满着;
RabbitMQ的缺點(diǎn)有:
- 盡管結(jié)合erlang語(yǔ)言本身的并發(fā)優(yōu)勢(shì)谦炒,性能較好,但是不利于做二次開(kāi)發(fā)和維護(hù)风喇;
- 實(shí)現(xiàn)了代理架構(gòu)宁改,意味著消息在發(fā)送到客戶端之前可以在中央節(jié)點(diǎn)上排隊(duì)。此特性使得RabbitMQ易于使用和部署魂莫,但是使得其運(yùn)行速度較慢还蹲,因?yàn)橹醒牍?jié)點(diǎn)增加了延遲,消息封裝后也比較大耙考;
- 需要學(xué)習(xí)比較復(fù)雜的接口和協(xié)議谜喊,學(xué)習(xí)和維護(hù)成本較高;
RocketMQ
RocketMQ出自阿里公司的開(kāi)源產(chǎn)品倦始,用 Java 語(yǔ)言實(shí)現(xiàn)斗遏,在設(shè)計(jì)時(shí)參考了 Kafka,并做出了自己的一些改進(jìn)鞋邑,消息可靠性上比 Kafka 更好诵次。RocketMQ在阿里集團(tuán)被廣泛應(yīng)用在訂單账蓉,交易,充值逾一,流計(jì)算铸本,消息推送,日志流式處理遵堵,binglog分發(fā)等場(chǎng)景箱玷。
RocketMQ的主要特性有:
- 是一個(gè)隊(duì)列模型的消息中間件,具有高性能鄙早、高可靠汪茧、高實(shí)時(shí)、分布式特點(diǎn)限番;
- Producer、Consumer呀舔、隊(duì)列都可以分布式弥虐;
- Producer向一些隊(duì)列輪流發(fā)送消息,隊(duì)列集合稱為Topic媚赖,Consumer如果做廣播消費(fèi)霜瘪,則一個(gè)consumer實(shí)例消費(fèi)這個(gè)Topic對(duì)應(yīng)的所有隊(duì)列,如果做集群消費(fèi)惧磺,則多個(gè)Consumer實(shí)例平均消費(fèi)這個(gè)topic對(duì)應(yīng)的隊(duì)列集合颖对;
- 能夠保證嚴(yán)格的消息順序;
- 提供豐富的消息拉取模式磨隘;
- 高效的訂閱者水平擴(kuò)展能力缤底;
- 實(shí)時(shí)的消息訂閱機(jī)制;
- 億級(jí)消息堆積能力番捂;
- 較少的依賴个唧;
RocketMQ的優(yōu)點(diǎn)有:
- 單機(jī)支持 1 萬(wàn)以上持久化隊(duì)列;
- RocketMQ 的所有消息都是持久化的设预,先寫入系統(tǒng) PAGECACHE徙歼,然后刷盤,可以保證內(nèi)存與磁盤都有一份數(shù)據(jù)鳖枕;
- 模型簡(jiǎn)單魄梯,接口易用(JMS 的接口很多場(chǎng)合并不太實(shí)用);
- 性能非常好宾符,可以大量堆積消息在broker中酿秸;
- 支持多種消費(fèi),包括集群消費(fèi)吸奴、廣播消費(fèi)等允扇。
- 各個(gè)環(huán)節(jié)分布式擴(kuò)展設(shè)計(jì)缠局,主從HA;
RocketMQ的缺點(diǎn)有:
- 支持的客戶端語(yǔ)言不多考润,目前是java及c++狭园,其中c++不成熟;
- RocketMQ社區(qū)關(guān)注度及成熟度也不及前兩者糊治;
- 沒(méi)有web管理界面唱矛,提供了一個(gè)CLI(命令行界面)管理工具帶來(lái)查詢、管理和診斷各種問(wèn)題井辜;
- 沒(méi)有在消息隊(duì)列的核心部分實(shí)現(xiàn)JMS等接口绎谦;
原理簡(jiǎn)介
如圖是Stream源碼的流程圖。Stream首先會(huì)動(dòng)態(tài)注冊(cè)相關(guān)BeanDefinition粥脚,并且處理@StreamListener注解窃肠;然后在Bean實(shí)例初始化之后,會(huì)調(diào)用BindingService進(jìn)行服務(wù)綁定刷允;BindingService在綁定服務(wù)時(shí)會(huì)首先獲取特定的Binder綁定器冤留,然后綁定Producer和Consumer;最后Stream的相關(guān)實(shí)例就會(huì)進(jìn)行發(fā)送和接受消息的處理树灶。
編程模型
Spring Cloud Stream提供了一系列的預(yù)先定義的注解來(lái)聲明輸入型和輸出型channel纤怒,業(yè)務(wù)系統(tǒng)基于這些channel與消息中間件進(jìn)行通信,而不是直接與消息中間件進(jìn)行通信天通。
聲明和綁定Channels
通過(guò)給業(yè)務(wù)應(yīng)用的配置類添加@EnableBinding
注解來(lái)將一個(gè)Spring應(yīng)用轉(zhuǎn)變成Spring Cloud Stream應(yīng)用泊窘。@EnableBinding
注解本身?yè)碛?code>@Configuration元注解來(lái)進(jìn)行相關(guān)配置并且會(huì)觸發(fā)Spring Cloud Stream框架的初始化機(jī)制。
@Configuration
@EnableIntegration
public @interface EnableBinding {
...
Class<?>[] value() default {};
}
@EnableBinding
注解可以使用聲明輸入型和輸出行channel的接口類作為其value屬性值像寒。@EnableBinding
注解只能使用在業(yè)務(wù)系統(tǒng)的Configuration類上烘豹,可以提供盡可能多的接口類作為該注解的value屬性值,比如說(shuō)@EnableBinding(value={Order.class, Payment.class})
萝映,Order和Payment都是聲明了channel的接口類吴叶。
在Spring Cloud Stream應(yīng)用中序臂,接口類可以通過(guò)被@Input
和@Output
注解修飾的函數(shù)來(lái)聲明的輸入型和輸出型channels蚌卤。
public interface OnlineStore{
@Input
SubscribableChannel orders(); #聲明輸入型channel,表示接收訂單
@Output
MessageChannel stock(); #聲明輸出型channel,表示向供應(yīng)商進(jìn)貨
}
使用這個(gè)接口類當(dāng)作@EnableBinding
的value屬性值可以觸發(fā)Stream框架的初始化機(jī)制侮叮,創(chuàng)建兩個(gè)channel,名字分別為orders和stock悼瘾,orders是輸入型channel囊榜,而stock是輸出型channel审胸。
@EnableBinding(OnlineStore.class)
public class ShopConfiguration {
...
}
自定義信道
使用@Input
和@Output
注解,編程人員可以給每個(gè)信道一個(gè)自定義的名稱卸勺,使用這個(gè)自定義信道砂沛,可以與消息對(duì)立中相應(yīng)的Channel進(jìn)行交互。
public interface OnlineStore{
@Input("inboundOrders")
SubscribableChannel orders();
}
在上邊代碼示例中曙求,自定義信道的名稱為inboundOrders碍庵,Stream框架會(huì)創(chuàng)建出名為inboundOrders的信道。
Spring Cloud Stream提供了預(yù)先設(shè)置的三種接口來(lái)定義輸入型channel和輸出型channel悟狱,它們是Source静浴、Sink和Processor。Source用來(lái)聲明輸出型channel挤渐,它的信道名稱為output苹享。Sink用來(lái)聲明輸入型channel,它的信道名稱為input挣菲。Processor則用來(lái)聲明輸出輸入型的channel富稻。
# Source
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
# Sink
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
# Processor
public interface Processor extends Source, Sink {
}
產(chǎn)生和消費(fèi)消息
使用Spring Integration注解或者Spring Cloud Stream的@StreamListener注解可以進(jìn)行消息的發(fā)送和消費(fèi)。@StreamListener
注解基于Spring Messaging注解(比如說(shuō)@MessageMapping白胀,@JmsListener,@RabbitListener)抚岗,除此之外或杠,該注解添加了內(nèi)容(content)類型管理和類型強(qiáng)制等特性。
作為Spring Integration的補(bǔ)充宣蔚,Spring Cloud Stream提供了它自己的@StreamListener注解向抢,該注解構(gòu)建在Spring Messaging注解的基礎(chǔ)上,比如說(shuō)@MessageMapping胚委、@JmsListener和@RabbitListener
挟鸠。@StreamListener
注解提供了更加簡(jiǎn)便處理輸入消息的模型。
Spring Cloud Stream提供了可擴(kuò)展的消息轉(zhuǎn)換(MessageConverter)機(jī)制來(lái)處理數(shù)據(jù)轉(zhuǎn)換亩冬,并將轉(zhuǎn)換后的數(shù)據(jù)分配給對(duì)應(yīng)的被@StreamListener
修飾的方法艘希。下面這個(gè)例子展示了一個(gè)處理外部訂單消息的應(yīng)用。
@EnableBinding(Sink.class)
public class OrderHandler {
@Autowired
OrderService orderService;
@StreamListener(Sink.INPUT)
public void handle(Order order) {
orderService.handle(order);
}
}
假設(shè)硅急,輸入的Message對(duì)象有一個(gè)string類型的Payload和一個(gè)值為application/json的contentType覆享。在使用@StreamListener
時(shí),MessageConverter
會(huì)使用消息的contentType來(lái)解析String類型的Payload并賦值給Order對(duì)象营袜。
就像其他的Spring Messaging方法一樣撒顿,被@StreamListener
注解的方法的參數(shù)可以使用@Payload
和@Headers
進(jìn)行注解。對(duì)于返回?cái)?shù)據(jù)的方法荚板,必須使用@SendTo
注解來(lái)指定該返回?cái)?shù)據(jù)發(fā)送到哪個(gè)輸出型channel凤壁。
@EnableBinding(Processor.class)
public class TransformProcessor {
@Autowired
VotingService votingService;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public VoteResult handle(Vote vote) {
return votingService.record(vote);
}
}
Spring Cloud Stream支持將消息分配到多個(gè)@StreamListener
修飾的方法吩屹。為了能使用該分配機(jī)制,一個(gè)方法必須首先滿足下列條件:
- 方法不能有返回值拧抖。
- 方法必須是單獨(dú)一類消息的處理函數(shù)煤搜。
使用注解的condition屬性中的SpEL表達(dá)式可以設(shè)置@StreamListener
接收消息的條件判斷。所有匹配了該condition的方法都會(huì)在同一個(gè)線程中被調(diào)用徙鱼,但是方法調(diào)用相對(duì)順序不能保證宅楞。
下面就是一個(gè)@StreamListener
分配消息的例子。在這個(gè)例子中袱吆,所有頭部屬性type對(duì)應(yīng)的值為food的消息都會(huì)被分配給receiveFoodOrder方法厌衙,所有頭部屬性type對(duì)應(yīng)的值為compute的消息都會(huì)被分配給receiveComputeOrder方法。
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='food'")
public void receiveFoodOrder(@Payload FoodOrder foodOrder) {
// handle the message
}
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='compute'")
public void receiveComputeOrder(@Payload ComputeOrder computeOrder) {
// handle the message
}
}
小結(jié)
本文主要介紹了Spring Cloud Stream中涉及到的相關(guān)概念绞绒,重點(diǎn)介紹了Spring Cloud Stream的編程模型婶希,為后面文章實(shí)戰(zhàn)應(yīng)用和自定義奠定一些基礎(chǔ)。Spring Cloud Stream封裝了多種消息中間件的操作接口蓬衡,目前只有kafka和rabbitmq喻杈,下一篇將會(huì)介紹如何自已實(shí)現(xiàn)一個(gè)Rocketmq的綁定器。