簡(jiǎn)單模式(Hello World)
工作類別模式(工作人數(shù))
訂閱模式(發(fā)布/訂閱)
路由模式(Routing)
主題模式(Topics)
遠(yuǎn)程過程調(diào)用(RPC)
發(fā)布者確認(rèn)(發(fā)布者確認(rèn))
代碼演示
簡(jiǎn)單模式
發(fā)布訂閱模式
路由模式
主題模式
外形開關(guān)介紹
總結(jié)
七種模式介紹與應(yīng)用場(chǎng)景
簡(jiǎn)單模式(Hello World)
做最簡(jiǎn)單的事情持痰,一個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者,RabbitMQ相當(dāng)于一個(gè)消息代理祟蚀,負(fù)責(zé)將A的消息轉(zhuǎn)發(fā)給B
應(yīng)用場(chǎng)景:將發(fā)送的電子郵件放到消息正文工窍,然后郵件服務(wù)在其中中獲取郵件并發(fā)送給收件人
工作類別模式(工作隊(duì)列)
在多個(gè)消費(fèi)者之間分配任務(wù)(競(jìng)爭(zhēng)的消費(fèi)者模式),一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者前酿,一般適用于執(zhí)行資源密集型任務(wù)患雏,單獨(dú)的消費(fèi)者處理不過來,需要多個(gè)消費(fèi)者進(jìn)行處理
應(yīng)用場(chǎng)景:一個(gè)訂單的處理需要10s罢维,有多個(gè)訂單可以同時(shí)放到消息序列淹仑,然后讓多個(gè)消費(fèi)者同時(shí)處理,這樣就是并行了肺孵,而不是單個(gè)消費(fèi)者的串行情況
訂閱模式(發(fā)布/訂閱)
一次向多個(gè)消費(fèi)者發(fā)送消息匀借,一個(gè)生產(chǎn)者發(fā)送的消息會(huì)被多個(gè)消費(fèi)者獲取,也就是將消息將廣播到所有的消費(fèi)者中平窘。
應(yīng)用場(chǎng)景:更新商品庫(kù)存后需要通知多個(gè)緩存和多個(gè)數(shù)據(jù)庫(kù)吓肋,這里的結(jié)構(gòu)應(yīng)該是:
一個(gè)fanout類型交換機(jī)扇出兩個(gè)個(gè)消息一體機(jī),分別為緩存消息類型瑰艘,數(shù)據(jù)庫(kù)消息等級(jí)
一個(gè)緩存消息對(duì)應(yīng)對(duì)應(yīng)著多個(gè)緩存消費(fèi)者
一個(gè)數(shù)據(jù)庫(kù)消息對(duì)應(yīng)對(duì)應(yīng)著多個(gè)數(shù)據(jù)庫(kù)消費(fèi)者
路由模式(Routing)
有選擇地(路由密鑰)接收消息是鬼,發(fā)送消息到交換機(jī)和要指定路由密鑰,消費(fèi)者將綁定綁定到交換機(jī)時(shí)需要指定路由密鑰磅叛,僅消費(fèi)指定路由密鑰的消息
應(yīng)用場(chǎng)景:如在商品庫(kù)存中增加了1個(gè)iphone12,iphone12促銷活動(dòng)消費(fèi)者指定路由鍵為iphone12萨赁,只有此促銷活動(dòng)會(huì)接收到消息弊琴,其他促銷活動(dòng)不關(guān)心也不會(huì)消費(fèi)此路由鍵的消息
主題模式(Topics)
根據(jù)主題(Topics)來接收消息,將路由密鑰和某模式進(jìn)行匹配杖爽,此時(shí)需要綁定在一個(gè)模式上敲董,#匹配一個(gè)詞或多個(gè)詞,*只匹配一個(gè)詞慰安。
應(yīng)用場(chǎng)景:同上腋寨,iphone促銷活動(dòng)可以接收主題為iphone的消息,如iphone12化焕,iphone13等
遠(yuǎn)程過程調(diào)用(RPC)
如果我們需要在遠(yuǎn)程計(jì)算機(jī)上運(yùn)行功能并等待結(jié)果就可以使用RPC萄窜,具體流程可以看圖。應(yīng)用場(chǎng)景:需要等待接口返回?cái)?shù)據(jù),如訂單支付
發(fā)布者確認(rèn)(發(fā)布者確認(rèn))
與發(fā)布者進(jìn)行可靠的發(fā)布確認(rèn)查刻,發(fā)布者確認(rèn)是RabbitMQ擴(kuò)展键兜,可以實(shí)現(xiàn)可靠的發(fā)布。在通道上啟用發(fā)布者確認(rèn)后穗泵,Rabbit MQ將確認(rèn)發(fā)送者發(fā)布的消息普气,這意味著已在服務(wù)器端處理。
應(yīng)用場(chǎng)景:對(duì)于消息可靠性要求較高佃延,某些錢包扣款
代碼演示,代碼中沒有對(duì)后面兩種模式演示现诀,有興趣可以自己研究
簡(jiǎn)單模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
? ? private final static String QUEUE_NAME = "simple_queue";
? ? public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? // 聲明隊(duì)列
? ? ? ? // queue:隊(duì)列名
? ? ? ? // durable:是否持久化
? ? ? ? // exclusive:是否排外? 即只允許該channel訪問該隊(duì)列? 一般等于true的話用于一個(gè)隊(duì)列只能有一個(gè)消費(fèi)者來消費(fèi)的場(chǎng)景
? ? ? ? // autoDelete:是否自動(dòng)刪除? 消費(fèi)完刪除
? ? ? ? // arguments:其他屬性
? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? //消息內(nèi)容
? ? ? ? String message = "simplest mode message";
? ? ? ? channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
? ? ? ? System.out.println("[x]Sent '" + message + "'");
? ? ? ? //最后關(guān)閉通關(guān)和連接
? ? ? ? channel.close();
? ? ? ? connection.close();
? ? }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver {
? ? private final static String QUEUE_NAME = "simplest_queue";
? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
? ? ? ? // 獲取連接
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");
? ? ? ? ? ? System.out.println(" [x] Received '" +
? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
? ? ? ? };
? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
? ? ? ? });
? ? }
}
工作模式模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver1 {
? ? private final static String QUEUE_NAME = "queue_work";
? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? // 同一時(shí)刻服務(wù)器只會(huì)發(fā)送一條消息給消費(fèi)者
? ? ? ? channel.basicQos(1);
? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");
? ? ? ? ? ? System.out.println(" [x] Received '" +
? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
? ? ? ? };
? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
? ? ? ? });
? ? }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver2 {
? ? private final static String QUEUE_NAME = "queue_work";
? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? // 同一時(shí)刻服務(wù)器只會(huì)發(fā)送一條消息給消費(fèi)者
? ? ? ? channel.basicQos(1);
? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");
? ? ? ? ? ? System.out.println(" [x] Received '" +
? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
? ? ? ? };
? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
? ? ? ? });
? ? }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
? ? private final static String QUEUE_NAME = "queue_work";
? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? // 聲明隊(duì)列
? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? for (int i = 0; i < 100; i++) {
? ? ? ? ? ? String message = "work mode message" + i;
? ? ? ? ? ? channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
? ? ? ? ? ? System.out.println("[x] Sent '" + message + "'");
? ? ? ? ? ? Thread.sleep(i * 10);
? ? ? ? }
? ? ? ? channel.close();
? ? ? ? connection.close();
? ? }
}
發(fā)布訂閱模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Receive1 {
? ? private static final String EXCHANGE_NAME = "logs";
? ? public static void main(String[] argv) throws Exception {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
? ? ? ? String queueName = channel.queueDeclare().getQueue();
? ? ? ? channel.queueBind(queueName, EXCHANGE_NAME, "");
? ? ? ? System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
? ? ? ? // 訂閱消息的回調(diào)函數(shù)
? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");
? ? ? ? ? ? System.out.println(" [x] Received '" + message + "'");
? ? ? ? };
? ? ? ? // 消費(fèi)者,有消息時(shí)出發(fā)訂閱回調(diào)函數(shù)
? ? ? ? channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
? ? ? ? });
? ? }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Receive2 {
? ? private static final String EXCHANGE_NAME = "logs";
? ? public static void main(String[] argv) throws Exception {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
? ? ? ? String queueName = channel.queueDeclare().getQueue();
? ? ? ? channel.queueBind(queueName, EXCHANGE_NAME, "");
? ? ? ? System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
? ? ? ? // 訂閱消息的回調(diào)函數(shù)
? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");
? ? ? ? ? ? System.out.println(" [x] Received2 '" + message + "'");
? ? ? ? };
? ? ? ? // 消費(fèi)者履肃,有消息時(shí)出發(fā)訂閱回調(diào)函數(shù)
? ? ? ? channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
? ? ? ? });
? ? }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
? ? private static final String EXCHANGE_NAME = "logs";
? ? public static void main(String[] argv) throws Exception {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
? ? ? ? String message = "publish subscribe message";
? ? ? ? channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
? ? ? ? System.out.println(" [x] Sent '" + message + "'");
? ? ? ? channel.close();
? ? ? ? connection.close();
? ? }
}
路由模式
路由模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver1 {
? ? private final static String QUEUE_NAME = "queue_routing";
? ? private final static String EXCHANGE_NAME = "exchange_direct";
? ? public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? // 指定路由的key仔沿,接收key和key2
? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
? ? ? ? channel.basicQos(1);
? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");
? ? ? ? ? ? System.out.println(" [x] Received '" +
? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
? ? ? ? };
? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
? ? ? ? });
? ? }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver2 {
? ? private final static String QUEUE_NAME = "queue_routing2";
? ? private final static String EXCHANGE_NAME = "exchange_direct";
? ? public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? // 僅接收key2
? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
? ? ? ? channel.basicQos(1);
? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");
? ? ? ? ? ? System.out.println(" [x] Received '" +
? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
? ? ? ? };
? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
? ? ? ? });
? ? }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
? ? private final static String EXCHANGE_NAME = "exchange_direct";
? ? private final static String EXCHANGE_TYPE = "direct";
? ? public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? // 交換機(jī)聲明
? ? ? ? channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
? ? ? ? // 只有routingKey相同的才會(huì)消費(fèi)
? ? ? ? String message = "routing mode message";
? ? ? ? channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
? ? ? ? System.out.println("[x] Sent '" + message + "'");
//? ? ? ? channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes());
//? ? ? ? System.out.println("[x] Sent '" + message + "'");
? ? ? ? channel.close();
? ? ? ? connection.close();
? ? }
}
主題模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver1 {
? ? private final static String QUEUE_NAME = "queue_topic";
? ? private final static String EXCHANGE_NAME = "exchange_topic";
? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? // 可以接收key.1
? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
? ? ? ? channel.basicQos(1);
? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");
? ? ? ? ? ? System.out.println(" [x] Received '" +
? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
? ? ? ? };
? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
? ? ? ? });
? ? }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver2 {
? ? private final static String QUEUE_NAME = "queue_topic2";
? ? private final static String EXCHANGE_NAME = "exchange_topic";
? ? private final static String EXCHANGE_TYPE = "topic";
? ? public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.queueDeclare(QUEUE_NAME, false, false, false, null);
? ? ? ? // *號(hào)代表單個(gè)單詞,可以接收key.1
? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
? ? ? ? // #號(hào)代表多個(gè)單詞榆浓,可以接收key.1.2
? ? ? ? channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.#");
? ? ? ? channel.basicQos(1);
? ? ? ? DeliverCallback deliverCallback = (consumerTag, delivery) -> {
? ? ? ? ? ? String message = new String(delivery.getBody(), "UTF-8");
? ? ? ? ? ? System.out.println(" [x] Received '" +
? ? ? ? ? ? ? ? ? ? delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
? ? ? ? };
? ? ? ? channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
? ? ? ? });
? ? }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
? ? private final static String EXCHANGE_NAME = "exchange_topic";
? ? private final static String EXCHANGE_TYPE = "topic";
? ? public static void main(String[] args) throws IOException, TimeoutException {
? ? ? ? ConnectionFactory factory = new ConnectionFactory();
? ? ? ? factory.setHost("localhost");
? ? ? ? factory.setPort(5672);
? ? ? ? Connection connection = factory.newConnection();
? ? ? ? Channel channel = connection.createChannel();
? ? ? ? channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
? ? ? ? String message = "topics model message with key.1";
? ? ? ? channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
? ? ? ? System.out.println("[x] Sent '" + message + "'");
? ? ? ? String message2 = "topics model message with key.1.2";
? ? ? ? channel.basicPublish(EXCHANGE_NAME, "key.1.2", null, message2.getBytes());
? ? ? ? System.out.println("[x] Sent '" + message2 + "'");
? ? ? ? channel.close();
? ? ? ? connection.close();
? ? }
}
四種交換機(jī)介紹
直連交換機(jī)(Direct exchange):具有路由功能的交換機(jī)于未,綁定到此交換機(jī)的時(shí)候需要指定一個(gè)routing_key,交換機(jī)發(fā)送消息的時(shí)候需要routing_key陡鹃,會(huì)將消息發(fā)送道對(duì)應(yīng)的隊(duì)列
扇形交換機(jī)(Fanout exchange):廣播消息到所有隊(duì)列烘浦,沒有任何處理,速度最快
主題交換機(jī)(Topic exchange):在直連交換機(jī)基礎(chǔ)上增加模式匹配萍鲸,也就是對(duì)routing_key進(jìn)行模式匹配党觅,*代表一個(gè)單詞,#代表多個(gè)單詞
首部交換機(jī)(Headers exchange):忽略routing_key倒彰,使用Headers信息(一個(gè)Hash的數(shù)據(jù)結(jié)構(gòu))進(jìn)行匹配责嚷,優(yōu)勢(shì)在于可以有更多更靈活的匹配規(guī)則
總結(jié)
這么多種隊(duì)列模式中都有其應(yīng)用場(chǎng)景,大家可以根據(jù)應(yīng)用場(chǎng)景示例中進(jìn)行選擇
轉(zhuǎn)自:https://mp.weixin.qq.com/s/dyIjEfeNVFlYqXOWUmDACQ