官方定義 Spring Cloud Stream 是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架。
應(yīng)用程序通過(guò) inputs 或者 outputs 來(lái)與 Spring Cloud Stream 中binder 交互欣尼,通過(guò)我們配置來(lái) binding 堕阔,而 Spring Cloud Stream 的 binder 負(fù)責(zé)與消息中間件交互溉贿。所以锥余,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅(qū)動(dòng)的方式躺苦。
通過(guò)使用Spring Integration來(lái)連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)瓮恭。Spring Cloud Stream 為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn)雄坪,引用了發(fā)布-訂閱、消費(fèi)組屯蹦、分區(qū)的三個(gè)核心概念维哈。目前僅支持RabbitMQ绳姨、Kafka。
這里還要講解一下什么是Spring Integration 阔挠? Integration 集成
企業(yè)應(yīng)用集成(EAI)是集成應(yīng)用之間數(shù)據(jù)和服務(wù)的一種應(yīng)用技術(shù)飘庄。四種集成風(fēng)格:
1.文件傳輸:兩個(gè)系統(tǒng)生成文件,文件的有效負(fù)載就是由另一個(gè)系統(tǒng)處理的消息购撼。該類風(fēng)格的例子之一是針對(duì)文件輪詢目錄或FTP目錄跪削,并處理該文件。
2.共享數(shù)據(jù)庫(kù):兩個(gè)系統(tǒng)查詢同一個(gè)數(shù)據(jù)庫(kù)以獲取要傳遞的數(shù)據(jù)迂求。一個(gè)例子是你部署了兩個(gè)EAR應(yīng)用碾盐,它們的實(shí)體類(JPA、Hibernate等)共用同一個(gè)表揩局。
3.遠(yuǎn)程過(guò)程調(diào)用:兩個(gè)系統(tǒng)都暴露另一個(gè)能調(diào)用的服務(wù)毫玖。該類例子有EJB服務(wù),或SOAP和REST服務(wù)凌盯。
4.消息:兩個(gè)系統(tǒng)連接到一個(gè)公用的消息系統(tǒng)孕豹,互相交換數(shù)據(jù),并利用消息調(diào)用行為十气。該風(fēng)格的例子就是眾所周知的中心輻射式的(hub-and-spoke)JMS架構(gòu)励背。
為什么需要SpringCloud Stream消息驅(qū)動(dòng)呢?
比方說(shuō)我們用到了RabbitMQ和Kafka砸西,由于這兩個(gè)消息中間件的架構(gòu)上的不同叶眉,像RabbitMQ有exchange,kafka有Topic芹枷,partitions分區(qū)衅疙,這些中間件的差異性導(dǎo)致我們實(shí)際項(xiàng)目開(kāi)發(fā)給我們?cè)斐闪艘欢ǖ睦_,我們?nèi)绻昧藘蓚€(gè)消息隊(duì)列的其中一種鸳慈,后面的業(yè)務(wù)需求饱溢,我想往另外一種消息隊(duì)列進(jìn)行遷移,這時(shí)候無(wú)疑就是一個(gè)災(zāi)難性的走芋,一大堆東西都要重新推倒重新做绩郎,因?yàn)樗覀兊南到y(tǒng)耦合了,這時(shí)候springcloud Stream給我們提供了一種解耦合的方式翁逞。
Spring Cloud Stream 是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架肋杖。應(yīng)用程序通過(guò) inputs 或者 outputs 來(lái)與 Spring Cloud Stream 中binder 交互,通過(guò)我們配置來(lái) binding 挖函,而 Spring Cloud Stream 的 binder 負(fù)責(zé)與中間件交互状植。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅(qū)動(dòng)的方式。
Spring Cloud Stream由一個(gè)中間件中立的核組成津畸。應(yīng)用通過(guò)Spring Cloud Stream插入的input(相當(dāng)于消費(fèi)者consumer振定,它是從隊(duì)列中接收消息的)和output(相當(dāng)于生產(chǎn)者producer,它是從隊(duì)列中發(fā)送消息的肉拓。)通道與外界交流吩案。
通道通過(guò)指定中間件的Binder實(shí)現(xiàn)與外部代理連接。業(yè)務(wù)開(kāi)發(fā)者不再關(guān)注具體消息中間件帝簇,只需關(guān)注Binder對(duì)應(yīng)用程序提供的抽象概念來(lái)使用消息中間件實(shí)現(xiàn)業(yè)務(wù)即可。
-
Binder
Binder 是 Spring Cloud Stream 的一個(gè)抽象概念靠益,是應(yīng)用與消息中間件之間的粘合劑丧肴。目前 Spring Cloud Stream 實(shí)現(xiàn)了 Kafka 和 Rabbit MQ 的binder。
通過(guò) binder 胧后,可以很方便的連接中間件芋浮,可以動(dòng)態(tài)的改變消息的destinations(對(duì)應(yīng)于 Kafka 的topic,Rabbit MQ 的 exchanges)壳快,這些都可以通過(guò)外部配置項(xiàng)來(lái)做到纸巷。甚至可以任意的改變中間件的類型而不需要修改一行代碼。
-
Publish-Subscribe
消息的發(fā)布(Publish)和訂閱(Subscribe)是事件驅(qū)動(dòng)的經(jīng)典模式眶痰。Spring Cloud Stream 的數(shù)據(jù)交互也是基于這個(gè)思想瘤旨。生產(chǎn)者把消息通過(guò)某個(gè) topic 廣播出去(Spring Cloud Stream 中的 destinations)。其他的微服務(wù)竖伯,通過(guò)訂閱特定 topic 來(lái)獲取廣播出來(lái)的消息來(lái)觸發(fā)業(yè)務(wù)的進(jìn)行存哲。
這種模式,極大的降低了生產(chǎn)者與消費(fèi)者之間的耦合七婴。即使有新的應(yīng)用的引入祟偷,也不需要破壞當(dāng)前系統(tǒng)的整體結(jié)構(gòu)。
-
Consumer Groups
“Group”打厘,如果使用過(guò) Kafka 的童鞋并不會(huì)陌生修肠。Spring Cloud Stream 的這個(gè)分組概念的意思基本和 Kafka 一致。
微服務(wù)中動(dòng)態(tài)的縮放同一個(gè)應(yīng)用的數(shù)量以此來(lái)達(dá)到更高的處理能力是非常必須的户盯。對(duì)于這種情況嵌施,同一個(gè)事件防止被重復(fù)消費(fèi),只要把這些應(yīng)用放置于同一個(gè) “group” 中莽鸭,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次艰管。
-
Consumer Groups
bindings 是我們通過(guò)配置把應(yīng)用和spring cloud stream 的 binder 綁定在一起,之后我們只需要修改 binding 的配置來(lái)達(dá)到動(dòng)態(tài)修改topic蒋川、exchange牲芋、type等一系列信息而不需要修改一行代碼。
Demo演練
這里新建3個(gè)模塊stream-Publish、stream-Subscribe1缸浦、stream-Subscribe2夕冲,其中stream-Publish作為消息發(fā)布模塊,stream-Subscribe1和stream-Subscribe2作為消息消費(fèi)模塊(沒(méi)有kafka的裂逐,windows下載鏈接:http://kafka.apache.org/downloads內(nèi)帶zookeeper)
springcloud版本:Hoxton.SR3
3個(gè)項(xiàng)目都需要以下依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
stream-Publish的yml配置
server:
port: 7888
spring:
application:
name: producer
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 #Kafka的消息中間件服務(wù)器
zk-nodes: localhost:2181 #Zookeeper的節(jié)點(diǎn)歹鱼,如果集群,后面加,號(hào)分隔
auto-create-topics: true #如果設(shè)置為false,就不會(huì)自動(dòng)創(chuàng)建Topic 有可能你Topic還沒(méi)創(chuàng)建就直接調(diào)用了卜高。
bindings:
output: #這里用stream給我們提供的默認(rèn)output弥姻,后面會(huì)講到自定義output
destination: stream-demo #消息發(fā)往的目的地
content-type: text/plain #消息發(fā)送的格式,接收端不用指定格式掺涛,但是發(fā)送端要
stream-Publish的SendService
@EnableBinding(Source.class)
public class SendService {
@Autowired
private Source source;
public void sendMsg(String msg) { h
source.output().send(MessageBuilder.withPayload(msg).build());
}
}
stream-Publish的測(cè)試Controller
@RestController
@SpringBootApplication
public class DemoApplication {
@Autowired
private SendService sendService;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@GetMapping("/send/{msg}")
public void send(@PathVariable("msg") String msg){
sendService.sendMsg(msg);
}
}
然后需要來(lái)寫(xiě)消息消費(fèi)者的代碼庭敦,stream-Subscribe1、stream-Subscribe2(同前一個(gè)薪缆,換個(gè)端口)
stream-Subscribe1的yml文件
server:
port: 7889
spring:
application:
name: consumer_1
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
#input是接收秧廉,注意這里不能再像前面一樣寫(xiě)output了
input:
destination: stream-demo
stream-Subscribe1的接受消息的Service(啟動(dòng)類默認(rèn)就OK)
@EnableBinding(Sink.class)
public class RecieveService {
@StreamListener(Sink.INPUT)
public void recieve(Object payload){
System.out.println(payload);
}
}
然后我們啟動(dòng)zookeeper,和Kafka拣帽,然后啟動(dòng)這三個(gè)項(xiàng)目
訪問(wèn)我們stream-Publish的測(cè)試接口疼电,如下:
請(qǐng)求成功后我們可以看到消息消費(fèi)者的控制臺(tái)打印出消息hello
自定義信道
好了到現(xiàn)在為止,我們進(jìn)行了一個(gè)簡(jiǎn)單的消息發(fā)送和接收减拭,用的是Stream給我們提供的默認(rèn)Source蔽豺,Sink,接下來(lái)我們要自己進(jìn)行自定義拧粪,這種方式在工作中還是用的比較多的茫虽,因?yàn)槲覀円煌南⑼ǖ腊l(fā)消息,必然不能全都叫input,output的既们,那樣的話就亂套了濒析,因此首先自定義一個(gè)接口,如下:
Source(發(fā)射器): 一個(gè)接口類啥纸,內(nèi)部定義了一個(gè)輸出管道号杏,例如定義一個(gè)輸出管道 @output("XXOO")。說(shuō)明這個(gè)發(fā)射器將會(huì)向這個(gè)管道發(fā)射數(shù)據(jù)斯棒。
Sink(接收器):一個(gè)接口類盾致,內(nèi)部定義了一個(gè)輸入管道,例如定義一個(gè)輸入管道 @input("XXOO")荣暮。說(shuō)明這個(gè)接收器將會(huì)從這個(gè)管道接收數(shù)據(jù)庭惜。
Binder(綁定器):用于與管道進(jìn)行綁定。Binder將于消息中間件進(jìn)行關(guān)聯(lián)穗酥。@ EnableBinding (Source.class/Sink.class)护赊。@EnableBinding()里面是可以定義多個(gè)發(fā)射器/接收器
自定義MySource:
在stream-Publish中創(chuàng)建自定義接口
public interface MySource {
@Output("myOutput")//管道名稱為"myOutput,對(duì)應(yīng)在yml文件里
MessageChannel myOutput();
}
修改stream-Publish中的SendService文件
@EnableBinding(MySource.class) //使用我們自定義的Mysource
public class SendService {
@Autowired
private MySource mySource;
public void sendMsg(String msg) {
mySource.myOutput().send(MessageBuilder.withPayload(msg).build());
}
}
修改stream-Publish中yml文件
server:
port: 7888
spring:
application:
name: producer
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 #Kafka的消息中間件服務(wù)器
zk-nodes: localhost:2181 #Zookeeper的節(jié)點(diǎn)惠遏,如果集群,后面加,號(hào)分隔
auto-create-topics: true #如果設(shè)置為false,就不會(huì)自動(dòng)創(chuàng)建Topic 有可能你Topic還沒(méi)創(chuàng)建就直接調(diào)用了骏啰。
bindings:
myOutput: #自定義output
destination: stream-demo #消息發(fā)往的目的地
content-type: text/plain #消息發(fā)送的格式节吮,接收端不用指定格式,但是發(fā)送端要 #這里用stream給我們提供的默認(rèn)output判耕,后面會(huì)講到自定義output
到這里透绩,我們的消息發(fā)送服務(wù)已經(jīng)修改完啦,接下來(lái)修改消息消費(fèi)服務(wù)stream-Subscribe1壁熄,類似于上面帚豪,之類我直接貼代碼了:
public interface MySink {
@Input("myInput")
SubscribableChannel myInput();
}
@EnableBinding(MySink.class)
public class RecieveService {
@StreamListener("myInput")
public void recieve(Object payload){
System.out.println(payload);
}
}
server:
port: 7889
spring:
application:
name: consumer_1
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
#修改為我們自己的myInput
myInput:
destination: stream-demo
然后測(cè)試成功,和上面一樣就不放圖了
消息分組(Consumer Groups)
“Group”草丧,如果使用過(guò) Kafka 的讀者并不會(huì)陌生狸臣。Spring Cloud Stream 的這個(gè)分組概念的意思基本和 Kafka 一致。微服務(wù)中動(dòng)態(tài)的縮放同一個(gè)應(yīng)用的數(shù)量以此來(lái)達(dá)到更高的處理能力是非常必須的方仿。對(duì)于這種情況,同一個(gè)事件防止被重復(fù)消費(fèi)统翩,
只要把這些應(yīng)用放置于同一個(gè) “group” 中仙蚜,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。不同的組是可以消費(fèi)的厂汗,同一個(gè)組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系委粉,只有其中一個(gè)可以消費(fèi)。
我們只需要修改yml文件就可以啦娶桦,兩個(gè)消費(fèi)者服務(wù)都配置相同的名稱的group
server:
port: 7889
spring:
application:
name: consumer_1
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
input:
destination: stream-demo
group: group #加上一條group就可以啦
可以看到stream-Subscribe1和stream-Subscribe2是屬于同一組的贾节。springcloud-stream模塊的發(fā)的消息只能被stream-Subscribe1或stream-Subscribe2其中一個(gè)接收到,這樣避免了重復(fù)消費(fèi)衷畦。
消息分區(qū)
Spring Cloud Stream對(duì)給定應(yīng)用的多個(gè)實(shí)例之間分隔數(shù)據(jù)予以支持栗涂。在分隔方案中,物理交流媒介(如:代理主題)被視為分隔成了多個(gè)片(partitions)祈争。一個(gè)或者多個(gè)生產(chǎn)者應(yīng)用實(shí)例給多個(gè)消費(fèi)者應(yīng)用實(shí)例發(fā)送消息并確保相同特征的數(shù)據(jù)被同一消費(fèi)者實(shí)例處理斤程。
Spring Cloud Stream對(duì)分割的進(jìn)程實(shí)例實(shí)現(xiàn)進(jìn)行了抽象。使得Spring Cloud Stream 為不具備分區(qū)功能的消息中間件(RabbitMQ)也增加了分區(qū)功能擴(kuò)展菩混。
那么我們就要進(jìn)行一些配置了忿墅,比如我只想要stream-Subscribe2模塊接收到消息,stream-Subscribe2配置如下:
server:
port: 7890
spring:
application:
name: consumer_2
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
input:
destination: stream-demo
group: group
consumer:
partitioned: true #開(kāi)啟分區(qū)
instance-count: 2 #分區(qū)數(shù)量
stream-Publish模塊配置如下:
server:
port: 7888
spring:
application:
name: producer
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
auto-create-topics: true
bindings:
myOutput:
destination: stream-demo
content-type: text/plain
producer:
partitionKeyExpression: payload.id(你自己的key) #分區(qū)的主鍵沮峡,根據(jù)什么來(lái)分區(qū)疚脐,下面的payload.id只是一個(gè)對(duì)象的id用于做為Key,用來(lái)說(shuō)明的邢疙。希望不要誤解
partitionCount: 2 #Key和分區(qū)數(shù)量進(jìn)行取模去分配消息棍弄,這里分區(qū)數(shù)量配置為2
其他的代碼基本不變望薄,這里就不演示了。這里要給大家說(shuō)明一下照卦,比如分區(qū)的Key是一個(gè)對(duì)象的id式矫,比如說(shuō)id=1,每次發(fā)送消息的對(duì)象的id為相同值1,則消息只會(huì)被同一個(gè)消費(fèi)者消費(fèi)役耕,比如說(shuō)Key和分區(qū)數(shù)量取模計(jì)算的結(jié)果是分到stream2模塊中采转,那么下一次進(jìn)行進(jìn)行消息發(fā)送,
只要分組的key即id的值依然還是1的話瞬痘,消息永遠(yuǎn)只會(huì)分配到stream2模塊中故慈。