我們工作中各服務(wù)之間大多數(shù)數(shù)據(jù)都是以JSON類型的數(shù)據(jù)進(jìn)行傳輸?shù)乃扑瓷a(chǎn)者服務(wù)將JSON類型的數(shù)據(jù)傳遞到對(duì)應(yīng)的隊(duì)列胁勺, 而消費(fèi)端處理器中接收到的數(shù)據(jù)類型也是JSON類型檐晕。
Json MessageConverter
先看一個(gè)demo
消費(fèi)端:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
//指定Json轉(zhuǎn)換器
adapter.setMessageConverter(new Jackson2JsonMessageConverter());
//設(shè)置處理器的消費(fèi)消息的默認(rèn)方法
adapter.setDefaultListenerMethod("onMessage");
container.setMessageListener(adapter);
return container;
}
}
消息轉(zhuǎn)換器使用了RabbitMQ自帶的Jackson2JsonMessageConverter
轉(zhuǎn)換器仇箱,但是沒(méi)有指定消息的contentType
類型
處理器奖唯,定義了二個(gè)消息處理方法惨缆,參數(shù)不一樣:
public class MessageHandler {
public void onMessage(byte[] message){
System.out.println("---------onMessage----byte-------------");
System.out.println(new String(message));
}
public void onMessage(String message){
System.out.println("---------onMessage---String-------------");
System.out.println(message);
}
消費(fèi)端應(yīng)用啟動(dòng)類:
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("===start up======");
TimeUnit.SECONDS.sleep(60);
context.close();
}
}
生產(chǎn)端代碼
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
消息的實(shí)體類型
public class Order {
private Integer id;
private Integer userId;
private double amout;
private String time;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public double getAmout() {
return amout;
}
public void setAmout(double amout) {
this.amout = amout;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
@Override
public String toString() {
return "Order{" +
"id=" + id +
", userId=" + userId +
", amout=" + amout +
", time='" + time + '\'' +
'}';
}
}
應(yīng)用啟動(dòng)類,生產(chǎn)端傳遞的消息類型是Order類型丰捷,并且轉(zhuǎn)換成JSON類型發(fā)送到隊(duì)列中
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.time.LocalDateTime;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
Order order = new Order();
order.setId(1);
order.setUserId(1000);
order.setAmout(88d);
order.setTime(LocalDateTime.now().toString());
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println(json);
rabbitTemplate.convertAndSend("","zhihao.miao.order",json);
context.close();
}
}
消費(fèi)之后的控制臺(tái)打优髂:
===start up======
---------onMessage----byte-------------
九月 08, 2017 10:25:20 下午 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter fromMessage
警告: Could not convert incoming message with content-type [text/plain]
{"id":1,"userId":1000,"amout":88.0,"time":"2017-09-08T22:03:46.015"}
我們發(fā)現(xiàn)消費(fèi)端還是將其當(dāng)作字節(jié)數(shù)組來(lái)消費(fèi),轉(zhuǎn)換器還是將其轉(zhuǎn)換成byte[]
改造
此時(shí)是因?yàn)樯a(chǎn)端沒(méi)有指定contentType類型病往,生產(chǎn)者應(yīng)用啟動(dòng)類重新指定了相應(yīng)的contentType類型后捣染,
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.time.LocalDateTime;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
Order order = new Order();
order.setId(1);
order.setUserId(1000);
order.setAmout(88d);
order.setTime(LocalDateTime.now().toString());
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println(json);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
context.close();
}
}
此時(shí)消費(fèi)端的Jackson2JsonMessageConverter
類型轉(zhuǎn)換器將其轉(zhuǎn)換成Map類型,指定消費(fèi)的方法參數(shù)類型是Map即可停巷。
public class MessageHandler {
public void onMessage(byte[] message){
System.out.println("---------onMessage----byte-------------");
System.out.println(new String(message));
}
public void onMessage(String message){
System.out.println("---------onMessage---String-------------");
System.out.println(message);
}
public void onMessage(Map order){
System.out.println("---------onMessage---map-------------");
System.out.println(order.toString());
}
}
此時(shí)消費(fèi)端控制臺(tái)打印耍攘,我們知道生產(chǎn)者傳遞JSON類型數(shù)據(jù),消費(fèi)者將其作為Map類型的數(shù)據(jù)進(jìn)行處理:
---------onMessage---map-------------
{id=1, userId=1000, amout=88.0, time=2017-10-15T22:47:03.500}
再次改造
如果消費(fèi)端發(fā)送多條消息畔勤,發(fā)送List的json格式蕾各,那么在消費(fèi)端也要使用參數(shù)是List的方法來(lái)消費(fèi),生產(chǎn)者啟動(dòng)應(yīng)用類
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
Order order = new Order();
order.setId(1);
order.setUserId(1000);
order.setAmout(88d);
order.setTime(LocalDateTime.now().toString());
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println(json);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
Order order2 = new Order();
order2.setId(2);
order2.setUserId(2000);
order2.setAmout(99d);
order2.setTime(LocalDateTime.now().toString());
List<Order> orderList = new ArrayList<>();
orderList.add(order);
orderList.add(order2);
String jsonlist = mapper.writeValueAsString(orderList);
Message message2 = new Message(jsonlist.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message2);
context.close();
}
}
消費(fèi)端的Handler:
public class MessageHandler {
public void onMessage(byte[] message){
System.out.println("---------onMessage----byte-------------");
System.out.println(new String(message));
}
public void onMessage(String message){
System.out.println("---------onMessage---String-------------");
System.out.println(message);
}
public void onMessage(Map order){
System.out.println("---------onMessage---map-------------");
System.out.println(order.toString());
}
public void onMessage(List orders){
System.out.println("---------onMessage---List-------------");
System.out.println(orders.toString());
}
}
消費(fèi)者控制臺(tái)打印,此時(shí)發(fā)現(xiàn)消費(fèi)端將消息轉(zhuǎn)換成List類型的消息:
---------onMessage---map-------------
{id=1, userId=1000, amout=88.0, time=2017-10-15T22:52:46.739}
---------onMessage---List-------------
[{id=1, userId=1000, amout=88.0, time=2017-10-15T22:52:46.739}, {id=2, userId=2000, amout=99.0, time=2017-10-15T22:52:47.882}]
總結(jié)
- 使用Jackson2JsonMessageConverter處理器庆揪,客戶端發(fā)送JSON類型數(shù)據(jù)式曲,但是沒(méi)有指定消息的contentType類型,那么Jackson2JsonMessageConverter就會(huì)將消息轉(zhuǎn)換成byte[]類型的消息進(jìn)行消費(fèi)缸榛。
- 如果指定了contentType為application/json吝羞,那么消費(fèi)端就會(huì)將消息轉(zhuǎn)換成Map類型的消息進(jìn)行消費(fèi)。
- 如果指定了contentType為application/json内颗,并且生產(chǎn)端是List類型的JSON格式钧排,那么消費(fèi)端就會(huì)將消息轉(zhuǎn)換成List類型的消息進(jìn)行消費(fèi)。
Jackson2JsonMessageConverter類的源碼分析:
@Override
public Object fromMessage(Message message)
throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();
//contentType中包含有json的都是用指定的格式來(lái)轉(zhuǎn)換消息
if (contentType != null && contentType.contains("json")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = getDefaultCharset();
}
try {
if (getClassMapper() == null) {
JavaType targetJavaType = getJavaTypeMapper()
.toJavaType(message.getMessageProperties());
content = convertBytesToObject(message.getBody(),
encoding, targetJavaType);
}
else {
Class<?> targetClass = getClassMapper().toClass(
message.getMessageProperties());
content = convertBytesToObject(message.getBody(),
encoding, targetClass);
}
}
catch (IOException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
}
else {
if (log.isWarnEnabled()) {
log.warn("Could not convert incoming message with content-type ["
+ contentType + "]");
}
}
}
//其余的使用
if (content == null) {
content = message.getBody();
}
return content;
}
結(jié)論:
Jackson2JsonMessageConverter
如果接收到的消息屬性里面沒(méi)有content_type
屬性起暮,或者content_type
值不包含json卖氨,則轉(zhuǎn)換后的結(jié)果是byte[]
Jackson2JsonMessageConverter詳解續(xù)
上面我們提到的是將實(shí)體類型轉(zhuǎn)換成Map或者List類型,這樣轉(zhuǎn)換沒(méi)有多大意義负懦,我們需要消費(fèi)者將生產(chǎn)者的消息對(duì)象格式轉(zhuǎn)換成對(duì)應(yīng)的消息格式筒捺,而不是Map或者List對(duì)象,解決方案纸厉,看代碼:
生成端代碼:
/**
* 生產(chǎn)者在發(fā)送json數(shù)據(jù)的時(shí)候系吭,需要指定這個(gè)json是哪個(gè)對(duì)象,否則消費(fèi)者收到消息之后颗品,不知道要轉(zhuǎn)換成哪個(gè)java對(duì)象
*
* 指定方法:
* 在消息header中肯尺,增加一個(gè)_TypeId_,value就是具體的java對(duì)象(全類名),一定是消費(fèi)者所在系統(tǒng)的java對(duì)象全稱
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.time.LocalDateTime;
@ComponentScan
public class Application {
public static void sendOrder( RabbitTemplate rabbitTemplate) throws Exception{
Order order = new Order();
order.setId(1);
order.setUserId(1000);
order.setAmout(88d);
order.setTime(LocalDateTime.now().toString());
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println(json);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
//指定的__TypeId__屬性值必須是消費(fèi)端的Order的全類名沃缘,如果不匹配則會(huì)報(bào)錯(cuò)。
messageProperties.getHeaders().put("__TypeId__","com.zhihao.miao.test.day10.Sender.Order");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
sendOrder(rabbitTemplate);
context.close();
}
}
消費(fèi)端的Handler改造:
import java.util.List;
import java.util.Map;
import com.zhihao.miao.test.day10.Sender.Order;
public class MessageHandler {
public void onMessage(byte[] message){
System.out.println("---------onMessage----byte-------------");
System.out.println(new String(message));
}
public void onMessage(String message){
System.out.println("---------onMessage---String-------------");
System.out.println(message);
}
public void onMessage(Map order){
System.out.println("---------onMessage---map-------------");
System.out.println(order.toString());
}
public void onMessage(Order order){
System.out.println("---------onMessage---Order-------------");
System.out.println(order);
}
public void onMessage(List orders){
System.out.println("---------onMessage---List-------------");
System.out.println(orders.toString());
}
}
測(cè)試之后發(fā)現(xiàn)消費(fèi)端調(diào)用的是onMessage(Order order)這個(gè)方法,消費(fèi)端控制臺(tái)打影住:
---------onMessage---Order-------------
Order{id=1, userId=1000, amout=88.0, time='2017-10-15T23:23:31.977'}
總結(jié):
- 生產(chǎn)者在發(fā)送json數(shù)據(jù)的時(shí)候葡幸,需要指定這個(gè)json是哪個(gè)對(duì)象,否則消費(fèi)者收到消息之后水慨,不知道要轉(zhuǎn)換成哪個(gè)java對(duì)象
指定方法:
- 在消息header中,增加一個(gè)TypeId,value就是具體的java對(duì)象(全類名),一定是消費(fèi)者所在系統(tǒng)的java對(duì)象全稱
優(yōu)化
我們發(fā)現(xiàn)生產(chǎn)者和消費(fèi)者的耦合度太高敬扛,生產(chǎn)者需要知道消費(fèi)者相應(yīng)對(duì)應(yīng)的全類名晰洒,如何去改造呢?
在消費(fèi)端配置映射:
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
//指定Json轉(zhuǎn)換器
Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();
//消費(fèi)端配置映射
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("order",Order.class);
idClassMapping.put("user",User.class);
DefaultJackson2JavaTypeMapper jackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JavaTypeMapper.setIdClassMapping(idClassMapping);
System.out.println("在jackson2JsonMessageConverter轉(zhuǎn)換器中指定映射配置");
jackson2JsonMessageConverter.setJavaTypeMapper(jackson2JavaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
//設(shè)置處理器的消費(fèi)消息的默認(rèn)方法
adapter.setDefaultListenerMethod("onMessage");
container.setMessageListener(adapter);
return container;
}
消費(fèi)者處理器Handler中增加入?yún)?shù)是User的方法:
import java.util.List;
import java.util.Map;
import com.zhihao.miao.test.day10.Sender.Order;
import com.zhihao.miao.test.day10.Sender.User;
public class MessageHandler {
public void onMessage(byte[] message){
System.out.println("---------onMessage----byte-------------");
System.out.println(new String(message));
}
public void onMessage(String message){
System.out.println("---------onMessage---String-------------");
System.out.println(message);
}
public void onMessage(Map order){
System.out.println("---------onMessage---map-------------");
System.out.println(order.toString());
}
public void onMessage(Order order){
System.out.println("---------onMessage---Order-------------");
System.out.println(order);
}
public void onMessage(User user){
System.out.println("---------onMessage---user-------------");
System.out.println(user.toString());
}
public void onMessage(List orders){
System.out.println("---------onMessage---List-------------");
System.out.println(orders.toString());
}
}
然后在生產(chǎn)端就可以指定對(duì)應(yīng)的key啥箭,而不需要再去指定全類名了谍珊,
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.time.LocalDateTime;
@ComponentScan
public class Application {
public static void sendOrder( RabbitTemplate rabbitTemplate) throws Exception{
Order order = new Order();
order.setId(1);
order.setUserId(1000);
order.setAmout(88d);
order.setTime(LocalDateTime.now().toString());
System.out.println(order);
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println(json);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
messageProperties.getHeaders().put("__TypeId__","order");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
public static void sendUser( RabbitTemplate rabbitTemplate) throws Exception{
User user = new User();
user.setUserId(1000);
user.setAge(50);
user.setUsername("zhihao.miao");
user.setPassword("123343");
System.out.println(user);
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(user);
System.out.println(json);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
//指定消費(fèi)端配置的key值就行了
messageProperties.getHeaders().put("__TypeId__","user");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
//sendOrder(rabbitTemplate);
sendUser(rabbitTemplate);
context.close();
}
}
進(jìn)行測(cè)試發(fā)現(xiàn)結(jié)果符合我們預(yù)期。
結(jié)論
發(fā)送消息的時(shí)候,TypeId的值可以是java對(duì)象全稱急侥,也可以是映射的key
當(dāng)消費(fèi)者有配置映射key的時(shí)候砌滞,生產(chǎn)者既可以指定java對(duì)象全稱,又可以是映射的key缆巧。如果消費(fèi)者沒(méi)有配置映射key布持,則只能指定java對(duì)象全稱
Jackson2JsonMessageConverter詳解續(xù)
如果消息類型是List或者M(jìn)ap類型的時(shí)候,
生產(chǎn)端:
public static void sendOrderList(RabbitTemplate rabbitTemplate) throws Exception{
Order order = new Order();
order.setId(1);
order.setUserId(1000);
order.setAmout(88d);
order.setTime(LocalDateTime.now().toString());
Order order2 = new Order();
order2.setId(2);
order2.setUserId(2000);
order2.setAmout(99d);
order2.setTime(LocalDateTime.now().toString());
List<Order> orderList = Arrays.asList(order,order2);
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(orderList);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
messageProperties.getHeaders().put("__TypeId__","java.util.List");
messageProperties.getHeaders().put("__ContentTypeId__","order");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
public static void sendOrderMap(RabbitTemplate rabbitTemplate) throws Exception{
Order order = new Order();
order.setId(1);
order.setUserId(1000);
order.setAmout(88d);
order.setTime(LocalDateTime.now().toString());
Order order2 = new Order();
order2.setId(2);
order2.setUserId(2000);
order2.setAmout(99d);
order2.setTime(LocalDateTime.now().toString());
Map<String,Object> orderMaps = new HashMap<>();
orderMaps.put("10",order);
orderMaps.put("20",order2);
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(orderMaps);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
messageProperties.getHeaders().put("__TypeId__","java.util.Map");
messageProperties.getHeaders().put("__KeyTypeId__","java.lang.String");
messageProperties.getHeaders().put("__ContentTypeId__","order");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
消費(fèi)端:
public void onMessage(List<Order> orders){
System.out.println("---------onMessage---List<Order>-------------");
orders.stream().forEach(order -> System.out.println(order));
}
public void onMessage(Map<String,Object> orderMaps){
System.out.println("-------onMessage---Map<String,Object>------------");
orderMaps.keySet().forEach(key -> System.out.println(orderMaps.get(key)));
}
結(jié)論
如果生產(chǎn)者發(fā)送的是list的json數(shù)據(jù)陕悬,則還需要增加一個(gè)__ContentTypeId__
的header题暖,用于指明List里面的具體對(duì)象。
如果生產(chǎn)者發(fā)送的是map的json數(shù)據(jù)捉超,則需要指定__KeyTypeId__
胧卤,__ContentTypeId__
的header,用于指明map里面的key拼岳,value的具體對(duì)象枝誊。
ContentTypeDelegatingMessageConverter詳解
生產(chǎn)端:
public class Application {
public static void sendOrder( RabbitTemplate rabbitTemplate) throws Exception{
Order order = new Order();
order.setId(1);
order.setUserId(1000);
order.setAmout(88d);
order.setTime(LocalDateTime.now().toString());
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.out.println(json);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
messageProperties.getHeaders().put("__TypeId__","order");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
public static void sendUser( RabbitTemplate rabbitTemplate) throws Exception{
User user = new User();
user.setUserId(1000);
user.setAge(50);
user.setUsername("zhihao.miao");
user.setPassword("123343");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(user);
System.out.println(json);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
messageProperties.getHeaders().put("__TypeId__","user");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
public static void sendOrderList(RabbitTemplate rabbitTemplate) throws Exception{
Order order = new Order();
order.setId(1);
order.setUserId(1000);
order.setAmout(88d);
order.setTime(LocalDateTime.now().toString());
Order order2 = new Order();
order2.setId(2);
order2.setUserId(2000);
order2.setAmout(99d);
order2.setTime(LocalDateTime.now().toString());
List<Order> orderList = Arrays.asList(order,order2);
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(orderList);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
messageProperties.getHeaders().put("__TypeId__","java.util.List");
messageProperties.getHeaders().put("__ContentTypeId__","order");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
public static void sendOrderMap(RabbitTemplate rabbitTemplate) throws Exception{
Order order = new Order();
order.setId(1);
order.setUserId(1000);
order.setAmout(88d);
order.setTime(LocalDateTime.now().toString());
Order order2 = new Order();
order2.setId(2);
order2.setUserId(2000);
order2.setAmout(99d);
order2.setTime(LocalDateTime.now().toString());
Map<String,Object> orderMaps = new HashMap<>();
orderMaps.put("10",order);
orderMaps.put("20",order2);
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(orderMaps);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
messageProperties.getHeaders().put("__TypeId__","java.util.Map");
messageProperties.getHeaders().put("__KeyTypeId__","java.lang.String");
messageProperties.getHeaders().put("__ContentTypeId__","order");
Message message = new Message(json.getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
public static void sendJepg(RabbitTemplate rabbitTemplate) throws Exception{
byte[] body = Files.readAllBytes(Paths.get("/Users/naeshihiroshi/Desktop/file/file","aisi.jpeg"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("image/jepg");
Message message = new Message(body,messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
public static void sendJson( RabbitTemplate rabbitTemplate) throws Exception{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("hello".getBytes(),messageProperties);
rabbitTemplate.send("","zhihao.miao.order",message);
}
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(com.zhihao.miao.day03.Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
//sendOrder(rabbitTemplate);
//sendUser(rabbitTemplate);
//sendOrderList(rabbitTemplate);
//sendOrderMap(rabbitTemplate);
sendJepg(rabbitTemplate);
context.close();
}
}
消費(fèi)端:
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
//指定Json轉(zhuǎn)換器
Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("order",Order.class);
idClassMapping.put("user",User.class);
DefaultJackson2JavaTypeMapper jackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JavaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(jackson2JavaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
TextMessageConverter textMessageConverter = new TextMessageConverter();
ContentTypeDelegatingMessageConverter contentTypeDelegatingMessageConverter = new ContentTypeDelegatingMessageConverter();
contentTypeDelegatingMessageConverter.addDelegate("text",textMessageConverter);
contentTypeDelegatingMessageConverter.addDelegate("html/text",textMessageConverter);
contentTypeDelegatingMessageConverter.addDelegate("xml/text",textMessageConverter);
contentTypeDelegatingMessageConverter.addDelegate("text/plain",textMessageConverter);
contentTypeDelegatingMessageConverter.addDelegate("json",jackson2JsonMessageConverter);
contentTypeDelegatingMessageConverter.addDelegate("application/json",jackson2JsonMessageConverter);
contentTypeDelegatingMessageConverter.addDelegate("image/jpg",new JPGMessageConverter());
contentTypeDelegatingMessageConverter.addDelegate("image/jepg",new JPGMessageConverter());
contentTypeDelegatingMessageConverter.addDelegate("image/png",new JPGMessageConverter());
adapter.setMessageConverter(contentTypeDelegatingMessageConverter);
//設(shè)置處理器的消費(fèi)消息的默認(rèn)方法
adapter.setDefaultListenerMethod("onMessage");
container.setMessageListener(adapter);
return container;
}
}
指定的TextMessageConverter消息轉(zhuǎn)換器
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
System.out.println("=======toMessage=========");
return new Message(object.toString().getBytes(),messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.out.println("=======fromMessage=========");
return new String(message.getBody());
}
}
指定的JPGMessageConverter消息轉(zhuǎn)換器
public class JPGMessageConverter implements MessageConverter{
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return null;
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.out.println("====JPGMessageConverter====");
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "/Users/naeshihiroshi/Desktop/file/"+fileName+".jpg";
File file = new File(path);
try{
Files.copy(new ByteArrayInputStream(body),file.toPath());
}catch (IOException e){
e.printStackTrace();
}
return file;
}
}
客戶端消息處理器
public class MessageHandler {
public void onMessage(byte[] message){
System.out.println("---------onMessage----byte-------------");
System.out.println(new String(message));
}
public void onMessage(String message){
System.out.println("---------onMessage---String-------------");
System.out.println(message);
}
public void onMessage(Order order){
System.out.println("---------onMessage---Order-------------");
System.out.println(order);
}
public void onMessage(User user){
System.out.println("---------onMessage---user-------------");
System.out.println(user);
}
public void onMessage(List<Order> orders){
System.out.println("---------onMessage---List<Order>-------------");
orders.stream().forEach(order -> System.out.println(order));
}
public void onMessage(Map<String,Object> orderMaps){
System.out.println("-------onMessage---Map<String,Object>------------");
orderMaps.keySet().forEach(key -> System.out.println(orderMaps.get(key)));
}
public void onMessage(File message){
System.out.println("-------onMessage---File message------------");
System.out.println(message.getName());
}
}
服務(wù)器端應(yīng)用類,
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("===start up ing======");
TimeUnit.SECONDS.sleep(60);
context.close();
}
}
結(jié)論:
- ContentTypeDelegatingMessageConverter是一個(gè)代理的MessageConverter惜纸。
- ContentTypeDelegatingMessageConverter本身不做消息轉(zhuǎn)換的具體動(dòng)作叶撒,而是將消息轉(zhuǎn)換委托給具體的MessageConverter。我們可以設(shè)置COntentType和MessageConverter的映射關(guān)系耐版。
- ContentTypeDelegatingMessageConverter還有一個(gè)默認(rèn)的MessageConverter祠够,也就是說(shuō)當(dāng)根據(jù)ContentType沒(méi)有找到映射的MessageConverter的時(shí)候,就會(huì)使用默認(rèn)的MessageConverter粪牲。