Spring-boot 集成Redis應(yīng)用(一) --消息隊(duì)列
一.基礎(chǔ)環(huán)境
- jdk 1.8
- maven 3.5.3
- spring-boot 2.0.4
- redis 4.0.11
二.基本介紹
Spring MVC 3.2 之后引入了基于Servlet 3的異步請(qǐng)求處理。因此使用了DeferredResult
相關(guān)使用來實(shí)現(xiàn)異步處理惕它,從而擴(kuò)大請(qǐng)求吞吐量。
Redis使用 LPUSH 和RPOP命令實(shí)現(xiàn)隊(duì)列的概念。只需要讓生產(chǎn)者將任務(wù)使用LPUSH 命令加入到某個(gè)鍵中,另一邊讓消費(fèi)者不斷地使用RPOP命令從該鍵中取出任務(wù)即可趟径。
三.流程介紹
四.相關(guān)代碼示例
-
POM文件
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.0</version> </dependency>
-
application.properties 簡(jiǎn)單相關(guān)redis配置
注意如果redis沒有安裝到本機(jī)堕仔,那么必須設(shè)置訪問密碼否則會(huì)有連接報(bào)錯(cuò)異常。
#Redis server host. spring.redis.host=192.168.56.101 # Redis server port. spring.redis.port=6379 #Login password of the redis server. spring.redis.password=123456 # Maximum number of connections that can be allocated # by the pool at a given time. Use a negative value for no limit. spring.redis.lettuce.pool.max-active=8 # Maximum number of "idle" connections in the pool. # Use a negative value to indicate an unlimited number of idle connections. spring.redis.lettuce.pool.max-idle=8 # Maximum amount of time a connection allocation should # block before throwing an exception when the pool is exhausted. # Use a negative value to block indefinitely. spring.redis.lettuce.pool.max-wait=-1ms # Target for the minimum number of idle connections to maintain in the pool. T # his setting only has an effect if it is positive. spring.redis.lettuce.pool.min-idle=0 # Shutdown timeout. spring.redis.lettuce.shutdown-timeout=0ms
自定義
StringRedisTemplate
類,保證采用string的序列序列化,具體配置在代碼中有原文鏈接早敬,如果有疑問可以看看。
@Configuration
public class RedisConfig {
/**
* 定義 StringRedisTemplate 大脉,指定序列化和反序列化的處理類
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisTemplate<String,String> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
//配置過濾類型
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//默認(rèn)允許序列化類型
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
//序列化 值時(shí)使用此序列化方法
stringRedisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
stringRedisTemplate.afterPropertiesSet();
return stringRedisTemplate;
}
- 編寫
DeferredResult
的管理類搞监,方便在異步方法中找到任務(wù)ID對(duì)應(yīng)的DeferredResult
對(duì)象。
@Component
public class DeferredResultHolder {
private Map<String, DeferredResult<String>> map = new HashMap<String,DeferredResult<String>>();
public Map<String, DeferredResult<String>> getMap() {
return map;
}
public void setMap(Map<String, DeferredResult<String>> map) {
this.map = map;
}
}
- 編寫消息隊(duì)列監(jiān)聽,監(jiān)聽處理任務(wù)程序的消費(fèi)隊(duì)列,如果隊(duì)列里有值镰矿,則代表任務(wù)處理完成琐驴,并找到對(duì)應(yīng)的
DeferredResult
對(duì)象,進(jìn)行賦值結(jié)果返回。
/**
* 設(shè)置消息隊(duì)列監(jiān)聽
* ContextRefreshedEvent spring 在初始化完畢后的事件
* @author Neal
*/
@Component
public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {
//日志
private Logger logger = LoggerFactory.getLogger(getClass());
//redis完成隊(duì)列KEY
private static String REDIS_COMPLATE = "complete";
@Autowired
private RedisTemplate redisTemplate;
//DeferredResult管理類
@Autowired
private DeferredResultHolder deferredResultHolder;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent){
//由于處理隊(duì)列方法是一個(gè)無限循環(huán)绝淡,需要單起一個(gè)線程宙刘,防止阻塞系統(tǒng)啟動(dòng)
new Thread(()->{
while(true) {
logger.info("讀取消息隊(duì)列完成訂單 ");
//從完成的隊(duì)列中按順序取出完成的任務(wù)ID
Object uuid = redisTemplate.opsForList().rightPop(REDIS_COMPLATE,5000,TimeUnit.SECONDS);
//為空判斷
if(null == uuid) {
try {
TimeUnit.MILLISECONDS.sleep(500);
continue;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("返回訂單處理結(jié)果: " + uuid);
//返回處理結(jié)果
deferredResultHolder.getMap().get(uuid).setResult("success");
}
}).start();
}
}
- 編寫 模擬任務(wù)處理方法 ,在DEMO中同樣使用監(jiān)聽機(jī)制來實(shí)現(xiàn),當(dāng)從生產(chǎn)隊(duì)列中取到非NULL值的任務(wù)ID時(shí),則進(jìn)行1秒鐘的休眠牢酵,方便后期調(diào)試查看悬包。然后將該ID放入消費(fèi)隊(duì)列。
/**
* 模擬另一個(gè)程序去處理消息隊(duì)列里的任務(wù)
* ContextRefreshedEvent spring 在初始化完畢后的事件
* @author Neal
*/
@Component
public class ResolveListener implements ApplicationListener<ContextRefreshedEvent> {
private Logger logger = LoggerFactory.getLogger(getClass());
//redis 完成隊(duì)列 KEY
private static String REDIS_COMPLATE = "complete";
//redis 準(zhǔn)備隊(duì)列 KEY
private static String REDIS_MESSAGE = "prepare";
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
//由于處理隊(duì)列方法是一個(gè)無限循環(huán)馍乙,需要單起一個(gè)線程布近,防止阻塞系統(tǒng)啟動(dòng)
new Thread(()-> {
while(true) {
//獲取任務(wù)隊(duì)列中的任務(wù)ID
Object prepareduuid = redisTemplate.opsForList().rightPop(REDIS_MESSAGE, 5000, TimeUnit.SECONDS);
//非空判斷
if(null == prepareduuid) {
try {
TimeUnit.MILLISECONDS.sleep(500);
continue;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("讀取消息隊(duì)列待處理ID ; " + prepareduuid);
/**
* 模擬任務(wù)處理過程 begin
*/
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* end
*/
logger.info("完成訂單處理,把處理ID放入完成隊(duì)列");
//將完成后的任務(wù)放入 任務(wù)結(jié)束隊(duì)列
redisTemplate.opsForList().leftPush(REDIS_COMPLATE, prepareduuid);
}
}).start();
}
}
接著寫一下測(cè)試的controller層丝格,就是簡(jiǎn)單的請(qǐng)求撑瞧。
@RestController
@RequestMapping("/redis")
public class RedisController {
private Logger logger = LoggerFactory.getLogger(getClass());
//redis 準(zhǔn)備隊(duì)列
private static String REDIS_MESSAGE = "prepare";
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private DeferredResultHolder deferredResultHolder;
/**
* 測(cè)試消息隊(duì)列入口接口
* @return
*/
@GetMapping("/async")
public DeferredResult<String> async() {
logger.info("主線程開始");
//生成唯一值 模擬任務(wù)ID初始化
String uuid = UUID.randomUUID().toString();
//將要任務(wù)的ID放入redis 待處理任務(wù)消息隊(duì)列
redisTemplate.opsForList().leftPush(REDIS_MESSAGE,uuid);
DeferredResult<String> deferredResult = new DeferredResult<>();
//將任務(wù)ID和 DeferredResult 對(duì)象綁定
deferredResultHolder.getMap().put(uuid,deferredResult);
logger.info("主線程返回");
return deferredResult;
}
}
-
啟動(dòng)spring-boot。應(yīng)用postman發(fā)送GET請(qǐng)求調(diào)試显蝌。
從postman請(qǐng)求以及控制臺(tái)輸出日志可以看到,postman發(fā)起請(qǐng)求是 先請(qǐng)求主線程预伺,然后主線程調(diào)用結(jié)束,緊接著是 模擬的任務(wù)處理ResolveListener
線程進(jìn)行處理操作琅束,最后監(jiān)聽隊(duì)列QueueListener
線程監(jiān)聽到任務(wù)處理結(jié)束并返回結(jié)果給前臺(tái)扭屁,打印出success。
因?yàn)椴襟E5和6都是在啟動(dòng)監(jiān)聽涩禀,所以單獨(dú)啟用一個(gè)線程料滥,防止方法內(nèi)的WHILE循環(huán)阻塞容器啟動(dòng)。
在此DEMO中艾船,如果程序運(yùn)行時(shí)間長(zhǎng)會(huì)報(bào)出 Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out 異常葵腹。具體原因就是連接REDIS時(shí)間設(shè)置過短,修改相關(guān)配置即可屿岂。
以上的消息隊(duì)列思路是通過學(xué)習(xí)慕課網(wǎng)JoJozhai老師的 spring security相關(guān)課程寫的DEMO