C# & RabbitMQ 之 Work Queues

Work Queues介紹

Work Queues簡而言之就是Producer將Message發(fā)送到Queues中滴须,公平調(diào)度的發(fā)送到個個worker處理。

Work Queues模型

值得注意的地方梁厉,在消息接受過程中,worker會遇到異常而崩潰劈猿,導致接收到的消息處理失敗入热,但是Queues發(fā)送Message并不知道這個是否已經(jīng)正確處理而自動刪除這條message。這樣會導致Message的丟失解总,所以需要實現(xiàn)手動Message acknowledgment贮匕。當處理成功是告知RabbitMQ 這條message處理OK并刪除。
除此之外還有一個Message的丟失風險花枫,就是當RabbitMQ 退出或者異常崩潰時刻盐,會導致queue和message的丟失,所以也要配置Message durability(持久化)劳翰。
公平調(diào)度(Fair dispatch)敦锌,RabbitMQ默認是平均分配message到各個worker。為防止出現(xiàn)某些worker因為處理比較復雜佳簸,大量的數(shù)據(jù)而一直處理繁忙狀態(tài)乙墙,其他的worker卻處于閑置狀態(tài),還不停的進行調(diào)度繁忙的worker,需要使用basicQos 方法設(shè)置 prefetchCount = 1 听想,就是告知RabbitMQ 不要同時的給一個worker大于1條Message腥刹。
Fair dispatch

演示代碼

producer:

static void Main(string[] args)
        {
            //參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************producer***************");
            Console.WriteLine("please Input send message:");
            //連接到RabbitMQ

            var factory = new RabbitMQ.Client.ConnectionFactory();
            //第一種方式
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //第二種方式
            //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
            //產(chǎn)生一個連接對象
            using (var conncetion = factory.CreateConnection())
            {
                //通過conncetion產(chǎn)生一個連接通道
                using (var channel = conncetion.CreateModel())
                {
                    //用代碼實現(xiàn) exchanges和Queues 
                    //定義exchanges
                    string exchangeName = "Ewrokqueues";
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                    //定義Queues
                    string queueName = "Qwrokqueues";
                    bool durable = true;//設(shè)RabbitMQ置持久化
                    channel.QueueDeclare(queueName, durable, false, false, null);
                    //綁定exchanges 和Queues
                    string routingKey = "task_queue";
                    channel.QueueBind(queueName, exchangeName, "", null);

                    //簡單設(shè)置隊列方式
                    //channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false,
                    //    arguments: null);

                      for (int i =0;i<20;i++)
                      {
                          string message = i.ToString();

                            var body = Encoding.UTF8.GetBytes(message);
                            var properties = channel.CreateBasicProperties();
                            properties.Persistent = true;
                            channel.BasicPublish(exchange: "Ewrokqueues", routingKey: "", mandatory: false, basicProperties: properties, body: body);
                            Console.WriteLine("[producer] send : {0}", message);
                          Thread.Sleep(1000);
                        }
                }
            }
            Console.ReadLine();
        }

worker 代碼:

static void Main(string[] args)
       {
           //參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

           Console.WriteLine("********************worker1(sleep 5s)***************");
           //連接MQ
           var factory = new ConnectionFactory();
           factory.UserName = "admin";
           factory.Password = "admin";
           factory.VirtualHost = "/";
           factory.HostName = "10.19.52.80";

           //產(chǎn)生連接對象
           using (var connection = factory.CreateConnection())
           {
               //通道
               using (var channel = connection.CreateModel())
               {
                   //公平調(diào)用
                   channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

                   //訂閱方式獲取message
                   var consumer = new EventingBasicConsumer(channel);
                   //實現(xiàn)獲取message處理事件
                   consumer.Received += (model, ea) =>
                   {
                       var body = ea.Body;
                       var message = Encoding.UTF8.GetString(body);
                       //睡眠5s 另一個是1s
                       Thread.Sleep(5000);

                       Console.WriteLine("[worker1] received : {0}", message);

                       //手動設(shè)置回復
                       channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);
                   };
                   //設(shè)置手動回復認證
                   channel.BasicConsume(queue: "Qwrokqueues", autoAck: false, consumer: consumer);
                   Console.ReadLine();
               }
           }
       }

P會循環(huán)發(fā)送20次,每秒發(fā)送一個數(shù)字到queue中哗魂,兩個worker接受message肛走。最后從運行結(jié)果可以看到整個分配情況,worker1第一個接受到“0”录别,在5秒處理完成后才接受“5”,而worker2會一直在處理邻吞,而不是出于等待閑置组题。


運行結(jié)果
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市抱冷,隨后出現(xiàn)的幾起案子崔列,更是在濱河造成了極大的恐慌,老刑警劉巖旺遮,帶你破解...
    沈念sama閱讀 211,423評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赵讯,死亡現(xiàn)場離奇詭異,居然都是意外死亡耿眉,警方通過查閱死者的電腦和手機边翼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,147評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鸣剪,“玉大人组底,你說我怎么就攤上這事】鸷В” “怎么了债鸡?”我有些...
    開封第一講書人閱讀 157,019評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長铛纬。 經(jīng)常有香客問我厌均,道長,這世上最難降的妖魔是什么告唆? 我笑而不...
    開封第一講書人閱讀 56,443評論 1 283
  • 正文 為了忘掉前任棺弊,我火速辦了婚禮,結(jié)果婚禮上悔详,老公的妹妹穿的比我還像新娘镊屎。我一直安慰自己,他們只是感情好茄螃,可當我...
    茶點故事閱讀 65,535評論 6 385
  • 文/花漫 我一把揭開白布缝驳。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪用狱。 梳的紋絲不亂的頭發(fā)上运怖,一...
    開封第一講書人閱讀 49,798評論 1 290
  • 那天,我揣著相機與錄音夏伊,去河邊找鬼摇展。 笑死,一個胖子當著我的面吹牛溺忧,可吹牛的內(nèi)容都是我干的咏连。 我是一名探鬼主播,決...
    沈念sama閱讀 38,941評論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼鲁森,長吁一口氣:“原來是場噩夢啊……” “哼祟滴!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起歌溉,我...
    開封第一講書人閱讀 37,704評論 0 266
  • 序言:老撾萬榮一對情侶失蹤垄懂,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后痛垛,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體草慧,經(jīng)...
    沈念sama閱讀 44,152評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,494評論 2 327
  • 正文 我和宋清朗相戀三年匙头,在試婚紗的時候發(fā)現(xiàn)自己被綠了漫谷。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,629評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡乾胶,死狀恐怖抖剿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情识窿,我是刑警寧澤斩郎,帶...
    沈念sama閱讀 34,295評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站喻频,受9級特大地震影響缩宜,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜甥温,卻給世界環(huán)境...
    茶點故事閱讀 39,901評論 3 313
  • 文/蒙蒙 一锻煌、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧姻蚓,春花似錦宋梧、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽释涛。三九已至,卻和暖如春倦沧,著一層夾襖步出監(jiān)牢的瞬間唇撬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,978評論 1 266
  • 我被黑心中介騙來泰國打工展融, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留窖认,地道東北人。 一個月前我還...
    沈念sama閱讀 46,333評論 2 360
  • 正文 我出身青樓告希,卻偏偏與公主長得像扑浸,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子暂雹,可洞房花燭夜當晚...
    茶點故事閱讀 43,499評論 2 348

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理首装,服務發(fā)現(xiàn),斷路器杭跪,智...
    卡卡羅2017閱讀 134,629評論 18 139
  • 上篇講過簡單的hello消息,這篇我們將實現(xiàn)一個可以在多個Consumer上發(fā)送持久化消息的work queue驰吓。...
    初級賽亞人閱讀 1,847評論 0 3
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器涧尿。支持消息的持久化、事務檬贰、擁塞控...
    jiangmo閱讀 10,346評論 2 34
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,892評論 2 11
  • 上個學期學堂的招新工作是從四月份進行到五月份姑廉,但實際上在2016年的十二月左右學監(jiān)的一次講座也可以算作是招新的前言...
    白河丶夜船閱讀 838評論 0 0