springboot集成Redisson做分布式消息隊列

這里演示Redisson做分布式消息隊列。首先引入 Redisson依賴腾么,官方github

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.0</version>
</dependency>

首先創(chuàng)建一個自定義注解RedissonTopic.java兄墅,用于指定消息的路由key

package com.zyq.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

/** Redissson消息隊列注解
 * author xiaochi
 * date 2024/10/23
 */
@Inherited
@Documented
@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonTopic {

    /**
     * topic名稱
     * @return
     */
    String key();

    /**
     * 是否隊列發(fā)送消息
     * @return
     */
    boolean queue() default false;

    /**
     * 隊列容量
     * @return
     */
    int queueSize() default 100;

    /** queue為true時生效
     * 延遲發(fā)送時間(大于0默認(rèn)延遲,延遲隊列可設(shè)置大于0)
     * @return
     */
    int delayTime() default 0;

    /** queue為true時生效
     * 時間單位(默認(rèn)毫秒)
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}

繼續(xù)創(chuàng)建消息監(jiān)聽器 RedissonTopicMessageListener.java,具體內(nèi)容如下:

package com.zyq.listener;

/** Redisson消息監(jiān)聽接口
 * author xiaochi
 * date 2024/10/23
 */
public interface RedissonTopicMessageListener{

    /**
     * 接收的消息處理
     * @param message
     */
    void message(Object message);

    /**
     * 發(fā)送失敗(隊列已滿時會調(diào)用)
     * @param message
     */
    void sendFail(Object message);

    /**
     * 異常
     * @param ex
     */
    void exception(Exception ex);
}

接下來就是最重要的Redisson配置,內(nèi)容如下:

/**
 * Redisson 配置
 * @return
 */
@Bean(destroyMethod="shutdown")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext){
    Config config = new Config();
    config.useSingleServer().setPassword("123456")
            .setDatabase(0)
            .setConnectionPoolSize(24) // 連接池大小搓扯,默認(rèn)64
            .setConnectionMinimumIdleSize(3) // 最小空閑連接數(shù),默認(rèn)32
            .setRetryAttempts(3) // 命令失敗重試次數(shù) 3
            .setRetryInterval(1500) // 命令重試發(fā)送時間間隔(毫秒) 默認(rèn)1500
            .setTimeout(10000) // 命令等待超時(毫秒) 默認(rèn)10000
            .setConnectTimeout(10000) // 連接空閑超時(毫秒) 默認(rèn)10000
            .setIdleConnectionTimeout(10000) // 連接空閑超時(毫秒) 默認(rèn)10000
            .setSubscriptionConnectionMinimumIdleSize(3) // 發(fā)布和訂閱連接的最小空閑連接數(shù)
            .setSubscriptionConnectionPoolSize(50) // 發(fā)布和訂閱連接池大小 默認(rèn)50
            .setDnsMonitoringInterval(10000) // DNS監(jiān)測時間間隔(毫秒)遣妥,默認(rèn)5000
            .setAddress("redis://127.0.0.1:6379");
    //config.setThreads(Runtime.getRuntime().availableProcessors());// 默認(rèn) 16
    RedissonClient redissonClient = Redisson.create(config);
    StringBuilder msg = new StringBuilder();
    msg.append("redisson topic register[");
    String[] beanNames = applicationContext.getBeanNamesForType(RedissonTopicMessageListener.class);
    for (String beanName : beanNames) {
        RedissonTopicMessageListener topicMessageListener = applicationContext.getBean(beanName, RedissonTopicMessageListener.class);
        if (topicMessageListener.getClass().isAnnotationPresent(RedissonTopic.class)){
            RedissonTopic redissonTopic = topicMessageListener.getClass().getAnnotation(RedissonTopic.class);
            if (redissonTopic.queue()){
                RBoundedBlockingQueue<Object> boundedBlockingQueue = redissonClient.getBoundedBlockingQueue(redissonTopic.key());
                boundedBlockingQueue.trySetCapacity(redissonTopic.queueSize());
                RDelayedQueue<Object> delayedQueue = null;
                if (0 != redissonTopic.delayTime()){
                    delayedQueue = redissonClient.getDelayedQueue(boundedBlockingQueue);
                }
                RTopic topic = redissonClient.getTopic(redissonTopic.key());
                RDelayedQueue<Object> finalDelayedQueue = delayedQueue;
                topic.addListener(Object.class, (channel, message) -> {
                    if (finalDelayedQueue != null){
                        try {
                            finalDelayedQueue.offer(message,redissonTopic.delayTime(), redissonTopic.timeUnit());
                        }catch (Exception e){
                            topicMessageListener.exception(e);
                        }
                    }else {
                        try {
                            if (!boundedBlockingQueue.offer(message)){
                                topicMessageListener.sendFail(message);
                            }
                        }catch (Exception e){
                            topicMessageListener.exception(e);
                        }
                    }
                });
                // 為了不阻塞主線程擅编,放在新線程中運行
                AsyncUtil.run(() -> {
                    while (!Thread.currentThread().isInterrupted() && !redissonClient.isShutdown()){
                        try {
                            Object take = boundedBlockingQueue.take();
                            if (!"".equals(take)){
                                topicMessageListener.message(take);
                            }
                        } catch (Exception e) {
                            topicMessageListener.exception(e);
                            log.info("redisson.{}隊列監(jiān)測異常,{}",redissonTopic.key(),e);
                        }
                    }
                    if (Thread.currentThread().isInterrupted() || redissonClient.isShutdown()){
                        log.info("redisson service shutdown");
                    }
                });
            }else {
                RTopic topic = redissonClient.getTopic(redissonTopic.key());
                topic.addListener(Object.class, (channel,message) -> {
                    try {
                        topicMessageListener.message(message);
                    }catch (Exception e){
                        topicMessageListener.exception(e);
                    }
                });
            }
            msg.append(redissonTopic.key()).append(".");
        }
    }
    msg.append("]").append("finish.");
    log.info(msg.toString());
    return redissonClient;
}

到此基本就完成了,接下來就是創(chuàng)建消息監(jiān)聽類進(jìn)行消費消息了TopicMessageListener.java 去實現(xiàn)消息監(jiān)聽器接口RedissonTopicMessageListener.java

package com.zyq.listener;

import com.zyq.annotation.RedissonTopic;
import org.springframework.stereotype.Component;

/** 消息監(jiān)聽類
 * author xiaochi
 * date 2024/10/23
 */
@Component
@RedissonTopic(key = "testTopic",queue = true,delayTime = 5000)
public class TopicMessageListener implements RedissonTopicMessageListener {

    @Override
    public void message(Object message) {
        System.out.println("testTopic監(jiān)聽器延遲隊列收到消息," + message);
    }

    @Override
    public void sendFail(Object message) {
        System.out.println("延遲隊列 TopicMessageListener testTopic消息發(fā)送失敗");
    }

    @Override
    public void exception(Exception ex) {
        System.out.println("延遲隊列 TopicMessageListener testTopic消息異常,{}",ex);
    }
}

現(xiàn)在可以起2個springboot項目進(jìn)行消息交流了箫踩。封裝一個消息發(fā)送工具 RedissonMessageUtil.java,內(nèi)容如下:

package com.demo3.util;

import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** Redisson消息發(fā)送工具
 * author xiaochi
 * date 2024/10/24
 */
@Component
public class RedissonMessageUtil {

    private static RedissonClient redissonClient;

    @Autowired
    public void setRedissonClient(RedissonClient redissonClient) {
        RedissonMessageUtil.redissonClient = redissonClient;
    }

    /**
     * 發(fā)送消息
     * @param key
     * @param message
     * @return 返回接收消息的客戶端數(shù)量
     */
    public static long send(String key,Object message){
        RTopic topic = redissonClient.getTopic(key);
        return topic.publish(message);
    }
}

在業(yè)務(wù)處使用

RedissonMessageUtil.send("testTopic","你好啊");

再去看控制臺谭贪,已經(jīng)打印接收到的消息了境钟,到此完成。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末俭识,一起剝皮案震驚了整個濱河市慨削,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌套媚,老刑警劉巖缚态,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異堤瘤,居然都是意外死亡玫芦,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進(jìn)店門本辐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來桥帆,“玉大人医增,你說我怎么就攤上這事±铣妫” “怎么了叶骨?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長祈匙。 經(jīng)常有香客問我忽刽,道長,這世上最難降的妖魔是什么夺欲? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任缔恳,我火速辦了婚禮,結(jié)果婚禮上洁闰,老公的妹妹穿的比我還像新娘歉甚。我一直安慰自己,他們只是感情好扑眉,可當(dāng)我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布纸泄。 她就那樣靜靜地躺著,像睡著了一般腰素。 火紅的嫁衣襯著肌膚如雪聘裁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天弓千,我揣著相機(jī)與錄音衡便,去河邊找鬼。 笑死洋访,一個胖子當(dāng)著我的面吹牛镣陕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播姻政,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼呆抑,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了汁展?” 一聲冷哼從身側(cè)響起鹊碍,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎食绿,沒想到半個月后侈咕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡器紧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年耀销,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片品洛。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡树姨,死狀恐怖摩桶,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情帽揪,我是刑警寧澤硝清,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站转晰,受9級特大地震影響芦拿,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜查邢,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一蔗崎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧扰藕,春花似錦缓苛、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至芥备,卻和暖如春冬耿,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背萌壳。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工亦镶, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人袱瓮。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓缤骨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親懂讯。 傳聞我的和親對象是個殘疾皇子荷憋,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內(nèi)容