上篇我寫了一個(gè)通用的消息隊(duì)列(redis,kafka,rabbitmq)--生產(chǎn)者篇,這次寫一個(gè)消費(fèi)者篇.
1.消費(fèi)者的通用調(diào)用類:
/**
* 消息隊(duì)列處理的handle
* @author starmark
* @date 2020/5/1 上午10:56
*/
public interface IMessageQueueConsumerService {
/**
* 處理消息隊(duì)列的消息
* @param message 消息
*/
void receiveMessage(String message);
/**
* 返回監(jiān)聽的topic
* @return 主題
*/
String topic();
/**
*
* @param consumerType 消費(fèi)者類型
* @return 是否支持該消費(fèi)者類者
*/
boolean support(String consumerType);
}
只要實(shí)現(xiàn)該類的接口就可以實(shí)現(xiàn)監(jiān)聽段标,
redis的消費(fèi)端,有兩個(gè)類,如下:
/**
* @author starmark
* @date 2020/5/2 下午3:05
*/
public class MessageQueueRedisConsumerListener implements MessageListener {
private IMessageQueueConsumerService messageQueueConsumerService;
public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
this.messageQueueConsumerService = messageQueueConsumerService;
}
@Override
public void onMessage(Message message, byte[] pattern) {
messageQueueConsumerService.receiveMessage(message.toString());
}
}
/**
* 消息隊(duì)列服務(wù)端的監(jiān)聽
*
* @author starmark
* @date 2020/5/1 上午10:55
*/
@Service
public class MessageQueueRedisConsumerServiceFactory {
private List<IMessageQueueConsumerService> messageQueueConsumerServices;
@Autowired
public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
messageQueueConsumerService.support("redis")).collect(Collectors.toList());
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
new MessageQueueRedisConsumerListener(messageQueueConsumerService));
messageListenerAdapter.afterPropertiesSet();
container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));
});
return container;
}
}
kafka消費(fèi)者也有兩個(gè)類缭乘,如下:
/**
* @author starmark
* @date 2020/5/2 下午3:05
*/
public class MessageQueueKafkaConsumerListener implements MessageListener<String,String> {
private final IMessageQueueConsumerService messageQueueConsumerService;
public MessageQueueKafkaConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
this.messageQueueConsumerService = messageQueueConsumerService;
}
@Override
public void onMessage(ConsumerRecord<String, String> data) {
messageQueueConsumerService.receiveMessage(data.value());
}
}
/**
* 消息隊(duì)列服務(wù)端的監(jiān)聽
*
* @author starmark
* @date 2020/5/1 上午10:55
*/
@Component
public class MessageQueueKafkaConsumerServiceFactory implements InitializingBean {
@Autowired
KafkaProperties kafkaProperties;
private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
@Autowired
public MessageQueueKafkaConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
messageQueueConsumerService.support("kafka")).collect(Collectors.toList());
}
private KafkaMessageListenerContainer<Integer, String> createContainer(
ContainerProperties containerProps) {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(props);
return new KafkaMessageListenerContainer<>(cf, containerProps);
}
@Override
public void afterPropertiesSet() {
messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
ContainerProperties containerProps = new ContainerProperties(messageQueueConsumerService.topic());
containerProps.setMessageListener(new MessageQueueKafkaConsumerListener(messageQueueConsumerService)
);
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName(messageQueueConsumerService.topic() + "kafkaListener");
container.start();
});
}
}
這些類都是實(shí)現(xiàn)動(dòng)態(tài)監(jiān)聽某個(gè)主題.
rabbitmq就有點(diǎn)復(fù)雜捏雌,因?yàn)樗蠼藂ueue才能實(shí)現(xiàn)監(jiān)聽,我現(xiàn)在這個(gè)代碼,如果生產(chǎn)者沒(méi)有創(chuàng)建隊(duì)列,會(huì)自動(dòng)幫生產(chǎn)者創(chuàng)建該主題的隊(duì)列暇番。其實(shí)這是不對(duì)的,但不這么做思喊,無(wú)法實(shí)現(xiàn)監(jiān)聽.
/**
* @author starmark
* @date 2020/5/2 下午3:05
*/
public class MessageQueueRabbitmqConsumerListener implements MessageListener {
private final IMessageQueueConsumerService messageQueueConsumerService;
public MessageQueueRabbitmqConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
this.messageQueueConsumerService = messageQueueConsumerService;
}
@Override
public void onMessage(Message message) {
messageQueueConsumerService.receiveMessage(new String(message.getBody()));
}
}
@Component
public class MessageQueueRabbitmqConsumerServiceFactory implements InitializingBean {
//自動(dòng)注入RabbitTemplate模板類
@Autowired
private RabbitTemplate rabbitTemplate;
private final ConfigurableApplicationContext applicationContext;
private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
private final ConnectionFactory connectionFactory;
@Autowired
public MessageQueueRabbitmqConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList, ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
messageQueueConsumerService.support("rabbitmq")).collect(Collectors.toList());
this.applicationContext = applicationContext;
this.connectionFactory = connectionFactory;
}
@Override
public void afterPropertiesSet() {
messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
this.registerBean(messageQueueConsumerService.topic(), messageQueueConsumerService.topic());
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setConsumerStartTimeout(6000L);
;
//設(shè)置監(jiān)聽的隊(duì)列名壁酬,
String[] types = {messageQueueConsumerService.topic()};
container.setQueueNames(types);
container.setMessageListener(new MessageQueueRabbitmqConsumerListener(messageQueueConsumerService));
container.start();
});
}
private void registerBean(String name, Object... args) {
if (applicationContext.containsBean(name)) {
return;
}
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(Queue.class);
if (args.length > 0) {
for (Object arg : args) {
beanDefinitionBuilder.addConstructorArgValue(arg);
}
}
BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();
BeanDefinitionRegistry beanFactory = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
beanFactory.registerBeanDefinition(name, beanDefinition);
}
}
至此,通用的消息隊(duì)列已完成,這個(gè)只能滿足一般情況的使用 .
如果要更高端的使用舆乔,直接使用其原生的api會(huì)更好.