請(qǐng)先閱讀之前的內(nèi)容:
- Spring Cloud 學(xué)習(xí)筆記 - No.1 服務(wù)注冊(cè)發(fā)現(xiàn)
- Spring Cloud 學(xué)習(xí)筆記 - No.2 服務(wù)消費(fèi) Ribbon & Feign
- Spring Cloud 學(xué)習(xí)筆記 - No.3 分布式配置 Config
- Spring Cloud 學(xué)習(xí)筆記 - No.4 斷路器 Hystrix
- Spring Cloud 學(xué)習(xí)筆記 - No.5 服務(wù)網(wǎng)關(guān) Zuul
- Spring Cloud 學(xué)習(xí)筆記 - No.6 通過(guò) Swagger2 構(gòu)建 API 文檔
Spring Cloud Stream
Spring Cloud Stream 是一個(gè)用來(lái)為微服務(wù)應(yīng)用構(gòu)建消息驅(qū)動(dòng)能力的框架记罚,為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn)契沫,并且引入了發(fā)布-訂閱、消費(fèi)組以及消息分區(qū)這三個(gè)核心概念。
簡(jiǎn)單的說(shuō)匙监,Spring Cloud Stream 本質(zhì)上就是整合了 Spring Boot 和 Spring Integration,實(shí)現(xiàn)了一套輕量級(jí)的消息驅(qū)動(dòng)的微服務(wù)框架不翩。
通過(guò)使用 Spring Cloud Stream齿诞,可以有效地簡(jiǎn)化開(kāi)發(fā)人員對(duì)消息中間件的使用復(fù)雜度,讓系統(tǒng)開(kāi)發(fā)人員可以有更多的精力關(guān)注于核心業(yè)務(wù)邏輯的處理献起。目前為止 Spring Cloud Stream 只支持下面兩個(gè)消息中間件的自動(dòng)化配置:
-
RabbitMQ
- 點(diǎn)擊查看:安裝并啟動(dòng)一個(gè) RabbitMQ 實(shí)例
- Kafka
構(gòu)建一個(gè) Spring Cloud Stream 消費(fèi)者
我們利用之前創(chuàng)建的 eureka-consumer
項(xiàng)目洋访。
首先在 pom.xml
中添加如下的依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
其中 spring-cloud-starter-stream-rabbit
是 Spring Cloud Stream 對(duì) RabbitMQ 支持的封裝镣陕,其中包含了對(duì) RabbitMQ 的自動(dòng)化配置等內(nèi)容。
隨后創(chuàng)建用于接收來(lái)自 RabbitMQ 消息的消費(fèi)者 SinkReceiver
:
@EnableBinding(Sink.class)
public class SinkReceiver {
private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
logger.info("Received: " + payload);
}
}
-
@EnableBinding
注解用來(lái)指定一個(gè)或多個(gè)定義了@Input
或@Output
注解的接口姻政,以此實(shí)現(xiàn)對(duì)消息通道(Channel)的綁定呆抑。- 綁定了
Sink
接口,該接口是 Spring Cloud Stream 中默認(rèn)實(shí)現(xiàn)的對(duì)輸入消息通道綁定的定義 - Spring Cloud Stream 還默認(rèn)實(shí)現(xiàn)了綁定輸出消息通道的
Source
接口 - 還有結(jié)合了
Sink
和Source
的Processor
接口
- 綁定了
-
@StreamListener
注解用來(lái)將被修飾的方法注冊(cè)為消息中間件上數(shù)據(jù)流的事件監(jiān)聽(tīng)器汁展,注解中的屬性值對(duì)應(yīng)了監(jiān)聽(tīng)的消息通道名鹊碍。
重啟項(xiàng)目,從日志中可以看到聲明了一個(gè)名為 input.anonymous.cWlqMyH9Tm--INXERE6nhQ
的隊(duì)列食绿,并通過(guò) RabbitMessageChannelBinder
將自己綁定為它的消費(fèi)者侈咕。
c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.cWlqMyH9Tm--INXERE6nhQ, bound to: input
這些信息我們也能在 RabbitMQ 的控制臺(tái)中發(fā)現(xiàn)它們:
點(diǎn)擊進(jìn)去,通過(guò) Publish Message 功能來(lái)發(fā)送一條消息到該隊(duì)列中:
從下面的日志可以看出 SinkReceiver
讀取了消息隊(duì)列中的內(nèi)容器紧,由于我們沒(méi)有對(duì)消息進(jìn)行序列化耀销,所以輸出的只是該對(duì)象的引用:
[m--INXERE6nhQ-1] com.example.SinkReceiver : Received: [B@beb7ce8
在上面的操作中,我們并沒(méi)有手動(dòng)去配置 RabbitMQ 的信息铲汪,比如 IP树姨,端口等等,這是基于 Spring Boot 的設(shè)計(jì)理念桥状,提供了對(duì) RabbitMQ 默認(rèn)的自動(dòng)化配置帽揪。當(dāng)然,我們可以手動(dòng)在 application.properties
文件中去配置辅斟,例如:
spring.cloud.stream.bindings.input.destination=my_destination
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
編寫消費(fèi)消息的單元測(cè)試用例
@RunWith(SpringRunner.class)
@EnableBinding(value = {SinkReceiverTests.SinkSender.class})
public class SinkReceiverTests {
@Autowired
private SinkSender sinkSender;
@Test
public void sinkSenderTester() {
sinkSender.output().send(MessageBuilder.withPayload("Testing Message").build());
}
public interface SinkSender {
String OUTPUT = "input";
@Output(SinkSender.OUTPUT)
MessageChannel output();
}
}
在上面的單元測(cè)試中转晰,我們通過(guò) @Output(SinkSender.OUTPUT)
定義了一個(gè)輸出通過(guò),而該輸出通道的名稱為 input
士飒,與前文中的 Sink
中定義的消費(fèi)通道同名查邢,所以這里的單元測(cè)試與前文的消費(fèi)者程序組成了一對(duì)生產(chǎn)者與消費(fèi)者。
運(yùn)行該單元測(cè)試酵幕,日志可以看出 SinkReceiver
讀取了消息隊(duì)列中的內(nèi)容:
[m--INXERE6nhQ-1] com.example.SinkReceiver : Received: [B@89040a9
Spring Cloud Stream 應(yīng)用模型
圖片引自:https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/
綁定器
Spring Cloud Stream 構(gòu)建的應(yīng)用程序與消息中間件之間是通過(guò)綁定器: Binder 相關(guān)聯(lián)的扰藕,綁定器對(duì)于應(yīng)用程序而言起到了隔離作用,它使得不同消息中間件的實(shí)現(xiàn)細(xì)節(jié)對(duì)應(yīng)用程序來(lái)說(shuō)是透明的芳撒。
當(dāng)我們需要升級(jí)消息中間件邓深,或是更換其他消息中間件產(chǎn)品時(shí),我們要做的就是更換它們對(duì)應(yīng)的 Binder 綁定器而不需要修改任何Spring Boot的應(yīng)用邏輯笔刹。
所以對(duì)于每一個(gè) Spring Cloud Stream 的應(yīng)用程序來(lái)說(shuō)芥备,它不需要知曉消息中間件的通信細(xì)節(jié),它只需要知道 Binder 對(duì)應(yīng)用程序提供的概念去實(shí)現(xiàn)即可舌菜,而這個(gè)概念就是消息通道:Channel萌壳。
發(fā)布-訂閱模式
消息會(huì)通過(guò)共享的 Topic 主題進(jìn)行廣播,消息消費(fèi)者在訂閱的主題中收到它并觸發(fā)自身的業(yè)務(wù)邏輯處理。
這里所提到的 Topic 主題是 Spring Cloud Stream 中的一個(gè)抽象概念袱瓮,用來(lái)代表發(fā)布共享消息給消費(fèi)者的地方缤骨。
在不同的消息中間件中,Topic 可能對(duì)應(yīng)著不同的概念尺借,比如:在 RabbitMQ 中的它對(duì)應(yīng)了 Exchange绊起、而在 Kakfa 中則對(duì)應(yīng)了 Kafka 中的 Topic。
在上面的例子中褐望,應(yīng)用啟動(dòng)的時(shí)候勒庄,在 RabbitMQ 的 Exchange 中也創(chuàng)建了一個(gè)名為 input
的 Exchange交換器。例如我們分別以 3001 和 3002 兩個(gè)端口啟動(dòng) eureka-consumer
項(xiàng)目瘫里。
可以看到 Queues 中有兩個(gè) Queue:
可以看到 Channels 中有兩個(gè) Channel:
可以看出 Exchanges 中只有一個(gè)名稱為 input
的 Exchange实蔽,即 Topic 主題。但是點(diǎn)進(jìn)去谨读,可以看出這個(gè)名稱為 input
的 Exchange 有綁定了兩個(gè)消息隊(duì)列:
如果我們通過(guò) Exchange 頁(yè)面的 Publish Message 來(lái)發(fā)布消息局装,可以發(fā)現(xiàn)兩個(gè)啟動(dòng)的應(yīng)用程序都輸出了消息內(nèi)容。
圖片引自 http://blog.didispace.com/spring-cloud-starter-dalston-7-2/
相對(duì)于點(diǎn)對(duì)點(diǎn)隊(duì)列實(shí)現(xiàn)的消息通信來(lái)說(shuō)劳殖,Spring Cloud Stream 采用的發(fā)布-訂閱模式可以有效的降低消息生產(chǎn)者與消費(fèi)者之間的耦合铐尚,當(dāng)我們需要對(duì)同一類消息增加一種處理方式時(shí),只需要增加一個(gè)應(yīng)用程序并將輸入通道綁定到既有的 Topic 中就可以實(shí)現(xiàn)功能的擴(kuò)展哆姻,而不需要改變?cè)瓉?lái)已經(jīng)實(shí)現(xiàn)的任何內(nèi)容宣增。
消費(fèi)組
很多情況下,消息生產(chǎn)者發(fā)送消息給某個(gè)具體微服務(wù)時(shí)矛缨,只希望被消費(fèi)一次爹脾,但是上面我們啟動(dòng)兩個(gè)應(yīng)用(3001 和 3002 兩個(gè)端口),這個(gè)消息出現(xiàn)了被重復(fù)消費(fèi)兩次的情況箕昭。
為了解決這個(gè)問(wèn)題灵妨,在 Spring Cloud Stream 中提供了消費(fèi)組的概念。
如果在同一個(gè)主題上的應(yīng)用需要啟動(dòng)多個(gè)實(shí)例的時(shí)候落竹,我們可以通過(guò)spring.cloud.stream.bindings.<channelName>.group
屬性為應(yīng)用指定一個(gè)組名泌霍,這樣這個(gè)應(yīng)用的多個(gè)實(shí)例在接收到消息的時(shí)候,只會(huì)有一個(gè)成員真正的收到消息并進(jìn)行處理述召。
例如朱转,我們?cè)?eureka-consumer
項(xiàng)目的配置中增加:
spring.cloud.stream.bindings.input.group=eureka-consumer-input-group
重啟兩個(gè)端口的實(shí)例,隨后通過(guò) Exchange 頁(yè)面的 Publish Message 來(lái)發(fā)布消息桨武,可以發(fā)現(xiàn)只有一個(gè)啟動(dòng)的應(yīng)用程序都輸出了消息內(nèi)容肋拔。并且有時(shí)候是 3001 端口的實(shí)例處理,有時(shí)候是 3002 端口的實(shí)例處理呀酸。
也就是說(shuō),對(duì)于同一條消息琼梆,它多次到達(dá)之后可能是由不同的實(shí)例進(jìn)行消費(fèi)的性誉。
消息分區(qū)
在上面的實(shí)驗(yàn)中可以看到窿吩,消費(fèi)組并無(wú)法控制消息具體被哪個(gè)實(shí)例消費(fèi)。但是對(duì)于一些業(yè)務(wù)場(chǎng)景错览,就需要對(duì)于一些具有相同特征的消息每次都可以被同一個(gè)消費(fèi)實(shí)例處理纫雁。比如:一些用于監(jiān)控服務(wù),為了統(tǒng)計(jì)某段時(shí)間內(nèi)消息生產(chǎn)者發(fā)送的報(bào)告內(nèi)容倾哺,監(jiān)控服務(wù)需要在自身內(nèi)容聚合這些數(shù)據(jù)轧邪,那么消息生產(chǎn)者可以為消息增加一個(gè)固有的特征 ID 來(lái)進(jìn)行分區(qū),使得擁有這些 ID 的消息每次都能被發(fā)送到一個(gè)特定的實(shí)例上實(shí)現(xiàn)累計(jì)統(tǒng)計(jì)的效果羞海,否則這些數(shù)據(jù)就會(huì)分散到各個(gè)不同的節(jié)點(diǎn)導(dǎo)致監(jiān)控結(jié)果不一致的情況忌愚。
而消息分區(qū)概念的引入就是為了解決這樣的問(wèn)題:當(dāng)生產(chǎn)者將消息數(shù)據(jù)發(fā)送給多個(gè)消費(fèi)者實(shí)例時(shí),保證擁有共同特征的消息數(shù)據(jù)始終是由同一個(gè)消費(fèi)者實(shí)例接收和處理却邓。
例如硕糊,我們?cè)?eureka-consumer
項(xiàng)目的配置中增加:
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
-
spring.cloud.stream.bindings.input.consumer.partitioned
:通過(guò)該參數(shù)開(kāi)啟消費(fèi)者分區(qū)功能; -
spring.cloud.stream.instanceCount
:該參數(shù)指定了當(dāng)前消費(fèi)者的總實(shí)例數(shù)量腊徙; -
spring.cloud.stream.instanceIndex
:該參數(shù)設(shè)置當(dāng)前實(shí)例的索引號(hào)简十,從0開(kāi)始,最大值為spring.cloud.stream.instanceCount
- 1撬腾。
Spring Cloud Stream VS Spring Cloud Bus
我們?cè)?Spring Cloud 學(xué)習(xí)筆記 - No.3 分布式配置 Config 中使用了 Spring Cloud Bus(結(jié)合了 RabbitMQ)螟蝙,那么 Stream 和 Bus 的區(qū)別是什么?
-
Spring Cloud Stream 構(gòu)建消息驅(qū)動(dòng)微服務(wù)
- building highly scalable event-driven microservices connected with shared messaging systems.
-
Spring Cloud Bus 廣播(例如配置統(tǒng)一管理)和監(jiān)控
- Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This can then be used to broadcast state changes (e.g. configuration changes) or other management instructions.
RabbitMQ 負(fù)載均衡
在上面的例子中民傻,我們始終只有一個(gè) RabbitMQ 實(shí)例胰默。在生產(chǎn)環(huán)境中,我們可能需要多個(gè) RabbitMQ 實(shí)例來(lái)實(shí)現(xiàn)高并發(fā)和高可用饰潜。
參見(jiàn):
引用:
程序猿DD Spring Cloud基礎(chǔ)教程
Spring Cloud構(gòu)建微服務(wù)架構(gòu):消息驅(qū)動(dòng)的微服務(wù)(入門)【Dalston版】
Spring Cloud構(gòu)建微服務(wù)架構(gòu):消息驅(qū)動(dòng)的微服務(wù)(核心概念)【Dalston版】
Spring Cloud Dalston中文文檔