藍(lán)色的框:指的是生產(chǎn)者將消息投遞到EXchange上,然后根據(jù)routingkey路由到指定隊(duì)列上
綠色框:消費(fèi)者監(jiān)聽隊(duì)列竿痰,然后接受消息脆粥。
黃色框:消息到達(dá)了exchange是路由到哪個(gè)隊(duì)列,要根據(jù)routingkey而定菇曲。
下面講解Exchange
Exchange的屬性:(大致有個(gè)印象就OK冠绢,繼續(xù)往下看)
1.Name:交換機(jī)名稱;
2.Type:交換機(jī)類型 direct,topic,fanout,headers;
3.Durability:是否需要持久化,true為持久化,代表交換機(jī)在服務(wù)器重啟后是否還存在常潮;
4.Auto Delete :當(dāng)最后一個(gè)綁定到exchange上的隊(duì)列刪除后弟胀,自動(dòng)刪除該exchange.
5.Internal:當(dāng)前的exchange是否用于rabbitmq內(nèi)部使用,默認(rèn)為false.
6.Arguments:擴(kuò)展參數(shù)喊式,用于擴(kuò)展AMQP協(xié)議自制化孵户。
Exchange類型以及講解
1.Direct exchange
生產(chǎn)者代碼:
public class Producer4DirectExchange {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 創(chuàng)建Connection
Connection connection = connectionFactory.newConnection();
//3 創(chuàng)建Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
//5 發(fā)送
String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}
消費(fèi)者的代碼:
此時(shí)需要channel發(fā)送消息指定的routingkey和綁定exchange和隊(duì)列時(shí)候的routingkey相同,直接路由到這些隊(duì)列上岔留。
如果不指定exchangeType,那么就是default Exchange,此時(shí)不需要將隊(duì)列綁定到exchange.但是Routekey需要完全匹配夏哭。
public class Consumer4DirectExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//表示聲明了一個(gè)交換機(jī)
//參數(shù)說明:Exchange的屬性:
//1.Name:交換機(jī)名稱;
//2.Type:交換機(jī)類型 direct,topic,fanout,headers;
//3.Durability:是否需要持久化,true為持久化献联;
//4.Auto Delete :當(dāng)最后一個(gè)綁定到exchange上的隊(duì)列刪除后竖配,自動(dòng)刪除該exchange.
//5.Internal:當(dāng)前的exchange是否用于rabbitmq內(nèi)部使用,默認(rèn)為false.
//Arguments:擴(kuò)展參數(shù)里逆,用于擴(kuò)展AMQP協(xié)議自制化
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示聲明了一個(gè)隊(duì)列
//參數(shù)說明:
//1.消息隊(duì)列:實(shí)際存儲(chǔ)消息數(shù)據(jù)
//2.Durability:是否持久化进胯,3.auto_delete:如果選yes,代表最后一個(gè)監(jiān)聽被移除之后原押,該隊(duì)列會(huì)自動(dòng)被刪除胁镐。
channel.queueDeclare(queueName, false, false, false, null);
//建立一個(gè)綁定關(guān)系:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數(shù):隊(duì)列名稱、是否自動(dòng)ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環(huán)獲取消息
while(true){
//獲取消息盯漂,如果沒有消息颇玷,這一步將會(huì)一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
2.topic exchange
此方式是算是routingkey的通配符匹配模式,兩張圖片說明問題
符號(hào)“#” 匹配一個(gè)或多個(gè)詞
符號(hào)“”匹配不多不少一個(gè)詞
例如:“l(fā)og.#”能夠匹配到“l(fā)og.info.oa”
"log."只會(huì)匹配到“l(fā)og.erro”
如下圖
生產(chǎn)者代碼(用了三種routingkey發(fā)送)
public class Producer4TopicExchange {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 創(chuàng)建Connection
Connection connection = connectionFactory.newConnection();
//3 創(chuàng)建Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 發(fā)送
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}
消費(fèi)者代碼:
public class Consumer4TopicExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
//String routingKey = "user.*";
String routingKey = "user.*";
// 1 聲明交換機(jī)
channel.exchangeDeclare(exchangeName, exch angeType, true, false, false, null);
// 2 聲明隊(duì)列
channel.queueDeclare(queueName, false, false, false, null);
// 3 建立交換機(jī)和隊(duì)列的綁定關(guān)系:
//綁定關(guān)系中指定routingkey
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數(shù):隊(duì)列名稱就缆、是否自動(dòng)ACK帖渠、Consumer
channel.basicConsume(queueName, true, consumer);
//循環(huán)獲取消息
while(true){
//獲取消息,如果沒有消息违崇,這一步將會(huì)一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
3.Fanout Exchange(不需要routingkey征绎,只需要綁定)
特點(diǎn)是:
1.不處理路由鍵匿值,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上
2.發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上
3.Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的
生產(chǎn)者代碼:
public class Producer4FanoutExchange {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 創(chuàng)建Connection
Connection connection = connectionFactory.newConnection();
//3 創(chuàng)建Channel
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_fanout_exchange";
//5 發(fā)送
for(int i = 0; i < 10; i ++) {
String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, "", null , msg.getBytes());
}
channel.close();
connection.close();
}
}
消費(fèi)者代碼:
public class Consumer4FanoutExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 聲明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; //不設(shè)置路由鍵
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數(shù):隊(duì)列名稱、是否自動(dòng)ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循環(huán)獲取消息
while(true){
//獲取消息撒妈,如果沒有消息指孤,這一步將會(huì)一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}