RabbitMQ:work queues 工作隊列(Round-robin/Fair dispatch)

9824247-01481244ec2d6199.jpg

在上篇中我們實現(xiàn)了程序來從一個已經(jīng)命名的隊列里發(fā)送和接收消息。本篇博文中我們將要創(chuàng)建工作隊列用來把一些比較耗時的任務(wù)分配給多個worker揽乱。

工作隊列的主要思想就是避開立刻處理某個資源消耗交大的任務(wù)并且需要等待它執(zhí)行完成晦闰。取而代之的是我們可以將它加入計劃列表朝蜘,并在后邊執(zhí)行這些任務(wù)蚂四。我們將任務(wù)分裝成一個消息歼疮,并發(fā)送到隊列中署咽。后臺的工作程序在接收到消息后將會立刻執(zhí)行任務(wù)近顷。當運行多個執(zhí)行器時,任務(wù)將會在他們之間共享宁否。

為什么會出現(xiàn)work queues?

前提:使用simple隊列的時候 我們應(yīng)用程序在是使用消息系統(tǒng)的時候,一般生產(chǎn)者P生產(chǎn)消息是毫不費力的(發(fā)送消息即可),而消費者接收完消息后的需要處理,會耗費一定的時間,這時候,就有可能導致很多消息堆積在隊列里面,一個消費者有可能不夠用 那么怎么讓消費者同事處理多個消息呢? 在同一個隊列上創(chuàng)建多個消費者,讓他們相互競爭,這樣消費者就可以同時處理多條消息了 使用任務(wù)隊列的優(yōu)點之一就是可以輕易的并行工作窒升。如果我們積壓了好多工作,我們可以通過增加工作者(消費者)來解決這一問題慕匠,使得系統(tǒng)的伸縮性更加容易饱须。


image.png

1.Round-robin(輪詢分發(fā))

輪詢分發(fā)結(jié)果就是不管服務(wù)器誰忙或清閑,都不會給誰多一個任務(wù)或少一個任務(wù)台谊,任務(wù)總是你一個我一個的分蓉媳。

1.1 消息提供者
package com.hrabbit.rabbitmq.send;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-27 下午2:37
 * @Description:
 */
public class Send {

    private final static String QUEUE_NAME = "hrabbit_queue_work";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 50; i++) {
            // 消息內(nèi)容
            String message = "." + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }
        channel.close();
        connection.close();
    }
}

1.1 消費者一號
package com.hrabbit.rabbitmq.recover;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-27 下午2:42
 * @Description:
 */
public class Received {

    private final static String QUEUE_NAME = "hrabbit_queue_work";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 聲明隊列譬挚,主要為了防止消息接收者先運行此程序,隊列還不存在時創(chuàng)建隊列酪呻。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定義一個消息的消費者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [1] 消費消息: '" + message + "'");
                try {
                    doWork(message);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [1] 消費消息結(jié)束");
                }
            }
        };
        boolean autoAck = true;
        //消息的確認模式自動應(yīng)答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }


    private static void doWork(String task) throws InterruptedException {
        Thread.sleep(1000);
    }
}

1.1 消費者二號
package com.hrabbit.rabbitmq.recover;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-27 下午2:42
 * @Description:
 */
public class Received2 {

    private final static String QUEUE_NAME = "hrabbit_queue_work";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 聲明隊列减宣,主要為了防止消息接收者先運行此程序,隊列還不存在時創(chuàng)建隊列玩荠。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定義一個消息的消費者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [2] 消費消息: '" + message + "'");
                try {
                    doWork(message);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [2] 消費消息結(jié)束");
                }
            }
        };
        boolean autoAck = true;
        //消息的確認模式自動應(yīng)答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }


    private static void doWork(String task) throws InterruptedException {
        Thread.sleep(2000);
    }
}

最后消費者一號和消費者二號的輸出結(jié)果:

消費者一號:


image.png

消費者二號:


image.png

注:消費者1 我們處理時間是1s ;而消費者2中處理時間是2s; 但是我們看到的現(xiàn)象并不是 1處理的多 消費者2處理的少蚪腋,消費者1中將偶數(shù)部分處理掉了 ,消費者2中將基數(shù)部分處理掉了 姨蟋。

測試結(jié)果:
1.消費者1和消費者2獲取到的消息內(nèi)容是不同的,同一個消息只能被一個消費者獲取
2.消費者1和消費者2貨到的消息數(shù)量是一樣的 一個奇數(shù)一個偶數(shù) 按道理消費者1 獲取的比消費者2要多
這種方式叫做輪詢分發(fā) 結(jié)果就是不管誰忙或清閑屉凯,都不會給誰多一個任務(wù)或少一個任務(wù),任務(wù)總是你一個我一個的分眼溶。

2.Fair dispatch(公平分發(fā))

image.png

雖然上面的分配法方式也還行悠砚,但是有個問題就是:比如:現(xiàn)在有2個消費者,所有的偶數(shù)的消息都是繁忙的堂飞,而奇數(shù)則是輕松的灌旧。按照輪詢的方式,偶數(shù)的任務(wù)交給了第一個消費者绰筛,所以一直在忙個不停枢泰。奇數(shù)的任務(wù)交給另一個消費者,則立即完成任務(wù)铝噩,然后閑得不行衡蚂。 而RabbitMQ則是不了解這些的。他是不知道你消費者的消費能力的,這是因為當消息進入隊列骏庸,RabbitMQ就會分派消息毛甲。而rabbitmq只是盲目的將消息輪詢的發(fā)給消費者。你一個我一個的這樣發(fā)送.

為了解決這個問題具被,我們使用basicQos( prefetchCount = 1)方法玻募,來限制RabbitMQ只發(fā)不超過1條的消息給同一個消費者。當消息處理完畢后一姿,有了反饋ack七咧,才會進行第二次發(fā)送。(也就是說需要手動反饋給Rabbitmq ) 還有一點需要注意叮叹,使用公平分發(fā)艾栋,必須關(guān)閉自動應(yīng)答,改為手動應(yīng)答衬横。

2.1 生產(chǎn)者

代碼基本不用修改裹粤,只需要添加一行代碼:channel.basicQos(1);

package com.hrabbit.rabbitmq.send;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-27 下午2:37
 * @Description:
 */
public class Send {

    private final static String QUEUE_NAME = "hrabbit_queue_work";

    private static Integer prefetchCount=1;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //每個消費者發(fā)送確認信號之前,消息隊列不發(fā)送下一個消息過來,一次只處理一個消息
        //限制發(fā)給同一個消費者不得超過1條消息
        channel.basicQos(prefetchCount);

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 50; i++) {
            // 消息內(nèi)容
            String message = "." + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }
        channel.close();
        connection.close();
    }
}
2.2 消費者一

首先設(shè)置channel.basicQos(1);保證一次只分發(fā)一次遥诉,其次將autoAck設(shè)置為fase,在每次消息處理完成之后拇泣,手動確認消息channel.basicAck(envelope.getDeliveryTag(), false);

package com.hrabbit.rabbitmq.recover;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-27 下午2:42
 * @Description:
 */
public class Received {

    private final static String QUEUE_NAME = "hrabbit_queue_work";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 聲明隊列,主要為了防止消息接收者先運行此程序矮锈,隊列還不存在時創(chuàng)建隊列霉翔。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//保證一次只分發(fā)一個
        //定義一個消息的消費者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [1] 消費消息: '" + message + "'");
                try {
                    doWork(message);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [1] 消費消息結(jié)束");
                    //手動應(yīng)答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        //消息的確認模式關(guān)閉自動
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }


    private static void doWork(String task) throws InterruptedException {
        Thread.sleep(1000);
    }
}

2.2 消費者二
package com.hrabbit.rabbitmq.recover;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-27 下午2:42
 * @Description:
 */
public class Received2 {

    private final static String QUEUE_NAME = "hrabbit_queue_work";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 聲明隊列,主要為了防止消息接收者先運行此程序苞笨,隊列還不存在時創(chuàng)建隊列债朵。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//保證一次只分發(fā)一個
        //定義一個消息的消費者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [2] 消費消息: '" + message + "'");
                try {
                    doWork(message);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [2] 消費消息結(jié)束");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        //消息的確認模式自動應(yīng)答關(guān)閉
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }


    private static void doWork(String task) throws InterruptedException {
        //消息的處理時間為2000
        Thread.sleep(2000);
    }
}

最后消費者一號和消費者二號的輸出結(jié)果:
消費者一:


image.png

消費者二:


image.png

這時候現(xiàn)象就是消費者1 速度大于消費者2 。

系列文章:

RabbitMQ:RabbitMQ-理論基礎(chǔ)
RabbitMQ:快速入門hello word
RabbitMQ:RabbitMQ:消息應(yīng)答與消息持久化
RabbitMQ:發(fā)布/訂閱 Publish/Subscribe
RabbitMQ:路由Routing
RabbitMQ:Topic類型的exchange
RabbitMQ:RabbitMQ之消息確認機制(事務(wù)+Confirm)
RabbitMQ:spring整合RabbitMQ

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末瀑凝,一起剝皮案震驚了整個濱河市序芦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌粤咪,老刑警劉巖谚中,帶你破解...
    沈念sama閱讀 222,627評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異寥枝,居然都是意外死亡宪塔,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評論 3 399
  • 文/潘曉璐 我一進店門囊拜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來某筐,“玉大人,你說我怎么就攤上這事冠跷∧咸埽” “怎么了?”我有些...
    開封第一講書人閱讀 169,346評論 0 362
  • 文/不壞的土叔 我叫張陵蔽莱,是天一觀的道長弟疆。 經(jīng)常有香客問我,道長盗冷,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,097評論 1 300
  • 正文 為了忘掉前任同廉,我火速辦了婚禮仪糖,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘迫肖。我一直安慰自己锅劝,他們只是感情好,可當我...
    茶點故事閱讀 69,100評論 6 398
  • 文/花漫 我一把揭開白布蟆湖。 她就那樣靜靜地躺著故爵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪隅津。 梳的紋絲不亂的頭發(fā)上诬垂,一...
    開封第一講書人閱讀 52,696評論 1 312
  • 那天劲室,我揣著相機與錄音,去河邊找鬼结窘。 笑死很洋,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的隧枫。 我是一名探鬼主播喉磁,決...
    沈念sama閱讀 41,165評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼官脓!你這毒婦竟也來了协怒?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,108評論 0 277
  • 序言:老撾萬榮一對情侶失蹤卑笨,失蹤者是張志新(化名)和其女友劉穎孕暇,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體湾趾,經(jīng)...
    沈念sama閱讀 46,646評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡芭商,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,709評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了搀缠。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片铛楣。...
    茶點故事閱讀 40,861評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖艺普,靈堂內(nèi)的尸體忽然破棺而出簸州,到底是詐尸還是另有隱情,我是刑警寧澤歧譬,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布岸浑,位于F島的核電站,受9級特大地震影響瑰步,放射性物質(zhì)發(fā)生泄漏矢洲。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,196評論 3 336
  • 文/蒙蒙 一缩焦、第九天 我趴在偏房一處隱蔽的房頂上張望读虏。 院中可真熱鬧,春花似錦袁滥、人聲如沸盖桥。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽揩徊。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間塑荒,已是汗流浹背熄赡。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留袜炕,地道東北人本谜。 一個月前我還...
    沈念sama閱讀 49,287評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像偎窘,于是被迫代替她去往敵國和親乌助。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,860評論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理陌知,服務(wù)發(fā)現(xiàn)他托,斷路器,智...
    卡卡羅2017閱讀 134,716評論 18 139
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器仆葡。支持消息的持久化赏参、事務(wù)、擁塞控...
    jiangmo閱讀 10,369評論 2 34
  • 需求分析 日志格式 數(shù)據(jù)清洗 創(chuàng)建源表由于日志文件中沿盅,字段與字段之間以空格分割把篓,而且每個字段中也存在空格,所以不能...
    心_的方向閱讀 8,574評論 2 10
  • 文/餃子 琴姐給我介紹了一份助理工作腰涧,從懷孕到孩子上幼兒園韧掩,整整四年沒有出去工作了,可是窖铡,現(xiàn)在的我如此窘迫疗锐,急需一...
    零零一餃子閱讀 1,253評論 19 17
  • 第7篇 iOS11正式版更新于20日凌晨推送,今天早上醒來看到即刻里的推送消息后就馬上升級了费彼。本文主要描述iPho...
    短暫瞬間閱讀 626評論 1 0