Spring Cloud入門教程(九):基于消息驅(qū)動(dòng)開(kāi)發(fā)(Stream)

Spring Cloud入門教程系列:

本人和同事撰寫的《Spring Cloud微服務(wù)架構(gòu)開(kāi)發(fā)實(shí)戰(zhàn)》一書也在京東、當(dāng)當(dāng)?shù)葧晟霞埽蠹铱梢渣c(diǎn)擊這里前往購(gòu)買室梅,多謝大家支持和捧場(chǎng)玻佩!


基于消息驅(qū)動(dòng)的開(kāi)發(fā)幾乎成了微服務(wù)架構(gòu)下必備開(kāi)發(fā)方式之一涮坐。這是因?yàn)榈唢保谝辉瓉?lái)傳統(tǒng)單體架構(gòu)開(kāi)發(fā)中的接口調(diào)用開(kāi)發(fā)已經(jīng)在微服務(wù)架構(gòu)下不存在;第二微服務(wù)架構(gòu)的開(kāi)發(fā)要求降低各微服務(wù)直接的依賴耦合矢否,一旦我們?cè)谀硞€(gè)微服務(wù)中直接調(diào)用另外一個(gè)微服務(wù)慎陵,那么這兩個(gè)微服務(wù)就會(huì)通過(guò)依賴產(chǎn)生了強(qiáng)耦合眼虱;第三微服務(wù)的自治原則也強(qiáng)烈要求各微服務(wù)之間不能夠互相調(diào)用。因此席纽,在微服務(wù)架構(gòu)開(kāi)發(fā)中基于消息驅(qū)動(dòng)的開(kāi)發(fā)成為了一種必然趨勢(shì)捏悬。

讓我們來(lái)看一下示例工程中的一個(gè)場(chǎng)景:

  • Mall-Web微服務(wù)要求能夠?qū)崿F(xiàn)自治,盡量降低對(duì)商品微服務(wù)(Procuct-Service)的依賴;
  • Mall-Web微服務(wù)為了能夠保障服務(wù)的效率润梯,開(kāi)發(fā)小組決定對(duì)商品數(shù)據(jù)進(jìn)行緩存过牙,這樣只需要第一次加載的時(shí)候遠(yuǎn)程調(diào)用商品微服務(wù),當(dāng)用戶下次在請(qǐng)求該商品的時(shí)候就可以從緩存中獲取纺铭,從而提升了服務(wù)效率(至于使用內(nèi)存方式還是Redis來(lái)實(shí)現(xiàn)緩存寇钉,這個(gè)由你決定)。

如果按照上面的場(chǎng)景進(jìn)行實(shí)現(xiàn)舶赔,在大部分情況下系統(tǒng)都可以穩(wěn)定工作摧莽,一旦商品進(jìn)行修改那該怎么辦,我們總不至于在商品微服務(wù)中再去調(diào)用Mall-Web微服務(wù)吧顿痪,這樣豈不是耦合的更緊密了。嗯油够,是的蚁袭,這個(gè)時(shí)候就可以讓消息出動(dòng)了。

通過(guò)引入消息石咬,我們示例工程的系統(tǒng)架構(gòu)將變?yōu)橄聢D所示:

示例工程架構(gòu)圖

基于上面這個(gè)架構(gòu)圖我們看一下基于消息如何來(lái)實(shí)現(xiàn)揩悄。之前如果你使用過(guò)消息中間件應(yīng)該對(duì)開(kāi)發(fā)基于消息應(yīng)用的難度心有戚戚然,不過(guò)當(dāng)我們使用Spring Cloud時(shí)鬼悠,已經(jīng)為我們的開(kāi)發(fā)提供了一套非常不錯(cuò)的組件 -- Stream删性。

1. 實(shí)現(xiàn)消息驅(qū)動(dòng)開(kāi)發(fā)

接下來(lái)的改造將分為下面三步:

  1. 安裝Kafka服務(wù)器;
  2. 改造商品微服務(wù)亏娜,實(shí)現(xiàn)商品消息的發(fā)送;
  3. 改造Mall-Web微服務(wù),實(shí)現(xiàn)商品消息的監(jiān)聽(tīng)蹬挺。

1.1 安裝Kafka服務(wù)器

我們接下來(lái)的示例會(huì)使用Kafka作為消息中間件维贺,一方面是Kafka消息中間件非常輕便和高效,另外一方面自己非常喜歡使用Kafka中間件巴帮。如果你不想使用Kafka那么可以自行完成于RabbitMQ的對(duì)接溯泣,而具體實(shí)現(xiàn)的業(yè)務(wù)代碼則不需要進(jìn)行任何改動(dòng)。

如何安裝運(yùn)行kafka服務(wù)器榕茧,這里就不再詳細(xì)描述垃沦,網(wǎng)上及官方都有非常不錯(cuò)的文檔,比如用押,官方文檔肢簿。

1.2 改造商品微服務(wù)

1.2.1 增加對(duì)Stream的依賴

和之前一樣,首先我們需要在項(xiàng)目中引入對(duì)Stream的依賴:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

1.2.2 修改application.properties文件

# =====================================================================================================================
# == stream / kafka                                                                                                  ==
# =====================================================================================================================
spring.cloud.stream.bindings.output.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=localhost

這里我們主要是設(shè)置kafka服務(wù)器的地址蜻拨,及發(fā)送商品消息是發(fā)送到Kafka的哪個(gè)主題(Topic)池充,這里設(shè)置為twostepsfromjava-cloud-producttopic。不過(guò)官觅,這些我們都可以不用配置纵菌,如果不配置,那么Stream就會(huì)根據(jù)默認(rèn)配置來(lái)連接Kafka服務(wù)器及創(chuàng)建相應(yīng)的主題休涤。不過(guò)如果不進(jìn)行配置的首要前提是你在安裝Kakfa服務(wù)器時(shí)沒(méi)有做端口的更改咱圆,而且Kafka服務(wù)器和商品微服務(wù)在同一臺(tái)服務(wù)器上。

1.2.3 構(gòu)建商品消息

當(dāng)商品配置變更時(shí)功氨,如:修改序苏、刪除等,就需要構(gòu)建一個(gè)商品消息捷凄,然后就可以將該消息通過(guò)Kafka發(fā)送給相應(yīng)監(jiān)聽(tīng)的微服務(wù)進(jìn)行處理忱详。因此,所要構(gòu)建的商品消息代碼如下:

package io.twostepsfromjava.cloud.product.mq;

import com.google.common.base.MoreObjects;

/**
 * 商品消息
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
public class ProductMsg {
    /** 消息類型:更新商品跺涤,值為: {@value} */
    public static final String MA_UPDATE = "update";
    /** 消息類型:刪除商品匈睁,值為: {@value} */
    public static final String MA_DELETE = "delete";

    // ========================================================================
    // fields =================================================================
    private String action;
    private String itemCode;

    // ========================================================================
    // constructor ============================================================
    public ProductMsg() {  }

    public ProductMsg(String action, String itemCode) {
        this.action = action;
        this.itemCode = itemCode;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("action", this.getAction())
                .add("itemCode", this.getItemCode()).toString();
    }

    // ==================================================================
    // setter/getter ====================================================
    public String getAction() {
        return action;
    }
    public void setAction(String action) {
        this.action = action;
    }

    public String getItemCode() {
        return itemCode;
    }
    public void setItemCode(String itemCode) {
        this.itemCode = itemCode;
    }
}

商品消息非常簡(jiǎn)單,僅包含了兩個(gè)字段:actionitemCode桶错。所代表的意義如下:

  • action: 表示本次消息是什么消息航唆,比如商品更新消息還是商品刪除消息;
  • itemCode: 所變更或刪除商品的貨號(hào)(或者商品的ID)。

可能看到這里你會(huì)奇怪院刁,為何商品消息僅包含這兩個(gè)字段糯钙,夠后續(xù)使用么。一般對(duì)于消息有這兩個(gè)字段已經(jīng)足夠了,但是在正式生產(chǎn)環(huán)境中我們還會(huì)再增加一下其它字段任岸,這里就不講了再榄。此外,一般當(dāng)監(jiān)聽(tīng)方監(jiān)聽(tīng)到該消息之后就可以根據(jù)消息類型及商品貨號(hào)來(lái)進(jìn)行相關(guān)處理享潜。比如后面Mall-Web微服務(wù)就會(huì)根據(jù)商品貨號(hào)通過(guò)遠(yuǎn)程請(qǐng)求商品微服務(wù)來(lái)重新加載商品信息困鸥。

1.2.4 實(shí)現(xiàn)消息發(fā)送

當(dāng)商品微服務(wù)中用戶對(duì)商品進(jìn)行了變更或刪除時(shí)就需要構(gòu)建上面的商品消息并發(fā)送,相應(yīng)的代碼如下:

package io.twostepsfromjava.cloud.product.service;

import io.twostepsfromjava.cloud.product.dto.ProductDto;
import io.twostepsfromjava.cloud.product.mq.ProductMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;


/**
 * 商品服務(wù)
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@Service
public class ProductService {
    protected Logger logger = LoggerFactory.getLogger(ProductService.class);

    private Source source;
    private List<ProductDto> productList;

    @Autowired
    public ProductService(Source source) {
        this.source = source;
        this.productList = this.buildProducts();
    }

    /**
     * 獲取商品列表
     * @return
     */
    public List<ProductDto> findAll() {
        return this.productList;
    }

    /**
     * 根據(jù)ItemCode獲取
     * @param itemCode
     * @return
     */
    public ProductDto findOne(String itemCode) {
        for (ProductDto productDto : this.productList) {
            if (productDto.getItemCode().equalsIgnoreCase(itemCode))
                return productDto;
        }

        return null;
    }

    /**
     * 保存或更新商品信息
     * @param productDto
     * @return
     */
    public ProductDto save(ProductDto productDto) {
        // TODO: 實(shí)現(xiàn)商品保存處理
        for (ProductDto sourceProductDto : this.productList) {
            if (sourceProductDto.getItemCode().equalsIgnoreCase(productDto.getItemCode())) {
                sourceProductDto.setName(sourceProductDto.getName() + "-new");
                sourceProductDto.setPrice(sourceProductDto.getPrice() + 100);
                productDto = sourceProductDto;
                break;
            }
        }

        // 發(fā)送商品消息
        this.sendMsg(ProductMsg.MA_UPDATE, productDto.getItemCode());

        return productDto;
    }
    
    /**
     * 具體消息發(fā)送的實(shí)現(xiàn)
     * @param msgAction 消息類型
     * @param itemCode 商品貨號(hào)
     */
    protected void sendMsg(String msgAction, String itemCode) {
        ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
        this.logger.debug("發(fā)送商品消息:{} ", productMsg);

        // 發(fā)送消息
        this.source.output().send(MessageBuilder.withPayload(productMsg).build());
    }

    protected List<ProductDto> buildProducts() {
        List<ProductDto> products = new ArrayList<>();
        products.add(new ProductDto("item-1", "測(cè)試商品-1", "TwoStepsFromJava", 100));
        products.add(new ProductDto("item-2", "測(cè)試商品-2", "TwoStepsFromJava", 200));
        products.add(new ProductDto("item-3", "測(cè)試商品-3", "TwoStepsFromJava", 300));
        products.add(new ProductDto("item-4", "測(cè)試商品-4", "TwoStepsFromJava", 400));
        products.add(new ProductDto("item-5", "測(cè)試商品-5", "TwoStepsFromJava", 500));
        products.add(new ProductDto("item-6", "測(cè)試商品-6", "TwoStepsFromJava", 600));
        return products;
    }
}

為了能夠有演示效果米碰,我將原來(lái)直接寫在端點(diǎn)中代碼移到一個(gè)單獨(dú)的Service中窝革。

消息的發(fā)送非常簡(jiǎn)單,我們只需要調(diào)用source.output().send()方法就可以發(fā)送消息了吕座。這里你可能會(huì)有點(diǎn)迷惑虐译,source是什么鬼,哪里蹦出來(lái)的吴趴。不著急漆诽,現(xiàn)在你只需要明白這個(gè)是Spring Cloud Stream提供的一個(gè)抽象消息發(fā)送接口,通過(guò)該接口中的output()就可以獲取一個(gè)消息發(fā)送通道锣枝,然后就可以往該通道中send()消息就可以了厢拭。具體的原理我們后面再細(xì)聊。

1.2.5 增加消息發(fā)送測(cè)試端點(diǎn)

我們需要新增一個(gè)端點(diǎn)用來(lái)模擬用戶保存/更新商品信息撇叁。在上面的代碼可以知道供鸠,當(dāng)我們保存/更新商品信息時(shí)就會(huì)發(fā)送商品變更消息,因此新的端點(diǎn)只需要調(diào)用該方法即可陨闹,具體代碼如下:

@RestController
@RequestMapping("/products")
public class ProductEndpoint {
    protected Logger logger = LoggerFactory.getLogger(ProductEndpoint.class);

    @Autowired
    ProductService productService;
    // 省略了其它不相干代碼
    
    // TODO: 該端點(diǎn)僅僅是用來(lái)測(cè)試消息發(fā)送楞捂,并不包含任何業(yè)務(wù)邏輯處理
    @RequestMapping(value = "/{itemCode}", method = RequestMethod.POST)
    public ProductDto save(@PathVariable String itemCode) {
        ProductDto productDto = this.productService.findOne(itemCode);
        if (null != productDto) {
            this.productService.save(productDto);
        }
        return productDto;
    }
}

1.2.6 綁定消息通道

最后,我們要在微服務(wù)啟動(dòng)的時(shí)候需要去綁定Kafka消息中間件趋厉,實(shí)現(xiàn)代碼如下:

package io.twostepsfromjava.cloud;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

/**
 * TwoStepsFromJava Cloud -- Product Service 服務(wù)器
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@EnableDiscoveryClient
@EnableBinding(Source.class)
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

只需要在應(yīng)用引導(dǎo)類中添加一個(gè)@EnableBinding(Source.class)注解即可寨闹。

Ok,到這里我們已經(jīng)實(shí)現(xiàn)了商品微服務(wù)的消息發(fā)送君账,下面讓我們完成Mall-Web微服務(wù)中消息的監(jiān)聽(tīng)繁堡。

1.3 改造Mall-Web微服務(wù),實(shí)現(xiàn)消息監(jiān)聽(tīng)

1.3.1 增加對(duì)Stream的依賴

這個(gè)和商品微服務(wù)一樣乡数,就不重復(fù)了椭蹄。

1.3.2 修改application.properties文件

# =====================================================================================================================
# == stream / kafka                                                                                                  ==
# =====================================================================================================================
spring.cloud.stream.bindings.input.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=mallWebGroup
spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=localhost

這個(gè)配置和商品微服務(wù)類似,不過(guò)我們需要把之前output更改為了input净赴,表示這里配置的是消息輸入通道塑娇。

此外,我們還在最后增加了一個(gè)group屬性的配置劫侧,具體該屬性表示什么意思我們也是在后面進(jìn)行講解。

1.3.3 拷貝ProductMsg到本項(xiàng)目

因?yàn)椋⒎?wù)的自治原則烧栋,因此這里你需要將ProductMsg拷貝到Mall-Web工程中写妥。

1.3.4 實(shí)現(xiàn)具體監(jiān)聽(tīng)處理

通過(guò)Stream進(jìn)行監(jiān)聽(tīng)處理,我們只需要在相應(yīng)的監(jiān)聽(tīng)方法中增加@StreamListener注解即可审姓,具體所實(shí)現(xiàn)的監(jiān)聽(tīng)代碼如下:

package io.twostepsfromjava.cloud.web.mall.mq;

import io.twostepsfromjava.cloud.web.mall.dto.ProductDto;
import io.twostepsfromjava.cloud.web.mall.service.ProductService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

/**
 * 商品消息監(jiān)聽(tīng)器
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@EnableBinding(Sink.class)
public class ProductMsgListener {
    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    protected ProductService productService;

    @StreamListener(Sink.INPUT)
    public void onProductMsg(ProductMsg productMsg) {
        if (ProductMsg.MA_UPDATE.equalsIgnoreCase(productMsg.getAction())) {
            this.logger.debug("收到商品變更消息珍特,商品貨號(hào): {}", productMsg.getItemCode());
            // 重新獲取該商品信息
            ProductDto productDto = this.productService.loadByItemCode(productMsg.getItemCode());
            if (null != productDto)
                this.logger.debug("重新獲取到的商品信息為:{}", productDto);
            else
                this.logger.debug("貨號(hào)為:{} 的商品不存在", productMsg.getItemCode());
        } else if (ProductMsg.MA_DELETE.equalsIgnoreCase(productMsg.getAction())) {
            this.logger.debug("收到商品刪除消息,所要?jiǎng)h除商品貨號(hào)為: {}", productMsg.getItemCode());
        } else {
            this.logger.debug("收到未知商品消息: {}", productMsg);
        }
    }
}

這里代碼非常簡(jiǎn)單實(shí)現(xiàn)了一個(gè)商品消息監(jiān)聽(tīng)方法onProductMsg魔吐,并在該方法中根據(jù)消息類型進(jìn)行不同的處理扎筒。

同樣,對(duì)于Sink是什么鬼這里也先不細(xì)講酬姆,你只需要明白這是一個(gè)Spring Cloud Stream提供的一個(gè)抽象消息監(jiān)聽(tīng)接口嗜桌,當(dāng)在@StreamListener注解中添加了該接口類名之后,Stream就會(huì)向Kafka添加一個(gè)消息訂閱辞色,所訂閱的消息主題就是我們?cè)谂渲梦募付ǖ?code>twostepsfromjava-cloud-producttopic骨宠,當(dāng)主題中有消息時(shí),Stream就會(huì)將該主題中的消息反序列化為ProductMsg相满,然后執(zhí)行具體的消息監(jiān)聽(tīng)方法层亿。

到此,消息監(jiān)聽(tīng)也就全部實(shí)現(xiàn)了立美。

1.4 啟動(dòng)測(cè)試

按照先后次序分別啟動(dòng):

  1. Kafka服務(wù)器;
  2. 服務(wù)治理服務(wù)器: Service-discovery;
  3. 商品微服務(wù): Product-Service;
  4. Mall-Web微服務(wù)匿又。

然后,使用Postman按照下圖訪問(wèn)消息測(cè)試端點(diǎn): http://localhost:2100/products/item-2:

使用Postman訪問(wèn)

在商品微服務(wù)的控制臺(tái)建蹄,可以看到類似下面輸出:

商品微服務(wù)控制臺(tái)輸出

從輸出日志中可以看到商品變更消息已經(jīng)發(fā)送到消息中間件碌更。如果這個(gè)時(shí)候我們查看Mall-Web微服務(wù)的控制臺(tái),可以看到下圖的輸出:

Mall-Web微服務(wù)控制臺(tái)輸出

從日志輸出中可以看到Mall-Web微服務(wù)已經(jīng)能夠正確接收到商品變更消息躲撰,然后重新請(qǐng)求了并獲取到了該商品的最新信息针贬。

2. Stream原理淺析

從Spring Cloud Stream核心原理上來(lái)說(shuō),Stream提供了一個(gè)與消息中間件進(jìn)行消息收發(fā)的抽象層拢蛋,這個(gè)也是Spring所擅長(zhǎng)的桦他。通過(guò)該抽象層剝離了業(yè)務(wù)中消息收發(fā)與實(shí)際所使用中間件直接的耦合,從而使得我們可以輕松與各種消息中間件對(duì)接谆棱,也可以很簡(jiǎn)單的就可以實(shí)現(xiàn)所使用的消息中間件的更換快压。這點(diǎn)和我們使用ORM框架一樣,可以平滑的在多種數(shù)據(jù)庫(kù)之間進(jìn)行切換垃瞧。

2.1. Stream應(yīng)用模型

從這個(gè)應(yīng)用開(kāi)發(fā)上來(lái)說(shuō)蔫劣,Stream提供了下述模型:

Stream Application Mode

在該模型圖上有如下幾個(gè)核心概念:

  • Source: 當(dāng)需要發(fā)送消息時(shí),我們就需要通過(guò)Source个从,Source將會(huì)把我們所要發(fā)送的消息(POJO對(duì)象)進(jìn)行序列化(默認(rèn)轉(zhuǎn)換成JSON格式字符串)脉幢,然后將這些數(shù)據(jù)發(fā)送到Channel中歪沃;
  • Sink: 當(dāng)我們需要監(jiān)聽(tīng)消息時(shí)就需要通過(guò)Sink來(lái),Sink負(fù)責(zé)從消息通道中獲取消息嫌松,并將消息反序列化成消息對(duì)象(POJO對(duì)象)沪曙,然后交給具體的消息監(jiān)聽(tīng)處理進(jìn)行業(yè)務(wù)處理;
  • Channel: 消息通道是Stream的抽象之一萎羔。通常我們向消息中間件發(fā)送消息或者監(jiān)聽(tīng)消息時(shí)需要指定主題(Topic)/消息隊(duì)列名稱液走,但這樣一旦我們需要變更主題名稱的時(shí)候需要修改消息發(fā)送或者消息監(jiān)聽(tīng)的代碼,但是通過(guò)Channel抽象贾陷,我們的業(yè)務(wù)代碼只需要對(duì)Channel就可以了缘眶,具體這個(gè)Channel對(duì)應(yīng)的是那個(gè)主題,就可以在配置文件中來(lái)指定髓废,這樣當(dāng)主題變更的時(shí)候我們就不用對(duì)代碼做任何修改巷懈,從而實(shí)現(xiàn)了與具體消息中間件的解耦;
  • Binder: Stream中另外一個(gè)抽象層瓦哎。通過(guò)不同的Binder可以實(shí)現(xiàn)與不同消息中間件的整合砸喻,比如上面的示例我們所使用的就是針對(duì)Kafka的Binder,通過(guò)Binder提供統(tǒng)一的消息收發(fā)接口蒋譬,從而使得我們可以根據(jù)實(shí)際需要部署不同的消息中間件割岛,或者根據(jù)實(shí)際生產(chǎn)中所部署的消息中間件來(lái)調(diào)整我們的配置。

2.2. Stream應(yīng)用原理

從上面我們了解了Stream的應(yīng)用模型犯助,消息發(fā)送邏輯及流程我們也清晰了癣漆。那么我們?cè)趯?shí)際消息發(fā)送和監(jiān)聽(tīng)時(shí)又是怎么操作的呢?

在使用上Stream提供了下面三個(gè)注解:

  • @Input: 創(chuàng)建一個(gè)消息輸入通道剂买,用于消息監(jiān)聽(tīng);
  • @Output: 創(chuàng)建一個(gè)消息輸出通道惠爽,用于消息發(fā)送;
  • @EnableBinding: 建立與消息通道的綁定。

我們?cè)谑褂脮r(shí)可以通過(guò)@Input@Output創(chuàng)建多個(gè)通道瞬哼,使用這兩個(gè)注解創(chuàng)建通道非常簡(jiǎn)單婚肆,你只需要將他們分別注解到接口的相應(yīng)方法上即可,而不需要具體來(lái)實(shí)現(xiàn)該注解坐慰。當(dāng)啟動(dòng)Stream框架時(shí)较性,就會(huì)根據(jù)這兩個(gè)注解通過(guò)動(dòng)態(tài)代碼生成技術(shù)生成相應(yīng)的實(shí)現(xiàn),并注入到Spring應(yīng)用上下文中结胀,這樣我們就可以在代碼中直接使用赞咙。

2.2.1 Output注解

對(duì)于@Output注解來(lái)說(shuō),所注解的方法的返回值必須是MessageChannel糟港,MessageChannel也就是具體消息發(fā)送的通道攀操。比如下面的代碼:

public interface ProductSource {
    @Output
    MessageChannel hotProducts();

    @Output
    MessageChannel selectedProducts();
}

這樣,我們就可以通過(guò)ProductSource所創(chuàng)建的消息通道來(lái)發(fā)送消息了秸抚。

2.2.2 Input注解

對(duì)于@Input注解來(lái)說(shuō)速和,所注解的方法的返回值必須是SubscribableChannel歹垫,SubscribableChannel也就是消息監(jiān)聽(tīng)的通道。比如下面的代碼:

public interface ProductSink {
    @Input
    SubscribableChannel productOrders();
}

這樣颠放,我們就可以通過(guò)ProductSink所創(chuàng)建的消息通道來(lái)監(jiān)聽(tīng)消息了县钥。

2.2.3 關(guān)于Input、Output的開(kāi)箱即用

或許你有點(diǎn)迷糊慈迈,之前我們?cè)诖a中使用了SourceSink省有,那么這兩個(gè)類和上面的注解什么關(guān)系呢痒留?讓我們來(lái)看一下這兩個(gè)接口的源碼:

// Source源碼
public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();

}

// Sink源碼
public interface Sink {

  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();

}

是不是有點(diǎn)恍然大悟呀,@Input@Output是Stream核心應(yīng)用的注解蠢沿,而SourceSink只不過(guò)是Stream為我們所提供開(kāi)箱即用的兩個(gè)接口而已伸头,有沒(méi)有這兩個(gè)接口我們都可以正常使用Stream。

此外舷蟀,Stream還提供了一個(gè)開(kāi)箱即用的接口Processor恤磷,源碼如下:

public interface Processor extends Source, Sink {
}

也就是說(shuō)Processor只不過(guò)是同時(shí)可以作為消息發(fā)送和消息監(jiān)聽(tīng),這種接口在我們開(kāi)發(fā)消息管道類型應(yīng)用時(shí)會(huì)非常有用野宜。

2.2.4 自定義消息通道名稱

前面扫步,我們講了消息通道是Stream的一個(gè)抽象,通過(guò)該抽象可以避免與消息中間件具體的主題耦合匈子,那么到底是怎么一回事呢河胎?從SourceSink源碼中可以看到,所注解的@Output@Input注解中都有一個(gè)參數(shù)虎敦,分別為outputinput游岳,這個(gè)時(shí)候你再觀察一下我們之前的配置:

# 商品微服務(wù)中的配置
spring.cloud.stream.bindings.output.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.output.content-type=application/json

# Mall-Web中的配置
spring.cloud.stream.bindings.input.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=mallWebGroup

從配置中可以看到destination屬性的配置,分別指定了outputinout也就是Stream中所使用的消息通道名稱其徙。因此胚迫,我們可以通過(guò)這兩個(gè)注解來(lái)分別設(shè)置消息通道的名稱,比如:

public interface ProductProcessor {
    @Output("pmsoutput")
    MessageChannel productOutput();

    @Input("pmsinput")
    SubscribableChannel input();}

這樣唾那,當(dāng)我們使用ProductProcessor接口來(lái)實(shí)現(xiàn)消息發(fā)送和監(jiān)聽(tīng)的時(shí)就需要在配置文件中配置如下:

# 消息發(fā)送
spring.cloud.stream.bindings.pmsoutput.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.pmsoutput.content-type=application/json

# 消息監(jiān)聽(tīng)
spring.cloud.stream.bindings.pmsinput.destination=twostepsfromjava-cloud-producttopic
spring.cloud.stream.bindings.pmsinput.content-type=application/json
spring.cloud.stream.bindings.pmsinput.group=mallWebGroup

2.2.5 綁定

既然访锻,消息發(fā)送通道和監(jiān)聽(tīng)通道都創(chuàng)建好了,那么將它們對(duì)接到具體的消息中間件就可以完成消息的發(fā)送和監(jiān)聽(tīng)功能了通贞,而@EnableBinding注解就是用來(lái)實(shí)現(xiàn)該功能朗若。具體使用方式如下:

// 實(shí)現(xiàn)發(fā)送的綁定
@EnableBinding(Source.class)
public class Application {

}

// 實(shí)現(xiàn)監(jiān)聽(tīng)的綁定
@EnableBinding(Sink.class)
public class ProductMsgListener {

}

需要說(shuō)明的是,@EnableBinding可以同時(shí)綁定多個(gè)接口昌罩,如下:

@EnableBinding(value={ProductSource.class, ProductSink.class})

2.2.6 直接使用通道

前面我們消息發(fā)送的代碼如下:

protected void sendMsg(String msgAction, String itemCode) {
    ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
    this.logger.debug("發(fā)送商品消息:{} ", productMsg);
    
    // 發(fā)送消息
    this.source.output().send(MessageBuilder.withPayload(productMsg).build());
}

獲取你在想既然@Output所提供的MessageChannel才是最終消息發(fā)送時(shí)使用的哭懈,那么我們是否可以直接使用呢?的確這個(gè)是可以的茎用,上面的代碼我們可以更改成如下:

@Service
public class ProductService {
    protected Logger logger = LoggerFactory.getLogger(ProductService.class);

    private MessageChannel output;
    private List<ProductDto> productList;

    @Autowired
    public ProductService(MessageChannel output) {
        this.output = output;
        this.productList = this.buildProducts();
    }
    
     // 省略了其它代碼
     
    /**
     * 具體消息發(fā)送的實(shí)現(xiàn)
     * @param msgAction 消息類型
     * @param itemCode 商品貨號(hào)
     */
    protected void sendMsg(String msgAction, String itemCode) {
        ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
        this.logger.debug("發(fā)送商品消息:{} ", productMsg);

        // 發(fā)送消息
        this.output.send(MessageBuilder.withPayload(productMsg).build());
    }
}

默認(rèn)Stream所創(chuàng)建的MessageChannelBean的Id為方法名稱遣总,但是如果我們?cè)?code>@Output注解中增加了名稱定義睬罗,如果:

public interface ProductSource {
    @Output("pmsoutput")
    MessageChannel output();
}

那么這個(gè)時(shí)候Stream會(huì)使用pmsoutput作為Bean的Id,而我們的代碼也需要為如下:

@Autowired
public ProductService(@Qualifier("pmsoutput") MessageChannel output) {
    this.output = output;
    this.productList = this.buildProducts();
}

你可以到這里下載本篇的代碼旭斥。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末容达,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子垂券,更是在濱河造成了極大的恐慌花盐,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件菇爪,死亡現(xiàn)場(chǎng)離奇詭異算芯,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)凳宙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門熙揍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人氏涩,你說(shuō)我怎么就攤上這事届囚。” “怎么了是尖?”我有些...
    開(kāi)封第一講書人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵意系,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我析砸,道長(zhǎng)昔字,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任首繁,我火速辦了婚禮作郭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘弦疮。我一直安慰自己夹攒,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布胁塞。 她就那樣靜靜地躺著咏尝,像睡著了一般。 火紅的嫁衣襯著肌膚如雪啸罢。 梳的紋絲不亂的頭發(fā)上编检,一...
    開(kāi)封第一講書人閱讀 49,144評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音扰才,去河邊找鬼允懂。 笑死,一個(gè)胖子當(dāng)著我的面吹牛衩匣,可吹牛的內(nèi)容都是我干的蕾总。 我是一名探鬼主播粥航,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼生百!你這毒婦竟也來(lái)了递雀?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蚀浆,失蹤者是張志新(化名)和其女友劉穎缀程,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體市俊,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡杠输,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了秕衙。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡僵刮,死狀恐怖据忘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情搞糕,我是刑警寧澤勇吊,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站窍仰,受9級(jí)特大地震影響汉规,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜驹吮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一针史、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧碟狞,春花似錦啄枕、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至脆淹,卻和暖如春常空,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背盖溺。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工漓糙, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人咐柜。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓兼蜈,卻偏偏與公主長(zhǎng)得像攘残,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子为狸,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容