rabbitmq入門

什么是rabbitmq茶行?

RabbitMQ.png

RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)阴挣。RabbitMQ服務(wù)器是用Erlang語言編寫的气堕,而群集和故障轉(zhuǎn)移是構(gòu)建在開放電信平臺(tái)框架上的。所有主要的編程語言均有與代理接口通訊的客戶端

rabbitmq.png

第一種方式:點(diǎn)對(duì)點(diǎn)

點(diǎn)對(duì)點(diǎn).png
  • P:生成者
  • C:消費(fèi)者
  • 紅色方塊代表信道

生成者

void producerSendMessing() {
        //生產(chǎn)者代碼
        //創(chuàng)建一個(gè)rabbitmq的連接工廠
        ConnectionFactory connectionFactory =new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setConnectionTimeout(1000);
        connectionFactory.setPort(5672);
        //設(shè)置虛擬主機(jī)
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        try {
            //開啟一個(gè)server連接
            Connection connection = connectionFactory.newConnection();
            //server連接創(chuàng)建一個(gè)信道
            Channel channel = connection.createChannel();
            //通過信道綁定一個(gè)隊(duì)列,兩個(gè)重載方法
            /*
            *  @param queue :隊(duì)列的名稱
             * @param durable 是否持久化茎芭,
             * @param exclusive 是否獨(dú)占這個(gè)隊(duì)列
             * @param autoDelete 是否消費(fèi)之后揖膜,刪除隊(duì)列
             * @param arguments 其他參數(shù)設(shè)置,是一個(gè)map
            * */
            channel.queueDeclare("hello",true,false,false,null);
            /**
             * 三個(gè)重載方法梅桩,
             *
             * @param exchange 交換器
             * @param routingKey 路由密鑰
             * @param支持消息的其他屬性-路由標(biāo)頭等
             * @param正文消息正文
             */
            channel.basicPublish("","hello",null,"hello world!".getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

消費(fèi)者

void consumerReceivingMessing() {
        //生產(chǎn)者代碼
        //創(chuàng)建一個(gè)rabbitmq的連接工廠
        ConnectionFactory connectionFactory =new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setConnectionTimeout(1000);
        connectionFactory.setPort(5672);
        //設(shè)置虛擬主機(jī)
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        try {
            //開啟一個(gè)server連接
            Connection connection = connectionFactory.newConnection();
            //server連接創(chuàng)建一個(gè)信道
            Channel channel = connection.createChannel();
            //通過信道綁定一個(gè)隊(duì)列,兩個(gè)重載方法
            /*
             *  @param queue :隊(duì)列的名稱
             * @param durable 是否持久化壹粟,
             * @param exclusive 是否獨(dú)占這個(gè)隊(duì)列
             * @param autoDelete 是否消費(fèi)之后,刪除隊(duì)列
             * @param arguments 其他參數(shù)設(shè)置宿百,是一個(gè)map
             * */
            channel.queueDeclare("hello",true,false,false,null);
            /**
             * @param queue隊(duì)列名稱
             * @param autoAck如果服務(wù)器應(yīng)考慮消息趁仙,則為true。自動(dòng)確認(rèn)機(jī)制
             *交付后確認(rèn)犀呼; 如果服務(wù)器應(yīng)該期望幸撕,則返回false
             *明確的確認(rèn)
             * @param回調(diào)用戶對(duì)象的接口Consumer,默認(rèn)實(shí)現(xiàn)類DefaultConsumer,需要傳入信道
             */
            String hello = channel.basicConsume("hello", true,new DefaultConsumer(channel){
                    //最后一個(gè)參數(shù)是消息體
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body)
                            throws IOException
                    {
                        System.out.println(new String(body));
                    }

            });
            System.out.println(hello);
            //連接關(guān)閉
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

總結(jié)

  • 無論是生成者還是消費(fèi)者都需要連接到rabbitmq的server外臂。通過信道操作消息

work模型

Work queues, 也被稱為(Task queues)坐儿, 任務(wù)模型。 當(dāng)消息處理比較耗時(shí)的時(shí)候宋光,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度貌矿。長此以往,消息就會(huì)堆積越來越多,無法及時(shí)處理罪佳。此時(shí)就可以使用work模型:讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列逛漫,共同消費(fèi)隊(duì)列中的消息。隊(duì)列中的消息一旦消費(fèi)赘艳, 就會(huì)消失酌毡,因此任務(wù)是不會(huì)被重復(fù)執(zhí)行的。


image.png

角色:
●P:生產(chǎn)者:任務(wù)的發(fā)布者
●C1:消費(fèi)者蕾管,領(lǐng)取任務(wù)并且完成任務(wù)枷踏,假設(shè)完成速度較慢
●C2:消費(fèi)者2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度快

總結(jié)

  • 默認(rèn)情況下,RabbitMQ將按順序?qū)⒚總€(gè)消息發(fā)送給下一個(gè)使用者掰曾。平均而言旭蠕,每個(gè)消費(fèi)者都會(huì)收到相同數(shù)量的消息。這種分發(fā)消息的方式稱為輪詢旷坦。

為了避免消費(fèi)者1處理消息的業(yè)務(wù)慢掏熬,消費(fèi)者2處理消息的業(yè)務(wù)快。但是因?yàn)檩喸儥C(jī)制秒梅,導(dǎo)致消息被消費(fèi)者1拿了過去進(jìn)行堵塞旗芬,從而導(dǎo)致系統(tǒng)宕機(jī)。消息確認(rèn)機(jī)制相應(yīng)出現(xiàn)捆蜀。

消費(fèi)者需要拿到消息之后回復(fù)隊(duì)列已處理才會(huì)拿到下一條消息疮丛,從而實(shí)現(xiàn)能者多勞

  • 消費(fèi)者1:假設(shè)完成速度較慢
public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //創(chuàng)建一個(gè)信道
        Channel channel = mqConnection.createChannel();
        //信道每次只傳遞1個(gè)消息
        channel.basicQos(1);
        //綁定到一個(gè)work隊(duì)列
        channel.queueDeclare("work",true,false,false,null);
        //獲取消息
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //模擬消費(fèi)者處理慢的場景
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消費(fèi)者2:--->"+new String(body));
                //參數(shù)1:根據(jù)標(biāo)簽回復(fù)那條消息辆琅,參數(shù)2:是否回復(fù)多條消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
  • 消費(fèi)者2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度快
public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //創(chuàng)建一個(gè)信道
        Channel channel = mqConnection.createChannel();
        //信道每次只傳遞1個(gè)消息
        channel.basicQos(1);
        //綁定到一個(gè)work隊(duì)列
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費(fèi)者1:--->"+new String(body));
                //參數(shù)1:根據(jù)標(biāo)簽回復(fù)那條消息,參數(shù)2:是否回復(fù)多條消息
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        });
    }

注意

代碼中在消費(fèi)者里面設(shè)置了信道里面只消費(fèi)1條消息这刷,并且處理業(yè)務(wù)之后會(huì)回復(fù)消息已被消費(fèi)

fanout廣播模型

fanout.png

在廣播模式下婉烟,消息發(fā)送流程是這樣的:
●可以有多個(gè)消費(fèi)者
●每個(gè)消費(fèi)者有自己的queue (隊(duì)列)
●每個(gè)隊(duì)列都要綁定到Exchange (交換機(jī))
●生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī)暇屋,交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列似袁,生產(chǎn)者無法決定。
●交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列
●隊(duì)列的消費(fèi)者都能拿到消息咐刨。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)
生產(chǎn)者代碼

public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //創(chuàng)建一個(gè)信道
        Channel channel = mqConnection.createChannel();
        //聲明一個(gè)交換機(jī)昙衅。參數(shù)1:交換機(jī)名稱,參數(shù)2:選擇交換機(jī)模式定鸟。fanout:廣播模式
        channel.exchangeDeclare("logs","fanout");
        channel.basicPublish("logs","",null,("fanout條消息").getBytes());
        //關(guān)閉連接
        RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);

    }

消費(fèi)者代碼

public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //創(chuàng)建一個(gè)信道
        Channel channel = mqConnection.createChannel();
        //綁定一個(gè)交換機(jī)
        channel.exchangeDeclare("logs","fanout");
        //獲取臨時(shí)隊(duì)列
        String queue = channel.queueDeclare().getQueue();
        //綁定交換機(jī)和隊(duì)列
        channel.queueBind(queue,"logs","");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費(fèi)者1:--->"+new String(body));
            }
        });
    }

Routing模式:1direct(直連模型)

在Fanout模式中而涉,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)联予。但是在某些場景下啼县,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange沸久。
在Direct模型下:
●隊(duì)列與交換機(jī)的綁定季眷,不能是任意綁定了,而是要指定-個(gè)RoutingKey (路由key)
●消息的發(fā)送方在向Exchange發(fā)送消息時(shí)卷胯,也必須指定消息的RoutingKey子刮。
●Exchange不再把消息交給每- 個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷窑睁,只有隊(duì)列的Routingkey與消息的Routing key完全一致挺峡,才會(huì)接收到消息


image.png

生產(chǎn)者代碼

public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //創(chuàng)建一個(gè)信道
        Channel channel = mqConnection.createChannel();
        //創(chuàng)建一個(gè)交換器,參數(shù)1:交換器名稱担钮,參數(shù)2:交換器模式:路由
        channel.exchangeDeclare("log_direct","direct");
        //聲明一個(gè)routingKey
        String routingKey="error";
        //發(fā)送消息
        channel.basicPublish("log_direct",routingKey,null,("這是direc模型發(fā)送的消息:"+routingKey).getBytes());
        //關(guān)閉鏈接
        RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
    }

消費(fèi)者1代碼

public static void main(String[] args) throws IOException {
        //獲取鏈接
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //創(chuàng)建信道
        Channel channel = mqConnection.createChannel();
        //信道綁定交換器
        channel.exchangeDeclare("log_direct","direct");
        //獲取臨時(shí)隊(duì)列名稱
        String queue = channel.queueDeclare().getQueue();
        String routingKey="info";
        //信道綁定交換器橱赠,路由鍵,隊(duì)列
        channel.queueBind(queue,"log_direct",routingKey);
        //獲取消息
        channel.basicConsume(queue,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費(fèi)者1:"+new String(body));
            }
        });
    }

消費(fèi)者2代碼

public static void main(String[] args) throws IOException {
        //創(chuàng)建一個(gè)連接
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //創(chuàng)建一個(gè)信道
        Channel channel = mqConnection.createChannel();
        //綁定一個(gè)交換機(jī)
        channel.exchangeDeclare("log_direct","direct");
        //獲取信道的一個(gè)臨時(shí)隊(duì)列
        String queue = channel.queueDeclare().getQueue();
        //定義三個(gè)路由鍵
        String routingKey="error";
        String routingKeyWarring="warring";
        String routingKeyInfo="info";
        //信道綁定隊(duì)列裳朋,交換機(jī)和路由鍵
        channel.queueBind(queue,"log_direct",routingKey);
        channel.queueBind(queue,"log_direct",routingKeyWarring);
        channel.queueBind(queue,"log_direct",routingKeyInfo);
        //獲取消息
        channel.basicConsume(queue,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費(fèi)者2:消費(fèi)了"+new String(body));
            }
        });
    }

Routing模式:topic(訂閱模型)

Topic類型的Exchange與Direct相比病线,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列吓著。只不過Topic類型Exchange可以讓隊(duì)列在綁定Routing key的時(shí)候使用通配
符!這種模型Routingkey 一般都是由一個(gè)或多個(gè)單詞組成鲤嫡,多個(gè)單詞之間以"分割,例如: item. insert


topic.png
  • 通配符:
    * 代表只匹配一個(gè)單詞绑莺,比如user.*的路由鍵可以接受user.add暖眼,user.delete,user.update
    #代表只匹配多個(gè)單詞纺裁,比如user.#的路由鍵可以接受user.add.all诫肠,user.delete.all

生產(chǎn)者代碼

public static void main(String[] args) throws IOException {
        //創(chuàng)建連接
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //創(chuàng)建信道
        Channel channel = mqConnection.createChannel();
        //綁定交換機(jī)
        channel.exchangeDeclare("topic","topic");
        //路由
        String routingKey="user.save";
        //發(fā)送消息
        channel.basicPublish("topic",routingKey,null,("生產(chǎn)了"+routingKey+"消息").getBytes());
        //關(guān)閉連接
        RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
    }

消費(fèi)者1代碼是 * 的通配符

public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        channel.exchangeDeclare("topic","topic");
        String queue = channel.queueDeclare().getQueue();
        //路由鍵匹配一個(gè)單詞比如user.save
        channel.queueBind(queue,"topic","user.*");
        channel.basicConsume(queue,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費(fèi)者1:"+new String(body));
            }
        });
    }

消費(fèi)者2代碼是#號(hào)的通配符

public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        channel.exchangeDeclare("topic","topic");
        String queue = channel.queueDeclare().getQueue();
        //路由鍵匹配多個(gè)單詞比如user.id.delete
        channel.queueBind(queue,"topic","user.#");
        //消費(fèi)消息
        channel.basicConsume(queue,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費(fèi)者1:"+new String(body));
            }
        });
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末司澎,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子栋豫,更是在濱河造成了極大的恐慌挤安,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件丧鸯,死亡現(xiàn)場離奇詭異蛤铜,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)丛肢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門围肥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蜂怎,你說我怎么就攤上這事穆刻。” “怎么了杠步?”我有些...
    開封第一講書人閱讀 164,133評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵氢伟,是天一觀的道長。 經(jīng)常有香客問我幽歼,道長腐芍,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評(píng)論 1 293
  • 正文 為了忘掉前任试躏,我火速辦了婚禮猪勇,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘颠蕴。我一直安慰自己泣刹,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,585評(píng)論 6 392
  • 文/花漫 我一把揭開白布犀被。 她就那樣靜靜地躺著椅您,像睡著了一般。 火紅的嫁衣襯著肌膚如雪寡键。 梳的紋絲不亂的頭發(fā)上掀泳,一...
    開封第一講書人閱讀 51,462評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音西轩,去河邊找鬼员舵。 笑死,一個(gè)胖子當(dāng)著我的面吹牛藕畔,可吹牛的內(nèi)容都是我干的马僻。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼注服,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼韭邓!你這毒婦竟也來了措近?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤女淑,失蹤者是張志新(化名)和其女友劉穎瞭郑,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鸭你,經(jīng)...
    沈念sama閱讀 45,587評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡凰浮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,792評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了苇本。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片袜茧。...
    茶點(diǎn)故事閱讀 39,919評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖瓣窄,靈堂內(nèi)的尸體忽然破棺而出笛厦,到底是詐尸還是另有隱情,我是刑警寧澤俺夕,帶...
    沈念sama閱讀 35,635評(píng)論 5 345
  • 正文 年R本政府宣布裳凸,位于F島的核電站,受9級(jí)特大地震影響劝贸,放射性物質(zhì)發(fā)生泄漏姨谷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,237評(píng)論 3 329
  • 文/蒙蒙 一映九、第九天 我趴在偏房一處隱蔽的房頂上張望梦湘。 院中可真熱鬧,春花似錦件甥、人聲如沸捌议。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瓣颅。三九已至,卻和暖如春譬正,著一層夾襖步出監(jiān)牢的瞬間宫补,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評(píng)論 1 269
  • 我被黑心中介騙來泰國打工曾我, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留粉怕,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,048評(píng)論 3 370
  • 正文 我出身青樓您单,卻偏偏與公主長得像斋荞,于是被迫代替她去往敵國和親荞雏。 傳聞我的和親對(duì)象是個(gè)殘疾皇子虐秦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,864評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容