Spring Cloud入門教程系列:
- Spring Cloud入門教程(一):服務(wù)治理(Eureka)
- Spring Cloud入門教程(二):客戶端負(fù)載均衡(Ribbon)
- Spring Cloud入門教程(三):聲明式服務(wù)調(diào)用(Feign)
- Spring Cloud入門教程(四):微服務(wù)容錯(cuò)保護(hù)(Hystrix)
- Spring Cloud入門教程(五):API服務(wù)網(wǎng)關(guān)(Zuul) 上
- Spring Cloud入門教程(六):API服務(wù)網(wǎng)關(guān)(Zuul) 下
- Spring Cloud入門教程(七):分布式鏈路跟蹤(Sleuth)
- Spring Cloud入門教程(八):統(tǒng)一配置中心(Config)
本人和同事撰寫的《Spring Cloud微服務(wù)架構(gòu)開(kāi)發(fā)實(shí)戰(zhàn)》一書也在京東、當(dāng)當(dāng)?shù)葧晟霞埽蠹铱梢渣c(diǎn)擊這里前往購(gòu)買室梅,多謝大家支持和捧場(chǎng)玻佩!
基于消息驅(qū)動(dòng)的開(kāi)發(fā)幾乎成了微服務(wù)架構(gòu)下必備開(kāi)發(fā)方式之一涮坐。這是因?yàn)榈唢保谝辉瓉?lái)傳統(tǒng)單體架構(gòu)開(kāi)發(fā)中的接口調(diào)用開(kāi)發(fā)已經(jīng)在微服務(wù)架構(gòu)下不存在;第二微服務(wù)架構(gòu)的開(kāi)發(fā)要求降低各微服務(wù)直接的依賴耦合矢否,一旦我們?cè)谀硞€(gè)微服務(wù)中直接調(diào)用另外一個(gè)微服務(wù)慎陵,那么這兩個(gè)微服務(wù)就會(huì)通過(guò)依賴產(chǎn)生了強(qiáng)耦合眼虱;第三微服務(wù)的自治原則也強(qiáng)烈要求各微服務(wù)之間不能夠互相調(diào)用。因此席纽,在微服務(wù)架構(gòu)開(kāi)發(fā)中基于消息驅(qū)動(dòng)的開(kāi)發(fā)成為了一種必然趨勢(shì)捏悬。
讓我們來(lái)看一下示例工程中的一個(gè)場(chǎng)景:
- Mall-Web微服務(wù)要求能夠?qū)崿F(xiàn)自治,盡量降低對(duì)商品微服務(wù)(Procuct-Service)的依賴;
- Mall-Web微服務(wù)為了能夠保障服務(wù)的效率润梯,開(kāi)發(fā)小組決定對(duì)商品數(shù)據(jù)進(jìn)行緩存过牙,這樣只需要第一次加載的時(shí)候遠(yuǎn)程調(diào)用商品微服務(wù),當(dāng)用戶下次在請(qǐng)求該商品的時(shí)候就可以從緩存中獲取纺铭,從而提升了服務(wù)效率(至于使用內(nèi)存方式還是Redis來(lái)實(shí)現(xiàn)緩存寇钉,這個(gè)由你決定)。
如果按照上面的場(chǎng)景進(jìn)行實(shí)現(xiàn)舶赔,在大部分情況下系統(tǒng)都可以穩(wěn)定工作摧莽,一旦商品進(jìn)行修改那該怎么辦,我們總不至于在商品微服務(wù)中再去調(diào)用Mall-Web微服務(wù)吧顿痪,這樣豈不是耦合的更緊密了。嗯油够,是的蚁袭,這個(gè)時(shí)候就可以讓消息出動(dòng)了。
通過(guò)引入消息石咬,我們示例工程的系統(tǒng)架構(gòu)將變?yōu)橄聢D所示:
基于上面這個(gè)架構(gòu)圖我們看一下基于消息如何來(lái)實(shí)現(xiàn)揩悄。之前如果你使用過(guò)消息中間件應(yīng)該對(duì)開(kāi)發(fā)基于消息應(yīng)用的難度心有戚戚然,不過(guò)當(dāng)我們使用Spring Cloud時(shí)鬼悠,已經(jīng)為我們的開(kāi)發(fā)提供了一套非常不錯(cuò)的組件 -- Stream删性。
1. 實(shí)現(xiàn)消息驅(qū)動(dòng)開(kāi)發(fā)
接下來(lái)的改造將分為下面三步:
- 安裝Kafka服務(wù)器;
- 改造商品微服務(wù)亏娜,實(shí)現(xiàn)商品消息的發(fā)送;
- 改造Mall-Web微服務(wù),實(shí)現(xiàn)商品消息的監(jiān)聽(tīng)蹬挺。
1.1 安裝Kafka服務(wù)器
我們接下來(lái)的示例會(huì)使用Kafka作為消息中間件维贺,一方面是Kafka消息中間件非常輕便和高效,另外一方面自己非常喜歡使用Kafka中間件巴帮。如果你不想使用Kafka那么可以自行完成于RabbitMQ的對(duì)接溯泣,而具體實(shí)現(xiàn)的業(yè)務(wù)代碼則不需要進(jìn)行任何改動(dòng)。
如何安裝運(yùn)行kafka服務(wù)器榕茧,這里就不再詳細(xì)描述垃沦,網(wǎng)上及官方都有非常不錯(cuò)的文檔,比如用押,官方文檔肢簿。
1.2 改造商品微服務(wù)
1.2.1 增加對(duì)Stream的依賴
和之前一樣,首先我們需要在項(xiàng)目中引入對(duì)Stream的依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2.2 修改application.properties文件
# =====================================================================================================================
# == stream / kafka ==
# =====================================================================================================================
spring.cloud.stream.bindings.output.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=localhost
這里我們主要是設(shè)置kafka服務(wù)器的地址蜻拨,及發(fā)送商品消息是發(fā)送到Kafka的哪個(gè)主題(Topic)池充,這里設(shè)置為twostepsfromjava-cloud-producttopic
。不過(guò)官觅,這些我們都可以不用配置纵菌,如果不配置,那么Stream就會(huì)根據(jù)默認(rèn)配置來(lái)連接Kafka服務(wù)器及創(chuàng)建相應(yīng)的主題休涤。不過(guò)如果不進(jìn)行配置的首要前提是你在安裝Kakfa服務(wù)器時(shí)沒(méi)有做端口的更改咱圆,而且Kafka服務(wù)器和商品微服務(wù)在同一臺(tái)服務(wù)器上。
1.2.3 構(gòu)建商品消息
當(dāng)商品配置變更時(shí)功氨,如:修改序苏、刪除等,就需要構(gòu)建一個(gè)商品消息捷凄,然后就可以將該消息通過(guò)Kafka發(fā)送給相應(yīng)監(jiān)聽(tīng)的微服務(wù)進(jìn)行處理忱详。因此,所要構(gòu)建的商品消息代碼如下:
package io.twostepsfromjava.cloud.product.mq;
import com.google.common.base.MoreObjects;
/**
* 商品消息
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
public class ProductMsg {
/** 消息類型:更新商品跺涤,值為: {@value} */
public static final String MA_UPDATE = "update";
/** 消息類型:刪除商品匈睁,值為: {@value} */
public static final String MA_DELETE = "delete";
// ========================================================================
// fields =================================================================
private String action;
private String itemCode;
// ========================================================================
// constructor ============================================================
public ProductMsg() { }
public ProductMsg(String action, String itemCode) {
this.action = action;
this.itemCode = itemCode;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("action", this.getAction())
.add("itemCode", this.getItemCode()).toString();
}
// ==================================================================
// setter/getter ====================================================
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getItemCode() {
return itemCode;
}
public void setItemCode(String itemCode) {
this.itemCode = itemCode;
}
}
商品消息非常簡(jiǎn)單,僅包含了兩個(gè)字段:action
和itemCode
桶错。所代表的意義如下:
- action: 表示本次消息是什么消息航唆,比如商品更新消息還是商品刪除消息;
- itemCode: 所變更或刪除商品的貨號(hào)(或者商品的ID)。
可能看到這里你會(huì)奇怪院刁,為何商品消息僅包含這兩個(gè)字段糯钙,夠后續(xù)使用么。一般對(duì)于消息有這兩個(gè)字段已經(jīng)足夠了,但是在正式生產(chǎn)環(huán)境中我們還會(huì)再增加一下其它字段任岸,這里就不講了再榄。此外,一般當(dāng)監(jiān)聽(tīng)方監(jiān)聽(tīng)到該消息之后就可以根據(jù)消息類型及商品貨號(hào)來(lái)進(jìn)行相關(guān)處理享潜。比如后面Mall-Web微服務(wù)就會(huì)根據(jù)商品貨號(hào)通過(guò)遠(yuǎn)程請(qǐng)求商品微服務(wù)來(lái)重新加載商品信息困鸥。
1.2.4 實(shí)現(xiàn)消息發(fā)送
當(dāng)商品微服務(wù)中用戶對(duì)商品進(jìn)行了變更或刪除時(shí)就需要構(gòu)建上面的商品消息并發(fā)送,相應(yīng)的代碼如下:
package io.twostepsfromjava.cloud.product.service;
import io.twostepsfromjava.cloud.product.dto.ProductDto;
import io.twostepsfromjava.cloud.product.mq.ProductMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* 商品服務(wù)
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@Service
public class ProductService {
protected Logger logger = LoggerFactory.getLogger(ProductService.class);
private Source source;
private List<ProductDto> productList;
@Autowired
public ProductService(Source source) {
this.source = source;
this.productList = this.buildProducts();
}
/**
* 獲取商品列表
* @return
*/
public List<ProductDto> findAll() {
return this.productList;
}
/**
* 根據(jù)ItemCode獲取
* @param itemCode
* @return
*/
public ProductDto findOne(String itemCode) {
for (ProductDto productDto : this.productList) {
if (productDto.getItemCode().equalsIgnoreCase(itemCode))
return productDto;
}
return null;
}
/**
* 保存或更新商品信息
* @param productDto
* @return
*/
public ProductDto save(ProductDto productDto) {
// TODO: 實(shí)現(xiàn)商品保存處理
for (ProductDto sourceProductDto : this.productList) {
if (sourceProductDto.getItemCode().equalsIgnoreCase(productDto.getItemCode())) {
sourceProductDto.setName(sourceProductDto.getName() + "-new");
sourceProductDto.setPrice(sourceProductDto.getPrice() + 100);
productDto = sourceProductDto;
break;
}
}
// 發(fā)送商品消息
this.sendMsg(ProductMsg.MA_UPDATE, productDto.getItemCode());
return productDto;
}
/**
* 具體消息發(fā)送的實(shí)現(xiàn)
* @param msgAction 消息類型
* @param itemCode 商品貨號(hào)
*/
protected void sendMsg(String msgAction, String itemCode) {
ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
this.logger.debug("發(fā)送商品消息:{} ", productMsg);
// 發(fā)送消息
this.source.output().send(MessageBuilder.withPayload(productMsg).build());
}
protected List<ProductDto> buildProducts() {
List<ProductDto> products = new ArrayList<>();
products.add(new ProductDto("item-1", "測(cè)試商品-1", "TwoStepsFromJava", 100));
products.add(new ProductDto("item-2", "測(cè)試商品-2", "TwoStepsFromJava", 200));
products.add(new ProductDto("item-3", "測(cè)試商品-3", "TwoStepsFromJava", 300));
products.add(new ProductDto("item-4", "測(cè)試商品-4", "TwoStepsFromJava", 400));
products.add(new ProductDto("item-5", "測(cè)試商品-5", "TwoStepsFromJava", 500));
products.add(new ProductDto("item-6", "測(cè)試商品-6", "TwoStepsFromJava", 600));
return products;
}
}
為了能夠有演示效果米碰,我將原來(lái)直接寫在端點(diǎn)中代碼移到一個(gè)單獨(dú)的Service中窝革。
消息的發(fā)送非常簡(jiǎn)單,我們只需要調(diào)用source.output().send()
方法就可以發(fā)送消息了吕座。這里你可能會(huì)有點(diǎn)迷惑虐译,source是什么鬼,哪里蹦出來(lái)的吴趴。不著急漆诽,現(xiàn)在你只需要明白這個(gè)是Spring Cloud Stream提供的一個(gè)抽象消息發(fā)送接口,通過(guò)該接口中的output()
就可以獲取一個(gè)消息發(fā)送通道锣枝,然后就可以往該通道中send()
消息就可以了厢拭。具體的原理我們后面再細(xì)聊。
1.2.5 增加消息發(fā)送測(cè)試端點(diǎn)
我們需要新增一個(gè)端點(diǎn)用來(lái)模擬用戶保存/更新商品信息撇叁。在上面的代碼可以知道供鸠,當(dāng)我們保存/更新商品信息時(shí)就會(huì)發(fā)送商品變更消息,因此新的端點(diǎn)只需要調(diào)用該方法即可陨闹,具體代碼如下:
@RestController
@RequestMapping("/products")
public class ProductEndpoint {
protected Logger logger = LoggerFactory.getLogger(ProductEndpoint.class);
@Autowired
ProductService productService;
// 省略了其它不相干代碼
// TODO: 該端點(diǎn)僅僅是用來(lái)測(cè)試消息發(fā)送楞捂,并不包含任何業(yè)務(wù)邏輯處理
@RequestMapping(value = "/{itemCode}", method = RequestMethod.POST)
public ProductDto save(@PathVariable String itemCode) {
ProductDto productDto = this.productService.findOne(itemCode);
if (null != productDto) {
this.productService.save(productDto);
}
return productDto;
}
}
1.2.6 綁定消息通道
最后,我們要在微服務(wù)啟動(dòng)的時(shí)候需要去綁定Kafka消息中間件趋厉,實(shí)現(xiàn)代碼如下:
package io.twostepsfromjava.cloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
/**
* TwoStepsFromJava Cloud -- Product Service 服務(wù)器
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@EnableDiscoveryClient
@EnableBinding(Source.class)
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
只需要在應(yīng)用引導(dǎo)類中添加一個(gè)@EnableBinding(Source.class)
注解即可寨闹。
Ok,到這里我們已經(jīng)實(shí)現(xiàn)了商品微服務(wù)的消息發(fā)送君账,下面讓我們完成Mall-Web微服務(wù)中消息的監(jiān)聽(tīng)繁堡。
1.3 改造Mall-Web微服務(wù),實(shí)現(xiàn)消息監(jiān)聽(tīng)
1.3.1 增加對(duì)Stream的依賴
這個(gè)和商品微服務(wù)一樣乡数,就不重復(fù)了椭蹄。
1.3.2 修改application.properties文件
# =====================================================================================================================
# == stream / kafka ==
# =====================================================================================================================
spring.cloud.stream.bindings.input.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=mallWebGroup
spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=localhost
這個(gè)配置和商品微服務(wù)類似,不過(guò)我們需要把之前output更改為了input净赴,表示這里配置的是消息輸入通道塑娇。
此外,我們還在最后增加了一個(gè)group屬性的配置劫侧,具體該屬性表示什么意思我們也是在后面進(jìn)行講解。
1.3.3 拷貝ProductMsg到本項(xiàng)目
因?yàn)椋⒎?wù)的自治原則烧栋,因此這里你需要將ProductMsg
拷貝到Mall-Web工程中写妥。
1.3.4 實(shí)現(xiàn)具體監(jiān)聽(tīng)處理
通過(guò)Stream進(jìn)行監(jiān)聽(tīng)處理,我們只需要在相應(yīng)的監(jiān)聽(tīng)方法中增加@StreamListener
注解即可审姓,具體所實(shí)現(xiàn)的監(jiān)聽(tīng)代碼如下:
package io.twostepsfromjava.cloud.web.mall.mq;
import io.twostepsfromjava.cloud.web.mall.dto.ProductDto;
import io.twostepsfromjava.cloud.web.mall.service.ProductService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/**
* 商品消息監(jiān)聽(tīng)器
*
* @author CD826(CD826Dong@gmail.com)
* @since 1.0.0
*/
@EnableBinding(Sink.class)
public class ProductMsgListener {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
protected ProductService productService;
@StreamListener(Sink.INPUT)
public void onProductMsg(ProductMsg productMsg) {
if (ProductMsg.MA_UPDATE.equalsIgnoreCase(productMsg.getAction())) {
this.logger.debug("收到商品變更消息珍特,商品貨號(hào): {}", productMsg.getItemCode());
// 重新獲取該商品信息
ProductDto productDto = this.productService.loadByItemCode(productMsg.getItemCode());
if (null != productDto)
this.logger.debug("重新獲取到的商品信息為:{}", productDto);
else
this.logger.debug("貨號(hào)為:{} 的商品不存在", productMsg.getItemCode());
} else if (ProductMsg.MA_DELETE.equalsIgnoreCase(productMsg.getAction())) {
this.logger.debug("收到商品刪除消息,所要?jiǎng)h除商品貨號(hào)為: {}", productMsg.getItemCode());
} else {
this.logger.debug("收到未知商品消息: {}", productMsg);
}
}
}
這里代碼非常簡(jiǎn)單實(shí)現(xiàn)了一個(gè)商品消息監(jiān)聽(tīng)方法onProductMsg
魔吐,并在該方法中根據(jù)消息類型進(jìn)行不同的處理扎筒。
同樣,對(duì)于Sink
是什么鬼這里也先不細(xì)講酬姆,你只需要明白這是一個(gè)Spring Cloud Stream提供的一個(gè)抽象消息監(jiān)聽(tīng)接口嗜桌,當(dāng)在@StreamListener
注解中添加了該接口類名之后,Stream就會(huì)向Kafka添加一個(gè)消息訂閱辞色,所訂閱的消息主題就是我們?cè)谂渲梦募付ǖ?code>twostepsfromjava-cloud-producttopic骨宠,當(dāng)主題中有消息時(shí),Stream就會(huì)將該主題中的消息反序列化為ProductMsg
相满,然后執(zhí)行具體的消息監(jiān)聽(tīng)方法层亿。
到此,消息監(jiān)聽(tīng)也就全部實(shí)現(xiàn)了立美。
1.4 啟動(dòng)測(cè)試
按照先后次序分別啟動(dòng):
- Kafka服務(wù)器;
- 服務(wù)治理服務(wù)器: Service-discovery;
- 商品微服務(wù): Product-Service;
- Mall-Web微服務(wù)匿又。
然后,使用Postman按照下圖訪問(wèn)消息測(cè)試端點(diǎn): http://localhost:2100/products/item-2:
在商品微服務(wù)的控制臺(tái)建蹄,可以看到類似下面輸出:
從輸出日志中可以看到商品變更消息已經(jīng)發(fā)送到消息中間件碌更。如果這個(gè)時(shí)候我們查看Mall-Web微服務(wù)的控制臺(tái),可以看到下圖的輸出:
從日志輸出中可以看到Mall-Web微服務(wù)已經(jīng)能夠正確接收到商品變更消息躲撰,然后重新請(qǐng)求了并獲取到了該商品的最新信息针贬。
2. Stream原理淺析
從Spring Cloud Stream核心原理上來(lái)說(shuō),Stream提供了一個(gè)與消息中間件進(jìn)行消息收發(fā)的抽象層拢蛋,這個(gè)也是Spring所擅長(zhǎng)的桦他。通過(guò)該抽象層剝離了業(yè)務(wù)中消息收發(fā)與實(shí)際所使用中間件直接的耦合,從而使得我們可以輕松與各種消息中間件對(duì)接谆棱,也可以很簡(jiǎn)單的就可以實(shí)現(xiàn)所使用的消息中間件的更換快压。這點(diǎn)和我們使用ORM框架一樣,可以平滑的在多種數(shù)據(jù)庫(kù)之間進(jìn)行切換垃瞧。
2.1. Stream應(yīng)用模型
從這個(gè)應(yīng)用開(kāi)發(fā)上來(lái)說(shuō)蔫劣,Stream提供了下述模型:
在該模型圖上有如下幾個(gè)核心概念:
- Source: 當(dāng)需要發(fā)送消息時(shí),我們就需要通過(guò)Source个从,Source將會(huì)把我們所要發(fā)送的消息(POJO對(duì)象)進(jìn)行序列化(默認(rèn)轉(zhuǎn)換成JSON格式字符串)脉幢,然后將這些數(shù)據(jù)發(fā)送到Channel中歪沃;
- Sink: 當(dāng)我們需要監(jiān)聽(tīng)消息時(shí)就需要通過(guò)Sink來(lái),Sink負(fù)責(zé)從消息通道中獲取消息嫌松,并將消息反序列化成消息對(duì)象(POJO對(duì)象)沪曙,然后交給具體的消息監(jiān)聽(tīng)處理進(jìn)行業(yè)務(wù)處理;
- Channel: 消息通道是Stream的抽象之一萎羔。通常我們向消息中間件發(fā)送消息或者監(jiān)聽(tīng)消息時(shí)需要指定主題(Topic)/消息隊(duì)列名稱液走,但這樣一旦我們需要變更主題名稱的時(shí)候需要修改消息發(fā)送或者消息監(jiān)聽(tīng)的代碼,但是通過(guò)Channel抽象贾陷,我們的業(yè)務(wù)代碼只需要對(duì)Channel就可以了缘眶,具體這個(gè)Channel對(duì)應(yīng)的是那個(gè)主題,就可以在配置文件中來(lái)指定髓废,這樣當(dāng)主題變更的時(shí)候我們就不用對(duì)代碼做任何修改巷懈,從而實(shí)現(xiàn)了與具體消息中間件的解耦;
- Binder: Stream中另外一個(gè)抽象層瓦哎。通過(guò)不同的Binder可以實(shí)現(xiàn)與不同消息中間件的整合砸喻,比如上面的示例我們所使用的就是針對(duì)Kafka的Binder,通過(guò)Binder提供統(tǒng)一的消息收發(fā)接口蒋譬,從而使得我們可以根據(jù)實(shí)際需要部署不同的消息中間件割岛,或者根據(jù)實(shí)際生產(chǎn)中所部署的消息中間件來(lái)調(diào)整我們的配置。
2.2. Stream應(yīng)用原理
從上面我們了解了Stream的應(yīng)用模型犯助,消息發(fā)送邏輯及流程我們也清晰了癣漆。那么我們?cè)趯?shí)際消息發(fā)送和監(jiān)聽(tīng)時(shí)又是怎么操作的呢?
在使用上Stream提供了下面三個(gè)注解:
- @Input: 創(chuàng)建一個(gè)消息輸入通道剂买,用于消息監(jiān)聽(tīng);
- @Output: 創(chuàng)建一個(gè)消息輸出通道惠爽,用于消息發(fā)送;
- @EnableBinding: 建立與消息通道的綁定。
我們?cè)谑褂脮r(shí)可以通過(guò)@Input
和@Output
創(chuàng)建多個(gè)通道瞬哼,使用這兩個(gè)注解創(chuàng)建通道非常簡(jiǎn)單婚肆,你只需要將他們分別注解到接口的相應(yīng)方法上即可,而不需要具體來(lái)實(shí)現(xiàn)該注解坐慰。當(dāng)啟動(dòng)Stream框架時(shí)较性,就會(huì)根據(jù)這兩個(gè)注解通過(guò)動(dòng)態(tài)代碼生成技術(shù)生成相應(yīng)的實(shí)現(xiàn),并注入到Spring應(yīng)用上下文中结胀,這樣我們就可以在代碼中直接使用赞咙。
2.2.1 Output注解
對(duì)于@Output
注解來(lái)說(shuō),所注解的方法的返回值必須是MessageChannel
糟港,MessageChannel
也就是具體消息發(fā)送的通道攀操。比如下面的代碼:
public interface ProductSource {
@Output
MessageChannel hotProducts();
@Output
MessageChannel selectedProducts();
}
這樣,我們就可以通過(guò)ProductSource
所創(chuàng)建的消息通道來(lái)發(fā)送消息了秸抚。
2.2.2 Input注解
對(duì)于@Input
注解來(lái)說(shuō)速和,所注解的方法的返回值必須是SubscribableChannel
歹垫,SubscribableChannel
也就是消息監(jiān)聽(tīng)的通道。比如下面的代碼:
public interface ProductSink {
@Input
SubscribableChannel productOrders();
}
這樣颠放,我們就可以通過(guò)ProductSink
所創(chuàng)建的消息通道來(lái)監(jiān)聽(tīng)消息了县钥。
2.2.3 關(guān)于Input、Output的開(kāi)箱即用
或許你有點(diǎn)迷糊慈迈,之前我們?cè)诖a中使用了Source
、Sink
省有,那么這兩個(gè)類和上面的注解什么關(guān)系呢痒留?讓我們來(lái)看一下這兩個(gè)接口的源碼:
// Source源碼
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
// Sink源碼
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
是不是有點(diǎn)恍然大悟呀,@Input
和@Output
是Stream核心應(yīng)用的注解蠢沿,而Source
和Sink
只不過(guò)是Stream為我們所提供開(kāi)箱即用的兩個(gè)接口而已伸头,有沒(méi)有這兩個(gè)接口我們都可以正常使用Stream。
此外舷蟀,Stream還提供了一個(gè)開(kāi)箱即用的接口Processor
恤磷,源碼如下:
public interface Processor extends Source, Sink {
}
也就是說(shuō)Processor
只不過(guò)是同時(shí)可以作為消息發(fā)送和消息監(jiān)聽(tīng),這種接口在我們開(kāi)發(fā)消息管道類型應(yīng)用時(shí)會(huì)非常有用野宜。
2.2.4 自定義消息通道名稱
前面扫步,我們講了消息通道是Stream的一個(gè)抽象,通過(guò)該抽象可以避免與消息中間件具體的主題耦合匈子,那么到底是怎么一回事呢河胎?從Source
和Sink
源碼中可以看到,所注解的@Output
和@Input
注解中都有一個(gè)參數(shù)虎敦,分別為output
和input
游岳,這個(gè)時(shí)候你再觀察一下我們之前的配置:
# 商品微服務(wù)中的配置
spring.cloud.stream.bindings.output.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.output.content-type=application/json
# Mall-Web中的配置
spring.cloud.stream.bindings.input.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=mallWebGroup
從配置中可以看到destination
屬性的配置,分別指定了output
和inout
也就是Stream中所使用的消息通道名稱其徙。因此胚迫,我們可以通過(guò)這兩個(gè)注解來(lái)分別設(shè)置消息通道的名稱,比如:
public interface ProductProcessor {
@Output("pmsoutput")
MessageChannel productOutput();
@Input("pmsinput")
SubscribableChannel input();}
這樣唾那,當(dāng)我們使用ProductProcessor
接口來(lái)實(shí)現(xiàn)消息發(fā)送和監(jiān)聽(tīng)的時(shí)就需要在配置文件中配置如下:
# 消息發(fā)送
spring.cloud.stream.bindings.pmsoutput.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.pmsoutput.content-type=application/json
# 消息監(jiān)聽(tīng)
spring.cloud.stream.bindings.pmsinput.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.pmsinput.content-type=application/json
spring.cloud.stream.bindings.pmsinput.group=mallWebGroup
2.2.5 綁定
既然访锻,消息發(fā)送通道和監(jiān)聽(tīng)通道都創(chuàng)建好了,那么將它們對(duì)接到具體的消息中間件就可以完成消息的發(fā)送和監(jiān)聽(tīng)功能了通贞,而@EnableBinding
注解就是用來(lái)實(shí)現(xiàn)該功能朗若。具體使用方式如下:
// 實(shí)現(xiàn)發(fā)送的綁定
@EnableBinding(Source.class)
public class Application {
}
// 實(shí)現(xiàn)監(jiān)聽(tīng)的綁定
@EnableBinding(Sink.class)
public class ProductMsgListener {
}
需要說(shuō)明的是,@EnableBinding
可以同時(shí)綁定多個(gè)接口昌罩,如下:
@EnableBinding(value={ProductSource.class, ProductSink.class})
2.2.6 直接使用通道
前面我們消息發(fā)送的代碼如下:
protected void sendMsg(String msgAction, String itemCode) {
ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
this.logger.debug("發(fā)送商品消息:{} ", productMsg);
// 發(fā)送消息
this.source.output().send(MessageBuilder.withPayload(productMsg).build());
}
獲取你在想既然@Output
所提供的MessageChannel
才是最終消息發(fā)送時(shí)使用的哭懈,那么我們是否可以直接使用呢?的確這個(gè)是可以的茎用,上面的代碼我們可以更改成如下:
@Service
public class ProductService {
protected Logger logger = LoggerFactory.getLogger(ProductService.class);
private MessageChannel output;
private List<ProductDto> productList;
@Autowired
public ProductService(MessageChannel output) {
this.output = output;
this.productList = this.buildProducts();
}
// 省略了其它代碼
/**
* 具體消息發(fā)送的實(shí)現(xiàn)
* @param msgAction 消息類型
* @param itemCode 商品貨號(hào)
*/
protected void sendMsg(String msgAction, String itemCode) {
ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
this.logger.debug("發(fā)送商品消息:{} ", productMsg);
// 發(fā)送消息
this.output.send(MessageBuilder.withPayload(productMsg).build());
}
}
默認(rèn)Stream所創(chuàng)建的MessageChannel
Bean的Id為方法名稱遣总,但是如果我們?cè)?code>@Output注解中增加了名稱定義睬罗,如果:
public interface ProductSource {
@Output("pmsoutput")
MessageChannel output();
}
那么這個(gè)時(shí)候Stream會(huì)使用pmsoutput
作為Bean的Id,而我們的代碼也需要為如下:
@Autowired
public ProductService(@Qualifier("pmsoutput") MessageChannel output) {
this.output = output;
this.productList = this.buildProducts();
}
你可以到這里下載本篇的代碼旭斥。