Spring Cloud Stream初探

最近有一段時(shí)間沒有學(xué)習(xí)Spring相關(guān)的內(nèi)容了,上次和一個(gè)朋友聊天說到了Spring Cloud Stream,聽他說的是這是一個(gè)消息隊(duì)列的技術(shù).之前我了解到的是有Spring Cloud Bus這樣一個(gè)技術(shù),但是因?yàn)榫τ邢蘅隙ú荒苁裁炊既W(xué)習(xí),所以今天簡(jiǎn)單了解了一下Spring Cloud BusSpring Cloud Stream.
文檔地址:Spring Cloud

一吓歇、簡(jiǎn)介

Spring Cloud Bus官方的簡(jiǎn)介:

Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This can then be used to broadcast state changes (e.g. configuration changes) or other management instructions. AMQP and Kafka broker implementations are included with the project. Alternatively, any Spring Cloud Stream binder found on the classpath will work out of the box as a transport.

Spring Cloud Stream官方簡(jiǎn)介:

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

根據(jù)上面的是簡(jiǎn)介,我們大概能了解到Spring Cloud BusSpring Cloud Stream的區(qū)別:
Spring Cloud Bus定位于通過輕量級(jí)的消息代理來鏈接分布式系統(tǒng)中的節(jié)點(diǎn),其主要用來廣播狀態(tài)(配置)或者其他管理變更。其實(shí)現(xiàn)了AMQPkafka。并且它還可以使用Spring Cloud Stream的消息綁定器.感覺Spring Cloud Bus更偏向于用來廣播配置方面的變更,而非業(yè)務(wù)方面的數(shù)據(jù)。
Spring Cloud Stream則是一個(gè)通過共有的消息系統(tǒng)構(gòu)建高擴(kuò)展性的,以事件驅(qū)動(dòng)的微服務(wù)的框架活喊。這和Spring Cloud Bus定位是不一樣的。
之前的項(xiàng)目中已經(jīng)多次整合了消息隊(duì)列KafkaRocketmq等等霎肯,感興趣的可以查看相關(guān)的文章。但是目前穩(wěn)定看到的Spring Cloud Stream官方支持的只有RabbitMQ榛斯、KafkaAmazon Kinesis观游。下面開始搭建項(xiàng)目。

二驮俗、創(chuàng)建項(xiàng)目

本次項(xiàng)目Spring Boot版本選擇的是:2.3.8.RELEASE懂缕;而Spring Cloud版本是:Hoxton.SR9Kafka版本是:2.5.1王凑。在測(cè)試中需要注意相關(guān)版本問題搪柑。
按照需要我創(chuàng)建兩個(gè)項(xiàng)目,一個(gè)消息生產(chǎn)者索烹,一個(gè)消息消費(fèi)者拌屏,因?yàn)橹皇菧y(cè)試Spring Cloud Stream,所以我項(xiàng)目依賴比較簡(jiǎn)單术荤,其中生產(chǎn)者額外添加了數(shù)據(jù)庫的相關(guān)依賴倚喂,而消費(fèi)者只有Spring Cloud Stream,為了方便這里只放出生產(chǎn)者項(xiàng)目的pom.xml瓣戚,如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.spring.cloud.stream.kafka</groupId>
    <artifactId>producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>producer</name>
    <description>spring cloud stream use kafka</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR9</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

接下來就是配置文件端圈,先看消息生產(chǎn)者項(xiàng)目(下文統(tǒng)一簡(jiǎn)稱為生產(chǎn)者):

server.port=8080

## JPA配置
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQL10Dialect
## 指定列名,不配置指定列名不生效
spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
spring.jpa.properties.hibernate.temp.use_jdbc_metadata_defaults=false
## 數(shù)據(jù)庫配置
spring.datasource.username=postgres
spring.datasource.password=123456
spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/pgsql?useSSL=false&characterEncoding=utf8

## kafka基礎(chǔ)配置
spring.cloud.stream.default-binder=kafka
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.auto-create-topics=true
spring.cloud.stream.kafka.binder.health-timeout=60
## 全局配置
#spring.cloud.stream.kafka.default.producer.<property>=<value>

## channel 名稱為自定義的out_channel
spring.cloud.stream.bindings.out_channel.binder=kafka
spring.cloud.stream.bindings.out_channel.content-type=application/json
spring.cloud.stream.bindings.out_channel.destination=topic_one
spring.cloud.stream.bindings.out_channel.group=group_one
spring.cloud.stream.bindings.out_channel.producer.auto-startup=true
spring.cloud.stream.bindings.out_channel.producer.partition-count=1

上述的配置文件中關(guān)于數(shù)據(jù)庫和JPA的配置可以忽略子库。相關(guān)的配置可以在官方文檔找到舱权,而且講解也比較詳細(xì),我們只是簡(jiǎn)單分析下上面的配置仑嗅,主要就是自定義消息發(fā)送的管道名稱out_channel宴倍,其對(duì)應(yīng)的topic张症、group還有就是發(fā)生消息的內(nèi)容類型,默認(rèn)就是application/json鸵贬,支持的其他類型還有text/plain俗他、application/xmltext/xml等阔逼。
接下來就是編寫相關(guān)的代碼兆衅,首先我們需要綁定消息通道的接口,用來綁定我們的輸入流和輸出流嗜浮,代碼如下:

public interface CustomChannel {

    /**
     * 輸出channel 名稱
     */
    String OUTPUT = "out_channel";

    /**
     * 輸入channel 名稱
     */
    String INPUT = "in_channel";

    @Output(value = CustomChannel.OUTPUT)
    MessageChannel output();

    @Input(value = CustomChannel.INPUT)
    SubscribableChannel input();
}

上面的@Output@Input分別表示的輸出管道和輸入管道羡亩,這兩個(gè)注解標(biāo),而它們的名稱就是由框架創(chuàng)建的bean的名稱危融,也就是配置文件我們配置的輸出管道和輸入管道的名稱畏铆。對(duì)于生產(chǎn)者我們只需要配置輸出管道即可,接下來我們創(chuàng)建一個(gè)消息生產(chǎn)者

@Component
public class MessageProducer {

    private CustomChannel source;

    public MessageProducer(CustomChannel source) {
        this.source = source;
    }

    public CustomChannel getSource() {
        return source;
    }
}

這個(gè)代碼其實(shí)沒什么特別的意義吉殃,不創(chuàng)建也可以辞居,因?yàn)橄⒆罱K的發(fā)送還是由綁定的輸出管道發(fā)送的。
編寫一個(gè)測(cè)試的Controller用來發(fā)送消息寨腔。

@RestController
@RequestMapping("/kafka")
public class SendMessageController {

    private TestService testService;

    public SendMessageController(TestService testService) {
        this.testService = testService;
    }

    @PostMapping("/send/message")
    public ResponseEntity<Boolean> sendMessage() {
        return ResponseEntity.ok(testService.sendMessage());
    }
}

具體的邏輯代碼,如下:

@Service
public class TestServiceImpl implements TestService {

    private MessageProducer messageProducer;

    private UserRepository userRepository;

    public TestServiceImpl(MessageProducer messageProducer, UserRepository userRepository) {
        this.messageProducer = messageProducer;
        this.userRepository = userRepository;
    }

    @Override
    public Boolean sendMessage() {
        List<UserEntity> userEntityList = userRepository.findAll();
        MessageBuilder<List<UserEntity>> messageBuilder = MessageBuilder.withPayload(userEntityList);
        boolean success = messageProducer.getSource().output().send(messageBuilder.build());
        return success;
    }
}

上面的代碼中我從數(shù)據(jù)庫查詢了一些數(shù)據(jù)率寡,然后由消息生產(chǎn)者迫卢,準(zhǔn)確的說是綁定的消息管道進(jìn)行發(fā)送,這里只是簡(jiǎn)單的發(fā)送了一個(gè)消息體冶共。
還有一個(gè)重要的事情沒有做乾蛤,就是開啟消息輸出管道和輸入管道和broker的綁定關(guān)系。這點(diǎn)一定一定別忘了捅僵,所以在啟動(dòng)類上添加@EnableBinding(CustomChannel.class)家卖。這個(gè)注解是支持多個(gè)綁定關(guān)系的,如果你自定義了多個(gè)輸出管道和輸入管道都可以添加上庙楚。
這里基本上生產(chǎn)者的相關(guān)代碼已經(jīng)完成了上荡。接下來我們開始消費(fèi)者項(xiàng)目(下文統(tǒng)稱消費(fèi)者)。
消費(fèi)者主要就是綁定輸入管道馒闷,項(xiàng)目配置文件如下:

server.port=9090

spring.cloud.stream.default-binder=kafka
spring.cloud.stream.kafka.binder.brokers=localhost:9092

spring.cloud.stream.bindings.in_channel.binder=kafka
spring.cloud.stream.bindings.in_channel.destination=topic_one
#spring.cloud.stream.bindings.input.group=group_one
spring.cloud.stream.bindings.in_channel.content-type=application/json

配置非常簡(jiǎn)單酪捡,就是指定了Kafka以及輸入管道的topic和消息類型,其實(shí)這兩點(diǎn)只需要和生產(chǎn)者的配置保持一致即可纳账。
消費(fèi)者我們則需要?jiǎng)?chuàng)建消息的監(jiān)聽器來訂閱輸入管道的消息逛薇,代碼如下:

@Slf4j
@EnableBinding(CustomChannel.class)
public class TestListener {

    @StreamListener(target = CustomChannel.INPUT)
    public void consume(Message<String> message) {
        String body = message.getPayload();
        log.info(">>>> message={} <<<<",body);
    }
}

將生產(chǎn)者項(xiàng)目中綁定消息通道的接口復(fù)制一份到消費(fèi)者,同樣將其和broker進(jìn)行綁定疏虫。這里需要注意一點(diǎn)永罚,@EnableBinding不能直接添加到啟動(dòng)類上(除非你在啟動(dòng)類內(nèi)添加監(jiān)聽器)啤呼,而應(yīng)該添加到具體的消息監(jiān)聽器所在的類上。@StreamListener表明這個(gè)方法是一個(gè)輸入通道的消息的監(jiān)聽方法呢袱,也就是真正的消費(fèi)消息的方法(方法名稱無所謂)官扣,該注解的名稱就是輸入管道的名稱。
接下來我們就測(cè)試以下消息的發(fā)送和接收产捞,分別啟動(dòng)生產(chǎn)者和消費(fèi)者醇锚。并調(diào)用測(cè)試發(fā)送消息的接口,成功收到了生產(chǎn)者發(fā)送的消息列表坯临。當(dāng)然有的人會(huì)問焊唬,你在生產(chǎn)者里面使用了消息的泛型List<UserEntity>,而在消費(fèi)者里面消息的泛型則為String看靠,我的感覺這里因?yàn)榘l(fā)送消息的內(nèi)容是json赶促,因此可以直接使用String,當(dāng)然和生產(chǎn)者一樣使用相應(yīng)的泛型也可以挟炬,并不會(huì)影響消息的接收鸥滨。

三、總結(jié)

因?yàn)樯厦嬷皇亲隽艘粋€(gè)簡(jiǎn)單的例子谤祖,要說有什么很大的收貨確實(shí)是沒有婿滓,Spring Cloud Stream就是在KafkaRabbitMQ進(jìn)行了進(jìn)一步的抽象粥喜,它不關(guān)心你具體的使用那個(gè)類型的消息隊(duì)列凸主,只需要在配置中具體指定使用的類型即可,相當(dāng)于做了一個(gè)統(tǒng)一性的標(biāo)準(zhǔn)化的接口额湘,不需要額外的去配置具體的消息的監(jiān)聽器等等卿吐。但是就本次學(xué)習(xí)的quick start來講,個(gè)人感覺有點(diǎn)雞肋锋华,當(dāng)然這個(gè)見仁見智嗡官,大家有興趣都可以交流討論。因?yàn)闀r(shí)間關(guān)系本次學(xué)習(xí)就到這里毯焕,如果有什么疑問歡迎大家交流衍腥、討論,最后還是希望大家能多多關(guān)注我的VX個(gè)人號(hào)超超學(xué)堂纳猫,非常感謝紧阔。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市续担,隨后出現(xiàn)的幾起案子擅耽,更是在濱河造成了極大的恐慌,老刑警劉巖物遇,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件乖仇,死亡現(xiàn)場(chǎng)離奇詭異憾儒,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)乃沙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門起趾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人警儒,你說我怎么就攤上這事训裆。” “怎么了蜀铲?”我有些...
    開封第一講書人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵边琉,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我记劝,道長(zhǎng)变姨,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任厌丑,我火速辦了婚禮定欧,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘怒竿。我一直安慰自己砍鸠,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開白布耕驰。 她就那樣靜靜地躺著爷辱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪耍属。 梳的紋絲不亂的頭發(fā)上托嚣,一...
    開封第一講書人閱讀 49,144評(píng)論 1 285
  • 那天巩检,我揣著相機(jī)與錄音厚骗,去河邊找鬼。 笑死兢哭,一個(gè)胖子當(dāng)著我的面吹牛领舰,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播迟螺,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼冲秽,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了矩父?” 一聲冷哼從身側(cè)響起锉桑,我...
    開封第一講書人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎窍株,沒想到半個(gè)月后民轴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體攻柠,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年后裸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了瑰钮。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡微驶,死狀恐怖浪谴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情因苹,我是刑警寧澤苟耻,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站容燕,受9級(jí)特大地震影響梁呈,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蘸秘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一官卡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧醋虏,春花似錦寻咒、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至阻课,卻和暖如春叫挟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背限煞。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來泰國打工抹恳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人署驻。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓奋献,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親旺上。 傳聞我的和親對(duì)象是個(gè)殘疾皇子瓶蚂,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容