通過《Spring Cloud構(gòu)建微服務(wù)架構(gòu):消息驅(qū)動的微服務(wù)(入門)》一文,相信大家對Spring Cloud Stream的工作模式已經(jīng)有了一些基礎(chǔ)概念,比如:輸入、輸出通道的綁定,通道消息事件的監(jiān)聽等鱼响。下面在本文中,我們將詳細(xì)介紹一下Spring Cloud Stream中是如何通過定義一些基礎(chǔ)概念來對各種不同的消息中間件做抽象的组底。
下圖是官方文檔中對于Spring Cloud Stream應(yīng)用模型的結(jié)構(gòu)圖丈积。從中我們可以看到,Spring Cloud Stream構(gòu)建的應(yīng)用程序與消息中間件之間是通過綁定器Binder
相關(guān)聯(lián)的债鸡,綁定器對于應(yīng)用程序而言起到了隔離作用江滨,它使得不同消息中間件的實現(xiàn)細(xì)節(jié)對應(yīng)用程序來說是透明的。所以對于每一個Spring Cloud Stream的應(yīng)用程序來說厌均,它不需要知曉消息中間件的通信細(xì)節(jié)唬滑,它只需要知道Binder
對應(yīng)用程序提供的概念去實現(xiàn)即可,而這個概念就是在快速入門中我們提到的消息通道:Channel
棺弊。如下圖案例晶密,在應(yīng)用程序和Binder之間定義了兩條輸入通道和三條輸出通道來傳遞消息,而綁定器則是作為這些通道和消息中間件之間的橋梁進行通信模她。
綁定器
Binder
綁定器是Spring Cloud Stream中一個非常重要的概念稻艰。在沒有綁定器這個概念的情況下,我們的Spring Boot應(yīng)用要直接與消息中間件進行信息交互的時候侈净,由于各消息中間件構(gòu)建的初衷不同连锯,它們的實現(xiàn)細(xì)節(jié)上會有較大的差異性归苍,這使得我們實現(xiàn)的消息交互邏輯就會非常笨重,因為對具體的中間件實現(xiàn)細(xì)節(jié)有太重的依賴运怖,當(dāng)中間件有較大的變動升級、或是更換中間件的時候夏伊,我們就需要付出非常大的代價來實施摇展。
通過定義綁定器作為中間層,完美地實現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離溺忧。通過向應(yīng)用程序暴露統(tǒng)一的Channel
通道咏连,使得應(yīng)用程序不需要再考慮各種不同的消息中間件實現(xiàn)。當(dāng)我們需要升級消息中間件鲁森,或是更換其他消息中間件產(chǎn)品時祟滴,我們要做的就是更換它們對應(yīng)的Binder
綁定器而不需要修改任何Spring Boot的應(yīng)用邏輯。這一點在上一章實現(xiàn)消息總線時歌溉,從RabbitMQ切換到Kafka的過程中垄懂,已經(jīng)能夠讓我們體驗到這一好處。
目前版本的Spring Cloud Stream為主流的消息中間件產(chǎn)品RabbitMQ和Kafka提供了默認(rèn)的Binder
實現(xiàn)痛垛,在快速入門的例子中草慧,我們就使用了RabbitMQ的Binder
。另外匙头,Spring Cloud Stream還實現(xiàn)了一個專門用于測試的TestSupportBinder
漫谷,開發(fā)者可以直接使用它來對通道的接收內(nèi)容進行可靠的測試斷言。如果要使用除了RabbitMQ和Kafka以外的消息中間件的話蹂析,我們也可以通過使用它所提供的擴展API來實現(xiàn)其他中間件的Binder
舔示。
仔細(xì)的讀者可能已經(jīng)發(fā)現(xiàn),我們在快速入門示例中电抚,并沒有使用application.properties
或是application.yml
來做任何屬性設(shè)置惕稻。那是因為它也秉承了Spring Boot的設(shè)計理念,提供了對RabbitMQ默認(rèn)的自動化配置喻频。當(dāng)然缩宜,我們也可以通過Spring Boot應(yīng)用支持的任何方式來修改這些配置,比如:通過應(yīng)用程序參數(shù)甥温、環(huán)境變量锻煌、application.properties
或是application.yml
配置文件等。比如姻蚓,下面就是通過配置文件來對RabbitMQ的連接信息以及input通道的主題進行配置的示例:
spring.cloud.stream.bindings.input.destination=raw-sensor-data
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=springcloud
spring.rabbitmq.password=123456
發(fā)布-訂閱模式
在Spring Cloud Stream中的消息通信方式遵循了發(fā)布-訂閱模式宋梧,當(dāng)一條消息被投遞到消息中間件之后,它會通過共享的Topic
主題進行廣播狰挡,消息消費者在訂閱的主題中收到它并觸發(fā)自身的業(yè)務(wù)邏輯處理捂龄。這里所提到的Topic
主題是Spring Cloud Stream中的一個抽象概念释涛,用來代表發(fā)布共享消息給消費者的地方。在不同的消息中間件中倦沧,Topic
可能對應(yīng)著不同的概念唇撬,比如:在RabbitMQ中的它對應(yīng)了Exchange、而在Kakfa中則對應(yīng)了Kafka中的Topic展融。
在快速入門的示例中窖认,我們通過RabbitMQ的Channel
進行發(fā)布消息給我們編寫的應(yīng)用程序消費,而實際上Spring Cloud Stream應(yīng)用啟動的時候告希,在RabbitMQ的Exchange中也創(chuàng)建了一個名為input
的Exchange交換器扑浸,由于Binder
的隔離作用,應(yīng)用程序并無法感知它的存在燕偶,應(yīng)用程序只知道自己指向Binder
的輸入或是輸出通道喝噪。為了直觀的感受發(fā)布-訂閱模式中,消息是如何被分發(fā)到多個訂閱者的指么,我們可以使用快速入門的例子酝惧,通過命令行的方式啟動兩個不同端口的進程。此時涧尿,我們在RabbitMQ控制頁面的Channels標(biāo)簽頁中看到如下圖所示的兩個消息通道系奉,它們分別綁定了啟動的兩個應(yīng)用程序。
而在Exchanges標(biāo)簽頁中姑廉,我們還能找到名為input
的交換器缺亮,點擊進入可以看到如下圖所示的詳情頁面,其中在Bindings中的內(nèi)容就是兩個應(yīng)用程序綁定通道中的消息隊列桥言,我們可以通過Exchange頁面的Publish Message來發(fā)布消息萌踱,此時可以發(fā)現(xiàn)兩個啟動的應(yīng)用程序都輸出了消息內(nèi)容。
下圖總結(jié)了我們上面所做嘗試的基礎(chǔ)結(jié)構(gòu)号阿,我們啟動的兩個應(yīng)用程序分別是“訂閱者-1”和“訂閱者-2”并鸵,他們都建立了一條輸入通道綁定到同一個Topic
(RabbitMQ的Exchange)上。當(dāng)該Topic
中有消息發(fā)布進來后扔涧,連接到該Topic
上的所有訂閱者可以收到該消息并根據(jù)自身的需求進行消費操作园担。
相對于點對點隊列實現(xiàn)的消息通信來說,Spring Cloud Stream采用的發(fā)布-訂閱模式可以有效的降低消息生產(chǎn)者與消費者之間的耦合枯夜,當(dāng)我們需要對同一類消息增加一種處理方式時弯汰,只需要增加一個應(yīng)用程序并將輸入通道綁定到既有的Topic
中就可以實現(xiàn)功能的擴展,而不需要改變原來已經(jīng)實現(xiàn)的任何內(nèi)容湖雹。
消費組
雖然Spring Cloud Stream通過發(fā)布-訂閱模式將消息生產(chǎn)者與消費者做了很好的解耦咏闪,基于相同主題的消費者可以輕松的進行擴展,但是這些擴展都是針對不同的應(yīng)用實例而言的摔吏,在現(xiàn)實的微服務(wù)架構(gòu)中鸽嫂,我們每一個微服務(wù)應(yīng)用為了實現(xiàn)高可用和負(fù)載均衡纵装,實際上都會部署多個實例。很多情況下据某,消息生產(chǎn)者發(fā)送消息給某個具體微服務(wù)時橡娄,只希望被消費一次,按照上面我們啟動兩個應(yīng)用的例子哗脖,雖然它們同屬一個應(yīng)用瀑踢,但是這個消息出現(xiàn)了被重復(fù)消費兩次的情況。為了解決這個問題才避,在Spring Cloud Stream中提供了消費組的概念。
如果在同一個主題上的應(yīng)用需要啟動多個實例的時候氨距,我們可以通過spring.cloud.stream.bindings.input.group
屬性為應(yīng)用指定一個組名桑逝,這樣這個應(yīng)用的多個實例在接收到消息的時候,只會有一個成員真正的收到消息并進行處理俏让。如下圖所示楞遏,我們?yōu)镾ervice-A和Service-B分別啟動了兩個實例,并且根據(jù)服務(wù)名進行了分組首昔,這樣當(dāng)消息進入主題之后寡喝,Group-A和Group-B都會收到消息的副本,但是在兩個組中都只會有一個實例對其進行消費勒奇。
默認(rèn)情況下预鬓,當(dāng)我們沒有為應(yīng)用指定消費組的時候,Spring Cloud Stream會為其分配一個獨立的匿名消費組赊颠。所以格二,如果同一主題下所有的應(yīng)用都沒有指定消費組的時候,當(dāng)有消息被發(fā)布之后竣蹦,所有的應(yīng)用都會對其進行消費顶猜,因為它們各自都屬于一個獨立的組中。大部分情況下痘括,我們在創(chuàng)建Spring Cloud Stream應(yīng)用的時候长窄,建議最好為其指定一個消費組,以防止對消息的重復(fù)處理纲菌,除非該行為需要這樣做(比如:刷新所有實例的配置等)挠日。
消息分區(qū)
通過引入消費組的概念,我們已經(jīng)能夠在多實例的情況下驰后,保障每個消息只被組內(nèi)一個實例進行消費肆资。通過上面對消費組參數(shù)設(shè)置后的實驗,我們可以觀察到灶芝,消費組并無法控制消息具體被哪個實例消費郑原。也就是說唉韭,對于同一條消息,它多次到達(dá)之后可能是由不同的實例進行消費的犯犁。但是對于一些業(yè)務(wù)場景属愤,就需要對于一些具有相同特征的消息每次都可以被同一個消費實例處理,比如:一些用于監(jiān)控服務(wù)酸役,為了統(tǒng)計某段時間內(nèi)消息生產(chǎn)者發(fā)送的報告內(nèi)容住诸,監(jiān)控服務(wù)需要在自身內(nèi)容聚合這些數(shù)據(jù),那么消息生產(chǎn)者可以為消息增加一個固有的特征ID來進行分區(qū)涣澡,使得擁有這些ID的消息每次都能被發(fā)送到一個特定的實例上實現(xiàn)累計統(tǒng)計的效果贱呐,否則這些數(shù)據(jù)就會分散到各個不同的節(jié)點導(dǎo)致監(jiān)控結(jié)果不一致的情況。而分區(qū)概念的引入就是為了解決這樣的問題:當(dāng)生產(chǎn)者將消息數(shù)據(jù)發(fā)送給多個消費者實例時入桂,保證擁有共同特征的消息數(shù)據(jù)始終是由同一個消費者實例接收和處理奄薇。
Spring Cloud Stream為分區(qū)提供了通用的抽象實現(xiàn),用來在消息中間件的上層實現(xiàn)分區(qū)處理抗愁,所以它對于消息中間件自身是否實現(xiàn)了消息分區(qū)并不關(guān)心馁蒂,這使得Spring Cloud Stream為不具備分區(qū)功能的消息中間件也增加了分區(qū)功能擴展。
原文首發(fā)于:http://blog.didispace.com/spring-cloud-starter-dalston-7-2/
本文內(nèi)容部分節(jié)選自我的《Spring Cloud微服務(wù)實戰(zhàn)》蜘腌,但對依賴的Spring Boot和Spring Cloud版本做了升級沫屡。