RabbitMQ 消息模式總結(jié)

1炕倘、聲明隊(duì)列

result = channel.queue_declare(queue='', exclusive=True)

2、聲明exchange

exchange_type 類型: direct, topic, headers fanout
路由模式使用:direct 準(zhǔn)確匹配route_key路由
發(fā)布訂閱使用:fanout 綁定的所有隊(duì)列路由
topic模式使用:topic *窥摄,#模糊配置到的隊(duì)列路由

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

4俺叭、綁定消息隊(duì)列:將隊(duì)列與exchange和route綁定,發(fā)送消息時(shí)通過兩者路由到對(duì)應(yīng)的隊(duì)列

注意:
1畦娄、當(dāng)exchange為空時(shí)谐算,直接路由隊(duì)列
2熟尉、當(dāng)exchange為direct時(shí),準(zhǔn)確匹配對(duì)應(yīng)隊(duì)列路由隊(duì)列
3洲脂、當(dāng)exchange為fanout時(shí)斤儿,路由綁定的所有隊(duì)列
4、當(dāng)exchange為topic 時(shí)恐锦, *往果、#模糊配置到的隊(duì)列路由

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

5、發(fā)送消息:通過exchange一铅,轉(zhuǎn)route到對(duì)應(yīng)的queue陕贮,若exchage未傳將route作為隊(duì)列名發(fā)送

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

6、消費(fèi)消息:消費(fèi)消息只針對(duì)相應(yīng)的隊(duì)列潘飘,主要控制發(fā)送的exchange和route

hannel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

7肮之、rpc模式

rpc模式
(1) RPCServer
import com.rabbitmq.client.*;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

(2)RPCClient
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) {
        try (RPCClient fibonacciRpc = new RPCClient()) {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末掉缺,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子戈擒,更是在濱河造成了極大的恐慌攀圈,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,013評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件峦甩,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡现喳,警方通過查閱死者的電腦和手機(jī)凯傲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,205評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嗦篱,“玉大人冰单,你說我怎么就攤上這事【拇伲” “怎么了诫欠?”我有些...
    開封第一講書人閱讀 152,370評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)浴栽。 經(jīng)常有香客問我荒叼,道長(zhǎng),這世上最難降的妖魔是什么典鸡? 我笑而不...
    開封第一講書人閱讀 55,168評(píng)論 1 278
  • 正文 為了忘掉前任被廓,我火速辦了婚禮,結(jié)果婚禮上萝玷,老公的妹妹穿的比我還像新娘嫁乘。我一直安慰自己,他們只是感情好球碉,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,153評(píng)論 5 371
  • 文/花漫 我一把揭開白布蜓斧。 她就那樣靜靜地躺著,像睡著了一般睁冬。 火紅的嫁衣襯著肌膚如雪挎春。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,954評(píng)論 1 283
  • 那天豆拨,我揣著相機(jī)與錄音搂蜓,去河邊找鬼。 笑死辽装,一個(gè)胖子當(dāng)著我的面吹牛帮碰,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播拾积,決...
    沈念sama閱讀 38,271評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼殉挽,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼丰涉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起斯碌,我...
    開封第一講書人閱讀 36,916評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤一死,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后傻唾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體投慈,經(jīng)...
    沈念sama閱讀 43,382評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,877評(píng)論 2 323
  • 正文 我和宋清朗相戀三年冠骄,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了伪煤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 37,989評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡凛辣,死狀恐怖抱既,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情扁誓,我是刑警寧澤防泵,帶...
    沈念sama閱讀 33,624評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站蝗敢,受9級(jí)特大地震影響捷泞,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜寿谴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,209評(píng)論 3 307
  • 文/蒙蒙 一肚邢、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧拭卿,春花似錦骡湖、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,199評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至惠桃,卻和暖如春浦夷,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背辜王。 一陣腳步聲響...
    開封第一講書人閱讀 31,418評(píng)論 1 260
  • 我被黑心中介騙來泰國(guó)打工劈狐, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人呐馆。 一個(gè)月前我還...
    沈念sama閱讀 45,401評(píng)論 2 352
  • 正文 我出身青樓肥缔,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親汹来。 傳聞我的和親對(duì)象是個(gè)殘疾皇子续膳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,700評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容