概述
流式微服務(wù)主要用于實(shí)時(shí)處理源源不斷的數(shù)據(jù)流摊灭,相對(duì)于應(yīng)用微服務(wù)咆贬,它對(duì)開發(fā)人員提出了更多的技術(shù)要求。數(shù)據(jù)本身不像是純業(yè)務(wù)應(yīng)用斟或,相對(duì)來說它是抽象的素征、復(fù)雜的,且在傳輸萝挤,存儲(chǔ)御毅,分析等方面都比應(yīng)用微服務(wù)提出了更高的性能要求。
在設(shè)計(jì)上怜珍,Spring Cloud Stream 是一個(gè)用于構(gòu)建消息驅(qū)動(dòng)的微服務(wù)框架端蛆,它基于Spring Boot來創(chuàng)建對(duì) DevOps 提供良好支持的微服務(wù)應(yīng)用,通過 Spring Integration 集成工具為消息連接與傳播提供一個(gè)統(tǒng)一的酥泛、靈活的操作平臺(tái)今豆。該操作平臺(tái)是可配置的,比如配置 生產(chǎn)者/消費(fèi)者 柔袁、消費(fèi)者所屬消費(fèi)者組甚至是消息分區(qū)(需要消息中間件提供商本身的支持)呆躲。
Spring Cloud Stream的使用非常簡(jiǎn)單,在應(yīng)用的main啟動(dòng)類上加上 @EnableBinding
注解就可以連接消息中間件捶索,創(chuàng)建一個(gè)流式微服務(wù)插掂。通過在方法上添加 @StreamListener
注解就可以異步處理所接收到的消息。
快速開始
下面通過一個(gè)簡(jiǎn)單的 sink 應(yīng)用來演示Stream應(yīng)用的消息接收者用法腥例。
@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {
public static void main(String[] args) {
SpringApplication.run(VoteRecordingSinkApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void processVote(Vote vote) {
votingService.recordVote(vote);
}
}
@EnableBinding
注解可以接收一個(gè)或多個(gè)輸入/輸出通道接口類作為參數(shù)(本示例中注解的參數(shù)只有 Sink
一個(gè)接口)辅甥,Spring Cloud Task默認(rèn)提供了 Source
, Sink
, 和 Processor
接口來定義消息源,接收者和處理通道燎竖。通道支持之定義擴(kuò)展璃弄。
下面是 Sink
消息接收者通道的接口定義代碼。
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
@Input
注解定義了輸入通道构回,通過這個(gè)注解可以接收進(jìn)入應(yīng)用的消息夏块。@Output
注解定義了消息輸出通道法疏咐,通過這個(gè)注解可以定義應(yīng)用發(fā)布消息的通道。這兩個(gè)注解的參數(shù)為輸入/輸出消息通道的名稱脐供,如果沒有給注解指定參數(shù)凳鬓,默認(rèn)使用注解所標(biāo)注的方法的方法名作為通道名稱。
Spring Cloud Stream會(huì)通過動(dòng)態(tài)代理技術(shù)自動(dòng)為消息通道接口創(chuàng)建實(shí)現(xiàn)類患民。下面代碼是測(cè)試消息通道是否創(chuàng)建成功的方法。
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)
@WebAppConfiguration
@DirtiesContext
public class StreamApplicationTests {
@Autowired
private Sink sink;
@Test
public void contextLoads() {
assertNotNull(this.sink.input());
}
}
核心概念
Spring Cloud Stream為簡(jiǎn)化消息驅(qū)動(dòng)微服務(wù)設(shè)計(jì)了許多抽象和基礎(chǔ)組件垦梆,其核心簡(jiǎn)要概述如下所示匹颤。
- Spring Cloud 流式微服務(wù)模型
- Binder 抽象消息中間件
- 持續(xù)發(fā)布/訂閱支持
- 支持消費(fèi)者組
- 支持消息分區(qū)
- 可拔插的 Binder API
1. 流式微服務(wù)模型
一個(gè) Spring Cloud Stream 應(yīng)用以消息中間件為核心,應(yīng)用通過Spring Cloud Stream注入的輸入/輸出通道 channels
與外部進(jìn)行通信托猩。channels
通過特定的Binder實(shí)現(xiàn)與外部消息中間件進(jìn)行通信印蓖。如圖1Spring Cloud Stream 應(yīng)用架構(gòu)示意圖所示。
2. 對(duì)消息中間件的抽象
Spring Cloud Stream提供了對(duì)Kafka和Rabbit MQ的抽象Binder來代表消息中間件京腥,其自身也提供了測(cè)試用的中間件 TestSupportBinder
赦肃,可以直接向通道發(fā)送可靠的消息并進(jìn)行斷言測(cè)試,據(jù)此可以使用擴(kuò)展API編寫自己的Binder公浪。
Spring Cloud Stream使用通用的Spring Boot配置方式他宛,抽象的Binder為靈活配置如何連接消息中間件及發(fā)送消息提供了良好的支持。例如欠气,消息發(fā)布者可以在運(yùn)行時(shí)動(dòng)態(tài)的選擇通道要連接的目標(biāo)(kafka的topic或 RabbitMQ的exchanges)厅各,這些外部屬性配置可以通過Spring Boot給出(通過應(yīng)用參數(shù),環(huán)境變量预柒,application.yml或者application.properties配置文件)队塘。
Spring Cloud Stream會(huì)自動(dòng)發(fā)現(xiàn)并使用 classpath 中可用的 綁定器(binder),你可以通過在構(gòu)建項(xiàng)目時(shí)依賴不同的中間件來使同一份代碼支持多種消息中間件宜鸯。在復(fù)雜的應(yīng)用場(chǎng)景下憔古,你也可以在事先指定好每個(gè)通道綁定哪個(gè)Binder,直接打包多個(gè)binders淋袖。
3. 持續(xù)發(fā)布/訂閱支持
流式微服務(wù)應(yīng)用之間通過發(fā)布/訂閱模型通信鸿市,通過共享的話題topic來傳播數(shù)據(jù)。圖2展示了流式微服務(wù)集合的部署架構(gòu)圖适贸,直觀的說明了多個(gè)流式微服務(wù)之間的關(guān)系灸芳。
經(jīng)過HTTP接口的數(shù)據(jù)被轉(zhuǎn)發(fā)到raw-sensor-data
目標(biāo)主題。計(jì)算平均時(shí)間窗口和持久化數(shù)據(jù)到HDFS兩個(gè)相互獨(dú)立的微服務(wù)共同消費(fèi)這個(gè)主題拜姿。
這個(gè)基于發(fā)布訂閱的通信模型可以減少生產(chǎn)者和消費(fèi)者的復(fù)雜性烙样,且能夠在不破壞現(xiàn)有數(shù)據(jù)流的情況下擴(kuò)充拓?fù)溥壿嫛1热缛锓剩憧梢栽谟?jì)算平均時(shí)間窗口服務(wù)后增加一個(gè)用于監(jiān)控和展示最高值的應(yīng)用谒获,甚至還可以在同一個(gè)數(shù)據(jù)流中添加一個(gè)計(jì)算平均錯(cuò)誤時(shí)間的應(yīng)用蛤肌。微服務(wù)間通過共享主題的方式進(jìn)行通信比點(diǎn)對(duì)點(diǎn)通信的方式大大降低了服務(wù)之間的耦合。
發(fā)布訂閱模型并非新概念批狱,Spring Cloud Stream只是通過一些額外的步驟裸准,使發(fā)布訂閱模型成為構(gòu)建微服務(wù)的一種極佳選擇。通過對(duì)本地消息中間提供支持赔硫,Spring Cloud Stream可以簡(jiǎn)潔的構(gòu)建跨平臺(tái)的發(fā)布訂閱模型炒俱。
4. 消費(fèi)者組
發(fā)布訂閱模型讓應(yīng)用間通過共享話題topic通信連接變得相當(dāng)容易,不過在為高可用部署多個(gè)應(yīng)用實(shí)例時(shí)爪膊,還需要防止應(yīng)用對(duì)該話題topic中的消息重復(fù)消費(fèi)权悟。多個(gè)應(yīng)用實(shí)例之間應(yīng)該是一個(gè)競(jìng)爭(zhēng)消費(fèi)的關(guān)系,一條消息應(yīng)該只能被一個(gè)消費(fèi)者消費(fèi)推盛。
Spring Cloud Stream通過消費(fèi)者組的概念來模擬上述需求峦阁。每個(gè)消費(fèi)者輸入通道綁定器可以使用spring.cloud.stream.bindings.<channelName>.group
屬性來指定其所屬消費(fèi)者組。
4.1 持久的消費(fèi)者組
與Spring Cloud Stream的可獨(dú)立運(yùn)行服務(wù)模型一樣耘成,消費(fèi)者組的訂閱也是持久的榔昔。這句話的意思是說綁定器默認(rèn)自己自己所屬的消費(fèi)者組是持久存在的,只要該消費(fèi)者組中的某一個(gè)消費(fèi)者被創(chuàng)建瘪菌,除非所有的消費(fèi)者都被停止掉撒会,否則該組將不停的接收消息。
通常情況下控嗜,在已知消息目的地的時(shí)候茧彤,我們都會(huì)指定消費(fèi)者組,在升級(jí)到Spring Cloud Stream應(yīng)用時(shí)疆栏,你必須要為美俄輸入綁定器指定其所屬消費(fèi)者組曾掂,這樣的話就可以避免在部署多個(gè)應(yīng)用實(shí)例時(shí)重復(fù)消費(fèi)消息。
分區(qū)支持
Spring Cloud Stream為多個(gè)生產(chǎn)者實(shí)例的應(yīng)用提供消息分區(qū)的支持壁顶。在分區(qū)的情景下珠洗,物理上連接的媒體被視為多分區(qū)的結(jié)構(gòu),一個(gè)或多個(gè)生產(chǎn)者發(fā)送數(shù)據(jù)到多個(gè)消費(fèi)者若专,并保證某個(gè)具有某些特征的數(shù)據(jù)僅被某一個(gè)消費(fèi)者實(shí)例消費(fèi)许蓖。
Spring Cloud Stream提供公用的抽象,以統(tǒng)一的方式實(shí)現(xiàn)分區(qū)處理调衰。具體的分區(qū)策略可以由提供分區(qū)機(jī)制的消息中間件來實(shí)現(xiàn)膊爪。
分區(qū)是狀態(tài)處理中重要的概念。為確保相關(guān)數(shù)據(jù)能夠被一個(gè)服務(wù)一起處理嚎莉,無論在性能方面還是一致性方面米酬,分區(qū)的概念都是非常重要的。
關(guān)于
示例源碼
spring-cloud-stream-learning 的 stream-simple-listener 子項(xiàng)目
后記
Spring Cloud Stream提供的流式微服務(wù)是一個(gè)全新的概念趋箩,以消息驅(qū)動(dòng)為服務(wù)通信方式赃额,異步高性能的進(jìn)行數(shù)據(jù)流實(shí)時(shí)處理加派。不過其并未采用什么新技術(shù),而是以優(yōu)美的設(shè)計(jì)跳芳,來抽象各個(gè)消息中間件芍锦,屏蔽其內(nèi)部實(shí)現(xiàn)原理,使服務(wù)間的通信變得更為簡(jiǎn)單友好飞盆。
本文內(nèi)容主要是對(duì) Spring Cloud Stream Elmhurst 官方文檔的翻譯娄琉,不過作者水平有限,有不盡然的地方敬請(qǐng)指出吓歇。本項(xiàng)目和文檔中所用的內(nèi)容僅供學(xué)習(xí)和研究之用车胡,轉(zhuǎn)載或引用時(shí)請(qǐng)指明出處。如果你對(duì)文檔有疑問或問題照瘾,請(qǐng)?jiān)陧?xiàng)目中給我留言或發(fā)email到
weiwei02@vip.qq.com 我的github:
https://github.com/weiwei02/ 我相信技術(shù)能夠改變世界 。
鏈接
- 上篇文章暫無
- 下篇文章正在準(zhǔn)備中