RabbitMQ整合SpringCloud
????你好宙枷!歡迎來到Java成長筆記,主要是用于相互交流茧跋,相互學(xué)習(xí)慰丛,也希望分享能幫到大家,如有錯誤之處瘾杭,希望指正诅病,謝謝!
整合RabbitMQ簡介
1、SpringCloudStream定位是兼容主流消息中間件的集成使用贤笆,減少不同消息中間件集成的配置蝇棉,它整合RabbitMQ配置相較簡單,不需要定義相應(yīng)的交換機(jī)苏潜、隊列银萍、以及關(guān)系綁定,使相應(yīng)的配置減少恤左。
2、SpringCloudStream通過上層結(jié)構(gòu)上的處理搀绣,使消息生產(chǎn)端飞袋、消費(fèi)端可以多樣化,不需要拘泥于生產(chǎn)消費(fèi)端使用相同的消息中間件链患。例如:生產(chǎn)端可以使用RabbitMQ巧鸭,而消費(fèi)端可以使用Kafka,讓開發(fā)者省去了相應(yīng)不同的配置的集成麻捻,開發(fā)者只需要使用好相應(yīng)的幾個注解纲仍,就能實(shí)現(xiàn)高性能的生產(chǎn)和消費(fèi)的場景。
3贸毕、當(dāng)然它也有一個非常大的問題就是不能實(shí)現(xiàn)消息的可靠性投遞郑叠,也就是不能保證消息的100%可靠性,會存在少量的消息丟失明棍。
引入依賴配置
主要依賴
// 指定統(tǒng)一配置
<properties>
<java.version>1.8</java.version>
<spring.cloud-version>Greenwich.SR6</spring.cloud-version>
<spring.boot-version>Brussels-SR17</spring.boot-version>
</properties>
// springcloud依賴
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
// 其它配置
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
使用注解說明
@Output:輸出注解乡革,用于定義發(fā)送消息接口
@input:輸入注解,用于定義消息的消費(fèi)者接口
@StreamListener:用于定義監(jiān)聽方法的注解
@EnableBinding:啟動綁定關(guān)系
消息生產(chǎn)端
生產(chǎn)端配置說明
spring:
cloud:
stream:
binders:
defaultRabbit: # 定義的名稱摊腋,用于bidding整合
type: rabbit # 指定消息類型
environment:
spring:
rabbitmq: # rabbitmq 配置信息
addresses: 127.0.0.1:5672 # rabbitmq 連接地址
username: rabbitmq # rabbitmq 用戶
password: 123456 # rabbitmq 連接密碼
virtual-host: / # 虛擬路徑
bindings:
userOutPutChannel:
destination: exchange_cloud # Exchange名稱沸版,交換模式默認(rèn)topic,把stream輸出通道綁定到exchange_cloud交換機(jī)
group: userGroup # 分組名稱兴蒸,生產(chǎn)端视粮、消費(fèi)端名稱需要一致
default-binder: defaultRabbit # 和上面定義的 binders:defaultRabbit需要一致
content-type: application/json # 設(shè)置消息類型 為json
生產(chǎn)端代碼
展示如下:
// 定義輸出類型的通道
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
@Component
public interface Barista {
final static String OUTPUT_CHANNEL = "userOutPutChannel";
/*
* @Description: 定義一個輸出類型的通道
* @Author ly
* @param []
* @return org.springframework.messaging.MessageChannel
* @date 2021/3/22 17:07
*/
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel userOutPutChannel();
}
// 實(shí)現(xiàn)類封裝
import com.alibaba.fastjson.JSON;
import com.show.service.Barista;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
@EnableBinding(Barista.class)
@Service
@Slf4j
public class RabbitSender {
@Resource
@Output(Barista.OUTPUT_CHANNEL)
private MessageChannel channel;
@Resource
private Barista barista;
/*
* @Description: channel 發(fā)送消息
* @Author ly
* @date 2021/3/23 11:12
*/
public void sendMsg (String msg) {
channel.send(MessageBuilder.withPayload(msg).build());
log.error("channel消息發(fā)送成功:{} " + msg);
}
/*
* @Description: barista 發(fā)送消息
* @Author ly
* @date 2021/3/23 11:12
*/
public void sendMessage (Object message, Map<String, Object> properties) {
final MessageHeaders messageHeaders = new MessageHeaders(properties);
final Message msg = MessageBuilder.createMessage(message, messageHeaders);
final boolean sendResult = barista.userOutPutChannel().send(msg);
log.error("barista消息發(fā)送成功:{},sendResult:{} " + JSON.toJSONString(msg), sendResult);
}
}
// 測試類處理
import com.google.common.collect.ImmutableMap;
import com.show.model.User;
import com.show.service.impl.RabbitSender;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ECloudProducerApplicationTests.class)
@ComponentScan(basePackages = {"com.show.*"})
@Slf4j
public class ECloudProducerApplicationTests {
@Resource
private RabbitSender rabbitSender;
@Test
public void sendMessage () {
final String message = "Hello RabbitMQ";
rabbitSender.sendMsg(message);
}
@Test
public void sendRabbitMessage () {
final Map<String, Object> properties = ImmutableMap.of("cloud-stream", "cloud-stream");
final User user = new User("simon","123456", 22, new BigDecimal(100));
rabbitSender.sendMessage(user, properties);
}
}
消費(fèi)端代碼
消費(fèi)端配置說明
spring:
cloud:
stream:
binders:
defaultRabbit: # 此配置為rabbitmq配置說明
type: rabbit
environment:
spring:
rabbitmq:
addresses: 127.0.0.1:5672
username: rabbit
password: 123456
virtual-host: /
bindings:
userInChannel: # 定義輸入管道名稱
destination: exchange_cloud # 交換模式默認(rèn)是topic,把stream的消息輸出通道綁定到exchange_cloud交換器
group: userGroup # 分組名稱與生產(chǎn)端名稱一致
content-type: application/json # 消費(fèi)端消息類型 json
default-binder: defaultRabbit # 與binders:defaultRabbit名稱一致 環(huán)境名稱
consumer:
concurrency: 1 # 默認(rèn)監(jiān)聽數(shù)量
rabbit:
bindings:
userInChannel: # 和bindings:userInChannel 名稱一致
consumer: # 消費(fèi)端配置
requeue-reject: false # 是否支持return
acknowledge-mode: MANUAL # 簽收模式 手動簽收
recovery-interval: 3000 # 3s重新連接
durable-subscription: true # 是否啟動持久化訂閱
max-concurrency: 5 # 最大監(jiān)聽數(shù)量
消費(fèi)端代碼
// 定義一個輸入通道
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@Component
public interface Barista {
String INPUT_CHANNEL = "userInChannel";
/*
* @Description: 定義一個輸入類型的通道
* @Author ly
* @param []
* @return org.springframework.messaging.SubscribableChannel
* @date 2021/3/22 17:34
*/
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel userInChannel();
}
// 定義輸入通道監(jiān)聽類
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.show.service.Barista;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@EnableBinding(Barista.class)
@Service
@Slf4j
public class MQReceiver {
@StreamListener(Barista.INPUT_CHANNEL)
public void receiver(Message message) throws Exception {
Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
log.error("消費(fèi)完畢:{}, Object:{}", System.currentTimeMillis(), JSON.toJSON(message));
channel.basicAck(deliveryTag, false);
}
}
接收消息返回信息示例
對象消息返回信息示例
本章完結(jié)橙凳,后續(xù)還會持續(xù)更新蕾殴,分享Java成長筆記,希望我們能一起成長痕惋。如果你覺得我的分享有用区宇,記得點(diǎn)贊和關(guān)注哦!這對我是最好的鼓勵值戳。謝謝议谷!
PS:轉(zhuǎn)載請注明出處!