字節(jié)一面:RabbitMq有幾種工作模式跷车?額,不知道

簡(jiǎn)單模式(Hello World)

工作類別模式(工作人數(shù))

訂閱模式(發(fā)布/訂閱)

路由模式(Routing)

主題模式(Topics)

遠(yuǎn)程過程調(diào)用(RPC)

發(fā)布者確認(rèn)(發(fā)布者確認(rèn))

代碼演示

簡(jiǎn)單模式

發(fā)布訂閱模式

路由模式

主題模式

外形開關(guān)介紹

總結(jié)

七種模式介紹與應(yīng)用場(chǎng)景

簡(jiǎn)單模式(Hello World)


做最簡(jiǎn)單的事情持痰,一個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者,RabbitMQ相當(dāng)于一個(gè)消息代理祟蚀,負(fù)責(zé)將A的消息轉(zhuǎn)發(fā)給B

應(yīng)用場(chǎng)景:將發(fā)送的電子郵件放到消息正文工窍,然后郵件服務(wù)在其中中獲取郵件并發(fā)送給收件人

工作類別模式(工作隊(duì)列)


在多個(gè)消費(fèi)者之間分配任務(wù)(競(jìng)爭(zhēng)的消費(fèi)者模式),一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者前酿,一般適用于執(zhí)行資源密集型任務(wù)患雏,單獨(dú)的消費(fèi)者處理不過來,需要多個(gè)消費(fèi)者進(jìn)行處理

應(yīng)用場(chǎng)景:一個(gè)訂單的處理需要10s罢维,有多個(gè)訂單可以同時(shí)放到消息序列淹仑,然后讓多個(gè)消費(fèi)者同時(shí)處理,這樣就是并行了肺孵,而不是單個(gè)消費(fèi)者的串行情況

訂閱模式(發(fā)布/訂閱)


一次向多個(gè)消費(fèi)者發(fā)送消息匀借,一個(gè)生產(chǎn)者發(fā)送的消息會(huì)被多個(gè)消費(fèi)者獲取,也就是將消息將廣播到所有的消費(fèi)者中平窘。

應(yīng)用場(chǎng)景:更新商品庫(kù)存后需要通知多個(gè)緩存和多個(gè)數(shù)據(jù)庫(kù)吓肋,這里的結(jié)構(gòu)應(yīng)該是:

一個(gè)fanout類型交換機(jī)扇出兩個(gè)個(gè)消息一體機(jī),分別為緩存消息類型瑰艘,數(shù)據(jù)庫(kù)消息等級(jí)

一個(gè)緩存消息對(duì)應(yīng)對(duì)應(yīng)著多個(gè)緩存消費(fèi)者

一個(gè)數(shù)據(jù)庫(kù)消息對(duì)應(yīng)對(duì)應(yīng)著多個(gè)數(shù)據(jù)庫(kù)消費(fèi)者

路由模式(Routing)


有選擇地(路由密鑰)接收消息是鬼,發(fā)送消息到交換機(jī)和要指定路由密鑰,消費(fèi)者將綁定綁定到交換機(jī)時(shí)需要指定路由密鑰磅叛,僅消費(fèi)指定路由密鑰的消息

應(yīng)用場(chǎng)景:如在商品庫(kù)存中增加了1個(gè)iphone12,iphone12促銷活動(dòng)消費(fèi)者指定路由鍵為iphone12萨赁,只有此促銷活動(dòng)會(huì)接收到消息弊琴,其他促銷活動(dòng)不關(guān)心也不會(huì)消費(fèi)此路由鍵的消息

主題模式(Topics)


根據(jù)主題(Topics)來接收消息,將路由密鑰和某模式進(jìn)行匹配杖爽,此時(shí)需要綁定在一個(gè)模式上敲董,#匹配一個(gè)詞或多個(gè)詞,*只匹配一個(gè)詞慰安。

應(yīng)用場(chǎng)景:同上腋寨,iphone促銷活動(dòng)可以接收主題為iphone的消息,如iphone12化焕,iphone13等

遠(yuǎn)程過程調(diào)用(RPC)


如果我們需要在遠(yuǎn)程計(jì)算機(jī)上運(yùn)行功能并等待結(jié)果就可以使用RPC萄窜,具體流程可以看圖。應(yīng)用場(chǎng)景:需要等待接口返回?cái)?shù)據(jù),如訂單支付

發(fā)布者確認(rèn)(發(fā)布者確認(rèn))

與發(fā)布者進(jìn)行可靠的發(fā)布確認(rèn)查刻,發(fā)布者確認(rèn)是RabbitMQ擴(kuò)展键兜,可以實(shí)現(xiàn)可靠的發(fā)布。在通道上啟用發(fā)布者確認(rèn)后穗泵,Rabbit MQ將確認(rèn)發(fā)送者發(fā)布的消息普气,這意味著已在服務(wù)器端處理。

應(yīng)用場(chǎng)景:對(duì)于消息可靠性要求較高佃延,某些錢包扣款

代碼演示,代碼中沒有對(duì)后面兩種模式演示现诀,有興趣可以自己研究

簡(jiǎn)單模式

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Sender {

? ? private final static String QUEUE_NAME = "simple_queue";

? ? public static void main(String[] args) throws IOException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? // 聲明隊(duì)列

? ? ? ? // queue:隊(duì)列名

? ? ? ? // durable:是否持久化

? ? ? ? // exclusive:是否排外? 即只允許該channel訪問該隊(duì)列? 一般等于true的話用于一個(gè)隊(duì)列只能有一個(gè)消費(fèi)者來消費(fèi)的場(chǎng)景

? ? ? ? // autoDelete:是否自動(dòng)刪除? 消費(fèi)完刪除

? ? ? ? // arguments:其他屬性

? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);

? ? ? ? //消息內(nèi)容

? ? ? ? String message = "simplest mode message";

? ? ? ? channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

? ? ? ? System.out.println("[x]Sent '" + message + "'");

? ? ? ? //最后關(guān)閉通關(guān)和連接

? ? ? ? channel.close();

? ? ? ? connection.close();

? ? }

}


import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Receiver {

? ? private final static String QUEUE_NAME = "simplest_queue";

? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {

? ? ? ? // 獲取連接

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);

? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {

? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");

? ? ? ? ? ? System.out.println(" [x] Received '" +

? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");

? ? ? ? };

? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {

? ? ? ? });

? ? }

}

工作模式模式

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Receiver1 {

? ? private final static String QUEUE_NAME = "queue_work";

? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);

? ? ? ? // 同一時(shí)刻服務(wù)器只會(huì)發(fā)送一條消息給消費(fèi)者

? ? ? ? channel.basicQos(1);

? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {

? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");

? ? ? ? ? ? System.out.println(" [x] Received '" +

? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");

? ? ? ? };

? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {

? ? ? ? });

? ? }

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Receiver2 {

? ? private final static String QUEUE_NAME = "queue_work";

? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);

? ? ? ? // 同一時(shí)刻服務(wù)器只會(huì)發(fā)送一條消息給消費(fèi)者

? ? ? ? channel.basicQos(1);

? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {

? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");

? ? ? ? ? ? System.out.println(" [x] Received '" +

? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");

? ? ? ? };

? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {

? ? ? ? });

? ? }

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Sender {

? ? private final static String QUEUE_NAME = "queue_work";

? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? // 聲明隊(duì)列

? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);

? ? ? ? for (int i = 0; i < 100; i++) {

? ? ? ? ? ? String message = "work mode message" + i;

? ? ? ? ? ? channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

? ? ? ? ? ? System.out.println("[x] Sent '" + message + "'");

? ? ? ? ? ? Thread.sleep(i * 10);

? ? ? ? }

? ? ? ? channel.close();

? ? ? ? connection.close();

? ? }

}

發(fā)布訂閱模式

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

public class Receive1 {

? ? private static final String EXCHANGE_NAME = "logs";

? ? public static void main(String[] argv) throws Exception {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

? ? ? ? String queueName = channel.queueDeclare().getQueue();

? ? ? ? channel.queueBind(queueName, EXCHANGE_NAME, "");

? ? ? ? System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

? ? ? ? // 訂閱消息的回調(diào)函數(shù)

? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {

? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");

? ? ? ? ? ? System.out.println(" [x] Received '" + message + "'");

? ? ? ? };

? ? ? ? // 消費(fèi)者,有消息時(shí)出發(fā)訂閱回調(diào)函數(shù)

? ? ? ? channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {

? ? ? ? });

? ? }

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

public class Receive2 {

? ? private static final String EXCHANGE_NAME = "logs";

? ? public static void main(String[] argv) throws Exception {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

? ? ? ? String queueName = channel.queueDeclare().getQueue();

? ? ? ? channel.queueBind(queueName, EXCHANGE_NAME, "");

? ? ? ? System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

? ? ? ? // 訂閱消息的回調(diào)函數(shù)

? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {

? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");

? ? ? ? ? ? System.out.println(" [x] Received2 '" + message + "'");

? ? ? ? };

? ? ? ? // 消費(fèi)者履肃,有消息時(shí)出發(fā)訂閱回調(diào)函數(shù)

? ? ? ? channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {

? ? ? ? });

? ? }

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class Sender {

? ? private static final String EXCHANGE_NAME = "logs";

? ? public static void main(String[] argv) throws Exception {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

? ? ? ? String message = "publish subscribe message";

? ? ? ? channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

? ? ? ? System.out.println(" [x] Sent '" + message + "'");

? ? ? ? channel.close();

? ? ? ? connection.close();

? ? }

}

路由模式

路由模式

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Receiver1 {

? ? private final static String QUEUE_NAME = "queue_routing";

? ? private final static String EXCHANGE_NAME = "exchange_direct";

? ? public static void main(String[] args) throws IOException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);

? ? ? ? // 指定路由的key仔沿,接收key和key2

? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");

? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

? ? ? ? channel.basicQos(1);

? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {

? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");

? ? ? ? ? ? System.out.println(" [x] Received '" +

? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");

? ? ? ? };

? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {

? ? ? ? });

? ? }

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Receiver2 {

? ? private final static String QUEUE_NAME = "queue_routing2";

? ? private final static String EXCHANGE_NAME = "exchange_direct";

? ? public static void main(String[] args) throws IOException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);

? ? ? ? // 僅接收key2

? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

? ? ? ? channel.basicQos(1);

? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {

? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");

? ? ? ? ? ? System.out.println(" [x] Received '" +

? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");

? ? ? ? };

? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {

? ? ? ? });

? ? }

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Sender {

? ? private final static String EXCHANGE_NAME = "exchange_direct";

? ? private final static String EXCHANGE_TYPE = "direct";

? ? public static void main(String[] args) throws IOException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? // 交換機(jī)聲明

? ? ? ? channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

? ? ? ? // 只有routingKey相同的才會(huì)消費(fèi)

? ? ? ? String message = "routing mode message";

? ? ? ? channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());

? ? ? ? System.out.println("[x] Sent '" + message + "'");

//? ? ? ? channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes());

//? ? ? ? System.out.println("[x] Sent '" + message + "'");

? ? ? ? channel.close();

? ? ? ? connection.close();

? ? }

}

主題模式

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Receiver1 {

? ? private final static String QUEUE_NAME = "queue_topic";

? ? private final static String EXCHANGE_NAME = "exchange_topic";

? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);

? ? ? ? // 可以接收key.1

? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");

? ? ? ? channel.basicQos(1);

? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {

? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");

? ? ? ? ? ? System.out.println(" [x] Received '" +

? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");

? ? ? ? };

? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {

? ? ? ? });

? ? }

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Receiver2 {

? ? private final static String QUEUE_NAME = "queue_topic2";

? ? private final static String EXCHANGE_NAME = "exchange_topic";

? ? private final static String EXCHANGE_TYPE = "topic";

? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);

? ? ? ? // *號(hào)代表單個(gè)單詞,可以接收key.1

? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

? ? ? ? // #號(hào)代表多個(gè)單詞榆浓,可以接收key.1.2

? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.#");

? ? ? ? channel.basicQos(1);

? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {

? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");

? ? ? ? ? ? System.out.println(" [x] Received '" +

? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");

? ? ? ? };

? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {

? ? ? ? });

? ? }

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Sender {

? ? private final static String EXCHANGE_NAME = "exchange_topic";

? ? private final static String EXCHANGE_TYPE = "topic";

? ? public static void main(String[] args) throws IOException, TimeoutException {

? ? ? ? ConnectionFactory factory = new ConnectionFactory();

? ? ? ? factory.setHost("localhost");

? ? ? ? factory.setPort(5672);

? ? ? ? Connection connection = factory.newConnection();

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

? ? ? ? String message = "topics model message with key.1";

? ? ? ? channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());

? ? ? ? System.out.println("[x] Sent '" + message + "'");

? ? ? ? String message2 = "topics model message with key.1.2";

? ? ? ? channel.basicPublish(EXCHANGE_NAME, "key.1.2", null, message2.getBytes());

? ? ? ? System.out.println("[x] Sent '" + message2 + "'");

? ? ? ? channel.close();

? ? ? ? connection.close();

? ? }

}


四種交換機(jī)介紹

直連交換機(jī)(Direct exchange):具有路由功能的交換機(jī)于未,綁定到此交換機(jī)的時(shí)候需要指定一個(gè)routing_key,交換機(jī)發(fā)送消息的時(shí)候需要routing_key陡鹃,會(huì)將消息發(fā)送道對(duì)應(yīng)的隊(duì)列

扇形交換機(jī)(Fanout exchange):廣播消息到所有隊(duì)列烘浦,沒有任何處理,速度最快

主題交換機(jī)(Topic exchange):在直連交換機(jī)基礎(chǔ)上增加模式匹配萍鲸,也就是對(duì)routing_key進(jìn)行模式匹配党觅,*代表一個(gè)單詞,#代表多個(gè)單詞

首部交換機(jī)(Headers exchange):忽略routing_key倒彰,使用Headers信息(一個(gè)Hash的數(shù)據(jù)結(jié)構(gòu))進(jìn)行匹配责嚷,優(yōu)勢(shì)在于可以有更多更靈活的匹配規(guī)則

總結(jié)

這么多種隊(duì)列模式中都有其應(yīng)用場(chǎng)景,大家可以根據(jù)應(yīng)用場(chǎng)景示例中進(jìn)行選擇

轉(zhuǎn)自:https://mp.weixin.qq.com/s/dyIjEfeNVFlYqXOWUmDACQ

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末嘿期,一起剝皮案震驚了整個(gè)濱河市品擎,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌备徐,老刑警劉巖萄传,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異蜜猾,居然都是意外死亡秀菱,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門蹭睡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來衍菱,“玉大人,你說我怎么就攤上這事肩豁〖勾” “怎么了辫呻?”我有些...
    開封第一講書人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)洪规。 經(jīng)常有香客問我印屁,道長(zhǎng),這世上最難降的妖魔是什么斩例? 我笑而不...
    開封第一講書人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任雄人,我火速辦了婚禮,結(jié)果婚禮上念赶,老公的妹妹穿的比我還像新娘础钠。我一直安慰自己,他們只是感情好叉谜,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開白布旗吁。 她就那樣靜靜地躺著,像睡著了一般停局。 火紅的嫁衣襯著肌膚如雪很钓。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,688評(píng)論 1 305
  • 那天董栽,我揣著相機(jī)與錄音码倦,去河邊找鬼。 笑死锭碳,一個(gè)胖子當(dāng)著我的面吹牛袁稽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播擒抛,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼推汽,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了歧沪?” 一聲冷哼從身側(cè)響起歹撒,我...
    開封第一講書人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎诊胞,沒想到半個(gè)月后暖夭,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡厢钧,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年鳞尔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嬉橙。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片早直。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖市框,靈堂內(nèi)的尸體忽然破棺而出霞扬,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布喻圃,位于F島的核電站萤彩,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏斧拍。R本人自食惡果不足惜雀扶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望肆汹。 院中可真熱鬧愚墓,春花似錦、人聲如沸昂勉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽岗照。三九已至村象,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間攒至,已是汗流浹背厚者。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留嗓袱,地道東北人籍救。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像渠抹,于是被迫代替她去往敵國(guó)和親蝙昙。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容