什么鬼,面試官竟然讓我用Redis實(shí)現(xiàn)一個(gè)消息隊(duì)列!E琐辍翁逞?

來自公眾號(hào):Hollis
作者:Hollis

眾所周知,redis是一個(gè)高性能的分布式key-value存儲(chǔ)系統(tǒng)溉仑,在NoSQL數(shù)據(jù)庫市場上挖函,redis自己就占據(jù)了將近半壁江山,足以見到其強(qiáng)大之處浊竟。同時(shí)怨喘,由于redis的單線程特性,我們可以將其用作為一個(gè)消息隊(duì)列振定。本篇文章就來講講如何將redis整合到spring boot中必怜,并用作消息隊(duì)列的……

一、什么是消息隊(duì)列

“消息隊(duì)列”是在消息的傳輸過程中保存消息的容器后频∈崆欤——《百度百科》

消息我們可以理解為在計(jì)算機(jī)中或在整個(gè)計(jì)算機(jī)網(wǎng)絡(luò)中傳遞的數(shù)據(jù)。

隊(duì)列是我們在學(xué)習(xí)數(shù)據(jù)結(jié)構(gòu)的時(shí)候?qū)W習(xí)的基本數(shù)據(jù)結(jié)構(gòu)之一卑惜,它具有先進(jìn)先出的特性膏执。

所以,消息隊(duì)列就是一個(gè)保存消息的容器露久,它具有先進(jìn)先出的特性更米。

為什么會(huì)出現(xiàn)消息隊(duì)列?

  1. 異步:常見的B/S架構(gòu)下毫痕,客戶端向服務(wù)器發(fā)送請求征峦,但是服務(wù)器處理這個(gè)消息需要花費(fèi)的時(shí)間很長的時(shí)間,如果客戶端一直等待服務(wù)器處理完消息消请,會(huì)造成客戶端的系統(tǒng)資源浪費(fèi)栏笆;而使用消息隊(duì)列后,服務(wù)器直接將消息推送到消息隊(duì)列中梯啤,由專門的處理消息程序處理消息竖伯,這樣客戶端就不必花費(fèi)大量時(shí)間等待服務(wù)器的響應(yīng)了;

  2. 解耦:傳統(tǒng)的軟件開發(fā)模式因宇,模塊之間的調(diào)用是直接調(diào)用七婴,這樣的系統(tǒng)很不利于系統(tǒng)的擴(kuò)展,同時(shí)察滑,模塊之間的相互調(diào)用打厘,數(shù)據(jù)之間的共享問題也很大,每個(gè)模塊都要時(shí)時(shí)刻刻考慮其他模塊會(huì)不會(huì)掛了贺辰;使用消息隊(duì)列以后户盯,模塊之間不直接調(diào)用嵌施,而是通過數(shù)據(jù),且當(dāng)某個(gè)模塊掛了以后莽鸭,數(shù)據(jù)仍舊會(huì)保存在消息隊(duì)列中吗伤。最典型的就是生產(chǎn)者-消費(fèi)者模式,本案例使用的就是該模式硫眨;

  3. 削峰填谷:某一時(shí)刻足淆,系統(tǒng)的并發(fā)請求暴增,遠(yuǎn)遠(yuǎn)超過了系統(tǒng)的最大處理能力后礁阁,如果不做任何處理巧号,系統(tǒng)會(huì)崩潰;使用消息隊(duì)列以后姥闭,服務(wù)器把請求推送到消息隊(duì)列中丹鸿,由專門的處理消息程序以合理的速度消費(fèi)消息,降低服務(wù)器的壓力棚品。

下面一張圖我們來簡單了解一下消息隊(duì)列
image

由上圖可以看到靠欢,消息隊(duì)列充當(dāng)了一個(gè)中間人的角色,我們可以通過操作這個(gè)消息隊(duì)列來保證我們的系統(tǒng)穩(wěn)定南片。

二掺涛、環(huán)境準(zhǔn)備

Java環(huán)境:jdk1.8

spring boot版本:2.2.1.RELEASE

redis-server版本:3.2.100

三庭敦、相關(guān)依賴

這里只展示與redis相關(guān)的依賴疼进,

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

<dependency>

    <groupId>org.springframework.integration</groupId>

    <artifactId>spring-integration-redis</artifactId>

</dependency>

這里解釋一下這兩個(gè)依賴:

  • 第一個(gè)依賴是對(duì)redis NoSQL的支持

  • 第二個(gè)依賴是spring integration與redis的結(jié)合,這里添加這個(gè)代碼主要是為了實(shí)現(xiàn)分布式鎖

四秧廉、配置文件

這里只展示與redis相關(guān)的配置

# redis所在的的地址

spring.redis.host=localhost

# redis數(shù)據(jù)庫索引伞广,從0開始,可以從redis的可視化客戶端查看

spring.redis.database=1

# redis的端口疼电,默認(rèn)為6379

spring.redis.port=6379

# redis的密碼

spring.redis.password=

# 連接redis的超時(shí)時(shí)間(ms)嚼锄,默認(rèn)是2000

spring.redis.timeout=5000

# 連接池最大連接數(shù)

spring.redis.jedis.pool.max-active=16

# 連接池最小空閑連接

spring.redis.jedis.pool.min-idle=0

# 連接池最大空閑連接

spring.redis.jedis.pool.max-idle=16

# 連接池最大阻塞等待時(shí)間(負(fù)數(shù)表示沒有限制)

spring.redis.jedis.pool.max-wait=-1

# 連接redis的客戶端名

spring.redis.client-name=mall

五、代碼配置

redis用作消息隊(duì)列蔽豺,其在spring boot中的主要表現(xiàn)為一RedisTemplate.convertAndSend()方法和一個(gè)MessageListener接口区丑。所以我們要在IOC容器中注入一個(gè)RedisTemplate和一個(gè)實(shí)現(xiàn)了MessageListener接口的類。話不多說修陡,先看代碼

配置RedisTemplate

配置RedisTemplate的主要目的是配置序列化方式以解決亂碼問題沧侥,同時(shí)合理配置序列化方式還能降低一點(diǎn)性能開銷。

/**

 * 配置RedisTemplate魄鸦,解決亂碼問題

 */

@Bean

public RedisTemplate&lt;String, Object&gt; redisTemplate(RedisConnectionFactory factory) {

    LOGGER.debug("redis序列化配置開始");

    RedisTemplate&lt;String, Object&gt; template = new RedisTemplate&lt;&gt;();

    template.setConnectionFactory(factory);

    // string序列化方式

    RedisSerializer serializer = new GenericJackson2JsonRedisSerializer();

    // 設(shè)置默認(rèn)序列化方式

    template.setDefaultSerializer(serializer);

    template.setKeySerializer(new StringRedisSerializer());

    template.setHashValueSerializer(serializer);

    LOGGER.debug("redis序列化配置結(jié)束");

    return template;

}

代碼第12行宴杀,我們配置默認(rèn)的序列化方式為GenericJackson2JsonRedisSerializer代碼第13行,我們配置鍵的序列化方式為StringRedisSerializer代碼第14行拾因,我們配置哈希表的值的序列化方式為GenericJackson2JsonRedisSerializer

RedisTemplate幾種序列化方式的簡要介紹

image

六旺罢、redis隊(duì)列監(jiān)聽器(消費(fèi)者)

上面說了旷余,與redis隊(duì)列監(jiān)聽器相關(guān)的類為一個(gè)名為MessageListener的接口,下面是該接口的源碼

public interface MessageListener {

    void onMessage(Message message, @Nullable byte[] pattern);

}

可以看到扁达,該接口僅有一個(gè)onMessage(Message message, @Nullable byte[] pattern)方法正卧,該方法便是監(jiān)聽到隊(duì)列中消息后的回調(diào)方法。下面解釋一下這兩個(gè)參數(shù):

  • message:redis消息類跪解,該類中僅有兩個(gè)方法

  • byte[] getBody()以二進(jìn)制形式獲取消息體

  • byte[] getChannel()以二進(jìn)制形式獲取消息通道

  • pattern:二進(jìn)制形式的消息通道穗酥,和message.getChannel()返回值相同

介紹完接口,我們來實(shí)現(xiàn)一個(gè)簡單的redis隊(duì)列監(jiān)聽器

@Component

public class RedisListener implement MessageListener{

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);

    @Override

    public void onMessage(Message message,byte[] pattern){

        LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(pattern));

        LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(message.getChannel()));

        LOGGER.debug("元消息={}",new String(message.getBody()));

        // 新建一個(gè)用于反序列化的對(duì)象惠遏,注意這里的對(duì)象要和前面配置的一樣

        // 因?yàn)槲仪懊嬖O(shè)置的默認(rèn)序列化方式為GenericJackson2JsonRedisSerializer

        // 所以這里的實(shí)現(xiàn)方式為GenericJackson2JsonRedisSerializer

        RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();

        LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));

    }

}

代碼很簡單砾跃,就是輸出參數(shù)中包含的關(guān)鍵信息。需要注意的是节吮,RedisSerializer的實(shí)現(xiàn)要與上面配置的序列化方式一致抽高。

隊(duì)列監(jiān)聽器實(shí)現(xiàn)完以后,我們還需要將這個(gè)監(jiān)聽器添加到redis隊(duì)列監(jiān)聽器容器中透绩,代碼如下:

@Bean

public public RedisMessageListenerContainer container(RedisConnectionFactory factory) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();

    container.setConnectionFactory(factory);

    container.addMessageListener(redisListener, new PatternTopic("demo-channel"));

    return container;

}

這幾行代碼大概意思就是新建一個(gè)Redis消息監(jiān)聽器容器翘骂,然后將監(jiān)聽器和管道名想綁定驳癌,最后返回這個(gè)容器葫松。

這里要注意的是,這個(gè)管道名和下面將要說的推送消息時(shí)的管道名要一致漾岳,不然監(jiān)聽器監(jiān)聽不到消息狸臣。

七莹桅、redis隊(duì)列推送服務(wù)(生產(chǎn)者)

上面我們配置了RedisTemplate將要在這里使用到。

代碼如下:

@Service

public class Publisher{

    @Autowrite

    private RedisTemplate redis;

    public void publish(Object msg){

        redis.convertAndSend("demo-channel",msg);

    }

}

關(guān)鍵代碼為第7行烛亦,redis.convertAndSend()這個(gè)方法的作用為诈泼,向某個(gè)通道(參數(shù)1)推送一條消息(第二個(gè)參數(shù))。

這里還是要注意上面所說的煤禽,生產(chǎn)者和消費(fèi)者的通道名要相同铐达。

至此,消息隊(duì)列的生產(chǎn)者和消費(fèi)者已經(jīng)全部編寫完成檬果。

八瓮孙、遇到的問題及解決辦法

1、spring boot使用log4j2日志框架問題

在我添加了spring-boot-starter-log4j2依賴并在spring-boot-starter-web中排除了spring-boot-starter-logging后选脊,運(yùn)行項(xiàng)目杭抠,還是會(huì)提示下面的錯(cuò)誤:

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]

這個(gè)錯(cuò)誤就是maven中有多個(gè)日志框架導(dǎo)致的。后來通過依賴分析知牌,發(fā)現(xiàn)在spring-boot-starter-data-redis中祈争,也依賴了spring-boot-starter-logging,解決辦法也很簡單角寸,下面貼出詳細(xì)代碼

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-data-redis</artifactId>

    <exclusions>

        <exclusion>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-logging</artifactId>

        </exclusion>

    </exclusions>

</dependency>

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-web</artifactId>

    <exclusions>

        <exclusion>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-logging</artifactId>

        </exclusion>

    </exclusions>

</dependency>

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-log4j2</artifactId>

</dependency>

<dependency>

    <groupId>org.springframework.integration</groupId>

    <artifactId>spring-integration-redis</artifactId>

</dependency>

2菩混、redis隊(duì)列監(jiān)聽器線程安全問題

redis隊(duì)列監(jiān)聽器的監(jiān)聽機(jī)制是:使用一個(gè)線程監(jiān)聽隊(duì)列忿墅,隊(duì)列有未消費(fèi)的消息則取出消息并生成一個(gè)新的線程來消費(fèi)消息。如果你還記得沮峡,我開頭說的是由于redis單線程特性疚脐,因此我們用它來做消息隊(duì)列,但是如果監(jiān)聽器每次接受一個(gè)消息就生成新的線程來消費(fèi)信息的話邢疙,這樣就完全沒有使用到redis的單線程特性棍弄,同時(shí)還會(huì)產(chǎn)生線程安全問題。

單一消費(fèi)者(一個(gè)通道只有一個(gè)消費(fèi)者)的解決辦法

最簡單的辦法莫過于為onMessage()方法加鎖疟游,這樣簡單粗暴卻很有用呼畸,不過這種方式無法控制隊(duì)列監(jiān)聽的速率,且無限制的創(chuàng)造線程最終會(huì)導(dǎo)致系統(tǒng)資源被占光颁虐。那如何解決這種情況呢蛮原?線程池。在將監(jiān)聽器添加到容器的配置的時(shí)候另绩,RedisMessageListenerContainer類中有一個(gè)方法setTaskExecutor(Executor taskExecutor)可以為監(jiān)聽容器配置線程池儒陨。配置線程池以后,所有的線程都會(huì)由該線程池產(chǎn)生笋籽,由此蹦漠,我們可以通過調(diào)節(jié)線程池來控制隊(duì)列監(jiān)聽的速率。

多個(gè)消費(fèi)者(一個(gè)通道有多個(gè)消費(fèi)者)的解決辦法

單一消費(fèi)者的問題相比于多個(gè)消費(fèi)者來說還是較為簡單车海,因?yàn)镴ava內(nèi)置的鎖都是只能控制自己程序的運(yùn)行笛园,不能干擾其他的程序的運(yùn)行;然而現(xiàn)在很多時(shí)候我們都是在分布式環(huán)境下進(jìn)行開發(fā)容劳,這時(shí)處理多個(gè)消費(fèi)者的情況就很有意義了喘沿。

那么這種問題如何解決呢闸度?分布式鎖竭贩。

下面來簡要科普一下什么是分布式鎖:

分布式鎖是指在分布式環(huán)境下,同一時(shí)間只有一個(gè)客戶端能夠從某個(gè)共享環(huán)境中(例如redis)獲取到鎖莺禁,只有獲取到鎖的客戶端才能執(zhí)行程序留量。

然后分布式鎖一般要滿足:排他性(即同一時(shí)間只有一個(gè)客戶端能夠獲取到鎖)、避免死鎖(即超時(shí)后自動(dòng)釋放)哟冬、高可用(即獲取或釋放鎖的機(jī)制必須高可用且性能佳)

上面講依賴的時(shí)候楼熄,我們導(dǎo)入了一個(gè)spring-integration-redis依賴,這個(gè)依賴?yán)锩姘撕芏鄬?shí)用的工具類浩峡,而我們接下來要講的分布式鎖就是這個(gè)依賴下面的一個(gè)工具包RedisLockRegistry可岂。

首先講一下如何使用,導(dǎo)入了依賴以后翰灾,首先配置一個(gè)Bean

@Bean

public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) {

    return new RedisLockRegistry(factory, "demo-lock",60);

}

RedisLockRegistry的構(gòu)造函數(shù)缕粹,第一個(gè)參數(shù)是redis連接池稚茅,第二個(gè)參數(shù)是鎖的前綴,即取出的鎖平斩,鍵名為“demo-lock:KEY_NAME”亚享,第三個(gè)參數(shù)為鎖的過期時(shí)間(秒),默認(rèn)為60秒绘面,當(dāng)持有鎖超過該時(shí)間后自動(dòng)過期欺税。

使用鎖的方法,下面是對(duì)監(jiān)聽器的修改

@Component

public class RedisListener implement MessageListener{

    @Autowrite

    private RedisLockRegistry redisLockRegistry;

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);

    @Override

    public void onMessage(Message message,byte[] pattern){

        Lock lock=redisLockRegistry.obtain("lock");

        try{

            lock.lock(); //上鎖

            LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(pattern));

            LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(message.getChannel()));

            LOGGER.debug("元消息={}",new String(message.getBody()));

            // 新建一個(gè)用于反序列化的對(duì)象揭璃,注意這里的對(duì)象要和前面配置的一樣

            // 因?yàn)槲仪懊嬖O(shè)置的默認(rèn)序列化方式為GenericJackson2JsonRedisSerializer

            // 所以這里的實(shí)現(xiàn)方式為GenericJackson2JsonRedisSerializer

            RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();

            LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            lock.unlock(); //解鎖

        }

    }

}

上面代碼的代碼比起前面的監(jiān)聽器代碼晚凿,只是多了一個(gè)注入的RedisLockRegistry,一個(gè)通過redisLockRegistry.obtain()方法獲取鎖瘦馍,一個(gè)加鎖一個(gè)解鎖晃虫,然后這就完成了分布式鎖的使用。注意這個(gè)獲取鎖的方法redisLockRegistry.obtain()扣墩,其返回的是一個(gè)名為RedisLock的鎖哲银,這是一個(gè)私有內(nèi)部類,它實(shí)現(xiàn)了Lock接口呻惕,因此我們不能從代碼外部創(chuàng)建一個(gè)他的實(shí)例荆责,只能通過obtian()方法來獲取這個(gè)鎖。

以上就是本文的全部內(nèi)容亚脆。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末做院,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子濒持,更是在濱河造成了極大的恐慌键耕,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件柑营,死亡現(xiàn)場離奇詭異屈雄,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)官套,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門酒奶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人奶赔,你說我怎么就攤上這事惋嚎。” “怎么了站刑?”我有些...
    開封第一講書人閱讀 158,369評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵另伍,是天一觀的道長。 經(jīng)常有香客問我绞旅,道長摆尝,這世上最難降的妖魔是什么愕宋? 我笑而不...
    開封第一講書人閱讀 56,799評(píng)論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮结榄,結(jié)果婚禮上中贝,老公的妹妹穿的比我還像新娘。我一直安慰自己臼朗,他們只是感情好邻寿,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,910評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著视哑,像睡著了一般绣否。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上挡毅,一...
    開封第一講書人閱讀 50,096評(píng)論 1 291
  • 那天蒜撮,我揣著相機(jī)與錄音,去河邊找鬼跪呈。 笑死段磨,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的耗绿。 我是一名探鬼主播苹支,決...
    沈念sama閱讀 39,159評(píng)論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼误阻!你這毒婦竟也來了债蜜?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,917評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤究反,失蹤者是張志新(化名)和其女友劉穎寻定,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體精耐,經(jīng)...
    沈念sama閱讀 44,360評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡狼速,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,673評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了黍氮。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片唐含。...
    茶點(diǎn)故事閱讀 38,814評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖沫浆,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情滚秩,我是刑警寧澤专执,帶...
    沈念sama閱讀 34,509評(píng)論 4 334
  • 正文 年R本政府宣布,位于F島的核電站郁油,受9級(jí)特大地震影響本股,放射性物質(zhì)發(fā)生泄漏攀痊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,156評(píng)論 3 317
  • 文/蒙蒙 一拄显、第九天 我趴在偏房一處隱蔽的房頂上張望苟径。 院中可真熱鬧,春花似錦躬审、人聲如沸棘街。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽遭殉。三九已至,卻和暖如春博助,著一層夾襖步出監(jiān)牢的瞬間险污,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評(píng)論 1 267
  • 我被黑心中介騙來泰國打工富岳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蛔糯,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,641評(píng)論 2 362
  • 正文 我出身青樓窖式,卻偏偏與公主長得像渤闷,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子脖镀,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,728評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容