C# & RabbitMQ 之 Exchanges

Exchanges

在前面的學(xué)習(xí)中其實已經(jīng)接觸了exchanges,通常情況下回还,producer 不需要知道發(fā)送消息給哪一個queue,只需要發(fā)送messages給exchange就足以,exchange一端接受producers的messages,另一端push message到queues错忱。為了準(zhǔn)確的處理messages倘待,定義了exchange type:direct, topic, headers ,fanout。

Fanout Exchange

fanout exchange將messages廣播到到已經(jīng)綁定了的queues中膝昆。每個queue都會收到producer發(fā)送的信息丸边。


fanout exchange

fanout exchange代碼:
producer:

static void Main(string[] args)
        {
            //參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************fanout producer***************");
            Console.WriteLine("please Input send message:");
            //連接到RabbitMQ

            var factory = new RabbitMQ.Client.ConnectionFactory();
            //第一種方式
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //第二種方式
            //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
            //產(chǎn)生一個連接對象
            using (var conncetion = factory.CreateConnection())
            {
                //通過conncetion產(chǎn)生一個連接通道
                using (var channel = conncetion.CreateModel())
                {
                    //用代碼實現(xiàn) exchanges和Queues 
                    //定義exchanges
                    string exchangeName = "Efanout_test";
                    //設(shè)置類型 Fanout
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
                    //定義Queues
                    string queueName1 = "qfanout_test1";
                    string queueName2 = "qfanout_test2";

                    bool durable = true;//設(shè)RabbitMQ置持久化

                    channel.QueueDeclare(queueName1, durable, false, false, null);
                    channel.QueueDeclare(queueName2, durable, false, false, null);
                    //綁定 queue 與exchange
                    channel.QueueBind(queueName1, exchangeName, "", null);
                    channel.QueueBind(queueName2, exchangeName, "", null);

                    for (int i = 0; i < 20; i++)
                    {
                        string message = i.ToString();

                        var body = Encoding.UTF8.GetBytes(message);
                        var properties = channel.CreateBasicProperties();
                        properties.Persistent = true;
                        channel.BasicPublish(exchange: "Efanout_test", routingKey: "", mandatory: false, basicProperties: properties, body: body);
                        Console.WriteLine("[producer] send : {0}", message);
                        Thread.Sleep(1000);
                    }
                }
            }
            Console.ReadLine();
        }

接受端代碼:

  static void Main(string[] args)
        {
            //參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************fanout c1***************");
            //連接MQ
            var factory = new ConnectionFactory();
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //產(chǎn)生連接對象
            using (var connection = factory.CreateConnection())
            {
                //通道
                using (var channel = connection.CreateModel())
                {
                    //公平調(diào)用
                    //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

                    //訂閱方式獲取message
                    var consumer = new EventingBasicConsumer(channel);
                    //實現(xiàn)獲取message處理事件
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);


                        Console.WriteLine("[qfanout_test1] received : {0}", message);

                        //手動設(shè)置回復(fù)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //設(shè)置手動回復(fù)認(rèn)證 接受隊列名稱
                    channel.BasicConsume(queue: "qfanout_test1", autoAck: false, consumer: consumer);
                    //另一個的參數(shù)
                    //channel.BasicConsume(queue: "qfanout_test2", autoAck: false, consumer: consumer);
                    Console.ReadLine();
                }
            }
        }

web管理界面綁定關(guān)系:


綁定關(guān)系

運(yùn)行結(jié)果:


fanout exchange

Direct exchange

前面的fanout 類型中,只要exchange與queue綁定荚孵,message 發(fā)送給所有與exchange有綁定關(guān)系的queue中妹窖,但是有時候不是我們只希望傳遞message到某些queue中時就需要用到direct exchange,在QueueBind時添加routing key來實現(xiàn)收叶。


Direct exchange

代碼實現(xiàn):
Direct_P:

static void Main(string[] args)
        {
            //參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************Direct P***************");
            Console.WriteLine("please Input send message:");
            //連接到RabbitMQ

            var factory = new RabbitMQ.Client.ConnectionFactory();
            //第一種方式
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //第二種方式
            //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
            //產(chǎn)生一個連接對象
            using (var conncetion = factory.CreateConnection())
            {
                //通過conncetion產(chǎn)生一個連接通道
                using (var channel = conncetion.CreateModel())
                {
                    //用代碼實現(xiàn) exchanges和Queues 
                    //定義exchanges
                    string exchangeName = "EDirect_test";
                    //設(shè)置類型 Fanout
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                    //定義Queues
                    string queueName1 = "qDirect_test1";
                    string queueName2 = "qDirect_test2";
                    bool durable = true;//設(shè)RabbitMQ置持久化


                    channel.QueueDeclare(queueName1, durable, false, false, null);
                    channel.QueueDeclare(queueName2, durable, false, false, null);
                    //綁定 queue 與exchange
                    //routingkey info  waring error 
                    channel.QueueBind(queueName1, exchangeName, "info", null);
                    channel.QueueBind(queueName2, exchangeName, "error", null);
                    channel.QueueBind(queueName2, exchangeName, "waring", null);


                    for (int i = 0; i < 20; i++)
                    {
                        string message = null;

                        var properties = channel.CreateBasicProperties();
                        properties.Persistent = true;
                        string routingkey = null;

                        if (i%3 == 0)
                        {
                            message = "error";
                            routingkey = "error";
                        }
                        else if (i%3 == 1)
                        {
                            message = "waring";
                            routingkey = "waring";
                        }
                        else
                        {
                            message = "info";
                            routingkey = "info";
                        }

                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish( "EDirect_test", routingkey,  false, properties,  body);
                        Console.WriteLine("[Direct P] send : {0}", message);
                        Thread.Sleep(1000);
                    }
                }
            }
            Console.ReadLine();
        }

Direct_C:

static void Main(string[] args)
        {
            //參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************Direct_C2***************");
            //連接MQ
            var factory = new ConnectionFactory();
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //產(chǎn)生連接對象
            using (var connection = factory.CreateConnection())
            {
                //通道
                using (var channel = connection.CreateModel())
                {
                    //公平調(diào)用
                    //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

                    //訂閱方式獲取message
                    var consumer = new EventingBasicConsumer(channel);
                    //實現(xiàn)獲取message處理事件
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine("[Direct_C2] received : {0}--routingkey {1}", message,routingKey);

                        //手動設(shè)置回復(fù)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //設(shè)置手動回復(fù)認(rèn)證
                    channel.BasicConsume(queue: "qDirect_test2", autoAck: false, consumer: consumer);
                    Console.ReadLine();
                }
            }
        }

代碼實現(xiàn)的結(jié)構(gòu)如圖:



運(yùn)行結(jié)果:


Direct exchange實現(xiàn)效果

Web后臺綁定關(guān)系:
后臺綁定關(guān)系

Topic exchange

簡單的理解Topic exchange是在Direct exchange上的擴(kuò)展骄呼,routing_key不局限于完全匹配,而是像一種正則的樣子去匹配判没。不過只有兩個特殊通配符:
* 號用來匹配一個單詞蜓萄,比如"quick.orange.rabbit" 就可以用*.orange. 匹配到*
#號用來匹配0到多個單詞,比如“l(fā)azy.orange.male.rabbit”可以用lazy.# 匹配

Topic exchange

ETopic_P.cs

static void Main(string[] args)
        {
            //參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************ETopic P***************");
            Console.WriteLine("please Input send message:");
            //連接到RabbitMQ

            var factory = new RabbitMQ.Client.ConnectionFactory();
            //第一種方式
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //第二種方式
            //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
            //產(chǎn)生一個連接對象
            using (var conncetion = factory.CreateConnection())
            {
                //通過conncetion產(chǎn)生一個連接通道
                using (var channel = conncetion.CreateModel())
                {
                    //用代碼實現(xiàn) exchanges和Queues 
                    //定義exchanges
                    string exchangeName = "ETopic_test";
                    //設(shè)置類型 Fanout
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                    //定義Queues
                    string queueName1 = "qTopic_test1";
                    string queueName2 = "qTopic_test2";
                    string queueName3 = "qTopic_test3";
                    bool durable = true;//設(shè)RabbitMQ置持久化


                    channel.QueueDeclare(queueName1, durable, false, false, null);
                    channel.QueueDeclare(queueName2, durable, false, false, null);
                    channel.QueueDeclare(queueName3, durable, false, false, null);
                    //綁定 queue 與exchange
                    //routingkey info  waring error 
                    channel.QueueBind(queueName1, exchangeName, "log.#", null);
                    channel.QueueBind(queueName2, exchangeName, "*.error", null);
                    channel.QueueBind(queueName3, exchangeName, "*.waring", null);

                    for (int i = 0; i < 20; i++)
                    {
                        string message = null;

                        //var properties = channel.CreateBasicProperties();
                        //properties.Persistent = true;
                        string routingkey = null;

                        if (i % 3 == 0)
                        {
                            message = "error";
                            routingkey = "log.error";
                        }
                        else if (i % 3 == 1)
                        {
                            message = "waring";
                            routingkey = "log.waring";
                        }
                        else
                        {
                            message = "info";
                            routingkey = "log.waring.error";
                        }

                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("ETopic_test", routingkey, false, null, body);
                        Console.WriteLine("[ETopic P] send : {0}", message);
                        Thread.Sleep(1000);
                    }
                }
            }
            Console.ReadLine();
        }

ETopic_C.cs

static void Main(string[] args)
        {
            //參考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************Topic_C3***************");
            //連接MQ
            var factory = new ConnectionFactory();
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //產(chǎn)生連接對象
            using (var connection = factory.CreateConnection())
            {
                //通道
                using (var channel = connection.CreateModel())
                {
                    //公平調(diào)用
                    //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

                    //訂閱方式獲取message
                    var consumer = new EventingBasicConsumer(channel);
                    //實現(xiàn)獲取message處理事件
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine("[Topic_C3] received : {0}--routingkey {1}", message, routingKey);

                        //手動設(shè)置回復(fù)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //設(shè)置手動回復(fù)認(rèn)證
                    channel.BasicConsume(queue: "qTopic_test3", autoAck: false, consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
Topic exchange運(yùn)行結(jié)果

Web后臺綁定關(guān)系
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末澄峰,一起剝皮案震驚了整個濱河市嫉沽,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌俏竞,老刑警劉巖绸硕,帶你破解...
    沈念sama閱讀 211,423評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件堂竟,死亡現(xiàn)場離奇詭異,居然都是意外死亡臣咖,警方通過查閱死者的電腦和手機(jī)跃捣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,147評論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來夺蛇,“玉大人疚漆,你說我怎么就攤上這事〉笊猓” “怎么了娶聘?”我有些...
    開封第一講書人閱讀 157,019評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長甚脉。 經(jīng)常有香客問我丸升,道長,這世上最難降的妖魔是什么牺氨? 我笑而不...
    開封第一講書人閱讀 56,443評論 1 283
  • 正文 為了忘掉前任狡耻,我火速辦了婚禮,結(jié)果婚禮上猴凹,老公的妹妹穿的比我還像新娘夷狰。我一直安慰自己,他們只是感情好郊霎,可當(dāng)我...
    茶點故事閱讀 65,535評論 6 385
  • 文/花漫 我一把揭開白布沼头。 她就那樣靜靜地躺著,像睡著了一般书劝。 火紅的嫁衣襯著肌膚如雪进倍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,798評論 1 290
  • 那天购对,我揣著相機(jī)與錄音猾昆,去河邊找鬼。 笑死骡苞,一個胖子當(dāng)著我的面吹牛毡庆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播烙如,決...
    沈念sama閱讀 38,941評論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼么抗,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了亚铁?” 一聲冷哼從身側(cè)響起蝇刀,我...
    開封第一講書人閱讀 37,704評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎徘溢,沒想到半個月后吞琐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體捆探,經(jīng)...
    沈念sama閱讀 44,152評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,494評論 2 327
  • 正文 我和宋清朗相戀三年站粟,在試婚紗的時候發(fā)現(xiàn)自己被綠了黍图。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,629評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡奴烙,死狀恐怖助被,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情切诀,我是刑警寧澤揩环,帶...
    沈念sama閱讀 34,295評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站幅虑,受9級特大地震影響丰滑,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜倒庵,卻給世界環(huán)境...
    茶點故事閱讀 39,901評論 3 313
  • 文/蒙蒙 一褒墨、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧擎宝,春花似錦郁妈、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽锄奢。三九已至失晴,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間拘央,已是汗流浹背涂屁。 一陣腳步聲響...
    開封第一講書人閱讀 31,978評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留灰伟,地道東北人拆又。 一個月前我還...
    沈念sama閱讀 46,333評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像栏账,于是被迫代替她去往敵國和親帖族。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,499評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 1. 歷史 RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,092評論 3 51
  • rabbitMQ是一款基于AMQP協(xié)議的消息中間件挡爵,它能夠在應(yīng)用之間提供可靠的消息傳輸竖般。在易用性,擴(kuò)展性茶鹃,高可用性...
    點融黑幫閱讀 2,991評論 3 41
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理涣雕,服務(wù)發(fā)現(xiàn)艰亮,斷路器,智...
    卡卡羅2017閱讀 134,629評論 18 139
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器挣郭。支持消息的持久化迄埃、事務(wù)、擁塞控...
    jiangmo閱讀 10,346評論 2 34
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,892評論 2 11