Exchanges
在前面的學(xué)習(xí)中其實已經(jīng)接觸了exchanges,通常情況下回还,producer 不需要知道發(fā)送消息給哪一個queue,只需要發(fā)送messages給exchange就足以,exchange一端接受producers的messages,另一端push message到queues错忱。為了準(zhǔn)確的處理messages倘待,定義了exchange type:direct, topic, headers ,fanout。
Fanout Exchange
fanout exchange將messages廣播到到已經(jīng)綁定了的queues中膝昆。每個queue都會收到producer發(fā)送的信息丸边。
fanout exchange代碼:
producer:
static void Main(string[] args)
{
//參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Console.WriteLine("********************fanout producer***************");
Console.WriteLine("please Input send message:");
//連接到RabbitMQ
var factory = new RabbitMQ.Client.ConnectionFactory();
//第一種方式
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "/";
factory.HostName = "10.19.52.80";
//第二種方式
//factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
//產(chǎn)生一個連接對象
using (var conncetion = factory.CreateConnection())
{
//通過conncetion產(chǎn)生一個連接通道
using (var channel = conncetion.CreateModel())
{
//用代碼實現(xiàn) exchanges和Queues
//定義exchanges
string exchangeName = "Efanout_test";
//設(shè)置類型 Fanout
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
//定義Queues
string queueName1 = "qfanout_test1";
string queueName2 = "qfanout_test2";
bool durable = true;//設(shè)RabbitMQ置持久化
channel.QueueDeclare(queueName1, durable, false, false, null);
channel.QueueDeclare(queueName2, durable, false, false, null);
//綁定 queue 與exchange
channel.QueueBind(queueName1, exchangeName, "", null);
channel.QueueBind(queueName2, exchangeName, "", null);
for (int i = 0; i < 20; i++)
{
string message = i.ToString();
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "Efanout_test", routingKey: "", mandatory: false, basicProperties: properties, body: body);
Console.WriteLine("[producer] send : {0}", message);
Thread.Sleep(1000);
}
}
}
Console.ReadLine();
}
接受端代碼:
static void Main(string[] args)
{
//參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Console.WriteLine("********************fanout c1***************");
//連接MQ
var factory = new ConnectionFactory();
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "/";
factory.HostName = "10.19.52.80";
//產(chǎn)生連接對象
using (var connection = factory.CreateConnection())
{
//通道
using (var channel = connection.CreateModel())
{
//公平調(diào)用
//channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);
//訂閱方式獲取message
var consumer = new EventingBasicConsumer(channel);
//實現(xiàn)獲取message處理事件
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[qfanout_test1] received : {0}", message);
//手動設(shè)置回復(fù)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
//設(shè)置手動回復(fù)認(rèn)證 接受隊列名稱
channel.BasicConsume(queue: "qfanout_test1", autoAck: false, consumer: consumer);
//另一個的參數(shù)
//channel.BasicConsume(queue: "qfanout_test2", autoAck: false, consumer: consumer);
Console.ReadLine();
}
}
}
web管理界面綁定關(guān)系:
運(yùn)行結(jié)果:
Direct exchange
前面的fanout 類型中,只要exchange與queue綁定荚孵,message 發(fā)送給所有與exchange有綁定關(guān)系的queue中妹窖,但是有時候不是我們只希望傳遞message到某些queue中時就需要用到direct exchange,在QueueBind時添加routing key來實現(xiàn)收叶。
代碼實現(xiàn):
Direct_P:
static void Main(string[] args)
{
//參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Console.WriteLine("********************Direct P***************");
Console.WriteLine("please Input send message:");
//連接到RabbitMQ
var factory = new RabbitMQ.Client.ConnectionFactory();
//第一種方式
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "/";
factory.HostName = "10.19.52.80";
//第二種方式
//factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
//產(chǎn)生一個連接對象
using (var conncetion = factory.CreateConnection())
{
//通過conncetion產(chǎn)生一個連接通道
using (var channel = conncetion.CreateModel())
{
//用代碼實現(xiàn) exchanges和Queues
//定義exchanges
string exchangeName = "EDirect_test";
//設(shè)置類型 Fanout
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
//定義Queues
string queueName1 = "qDirect_test1";
string queueName2 = "qDirect_test2";
bool durable = true;//設(shè)RabbitMQ置持久化
channel.QueueDeclare(queueName1, durable, false, false, null);
channel.QueueDeclare(queueName2, durable, false, false, null);
//綁定 queue 與exchange
//routingkey info waring error
channel.QueueBind(queueName1, exchangeName, "info", null);
channel.QueueBind(queueName2, exchangeName, "error", null);
channel.QueueBind(queueName2, exchangeName, "waring", null);
for (int i = 0; i < 20; i++)
{
string message = null;
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
string routingkey = null;
if (i%3 == 0)
{
message = "error";
routingkey = "error";
}
else if (i%3 == 1)
{
message = "waring";
routingkey = "waring";
}
else
{
message = "info";
routingkey = "info";
}
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish( "EDirect_test", routingkey, false, properties, body);
Console.WriteLine("[Direct P] send : {0}", message);
Thread.Sleep(1000);
}
}
}
Console.ReadLine();
}
Direct_C:
static void Main(string[] args)
{
//參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Console.WriteLine("********************Direct_C2***************");
//連接MQ
var factory = new ConnectionFactory();
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "/";
factory.HostName = "10.19.52.80";
//產(chǎn)生連接對象
using (var connection = factory.CreateConnection())
{
//通道
using (var channel = connection.CreateModel())
{
//公平調(diào)用
//channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);
//訂閱方式獲取message
var consumer = new EventingBasicConsumer(channel);
//實現(xiàn)獲取message處理事件
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine("[Direct_C2] received : {0}--routingkey {1}", message,routingKey);
//手動設(shè)置回復(fù)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
//設(shè)置手動回復(fù)認(rèn)證
channel.BasicConsume(queue: "qDirect_test2", autoAck: false, consumer: consumer);
Console.ReadLine();
}
}
}
代碼實現(xiàn)的結(jié)構(gòu)如圖:
運(yùn)行結(jié)果:
Web后臺綁定關(guān)系:
Topic exchange
簡單的理解Topic exchange是在Direct exchange上的擴(kuò)展骄呼,routing_key不局限于完全匹配,而是像一種正則的樣子去匹配判没。不過只有兩個特殊通配符:
* 號用來匹配一個單詞蜓萄,比如"quick.orange.rabbit" 就可以用*.orange. 匹配到*
#號用來匹配0到多個單詞,比如“l(fā)azy.orange.male.rabbit”可以用lazy.# 匹配
ETopic_P.cs
static void Main(string[] args)
{
//參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Console.WriteLine("********************ETopic P***************");
Console.WriteLine("please Input send message:");
//連接到RabbitMQ
var factory = new RabbitMQ.Client.ConnectionFactory();
//第一種方式
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "/";
factory.HostName = "10.19.52.80";
//第二種方式
//factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
//產(chǎn)生一個連接對象
using (var conncetion = factory.CreateConnection())
{
//通過conncetion產(chǎn)生一個連接通道
using (var channel = conncetion.CreateModel())
{
//用代碼實現(xiàn) exchanges和Queues
//定義exchanges
string exchangeName = "ETopic_test";
//設(shè)置類型 Fanout
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
//定義Queues
string queueName1 = "qTopic_test1";
string queueName2 = "qTopic_test2";
string queueName3 = "qTopic_test3";
bool durable = true;//設(shè)RabbitMQ置持久化
channel.QueueDeclare(queueName1, durable, false, false, null);
channel.QueueDeclare(queueName2, durable, false, false, null);
channel.QueueDeclare(queueName3, durable, false, false, null);
//綁定 queue 與exchange
//routingkey info waring error
channel.QueueBind(queueName1, exchangeName, "log.#", null);
channel.QueueBind(queueName2, exchangeName, "*.error", null);
channel.QueueBind(queueName3, exchangeName, "*.waring", null);
for (int i = 0; i < 20; i++)
{
string message = null;
//var properties = channel.CreateBasicProperties();
//properties.Persistent = true;
string routingkey = null;
if (i % 3 == 0)
{
message = "error";
routingkey = "log.error";
}
else if (i % 3 == 1)
{
message = "waring";
routingkey = "log.waring";
}
else
{
message = "info";
routingkey = "log.waring.error";
}
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("ETopic_test", routingkey, false, null, body);
Console.WriteLine("[ETopic P] send : {0}", message);
Thread.Sleep(1000);
}
}
}
Console.ReadLine();
}
ETopic_C.cs
static void Main(string[] args)
{
//參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Console.WriteLine("********************Topic_C3***************");
//連接MQ
var factory = new ConnectionFactory();
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "/";
factory.HostName = "10.19.52.80";
//產(chǎn)生連接對象
using (var connection = factory.CreateConnection())
{
//通道
using (var channel = connection.CreateModel())
{
//公平調(diào)用
//channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);
//訂閱方式獲取message
var consumer = new EventingBasicConsumer(channel);
//實現(xiàn)獲取message處理事件
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine("[Topic_C3] received : {0}--routingkey {1}", message, routingKey);
//手動設(shè)置回復(fù)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
//設(shè)置手動回復(fù)認(rèn)證
channel.BasicConsume(queue: "qTopic_test3", autoAck: false, consumer: consumer);
Console.ReadLine();
}
}
}