1、RabbitMQ 環(huán)境安裝
鏈接(linux):https://www.linuxprobe.com/install-rabbitmq-on-centos-7.html
鏈接(win):https://www.cnblogs.com/JustinLau/p/11738511.html
2击费、Producer 生產(chǎn)者
- main 服務(wù)啟動(dòng)類(lèi)
package com.wyh;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding(Producer.TestTopic.class)
@SpringBootApplication
@EnableDiscoveryClient
public class Producer {
public static void main(String[] args) {
SpringApplication.run(Producer.class, args);
}
@RestController
static class TestController {
@Autowired
private TestTopic testTopic;
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());
testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());
return "ok";
}
}
interface TestTopic {
String OUTPUT = "example-topic-output";
@Output(OUTPUT)
MessageChannel output();
/*
如果存在多個(gè)通道的話可以這樣使用
String OUTPUT2 = "example-topic-output";
String INPUT2 = "example-topic-input";
@Output(OUTPUT)
MessageChannel output2();
@Input(INPUT)
SubscribableChannel input2();*/
}
}
- application.yml
server:
port: 8006
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
application:
name: rabbitmq-provider #本服務(wù)的名稱(chēng)
cloud: #注冊(cè)到nacos
nacos:
discovery:
server-addr: localhost:8848
stream:
binders: #需要綁定的rabbitmq的服務(wù)信息
defaultRabbit: #定義的名稱(chēng)拢蛋,用于binding整合
type: rabbit #消息組件類(lèi)型
bindings: # 服務(wù)的整合處理(這里需要注意:如果存在多條管道,那么需要多個(gè)綁定)
example-topic-output:
destination: test-topic
management:
endpoints:
web:
exposure:
include: "*"
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>myspring-cloud</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>producer</artifactId>
<dependencies>
<!-- Spring Cloud Stream RabbitMQ-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- SpringCloud alibaba nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- web組件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
3荡灾、consumer-01 消費(fèi)者
- main 服務(wù)啟動(dòng)類(lèi)
package com.wyh;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@EnableBinding(consumer01.TestTopic.class)
@SpringBootApplication
@EnableDiscoveryClient
public class consumer01 {
public static void main(String[] args) {
SpringApplication.run(consumer01.class, args);
}
/**
* 消息消費(fèi)邏輯
*/
@Slf4j
@Component
static class TestListener {
@StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
public void receiveV1(String payload, @Header("version") String version) {
log.info("Received v1 com-01: " + payload + ", " + version);
}
@StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
public void receiveV2(String payload, @Header("version") String version) {
log.info("Received v2 com-01: " + payload + ", " + version);
}
}
interface TestTopic {
String INPUT = "example-topic-input";
@Input(INPUT)
SubscribableChannel input();
}
}
- application.yml
server:
port: 8004
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
application:
name: consumer-01 #本服務(wù)的名稱(chēng)
cloud: #注冊(cè)到nacos
nacos:
discovery:
server-addr: localhost:8848
stream:
binders: #需要綁定的rabbitmq的服務(wù)信息
defaultRabbit: #定義的名稱(chēng)瓤狐,用于binding整合
type: rabbit #消息組件類(lèi)型
bindings: # 服務(wù)的整合處理(這里需要注意:如果存在多條管道,那么需要多個(gè)綁定)
example-topic-input:
destination: test-topic #exchange名稱(chēng)批幌,交換模式默認(rèn)是topic
group: stream-content-route #分組
management:
endpoints:
web:
exposure:
include: "*"
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>myspring-cloud</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>consumer-01</artifactId>
<dependencies>
<!-- Spring Cloud Stream RabbitMQ-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- SpringCloud alibaba nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- web組件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
4础锐、consumer-02 消費(fèi)者
- 01copy一份即可,端口號(hào)修改一下