來自公眾號:Hollis
眾所周知设褐,redis是一個高性能的分布式key-value存儲系統(tǒng)颠蕴,在NoSQL數(shù)據(jù)庫市場上,redis自己就占據(jù)了將近半壁江山助析,足以見到其強大之處犀被。同時,由于redis的單線程特性外冀,我們可以將其用作為一個消息隊列寡键。本篇文章就來講講如何將redis整合到spring boot中,并用作消息隊列的……
一雪隧、什么是消息隊列
“消息隊列”是在消息的傳輸過程中保存消息的容器西轩。——《百度百科》
消息我們可以理解為在計算機中或在整個計算機網(wǎng)絡(luò)中傳遞的數(shù)據(jù)膀跌。
隊列是我們在學(xué)習(xí)數(shù)據(jù)結(jié)構(gòu)的時候?qū)W習(xí)的基本數(shù)據(jù)結(jié)構(gòu)之一遭商,它具有先進先出的特性。
所以捅伤,消息隊列就是一個保存消息的容器,它具有先進先出的特性巫玻。
為什么會出現(xiàn)消息隊列丛忆?
異步:常見的B/S架構(gòu)下祠汇,客戶端向服務(wù)器發(fā)送請求,但是服務(wù)器處理這個消息需要花費的時間很長的時間熄诡,如果客戶端一直等待服務(wù)器處理完消息可很,會造成客戶端的系統(tǒng)資源浪費;而使用消息隊列后凰浮,服務(wù)器直接將消息推送到消息隊列中我抠,由專門的處理消息程序處理消息,這樣客戶端就不必花費大量時間等待服務(wù)器的響應(yīng)了袜茧;
解耦:傳統(tǒng)的軟件開發(fā)模式菜拓,模塊之間的調(diào)用是直接調(diào)用,這樣的系統(tǒng)很不利于系統(tǒng)的擴展笛厦,同時纳鼎,模塊之間的相互調(diào)用,數(shù)據(jù)之間的共享問題也很大裳凸,每個模塊都要時時刻刻考慮其他模塊會不會掛了贱鄙;使用消息隊列以后,模塊之間不直接調(diào)用姨谷,而是通過數(shù)據(jù)逗宁,且當(dāng)某個模塊掛了以后,數(shù)據(jù)仍舊會保存在消息隊列中梦湘。最典型的就是生產(chǎn)者-消費者模式疙剑,本案例使用的就是該模式;
削峰填谷:某一時刻践叠,系統(tǒng)的并發(fā)請求暴增言缤,遠(yuǎn)遠(yuǎn)超過了系統(tǒng)的最大處理能力后,如果不做任何處理禁灼,系統(tǒng)會崩潰管挟;使用消息隊列以后,服務(wù)器把請求推送到消息隊列中弄捕,由專門的處理消息程序以合理的速度消費消息僻孝,降低服務(wù)器的壓力。
由上圖可以看到守谓,消息隊列充當(dāng)了一個中間人的角色穿铆,我們可以通過操作這個消息隊列來保證我們的系統(tǒng)穩(wěn)定。
二斋荞、環(huán)境準(zhǔn)備
Java環(huán)境:jdk1.8
spring boot版本:2.2.1.RELEASE
redis-server版本:3.2.100
三荞雏、相關(guān)依賴
這里只展示與redis相關(guān)的依賴,
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
這里解釋一下這兩個依賴:
第一個依賴是對redis NoSQL的支持
第二個依賴是spring integration與redis的結(jié)合,這里添加這個代碼主要是為了實現(xiàn)分布式鎖
四凤优、配置文件
這里只展示與redis相關(guān)的配置
# redis所在的的地址
spring.redis.host=localhost
# redis數(shù)據(jù)庫索引悦陋,從0開始,可以從redis的可視化客戶端查看
spring.redis.database=1
# redis的端口筑辨,默認(rèn)為6379
spring.redis.port=6379
# redis的密碼
spring.redis.password=
# 連接redis的超時時間(ms)俺驶,默認(rèn)是2000
spring.redis.timeout=5000
# 連接池最大連接數(shù)
spring.redis.jedis.pool.max-active=16
# 連接池最小空閑連接
spring.redis.jedis.pool.min-idle=0
# 連接池最大空閑連接
spring.redis.jedis.pool.max-idle=16
# 連接池最大阻塞等待時間(負(fù)數(shù)表示沒有限制)
spring.redis.jedis.pool.max-wait=-1
# 連接redis的客戶端名
spring.redis.client-name=mall
五、代碼配置
redis用作消息隊列棍辕,其在spring boot中的主要表現(xiàn)為一RedisTemplate.convertAndSend()方法和一個MessageListener接口暮现。所以我們要在IOC容器中注入一個RedisTemplate和一個實現(xiàn)了MessageListener接口的類。話不多說楚昭,先看代碼
配置RedisTemplate
配置RedisTemplate的主要目的是配置序列化方式以解決亂碼問題栖袋,同時合理配置序列化方式還能降低一點性能開銷。
/**
* 配置RedisTemplate哪替,解決亂碼問題
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
LOGGER.debug("redis序列化配置開始");
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// string序列化方式
RedisSerializer serializer = new GenericJackson2JsonRedisSerializer();
// 設(shè)置默認(rèn)序列化方式
template.setDefaultSerializer(serializer);
template.setKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
LOGGER.debug("redis序列化配置結(jié)束");
return template;
}
代碼第12行栋荸,我們配置默認(rèn)的序列化方式為GenericJackson2JsonRedisSerializer代碼第13行,我們配置鍵的序列化方式為StringRedisSerializer代碼第14行凭舶,我們配置哈希表的值的序列化方式為GenericJackson2JsonRedisSerializer
RedisTemplate幾種序列化方式的簡要介紹
六晌块、redis隊列監(jiān)聽器(消費者)
上面說了,與redis隊列監(jiān)聽器相關(guān)的類為一個名為MessageListener的接口帅霜,下面是該接口的源碼
public interface MessageListener {
void onMessage(Message message, @Nullable byte[] pattern);
}
可以看到匆背,該接口僅有一個onMessage(Message message, @Nullable byte[] pattern)方法,該方法便是監(jiān)聽到隊列中消息后的回調(diào)方法身冀。下面解釋一下這兩個參數(shù):
message:redis消息類钝尸,該類中僅有兩個方法
byte[] getBody()以二進制形式獲取消息體
byte[] getChannel()以二進制形式獲取消息通道
pattern:二進制形式的消息通道,和message.getChannel()返回值相同
介紹完接口搂根,我們來實現(xiàn)一個簡單的redis隊列監(jiān)聽器
@Component
public class RedisListener implement MessageListener{
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Override
public void onMessage(Message message,byte[] pattern){
LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(pattern));
LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(message.getChannel()));
LOGGER.debug("元消息={}",new String(message.getBody()));
// 新建一個用于反序列化的對象珍促,注意這里的對象要和前面配置的一樣
// 因為我前面設(shè)置的默認(rèn)序列化方式為GenericJackson2JsonRedisSerializer
// 所以這里的實現(xiàn)方式為GenericJackson2JsonRedisSerializer
RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();
LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));
}
}
代碼很簡單,就是輸出參數(shù)中包含的關(guān)鍵信息剩愧。需要注意的是猪叙,RedisSerializer的實現(xiàn)要與上面配置的序列化方式一致。
隊列監(jiān)聽器實現(xiàn)完以后仁卷,我們還需要將這個監(jiān)聽器添加到redis隊列監(jiān)聽器容器中穴翩,代碼如下:
@Bean
public public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(redisListener, new PatternTopic("demo-channel"));
return container;
}
這幾行代碼大概意思就是新建一個Redis消息監(jiān)聽器容器,然后將監(jiān)聽器和管道名想綁定锦积,最后返回這個容器芒帕。
這里要注意的是,這個管道名和下面將要說的推送消息時的管道名要一致丰介,不然監(jiān)聽器監(jiān)聽不到消息背蟆。
七鉴分、redis隊列推送服務(wù)(生產(chǎn)者)
上面我們配置了RedisTemplate將要在這里使用到。
代碼如下:
@Service
public class Publisher{
@Autowrite
private RedisTemplate redis;
public void publish(Object msg){
redis.convertAndSend("demo-channel",msg);
}
}
關(guān)鍵代碼為第7行淆储,redis.convertAndSend()這個方法的作用為冠场,向某個通道(參數(shù)1)推送一條消息(第二個參數(shù))家浇。
這里還是要注意上面所說的本砰,生產(chǎn)者和消費者的通道名要相同。
至此钢悲,消息隊列的生產(chǎn)者和消費者已經(jīng)全部編寫完成点额。
八、遇到的問題及解決辦法
1莺琳、spring boot使用log4j2日志框架問題
在我添加了spring-boot-starter-log4j2依賴并在spring-boot-starter-web中排除了spring-boot-starter-logging后还棱,運行項目,還是會提示下面的錯誤:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
這個錯誤就是maven中有多個日志框架導(dǎo)致的惭等。后來通過依賴分析珍手,發(fā)現(xiàn)在spring-boot-starter-data-redis中,也依賴了spring-boot-starter-logging辞做,解決辦法也很簡單琳要,下面貼出詳細(xì)代碼
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
2、redis隊列監(jiān)聽器線程安全問題
redis隊列監(jiān)聽器的監(jiān)聽機制是:使用一個線程監(jiān)聽隊列秤茅,隊列有未消費的消息則取出消息并生成一個新的線程來消費消息稚补。如果你還記得,我開頭說的是由于redis單線程特性框喳,因此我們用它來做消息隊列课幕,但是如果監(jiān)聽器每次接受一個消息就生成新的線程來消費信息的話,這樣就完全沒有使用到redis的單線程特性五垮,同時還會產(chǎn)生線程安全問題乍惊。
單一消費者(一個通道只有一個消費者)的解決辦法
最簡單的辦法莫過于為onMessage()方法加鎖,這樣簡單粗暴卻很有用放仗,不過這種方式無法控制隊列監(jiān)聽的速率润绎,且無限制的創(chuàng)造線程最終會導(dǎo)致系統(tǒng)資源被占光。那如何解決這種情況呢匙监?線程池凡橱。在將監(jiān)聽器添加到容器的配置的時候,RedisMessageListenerContainer類中有一個方法setTaskExecutor(Executor taskExecutor)可以為監(jiān)聽容器配置線程池亭姥。配置線程池以后稼钩,所有的線程都會由該線程池產(chǎn)生,由此达罗,我們可以通過調(diào)節(jié)線程池來控制隊列監(jiān)聽的速率坝撑。
多個消費者(一個通道有多個消費者)的解決辦法
單一消費者的問題相比于多個消費者來說還是較為簡單静秆,因為Java內(nèi)置的鎖都是只能控制自己程序的運行,不能干擾其他的程序的運行巡李;然而現(xiàn)在很多時候我們都是在分布式環(huán)境下進行開發(fā)抚笔,這時處理多個消費者的情況就很有意義了。
那么這種問題如何解決呢侨拦?分布式鎖殊橙。
下面來簡要科普一下什么是分布式鎖:
分布式鎖是指在分布式環(huán)境下,同一時間只有一個客戶端能夠從某個共享環(huán)境中(例如redis)獲取到鎖狱从,只有獲取到鎖的客戶端才能執(zhí)行程序膨蛮。
然后分布式鎖一般要滿足:排他性(即同一時間只有一個客戶端能夠獲取到鎖)、避免死鎖(即超時后自動釋放)季研、高可用(即獲取或釋放鎖的機制必須高可用且性能佳)
上面講依賴的時候敞葛,我們導(dǎo)入了一個spring-integration-redis依賴,這個依賴?yán)锩姘撕芏鄬嵱玫墓ぞ哳愑胛校覀兘酉聛硪v的分布式鎖就是這個依賴下面的一個工具包RedisLockRegistry惹谐。
首先講一下如何使用,導(dǎo)入了依賴以后驼卖,首先配置一個Bean
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) {
return new RedisLockRegistry(factory, "demo-lock",60);
}
RedisLockRegistry的構(gòu)造函數(shù)氨肌,第一個參數(shù)是redis連接池,第二個參數(shù)是鎖的前綴款慨,即取出的鎖儒飒,鍵名為“demo-lock:KEY_NAME”,第三個參數(shù)為鎖的過期時間(秒)檩奠,默認(rèn)為60秒桩了,當(dāng)持有鎖超過該時間后自動過期。
使用鎖的方法埠戳,下面是對監(jiān)聽器的修改
@Component
public class RedisListener implement MessageListener{
@Autowrite
private RedisLockRegistry redisLockRegistry;
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Override
public void onMessage(Message message,byte[] pattern){
Lock lock=redisLockRegistry.obtain("lock");
try{
lock.lock(); //上鎖
LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(pattern));
LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(message.getChannel()));
LOGGER.debug("元消息={}",new String(message.getBody()));
// 新建一個用于反序列化的對象井誉,注意這里的對象要和前面配置的一樣
// 因為我前面設(shè)置的默認(rèn)序列化方式為GenericJackson2JsonRedisSerializer
// 所以這里的實現(xiàn)方式為GenericJackson2JsonRedisSerializer
RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();
LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); //解鎖
}
}
}
上面代碼的代碼比起前面的監(jiān)聽器代碼,只是多了一個注入的RedisLockRegistry整胃,一個通過redisLockRegistry.obtain()方法獲取鎖颗圣,一個加鎖一個解鎖,然后這就完成了分布式鎖的使用屁使。注意這個獲取鎖的方法redisLockRegistry.obtain()在岂,其返回的是一個名為RedisLock的鎖,這是一個私有內(nèi)部類蛮寂,它實現(xiàn)了Lock接口蔽午,因此我們不能從代碼外部創(chuàng)建一個他的實例,只能通過obtian()方法來獲取這個鎖酬蹋。
以上就是本文的全部內(nèi)容及老。