MessageConverter詳解
org.springframework.amqp.support.converter.MessageConverter
Message toMessage(Object object, MessageProperties messageProperties);
將java對(duì)象和屬性對(duì)象轉(zhuǎn)換成Message對(duì)象。
Object fromMessage(Message message) throws MessageConversionException;
將消息對(duì)象轉(zhuǎn)換成java對(duì)象缸托。
Demo
定義Config類(lèi)
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
//指定消息轉(zhuǎn)換器
adapter.setMessageConverter(new TestMessageConverter());
//設(shè)置處理器的消費(fèi)消息的默認(rèn)方法
adapter.setDefaultListenerMethod("onMessage");
container.setMessageListener(adapter);
return container;
}
}
MessageListenerAdapter中定義的消息轉(zhuǎn)換器链韭,消費(fèi)端接收的消息就從Message類(lèi)型轉(zhuǎn)換成了String類(lèi)型
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class TestMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
System.out.println("=======toMessage=========");
return new Message(object.toString().getBytes(),messageProperties);
}
//消息類(lèi)型轉(zhuǎn)換器中fromMessage方法返回的類(lèi)型就是消費(fèi)端處理器接收的類(lèi)型
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.out.println("=======fromMessage=========");
return new String(message.getBody());
}
}
消費(fèi)者處理消息的Handler
public class MessageHandler {
public void onMessage(String message){
System.out.println("---------onMessage-------------");
System.out.println(message);
}
}
啟動(dòng)類(lèi)
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
/**
* MessageConverter可以把java對(duì)象轉(zhuǎn)換成Message對(duì)象顿锰,也可以把Message對(duì)象轉(zhuǎn)換成java對(duì)象
*
* MessageListenerAdapter內(nèi)部通過(guò)MessageConverter把Message轉(zhuǎn)換成java對(duì)象寓免,然后找到相應(yīng)的處理方法叠荠,參數(shù)為轉(zhuǎn)換成的java對(duì)象
*/
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("===start up======");
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
啟動(dòng)應(yīng)用類(lèi)胸梆,發(fā)送消息到zhihao.miao.order
隊(duì)列,控制臺(tái)打佣嘏酢:
===start up======
=======fromMessage=========
---------onMessage-------------
String類(lèi)型的消息
從控制臺(tái)打印我們知道了在消費(fèi)者處理消息之前會(huì)進(jìn)行消息類(lèi)型轉(zhuǎn)換,調(diào)用TestMessageConverter
的fromMessage
方法碰镜,然后執(zhí)行消息處理器的onMessage
方法兢卵,方法參數(shù)就是String
類(lèi)型。
擴(kuò)展
自定義一個(gè)MyBody類(lèi)型绪颖,將消息從Message轉(zhuǎn)換成MyBody類(lèi)型
public class MyBody {
private byte[] bodys;
public MyBody(byte[] bodys){
this.bodys = bodys;
}
@Override
public String toString() {
return new String(bodys);
}
}
然后修改TestMessageConverter
的fromMessage
方法秽荤,返回了MyBody
類(lèi)型,那么消息處理器的消費(fèi)方法也是MyBody參數(shù)的消費(fèi)方法
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class TestMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
System.out.println("=======toMessage=========");
return new Message(object.toString().getBytes(),messageProperties);
}
//消息類(lèi)型轉(zhuǎn)換器中fromMessage方法返回的類(lèi)型就是消費(fèi)端處理器接收的類(lèi)型
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.out.println("=======fromMessage=========");
return new MyBody(message.getBody());
}
}
此時(shí)的消息處理器,處理器中的方法的入?yún)⒕褪荕yBody類(lèi)型了窃款,
public class MessageHandler {
public void onMessage(MyBody message){
System.out.println("---------onMessage---MyBody-------------");
System.out.println(message);
}
}
此時(shí)控制臺(tái)打涌涡帧:
===start up======
=======fromMessage=========
---------onMessage---MyBody-------------
Mybody類(lèi)型的消息
小結(jié)
我們還測(cè)試如下如果不使用自定義的Converter
,那么當(dāng)消息的屬性中含有屬性content_type的值為text晨继,那么默認(rèn)的轉(zhuǎn)換成的java類(lèi)型就是String類(lèi)型第喳,如果不指定那么默認(rèn)的轉(zhuǎn)換類(lèi)型就是byte[]
源碼分析
我們跟進(jìn)去MessageListenerAdapte
r的setMessageConverter
方法,
/**
* Set the converter that will convert incoming Rabbit messages to listener method arguments, and objects returned
* from listener methods back to Rabbit messages.
* <p>
* The default converter is a {@link SimpleMessageConverter}, which is able to handle "text" content-types.
* @param messageConverter The message converter.
*/
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
private MessageConverter messageConverter = new SimpleMessageConverter();
我們發(fā)現(xiàn)默認(rèn)的MessageConverter
是SimpleMessageConverter
踱稍,我們進(jìn)入SimpleMessageConverter
類(lèi)中看其默認(rèn)的轉(zhuǎn)換邏輯
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();
//contentType屬性值是以text開(kāi)頭曲饱,那么就將Message類(lèi)型轉(zhuǎn)換成String類(lèi)型
if (contentType != null && contentType.startsWith("text")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = this.defaultCharset;
}
try {
content = new String(message.getBody(), encoding);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert text-based Message content", e);
}
}
//如果content_type的值是application/x-java-serialized-object則把消息序列化為java對(duì)象
else if (contentType != null &&
contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
try {
content = SerializationUtils.deserialize(
createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
}
catch (IOException e) {
throw new MessageConversionException(
"failed to convert serialized Message content", e);
}
catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert serialized Message content", e);
}
catch (IllegalStateException e) {
throw new MessageConversionException(
"failed to convert serialized Message content", e);
}
}
}
if (content == null) {
//都沒(méi)有符合,則轉(zhuǎn)換成字節(jié)數(shù)組
content = message.getBody();
}
return content;
}
源碼分析總結(jié):
1.MessageConverter
可以把java
對(duì)象轉(zhuǎn)換成Message
對(duì)象珠月,也可以把Message
對(duì)象轉(zhuǎn)換成java
對(duì)象
2.MessageListenerAdapter
內(nèi)部通過(guò)MessageConverter
把Message
轉(zhuǎn)換成java對(duì)象扩淀,然后找到相應(yīng)的處理方法,參數(shù)為轉(zhuǎn)換成的java對(duì)象啤挎。
3.SimpleMessageConverter
處理邏輯:
如果content_type
是以text開(kāi)頭驻谆,則把消息轉(zhuǎn)換成String
類(lèi)型
如果content_type的
值是application/x-java-serialized-object
則把消息序列化為java對(duì)象,否則庆聘,把消息轉(zhuǎn)換成字節(jié)數(shù)組胜臊。