第三篇:SpringBoot與消息

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 一巫延、概述









? ? ? ? ? ? 四摔笤、SpringBoot+RabbitMQ整合

  1. 使用docker安裝RabbitMQ
1)消略、使用docker pull 下載rabbitmq
[root@localhost ~]# docker pull registry.docker-cn.com/library/rabbitmq:3-management
3-management: Pulling from library/rabbitmq
683abbb4ea60: Already exists 
30a58d97bcb5: Pull complete 
...
  1. 啟動rabbitMQ
[root@localhost ~]# docker images
REPOSITORY                                TAG                 IMAGE ID            CREATED             SIZE
registry.docker-cn.com/library/redis      latest              71a81cb279e3        9 days ago          83.4MB
registry.docker-cn.com/library/rabbitmq   3-management        500d74765467        9 days ago          149MB
mysql                                     5.7                 66bc0f66b7af        9 days ago          372MB
[root@localhost ~]# docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 500d74765467
7599303175cb42287d0f58c0b9d0db67070199670cad4f680f6348e41d6e2240
  1. 在瀏覽器輸入:http://主機地址:15672 進入rabbitMQ登錄頁面


  2. 輸入默認用戶名:guest/guest 進入管理界面



  1. 搭建springboot+rabbit工程
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

引入spring-boot-starter-amqp包后springboot幫我們自動配置:

  1. 執(zhí)行RabbitAutoConfiguration
  2. 自動配置類會幫我們自動配置連接工廠虚缎、RabbitTemplate(給RabbitMQ發(fā)送和接受消息)守谓、AmqpAdmin(RabbitMQ系統(tǒng)管理功能組件)
 @Configuration
    @ConditionalOnMissingBean({ConnectionFactory.class})
    protected static class RabbitConnectionFactoryCreator {
        protected RabbitConnectionFactoryCreator() {
        }

        @Bean
        public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config) throws Exception {
            RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
            if(config.determineHost() != null) {
                factory.setHost(config.determineHost());
            }

            factory.setPort(config.determinePort());
            if(config.determineUsername() != null) {
                factory.setUsername(config.determineUsername());
            }

            if(config.determinePassword() != null) {
                factory.setPassword(config.determinePassword());
            }

            if(config.determineVirtualHost() != null) {
                factory.setVirtualHost(config.determineVirtualHost());
            }

            if(config.getRequestedHeartbeat() != null) {
                factory.setRequestedHeartbeat(config.getRequestedHeartbeat().intValue());
            }

            Ssl ssl = config.getSsl();
            if(ssl.isEnabled()) {
                factory.setUseSSL(true);
                if(ssl.getAlgorithm() != null) {
                    factory.setSslAlgorithm(ssl.getAlgorithm());
                }

                factory.setKeyStore(ssl.getKeyStore());
                factory.setKeyStorePassphrase(ssl.getKeyStorePassword());
                factory.setTrustStore(ssl.getTrustStore());
                factory.setTrustStorePassphrase(ssl.getTrustStorePassword());
            }

            if(config.getConnectionTimeout() != null) {
                factory.setConnectionTimeout(config.getConnectionTimeout().intValue());
            }

            factory.afterPropertiesSet();
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory((com.rabbitmq.client.ConnectionFactory)factory.getObject());
            connectionFactory.setAddresses(config.determineAddresses());
            connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
            connectionFactory.setPublisherReturns(config.isPublisherReturns());
            if(config.getCache().getChannel().getSize() != null) {
                connectionFactory.setChannelCacheSize(config.getCache().getChannel().getSize().intValue());
            }

            if(config.getCache().getConnection().getMode() != null) {
                connectionFactory.setCacheMode(config.getCache().getConnection().getMode());
            }

            if(config.getCache().getConnection().getSize() != null) {
                connectionFactory.setConnectionCacheSize(config.getCache().getConnection().getSize().intValue());
            }

            if(config.getCache().getChannel().getCheckoutTimeout() != null) {
                connectionFactory.setChannelCheckoutTimeout(config.getCache().getChannel().getCheckoutTimeout().longValue());
            }

            return connectionFactory;
        }
    }
-------------------------------------------------------------------------------------
 @Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnMissingBean({RabbitTemplate.class})
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
            if(messageConverter != null) {
                rabbitTemplate.setMessageConverter(messageConverter);
            }

            rabbitTemplate.setMandatory(this.determineMandatoryFlag());
            Template templateProperties = this.properties.getTemplate();
            Retry retryProperties = templateProperties.getRetry();
            if(retryProperties.isEnabled()) {
                rabbitTemplate.setRetryTemplate(this.createRetryTemplate(retryProperties));
            }

            if(templateProperties.getReceiveTimeout() != null) {
                rabbitTemplate.setReceiveTimeout(templateProperties.getReceiveTimeout().longValue());
            }

            if(templateProperties.getReplyTimeout() != null) {
                rabbitTemplate.setReplyTimeout(templateProperties.getReplyTimeout().longValue());
            }

            return rabbitTemplate;
        }
------------------------------------------------------------------------------------------------
@Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnProperty(
            prefix = "spring.rabbitmq",
            name = {"dynamic"},
            matchIfMissing = true
        )
        @ConditionalOnMissingBean({AmqpAdmin.class})
        public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }

  1. RabbitProperties 封裝了RabbitMQ的所有配置
@ConfigurationProperties(
    prefix = "spring.rabbitmq"
)
public class RabbitProperties {
    private String host = "localhost";
    private int port = 5672;
    private String username;
    private String password;
    private final RabbitProperties.Ssl ssl = new RabbitProperties.Ssl();
    private String virtualHost;
    private String addresses;
    private Integer requestedHeartbeat;
    private boolean publisherConfirms;
    private boolean publisherReturns;
    private Integer connectionTimeout;
    private final RabbitProperties.Cache cache = new RabbitProperties.Cache();
    private final RabbitProperties.Listener listener = new RabbitProperties.Listener();
    private final RabbitProperties.Template template = new RabbitProperties.Template();
    private List<RabbitProperties.Address> parsedAddresses;
...
  1. application.properties配置
spring.rabbitmq.host=192.168.43.53
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  1. 編寫測試類
  • 自定義MessageConverter
package com.pyy.rabbitmq.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by Administrator on 2018/7/7 0007.
 */
@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

  • Exchange.direct:點對點(單播模式)

發(fā)送消息:

    /**
     * 1穿铆、單播(點對點)
     */
    @Test
    public void direct() {
        //Message需要自定構(gòu)造一個:定義消息體內(nèi)容和消息頭:
        //rabbitTemplate.send(exchange, routeKey, message);

        // object默認當成消息體,只需要傳入需要發(fā)送的內(nèi)容斋荞,自動序列化發(fā)送給rabbitmq
        // rabbitTemplate.convertAndSend(exchange, routeKey, object);
        User user = new User();
        user.setUsername("張三");
        user.setPassword("12312321");

        // msg 對象被默認(SimpleMessageConverter -- byte[])序列化后發(fā)送出去

        // 通過自定義:MessageConverter 實現(xiàn)JSON序列化
        rabbitTemplate.convertAndSend("exchange.direct", "pyy.news", user);
    }

只有Queue:pyy.news 該消息隊列接收到消息荞雏,其他隊列不能接收到該消息。
消息內(nèi)容如下:



接受消息:

/**
     * 接受數(shù)據(jù)
     */
    @Test
    public void receve() {
        Object obj = rabbitTemplate.receiveAndConvert("pyy.news");
        System.out.println(obj.getClass());
        System.out.println(obj);

        if(obj instanceof  User){
            User u = (User) obj;
            System.out.println(u.getUsername());
            System.out.println(u.getPassword());
        }

    }

結(jié)果:

  class com.pyy.rabbitmq.User
  com.pyy.rabbitmq.User@4c27d39d
  張三
  12312321
  • Exchange.fanout:廣播模式
/**
     * 2平酿、廣播
     */
    @Test
    public void fanout() {
        //Message需要自定構(gòu)造一個:定義消息體內(nèi)容和消息頭:
        //rabbitTemplate.send(exchange, routeKey, message);

        // object默認當成消息體凤优,只需要傳入需要發(fā)送的內(nèi)容,自動序列化發(fā)送給rabbitmq
        // rabbitTemplate.convertAndSend(exchange, routeKey, object);
        User user = new User();
        user.setUsername("李四");
        user.setPassword("123456");

        // fanout模式路由鍵不用指定染服,所有綁定到這個交換機的消息隊列都能接受到該消息
        rabbitTemplate.convertAndSend("exchange.fanout", "", user);
    }

注意:fanout模式路由鍵不用指定别洪,所有綁定到這個交換機的消息隊列都能接受到該消息


  • Exchange.topic 主題模式
/**
     * 3、topic
     */
    @Test
    public void topic() {
        //Message需要自定構(gòu)造一個:定義消息體內(nèi)容和消息頭:
        //rabbitTemplate.send(exchange, routeKey, message);

        // object默認當成消息體柳刮,只需要傳入需要發(fā)送的內(nèi)容挖垛,自動序列化發(fā)送給rabbitmq
        // rabbitTemplate.convertAndSend(exchange, routeKey, object);
        User user = new User();
        user.setUsername("王五");
        user.setPassword("123456");

        // topic模式路由鍵只有和exchange綁定的路由鍵規(guī)則匹配,對應(yīng)的消息隊列就能收到消息
        rabbitTemplate.convertAndSend("exchange.topic", "*.news", user);
    }


SpringBoot高級-消息-@RabbitListener&@EnableRabbit監(jiān)聽

  1. 開啟RabbitListener注解支持
@EnableRabbit // 開啟基于注解的RabbitMQ
@SpringBootApplication
public class Springboot02RabbitmqApplication {

    public static void main(String[] args) {
        SpringApplication.run(Springboot02RabbitmqApplication.class, args);
    }
}
  1. 使用@RabbitListener注解完成消息接收
package com.pyy.rabbitmq.service;

import com.pyy.rabbitmq.model.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * Created by Administrator on 2018/7/8 0008.
 */
@Service
public class UserService {

    /**
     * 自動監(jiān)聽消息隊列:pyy.news
     * @param user
     */
    @RabbitListener(queues = {"pyy.news"})
    public void receive(User user) {
        System.out.println("收到消息:"+user);
    }
    //結(jié)果: 收到消息:User{username='張三', password='12312321'}

    /**
     * 自動監(jiān)聽消息隊列:pyy.news
     * @param message
     */
    @RabbitListener(queues = {"pyy"})
    public void receive(Message message) {
        System.out.println("消息頭:"+message.getMessageProperties());
        System.out.println("消息體:"+message.getBody());
    }

}

系統(tǒng)會自動監(jiān)聽指定名稱的消息隊列秉颗,只有有消息自動消費痢毒。


AmqpAdmin 管理rabbitmq相關(guān)操作:

package com.pyy.rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02RabbitmqApplicationTests1 {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * 創(chuàng)建交換機:exchange
     */
    @Test
    public void createExchange() {
        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.DirectExchange"));

        amqpAdmin.declareExchange(new FanoutExchange("amqpadmin.FanoutExchange"));

        amqpAdmin.declareExchange(new TopicExchange("amqpadmin.TopicExchange"));

        System.out.println("創(chuàng)建完畢");
    }

    /**
     * 創(chuàng)建隊列:queye
     */
    @Test
    public void createQueue() {
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));

        System.out.println("創(chuàng)建完畢");
    }

    /**
     * 創(chuàng)建綁定規(guī)則:binding
     */
    @Test
    public void createBinding() {
        /**
         String destination: 目的地
         String exchange: 交換機
         String routingKey: 路由鍵
         Map<String, Object> arguments:
         Binding.DestinationType destinationType:目的地類型
         */
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.DirectExchange", "amqp.haha", null));
        System.out.println("創(chuàng)建完畢");
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蚕甥,隨后出現(xiàn)的幾起案子哪替,更是在濱河造成了極大的恐慌,老刑警劉巖菇怀,帶你破解...
    沈念sama閱讀 212,185評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件凭舶,死亡現(xiàn)場離奇詭異,居然都是意外死亡爱沟,警方通過查閱死者的電腦和手機帅霜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,445評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來呼伸,“玉大人身冀,你說我怎么就攤上這事。” “怎么了搂根?”我有些...
    開封第一講書人閱讀 157,684評論 0 348
  • 文/不壞的土叔 我叫張陵珍促,是天一觀的道長。 經(jīng)常有香客問我剩愧,道長猪叙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,564評論 1 284
  • 正文 為了忘掉前任隙咸,我火速辦了婚禮沐悦,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘五督。我一直安慰自己,他們只是感情好瓶殃,可當我...
    茶點故事閱讀 65,681評論 6 386
  • 文/花漫 我一把揭開白布充包。 她就那樣靜靜地躺著,像睡著了一般遥椿。 火紅的嫁衣襯著肌膚如雪基矮。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,874評論 1 290
  • 那天冠场,我揣著相機與錄音家浇,去河邊找鬼。 笑死碴裙,一個胖子當著我的面吹牛钢悲,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播舔株,決...
    沈念sama閱讀 39,025評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼莺琳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了载慈?” 一聲冷哼從身側(cè)響起惭等,我...
    開封第一講書人閱讀 37,761評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎办铡,沒想到半個月后辞做,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,217評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡寡具,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,545評論 2 327
  • 正文 我和宋清朗相戀三年秤茅,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片晒杈。...
    茶點故事閱讀 38,694評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡嫂伞,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情帖努,我是刑警寧澤撰豺,帶...
    沈念sama閱讀 34,351評論 4 332
  • 正文 年R本政府宣布,位于F島的核電站拼余,受9級特大地震影響污桦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜匙监,卻給世界環(huán)境...
    茶點故事閱讀 39,988評論 3 315
  • 文/蒙蒙 一凡橱、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧亭姥,春花似錦稼钩、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,778評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至粮揉,卻和暖如春巡李,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背扶认。 一陣腳步聲響...
    開封第一講書人閱讀 32,007評論 1 266
  • 我被黑心中介騙來泰國打工侨拦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人辐宾。 一個月前我還...
    沈念sama閱讀 46,427評論 2 360
  • 正文 我出身青樓狱从,卻偏偏與公主長得像,于是被迫代替她去往敵國和親螃概。 傳聞我的和親對象是個殘疾皇子矫夯,可洞房花燭夜當晚...
    茶點故事閱讀 43,580評論 2 349

推薦閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,892評論 2 11
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)吊洼,斷路器训貌,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化冒窍、事務(wù)递沪、擁塞控...
    jiangmo閱讀 10,351評論 2 34
  • 利用RabbitMQ集群橫向擴展能力,均衡流量壓力综液,讓消息集群的秒級服務(wù)能力達到百萬款慨,Google曾做過此類實驗;...
    有貨技術(shù)閱讀 3,458評論 0 1
  • //增加 //插入數(shù)據(jù)關(guān)鍵字 INSERT INTO 表名(字段一谬莹,字段二檩奠,字段三)桩了; VALUE(值一,值二埠戳,值...
    久久歸移閱讀 238評論 0 0