消息隊(duì)列一般都會(huì)想到kafka,rabbitmq,Rockermq, 其實(shí),給你印像做緩存的Redis也是能做消息隊(duì)列.
- redis消息隊(duì)列生產(chǎn)者如下:
@Service
public class MessageQueueRedisProducerServiceImpl implements IMessageQueueProducerService {
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public boolean produceMessage(MessageQueueDto messageQueueDto) {
redisTemplate.convertAndSend(messageQueueDto.getTopic(),messageQueueDto.getMessage());
return true;
}
@Override
public boolean support(String producerType) {
return Objects.equals(producerType,"redis");
}
}
其中,只要調(diào)用convertAndSend方法就可以產(chǎn)生隊(duì)列
2 redis消息隊(duì)列消費(fèi)者如下:
public class MessageQueueRedisConsumerListener implements MessageListener {
private IMessageQueueConsumerService messageQueueConsumerService;
public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
this.messageQueueConsumerService = messageQueueConsumerService;
}
@Override
public void onMessage(Message message, byte[] pattern) {
messageQueueConsumerService.receiveMessage(message.toString());
}
}
MessageQueueRedisConsumerListener 實(shí)現(xiàn)接口MessageListener 的監(jiān)聽君丁,這個(gè)主要用于處理獲取到的消息數(shù)據(jù)
@Service
public class MessageQueueRedisConsumerServiceFactory {
private List<IMessageQueueConsumerService> messageQueueConsumerServices;
@Autowired
public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
messageQueueConsumerService.support("redis")).collect(Collectors.toList());
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
new MessageQueueRedisConsumerListener(messageQueueConsumerService));
messageListenerAdapter.afterPropertiesSet();
container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));
});
return container;
}
}
b. 類MessageQueueRedisConsumerServiceFactory 主要是用于注冊(cè)監(jiān)聽器,要監(jiān)聽哪種主題,并這種主題使用哪種數(shù)據(jù)處理類
至此,redis的消息隊(duì)列已完成.