Spring Boot中@JmsListener如何實(shí)現(xiàn)ActiveMQ多隊(duì)列監(jiān)聽--自定義@JmsGroupListener注解

一母谎、背景

最近雙12銀聯(lián)進(jìn)行滿減活動(dòng)慨蓝,由于外部接入商戶響應(yīng)速度較慢,導(dǎo)致了隊(duì)列數(shù)據(jù)擠壓竹伸,影響了原本沒有參與滿減活動(dòng)的商戶史辙,為了解決此問題決定按照商戶將隊(duì)列進(jìn)行拆分,降低彼此的影響佩伤。

在Spring Boot框架下大家基本上會(huì)想到如下這樣修改方式聊倔,那隨著被監(jiān)聽的隊(duì)列越來越多,可以想象代碼的可讀性會(huì)比較差生巡,所以基本這個(gè)目的實(shí)現(xiàn)了@JmsGroupListener注解來解決該問題(如果監(jiān)監(jiān)聽的隊(duì)列數(shù)量較少還是建議使用原生注解)耙蔑。

    @JmsListener(destination = "test_0001")
    @JmsListener(destination = "test_0002")
    public void receiveMessage(String msg) {
        System.out.println("Received <" + msg + ">");
    }

    @JmsListeners(value = {@JmsListener(destination = "test_0001"), @JmsListener(destination = "test_0002")})
    public void receiveMessage1(String msg) {
        System.out.println("Received <" + msg + ">");
    }

二、效果

在配置文件中配置需要監(jiān)聽的隊(duì)列集合

activemq.message.group=0001|0002

在業(yè)務(wù)代碼中使用@JmsGroupListener注解

@JmsGroupListener(group = "${activemq.message.group}", groupSplit = "|", destinationPrefix = "test_")
    public void receiveMessage2(String msg) {
        System.out.println("Received <" + msg + ">");
    }

三孤荣、定義注解

定義一個(gè)注解甸陌,如下可以看出該注解與@JmsListener注解的區(qū)別,刪除的注解屬性的原因后面會(huì)進(jìn)行介紹盐股,按照第二部分的使用钱豁,最后監(jiān)聽的隊(duì)列名為test_0001和test_0002。

public @interface JmsGroupListener {
    //定義要監(jiān)聽到隊(duì)列區(qū)分關(guān)鍵詞集合
    String group();
    //關(guān)鍵詞集合分隔符
    String groupSplit();
    //隊(duì)列名稱前綴
    String destinationPrefix();
    //String id() default "";
    String containerFactory() default "";
    //String destination();
    String subscription() default "";
    String selector() default "";
    String concurrency() default "";
}

四疯汁、實(shí)現(xiàn)注解

①實(shí)現(xiàn)思路
Processing of @JmsListener annotations is performed by registering a JmsListenerAnnotationBeanPostProcessor. This can be done manually or, more conveniently, through the <jms:annotation-driven/> element or @EnableJms annotation.

通過查看@JmsListener注解的注釋可以了解到注解的實(shí)現(xiàn)主要在JmsListenerAnnotationBeanPostProcessor中牲尺,該類繼承了MergedBeanDefinitionPostProcessor,所以我們繼承該類基于@JmsListener的實(shí)現(xiàn)方式實(shí)現(xiàn)@JmsGroupListener注解就可以了。

如果不知道為什么繼承JmsListenerAnnotationBeanPostProcessor就可以實(shí)現(xiàn)的話可以看一下我同事寫的主題為BeanFactoryPostProcessor,BeanPostProcessor,SmartInitializingSingleton等幾個(gè)可拓展接口的執(zhí)行時(shí)機(jī)的一篇博客谤碳,應(yīng)該會(huì)很大的幫助溃卡。

②重寫postProcessAfterInitialization方法

該方法大家注意兩個(gè)Process代碼塊即可,第一個(gè)Process代碼塊主要構(gòu)造一個(gè)監(jiān)聽方法與@MyJmsListener注解信息的Map蜒简。第二個(gè)Process代碼塊是處理每一個(gè)@MyJmsListener注解瘸羡,也是實(shí)現(xiàn)了監(jiān)聽注冊的關(guān)鍵代碼。

 @Override
       @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory ||
                bean instanceof JmsListenerEndpointRegistry) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass) &&
                AnnotationUtils.isCandidateClass(targetClass, JmsGroupListener.class)) {

            //Process @MyJmsListener annotation ,Getting the relationship between method and annotation
            Map<Method, JmsGroupListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<JmsGroupListener>) method -> {
                        JmsGroupListener listenerMethod = AnnotatedElementUtils.findMergedAnnotation(method, JmsGroupListener.class);
                        return listenerMethod;
                    });

            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @JmsGroupListener annotations found on bean type: " + targetClass);
                }
            } else {

                //Process each @MyJmsListener annotation
                annotatedMethods.forEach((method, listener) -> processJmsListener(listener, method, bean));

                if (logger.isDebugEnabled()) {
                    logger.debug(annotatedMethods.size() + " @JmsGroupListener methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
    }
③重寫processJmsListener方法

在本部分大家只要關(guān)注一個(gè)Process代碼塊即可搓茬,該部分實(shí)現(xiàn)了將group屬性進(jìn)行拆分犹赖,然后改造需要監(jiān)聽的MethodJmsListenerEndpoint并注冊到JmsListenerEndpointRegistrar中。

在定義注解的部分我們注意到我們注釋了@JmsListener注解的id屬性卷仑,這是因?yàn)锧
JmsGroupListener監(jiān)聽的是一個(gè)隊(duì)列的集合峻村,為了處理方便,我們自動(dòng)為其生成id系枪。

 public void processJmsListener(JmsGroupListener jmsGroupListener, Method mostSpecificMethod, Object bean) {
        Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());

        JmsListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(jmsGroupListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
            } catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
                        mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
                        " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        //Process all destination names
        String[] DestinationPostfixes = resolve(jmsGroupListener.group()).split("[" + jmsGroupListener.groupSplit() + "]");
        for (String postfix : DestinationPostfixes) {
            String destination = jmsGroupListener.destinationPrefix() + postfix;
            MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
            endpoint.setBean(bean);
            endpoint.setMethod(invocableMethod);
            endpoint.setMostSpecificMethod(mostSpecificMethod);
            endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
            endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
            endpoint.setBeanFactory(this.beanFactory);
            //Avoid conflict
            endpoint.setId(getEndpointId());
            endpoint.setDestination(resolve(destination));
            if (StringUtils.hasText(jmsGroupListener.selector())) {
                endpoint.setSelector(resolve(jmsGroupListener.selector()));
            }
            if (StringUtils.hasText(jmsGroupListener.subscription())) {
                endpoint.setSubscription(resolve(jmsGroupListener.subscription()));
            }
            if (StringUtils.hasText(jmsGroupListener.concurrency())) {
                endpoint.setConcurrency(resolve(jmsGroupListener.concurrency()));
            }
            this.registrar.registerEndpoint(endpoint, factory);
        }

    }
④重寫afterSingletonsInstantiated方法

通過查看JmsListenerAnnotationBeanPostProcessor的源碼我們發(fā)現(xiàn)雀哨,在該類中afterSingletonsInstantiated方法的最關(guān)鍵的一句registrar.afterPropertiesSet()即可完成所有監(jiān)聽的注冊磕谅。

我們原本的思路是依靠JmsListenerAnnotationBeanPostProcessor類的afterSingletonsInstantiated私爷,但是后面通過調(diào)試發(fā)現(xiàn)我們自己構(gòu)造的JmsListenerEndpointRegistrar對象中的JmsListenerEndpointRegistry對象需要傳遞給JmsListenerEndpointRegistrar類的registerAllEndpoints方法,所以迫于無奈我們只能重寫afterSingletonsInstantiated方法膊夹。

所以在本部分的重點(diǎn)進(jìn)行了setContainerFactoryBeanName和setEndpointRegistry(全局對象)衬浑,本來進(jìn)行該類重寫時(shí)候本來想閹割對于JmsListenerConfigurer和MessageHandlerMethodFactory擴(kuò)展,但是最后還是為了有一定的通用性保留了該部分放刨。

@Override
    public void afterSingletonsInstantiated() {
        // Remove resolved singleton classes from cache
        this.nonAnnotatedClasses.clear();

        if (this.beanFactory instanceof ListableBeanFactory) {
            // Apply JmsListenerConfigurer beans from the BeanFactory, if any
            Map<String, JmsListenerConfigurer> beans =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class);
            List<JmsListenerConfigurer> configurers = new ArrayList<>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (JmsListenerConfigurer configurer : configurers) {
                configurer.configureJmsListeners(this.registrar);
            }
        }
        // Must be set to obtain container factory by bean name
        if (this.containerFactoryBeanName != null) {
            registrar.setContainerFactoryBeanName(containerFactoryBeanName);
        }
        // Register endpointRegistry with spring context
        if (this.registrar.getEndpointRegistry() == null) {
            registrar.setEndpointRegistry(endpointRegistry);
        }

        // Set the custom handler method factory once resolved by the configurer
        MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
        if (handlerMethodFactory != null) {
            this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
        }

        // Actually register all listeners
        this.registrar.afterPropertiesSet();
    }

五工秩、總結(jié)

到此該注解的關(guān)鍵實(shí)現(xiàn)過程已經(jīng)介紹完成,其中還有一部代碼這里就不進(jìn)行詳細(xì)的介紹了进统,有需要的同學(xué)自己可以看一下實(shí)現(xiàn)源碼(由于水平有限助币,歡迎大家來找茬),最后與大家分享一下對源碼進(jìn)行擴(kuò)展的新的體會(huì)螟碎,調(diào)試源碼->了解大體流程->缺什么補(bǔ)什么眉菱。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市掉分,隨后出現(xiàn)的幾起案子俭缓,更是在濱河造成了極大的恐慌,老刑警劉巖酥郭,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件华坦,死亡現(xiàn)場離奇詭異,居然都是意外死亡不从,警方通過查閱死者的電腦和手機(jī)惜姐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來椿息,“玉大人载弄,你說我怎么就攤上這事耘拇。” “怎么了宇攻?”我有些...
    開封第一講書人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵惫叛,是天一觀的道長。 經(jīng)常有香客問我逞刷,道長嘉涌,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任夸浅,我火速辦了婚禮仑最,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘帆喇。我一直安慰自己警医,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開白布坯钦。 她就那樣靜靜地躺著预皇,像睡著了一般。 火紅的嫁衣襯著肌膚如雪婉刀。 梳的紋絲不亂的頭發(fā)上吟温,一...
    開封第一講書人閱讀 49,166評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音突颊,去河邊找鬼鲁豪。 笑死,一個(gè)胖子當(dāng)著我的面吹牛律秃,可吹牛的內(nèi)容都是我干的爬橡。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼棒动,長吁一口氣:“原來是場噩夢啊……” “哼糙申!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起迁客,我...
    開封第一講書人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬榮一對情侶失蹤郭宝,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后掷漱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體粘室,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年卜范,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了衔统。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖锦爵,靈堂內(nèi)的尸體忽然破棺而出舱殿,到底是詐尸還是另有隱情,我是刑警寧澤险掀,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布沪袭,位于F島的核電站,受9級(jí)特大地震影響樟氢,放射性物質(zhì)發(fā)生泄漏冈绊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一埠啃、第九天 我趴在偏房一處隱蔽的房頂上張望死宣。 院中可真熱鬧,春花似錦碴开、人聲如沸毅该。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽眶掌。三九已至,卻和暖如春罢绽,著一層夾襖步出監(jiān)牢的瞬間畏线,已是汗流浹背静盅。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來泰國打工良价, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蒿叠。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓明垢,卻偏偏與公主長得像,于是被迫代替她去往敵國和親市咽。 傳聞我的和親對象是個(gè)殘疾皇子痊银,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344