最近有一段時(shí)間沒有學(xué)習(xí)Spring
相關(guān)的內(nèi)容了,上次和一個(gè)朋友聊天說到了Spring Cloud Stream
,聽他說的是這是一個(gè)消息隊(duì)列的技術(shù).之前我了解到的是有Spring Cloud Bus
這樣一個(gè)技術(shù),但是因?yàn)榫τ邢蘅隙ú荒苁裁炊既W(xué)習(xí),所以今天簡(jiǎn)單了解了一下Spring Cloud Bus
和Spring Cloud Stream
.
文檔地址:Spring Cloud
一吓歇、簡(jiǎn)介
Spring Cloud Bus
官方的簡(jiǎn)介:
Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This can then be used to broadcast state changes (e.g. configuration changes) or other management instructions. AMQP and Kafka broker implementations are included with the project. Alternatively, any Spring Cloud Stream binder found on the classpath will work out of the box as a transport.
Spring Cloud Stream
官方簡(jiǎn)介:
Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.
根據(jù)上面的是簡(jiǎn)介,我們大概能了解到Spring Cloud Bus
和Spring Cloud Stream
的區(qū)別:
Spring Cloud Bus
定位于通過輕量級(jí)的消息代理來鏈接分布式系統(tǒng)中的節(jié)點(diǎn),其主要用來廣播狀態(tài)(配置)或者其他管理變更。其實(shí)現(xiàn)了AMQP
和kafka
。并且它還可以使用Spring Cloud Stream
的消息綁定器.感覺Spring Cloud Bus
更偏向于用來廣播配置方面的變更,而非業(yè)務(wù)方面的數(shù)據(jù)。
Spring Cloud Stream
則是一個(gè)通過共有的消息系統(tǒng)構(gòu)建高擴(kuò)展性的,以事件驅(qū)動(dòng)的微服務(wù)的框架活喊。這和Spring Cloud Bus
定位是不一樣的。
之前的項(xiàng)目中已經(jīng)多次整合了消息隊(duì)列Kafka
、Rocketmq
等等霎肯,感興趣的可以查看相關(guān)的文章。但是目前穩(wěn)定看到的Spring Cloud Stream
官方支持的只有RabbitMQ
榛斯、Kafka
和Amazon Kinesis
观游。下面開始搭建項(xiàng)目。
二驮俗、創(chuàng)建項(xiàng)目
本次項(xiàng)目Spring Boot
版本選擇的是:2.3.8.RELEASE
懂缕;而Spring Cloud
版本是:Hoxton.SR9
,Kafka
版本是:2.5.1王凑。在測(cè)試中需要注意相關(guān)版本問題搪柑。
按照需要我創(chuàng)建兩個(gè)項(xiàng)目,一個(gè)消息生產(chǎn)者索烹,一個(gè)消息消費(fèi)者拌屏,因?yàn)橹皇菧y(cè)試Spring Cloud Stream
,所以我項(xiàng)目依賴比較簡(jiǎn)單术荤,其中生產(chǎn)者額外添加了數(shù)據(jù)庫的相關(guān)依賴倚喂,而消費(fèi)者只有Spring Cloud Stream
,為了方便這里只放出生產(chǎn)者項(xiàng)目的pom.xml
瓣戚,如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.spring.cloud.stream.kafka</groupId>
<artifactId>producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>producer</name>
<description>spring cloud stream use kafka</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR9</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
接下來就是配置文件端圈,先看消息生產(chǎn)者項(xiàng)目(下文統(tǒng)一簡(jiǎn)稱為生產(chǎn)者):
server.port=8080
## JPA配置
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQL10Dialect
## 指定列名,不配置指定列名不生效
spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
spring.jpa.properties.hibernate.temp.use_jdbc_metadata_defaults=false
## 數(shù)據(jù)庫配置
spring.datasource.username=postgres
spring.datasource.password=123456
spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/pgsql?useSSL=false&characterEncoding=utf8
## kafka基礎(chǔ)配置
spring.cloud.stream.default-binder=kafka
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.auto-create-topics=true
spring.cloud.stream.kafka.binder.health-timeout=60
## 全局配置
#spring.cloud.stream.kafka.default.producer.<property>=<value>
## channel 名稱為自定義的out_channel
spring.cloud.stream.bindings.out_channel.binder=kafka
spring.cloud.stream.bindings.out_channel.content-type=application/json
spring.cloud.stream.bindings.out_channel.destination=topic_one
spring.cloud.stream.bindings.out_channel.group=group_one
spring.cloud.stream.bindings.out_channel.producer.auto-startup=true
spring.cloud.stream.bindings.out_channel.producer.partition-count=1
上述的配置文件中關(guān)于數(shù)據(jù)庫和JPA
的配置可以忽略子库。相關(guān)的配置可以在官方文檔找到舱权,而且講解也比較詳細(xì),我們只是簡(jiǎn)單分析下上面的配置仑嗅,主要就是自定義消息發(fā)送的管道名稱out_channel
宴倍,其對(duì)應(yīng)的topic
张症、group
還有就是發(fā)生消息的內(nèi)容類型,默認(rèn)就是application/json
鸵贬,支持的其他類型還有text/plain
俗他、application/xml
、text/xml
等阔逼。
接下來就是編寫相關(guān)的代碼兆衅,首先我們需要綁定消息通道的接口,用來綁定我們的輸入流和輸出流嗜浮,代碼如下:
public interface CustomChannel {
/**
* 輸出channel 名稱
*/
String OUTPUT = "out_channel";
/**
* 輸入channel 名稱
*/
String INPUT = "in_channel";
@Output(value = CustomChannel.OUTPUT)
MessageChannel output();
@Input(value = CustomChannel.INPUT)
SubscribableChannel input();
}
上面的@Output
和@Input
分別表示的輸出管道和輸入管道羡亩,這兩個(gè)注解標(biāo),而它們的名稱就是由框架創(chuàng)建的bean
的名稱危融,也就是配置文件我們配置的輸出管道和輸入管道的名稱畏铆。對(duì)于生產(chǎn)者我們只需要配置輸出管道即可,接下來我們創(chuàng)建一個(gè)消息生產(chǎn)者
@Component
public class MessageProducer {
private CustomChannel source;
public MessageProducer(CustomChannel source) {
this.source = source;
}
public CustomChannel getSource() {
return source;
}
}
這個(gè)代碼其實(shí)沒什么特別的意義吉殃,不創(chuàng)建也可以辞居,因?yàn)橄⒆罱K的發(fā)送還是由綁定的輸出管道發(fā)送的。
編寫一個(gè)測(cè)試的Controller
用來發(fā)送消息寨腔。
@RestController
@RequestMapping("/kafka")
public class SendMessageController {
private TestService testService;
public SendMessageController(TestService testService) {
this.testService = testService;
}
@PostMapping("/send/message")
public ResponseEntity<Boolean> sendMessage() {
return ResponseEntity.ok(testService.sendMessage());
}
}
具體的邏輯代碼,如下:
@Service
public class TestServiceImpl implements TestService {
private MessageProducer messageProducer;
private UserRepository userRepository;
public TestServiceImpl(MessageProducer messageProducer, UserRepository userRepository) {
this.messageProducer = messageProducer;
this.userRepository = userRepository;
}
@Override
public Boolean sendMessage() {
List<UserEntity> userEntityList = userRepository.findAll();
MessageBuilder<List<UserEntity>> messageBuilder = MessageBuilder.withPayload(userEntityList);
boolean success = messageProducer.getSource().output().send(messageBuilder.build());
return success;
}
}
上面的代碼中我從數(shù)據(jù)庫查詢了一些數(shù)據(jù)率寡,然后由消息生產(chǎn)者迫卢,準(zhǔn)確的說是綁定的消息管道進(jìn)行發(fā)送,這里只是簡(jiǎn)單的發(fā)送了一個(gè)消息體冶共。
還有一個(gè)重要的事情沒有做乾蛤,就是開啟消息輸出管道和輸入管道和broker
的綁定關(guān)系。這點(diǎn)一定一定別忘了捅僵,所以在啟動(dòng)類上添加@EnableBinding(CustomChannel.class)
家卖。這個(gè)注解是支持多個(gè)綁定關(guān)系的,如果你自定義了多個(gè)輸出管道和輸入管道都可以添加上庙楚。
這里基本上生產(chǎn)者的相關(guān)代碼已經(jīng)完成了上荡。接下來我們開始消費(fèi)者項(xiàng)目(下文統(tǒng)稱消費(fèi)者)。
消費(fèi)者主要就是綁定輸入管道馒闷,項(xiàng)目配置文件如下:
server.port=9090
spring.cloud.stream.default-binder=kafka
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.bindings.in_channel.binder=kafka
spring.cloud.stream.bindings.in_channel.destination=topic_one
#spring.cloud.stream.bindings.input.group=group_one
spring.cloud.stream.bindings.in_channel.content-type=application/json
配置非常簡(jiǎn)單酪捡,就是指定了Kafka
以及輸入管道的topic
和消息類型,其實(shí)這兩點(diǎn)只需要和生產(chǎn)者的配置保持一致即可纳账。
消費(fèi)者我們則需要?jiǎng)?chuàng)建消息的監(jiān)聽器來訂閱輸入管道的消息逛薇,代碼如下:
@Slf4j
@EnableBinding(CustomChannel.class)
public class TestListener {
@StreamListener(target = CustomChannel.INPUT)
public void consume(Message<String> message) {
String body = message.getPayload();
log.info(">>>> message={} <<<<",body);
}
}
將生產(chǎn)者項(xiàng)目中綁定消息通道的接口復(fù)制一份到消費(fèi)者,同樣將其和broker
進(jìn)行綁定疏虫。這里需要注意一點(diǎn)永罚,@EnableBinding
不能直接添加到啟動(dòng)類上(除非你在啟動(dòng)類內(nèi)添加監(jiān)聽器)啤呼,而應(yīng)該添加到具體的消息監(jiān)聽器所在的類上。@StreamListener
表明這個(gè)方法是一個(gè)輸入通道的消息的監(jiān)聽方法呢袱,也就是真正的消費(fèi)消息的方法(方法名稱無所謂)官扣,該注解的名稱就是輸入管道的名稱。
接下來我們就測(cè)試以下消息的發(fā)送和接收产捞,分別啟動(dòng)生產(chǎn)者和消費(fèi)者醇锚。并調(diào)用測(cè)試發(fā)送消息的接口,成功收到了生產(chǎn)者發(fā)送的消息列表坯临。當(dāng)然有的人會(huì)問焊唬,你在生產(chǎn)者里面使用了消息的泛型List<UserEntity>,而在消費(fèi)者里面消息的泛型則為String看靠,我的感覺這里因?yàn)榘l(fā)送消息的內(nèi)容是json
赶促,因此可以直接使用String
,當(dāng)然和生產(chǎn)者一樣使用相應(yīng)的泛型也可以挟炬,并不會(huì)影響消息的接收鸥滨。
三、總結(jié)
因?yàn)樯厦嬷皇亲隽艘粋€(gè)簡(jiǎn)單的例子谤祖,要說有什么很大的收貨確實(shí)是沒有婿滓,Spring Cloud Stream
就是在Kafka
、RabbitMQ
進(jìn)行了進(jìn)一步的抽象粥喜,它不關(guān)心你具體的使用那個(gè)類型的消息隊(duì)列凸主,只需要在配置中具體指定使用的類型即可,相當(dāng)于做了一個(gè)統(tǒng)一性的標(biāo)準(zhǔn)化的接口额湘,不需要額外的去配置具體的消息的監(jiān)聽器等等卿吐。但是就本次學(xué)習(xí)的quick start
來講,個(gè)人感覺有點(diǎn)雞肋锋华,當(dāng)然這個(gè)見仁見智嗡官,大家有興趣都可以交流討論。因?yàn)闀r(shí)間關(guān)系本次學(xué)習(xí)就到這里毯焕,如果有什么疑問歡迎大家交流衍腥、討論,最后還是希望大家能多多關(guān)注我的VX個(gè)人號(hào)超超學(xué)堂
纳猫,非常感謝紧阔。