消息監(jiān)聽適配器
-
MessageListenerAdapter
蓝撇,即消息監(jiān)聽適配器 - 前面我們介紹了通過實現(xiàn)
ChannelAwareMessageListener
接口并通過onMessage
方法來接收消息。 - 除了這種方式我們也可以使用適配器對不同類型的消息進行適配處理陈莽。
1.定義消息處理類
委托類可以自己隨意定義渤昌,但是這里的方法名和參數(shù)是固定的:handleMessage
public class MessageDelegate {
public void handleMessage(byte[] messageBody) {
System.err.println("默認(rèn)方法, 消息內(nèi)容:" + new String(messageBody));
}
}
可以通過查看MessageListenerAdapter類的源碼得知
2.設(shè)置消息監(jiān)聽器
這里將消息監(jiān)聽器設(shè)置為一個適配器對象,適配器需要一個委托對象走搁。
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//省略其他配置....
//設(shè)置消息監(jiān)聽器
/*container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("----------消費者: " + msg);
}
});*/
//適配器独柑,傳遞的參數(shù)為委托者對象
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
container.setMessageListener(adapter);
return container;
}
3.運行說明
運行之前編寫的測試方法進行消息發(fā)送
@Test
public void testSendMessage() throws Exception {
//1 創(chuàng)建消息
MessageProperties messageProperties = new MessageProperties();
//設(shè)置消息屬性
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定義消息類型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
//發(fā)送消息
//參數(shù):exchange, routingKey, message, messagePostProcessor
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("------添加額外的設(shè)置---------");
message.getMessageProperties().getHeaders().put("desc", "額外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "額外新加的屬性");
return message;
}
});
}
控制臺打印了如下內(nèi)容,說明通過適配器同樣可以接收到消息
------添加額外的設(shè)置---------
默認(rèn)方法, 消息內(nèi)容:Hello RabbitMQ
消息監(jiān)聽適配器相關(guān)設(shè)置
1.修改默認(rèn)監(jiān)聽方法
設(shè)置方法:adapter.setDefaultListenerMethod("consumeMessage");
在MessageDelegate
類中添加方法
public void consumeMessage(byte[] messageBody) {
System.err.println("字節(jié)數(shù)組方法, 消息內(nèi)容:" + new String(messageBody));
}
再次運行測試方法私植,打印了如下內(nèi)容忌栅,說明方法名修改是生效的。
------添加額外的設(shè)置---------
字節(jié)數(shù)組方法, 消息內(nèi)容:Hello RabbitMQ
2.修改方法參數(shù)類型
修改為String類型:adapter.setMessageConverter(new TextMessageConverter());
自定義轉(zhuǎn)換器曲稼,實現(xiàn)MessageConverter
接口
public class TextMessageConverter implements MessageConverter {
//將一個java對象轉(zhuǎn)換為Message
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
System.err.println("toMessage");
return new Message(object.toString().getBytes(), messageProperties);
}
//將Message轉(zhuǎn)換為Java對象
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("fromMessage");
//含有"text"的contentType索绪,轉(zhuǎn)換為字符串對象
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
修改委托類的監(jiān)聽方法參數(shù)
public void consumeMessage(String messageBody) {
System.err.println("字符串方法, 消息內(nèi)容:" + messageBody);
}
添加測試方法,指定消息的contentType屬性
@Test
public void testSendMessage4Text() throws Exception {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("Test SpringAMQP Message".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.send("topic002", "rabbit.abc", message);
}
運行測試方法贫悄,打印了如下內(nèi)容瑞驱,說明方法參數(shù)類型修改是OK的。
fromMessage
字符串方法, 消息內(nèi)容:Test SpringAMQP Message
fromMessage
字符串方法, 消息內(nèi)容:Test SpringAMQP Message
3.隊列名與方法名匹配
修改MessageListenerAdapter
的配置窄坦,采用隊列與方法名匹配方式唤反,此時只有不匹配的隊列消息才會走默認(rèn)的監(jiān)聽方法。
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//省略其他配置....
/**
* 2.適配器方式: 我們的隊列名稱 和 方法名稱 也可以進行一一的匹配
*/
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
//設(shè)置隊列與方法名對應(yīng)
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
return container;
}
MessageDelegate
類添加消息監(jiān)聽方法
public void method1(String messageBody) {
System.err.println("method1 收到消息內(nèi)容:" + new String(messageBody));
}
public void method2(String messageBody) {
System.err.println("method2 收到消息內(nèi)容:" + new String(messageBody));
}
運行測試方法:testSendMessage4Text鸭津,控制臺打印了如下內(nèi)容
fromMessage
method1 收到消息內(nèi)容:Test SpringAMQP Message
fromMessage
method2 收到消息內(nèi)容:Test SpringAMQP Message
MessageListenerAdapter總結(jié)
- MessageListenerAdapter:即消息監(jiān)聽適配器
- 通過messageListenerAdapter的代碼我們可以看出如下核心屬性:
defaultListenerMethod:
默認(rèn)監(jiān)聽方法名稱:用于設(shè)置監(jiān)聽方法名稱
Delegate:
實際真實的委托對象拴袭,用于處理消息
queueOrTagToMethodName:
隊列和方法名稱綁定,即指定隊列里的消息會被綁定的方法所接受處理
消息轉(zhuǎn)換器 - MessageConverter
- 我們在進行發(fā)送消息的時候曙博,正常情況下消息體為二進制的數(shù)據(jù)方式進行傳輸,如果希望內(nèi)部幫我們進行轉(zhuǎn)換怜瞒,或者指定自定義的轉(zhuǎn)換器父泳,就需要用到
MessageConverter
- 自定義轉(zhuǎn)換器通常是實現(xiàn)
MessageConverter
接口,重寫下面兩個方法:
toMessage:
java對象轉(zhuǎn)換為Message
fromMessage:
Message對象轉(zhuǎn)換為java對象 - 3.常用轉(zhuǎn)換器
Jackson2JsonMessageConverter:
Json轉(zhuǎn)換器吴汪,可以進行java對象的轉(zhuǎn)換功能
DefaultJackson2JavaTypeMapper:
映射器惠窄,可以進行java對象的映射關(guān)系
自定義二進制轉(zhuǎn)換器: 比如圖片類型、PDF漾橙、 PPT杆融、 流媒體
JSON轉(zhuǎn)換器的使用
創(chuàng)建實體類
@Data
public class Order {
private String id;
private String name;
private String content;
}
@Data
public class Packaged {
private String packageId;
private String packageName;
private String description;
}
配置轉(zhuǎn)換器
在RabbitMQConfig配置類中配置MessageListenerAdapter對應(yīng)的轉(zhuǎn)換器
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//省略其他配置...
/**
* 1.1 支持json格式的轉(zhuǎn)換器
*/
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//使用json轉(zhuǎn)換器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
委托類中添加監(jiān)聽方法
在MessageDelegate中添加JSON格式的消息監(jiān)聽方法,對應(yīng)的參數(shù)類型是Map
public class MessageDelegate {
public void consumeMessage(Map messageBody) {
System.err.println("map方法, 消息內(nèi)容:" + messageBody);
}
}
測試json轉(zhuǎn)換器
編寫測試方法霜运,注意設(shè)置消息的contentType屬性
@Test
public void testSendJsonMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("消息訂單");
order.setContent("描述信息");
//使用jackson進行json序列化
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);
//發(fā)送消息
MessageProperties messageProperties = new MessageProperties();
//這里注意一定要修改contentType為 application/json
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
運行測試方法脾歇,控制臺打印內(nèi)容如下
order 4 json: {"id":"001","name":"消息訂單","content":"描述信息"}
map方法, 消息內(nèi)容:{id=001, name=消息訂單, content=描述信息}
對象映射器的使用
配置轉(zhuǎn)換器
在RabbitMQConfig配置類中配置MessageListenerAdapter對應(yīng)的轉(zhuǎn)換器蒋腮,其實依然是使用JSON轉(zhuǎn)換器,只不過需要另外對JSON轉(zhuǎn)換器設(shè)置一個對象映射器藕各,這樣接收消息時就能使用對象接收了池摧。
方法:jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
//設(shè)置json與java對象的映射器
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
委托類中添加監(jiān)聽方法
在MessageDelegate中添加消息監(jiān)聽方法,對應(yīng)的參數(shù)類型是Java對象
public void consumeMessage(Order order) {
System.err.println("order對象, 消息內(nèi)容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
測試映射器
編寫測試方法激况,注意設(shè)置消息的java對象類型: messageProperties.getHeaders().put("__TypeId__", "xxx");
@Test
public void testSendJavaMessage() throws Exception {
Order order = new Order("002","訂單消息","訂單描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);
MessageProperties messageProperties = new MessageProperties();
//這里注意一定要修改contentType為 application/json
messageProperties.setContentType("application/json");
//設(shè)置java類型:key是固定的
messageProperties.getHeaders().put("__TypeId__", "com.rxy.spring.entity.Order");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
運行測試方法作彤,控制臺打印內(nèi)容如下
order 4 json: {"id":"002","name":"訂單消息","content":"訂單描述信息"}
order對象, 消息內(nèi)容, id: 002, name: 訂單消息, content: 訂單描述信息
Java對象多映射轉(zhuǎn)換
配置轉(zhuǎn)換器
方法: javaTypeMapper.setIdClassMapping(idClassMapping);
可以配置多個Java對象的映射關(guān)系,從而根據(jù)對象標(biāo)識將json數(shù)據(jù)轉(zhuǎn)換為不同的Java對象
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//配置類標(biāo)識與java類的映射關(guān)系
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("order", com.rxy.spring.entity.Order.class);
idClassMapping.put("packaged", com.rxy.spring.entity.Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
委托類中添加監(jiān)聽方法
在MessageDelegate中添加消息監(jiān)聽方法乌逐,對應(yīng)的參數(shù)類型是Java對象
public void consumeMessage(Order order) {
System.err.println("order對象, 消息內(nèi)容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
public void consumeMessage(Packaged pack) {
System.err.println("package對象, 消息內(nèi)容, id: " + pack.getPackageId() +
", name: " + pack.getPackageName() +
", content: "+ pack.getDescription());
}
測試映射器
編寫測試方法竭讳,注意設(shè)置消息的java對象類型,這里只需要設(shè)置java對象的標(biāo)識即可
@Test
public void testSendMappingMessage() throws Exception {
ObjectMapper mapper = new ObjectMapper();
Order order = new Order("003","訂單消息","訂單描述信息");
String json1 = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json1);
MessageProperties messageProperties1 = new MessageProperties();
//這里注意一定要修改contentType為 application/json
messageProperties1.setContentType("application/json");
//只需要設(shè)置java對象的標(biāo)識
messageProperties1.getHeaders().put("__TypeId__", "order");
Message message1 = new Message(json1.getBytes(), messageProperties1);
rabbitTemplate.send("topic001", "spring.order", message1);
Packaged pack = new Packaged("003","包裹消息","包裹描述信息");
String json2 = mapper.writeValueAsString(pack);
System.err.println("pack 4 json: " + json2);
MessageProperties messageProperties2 = new MessageProperties();
//這里注意一定要修改contentType為 application/json
messageProperties2.setContentType("application/json");
messageProperties2.getHeaders().put("__TypeId__", "packaged");
Message message2 = new Message(json2.getBytes(), messageProperties2);
rabbitTemplate.send("topic001", "spring.pack", message2);
}
運行測試方法浙踢,控制臺打印內(nèi)容如下绢慢,事實上只打印了前三行,因為SpringBoot運行完Junit測試后就會自動停止了成黄,而不會等消息處理完成之后再關(guān)閉容器呐芥,此時可以再運行Application類就打印了第四行信息,也就是完成了第二條消息的處理奋岁。
order 4 json: {"id":"003","name":"訂單消息","content":"訂單描述信息"}
pack 4 json: {"packageId":"003","packageName":"包裹消息","description":"包裹描述信息"}
order對象, 消息內(nèi)容, id: 003, name: 訂單消息, content: 訂單描述信息
package對象, 消息內(nèi)容, id: 003, name: 包裹消息, content: 包裹描述信息
全局轉(zhuǎn)換器
上述介紹的轉(zhuǎn)換器功能都相對單一思瘟,如果要處理更多場景下的不同類型消息,可以使用全局轉(zhuǎn)換器闻伶。
配置全局轉(zhuǎn)換器
//全局的轉(zhuǎn)換器滨攻,可以添加各種具體的轉(zhuǎn)換器
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
//根據(jù)不同的contentType添加相應(yīng)的消息轉(zhuǎn)換器
//addDelegate(String contentType, MessageConverter messageConverter)
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/jpg", imageConverter);
convert.addDelegate("image", imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);
定義相應(yīng)的轉(zhuǎn)換器
這里以圖片轉(zhuǎn)換器為例,最終轉(zhuǎn)換的類型是File
public class ImageMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------Image MessageConverter----------");
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ? "jpg" : _extName.toString();
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "d:/photo/" + fileName + "." + extName;
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
委托類中添加監(jiān)聽方法
public void consumeMessage(File file) {
System.err.println("文件對象方法, 消息內(nèi)容:" + file.getName());
}
測試
編寫測試方法蓝翰,注意設(shè)置消息的contentType
@Test
public void testSendExtConverterMessage() throws Exception {
byte[] body = Files.readAllBytes(Paths.get("C:/Users/ruoxiyuan/Desktop", "picture.jpg"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("image/jpg");
messageProperties.getHeaders().put("extName", "jpg");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "image_queue", message);
}
運行測試方法光绕,控制臺打印內(nèi)容如下,其他幾種類型轉(zhuǎn)換器也可以自行驗證
-----------Image MessageConverter----------
文件對象方法, 消息內(nèi)容:35f00409-83ce-4bf0-881a-a3bc5d13f85e.jpg