架構(gòu)圖
Binder
- Destination Binder (目標(biāo)綁定器) :與消息中間件通信的組件
- Destination Bindings (目標(biāo)綁定) : Binding是連接應(yīng)用程序跟消息中間件的橋梁 ,用于消息的消費和生產(chǎn),由binder創(chuàng)建
- Message(消息)
springboot整合stream之生產(chǎn)者
- 加依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- 寫注解,在啟動類上加
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class StreamProducerMain {
public static void main(String[] args){
SpringApplication.run(StreamProducerMain.class,args);
}
}
- 寫配置
server:
port: 8081
spring:
application:
name: cloud-stream-producer
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息咬扇;
defaultRabbit: # 表示定義的名稱甲葬,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672 #查看rabbitmq Listening ports amqp 為 ip 端口為 5672
username: guest
password: guest
bindings: # 服務(wù)的整合處理
output: # 這個名字是一個通道的名稱
destination: xypspExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設(shè)置消息類型,文本則設(shè)置“text/plain”
binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
- 測試方法
/**
* @author rp
*/
@AllArgsConstructor
@RestController
@RequestMapping("/message")
public class MessageProviderController {
private final MessageProviderServer messageProviderServer;
/**
* 發(fā)送消息
* */
@GetMapping("/sendMessage")
public Result sendMessage(){
String uuid = messageProviderServer.send();
return Result.success(uuid);
}
}
package com.xypsp.springcloud.server;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @author rp
* 定義消息的推送管道
*/
@EnableBinding(Source.class)
public class MessageProviderServer {
/**
* 消息發(fā)送管道
*/
@Resource
private MessageChannel output;
public String send() {
String serial = UUID.randomUUID().toString();
//發(fā)送延遲消息
// output.send(MessageBuilder.withPayload(serial).setHeader("x-delay",5000).build());
output.send(MessageBuilder.withPayload(serial).build());
return serial;
}
}
springboot整合stream之消費者A
- 加依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- 寫配置
server:
port: 8082
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息懈贺;
defaultRabbit: # 表示定義的名稱经窖,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672 #查看rabbitmq Listening ports amqp 為 ip 端口為 5672
username: guest
password: guest
bindings: # 服務(wù)的整合處理
input: # 這個名字是一個通道的名稱
destination: xypspExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設(shè)置消息類型,文本則設(shè)置“text/plain”
binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
group: xypspGroup # 消息的持久化 Consumer斷開連接梭灿,隊列仍然存在画侣,相同group的消費同一個queue的時候是輪詢的方式,每個實例一條輪著消費堡妒,避免重復(fù)消費配乱。
- 消費方法
package com.xypsp.springcloud.server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* @author rp
* 定義消息的接收管道
*/
@Slf4j
@Component
@EnableBinding(Sink.class)
public class MessageConsumerListener {
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
log.info("接收到消息: {}",message.getPayload());
}
}
springboot整合stream之消費者B
- 加依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- 寫配置
server:
port: 8083
spring:
application:
name: cloud-stream-consumer-b
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
defaultRabbit: # 表示定義的名稱皮迟,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672 #查看rabbitmq Listening ports amqp 為 ip 端口為 5672
username: guest
password: guest
bindings: # 服務(wù)的整合處理
input: # 這個名字是一個通道的名稱
destination: xypspExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設(shè)置消息類型搬泥,文本則設(shè)置“text/plain”
binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
group: xypspGroup # 消息的持久化 Consumer斷開連接,隊列仍然存在伏尼,相同group的消費同一個queue的時候是輪詢的方式忿檩,每個實例一條輪著消費,避免重復(fù)消費爆阶。
- 消費方法
package com.xypsp.springcloud.server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* @author rp
* 定義消息的接收管道
*/
@Slf4j
@Component
@EnableBinding(Sink.class)
public class MessageConsumerListener {
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
log.info("接收到消息: {}",message.getPayload());
}
}