一母谎、背景
最近雙12銀聯(lián)進(jìn)行滿減活動(dòng)慨蓝,由于外部接入商戶響應(yīng)速度較慢,導(dǎo)致了隊(duì)列數(shù)據(jù)擠壓竹伸,影響了原本沒有參與滿減活動(dòng)的商戶史辙,為了解決此問題決定按照商戶將隊(duì)列進(jìn)行拆分,降低彼此的影響佩伤。
在Spring Boot框架下大家基本上會(huì)想到如下這樣修改方式聊倔,那隨著被監(jiān)聽的隊(duì)列越來越多,可以想象代碼的可讀性會(huì)比較差生巡,所以基本這個(gè)目的實(shí)現(xiàn)了@JmsGroupListener注解來解決該問題(如果監(jiān)監(jiān)聽的隊(duì)列數(shù)量較少還是建議使用原生注解)耙蔑。
@JmsListener(destination = "test_0001")
@JmsListener(destination = "test_0002")
public void receiveMessage(String msg) {
System.out.println("Received <" + msg + ">");
}
@JmsListeners(value = {@JmsListener(destination = "test_0001"), @JmsListener(destination = "test_0002")})
public void receiveMessage1(String msg) {
System.out.println("Received <" + msg + ">");
}
二、效果
在配置文件中配置需要監(jiān)聽的隊(duì)列集合
activemq.message.group=0001|0002
在業(yè)務(wù)代碼中使用@JmsGroupListener注解
@JmsGroupListener(group = "${activemq.message.group}", groupSplit = "|", destinationPrefix = "test_")
public void receiveMessage2(String msg) {
System.out.println("Received <" + msg + ">");
}
三孤荣、定義注解
定義一個(gè)注解甸陌,如下可以看出該注解與@JmsListener注解的區(qū)別,刪除的注解屬性的原因后面會(huì)進(jìn)行介紹盐股,按照第二部分的使用钱豁,最后監(jiān)聽的隊(duì)列名為test_0001和test_0002。
public @interface JmsGroupListener {
//定義要監(jiān)聽到隊(duì)列區(qū)分關(guān)鍵詞集合
String group();
//關(guān)鍵詞集合分隔符
String groupSplit();
//隊(duì)列名稱前綴
String destinationPrefix();
//String id() default "";
String containerFactory() default "";
//String destination();
String subscription() default "";
String selector() default "";
String concurrency() default "";
}
四疯汁、實(shí)現(xiàn)注解
①實(shí)現(xiàn)思路
Processing of @JmsListener annotations is performed by registering a JmsListenerAnnotationBeanPostProcessor. This can be done manually or, more conveniently, through the <jms:annotation-driven/> element or @EnableJms annotation.
通過查看@JmsListener注解的注釋可以了解到注解的實(shí)現(xiàn)主要在JmsListenerAnnotationBeanPostProcessor中牲尺,該類繼承了MergedBeanDefinitionPostProcessor,所以我們繼承該類基于@JmsListener的實(shí)現(xiàn)方式實(shí)現(xiàn)@JmsGroupListener注解就可以了。
如果不知道為什么繼承JmsListenerAnnotationBeanPostProcessor就可以實(shí)現(xiàn)的話可以看一下我同事寫的主題為BeanFactoryPostProcessor,BeanPostProcessor,SmartInitializingSingleton等幾個(gè)可拓展接口的執(zhí)行時(shí)機(jī)的一篇博客谤碳,應(yīng)該會(huì)很大的幫助溃卡。
②重寫postProcessAfterInitialization方法
該方法大家注意兩個(gè)Process代碼塊即可,第一個(gè)Process代碼塊主要構(gòu)造一個(gè)監(jiān)聽方法與@MyJmsListener注解信息的Map蜒简。第二個(gè)Process代碼塊是處理每一個(gè)@MyJmsListener注解瘸羡,也是實(shí)現(xiàn)了監(jiān)聽注冊的關(guān)鍵代碼。
@Override
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory ||
bean instanceof JmsListenerEndpointRegistry) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, JmsGroupListener.class)) {
//Process @MyJmsListener annotation ,Getting the relationship between method and annotation
Map<Method, JmsGroupListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<JmsGroupListener>) method -> {
JmsGroupListener listenerMethod = AnnotatedElementUtils.findMergedAnnotation(method, JmsGroupListener.class);
return listenerMethod;
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @JmsGroupListener annotations found on bean type: " + targetClass);
}
} else {
//Process each @MyJmsListener annotation
annotatedMethods.forEach((method, listener) -> processJmsListener(listener, method, bean));
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @JmsGroupListener methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
}
③重寫processJmsListener方法
在本部分大家只要關(guān)注一個(gè)Process代碼塊即可搓茬,該部分實(shí)現(xiàn)了將group屬性進(jìn)行拆分犹赖,然后改造需要監(jiān)聽的MethodJmsListenerEndpoint并注冊到JmsListenerEndpointRegistrar中。
在定義注解的部分我們注意到我們注釋了@JmsListener注解的id屬性卷仑,這是因?yàn)锧
JmsGroupListener監(jiān)聽的是一個(gè)隊(duì)列的集合峻村,為了處理方便,我們自動(dòng)為其生成id系枪。
public void processJmsListener(JmsGroupListener jmsGroupListener, Method mostSpecificMethod, Object bean) {
Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
JmsListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(jmsGroupListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
} catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
" with id '" + containerFactoryBeanName + "' was found in the application context", ex);
}
}
//Process all destination names
String[] DestinationPostfixes = resolve(jmsGroupListener.group()).split("[" + jmsGroupListener.groupSplit() + "]");
for (String postfix : DestinationPostfixes) {
String destination = jmsGroupListener.destinationPrefix() + postfix;
MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
endpoint.setBean(bean);
endpoint.setMethod(invocableMethod);
endpoint.setMostSpecificMethod(mostSpecificMethod);
endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
endpoint.setBeanFactory(this.beanFactory);
//Avoid conflict
endpoint.setId(getEndpointId());
endpoint.setDestination(resolve(destination));
if (StringUtils.hasText(jmsGroupListener.selector())) {
endpoint.setSelector(resolve(jmsGroupListener.selector()));
}
if (StringUtils.hasText(jmsGroupListener.subscription())) {
endpoint.setSubscription(resolve(jmsGroupListener.subscription()));
}
if (StringUtils.hasText(jmsGroupListener.concurrency())) {
endpoint.setConcurrency(resolve(jmsGroupListener.concurrency()));
}
this.registrar.registerEndpoint(endpoint, factory);
}
}
④重寫afterSingletonsInstantiated方法
通過查看JmsListenerAnnotationBeanPostProcessor的源碼我們發(fā)現(xiàn)雀哨,在該類中afterSingletonsInstantiated方法的最關(guān)鍵的一句registrar.afterPropertiesSet()即可完成所有監(jiān)聽的注冊磕谅。
我們原本的思路是依靠JmsListenerAnnotationBeanPostProcessor類的afterSingletonsInstantiated私爷,但是后面通過調(diào)試發(fā)現(xiàn)我們自己構(gòu)造的JmsListenerEndpointRegistrar對象中的JmsListenerEndpointRegistry對象需要傳遞給JmsListenerEndpointRegistrar類的registerAllEndpoints方法,所以迫于無奈我們只能重寫afterSingletonsInstantiated方法膊夹。
所以在本部分的重點(diǎn)進(jìn)行了setContainerFactoryBeanName和setEndpointRegistry(全局對象)衬浑,本來進(jìn)行該類重寫時(shí)候本來想閹割對于JmsListenerConfigurer和MessageHandlerMethodFactory擴(kuò)展,但是最后還是為了有一定的通用性保留了該部分放刨。
@Override
public void afterSingletonsInstantiated() {
// Remove resolved singleton classes from cache
this.nonAnnotatedClasses.clear();
if (this.beanFactory instanceof ListableBeanFactory) {
// Apply JmsListenerConfigurer beans from the BeanFactory, if any
Map<String, JmsListenerConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class);
List<JmsListenerConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (JmsListenerConfigurer configurer : configurers) {
configurer.configureJmsListeners(this.registrar);
}
}
// Must be set to obtain container factory by bean name
if (this.containerFactoryBeanName != null) {
registrar.setContainerFactoryBeanName(containerFactoryBeanName);
}
// Register endpointRegistry with spring context
if (this.registrar.getEndpointRegistry() == null) {
registrar.setEndpointRegistry(endpointRegistry);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
}
// Actually register all listeners
this.registrar.afterPropertiesSet();
}
五工秩、總結(jié)
到此該注解的關(guān)鍵實(shí)現(xiàn)過程已經(jīng)介紹完成,其中還有一部代碼這里就不進(jìn)行詳細(xì)的介紹了进统,有需要的同學(xué)自己可以看一下實(shí)現(xiàn)源碼(由于水平有限助币,歡迎大家來找茬),最后與大家分享一下對源碼進(jìn)行擴(kuò)展的新的體會(huì)螟碎,調(diào)試源碼->了解大體流程->缺什么補(bǔ)什么眉菱。