rabbitmq

先說一下安裝rabbitmq
1,安裝opt_win64_22.0.exe (Erlang )
2僚饭,安裝rabbitmq
3,計算機--管理--服務(wù) 找到rabbitmq服務(wù)右鍵屬性胧砰,更改為當前登錄人并填寫密碼鳍鸵,然后重啟服務(wù)
4,去安裝目錄C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin的路徑的cmd窗口 執(zhí)行:rabbitmq-plugins enable rabbitmq_management 打開管理窗口
5尉间,管理地址:http://localhost:15672 登錄名:guest偿乖,密碼:guest(注意:本地登錄才有效,遠程登錄不行)

概述:
rabbitmq是一個消息隊列哲嘲,主要用于系統(tǒng)之間的異步和解耦贪薪,同時也能起到消息緩存,消息分發(fā)的作用眠副。
一般的消息隊列有3個關(guān)鍵部分画切,生產(chǎn)者,隊列囱怕,消費者霍弹;但是rabbitmq還有一個exchange(交換機);交換機的作用是:生產(chǎn)者把消息給交換機然后根據(jù)策略路由到隊列上面去保存娃弓;交換機不能保存消息典格。
rabbitmq還有一個虛擬主機的概念:其作用就是權(quán)限隔離,A虛擬機的交換機和隊列跟B虛擬機的交換機和隊列是不能互相訪問(針對用戶)台丛,就是用戶僅有A虛擬機的權(quán)限就不能去訪問B虛擬機里的交換機和隊列钝计。

交換機四種模式(每個隊列中的消息只能被消費一次):
交換機的作用就是把消息路由到綁定的隊列中去。
direct:簡單模式齐佳,一個發(fā)私恬,另一個收
topic:主題模式,比direct靈活
fanout:廣播模式炼吴,綁定的隊列全部都能消費
headers:設(shè)置 header attribute 參數(shù)類型的交換機(使用的少本鸣,后期再琢磨)
特性:
ack:應(yīng)答機制,用于很重要的消息硅蹦,必須確保送到且消費荣德。默認是自動應(yīng)答;如需手動應(yīng)答需要自己設(shè)置配置文件童芹。
durable:持久化機制涮瞻,把隊列中的數(shù)據(jù)持久化到硬盤,避免rabbitmq服務(wù)器down機假褪,而數(shù)據(jù)不會丟失署咽。

貼代碼(springboot整合rabbitmq):
properties配置:

#rabbitmq配置
spring.rabbitmq.addresses=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

# 開啟ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

direct模式(不用配置交換機,有默認的)
配置隊列:

package com.example.rabbitmq1.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * rabbitmq的默認模式:direct生音,最簡單的隊列模式宁否,一個生產(chǎn)者,一個消費者缀遍,交換機也是默認的不用創(chuàng)建慕匠,路由key默認是隊列名
 * 1個消息只能被消費一次,如果有多個消費者域醇,rabbitmq服務(wù)器會自己做負載均衡台谊,平均消費
 */
@Configuration
public class DirectConfig {
    public static final String queue_direct_a = "queue.direct.a";
    public static final String queue_direct_user = "queue.direct.user";

    @Bean(name = queue_direct_a)
    public Queue queue1(){
        return new Queue(queue_direct_a);
    }

    @Bean(name = queue_direct_user)
    public Queue queue2(){
        return new Queue(queue_direct_user);
    }
}

生產(chǎn)者:

package com.example.rabbitmq1.producer;
import com.example.rabbitmq1.config.DirectConfig;
import com.example.rabbitmq1.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
 * 生產(chǎn)者
 */
@Component
public class DirectProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMsg(String info){
        String now = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
        String msg = "時間: " + now + ", info: " + info;
        amqpTemplate.convertAndSend(DirectConfig.queue_direct_a, msg);
    }

    /**
     * mq之間傳遞Java對象(Java對象要實現(xiàn)序列化接口)
     * @param user
     */
    public void sendMsg(User user){
        amqpTemplate.convertAndSend(DirectConfig.queue_direct_user, user);
    }
}

消費者:

package com.example.rabbitmq1.consumer;
import com.example.rabbitmq1.config.DirectConfig;
import com.example.rabbitmq1.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = DirectConfig.queue_direct_user)
public class DirectConsumer3 {

    @RabbitHandler
    public void receive(User user){
        // 如果是String類型,接收類型就改為String
        System.out.println("-----------------------------user--------------------------");
        System.out.println(user);
    }
}

topic模式
配置隊列譬挚,交換機锅铅,互相綁定

package com.example.rabbitmq1.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * topic模式相比direct模式就是靈活一點(在同一個隊列中也是1個消息實體只能被消費1次)
 * 一個交換機可以綁定到多個隊列上,也可以使用匹配模式來匹配符合規(guī)則的隊列
 * 匹配模式:*代表一個元素殴瘦,#代表0個或多個元素
 * 隊列命名原則:aaa.bbbb.ccc   以.隔開
 */
@Configuration
public class TopicConfig {

    public static final String queue_topic_a = "queue.topicA";  //不支持queue.topic.a    三級模式(這個坑狠角,我剛踩完)

    public static final String queue_topic_b = "queue.topicB";

    public static final String exchange_topic = "exchange_topic";


    @Bean(name = queue_topic_a)
    public Queue queue1(){
        return new Queue(queue_topic_a);
    }

    @Bean(name = queue_topic_b)
    public Queue queue2(){
        return new Queue(queue_topic_b);
    }

    @Bean(name = exchange_topic)
    public TopicExchange exchange(){
        return new TopicExchange(exchange_topic);
    }

    /**
     * 把隊列綁定到交換機上面
     * with方法中的參數(shù)是路由key
     * 在使用的時候會根據(jù)交換和路由隊列名來推送消息,
     * 如果隊列名符合路由key的規(guī)則就會把消息推送到綁定的隊列中去
     * @param queue
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding bindingExchangeA(@Qualifier(queue_topic_a) Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with(queue_topic_a);
    }

    @Bean
    public Binding bindingExchangeB(@Qualifier(queue_topic_b) Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("queue.*");
    }
}

生產(chǎn)者:

package com.example.rabbitmq1.producer;
import com.example.rabbitmq1.config.TopicConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMsg(String text){
        amqpTemplate.convertAndSend(TopicConfig.exchange_topic, TopicConfig.queue_topic_b, text);
    }
}

消費者:

package com.example.rabbitmq1.consumer;

import com.example.rabbitmq1.config.TopicConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

    @RabbitListener(queues = TopicConfig.queue_topic_a)
    public void receiveA(String info){
        System.out.println("-----------------------隊列A:" + info);
    }

    @RabbitListener(queues = TopicConfig.queue_topic_b)
    public void receiveB(String info){
        System.out.println("-----------------------隊列B:" + info);
    }
}

fanout模式就是廣播模式蚪腋,和topic類似丰歌,把不同點說說

    @Bean
    public FanoutExchange exchange() {
        // 是fanout的交換機
        return new FanoutExchange(exchange_fanout);
    }

    @Bean
    public Binding bindingFanoutA(@Qualifier(queue_fanout_a) Queue queue, FanoutExchange fanoutExchange){
        // 沒有路由key,(后面沒有with)
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    // 生產(chǎn)者屉凯,注意隊列名為null
    public void sendMsg(String info){
//        amqpTemplate.convertAndSend(FanoutConfig.exchange_fanout, info);   此用法無效
        amqpTemplate.convertAndSend(FanoutConfig.exchange_fanout, null, info);
    }

應(yīng)答機制(properties記得開啟配置):

package com.example.rabbitmq1.consumer;
import com.example.rabbitmq1.config.FanoutConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class FanoutConsumer {

    @RabbitListener(queues = FanoutConfig.queue_fanout_a)
    public void receive1(String info, Channel channel, Message message){
        try {
            // 應(yīng)答這條消息立帖,如果沒有應(yīng)答,下次連接消息依然會收到
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("----------------------------隊列A: "+info);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @RabbitListener(queues = FanoutConfig.queue_fanout_b)
    public void receive2(String info, Channel channel, Message message){
        try {
            // 銷毀這條消息(在隊列中銷毀)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("----------------------------隊列B: "+info);
    }
}

持久化機制:默認是持久化
看源碼的構(gòu)造方法:隊列和交換機
隊列:

public class Queue extends AbstractDeclarable {
    public static final String X_QUEUE_MASTER_LOCATOR = "x-queue-master-locator";
    private final String name;
    private final boolean durable;
    private final boolean exclusive;
    private final boolean autoDelete;
    private final Map<String, Object> arguments;
    private volatile String actualName;

    public Queue(String name) {
        this(name, true, false, false);  //看第2個參數(shù)默認true
    }

    public Queue(String name, boolean durable) {
        this(name, durable, false, false, (Map)null);
    }

    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, (Map)null);
    }
}

交換機:

public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    private final String name;
    private final boolean durable;
    private final boolean autoDelete;
    private final Map<String, Object> arguments;
    private volatile boolean delayed;
    private boolean internal;

    public AbstractExchange(String name) {
        this(name, true, false);  //看第2個參數(shù)默認為true
    }

    public AbstractExchange(String name, boolean durable, boolean autoDelete) {
        this(name, durable, autoDelete, (Map)null);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末悠砚,一起剝皮案震驚了整個濱河市晓勇,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖绑咱,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件绰筛,死亡現(xiàn)場離奇詭異,居然都是意外死亡描融,警方通過查閱死者的電腦和手機铝噩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來窿克,“玉大人骏庸,你說我怎么就攤上這事∧甓#” “怎么了具被?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長只损。 經(jīng)常有香客問我一姿,道長,這世上最難降的妖魔是什么改执? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任啸蜜,我火速辦了婚禮,結(jié)果婚禮上辈挂,老公的妹妹穿的比我還像新娘衬横。我一直安慰自己,他們只是感情好终蒂,可當我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布蜂林。 她就那樣靜靜地躺著,像睡著了一般拇泣。 火紅的嫁衣襯著肌膚如雪噪叙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天霉翔,我揣著相機與錄音睁蕾,去河邊找鬼。 笑死债朵,一個胖子當著我的面吹牛子眶,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播序芦,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼臭杰,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了谚中?” 一聲冷哼從身側(cè)響起渴杆,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤寥枝,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后磁奖,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體囊拜,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年点寥,在試婚紗的時候發(fā)現(xiàn)自己被綠了艾疟。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡敢辩,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出弟疆,到底是詐尸還是另有隱情戚长,我是刑警寧澤,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布怠苔,位于F島的核電站同廉,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏柑司。R本人自食惡果不足惜迫肖,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望攒驰。 院中可真熱鬧蟆湖,春花似錦、人聲如沸玻粪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽劲室。三九已至伦仍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間很洋,已是汗流浹背充蓝。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留喉磁,地道東北人谓苟。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像线定,于是被迫代替她去往敵國和親娜谊。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 什么叫消息隊列斤讥? 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)纱皆。消息可以非常簡單湾趾,比如只包含文本字符串,也可以更復(fù)...
    Agile_dev閱讀 2,371評論 0 24
  • RabbitMQ 原理介紹及安裝部署 標簽:RabbitMQ 安裝 簡介 RabbitMQ 是一個用 Erlang...
    神仙CGod閱讀 8,567評論 0 60
  • 關(guān)于消息隊列派草,從前年開始斷斷續(xù)續(xù)看了些資料搀缠,想寫很久了,但一直沒騰出空近迁,近來分別碰到幾個朋友聊這塊的技術(shù)選型艺普,是時...
    預(yù)流閱讀 584,647評論 51 786
  • 今天發(fā)現(xiàn)兒子交朋友的方法了,上午帶兒子在游樂場玩的時候鉴竭,兒子突然和一個小朋友說歧譬,我們倆做朋友吧。然后兩個人一塊兒玩...
    鬧鬧小乖乖閱讀 214評論 0 0