快速閱讀
? 用Rabitmq的隊(duì)列管理召噩,以及如何保證消息在隊(duì)列中不丟失儡湾。通過(guò)ack的消息確認(rèn)和持久化進(jìn)行操作咐容。 以及Rabbit中如何用Web面板進(jìn)行管理隊(duì)列揉阎。消費(fèi)者如何處理耗時(shí)的任務(wù)
生產(chǎn)者代碼
創(chuàng)建鏈接=》創(chuàng)建信道=》聲明隊(duì)列 庄撮。連續(xù)生產(chǎn)10條消息供消費(fèi)者消費(fèi)
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
for (var i = 0; i < 10; i++) //連續(xù)生產(chǎn)10條消息,讓消費(fèi)者消費(fèi)
{
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
消費(fèi)者代碼
創(chuàng)建鏈接=》創(chuàng)建信道=》聲明隊(duì)列 =>創(chuàng)建EventingBasicConsumer=》接收消息進(jìn)行處理毙籽。
如果掛斷重窟,消息會(huì)丟失。
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false, exclusive: false, autoDelete: false, arguments: null);
//以下是區(qū)別生產(chǎn)者的
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var body = e.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Received {0}", message);
Thread.Sleep(3000);//模擬耗時(shí)任務(wù) 惧财,
Console.WriteLine("Received over");
};
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
Console.WriteLine("");
Console.ReadLine();
}
}
}
測(cè)試結(jié)果
從中我們可以看到,消費(fèi)者每3秒消費(fèi)一個(gè)任務(wù) 扭仁。
消息確認(rèn)
如果一個(gè)消費(fèi)者掛掉以后垮衷,怎么辦呢?
正常邏輯是RabbitMq把消費(fèi)發(fā)送給消費(fèi)者以后乖坠,會(huì)把消費(fèi)從隊(duì)列中刪除 搀突。
但是如果消費(fèi)者掛掉以后怎么辦呢?因?yàn)檫@個(gè)時(shí)候消息已經(jīng)發(fā)送出去熊泵,
假如這個(gè)消息 在被消費(fèi)者處理前掛掉了仰迁,我們就會(huì)丟失這個(gè)消費(fèi),
為了避免這種問(wèn)題的出現(xiàn)顽分, 我們要用到消息確認(rèn)機(jī)制徐许,**就是當(dāng)消費(fèi)者處理完消息以后,再給rabbitmq一個(gè)確認(rèn)信息卒蘸,告訴他我已經(jīng)處理好了雌隅,你可以刪除了,RabbitMQ接收到以后缸沃,會(huì)從隊(duì)列中把這個(gè)消息刪除恰起, 這就保證了消息會(huì)不會(huì)因消費(fèi)者掛掉而丟失沒(méi)有處理的消息 **。 **如果Rabbit沒(méi)有接收到消息確認(rèn)的通知(在超時(shí)之前) 趾牧,則會(huì)把這個(gè)消息再放到隊(duì)列中检盼,發(fā)送給另外的消費(fèi)者。****
我們把你代碼改一下
消費(fèi)者代碼中翘单,加入ack發(fā)送的標(biāo)志
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var body = e.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Received {0}", message);
Thread.Sleep(3000);//模擬耗時(shí)任務(wù) 吨枉,
Console.WriteLine("Received over");
channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
};
發(fā)送者代碼中加入發(fā)送的消息標(biāo)識(shí)
for (var i = 0; i < 10; i++)
{
string message = "Hello World!this is message "+i;
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0},id={1}", message,i);
Thread.Sleep(1000);
}
啟動(dòng)了三個(gè)消費(fèi)者進(jìn)程 蹦渣,但是發(fā)現(xiàn)隊(duì)列中的任務(wù) 沒(méi)有被消費(fèi)完
還有id為6,7东羹,8剂桥,9沒(méi)有被消費(fèi), 這個(gè)時(shí)候是再重啟一個(gè)消費(fèi)者才可以消費(fèi)完属提。
有點(diǎn)奇怪了权逗。先放這里吧,做一個(gè)問(wèn)題記錄一下
=》更新下進(jìn)展
晚上的時(shí)候查了一下冤议。
經(jīng)常測(cè)試發(fā)現(xiàn) 要把a(bǔ)utoAck設(shè)置為false才可以斟薇。
channel.BasicConsume(queue: "HelloDurable1", autoAck: false, consumer: consumer); //這個(gè)是正常的
channel.BasicConsume(queue: "HelloDurable1", autoAck: true, consumer: consumer); //這個(gè)只能消費(fèi)一部分,還需要重啟才可以再消費(fèi)
- 經(jīng)查autoAck 是否自動(dòng)確認(rèn)消息,true自動(dòng)確認(rèn),false 不自動(dòng)要手動(dòng)調(diào)用,建立設(shè)置為false
啟動(dòng)三個(gè)消費(fèi)者測(cè)試發(fā)現(xiàn)正常 恕酸。
消息持久性
我們還需要考慮到當(dāng)RabbitMq.server掛掉的時(shí)候堪滨,消息也會(huì)丟失。
為了避免此類問(wèn)題:需要把消息和隊(duì)列都標(biāo)識(shí)為持久性蕊温。
當(dāng)我們標(biāo)識(shí)為以后袱箱,重啟程序時(shí),發(fā)現(xiàn)報(bào)錯(cuò)了义矛。
根據(jù)提示可以看出发笔, 隊(duì)列hello先前沒(méi)有被標(biāo)記為持久化,但已經(jīng)存在了凉翻,我們不能改變他的屬性了讨,
我們可以新建一個(gè)新的隊(duì)列 。比如HelloDurable,就可以了制轰。
生產(chǎn)者和消費(fèi)者兩端都要修改前计。
或者打開(kāi)Rabbitmq的監(jiān)控把隊(duì)列進(jìn)行刪除
RabbitMq監(jiān)控
先開(kāi)始管理程序
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin>rabbitmq-plugins en
able rabbitmq_management
[圖片上傳失敗...(image-c6edc9-1562227373030)]
查看安裝
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin>rabbitmq-plugins.bat list
[圖片上傳失敗...(image-b4f05f-1562227373030)]
輸入管理面板地址
http://127.0.0.1:15672/
用戶名:guest ;密碼 guest
[圖片上傳失敗...(image-3df944-1562227373030)]
登陸進(jìn)去以后,找到隊(duì)列列表垃杖,刪除相應(yīng)的隊(duì)列就可以了男杈。
隊(duì)列持久化的聲明
channel.QueueDeclare(queue: "HelloDurable",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
消費(fèi)持久化的聲明
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
這樣即使服務(wù)器重啟消息也不會(huì)丟失的。
消息負(fù)載均衡
為了避免有些消費(fèi)者不能獲得資源调俘,有些消費(fèi)者獲得資源過(guò)多的情況势就,我們要做如下配置
在消費(fèi)者代碼中增加
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
表示每次取一個(gè)消息。
通過(guò)使用消息確認(rèn)標(biāo)識(shí)和配置消息持久性脉漏,讓我們的消息可以持久化和不會(huì)被丟失苞冯。
友情提示
我對(duì)我的文章負(fù)責(zé),發(fā)現(xiàn)好多網(wǎng)上的文章 沒(méi)有實(shí)踐侧巨,都發(fā)出來(lái)的舅锄,讓人走很多彎路,如果你在我的文章中遇到無(wú)法實(shí)現(xiàn)司忱,或者無(wú)法走通的問(wèn)題皇忿〕氩洌可以直接在公眾號(hào)《愛(ài)碼農(nóng)愛(ài)生活 》留言。必定會(huì)再次復(fù)查原因鳍烁。讓每一篇 文章都能順利實(shí)現(xiàn)叨襟。道理講明白 。原理講清楚幔荒。代碼必實(shí)現(xiàn)