通用的消息隊(duì)列(redis,kafka,rabbitmq)--消費(fèi)者篇

上篇我寫了一個(gè)通用的消息隊(duì)列(redis,kafka,rabbitmq)--生產(chǎn)者篇,這次寫一個(gè)消費(fèi)者篇.
1.消費(fèi)者的通用調(diào)用類:


/**
 * 消息隊(duì)列處理的handle
 * @author starmark
 * @date 2020/5/1  上午10:56
 */
public interface IMessageQueueConsumerService {


    /**
     * 處理消息隊(duì)列的消息
     * @param message 消息
     */
    void receiveMessage(String message);

    /**
     * 返回監(jiān)聽的topic
     * @return 主題
     */
    String topic();

    /**
     *
     * @param consumerType 消費(fèi)者類型
     * @return 是否支持該消費(fèi)者類者
     */
    boolean support(String consumerType);
}

只要實(shí)現(xiàn)該類的接口就可以實(shí)現(xiàn)監(jiān)聽段标,
redis的消費(fèi)端,有兩個(gè)類,如下:


/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueRedisConsumerListener implements MessageListener {

    private IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(Message message, byte[] pattern) {
        messageQueueConsumerService.receiveMessage(message.toString());
    }
}

/**
 * 消息隊(duì)列服務(wù)端的監(jiān)聽
 *
 * @author starmark
 * @date 2020/5/1  上午10:55
 */
@Service
public class MessageQueueRedisConsumerServiceFactory {


    private List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("redis")).collect(Collectors.toList());
    }

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
                    new MessageQueueRedisConsumerListener(messageQueueConsumerService));
            messageListenerAdapter.afterPropertiesSet();
            container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));

        });

        return container;
    }


}

kafka消費(fèi)者也有兩個(gè)類缭乘,如下:


/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueKafkaConsumerListener implements MessageListener<String,String> {

    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueKafkaConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        messageQueueConsumerService.receiveMessage(data.value());
    }
}

/**
 * 消息隊(duì)列服務(wù)端的監(jiān)聽
 *
 * @author starmark
 * @date 2020/5/1  上午10:55
 */
@Component
public class MessageQueueKafkaConsumerServiceFactory  implements InitializingBean {

    @Autowired
    KafkaProperties kafkaProperties;

    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueKafkaConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("kafka")).collect(Collectors.toList());
    }




    private KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<>(props);
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }


    @Override
    public void afterPropertiesSet() {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            ContainerProperties containerProps = new ContainerProperties(messageQueueConsumerService.topic());

            containerProps.setMessageListener(new MessageQueueKafkaConsumerListener(messageQueueConsumerService)
            );
            KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
            container.setBeanName(messageQueueConsumerService.topic() + "kafkaListener");

            container.start();

        });

    }
}

這些類都是實(shí)現(xiàn)動(dòng)態(tài)監(jiān)聽某個(gè)主題.

rabbitmq就有點(diǎn)復(fù)雜捏雌,因?yàn)樗蠼藂ueue才能實(shí)現(xiàn)監(jiān)聽,我現(xiàn)在這個(gè)代碼,如果生產(chǎn)者沒(méi)有創(chuàng)建隊(duì)列,會(huì)自動(dòng)幫生產(chǎn)者創(chuàng)建該主題的隊(duì)列暇番。其實(shí)這是不對(duì)的,但不這么做思喊,無(wú)法實(shí)現(xiàn)監(jiān)聽.


/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueRabbitmqConsumerListener implements MessageListener  {

    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRabbitmqConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(Message message) {

        messageQueueConsumerService.receiveMessage(new String(message.getBody()));
    }

}

@Component
public class MessageQueueRabbitmqConsumerServiceFactory implements InitializingBean {

    //自動(dòng)注入RabbitTemplate模板類
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final ConfigurableApplicationContext applicationContext;
    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
    private final ConnectionFactory connectionFactory;

    @Autowired
    public MessageQueueRabbitmqConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList, ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("rabbitmq")).collect(Collectors.toList());
        this.applicationContext = applicationContext;
        this.connectionFactory = connectionFactory;

    }


    @Override
    public void afterPropertiesSet() {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {

            this.registerBean(messageQueueConsumerService.topic(), messageQueueConsumerService.topic());
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setConsumerStartTimeout(6000L);
        ;
            //設(shè)置監(jiān)聽的隊(duì)列名壁酬,
            String[] types = {messageQueueConsumerService.topic()};
            container.setQueueNames(types);
            container.setMessageListener(new MessageQueueRabbitmqConsumerListener(messageQueueConsumerService));
            container.start();
        });

    }


    private void registerBean(String name, Object... args) {
        if (applicationContext.containsBean(name)) {
            return;
        }
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(Queue.class);
        if (args.length > 0) {
            for (Object arg : args) {
                beanDefinitionBuilder.addConstructorArgValue(arg);
            }
        }
        BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();

        BeanDefinitionRegistry beanFactory = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
        beanFactory.registerBeanDefinition(name, beanDefinition);

    }
}

至此,通用的消息隊(duì)列已完成,這個(gè)只能滿足一般情況的使用 .
如果要更高端的使用舆乔,直接使用其原生的api會(huì)更好.

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末岳服,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子希俩,更是在濱河造成了極大的恐慌吊宋,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,188評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件颜武,死亡現(xiàn)場(chǎng)離奇詭異璃搜,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)盒刚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門腺劣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)绿贞,“玉大人因块,你說(shuō)我怎么就攤上這事〖” “怎么了涡上?”我有些...
    開封第一講書人閱讀 165,562評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)拒名。 經(jīng)常有香客問(wèn)我吩愧,道長(zhǎng),這世上最難降的妖魔是什么增显? 我笑而不...
    開封第一講書人閱讀 58,893評(píng)論 1 295
  • 正文 為了忘掉前任雁佳,我火速辦了婚禮,結(jié)果婚禮上同云,老公的妹妹穿的比我還像新娘糖权。我一直安慰自己,他們只是感情好炸站,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評(píng)論 6 392
  • 文/花漫 我一把揭開白布星澳。 她就那樣靜靜地躺著,像睡著了一般旱易。 火紅的嫁衣襯著肌膚如雪禁偎。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,708評(píng)論 1 305
  • 那天阀坏,我揣著相機(jī)與錄音如暖,去河邊找鬼。 笑死忌堂,一個(gè)胖子當(dāng)著我的面吹牛装处,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,430評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼妄迁,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼寝蹈!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起登淘,我...
    開封第一講書人閱讀 39,342評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤箫老,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后黔州,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體耍鬓,經(jīng)...
    沈念sama閱讀 45,801評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評(píng)論 3 337
  • 正文 我和宋清朗相戀三年流妻,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了牲蜀。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,115評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡绅这,死狀恐怖涣达,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情证薇,我是刑警寧澤度苔,帶...
    沈念sama閱讀 35,804評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站浑度,受9級(jí)特大地震影響寇窑,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜箩张,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評(píng)論 3 331
  • 文/蒙蒙 一甩骏、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧先慷,春花似錦饮笛、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至赴肚,卻和暖如春素跺,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背誉券。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工指厌, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人踊跟。 一個(gè)月前我還...
    沈念sama閱讀 48,365評(píng)論 3 373
  • 正文 我出身青樓踩验,卻偏偏與公主長(zhǎng)得像鸥诽,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子箕憾,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評(píng)論 2 355