這裡演示如何用 Redisson 實現分布式
消息隊列。首先引入 Redisson
依賴（可參考官方 GitHub）：
<!-- Redisson Spring Boot starter, pinned to version 3.16.0 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.0</version>
</dependency>
首先創建一個自定義注解 RedissonTopic.java，
用於指定消息的路由 key：
package com.zyq.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;
/**
 * Marks a Redisson message listener and names the topic (routing key) it is bound to.
 *
 * <p>The annotation is retained at runtime so that it can be read reflectively from the
 * listener class.
 *
 * @author xiaochi (created 2024/10/23)
 */
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface RedissonTopic {

    /**
     * Name of the topic.
     *
     * @return the topic name; there is no default, so it must always be supplied
     */
    String key();

    /**
     * Whether messages are delivered through a queue instead of directly.
     *
     * @return {@code true} to use a queue; defaults to {@code false}
     */
    boolean queue() default false;

    /**
     * Capacity of the queue.
     *
     * @return the queue capacity; defaults to {@code 100}
     */
    int queueSize() default 100;

    /**
     * Delivery delay. Only takes effect when {@link #queue()} is {@code true}; a value
     * greater than 0 turns the queue into a delayed queue.
     *
     * @return the delay, expressed in {@link #timeUnit()}; defaults to {@code 0} (no delay)
     */
    int delayTime() default 0;

    /**
     * Unit of {@link #delayTime()}. Only takes effect when {@link #queue()} is {@code true}.
     *
     * @return the time unit; defaults to {@link TimeUnit#MILLISECONDS}
     */
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
繼續創建消息監聽器 RedissonTopicMessageListener.java，
具體內容如下：
package com.zyq.listener;
/**
 * Callback contract for consumers of Redisson messages.
 *
 * @author xiaochi (created 2024/10/23)
 */
public interface RedissonTopicMessageListener {

    /**
     * Handles a message that has been received.
     *
     * @param message the received message payload
     */
    void message(Object message);

    /**
     * Reports a message that could not be sent; invoked when the queue is already full.
     *
     * @param message the message that was not enqueued
     */
    void sendFail(Object message);

    /**
     * Reports an exception raised while a message was being processed.
     *
     * @param ex the exception that occurred
     */
    void exception(Exception ex);
}
接下來就是最重要的 Redisson 配置，
內容如下：
/**
 * Creates the Redisson client and binds every {@link RedissonTopicMessageListener} bean whose
 * class carries {@link RedissonTopic} to the topic named by the annotation.
 *
 * <p>Listeners with {@code queue = false} receive topic messages directly. Listeners with
 * {@code queue = true} have each topic message placed on a bounded blocking queue (optionally
 * through a delayed queue) and are fed from that queue by a background worker.
 *
 * <p>NOTE(review): listener beans are looked up eagerly inside this factory method; a listener
 * that itself depends on the {@code RedissonClient} bean may hit a circular reference — confirm.
 *
 * @param applicationContext context used to look up the listener beans
 * @return the configured client; Spring calls {@code shutdown} on it when the bean is destroyed
 */
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext) {
    Config config = new Config();
    // NOTE(review): the password and address are hard-coded; consider externalizing them
    // (e.g. application properties) instead of keeping credentials in source.
    config.useSingleServer().setPassword("123456")
            .setDatabase(0)
            .setConnectionPoolSize(24) // connection pool size
            .setConnectionMinimumIdleSize(3) // minimum number of idle connections
            .setRetryAttempts(3) // retry attempts for a failed command
            .setRetryInterval(1500) // interval between command retries (ms)
            .setTimeout(10000) // command response timeout (ms)
            .setConnectTimeout(10000) // connection establishment timeout (ms)
            .setIdleConnectionTimeout(10000) // idle connection timeout (ms)
            .setSubscriptionConnectionMinimumIdleSize(3) // minimum idle publish/subscribe connections
            .setSubscriptionConnectionPoolSize(50) // publish/subscribe connection pool size
            .setDnsMonitoringInterval(10000) // DNS monitoring interval (ms)
            .setAddress("redis://127.0.0.1:6379");
    // Thread count is left at the library default; uncomment to size it by CPU count:
    //config.setThreads(Runtime.getRuntime().availableProcessors());
    RedissonClient redissonClient = Redisson.create(config);

    StringBuilder msg = new StringBuilder();
    msg.append("redisson topic register[");
    String[] beanNames = applicationContext.getBeanNamesForType(RedissonTopicMessageListener.class);
    for (String beanName : beanNames) {
        RedissonTopicMessageListener topicMessageListener =
                applicationContext.getBean(beanName, RedissonTopicMessageListener.class);
        RedissonTopic redissonTopic = topicMessageListener.getClass().getAnnotation(RedissonTopic.class);
        if (redissonTopic == null) {
            // Listener beans without the annotation are not bound to any topic.
            continue;
        }
        if (redissonTopic.queue()) {
            registerQueueListener(redissonClient, redissonTopic, topicMessageListener);
        } else {
            registerTopicListener(redissonClient, redissonTopic, topicMessageListener);
        }
        msg.append(redissonTopic.key()).append(".");
    }
    msg.append("]").append("finish.");
    log.info(msg.toString());
    return redissonClient;
}

/** Subscribes the listener directly to its topic; listener failures go to its exception callback. */
private void registerTopicListener(RedissonClient redissonClient,
                                   RedissonTopic redissonTopic,
                                   RedissonTopicMessageListener topicMessageListener) {
    RTopic topic = redissonClient.getTopic(redissonTopic.key());
    topic.addListener(Object.class, (channel, message) -> {
        try {
            topicMessageListener.message(message);
        } catch (Exception e) {
            topicMessageListener.exception(e);
        }
    });
}

/** Routes topic messages into a bounded (optionally delayed) queue and starts its consumer worker. */
private void registerQueueListener(RedissonClient redissonClient,
                                   RedissonTopic redissonTopic,
                                   RedissonTopicMessageListener topicMessageListener) {
    RBoundedBlockingQueue<Object> boundedBlockingQueue =
            redissonClient.getBoundedBlockingQueue(redissonTopic.key());
    boundedBlockingQueue.trySetCapacity(redissonTopic.queueSize());

    // Delayed delivery is enabled only for a positive delay, as documented on
    // RedissonTopic#delayTime; zero or negative values fall back to immediate enqueueing.
    final RDelayedQueue<Object> delayedQueue = redissonTopic.delayTime() > 0
            ? redissonClient.getDelayedQueue(boundedBlockingQueue)
            : null;

    // NOTE(review): every subscribed application instance runs this listener, so with several
    // instances the same topic message may be enqueued more than once — confirm this is intended.
    RTopic topic = redissonClient.getTopic(redissonTopic.key());
    topic.addListener(Object.class, (channel, message) -> {
        try {
            if (delayedQueue != null) {
                delayedQueue.offer(message, redissonTopic.delayTime(), redissonTopic.timeUnit());
            } else if (!boundedBlockingQueue.offer(message)) {
                // offer() returned false: the message was not accepted by the bounded queue.
                topicMessageListener.sendFail(message);
            }
        } catch (Exception e) {
            topicMessageListener.exception(e);
        }
    });

    // Consume on a separate thread so the blocking take() does not stall the caller.
    AsyncUtil.run(() -> consumeQueue(redissonClient, redissonTopic, boundedBlockingQueue, topicMessageListener));
}

/** Blocks on the queue and hands each element to the listener until interrupted or the client shuts down. */
private void consumeQueue(RedissonClient redissonClient,
                          RedissonTopic redissonTopic,
                          RBoundedBlockingQueue<Object> boundedBlockingQueue,
                          RedissonTopicMessageListener topicMessageListener) {
    while (!Thread.currentThread().isInterrupted() && !redissonClient.isShutdown()) {
        try {
            Object take = boundedBlockingQueue.take();
            // Empty-string elements are skipped rather than delivered.
            if (!"".equals(take)) {
                topicMessageListener.message(take);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; restore it and stop,
            // otherwise the loop would block on take() again and could never be interrupted.
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            topicMessageListener.exception(e);
            log.info("redisson.{}隊列監(jiān)測異常,{}", redissonTopic.key(), e);
        }
    }
    // The loop only ends on interruption or client shutdown.
    log.info("redisson service shutdown");
}
到此基本就完成了，接下來就是創建消息監聽類 TopicMessageListener.java 來消費消息，
它實現消息監聽器接口 RedissonTopicMessageListener：
package com.zyq.listener;
import com.zyq.annotation.RedissonTopic;
import org.springframework.stereotype.Component;
/**
 * Sample listener bound to the {@code testTopic} topic in queue mode with a delay of 5000
 * (the annotation's default time unit applies). Each callback prints to standard output.
 *
 * @author xiaochi (created 2024/10/23)
 */
@Component
@RedissonTopic(key = "testTopic", queue = true, delayTime = 5000)
public class TopicMessageListener implements RedissonTopicMessageListener {

    /**
     * Prints the received message.
     *
     * @param message the received message payload
     */
    @Override
    public void message(Object message) {
        System.out.println("testTopic監(jiān)聽器延遲隊列收到消息," + message);
    }

    /**
     * Prints a notice that a message could not be sent.
     *
     * @param message the message that was not enqueued (not included in the output)
     */
    @Override
    public void sendFail(Object message) {
        System.out.println("延遲隊列 TopicMessageListener testTopic消息發(fā)送失敗");
    }

    /**
     * Prints the exception that occurred.
     *
     * @param ex the exception to report
     */
    @Override
    public void exception(Exception ex) {
        // println accepts a single argument and does not expand "{}" placeholders,
        // so the exception is appended by concatenation.
        System.out.println("延遲隊列 TopicMessageListener testTopic消息異常," + ex);
    }
}
現在可以啟動 2 個 Spring Boot 項目進行消息交流了。封裝一個消息發送工具 RedissonMessageUtil.java，
內容如下：
package com.demo3.util;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Static helper for publishing messages to Redisson topics.
 *
 * <p>The class is a Spring component only so that the container can hand it the
 * {@link RedissonClient}; the client is stored in a static field and used by the static
 * {@link #send(String, Object)} method.
 *
 * @author xiaochi (created 2024/10/24)
 */
@Component
public class RedissonMessageUtil {

    /** Client used by {@link #send(String, Object)}; {@code null} until Spring injects it. */
    private static RedissonClient redissonClient;

    /**
     * Receives the client from Spring and stores it in the static field.
     *
     * @param redissonClient the client to publish with
     */
    @Autowired
    public void setRedissonClient(RedissonClient redissonClient) {
        RedissonMessageUtil.redissonClient = redissonClient;
    }

    /**
     * Publishes a message to the topic with the given name.
     *
     * @param key     the topic name
     * @param message the message to publish
     * @return the number of clients that received the message
     * @throws IllegalStateException if called before the {@link RedissonClient} has been injected
     */
    public static long send(String key, Object message) {
        RedissonClient client = redissonClient;
        if (client == null) {
            // Fail with an explicit message instead of an opaque NullPointerException when
            // send() is used before the Spring context has initialized this component.
            throw new IllegalStateException(
                    "RedissonMessageUtil is not initialized: RedissonClient has not been injected yet");
        }
        RTopic topic = client.getTopic(key);
        return topic.publish(message);
    }
}
在業務處使用：
// Publish "你好啊" to the "testTopic" topic; send(...) returns the number of clients that received it.
RedissonMessageUtil.send("testTopic","你好啊");
再去看控制台，已經打印接收到的消息了，到此完成。