RabbitMQ基礎(chǔ)概念和使用

基本概念

Amqp概念

amqp,既Advanced Message Queuing Protocol ,一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議

包括的要素

信道:多個線程間通過一個 tcp鏈接與服務端通訊,每個線程使用一個信道通訊偎漫,保證

交換器-隊列-綁定-路由鍵

  • 路由鍵是發(fā)送者指定由交換機和隊列的綁定
  • 生產(chǎn)者發(fā)送消息到交換機
  • 消費者綁定隊列
  • 到達了無人訂閱的隊列,消息會一直排隊等待
  • 一個隊列有多個訂閱者---消息會循環(huán)方式 以此發(fā)個這幾個消費者
  • 發(fā)送者發(fā)送一個不存在的路由鍵--消息會丟失
clipboard.png

消息的確認

消費者收到的每一條消息都必須進行確認(手動或者自動)

  • 消費者遲遲不確認,rabbitMQ 會一直會保持這個消息性昭,直到鏈接的斷開颖低,會將消息投遞到另一個消費者(前提是有多個消費者)

topic模式

可以使通配符 通過交換機&路由鍵是消息到多個隊列中去

clipboard.png

虛擬主機

/service1

/service2

多個應用分區(qū)串纺,類似oracle - 表空間

每個用戶名只能連自己的虛擬主機

多個應用時可以很好地做服務隔離


基礎(chǔ)使用

使用RabbitMQ原生Java客戶端進行消息通信

客戶端需要amqp-client-5.0.0.jar和slf4j-api-1.6.1.jar
建議使用Maven:
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.0.0</version>
</dependency>
注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具體的版本號到Maven的中央倉庫查)的版本染服。

使用RabbitMQ原生Java客戶端進行消息通信

Direct Exchange示例

簡單形式的 生產(chǎn)者-消費者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *direct類型交換器的生產(chǎn)者
 */
public class DirectProducer {

    public final static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args)
            throws IOException, TimeoutException {
        /* 創(chuàng)建連接,連接到RabbitMQ*/
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();

        /*創(chuàng)建信道*/
        Channel channel = connection.createChannel();
        /*創(chuàng)建交換器*/
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);

        /*日志消息級別,作為路由鍵使用*/
        String[] serverities = {"error","info","warning"};
        for(int i=0;i<3;i++){
            String severity = serverities[i%3];
            String msg = "Hellol,RabbitMq"+(i+1);
            /*發(fā)布消息叨恨,需要參數(shù):交換器柳刮,路由鍵,其中以日志消息級別為路由鍵*/
            channel.basicPublish(EXCHANGE_NAME,severity,null,
                    msg.getBytes());
            System.out.println("Sent "+severity+":"+msg);
        }
        channel.close();
        connection.close();

    }

}

普通消費者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *普通的消費者
 */
public class NormalConsumer {

    public static void main(String[] argv)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");

        // 打開連接和創(chuàng)建頻道痒钝,與發(fā)送端一樣
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                "direct");

        /*聲明一個隊列*/
        String queueName = "focuserror";
        channel.queueDeclare(queueName,false,false,
                false,null);

        /*綁定诚亚,將隊列和交換器通過路由鍵進行綁定*/
        String routekey = "info";/*表示只關(guān)注error級別的日志消息*/
        channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,routekey);

        System.out.println("waiting for message........");

        /*聲明了一個消費者*/
        //envelope  信封 可以獲取路由鍵,隊列名 等信息
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received["+envelope.getRoutingKey()
                        +"]"+message);
            }
        };
        /*消費者正式開始在指定隊列上消費消息*/
        channel.basicConsume(queueName,true,consumer);
    }

}

消費者綁定多個路由鍵

結(jié)果:這個消費者 會收到多個隊列的消息

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *隊列和交換器的多重綁定
 */
public class MulitBindConsumer {

    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");

        // 打開連接和創(chuàng)建頻道午乓,與發(fā)送端一樣
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                "direct");

        //聲明一個隨機隊列
        String queueName = channel.queueDeclare().getQueue();

        /*隊列綁定到交換器上時站宗,是允許綁定多個路由鍵的,也就是多重綁定*/
        String[] severities={"error","info","warning"};
        for(String serverity:severities){
            channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,
                    serverity);
        }
        System.out.println(" [*] Waiting for messages:");

        // 創(chuàng)建隊列消費者
        final Consumer consumerA = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties
                                               properties,
                                       byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" Received "
                        + envelope.getRoutingKey() + ":'" + message
                        + "'");
            }
        };
        channel.basicConsume(queueName, true, consumerA);
    }
}

一個連接多個信道

結(jié)果:每個消費者 都會收到所有的消息

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *一個連接多個信道
 */
public class MulitChannelConsumer {

    private static class ConsumerWorker implements Runnable{
        final Connection connection;

        public ConsumerWorker(Connection connection) {
            this.connection = connection;
        }

        public void run() {
            try {
                /*創(chuàng)建一個信道益愈,意味著每個線程單獨一個信道*/
                final Channel channel = connection.createChannel();
                channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                        "direct");
                // 聲明一個隨機隊列
                String queueName = channel.queueDeclare().getQueue();
                //消費者名字梢灭,打印輸出用
                final String consumerName
                        =  Thread.currentThread().getName()+"-all";

                //所有日志嚴重性級別
                String[] severities={"error","info","warning"};
                for (String severity : severities) {
                    //關(guān)注所有級別的日志(多重綁定)
                    channel.queueBind(queueName,
                            DirectProducer.EXCHANGE_NAME, severity);
                }
                System.out.println("["+consumerName+"] Waiting for messages:");

                // 創(chuàng)建隊列消費者
                final Consumer consumerA = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties
                                                       properties,
                                               byte[] body)
                            throws IOException {
                        String message =
                                new String(body, "UTF-8");
                        System.out.println(consumerName
                                +" Received "  + envelope.getRoutingKey()
                                + ":'" + message + "'");
                    }
                };
                channel.basicConsume(queueName, true, consumerA);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");

        // 打開連接和創(chuàng)建頻道,與發(fā)送端一樣
        Connection connection = factory.newConnection();
        //一個連接多個信道
        for(int i=0;i<2;i++){
            /*將連接作為參數(shù)蒸其,傳遞給每個線程*/
            Thread worker =new Thread(new ConsumerWorker(connection));
            worker.start();
        }
    }
}

一個隊列多個消費者敏释,則會表現(xiàn)出消息在消費者之間的輪詢發(fā)送。

消費者會輪詢收到消息

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *一個隊列多個消費者摸袁,則會表現(xiàn)出消息在消費者之間的輪詢發(fā)送钥顽。
 */
public class MulitConsumerOneQueue {

    private static class ConsumerWorker implements Runnable{
        final Connection connection;
        final String queueName;

        public ConsumerWorker(Connection connection,String queueName) {
            this.connection = connection;
            this.queueName = queueName;
        }

        public void run() {
            try {
                /*創(chuàng)建一個信道,意味著每個線程單獨一個信道*/
                final Channel channel = connection.createChannel();
                channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                        "direct");
                /*聲明一個隊列,rabbitmq靠汁,如果隊列已存在蜂大,不會重復創(chuàng)建*/
                channel.queueDeclare(queueName,
                        false,false,
                        false,null);
                //消費者名字,打印輸出用
                final String consumerName
                        =  Thread.currentThread().getName();

                //所有日志嚴重性級別
                String[] severities={"error","info","warning"};
                for (String severity : severities) {
                    //關(guān)注所有級別的日志(多重綁定)
                    channel.queueBind(queueName,
                            DirectProducer.EXCHANGE_NAME, severity);
                }
                System.out.println(" ["+consumerName+"] Waiting for messages:");

                // 創(chuàng)建隊列消費者
                final Consumer consumerA = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties
                                                       properties,
                                               byte[] body)
                            throws IOException {
                        String message =
                                new String(body, "UTF-8");
                        System.out.println(consumerName
                                +" Received "  + envelope.getRoutingKey()
                                + ":'" + message + "'");
                    }
                };
                channel.basicConsume(queueName, true, consumerA);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");

        // 打開連接和創(chuàng)建頻道蝶怔,與發(fā)送端一樣
        Connection connection = factory.newConnection();

        //3個線程奶浦,線程之間共享隊列,一個隊列多個消費者
        String queueName = "focusAll";
        for(int i=0;i<3;i++){
            /*將隊列名作為參數(shù),傳遞給每個線程*/
            Thread worker =new Thread(new ConsumerWorker(connection,queueName));
            worker.start();
        }

    }
}

個人理解:信道只是客戶端與服務端建立連接 踢星,消息消費時 是多個消費者輪詢收到消息 還是 每個消費者收到全部消息 取決于這些消費者是否監(jiān)聽同一個隊列
模式:
一個連接多個信道 -- 實際是 每個信道中有一個隨機產(chǎn)生的隊列名澳叉,此時是多個隊列是不同的隊列,每個隊列 有一個消費者 那么這個消費者就會收到這個隊列里的所有消息成洗;在上一層 每個信道使用(邦定)的是同一個路由鍵(就是生產(chǎn)者發(fā)布的路由鍵)五督,所以此時每個隊列都會收到生產(chǎn)者發(fā)布的所有消息,進而每個消費者就可以收到所有消息
一個隊列多個消費者瓶殃,則會表現(xiàn)出消息在消費者之間的輪詢發(fā)送 -- 實際上是 多個信道使用(邦定)的同一個路由鍵充包,而這些信道使用的是同一個隊列,消息會發(fā)布到這個隊列中(此時只有這一個隊列)碌燕,所以多個消費者監(jiān)聽的是同一個隊列误证,此時消息會被這些消費者(輪詢,分發(fā))收到

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末修壕,一起剝皮案震驚了整個濱河市愈捅,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌慈鸠,老刑警劉巖蓝谨,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異青团,居然都是意外死亡譬巫,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門督笆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來芦昔,“玉大人,你說我怎么就攤上這事娃肿」径校” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵料扰,是天一觀的道長凭豪。 經(jīng)常有香客問我,道長晒杈,這世上最難降的妖魔是什么嫂伞? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮拯钻,結(jié)果婚禮上帖努,老公的妹妹穿的比我還像新娘。我一直安慰自己说庭,他們只是感情好然磷,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著刊驴,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上捆憎,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天舅柜,我揣著相機與錄音,去河邊找鬼躲惰。 笑死致份,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的础拨。 我是一名探鬼主播氮块,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼诡宗!你這毒婦竟也來了滔蝉?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤塔沃,失蹤者是張志新(化名)和其女友劉穎蝠引,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛀柴,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡螃概,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了鸽疾。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吊洼。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖制肮,靈堂內(nèi)的尸體忽然破棺而出冒窍,到底是詐尸還是另有隱情,我是刑警寧澤弄企,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布超燃,位于F島的核電站,受9級特大地震影響拘领,放射性物質(zhì)發(fā)生泄漏意乓。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一约素、第九天 我趴在偏房一處隱蔽的房頂上張望届良。 院中可真熱鬧,春花似錦圣猎、人聲如沸士葫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽慢显。三九已至爪模,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間荚藻,已是汗流浹背屋灌。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工酷师, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留煤搜,地道東北人。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓糠馆,卻偏偏與公主長得像疾呻,于是被迫代替她去往敵國和親除嘹。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,869評論 2 11
  • 什么叫消息隊列 消息(Message)是指在應用間傳送的數(shù)據(jù)岸蜗。消息可以非常簡單尉咕,比如只包含文本字符串,也可以更復雜...
    lijun_m閱讀 1,335評論 0 1
  • RabbitMQ優(yōu)異性 RabbitMQ發(fā)展到今天散吵,被越來越多的人認可龙考,這和它在易用性、擴展性矾睦、可靠性和高可用性等...
    查無此人_chazz閱讀 525評論 0 0
  • 這個指導提供一個AMQP 0-9-1協(xié)議的概述晦款,它是RabbitMq支持的一個協(xié)議。 什么是AMQP 0-9-1枚冗?...
    浪_6e80閱讀 705評論 0 1
  • 好詞:破譯缓溅,救濟苦難,無緣無故赁温,如醉如癡坛怪,中影,左顧右盼股囊,毫不足鬼袜匿。 好句:他周身包圍著一層極薄的牧,這是天生的稚疹,...
    朋吧閱讀 533評論 0 0