背景及痛點
現(xiàn)如今消息中間件(MQ)在互聯(lián)網(wǎng)項目中被廣泛的應用书蚪,特別是大數(shù)據(jù)行業(yè)應用的特別的多读存,現(xiàn)在市面上也流行這多個消息中間件框架让簿,比如ActiveMQ
莲祸、RabbitMQ
、RocketMQ
缴阎、Kafka
等,這些消息中間件各有各的優(yōu)劣贸典,但是想要解決的問題都基本相同。由于每個框架都有它自己的使用方式绳锅,這無疑是增加了開發(fā)者的學習成本以及添加相同的業(yè)務復雜度。框架的變更或者多個中間件的混合使用使得業(yè)務邏輯代碼中中間件的切換喳坠、項目的維護和開發(fā)都會變得更加繁瑣晾浴。
有沒有一種技術讓我們不再需要關注具體MQ的使用細節(jié),我們只需要專注業(yè)務邏輯的開發(fā)笙各,讓程序根據(jù)實際項目的使用自己去適配綁定数尿,自動在各種MQ內(nèi)切換呢何陆?springcloud stream
便為此而生。
關于stream
我們用一句話來描述stream就是:屏蔽底層消息中間件的差異钠怯,降低切換版本鞠鲜,統(tǒng)一消息的編程模型
官方定義SpringCloud Stream
是一個構(gòu)建消息驅(qū)動微服務的框架,應用通過inputs
或者outputs
來與SpringCloud Stream
中的binder
對象交互弄砍,我們通過配置來綁定消息中間件慨畸,而SpringCloud Stream
的binder
對象負責與消息中間件交互,所以我們只需要搞清楚如何與SpringCloud Stream
交互即可方便的使用消息中間件。
SpringCloud Stream
通過Spring Integration
來連接消息代理中間件以實現(xiàn)消息事件驅(qū)動凡人,它提供了個性化的自動化配置忠荞,引用了發(fā)布訂閱
、消費組
兴使、分區(qū)
的三個核心概念励幼,但是目前僅支持RabbitMQ
和Kafka
設計思想
在此之前
生產(chǎn)者和消費者通過消息媒介(queue等)傳遞信息內(nèi)容(Message)啡莉,消息必須通過特定的通道(MessageChannel)毛萌,通過消息的發(fā)布與訂閱來決定消息的發(fā)送和消費(publish/subscrib
)缤削。
引入中間件
現(xiàn)在假如我們用到了RabbitMQ
和Kafka
民宿,由于這兩個消息中間件的架構(gòu)上的不同着绷,像RabbitMQ
有Exchange
,而Kafka
有topiche
和Partitions
分區(qū)
(binder中锌云,input對于消費者荠医,output對應生產(chǎn)者。)
這些中間件的差異性導致我們實際項目開發(fā)給我們造成了一定的困擾桑涎,我們?nèi)绻昧藘蓚€消息隊列的其中一種,但是后面因為業(yè)務需求彬向,需要改用另外一種消息隊列進行遷移,這時候無疑就是一 個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統(tǒng)耦合了,這時候springcloud Stream
給我們提供了一種解耦合的方式攻冷。
屏蔽底層差異
在沒有綁定器(Builder
)這個概念的情況下娃胆,我們的SpringBoot應用要直接與消息中間件進行信息交互的時候,由于各消息中間件構(gòu)建的初衷不同,它們的實現(xiàn)細節(jié)上會有較大的差異性,通過定義綁定器作為中間件,完美地實現(xiàn)了應用程序與消息中間件細節(jié)之間的隔離等曼。通過向應用程序暴露統(tǒng)一的Channel通道里烦, 使得應用程序不需要再考慮各種不同的消息中間件實現(xiàn)。
通過定義綁定器Binder作為中間層禁谦,實現(xiàn)了應用程序與消息中間件細節(jié)之間的隔離胁黑。
處理架構(gòu)
Stream對消息中間件的進一步封裝可以做到代碼層面對中間件的無感知,甚至于動態(tài)的切換中間件(rabbitmq切換為kafka),使得微服務開發(fā)的高度解耦州泊,服務可以關注更多自己的業(yè)務流程丧蘸。
其遵循了發(fā)布-訂閱模式,主要使用的就是Topic主題進行廣播拥诡,RabbitMQ就是Exchange触趴,在Kafka中就是Topic
通過定義綁定器Binder作為中間層,實現(xiàn)了應用程序與消息中間件細節(jié)之間的隔離渴肉。
stream流程
-
Binder
:很方便的連接中間件冗懦,屏蔽差異 -
Channel
:通道是隊列Queue的一種抽象,在消息通訊系統(tǒng)中就是實現(xiàn)存儲和轉(zhuǎn)發(fā)的媒介仇祭,通過對Channel對隊列進行配置 -
Source和Sink
:簡單的可理解為參照對象是Spring Cloud Stream自身披蕉,從Stream發(fā)布消息就是輸出,接受消息就是輸入
常用api和注解
使用示例
基本環(huán)境
注冊中心
:Eureka乌奇,可以是其他没讲。-
消息中間件
:RabbitMQrabbitmq: host: localhost port: 5672 username: guest password: guest
生產(chǎn)端
依賴
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
# stream 配置
stream:
binders: # 配置綁定的消息中間件的服務信息
defaultRabbit: # 自定義的一個名稱,用來下面 bindings 綁定
type: rabbit # 消息組件的類型
environment: #相關環(huán)境配置礁苗,設置rabbitmq的環(huán)境
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 通道名稱
destination: testExchange # 定義要使用的Exchange的名稱
content-type: application/json # 設置消息類型爬凑,對象為json,文本是text/plain
binder: defaultRabbit # 設置要綁定的服務的具體設置试伙,就是我們上面配置的defaultRabbit
eureka:
client:
#表示是否將自己注冊進EurekaServer默認為true
register-with-eureka: true
#是否從EurekaServer抓取已有的注冊消息嘁信,默認為true,單節(jié)點無所謂疏叨,集群必須設置為true才能配合ribbon使用負載均衡
fetch-registry: true
service-url:
#單機版
defaultZone: http://localhost:8080/eureka/
instance:
prefer-ip-address: true
instance-id: sender01
定義接口
這里需要定義一個接口并實現(xiàn)它潘靖,方便其他業(yè)務調(diào)用。
public interface IMessageProvider {
/**
* 發(fā)送接口
* @param msg
* @return
*/
public String send(String msg);
}
接口實現(xiàn)
接口實現(xiàn)中需要添加
@EnableBinding
注解蚤蔓,并引入Source.class
,為什么引入Source.class
呢卦溢?因為它是生產(chǎn)者,我們參考stream流程圖就可以知道
import com.martain.study.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
@EnableBinding(Source.class)
public class MessageProvider implements IMessageProvider {
/**
* 注入消息發(fā)送管道
*/
@Resource
private MessageChannel output;
@Override
public String send(String msg) {
output.send(MessageBuilder.withPayload(msg).build());
System.out.println("******send message:"+msg);
return msg;
}
}
定義測試controller
@RestController
public class TestController {
@Autowired
IMessageProvider messageProvider;
@GetMapping("/sendMsg")
public String sendMsg(){
String msg = UUID.randomUUID().toString();
return messageProvider.send(msg);
}
}
啟動類
@SpringBootApplication
public class StreamProviderApplication8801 {
public static void main(String[] args) {
SpringApplication.run(StreamProviderApplication8801.class,args);
}
}
服務啟動之后秀又,多次請求/sendMsg
单寂,發(fā)送了多條消息。
消費端
依賴
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
與生產(chǎn)者類似涮坐,只是bindings中的output改成了input
server:
port: 8802
spring:
application:
name: cloud-stream-consume
cloud:
# stream 配置
stream:
binders: # 配置綁定的消息中間件的服務信息
defaultRabbit: # 自定義的一個名稱凄贩,用來下面 bindings 綁定
type: rabbit # 消息組件的類型
environment: #相關環(huán)境配置,設置rabbitmq的環(huán)境
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 通道名稱
destination: testExchange # 定義要使用的Exchange的名稱
content-type: application/json # 設置消息類型袱讹,對象為json疲扎,文本是text/plain
binder: defaultRabbit # 設置要綁定的服務的具體設置,就是我們上面配置的defaultRabbit
eureka:
client:
#表示是否將自己注冊進EurekaServer默認為true
register-with-eureka: true
#是否從EurekaServer抓取已有的注冊消息捷雕,默認為true椒丧,單節(jié)點無所謂,集群必須設置為true才能配合ribbon使用負載均衡
fetch-registry: true
service-url:
#單機版
defaultZone: http://localhost:8080/eureka/
instance:
prefer-ip-address: true
instance-id: recover01
接收服務
接收服務只需要再類名前添加
@EnableBinding()
注解救巷,并引入Sink.class
類壶熏,而實際接收的方法中需要添加@StreamListener(Sink.INPUT)
注解。
package com.martain.study.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
/**
* 獲取本服務的端口
*/
@Value("${server.port}")
private String serverPort;
/**
* 這里表示監(jiān)聽sink的input
* @param message
*/
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("**** recv msg :"+message.getPayload()+" in port "+serverPort);
}
}
啟動類
@SpringBootApplication
public class StreamConsumerApplication8802 {
public static void main(String[] args) {
SpringApplication.run(StreamConsumerApplication8802.class,args);
}
}
啟動生產(chǎn)服務后浦译,在啟動消費服務棒假,多次請求生產(chǎn)服務發(fā)送消息溯职,我們可以發(fā)現(xiàn)消費者能很快的消費這些消息。
消息分組
當我們有多個
消費者
時,這個時候生產(chǎn)者生產(chǎn)一條消息帽哑,會發(fā)現(xiàn)所有的消費者都會消費這個消息谜酒。比如在一些訂單系統(tǒng)的場景中,如果一個訂單被多個處理服務一起獲取到妻枕,就容易造成數(shù)據(jù)錯誤僻族,那我們?nèi)绾伪苊膺@種情況呢?這時我們就可以使用Stream的消息分組
來解決重復消費問題屡谐。
如何實現(xiàn)Stream的消息分組呢述么?我們只要簡單的在yml文件中配置spring.cloud.stream.bindings.input.group
即可。示例如下:
...
spring:
application:
name: cloud-stream-consume
cloud:
# stream 配置
stream:
binders: # 配置綁定的消息中間件的服務信息
defaultRabbit: # 自定義的一個名稱愕掏,用來下面 bindings 綁定
type: rabbit # 消息組件的類型
environment: #相關環(huán)境配置度秘,設置rabbitmq的環(huán)境
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 通道名稱
destination: testExchange # 定義要使用的Exchange的名稱
content-type: application/json # 設置消息類型,對象為json亭珍,文本是text/plain
binder: defaultRabbit # 設置要綁定的服務的具體設置敷钾,就是我們上面配置的defaultRabbit
group: groupA # 配置分組
...
如果沒有設置該屬性,當消費服務啟動時肄梨,會有個隨機的組名
阻荒。
如果我們將所有的消費服務的group
熟悉都設置成一致的話,這些服務就會在同一個組里面众羡,從而能夠保證消息只被應用消費一次侨赡。
同一組的消費者是競爭關系,不可以重復消費粱侣。
消息持久化
當生產(chǎn)者在持續(xù)生產(chǎn)消息羊壹,消費服務突然掛了,使得擁有許多消息并沒有被消費齐婴,如果消費沒有配置分組的話油猫,消費服務重啟是無法消費未消費的消息的,如果配置了分組的話柠偶,當消費服務重啟之后可以自動去消費未消費的數(shù)據(jù)情妖。