二:Work Queues输瓜,輪訓(xùn)分發(fā)哩治,ack和nack稻据,持久化

一 :Work Queues
Work Queues— 工作隊(duì)列 (又稱任務(wù)隊(duì)列) 的主要思想是避免立即執(zhí)行資源密集型任務(wù)管宵,而不得不等待它完成。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列,在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)箩朴。當(dāng)有多個(gè)工作線程時(shí)岗喉,這些工作線程將一起處理這些任務(wù)。

輪訓(xùn)分發(fā)消息
在這個(gè)案例中我們會(huì)啟動(dòng)兩個(gè)工作線程炸庞,一個(gè)消息發(fā)送線程钱床,我們來(lái)看看他們兩個(gè)工作線程是如何工作的。

package com.oddfar.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
    //得到一個(gè)連接的 channel
    public static Channel getChannel() throws Exception {
        //創(chuàng)建一個(gè)連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("42.192.149.71");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
package com.oddfar.two;

import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * 這是一個(gè)工作線程埠居,相當(dāng)于之前的消費(fèi)者
 *
 * @author zhiyuan
 */
public class Worker01 {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        //消息接受
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:" + receivedMessage);
        };
        //消息被取消
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");

        };

        System.out.println("C1 消費(fèi)者啟動(dòng)等待消費(fèi).................. ");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }
}
public class Task01 {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("消息發(fā)送完成:" + message);
        }

    }
}

通過(guò)程序執(zhí)行發(fā)現(xiàn)生產(chǎn)者總共發(fā)送 4 個(gè)消息查牌,消費(fèi)者 1 和消費(fèi)者 2 分別分得兩個(gè)消息,并且是按照有序的一個(gè)接收一次消息滥壕。這就是輪訓(xùn)的效果

另外:輪詢分發(fā)并不是串行的輪詢分發(fā)纸颜,假設(shè)生產(chǎn)者連續(xù)發(fā)1 - 10 這10個(gè)消息,消費(fèi)者A會(huì)消費(fèi)1,3,5,7,9绎橘,消費(fèi)者2會(huì)消費(fèi)到2,4,6,8,10 胁孙。消費(fèi)者1比消費(fèi)者2的消費(fèi)速度快很多,當(dāng)消費(fèi)者1消費(fèi)完1的時(shí)候称鳞,消費(fèi)者2還在消費(fèi)2,此時(shí)消費(fèi)者1并不會(huì)等待消費(fèi)者1完成消息2涮较,消費(fèi)者1會(huì)快速的將3,5,7,9這4個(gè)消息快速消費(fèi)完,然后消費(fèi)者2再慢慢的消費(fèi)4,6,8,10冈止。
此時(shí)消費(fèi)者1它并不會(huì)去救濟(jì)消費(fèi)者2 并不會(huì)去幫消費(fèi)者2消費(fèi)消息
狂票,也就是說(shuō)它不會(huì)向forkJoin這種東西會(huì)動(dòng)態(tài)的去執(zhí)行任務(wù)。

  • 按照消費(fèi)者消費(fèi)能力分發(fā)(在消費(fèi)者代碼中聲明)
        //設(shè)置權(quán)重分發(fā) (消費(fèi)者速度快的機(jī)器會(huì)消費(fèi)到更多的消息熙暴,消費(fèi)速度慢的機(jī)器會(huì)消費(fèi)到更少的消息闺属,就是能者多勞)
        channel.basicQos(1);
  • 權(quán)重分發(fā)
    也可以按照機(jī)器的能力進(jìn)行分發(fā),和nginx權(quán)重類似

二 消息應(yīng)答 (ack)

消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間周霉,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成了部分突然它掛掉了掂器,會(huì)發(fā)生什么情況。

RabbitMQ 一旦向消費(fèi)者傳遞了一條消息诗眨,便立即將該消息標(biāo)記為刪除 (自動(dòng)ack的情況)。在這種情況下孕讳,突然有個(gè)消費(fèi)者掛掉了匠楚,我們將丟失正在處理的消息,以及后續(xù)發(fā)送給該消費(fèi)者的消息厂财,因?yàn)樗鼰o(wú)法接收到芋簿。

為了保證消息在發(fā)送過(guò)程中不丟失,引入消息應(yīng)答機(jī)制璃饱,消息應(yīng)答就是:消費(fèi)者在接收到消息并且處理該消息之后与斤,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。(這就是手動(dòng)ack的情況)

public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //聲明隊(duì)列
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        Scanner sc = new Scanner(System.in);
        System.out.println("請(qǐng)輸入信息");
        while (sc.hasNext()) {
            String message = sc.nextLine();
            //發(fā)布消息
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息" + message);
        }
    }
}
public class Work03 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1 等待接收消息處理時(shí)間較 短");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("接收到消息:" + message);

            //todo 三個(gè)處理消息方法的api
            /**
             * 1.消息標(biāo)記 tag
             * 2.是否批量應(yīng)答未應(yīng)答消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

          /**
             * 第一個(gè)參數(shù):deliveryTag 消息的標(biāo)識(shí)
             * 第二個(gè)參數(shù) multiple 是否批量確認(rèn)消息
             * 第三個(gè)參數(shù):requeue 被Nack后 是否重新入隊(duì)撩穿,
             *           如果是填false 那么就相當(dāng)于消息被消費(fèi)者自己丟棄了磷支,如果是true 那么相當(dāng)于重新入隊(duì),broker會(huì)交給其他的消費(fèi)者來(lái)消費(fèi)這個(gè)消息
             *           如果消費(fèi)者僅有一個(gè) 然后Nack后的requeue參數(shù)又是true 那么就相當(dāng)于中國(guó)消息一直會(huì)發(fā)送給這個(gè)消費(fèi)者了食寡,相當(dāng)于消費(fèi)者死循環(huán)的消費(fèi)這個(gè)消息
             */

//            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false,false);

//            basicNack和basicReject的區(qū)別僅僅是一個(gè)可以批量一個(gè)不能批量操作而已
//            channel.basicReject();
        };

        CancelCallback cancelCallback = (s) -> {
            System.out.println(s + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };

        //采用手動(dòng)應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}
public class Work04 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1 等待接收消息處理時(shí)間較 長(zhǎng)");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("接收到消息:" + message);
            /**
             * 1.消息標(biāo)記 tag
             * 2.是否批量應(yīng)答未應(yīng)答消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        CancelCallback cancelCallback = (s) -> {
            System.out.println(s + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };

        //采用手動(dòng)應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

最終效果雾狈,如果我們演示了消費(fèi)消息較慢的消費(fèi)者(Work04 )消費(fèi)消息的時(shí)候 將程序斷掉,那么消息相當(dāng)于沒(méi)有給broker確認(rèn)ack抵皱,那么broker會(huì)將本應(yīng)該屬于Work04 的消息重新投遞給其他消費(fèi)者
另外:Nack和Reject的兩種情況在Work03中寫出來(lái)了

  • 三 持久化

當(dāng) RabbitMQ 服務(wù)停掉以后善榛,默認(rèn)情況下會(huì)將隊(duì)列和消息都會(huì)刪除,
因此 呻畸,我們?cè)谏a(chǎn)者發(fā)消息的時(shí)候
1:要將隊(duì)列持久化移盆,2: 要將消息進(jìn)行持久化

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //1 設(shè)置隊(duì)列持久化
        boolean durable = true;
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        Scanner sc = new Scanner(System.in);
        System.out.println("請(qǐng)輸入信息");
        while (sc.hasNext()) {
            String message = sc.nextLine();
            //2 設(shè)置消息持久化
            //發(fā)布消息
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN , message.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息" + message);
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市伤为,隨后出現(xiàn)的幾起案子咒循,更是在濱河造成了極大的恐慌,老刑警劉巖钮呀,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件剑鞍,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡爽醋,警方通過(guò)查閱死者的電腦和手機(jī)蚁署,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蚂四,“玉大人光戈,你說(shuō)我怎么就攤上這事∷煸” “怎么了久妆?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)跷睦。 經(jīng)常有香客問(wèn)我筷弦,道長(zhǎng),這世上最難降的妖魔是什么抑诸? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任烂琴,我火速辦了婚禮,結(jié)果婚禮上蜕乡,老公的妹妹穿的比我還像新娘奸绷。我一直安慰自己,他們只是感情好层玲,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布号醉。 她就那樣靜靜地躺著反症,像睡著了一般。 火紅的嫁衣襯著肌膚如雪畔派。 梳的紋絲不亂的頭發(fā)上铅碍,一...
    開(kāi)封第一講書(shū)人閱讀 52,255評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音父虑,去河邊找鬼该酗。 笑死,一個(gè)胖子當(dāng)著我的面吹牛士嚎,可吹牛的內(nèi)容都是我干的呜魄。 我是一名探鬼主播,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼莱衩,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼爵嗅!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起笨蚁,我...
    開(kāi)封第一講書(shū)人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤睹晒,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后括细,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體伪很,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年奋单,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了锉试。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡览濒,死狀恐怖呆盖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情贷笛,我是刑警寧澤应又,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站乏苦,受9級(jí)特大地震影響株扛,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜汇荐,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一洞就、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧拢驾,春花似錦奖磁、人聲如沸改基。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至稠腊,卻和暖如春躁染,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背架忌。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工吞彤, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人叹放。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓饰恕,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親井仰。 傳聞我的和親對(duì)象是個(gè)殘疾皇子埋嵌,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • 工作隊(duì)列 在上一個(gè)教程中,我們寫了一個(gè)從一個(gè)已經(jīng)命好名的隊(duì)列中收發(fā)消息的程序俱恶。在這個(gè)教程中雹嗦,我們將創(chuàng)建一個(gè)工作隊(duì)列...
    番薯IT閱讀 963評(píng)論 0 5
  • 說(shuō)明 在第一個(gè)教程中,我們編寫了程序來(lái)從已命名的隊(duì)列中接收和發(fā)送消息合是。在本教程中我們將創(chuàng)建一個(gè)工作隊(duì)列了罪,用于在多個(gè)...
    亼玨閱讀 243評(píng)論 0 0
  • 在上一篇文章中我們解決了最簡(jiǎn)單的helloworld 消息傳遞,這一篇中我們來(lái)探討rabbitMQ中的任務(wù)分發(fā) r...
    lsfire閱讀 633評(píng)論 0 0
  • 在上篇中我們實(shí)現(xiàn)了程序來(lái)從一個(gè)已經(jīng)命名的隊(duì)列里發(fā)送和接收消息聪全。本篇博文中我們將要?jiǎng)?chuàng)建工作隊(duì)列用來(lái)把一些比較耗時(shí)的任...
    AubreyXue閱讀 2,106評(píng)論 1 6
  • 上節(jié)介紹了在命名隊(duì)列中發(fā)送和獲取消息泊藕,本節(jié)介紹創(chuàng)建一個(gè)工作隊(duì)列,然后分發(fā)任務(wù)到多個(gè)消費(fèi)者(consumer) 編輯...
    bloke_anon閱讀 188評(píng)論 0 0