rabbitmq消息隊(duì)列入門

介紹

rabbitmq是一個(gè)消息代理冒掌,它接收和轉(zhuǎn)發(fā)消息歪今,類似一個(gè)郵局萧锉,把你投遞的郵件送給指定收件人。
相關(guān)術(shù)語:

  • producing: 消息生產(chǎn)者朗兵,用于發(fā)送消息
  • queue: 隊(duì)列污淋,用于存儲消息
  • consuming: 消息消費(fèi)者,用于接收消息

HelloWorld

P為生產(chǎn)者余掖,C是消費(fèi)者寸爆,中間的框是隊(duì)列,消息的緩沖區(qū)盐欺。


image.png

消息發(fā)送

image.png

send.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            // 創(chuàng)建一個(gè)連接
            Connection connection = factory.newConnection();
            // 創(chuàng)建一個(gè)頻道赁豆,用于復(fù)用連接
            Channel channel = connection.createChannel();
            // 聲明消息發(fā)送的隊(duì)列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // 往隊(duì)列中發(fā)出一條消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent'" + message + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}

消息接收

image.png

Recv.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            // 創(chuàng)建一個(gè)連接
            Connection connection = factory.newConnection();
            // 創(chuàng)建一個(gè)頻道,用于復(fù)用連接
            Channel channel = connection.createChannel();
            // 指定發(fā)送的隊(duì)列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // 往隊(duì)列中發(fā)出一條消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent'" + message + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}

Work Queues

image.png

在HelloWorld中寫了發(fā)送/接收消息的程序冗美,現(xiàn)在我們創(chuàng)建一個(gè)Work Queues(也稱為Task Queues)魔种,來在多個(gè)耗時(shí)的消息之間分配任務(wù)。

NewTask.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class NewTask {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // durable true 持久化
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 發(fā)送當(dāng)前時(shí)間
            String message = String.valueOf(System.currentTimeMillis());
            // PERSISTENT_TEXT_PLAIN 持久化
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("[x] Sent'" + message + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}

聲明消息持久化后rabbitmq宕機(jī)也能從存儲中恢復(fù)消息粉洼。

Worker.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Worker {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            // 未回復(fù)消息處理完节预,消息隊(duì)列不會給它發(fā)新的消息
            channel.basicQos(1);
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

                try {
                    doWork();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            // 手動(dòng)確認(rèn)
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    private  static  void  doWork () throws InterruptedException {
        Thread.sleep(1000);
    }

}

設(shè)置prefetchCount為1,在沒有處理完一條消息的時(shí)候属韧,消息隊(duì)列不會給它繼續(xù)下發(fā)消息安拟,在它確認(rèn)完消息后,消息隊(duì)列繼續(xù)下發(fā)新消息宵喂。


image.png

發(fā)送/訂閱

在上面我們把消息發(fā)給相同的一個(gè)消費(fèi)者糠赦,現(xiàn)在把消息發(fā)送給多個(gè)消費(fèi)者,這種模式稱為發(fā)布訂閱模式。為了演示這種模式愉棱,我們創(chuàng)建一個(gè)簡單的日志記錄系統(tǒng)唆铐,生產(chǎn)者發(fā)出日志,消費(fèi)者接收并打印它們奔滑,發(fā)布的消息將被廣播給所有消費(fèi)者。

Exchange (交換機(jī))

前面簡單的展示了如何接收發(fā)送消息顺少,現(xiàn)在介紹完整的rabbitmq概念朋其。簡單重復(fù)一下前面介紹的內(nèi)容:

  • 生產(chǎn)者是發(fā)送消息的程序
  • 隊(duì)列是消息的緩沖器
  • 消費(fèi)者是接收處理消息的程序

rabbitmq消息模型的核心思想是,生產(chǎn)者從來不會直接發(fā)送消息給一個(gè)隊(duì)列脆炎。又或者說生產(chǎn)者甚至不知道它的消息將會發(fā)送到哪個(gè)隊(duì)列梅猿。

生產(chǎn)者只能發(fā)送消息給一個(gè)交換機(jī)。交換機(jī)是一個(gè)很簡單的概念秒裕。它接收生產(chǎn)者的消息袱蚓,然后推送消息到隊(duì)列中。交換機(jī)必須明確知道自己要對接收到的消息進(jìn)行何種處理: 是推送到特定隊(duì)列几蜻,還是推送到所有的隊(duì)列還是直接丟棄喇潘,這些規(guī)則由交換機(jī)的類型來定義。


image.png

有許多可供選擇的交換機(jī)類型: direct梭稚、topic颖低、headers、fanout弧烤。 我詳細(xì)介紹fanout忱屑。創(chuàng)建一個(gè)fanout類型的交換機(jī),稱它為logs暇昂。

EmitLog.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // fanout廣播模式莺戒,會廣播所有接收到的消息給所有它的已知隊(duì)列
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);

            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}

ReveiveLogs.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 臨時(shí)隊(duì)列,非持久化的急波,唯一的从铲,斷開連接自動(dòng)刪除的并且隨機(jī)名稱的隊(duì)列
        String queueName = channel.queueDeclare().getQueue();
        // 綁定隊(duì)列和交換機(jī),告訴交換機(jī)給我們發(fā)送消息幔崖,如果沒有綁定到交換機(jī)上食店,消息會丟失
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市赏寇,隨后出現(xiàn)的幾起案子吉嫩,更是在濱河造成了極大的恐慌,老刑警劉巖嗅定,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件自娩,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)忙迁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進(jìn)店門脐彩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人姊扔,你說我怎么就攤上這事惠奸。” “怎么了恰梢?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵佛南,是天一觀的道長。 經(jīng)常有香客問我嵌言,道長嗅回,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任摧茴,我火速辦了婚禮绵载,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘苛白。我一直安慰自己娃豹,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布丸氛。 她就那樣靜靜地躺著培愁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪缓窜。 梳的紋絲不亂的頭發(fā)上定续,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天,我揣著相機(jī)與錄音禾锤,去河邊找鬼私股。 笑死,一個(gè)胖子當(dāng)著我的面吹牛恩掷,可吹牛的內(nèi)容都是我干的倡鲸。 我是一名探鬼主播,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼黄娘,長吁一口氣:“原來是場噩夢啊……” “哼峭状!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起逼争,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤优床,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后胆敞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年匪燕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了帽驯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,144評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖牌借,靈堂內(nèi)的尸體忽然破棺而出磷籍,到底是詐尸還是另有隱情,我是刑警寧澤比然,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布湾笛,位于F島的核電站迄本,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏于樟。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一传黄、第九天 我趴在偏房一處隱蔽的房頂上張望膘掰。 院中可真熱鬧识埋,春花似錦系忙、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至冈欢,卻和暖如春太示,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背餐弱。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人尉剩。 一個(gè)月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像理茎,于是被迫代替她去往敵國和親黑界。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,925評論 2 11
  • 1.RabbitMQ概述 簡介: MQ全稱為Message Queue皂林,消息隊(duì)列是應(yīng)用程序和應(yīng)用程序之間的通信方法...
    梁朋舉閱讀 49,713評論 0 47
  • 關(guān)于消息隊(duì)列朗鸠,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了础倍,但一直沒騰出空烛占,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 584,778評論 51 786
  • 關(guān)于消息隊(duì)列沟启,從前年開始斷斷續(xù)續(xù)看了些資料忆家,想寫很久了,但一直沒騰出空德迹,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型芽卿,是時(shí)...
    Johnson_zx閱讀 1,109評論 0 5
  • 關(guān)于消息隊(duì)列,從前年開始斷斷續(xù)續(xù)看了些資料胳搞,想寫很久了卸例,但一直沒騰出空称杨,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    Java機(jī)械師閱讀 548評論 0 2