RabbitMQ入門(mén)學(xué)習(xí)系列(三).消息發(fā)送接收

快速閱讀

? 用Rabitmq的隊(duì)列管理召噩,以及如何保證消息在隊(duì)列中不丟失儡湾。通過(guò)ack的消息確認(rèn)和持久化進(jìn)行操作咐容。 以及Rabbit中如何用Web面板進(jìn)行管理隊(duì)列揉阎。消費(fèi)者如何處理耗時(shí)的任務(wù)

生產(chǎn)者代碼

創(chuàng)建鏈接=》創(chuàng)建信道=》聲明隊(duì)列 庄撮。連續(xù)生產(chǎn)10條消息供消費(fèi)者消費(fèi)

static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "hello",
                             durable: false,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
        for (var i = 0; i < 10; i++) //連續(xù)生產(chǎn)10條消息,讓消費(fèi)者消費(fèi)
        {
            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: "",
                                 routingKey: "hello",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }


    }

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
} 

消費(fèi)者代碼

創(chuàng)建鏈接=》創(chuàng)建信道=》聲明隊(duì)列 =>創(chuàng)建EventingBasicConsumer=》接收消息進(jìn)行處理毙籽。

如果掛斷重窟,消息會(huì)丟失。

static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false, exclusive: false, autoDelete: false, arguments: null);

            //以下是區(qū)別生產(chǎn)者的
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                var body = e.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Received {0}", message);
                Thread.Sleep(3000);//模擬耗時(shí)任務(wù) 惧财,
                Console.WriteLine("Received over");

            };
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine("");
            Console.ReadLine();
        }

    }

}

測(cè)試結(jié)果

image

從中我們可以看到,消費(fèi)者每3秒消費(fèi)一個(gè)任務(wù) 扭仁。

消息確認(rèn)

如果一個(gè)消費(fèi)者掛掉以后垮衷,怎么辦呢?

正常邏輯是RabbitMq把消費(fèi)發(fā)送給消費(fèi)者以后乖坠,會(huì)把消費(fèi)從隊(duì)列中刪除 搀突。

但是如果消費(fèi)者掛掉以后怎么辦呢?因?yàn)檫@個(gè)時(shí)候消息已經(jīng)發(fā)送出去熊泵,

假如這個(gè)消息 在被消費(fèi)者處理前掛掉了仰迁,我們就會(huì)丟失這個(gè)消費(fèi),

為了避免這種問(wèn)題的出現(xiàn)顽分, 我們要用到消息確認(rèn)機(jī)制徐许,**就是當(dāng)消費(fèi)者處理完消息以后,再給rabbitmq一個(gè)確認(rèn)信息卒蘸,告訴他我已經(jīng)處理好了雌隅,你可以刪除了,RabbitMQ接收到以后缸沃,會(huì)從隊(duì)列中把這個(gè)消息刪除恰起, 這就保證了消息會(huì)不會(huì)因消費(fèi)者掛掉而丟失沒(méi)有處理的消息 **。 **如果Rabbit沒(méi)有接收到消息確認(rèn)的通知(在超時(shí)之前) 趾牧,則會(huì)把這個(gè)消息再放到隊(duì)列中检盼,發(fā)送給另外的消費(fèi)者。****

我們把你代碼改一下

消費(fèi)者代碼中翘单,加入ack發(fā)送的標(biāo)志

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine("Received {0}", message);
    Thread.Sleep(3000);//模擬耗時(shí)任務(wù) 吨枉,
    Console.WriteLine("Received over");
    channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
};

發(fā)送者代碼中加入發(fā)送的消息標(biāo)識(shí)

 for (var i = 0; i < 10; i++)
 {
     string message = "Hello World!this is message "+i;
     var body = Encoding.UTF8.GetBytes(message);
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true;

     channel.BasicPublish(exchange: "",
                          routingKey: "hello",
                          basicProperties: null,
                          body: body);
     Console.WriteLine(" [x] Sent {0},id={1}", message,i);
     Thread.Sleep(1000);
 }

啟動(dòng)了三個(gè)消費(fèi)者進(jìn)程 蹦渣,但是發(fā)現(xiàn)隊(duì)列中的任務(wù) 沒(méi)有被消費(fèi)完

image

還有id為6,7东羹,8剂桥,9沒(méi)有被消費(fèi), 這個(gè)時(shí)候是再重啟一個(gè)消費(fèi)者才可以消費(fèi)完属提。

image

有點(diǎn)奇怪了权逗。先放這里吧,做一個(gè)問(wèn)題記錄一下

=》更新下進(jìn)展

晚上的時(shí)候查了一下冤议。

經(jīng)常測(cè)試發(fā)現(xiàn) 要把a(bǔ)utoAck設(shè)置為false才可以斟薇。

channel.BasicConsume(queue: "HelloDurable1", autoAck: false, consumer: consumer);  //這個(gè)是正常的
channel.BasicConsume(queue: "HelloDurable1", autoAck: true, consumer: consumer); //這個(gè)只能消費(fèi)一部分,還需要重啟才可以再消費(fèi)
  • 經(jīng)查autoAck 是否自動(dòng)確認(rèn)消息,true自動(dòng)確認(rèn),false 不自動(dòng)要手動(dòng)調(diào)用,建立設(shè)置為false

啟動(dòng)三個(gè)消費(fèi)者測(cè)試發(fā)現(xiàn)正常 恕酸。

1562162168225

消息持久性

我們還需要考慮到當(dāng)RabbitMq.server掛掉的時(shí)候堪滨,消息也會(huì)丟失。

為了避免此類問(wèn)題:需要把消息和隊(duì)列都標(biāo)識(shí)為持久性蕊温。

當(dāng)我們標(biāo)識(shí)為以后袱箱,重啟程序時(shí),發(fā)現(xiàn)報(bào)錯(cuò)了义矛。

image

根據(jù)提示可以看出发笔, 隊(duì)列hello先前沒(méi)有被標(biāo)記為持久化,但已經(jīng)存在了凉翻,我們不能改變他的屬性了讨,

我們可以新建一個(gè)新的隊(duì)列 。比如HelloDurable,就可以了制轰。

生產(chǎn)者和消費(fèi)者兩端都要修改前计。

或者打開(kāi)Rabbitmq的監(jiān)控把隊(duì)列進(jìn)行刪除

RabbitMq監(jiān)控

先開(kāi)始管理程序

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin>rabbitmq-plugins en
able rabbitmq_management

[圖片上傳失敗...(image-c6edc9-1562227373030)]

查看安裝

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin>rabbitmq-plugins.bat list

[圖片上傳失敗...(image-b4f05f-1562227373030)]

輸入管理面板地址

http://127.0.0.1:15672/

用戶名:guest ;密碼 guest

[圖片上傳失敗...(image-3df944-1562227373030)]

登陸進(jìn)去以后,找到隊(duì)列列表垃杖,刪除相應(yīng)的隊(duì)列就可以了男杈。

1562162380181
1562162391890

隊(duì)列持久化的聲明

 channel.QueueDeclare(queue: "HelloDurable",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

消費(fèi)持久化的聲明

var properties = channel.CreateBasicProperties();
 properties.Persistent = true;

這樣即使服務(wù)器重啟消息也不會(huì)丟失的。

消息負(fù)載均衡

為了避免有些消費(fèi)者不能獲得資源调俘,有些消費(fèi)者獲得資源過(guò)多的情況势就,我們要做如下配置

在消費(fèi)者代碼中增加

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

表示每次取一個(gè)消息。

通過(guò)使用消息確認(rèn)標(biāo)識(shí)和配置消息持久性脉漏,讓我們的消息可以持久化和不會(huì)被丟失苞冯。

友情提示

我對(duì)我的文章負(fù)責(zé),發(fā)現(xiàn)好多網(wǎng)上的文章 沒(méi)有實(shí)踐侧巨,都發(fā)出來(lái)的舅锄,讓人走很多彎路,如果你在我的文章中遇到無(wú)法實(shí)現(xiàn)司忱,或者無(wú)法走通的問(wèn)題皇忿〕氩洌可以直接在公眾號(hào)《愛(ài)碼農(nóng)愛(ài)生活 》留言。必定會(huì)再次復(fù)查原因鳍烁。讓每一篇 文章都能順利實(shí)現(xiàn)叨襟。道理講明白 。原理講清楚幔荒。代碼必實(shí)現(xiàn)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末糊闽,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子爹梁,更是在濱河造成了極大的恐慌右犹,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,651評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件姚垃,死亡現(xiàn)場(chǎng)離奇詭異念链,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)积糯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)掂墓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人看成,你說(shuō)我怎么就攤上這事君编。” “怎么了绍昂?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,931評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)偿荷。 經(jīng)常有香客問(wèn)我窘游,道長(zhǎng),這世上最難降的妖魔是什么跳纳? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,218評(píng)論 1 292
  • 正文 為了忘掉前任忍饰,我火速辦了婚禮,結(jié)果婚禮上寺庄,老公的妹妹穿的比我還像新娘艾蓝。我一直安慰自己,他們只是感情好斗塘,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,234評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布赢织。 她就那樣靜靜地躺著,像睡著了一般馍盟。 火紅的嫁衣襯著肌膚如雪于置。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,198評(píng)論 1 299
  • 那天贞岭,我揣著相機(jī)與錄音八毯,去河邊找鬼搓侄。 笑死,一個(gè)胖子當(dāng)著我的面吹牛话速,可吹牛的內(nèi)容都是我干的讶踪。 我是一名探鬼主播,決...
    沈念sama閱讀 40,084評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼泊交,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼乳讥!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起活合,我...
    開(kāi)封第一講書(shū)人閱讀 38,926評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤雏婶,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后白指,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體留晚,經(jīng)...
    沈念sama閱讀 45,341評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,563評(píng)論 2 333
  • 正文 我和宋清朗相戀三年告嘲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了错维。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,731評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡橄唬,死狀恐怖赋焕,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情仰楚,我是刑警寧澤隆判,帶...
    沈念sama閱讀 35,430評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站僧界,受9級(jí)特大地震影響侨嘀,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜捂襟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,036評(píng)論 3 326
  • 文/蒙蒙 一咬腕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧葬荷,春花似錦涨共、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,676評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至扒吁,卻和暖如春照筑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,829評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工凝危, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留波俄,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,743評(píng)論 2 368
  • 正文 我出身青樓蛾默,卻偏偏與公主長(zhǎng)得像懦铺,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子支鸡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,629評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容