RabbitMQ Notes 11: Jackson2JsonMessageConverter and ContentTypeDelegatingMessageConverter in Detail

At work, most of the data exchanged between our services is transmitted as JSON. The producer service sends JSON data to the corresponding queue, and the handler on the consumer side also receives the data as JSON.

Json MessageConverter

Let's start with a demo.

Consumer side:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
        // Use the JSON converter
        adapter.setMessageConverter(new Jackson2JsonMessageConverter());
        // Set the handler's default method for consuming messages
        adapter.setDefaultListenerMethod("onMessage");
        container.setMessageListener(adapter);

        return container;
    }
}

The message converter used here is Spring AMQP's built-in Jackson2JsonMessageConverter, but the contentType of the message has not been specified.

The handler defines two message-handling methods with different parameter types:

public class MessageHandler {

    public void onMessage(byte[] message){
        System.out.println("---------onMessage----byte-------------");
        System.out.println(new String(message));
    }


    public void onMessage(String message){
        System.out.println("---------onMessage---String-------------");
        System.out.println(message);
    }
}

Consumer application startup class:

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("===start up======");
        TimeUnit.SECONDS.sleep(60);
        context.close();
    }
}

Producer code:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}

The message entity type:

public class Order {

    private Integer id;
    private Integer userId;
    private double amout;
    private String time;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public double getAmout() {
        return amout;
    }

    public void setAmout(double amout) {
        this.amout = amout;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Order{" +
                "id=" + id +
                ", userId=" + userId +
                ", amout=" + amout +
                ", time='" + time + '\'' +
                '}';
    }
}

The application startup class. The producer builds an Order, serializes it to JSON, and sends it to the queue:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.time.LocalDateTime;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        Order order = new Order();
        order.setId(1);
        order.setUserId(1000);
        order.setAmout(88d);
        order.setTime(LocalDateTime.now().toString());

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.out.println(json);

        rabbitTemplate.convertAndSend("","zhihao.miao.order",json);
        context.close();
    }
}

Console output after the message is consumed:

===start up======
---------onMessage----byte-------------
九月 08, 2017 10:25:20 下午 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter fromMessage
警告: Could not convert incoming message with content-type [text/plain]
{"id":1,"userId":1000,"amout":88.0,"time":"2017-09-08T22:03:46.015"}

We find that the consumer still handles the message as a byte array: the converter still hands it over as byte[].

Improvement

This happens because the producer did not specify a contentType. Here is the producer startup class with the contentType set:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.time.LocalDateTime;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        Order order = new Order();
        order.setId(1);
        order.setUserId(1000);
        order.setAmout(88d);
        order.setTime(LocalDateTime.now().toString());

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.out.println(json);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message message = new Message(json.getBytes(),messageProperties);

        rabbitTemplate.send("","zhihao.miao.order",message);
        context.close();
    }
}

With this change, the consumer's Jackson2JsonMessageConverter converts the message into a Map, so the consuming method only needs to take a Map parameter.

public class MessageHandler {

    public void onMessage(byte[] message){
        System.out.println("---------onMessage----byte-------------");
        System.out.println(new String(message));
    }


    public void onMessage(String message){
        System.out.println("---------onMessage---String-------------");
        System.out.println(message);
    }


    public void onMessage(Map order){
        System.out.println("---------onMessage---map-------------");
        System.out.println(order.toString());
    }

}

The consumer console now prints the following. The producer sends JSON data, and the consumer handles it as a Map:

---------onMessage---map-------------
{id=1, userId=1000, amout=88.0, time=2017-10-15T22:47:03.500}

Further improvement

If the producer sends several records as a JSON list, the consumer also needs a method whose parameter is a List. The producer startup class:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        Order order = new Order();
        order.setId(1);
        order.setUserId(1000);
        order.setAmout(88d);
        order.setTime(LocalDateTime.now().toString());

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.out.println(json);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message message = new Message(json.getBytes(),messageProperties);

        rabbitTemplate.send("","zhihao.miao.order",message);

        Order order2 = new Order();
        order2.setId(2);
        order2.setUserId(2000);
        order2.setAmout(99d);
        order2.setTime(LocalDateTime.now().toString());

        List<Order> orderList = new ArrayList<>();
        orderList.add(order);
        orderList.add(order2);

        String jsonlist = mapper.writeValueAsString(orderList);
        Message message2 = new Message(jsonlist.getBytes(),messageProperties);
        rabbitTemplate.send("","zhihao.miao.order",message2);

        context.close();
    }
}

Consumer-side handler:

public class MessageHandler {

    public void onMessage(byte[] message){
        System.out.println("---------onMessage----byte-------------");
        System.out.println(new String(message));
    }


    public void onMessage(String message){
        System.out.println("---------onMessage---String-------------");
        System.out.println(message);
    }


    public void onMessage(Map order){
        System.out.println("---------onMessage---map-------------");
        System.out.println(order.toString());
    }

    public void onMessage(List orders){
        System.out.println("---------onMessage---List-------------");
        System.out.println(orders.toString());
    }

}

Consumer console output. The consumer converts the second message into a List:

---------onMessage---map-------------
{id=1, userId=1000, amout=88.0, time=2017-10-15T22:52:46.739}
---------onMessage---List-------------
[{id=1, userId=1000, amout=88.0, time=2017-10-15T22:52:46.739}, {id=2, userId=2000, amout=99.0, time=2017-10-15T22:52:47.882}]

Summary

  • When Jackson2JsonMessageConverter is used and the client sends JSON data without specifying the message's contentType, the converter passes the message on as byte[].
  • If the contentType is application/json, the consumer converts the message into a Map.
  • If the contentType is application/json and the producer sends a JSON list, the consumer converts the message into a List.

Source analysis of the Jackson2JsonMessageConverter class:

@Override
public Object fromMessage(Message message)
        throws MessageConversionException {
    Object content = null;
    MessageProperties properties = message.getMessageProperties();
    if (properties != null) {
        String contentType = properties.getContentType();
        // any contentType containing "json" is converted as JSON
        if (contentType != null && contentType.contains("json")) {
            String encoding = properties.getContentEncoding();
            if (encoding == null) {
                encoding = getDefaultCharset();
            }
            try {

                if (getClassMapper() == null) {
                    JavaType targetJavaType = getJavaTypeMapper()
                            .toJavaType(message.getMessageProperties());
                    content = convertBytesToObject(message.getBody(),
                            encoding, targetJavaType);
                }
                else {
                    Class<?> targetClass = getClassMapper().toClass(
                            message.getMessageProperties());
                    content = convertBytesToObject(message.getBody(),
                            encoding, targetClass);
                }
            }
            catch (IOException e) {
                throw new MessageConversionException(
                        "Failed to convert Message content", e);
            }
        }
        else {
            if (log.isWarnEnabled()) {
                log.warn("Could not convert incoming message with content-type ["
                        + contentType + "]");
            }
        }
    }
    // in every other case, fall back to the raw message body
    if (content == null) {
        content = message.getBody();
    }
    return content;
}

Conclusion:
If the properties of a received message have no content_type, or the content_type value does not contain "json", Jackson2JsonMessageConverter returns the body as byte[].
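
The rule in this conclusion can be modelled in a few lines. The sketch below is a simplified stand-in written against the JDK only, not Spring AMQP's code: the class ContentTypeGate and its describe helper are hypothetical names introduced for illustration, and no real JSON parsing takes place.

```java
import java.nio.charset.StandardCharsets;

/** Simplified model of the content-type check; all names here are hypothetical. */
final class ContentTypeGate {

    /** True when the converter would attempt JSON deserialization. */
    static boolean treatedAsJson(String contentType) {
        return contentType != null && contentType.contains("json");
    }

    /** Describes what a listener method would receive for the given message. */
    static String describe(String contentType, byte[] body) {
        if (treatedAsJson(contentType)) {
            return "parsed JSON: " + new String(body, StandardCharsets.UTF_8);
        }
        // No usable content type: the body is handed over untouched.
        return "raw byte[] of length " + body.length;
    }

    public static void main(String[] args) {
        byte[] body = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe("application/json", body));
        System.out.println(describe("text/plain", body));
        System.out.println(describe(null, body));
    }
}
```

Because the check is a substring match, a content type such as application/vnd.example+json would also take the JSON branch in this model.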

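Jackson2JsonMessageConverter in detail (continued)

So far the entity has been converted into a Map or a List, which is not very useful. We want the consumer to convert the producer's message into the corresponding object type rather than a Map or a List. The solution is shown in the code that follows.
Producer code: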

/**
 * When the producer sends JSON data it must say which object the JSON represents; otherwise the consumer does not know which Java object to convert it to.
 *
 * How to specify it:
 * add a __TypeId__ header to the message whose value is the fully-qualified class name. It must be the class name as it exists in the consumer's system.
 */
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.time.LocalDateTime;

@ComponentScan
public class Application {

    public static void sendOrder( RabbitTemplate rabbitTemplate) throws Exception{
        Order order = new Order();
        order.setId(1);
        order.setUserId(1000);
        order.setAmout(88d);
        order.setTime(LocalDateTime.now().toString());

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.out.println(json);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        // The __TypeId__ value must be the fully-qualified name of the consumer-side Order class; a mismatch causes an error.
        messageProperties.getHeaders().put("__TypeId__","com.zhihao.miao.test.day10.Sender.Order");
        Message message = new Message(json.getBytes(),messageProperties);

        rabbitTemplate.send("","zhihao.miao.order",message);
    }

    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);
        sendOrder(rabbitTemplate);
        context.close();
    }
}

Updated consumer-side handler:

import java.util.List;
import java.util.Map;
import com.zhihao.miao.test.day10.Sender.Order;

public class MessageHandler {

    public void onMessage(byte[] message){
        System.out.println("---------onMessage----byte-------------");
        System.out.println(new String(message));
    }


    public void onMessage(String message){
        System.out.println("---------onMessage---String-------------");
        System.out.println(message);
    }


    public void onMessage(Map order){
        System.out.println("---------onMessage---map-------------");
        System.out.println(order.toString());
    }

    public void onMessage(Order order){
        System.out.println("---------onMessage---Order-------------");
        System.out.println(order);
    }

    public void onMessage(List orders){
        System.out.println("---------onMessage---List-------------");
        System.out.println(orders.toString());
    }

}

Testing shows that the consumer invokes onMessage(Order order). Consumer console output:

---------onMessage---Order-------------
Order{id=1, userId=1000, amout=88.0, time='2017-10-15T23:23:31.977'}

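Summary

  • When the producer sends JSON data it must say which object the JSON represents; otherwise the consumer does not know which Java object to convert it to.

How to specify it

  • Add a __TypeId__ header to the message whose value is the fully-qualified class name. It must be the class name as it exists in the consumer's system.

Optimization

The producer and consumer are now too tightly coupled: the producer has to know the fully-qualified class name used by the consumer. How can we improve this?

Configure a mapping on the consumer side: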

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
        // Use the JSON converter
        Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();

        // Consumer-side mapping from type id to class
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("order",Order.class);
        idClassMapping.put("user",User.class);

        DefaultJackson2JavaTypeMapper jackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
        jackson2JavaTypeMapper.setIdClassMapping(idClassMapping);

        System.out.println("Registering the id-to-class mapping on jackson2JsonMessageConverter");
        jackson2JsonMessageConverter.setJavaTypeMapper(jackson2JavaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);

        // Set the handler's default method for consuming messages
        adapter.setDefaultListenerMethod("onMessage");
        container.setMessageListener(adapter);

        return container;
    }

Add a method that takes a User parameter to the consumer handler:

import java.util.List;
import java.util.Map;
import com.zhihao.miao.test.day10.Sender.Order;
import com.zhihao.miao.test.day10.Sender.User;

public class MessageHandler {

    public void onMessage(byte[] message){
        System.out.println("---------onMessage----byte-------------");
        System.out.println(new String(message));
    }


    public void onMessage(String message){
        System.out.println("---------onMessage---String-------------");
        System.out.println(message);
    }


    public void onMessage(Map order){
        System.out.println("---------onMessage---map-------------");
        System.out.println(order.toString());
    }

    public void onMessage(Order order){
        System.out.println("---------onMessage---Order-------------");
        System.out.println(order);
    }

    public void onMessage(User user){
        System.out.println("---------onMessage---user-------------");
        System.out.println(user.toString());
    }

    public void onMessage(List orders){
        System.out.println("---------onMessage---List-------------");
        System.out.println(orders.toString());
    }

}

The producer can then specify the mapping key instead of the fully-qualified class name:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.time.LocalDateTime;

@ComponentScan
public class Application {

    public static void sendOrder( RabbitTemplate rabbitTemplate) throws Exception{
        Order order = new Order();
        order.setId(1);
        order.setUserId(1000);
        order.setAmout(88d);
        order.setTime(LocalDateTime.now().toString());
        System.out.println(order);

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.out.println(json);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        messageProperties.getHeaders().put("__TypeId__","order");
        Message message = new Message(json.getBytes(),messageProperties);

        rabbitTemplate.send("","zhihao.miao.order",message);
    }

    public static void sendUser( RabbitTemplate rabbitTemplate) throws Exception{
        User user = new User();
        user.setUserId(1000);
        user.setAge(50);
        user.setUsername("zhihao.miao");
        user.setPassword("123343");
        System.out.println(user);

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(user);
        System.out.println(json);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        // Just use the key configured on the consumer side
        messageProperties.getHeaders().put("__TypeId__","user");
        Message message = new Message(json.getBytes(),messageProperties);

        rabbitTemplate.send("","zhihao.miao.order",message);
    }

    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        //sendOrder(rabbitTemplate);
        sendUser(rabbitTemplate);
        context.close();
    }
}

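Testing confirms that the result matches our expectations.

Conclusion

When sending a message, the __TypeId__ value can be either the fully-qualified Java class name or a mapping key.
If the consumer has configured a key mapping, the producer can specify either the fully-qualified class name or the mapping key. If the consumer has not configured a mapping, only the fully-qualified class name works.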
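
The lookup order described in this conclusion can be sketched roughly as follows. This is a simplified model written against the JDK only, not the actual DefaultJackson2JavaTypeMapper code: TypeIdMapping, its nested Order stand-in, and resolve are hypothetical names, and the sketch assumes the mapping is consulted before the value is tried as a class name.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of resolving a __TypeId__ value; all names here are hypothetical. */
final class TypeIdMapping {

    /** Stand-in for the consumer's own Order class. */
    static final class Order { }

    /** Tries the configured key mapping first, then treats the id as a class name. */
    static Class<?> resolve(String typeId, Map<String, Class<?>> idClassMapping) {
        Class<?> mapped = idClassMapping.get(typeId);
        if (mapped != null) {
            return mapped;
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            // Neither a known key nor a class on the consumer's classpath.
            throw new IllegalArgumentException("cannot resolve type id: " + typeId, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Class<?>> mapping = new HashMap<>();
        mapping.put("order", Order.class);
        System.out.println(resolve("order", mapping));            // resolved through the key
        System.out.println(resolve("java.lang.String", mapping)); // resolved as a class name
    }
}
```

With an empty mapping, resolve("order", ...) fails in this model, which mirrors the last sentence of the conclusion: without a configured mapping only a fully-qualified class name can work.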

Jackson2JsonMessageConverter in detail (continued)

When the message type is a List or a Map:

Producer side:

public static void sendOrderList(RabbitTemplate rabbitTemplate) throws Exception{
    Order order = new Order();
    order.setId(1);
    order.setUserId(1000);
    order.setAmout(88d);
    order.setTime(LocalDateTime.now().toString());

    Order order2 = new Order();
    order2.setId(2);
    order2.setUserId(2000);
    order2.setAmout(99d);
    order2.setTime(LocalDateTime.now().toString());

    List<Order> orderList = Arrays.asList(order,order2);

    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(orderList);

    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("application/json");
    messageProperties.getHeaders().put("__TypeId__","java.util.List");
    messageProperties.getHeaders().put("__ContentTypeId__","order");


    Message message = new Message(json.getBytes(),messageProperties);
    rabbitTemplate.send("","zhihao.miao.order",message);
}


public static void sendOrderMap(RabbitTemplate rabbitTemplate) throws Exception{
    Order order = new Order();
    order.setId(1);
    order.setUserId(1000);
    order.setAmout(88d);
    order.setTime(LocalDateTime.now().toString());

    Order order2 = new Order();
    order2.setId(2);
    order2.setUserId(2000);
    order2.setAmout(99d);
    order2.setTime(LocalDateTime.now().toString());

    Map<String,Object> orderMaps = new HashMap<>();
    orderMaps.put("10",order);
    orderMaps.put("20",order2);

    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(orderMaps);

    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("application/json");
    messageProperties.getHeaders().put("__TypeId__","java.util.Map");
    messageProperties.getHeaders().put("__KeyTypeId__","java.lang.String");
    messageProperties.getHeaders().put("__ContentTypeId__","order");


    Message message = new Message(json.getBytes(),messageProperties);
    rabbitTemplate.send("","zhihao.miao.order",message);
}

Consumer side:

public void onMessage(List<Order> orders){
    System.out.println("---------onMessage---List<Order>-------------");
    orders.stream().forEach(order -> System.out.println(order));
}

public void onMessage(Map<String,Object> orderMaps){
    System.out.println("-------onMessage---Map<String,Object>------------");
    orderMaps.keySet().forEach(key -> System.out.println(orderMaps.get(key)));
}

Conclusion
If the producer sends a JSON list, an additional __ContentTypeId__ header is needed to indicate the type of the elements in the List.

If the producer sends a JSON map, both the __KeyTypeId__ and __ContentTypeId__ headers are needed to indicate the types of the map's keys and values.
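
To make the role of the three headers concrete, here is a small sketch that turns a header set into a readable target type. It is an illustrative model only: ContainerTypeHeaders and describeTarget are hypothetical names, and the sketch decides purely by which headers are present, which is a simplification of whatever the real type mapper does with the resolved classes.

```java
import java.util.Map;

/** Illustrative model of how the type headers combine; all names here are hypothetical. */
final class ContainerTypeHeaders {

    /** Builds a readable description of the target type from the headers. */
    static String describeTarget(Map<String, String> headers) {
        String type = headers.get("__TypeId__");
        String content = headers.get("__ContentTypeId__");
        String key = headers.get("__KeyTypeId__");
        if (type == null) {
            throw new IllegalArgumentException("missing __TypeId__ header");
        }
        if (content == null) {
            return type;                                  // plain object
        }
        if (key == null) {
            return type + "<" + content + ">";            // list of elements
        }
        return type + "<" + key + ", " + content + ">";   // map of key to value
    }

    public static void main(String[] args) {
        System.out.println(describeTarget(Map.of(
                "__TypeId__", "java.util.List",
                "__ContentTypeId__", "order")));
        System.out.println(describeTarget(Map.of(
                "__TypeId__", "java.util.Map",
                "__KeyTypeId__", "java.lang.String",
                "__ContentTypeId__", "order")));
    }
}
```

The header values used in main are the same ones the producer methods in this section put on the message.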

ContentTypeDelegatingMessageConverter in detail
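
The consumer configuration later in this section registers one converter per content type on a ContentTypeDelegatingMessageConverter. The dispatch idea behind it can be sketched as below. This is a minimal JDK-only model, not the Spring AMQP class: DelegatingConverterSketch is a hypothetical name, converters are reduced to plain functions over the body, and it assumes an exact match on the content type with a fallback when nothing matches.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Minimal model of content-type based delegation; all names here are hypothetical. */
final class DelegatingConverterSketch {

    private final Map<String, Function<byte[], Object>> delegates = new HashMap<>();
    private final Function<byte[], Object> fallback;

    DelegatingConverterSketch(Function<byte[], Object> fallback) {
        this.fallback = fallback;
    }

    /** Registers the converter to use for one exact content type. */
    void addDelegate(String contentType, Function<byte[], Object> converter) {
        delegates.put(contentType, converter);
    }

    /** Picks the delegate by content type, or the fallback when none is registered. */
    Object fromMessage(String contentType, byte[] body) {
        return delegates.getOrDefault(contentType, fallback).apply(body);
    }

    public static void main(String[] args) {
        // Fallback: hand the raw bytes through unchanged.
        DelegatingConverterSketch converter = new DelegatingConverterSketch(body -> body);
        converter.addDelegate("text/plain", body -> new String(body, StandardCharsets.UTF_8));

        byte[] hello = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(converter.fromMessage("text/plain", hello));
        System.out.println(converter.fromMessage("image/png", hello) instanceof byte[]);
    }
}
```

Since the lookup in this model is an exact string match, the content type set by the producer and the key registered by the consumer have to agree character for character.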

MessageConverter interface hierarchy

Producer side:

public class Application {

    public static void sendOrder( RabbitTemplate rabbitTemplate) throws Exception{
        Order order = new Order();
        order.setId(1);
        order.setUserId(1000);
        order.setAmout(88d);
        order.setTime(LocalDateTime.now().toString());

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.out.println(json);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        messageProperties.getHeaders().put("__TypeId__","order");
        Message message = new Message(json.getBytes(),messageProperties);

        rabbitTemplate.send("","zhihao.miao.order",message);
    }

    public static void sendUser( RabbitTemplate rabbitTemplate) throws Exception{
        User user = new User();
        user.setUserId(1000);
        user.setAge(50);
        user.setUsername("zhihao.miao");
        user.setPassword("123343");

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(user);
        System.out.println(json);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        messageProperties.getHeaders().put("__TypeId__","user");
        Message message = new Message(json.getBytes(),messageProperties);

        rabbitTemplate.send("","zhihao.miao.order",message);
    }

    public static void sendOrderList(RabbitTemplate rabbitTemplate) throws Exception{
        Order order = new Order();
        order.setId(1);
        order.setUserId(1000);
        order.setAmout(88d);
        order.setTime(LocalDateTime.now().toString());

        Order order2 = new Order();
        order2.setId(2);
        order2.setUserId(2000);
        order2.setAmout(99d);
        order2.setTime(LocalDateTime.now().toString());

        List<Order> orderList = Arrays.asList(order,order2);

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(orderList);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        messageProperties.getHeaders().put("__TypeId__","java.util.List");
        messageProperties.getHeaders().put("__ContentTypeId__","order");


        Message message = new Message(json.getBytes(),messageProperties);
        rabbitTemplate.send("","zhihao.miao.order",message);
    }


    public static void sendOrderMap(RabbitTemplate rabbitTemplate) throws Exception{
        Order order = new Order();
        order.setId(1);
        order.setUserId(1000);
        order.setAmout(88d);
        order.setTime(LocalDateTime.now().toString());

        Order order2 = new Order();
        order2.setId(2);
        order2.setUserId(2000);
        order2.setAmout(99d);
        order2.setTime(LocalDateTime.now().toString());

        Map<String,Object> orderMaps = new HashMap<>();
        orderMaps.put("10",order);
        orderMaps.put("20",order2);

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(orderMaps);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        messageProperties.getHeaders().put("__TypeId__","java.util.Map");
        messageProperties.getHeaders().put("__KeyTypeId__","java.lang.String");
        messageProperties.getHeaders().put("__ContentTypeId__","order");


        Message message = new Message(json.getBytes(),messageProperties);
        rabbitTemplate.send("","zhihao.miao.order",message);
    }


    public static void sendJepg(RabbitTemplate rabbitTemplate) throws Exception{
        byte[] body = Files.readAllBytes(Paths.get("/Users/naeshihiroshi/Desktop/file/file","aisi.jpeg"));

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("image/jepg");

        Message message = new Message(body,messageProperties);

        rabbitTemplate.send("","zhihao.miao.order",message);
    }

    public static void sendJson( RabbitTemplate rabbitTemplate) throws Exception{


        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("hello".getBytes(),messageProperties);
        rabbitTemplate.send("","zhihao.miao.order",message);
    }



    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(com.zhihao.miao.day03.Application.class);

        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        //sendOrder(rabbitTemplate);
        //sendUser(rabbitTemplate);
        //sendOrderList(rabbitTemplate);
        //sendOrderMap(rabbitTemplate);
        sendJepg(rabbitTemplate);

        context.close();
    }
}

Consumer side:

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
        // Specify the JSON converter
        Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();


        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("order",Order.class);
        idClassMapping.put("user",User.class);

        DefaultJackson2JavaTypeMapper jackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
        jackson2JavaTypeMapper.setIdClassMapping(idClassMapping);

        jackson2JsonMessageConverter.setJavaTypeMapper(jackson2JavaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);

        TextMessageConverter textMessageConverter = new TextMessageConverter();

        ContentTypeDelegatingMessageConverter contentTypeDelegatingMessageConverter = new ContentTypeDelegatingMessageConverter();
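        // Each key is compared with the contentType property of the incoming message.
        // Note that "html/text" and "xml/text" are not standard MIME types (those would be "text/html" and "text/xml").
        contentTypeDelegatingMessageConverter.addDelegate("text",textMessageConverter);
        contentTypeDelegatingMessageConverter.addDelegate("html/text",textMessageConverter);
        contentTypeDelegatingMessageConverter.addDelegate("xml/text",textMessageConverter);
        contentTypeDelegatingMessageConverter.addDelegate("text/plain",textMessageConverter);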

        contentTypeDelegatingMessageConverter.addDelegate("json",jackson2JsonMessageConverter);
        contentTypeDelegatingMessageConverter.addDelegate("application/json",jackson2JsonMessageConverter);

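        // One converter instance can serve several content types.
        // "image/jepg" (sic) is kept because the producer sends that exact string; "image/jpeg" is the standard MIME type.
        JPGMessageConverter jpgMessageConverter = new JPGMessageConverter();
        contentTypeDelegatingMessageConverter.addDelegate("image/jpg",jpgMessageConverter);
        contentTypeDelegatingMessageConverter.addDelegate("image/jepg",jpgMessageConverter);
        contentTypeDelegatingMessageConverter.addDelegate("image/jpeg",jpgMessageConverter);
        contentTypeDelegatingMessageConverter.addDelegate("image/png",jpgMessageConverter);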


        adapter.setMessageConverter(contentTypeDelegatingMessageConverter);
        // Set the default handler method that consumes messages
        adapter.setDefaultListenerMethod("onMessage");
        container.setMessageListener(adapter);

        return container;
    }
}

The custom TextMessageConverter message converter:

public class TextMessageConverter implements MessageConverter {


    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        System.out.println("=======toMessage=========");
        return new Message(object.toString().getBytes(),messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("=======fromMessage=========");
        return new String(message.getBody());
    }
}
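
The converter above encodes and decodes with the platform default charset, so producer and consumer can disagree if they run with different defaults. A minimal sketch of the same round trip with an explicit charset follows; the class name `TextBodyCodecSketch` and the choice of UTF-8 are assumptions made for illustration and are not part of the original code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original example.
class TextBodyCodecSketch {

    // Encodes any object as UTF-8 text (null becomes the string "null").
    static byte[] encode(Object value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    // Decodes a message body as UTF-8 text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

The two methods could be called from `toMessage` and `fromMessage` respectively, assuming both sides of the queue agree on UTF-8.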

The custom JPGMessageConverter message converter:

public class JPGMessageConverter implements MessageConverter{
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return null;
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("====JPGMessageConverter====");
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "/Users/naeshihiroshi/Desktop/file/"+fileName+".jpg";
        File file = new File(path);
        try{
            Files.copy(new ByteArrayInputStream(body),file.toPath());
        }catch (IOException e){
            e.printStackTrace();
        }
        return file;
    }
}
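
The converter above writes to a hard-coded desktop path and, if the copy fails, only prints the stack trace and still returns a `File` that may not exist. As a sketch of one alternative, the body could be written to a temporary file and failures surfaced to the caller. The class name `ImageBodyWriterSketch`, the `rabbit-image-` prefix and the use of the system temp directory are assumptions for illustration only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of the original example.
class ImageBodyWriterSketch {

    // Writes the body to a newly created temp file with the given suffix
    // and returns the path of that file.
    static Path writeToTempFile(byte[] body, String suffix) {
        try {
            Path target = Files.createTempFile("rabbit-image-", suffix);
            return Files.write(target, body);
        } catch (IOException e) {
            // Propagate the failure instead of returning a path to a missing file.
            throw new UncheckedIOException(e);
        }
    }
}
```

Inside `fromMessage`, something like `ImageBodyWriterSketch.writeToTempFile(message.getBody(), ".jpg").toFile()` would keep the `File` return type that the handler expects.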

Consumer-side message handler:

public class MessageHandler {


    public void onMessage(byte[] message){
        System.out.println("---------onMessage----byte-------------");
        System.out.println(new String(message));
    }


    public void onMessage(String message){
        System.out.println("---------onMessage---String-------------");
        System.out.println(message);
    }

    public void onMessage(Order order){
        System.out.println("---------onMessage---Order-------------");
        System.out.println(order);
    }

    public void onMessage(User user){
        System.out.println("---------onMessage---user-------------");
        System.out.println(user);
    }

    public void onMessage(List<Order> orders){
        System.out.println("---------onMessage---List<Order>-------------");
        orders.forEach(System.out::println);
    }

    public void onMessage(Map<String,Object> orderMaps){
        System.out.println("-------onMessage---Map<String,Object>------------");
        orderMaps.values().forEach(System.out::println);
    }

    public void onMessage(File message){
        System.out.println("-------onMessage---File message------------");
        System.out.println(message.getName());
    }
}
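
The handler above relies on the overload of `onMessage` being chosen according to the type of the object the converter returns. The sketch below models that idea with plain reflection. It is a simplified illustration, not the actual resolution logic of MessageListenerAdapter, and `DispatchSketch` with its nested `Handler` is a hypothetical stand-in.

```java
import java.io.File;
import java.lang.reflect.Method;

// Simplified, hypothetical model of "pick the overload that fits the payload".
class DispatchSketch {

    // Stand-in handler; each overload returns a label instead of printing.
    static class Handler {
        public String onMessage(String message) { return "String"; }
        public String onMessage(byte[] message) { return "byte[]"; }
        public String onMessage(File message) { return "File"; }
    }

    // Invokes the first public single-argument method with the given name
    // whose parameter type accepts the runtime type of the payload.
    static Object dispatch(Object handler, String methodName, Object payload) {
        for (Method m : handler.getClass().getMethods()) {
            if (m.getName().equals(methodName)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(payload)) {
                try {
                    return m.invoke(handler, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalArgumentException(
                "No " + methodName + " method accepts " + payload.getClass());
    }
}
```

Because generic type arguments are erased at runtime, overloads such as `onMessage(List<Order>)` and `onMessage(Map<String,Object>)` can only be told apart as `List` versus `Map`, not by their element types.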

Server-side application class:

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        System.out.println("===start up ing======");
        TimeUnit.SECONDS.sleep(60);
        context.close();
    }
}

Conclusion

  • ContentTypeDelegatingMessageConverter is a delegating (proxy) MessageConverter.
  • ContentTypeDelegatingMessageConverter does not perform any message conversion itself; it delegates the conversion to a concrete MessageConverter. We can configure the mapping between ContentType and MessageConverter.
  • ContentTypeDelegatingMessageConverter also has a default MessageConverter: when no MessageConverter is mapped to the message's ContentType, the default MessageConverter is used.
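
The delegation and fallback behaviour summarised above can be modelled in a few lines of plain Java. This is a simplified sketch, not the Spring AMQP class itself: `DelegatingConverterSketch` is a hypothetical name, converters are reduced to functions over the message body, and the exact-match lookup on the content type string is an assumption of the sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified, hypothetical model of content-type based delegation.
class DelegatingConverterSketch {

    // Maps a content type to the function that converts a message body.
    private final Map<String, Function<byte[], Object>> delegates = new HashMap<>();

    // Used when no delegate is registered for the content type.
    private final Function<byte[], Object> defaultConverter;

    DelegatingConverterSketch(Function<byte[], Object> defaultConverter) {
        this.defaultConverter = defaultConverter;
    }

    void addDelegate(String contentType, Function<byte[], Object> converter) {
        delegates.put(contentType, converter);
    }

    Object fromMessage(String contentType, byte[] body) {
        // Look up the delegate for this content type, else fall back to the default.
        return delegates.getOrDefault(contentType, defaultConverter).apply(body);
    }

    public static void main(String[] args) {
        // The default converter here simply returns the raw bytes.
        DelegatingConverterSketch converter = new DelegatingConverterSketch(body -> body);
        converter.addDelegate("text/plain", body -> new String(body, StandardCharsets.UTF_8));

        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(converter.fromMessage("text/plain", payload));                  // prints hello
        System.out.println(converter.fromMessage("image/png", payload) instanceof byte[]); // prints true
    }
}
```

In this model a misspelled content type such as "image/jepg" only reaches its converter if the producer and the registered key use the same spelling; otherwise the message silently goes to the default converter.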