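Pitfall notes
I have recently been building a recommendation-recall microservice on Spring Cloud, and ran into countless pitfalls while integrating RabbitMQ. I wrote them up in this post for everyone to look over and laugh at.
Spring Boot and RabbitMQ integration version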
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
RabbitMQ
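The business side at my company sends its data through RabbitMQ message queues, so as the consumer I naturally had to use RabbitMQ too. I took a quick look at the official Spring Boot documentation, which says that Spring Boot provides spring-boot-starter-amqp for RabbitMQ and offers auto-configuration options for RabbitTemplate and RabbitMQ.
Official documentation: https://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-rabbitmq (it also includes examples).
Time to get to work.
The pitfall
The consumer code looked like this: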
/**
 * <p> Guess you like </p>
 *
 * @Author: fc.w
 * @Date: 2020/4/9 10:23
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.queue}", autoDelete = "false"),
                exchange = @Exchange(
                        value = "${mq.exchageName}",
                        delayed = "true",
                        type = ExchangeTypes.TOPIC,
                        arguments = @Argument(name = "x-delayed-type", value = "topic")),
                key = "${mq.routingKey}"
        )
)
public class ProductInventoryConsumer {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Info..........receiver: " + msg);
    }
}
MQ configuration
mq.exchageName = cl.topic.delayed
mq.routingKey = cl.product.updateCoreInfoModify
mq.queue = mlp-product-guessInventoryQueue-delayed
After a message is consumed, one line should be printed.
Instead, all I got was the exception org.springframework.amqp.AmqpException: No method found for class [B, and it was thrown over and over in an endless loop:
2020-04-09 13:49:21.717 [SimpleAsyncTaskExecutor-1] [WARN ] o.s.a.r.l.ConditionalRejectingErrorHandler.log:88 [] - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:840)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:824)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:79)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1051)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpException: No method found for class [B
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:250)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:70)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
... 8 common frames omitted
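The "[B" at the end of the message is easy to misread. It is simply how the JVM names the byte[] class, so the error is saying that no handler method accepts a byte[] payload. A minimal sketch in plain Java (the class and method names here are made up for illustration and are not part of Spring AMQP):

```java
// Illustration only: shows that "class [B" is how Java prints the byte[] class.
final class ArrayClassNameDemo {

    // Builds a message in the same shape as the one in the stack trace above.
    static String describe(Object payload) {
        return "No method found for " + payload.getClass();
    }

    public static void main(String[] args) {
        // prints: No method found for class [B
        System.out.println(describe(new byte[0]));
        // prints: No method found for class java.lang.String
        System.out.println(describe("text"));
    }
}
```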
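The official documentation explains the reason for the endless loop: by default, a message whose listener throws an exception is rejected and requeued, so the broker delivers it again and the listener fails again.
Now that the endless retry is explained, let's look at why the exception is thrown in the first place.
The answer from one of the maintainers on the issue tracker: https://jira.spring.io/browse/AMQP-573
After reading the issue and the answer, everything became clear.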
There are two conversions in the @RabbitListener pipeline.
The first converts from a Spring AMQP Message to a spring-messaging Message.
There is currently no way to change the first converter from SimpleMessageConverter which handles String, Serializable and passes everything else as byte[].
The second converter converts the message payload to the method parameter type (if necessary).
With method-level @RabbitListeners there is a tight binding between the handler and the method.
With class-level @RabbitListener s, the message payload from the first conversion is used to select which method to invoke. Only then, is the argument conversion attempted.
This mechanism works fine with Java Serializable objects since the payload has already been converted before the method is selected.
However, with JSON, the first conversion returns a byte[] and hence we find no matching @RabbitHandler.
We need a mechanism such that the first converter is settable so that the payload is converted early enough in the pipeline to select the appropriate handler method.
A ContentTypeDelegatingMessageConverter is probably most appropriate.
And, as stated in AMQP-574, we need to clearly document the conversion needs for a @RabbitListener, especially when using JSON or a custom conversion.
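In short, this kind of type inference only works with a method-level @RabbitListener. With a class-level @RabbitListener, the handler method is chosen from the payload produced by the first conversion, which here is a byte[], so process(String) is never matched.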
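The behaviour described in the quoted answer can be modelled in a few lines of plain Java. This is a simplified, hypothetical sketch, not Spring AMQP code: firstConversion, dispatch and stringOnlyHandlers are invented names, and the Serializable branch of the real converter is left out. It assumes the first conversion yields a String only for textual content types and otherwise passes the body through as byte[].

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of class-level handler selection; not the library's implementation.
final class HandlerSelectionSketch {

    // First conversion: textual content types become a String, anything else stays byte[].
    static Object firstConversion(byte[] body, String contentType) {
        if (contentType != null && contentType.startsWith("text")) {
            return new String(body, StandardCharsets.UTF_8);
        }
        return body;
    }

    // Class-level dispatch: the handler is looked up by the payload's runtime type.
    static String dispatch(Object payload, Map<Class<?>, Function<Object, String>> handlers) {
        Function<Object, String> handler = handlers.get(payload.getClass());
        if (handler == null) {
            throw new IllegalStateException("No method found for " + payload.getClass());
        }
        return handler.apply(payload);
    }

    // A listener class that, like the consumer above, only declares a String handler.
    static Map<Class<?>, Function<Object, String>> stringOnlyHandlers() {
        Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();
        handlers.put(String.class, payload -> "receiver: " + payload);
        return handlers;
    }

    public static void main(String[] args) {
        byte[] body = "{\"type\": \"UpsertProduct\"}".getBytes(StandardCharsets.UTF_8);

        // With a text content type the payload is a String and the handler is found.
        System.out.println(dispatch(firstConversion(body, "text/plain"), stringOnlyHandlers()));

        // Without a content type the payload stays byte[] and no handler matches.
        try {
            dispatch(firstConversion(body, null), stringOnlyHandlers());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the second call fails before any argument conversion could happen, which mirrors the order of steps the quoted answer describes for class-level listeners.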
So I changed the code, moving @RabbitListener from the class onto the method:
@Component
@Slf4j
public class ProductInventoryConsumer {
    @RabbitHandler
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.queue}", autoDelete = "false"),
                    exchange = @Exchange(
                            value = "${mq.exchageName}",
                            delayed = "true",
                            type = ExchangeTypes.TOPIC,
                            arguments = @Argument(name = "x-delayed-type", value = "topic")),
                    key = "${mq.routingKey}"
            )
    )
    public void process(String msg) {
        System.out.println("Info..........receiver: " + msg);
    }
}
The project started normally and the exception was gone.
Sending a test message
What was printed:
Info..........receiver: 123,13,10,34,112,105,100,34,58,32,34,84,82,45,71,89,45,69,88,67,69,76,76,69,78,67,69,124,49,34,44,13,10,34,112,105,100,115,34,58,91,34,65,80,45,66,88,74,71,45,66,80,90,83,124,55,34,93,44,13,10,34,116,121,112,101,34,58,32,34,85,112,115,101,114,116,80,114,111,100,117,99,116,34,44,13,10,34,100,97,116,97,34,58,32,123,13,10,32,32,32,32,34,111,110,115,97,108,101,34,58,32,116,114,117,101,44,13,10,32,32,32,32,34,105,115,115,104,111,119,34,58,32,116,114,117,101,13,10,32,32,32,32,32,125,13,10,125
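The output is a list of byte values rather than text. Messages sent with rabbitTemplate.convertAndSend carry message headers by default (for a String payload the default converter sets the content type to text/plain),
whereas messages sent with the plain amqp-client carry no such headers by default, so the body reaches the listener as a raw byte[] and is printed as comma-separated numbers.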
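The numbers printed above are just the bytes of the JSON body. As a quick check, the following sketch turns such a comma-separated list back into text, assuming the body was encoded as UTF-8 (ByteListDecoder is a name made up for this example):

```java
import java.nio.charset.StandardCharsets;

// Illustration only: decodes "123,13,10,..." style output back into a String.
final class ByteListDecoder {

    // Parses each comma-separated value as a signed byte and decodes the result as UTF-8.
    static String decode(String commaSeparatedBytes) {
        String[] parts = commaSeparatedBytes.split(",");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            bytes[i] = Byte.parseByte(parts[i].trim());
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Bytes 4 to 8 of the printed output; prints: "pid"
        System.out.println(decode("34,112,105,100,34"));
    }
}
```

Feeding the whole printed list to decode should give back the JSON document that the producer sent.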
Message consumed normally:
Info..........receiver: {
"pid": "TR-GY-EXCELLENCE|1",
"pids":["AP-BXJG-BPZS|7"],
"type": "UpsertProduct",
"data": {
"onsale": true,
"isshow": true
}
}