RabbitMQ可以做什么?
AMQP愁拭,即Advanced Message Queuing Protocol,高級消息隊列協(xié)議亏吝。是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn)岭埠,為面向消息的中間件設(shè)計。
消息中間件主要用于組件之間的解耦蔚鸥,消息的發(fā)送者無需知道消息使用者的存在惜论,反之亦然。
應(yīng)用場景
我們知道現(xiàn)在很多APP都可以推送止喷。在管理員選定內(nèi)容進行推送這個行為中馆类。
系統(tǒng)往往需要執(zhí)行倆個操作:推送消息;記錄是誰在什么時候推送的弹谁。
但是對于管理員來說乾巧,他只關(guān)心推送完成了沒有,而不關(guān)心是否產(chǎn)生了日志预愤。
傳統(tǒng)的做法有2種:
串行:推送消息卧抗,然后在記錄日志,在倆個操作都完成之后告訴管理員推送完成鳖粟。
并行:推送消息的同時記錄日志社裆,在倆個操作都完成之后告訴管理員推送完成。
OK向图。我們現(xiàn)在引入消息隊列泳秀。
引入消息隊列之后,我們只需要在推送榄攀,然后日志放入消息隊列中嗜傅。然后就可以告訴管理員消息推送完成。
而日志信息會存放在消息隊列之中檩赢。消息隊列的消費者會在系統(tǒng)不繁忙的時候進行處理吕嘀。
RabbitMQ術(shù)語
生產(chǎn)者:
消息發(fā)送者
消費者:
等待消息的程序
Queue:
隊列,存放消息的
Simple
RabbitMQ四種exchange
如果發(fā)生緊急情況,我們的服務(wù)器宕掉的話,消息隊列里的信息沒了怎么辦偶房?
消息持久化
RabbitMQ為我們提供了消息持久化的手段
首先是隊列持久化趁曼,然后在是消息持久化
如果消費者在消費當(dāng)前消息的時候,突然崩掉棕洋,那么這條消息還在消息隊列中嗎挡闰?還是已經(jīng)被消費掉了?
消息響應(yīng)機制
RabbitMQ為我們提供了消息響應(yīng)機制
啰里啰唆半天掰盘,下邊是代碼摄悯。
.net版本。
//生產(chǎn)者
var factory = new ConnectionFactory(){ HostName = "localhost", UserName = "guest", Password = "" };
using (IConnection conn = factory.CreateConnection())
{
using (IModel im = conn.CreateModel())
{
im.ExchangeDeclare("rabbitmq_route", ExchangeType.Direct);
im.QueueDeclare("rabbitmq_query", true, false, false, null);//第二個參數(shù)隊列持久化
im.QueueBind("rabbitmq_query", "rabbitmq_route", ExchangeType.Direct, null);
for (int i = 0; i < 5; i++)
{
var props = im.CreateBasicProperties();
props.SetPersistent(true);//消息持久化
byte[] message = Encoding.UTF8.GetBytes("Hello " + i);
im.BasicPublish("rabbitmq_route", ExchangeType.Direct, props, message);
Console.WriteLine("send:" + i);
}
}
}
//消費者
var factory = new ConnectionFactory(){ HostName = "localhost", UserName = "guest", Password = "" };
using (IConnection conn = factory.CreateConnection())
{
using (IModel im = conn.CreateModel())
{
while (true)
{
BasicGetResult res = channel.BasicGet("rabbitmq_query", false);
if (res != null)
{
Console.WriteLine("receiver:" + UTF8Encoding.UTF8.GetString(res.Body));
}
Thread.Sleep(5000);
channel.BasicAck(res.DeliveryTag, true);//消息響應(yīng)
Console.WriteLine("basiack end");
}
}
}
//using (IConnection conn = factory.CreateConnection())
//{
// using (IModel im = conn.CreateModel())
// {
// im.ExchangeDeclare("rabbitmq_route_Fanout", ExchangeType.Fanout);// 路由
// int i = 0;
// while (true)
// {
// Thread.Sleep(1000);
// ++i;
// byte[] message = Encoding.UTF8.GetBytes(i.ToString());
// im.BasicPublish("rabbitmq_route_Fanout", "", null, message);
// Console.WriteLine("send:" + i.ToString());
// }
// }
//}
//using (IConnection conn = factory.CreateConnection())
//{
// using (IModel im = conn.CreateModel())
// {
// im.ExchangeDeclare("rabbitmq_route_Fanout", ExchangeType.Fanout);
// var queueOk = im.QueueDeclare();//1
// im.QueueBind(queueOk.QueueName, "rabbitmq_route_Fanout", "");//2
// var consumer = new QueueingBasicConsumer(im);//3
// im.BasicConsume(queueOk.QueueName, true, consumer);//4
// while (true)
// {
// var _result = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//5
// var body = _result.Body;
// var message = Encoding.UTF8.GetString(body);
// Console.WriteLine("received:{0}", message);
// }
// }
//}
RabbitMQ.Client.dil 版本5.1.0-rc1