RabbitMQ(三) - 工作隊列(Work Queues)

工作隊列

rabbitmq-work-queues

上一個教程中州邢,我們寫了一個從一個已經(jīng)命好名的隊列中收發(fā)消息的程序襟诸。在這個教程中淀歇,我們將創(chuàng)建一個工作隊列用來在多個工作者之間分發(fā)耗時(time-consuming)任務线婚。

工作隊列(又名:任務隊列)背后的主要思想是避免立即做資源密集型的任務并且要等到它完成。相反蜒灰,我們調(diào)度這個任務在以后完成弦蹂。我們封裝一個消息任務并把它發(fā)送給隊列。一個工作進程在后臺運行:取出任務并最終執(zhí)行這個任務强窖。如果跑了多個工作任務凸椿,那么消息被它們共享。

這些概念在web應用中是特別有用的翅溺,在很短的http請求完成一個復雜的任務脑漫。

準備

在前面的一個教程中我們發(fā)送了一個包含“Hello World!”的消息∷枰郑現(xiàn)在我們打算發(fā)送一個字符串來代替一個復雜的任務。我們沒有一個真正的任務优幸,比如改變圖片大小或者渲染一個pdf文件吨拍,我們假裝我們很忙-通過使用Thread.sleep()方法。我們將以.的數(shù)量來表示任務的復雜度网杆。每一個點表示需要“工作”1s羹饰,例如:一個假任務描述為:Hello...表示需要花費3秒的時間。

我們將簡單修改一下我們之前的例子Send.java碳却,官網(wǎng)的例子是用命令行队秩,但是我們用IDE,所以不和官網(wǎng)的一樣了昼浦。官網(wǎng)用命令行連發(fā)了5條消息刹碾,我們將用for循環(huán)來實現(xiàn)。新的程序我們命名為NewTask.java座柱,以下是所有代碼:

package com.roachfu.tutorial.rabbitmq.website.workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作隊列例子
 */
public class NewTask {

    private static final String QUEUE_NAME = "work.queue";

    private static final String[] strings = {
            "First message.",
            "Second message..",
            "Third message...",
            "Fourth message....",
            "Fifth message....."
    };

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String[] messages = strings;
        for (String message: messages){
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Send '" + message + "'");
        }

        channel.close();
        connection.close();
    }

}

我們老的Recv.java也是只需要稍微修改一下迷帜。只需要對.進行一個處理,我們將新的程序命名為Worker.java色洞。以下是全部代碼:

package com.roachfu.tutorial.rabbitmq.website.workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作隊列消費端
 */
public class Worker {

    private static final String QUEUE_NAME = "work.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

                try {
                    doWork(message);
                }finally {
                    System.out.println(" [x] Done");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);

    }

    private static void doWork(String message) {
        for (char ch : message.toCharArray()){
            if (ch == '.'){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

輪循分發(fā)(Round-robin dispatching)

任務隊列的一個好處就是能很簡單的并行工作戏锹。如果我們有積壓的工作,我們只需要添加更多的工作者火诸,很容易擴展锦针。

首先,讓我們同時跑兩個工作者實例置蜀。它們都將從隊列中獲取消息奈搜,結(jié)果怎樣,我們拭目以待盯荤。

你需要開三個控制臺馋吗,兩個跑工作者程序。這兩個就是我們的消費者 - C1C2秋秤。

rabbitmq-worker

第三個控制臺我們發(fā)布新的任務宏粤。一旦你啟動好了消費者們,你就可以發(fā)布消息了:即執(zhí)行生產(chǎn)者程序NewTask.java灼卢。以下是執(zhí)行結(jié)果:

rabbitmq-newtask

下面是當執(zhí)行上面的程序之后绍哎,兩個worker的輸出:

worker-1

rabbitmq-worker-1

worker-2

rabbitmq-worker-2

默認的,RabbitMQ會有序的將一個個消息交付給下一個消費者鞋真。平均每個消費者將會得到相同數(shù)量的消息崇堰。這種分發(fā)消息的方式叫輪循(round-robin)。可以嘗試3個或更多的工作者海诲。

消息確認(message acknowledgment)

完成一個任務需要花費一些時間繁莹,你可以想象一個需要較長時間完成的任務在執(zhí)行的中途中掛了會發(fā)生什么。我們當前的代碼饿肺,一旦RabbitMQ將消息交付給客戶,它將立即從內(nèi)存中被移除盾似。在這個例子中敬辣,如果你在執(zhí)行過程中殺死一個工作者我們將丟失這個消息。我們也會丟失所有分發(fā)給指定的這個工作者但是還沒有處理的消息零院。

但是我們不想丟失任何任務溉跃。如果一個工作者掛了,我們希望這個任務能交付給另一個工作者告抄。

為了確保消息永遠不會丟失撰茎,RabbitMQ提供了消息確認機制,消費者向RabbitMQ中發(fā)送一個確認表示消息已經(jīng)接收打洼、處理并且RabbitMQ可以自由的刪除它了龄糊。

如果一個消費者掛了(通道關閉,連接關閉或者TCP連接丟失)沒有發(fā)送確認募疮,RabbitMQ將會理解成消息沒有被完全處理并將消息發(fā)回隊列炫惩。如果這個時候有其他消費者在線,它將被快速的交付給另一個消費者阿浓。通過這種方式能確保沒有消息丟失他嚷,即使工作者偶爾掛掉。

這里沒有消息超時芭毙,當消費者掛了RabbitMQ將會重新交付消息筋蓖。如果一個消息的處理過程花費了很長很長的時間,這個是允許的退敦。

消息確認默認是打開的粘咖,在上一個例子中我們可以通過設置標志autoAck=true將其顯示的關掉。這里我們將標志設置為false并當我們完成任務后發(fā)送一個確認侈百。以下是修改后的代碼片段:

//只接受一個未確認的消息(見下文)
channel.basicQos(1);

Consumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");

        try {
            doWork(message);
        }finally {
            System.out.println(" [x] Done");
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
};
// 自動確認設置為false
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

使用這些代碼涂炎,我們能確保即使我們在消息處理的過程中殺死消費者,也不會有消息丟失设哗,因為所有未確認的消息在工作線程掛掉后都會被重新交付到其他消費者唱捣。

消息持久化(Message Durability)

我們已經(jīng)學習如何確保即使消費者掛了,消息也不會丟失网梢。但是如果RabbitMQ服務掛了震缭,我們還是會丟失我們的消息。

當RabbitMQ退出或者崩潰战虏,它將忽略掉隊列和消息拣宰,除非你告訴它不要忽略党涕。兩個步驟確保消息不會丟失:我們需要將隊列和消息都標志為持久化的。

第一巡社,我們需要保證RabbitMQ永遠不會丟失我們的隊列膛堤。為了能實現(xiàn)它,我們需要將其定義為持久化的晌该。

boolean durable = true;
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

如果QUEUE_NAME這個隊列名在之前就已經(jīng)使用肥荔,并且沒有設置為持久化,那么我們需要重新設置一個隊列名朝群,不然就會有沖突燕耿。RabbitMQ是不允許設置一個既是持久化又是非持久化的隊列存在的。

上面的定義需要將消費者和生產(chǎn)者都改掉姜胖。

在這里即使RabbitMQ重啟我們也能確保隊列不會丟失∮В現(xiàn)在我們需要使我們的消息是持久化的——通過設置MessageProperties(它實現(xiàn)了BasicProperties)的值為PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

注意:消息持久化

設置消息持久化并不能完全保證消息不會丟失右莱。即使告知RabbitMQ將消息持久化到磁盤蚜锨,但是RabbitMQ還是會出現(xiàn)已經(jīng)接受到消息但是卻沒有保存它的情況。并且慢蜓,RabbitMQ不會為每個消息執(zhí)行fsync(2)——它可能只是保存到內(nèi)存中但是沒有真的寫到磁盤中踏志。持久化并不是強持久化的,但是對于簡單的任務隊列已經(jīng)足夠使用了胀瞪。如果你需要一個強持久化针余,你可以使用publisher confirms

公平分發(fā)

你可能已經(jīng)注意到分發(fā)還是不能完全的達到我們想要的效果凄诞。例如:有這么一種情形圆雁,對于兩個工作者,基數(shù)任務很繁重帆谍,偶數(shù)任務很輕松伪朽,一個工作者就會不間斷的工作而另一個將幾乎不做什么任務。然而RabbitMQ并不知道這些汛蝙,依然在這樣分發(fā)消息烈涮。

發(fā)生這種情況的原因是RabbitMQ只是單純的當消息發(fā)送到隊列后將消息進行分發(fā)。它并不關心消費者未確認的消息數(shù)量窖剑。它只是盲目的將每N個消息發(fā)送給n個消費者坚洽。

rabbitmq-prefetchcount

我們可以使用basicQos方法設置prefetchCount = 1防止這樣的失敗。這個將告知RabbitMQ不要在同一時間將很多消息給消費者西土。換句話說讶舰,在前一個消息完成并確認之前不要再將新的消息分發(fā)給這個消費者。而是將消息分發(fā)到下一個不忙的消費者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意:隊列大小

如果所有的消息者都在忙跳昼,你的隊列又使用完了般甲。你需要關注這個,也許需要增加更多的消費者鹅颊,或者其他的策略敷存。

以下是所有的代碼整合

NewTask.java

package com.roachfu.tutorial.rabbitmq.website.workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作隊列例子
 */
public class NewTask {

    private static final String QUEUE_NAME = "task.queue";

    private static final String[] strings = {
            "First message.",
            "Second message..",
            "Third message...",
            "Fourth message....",
            "Fifth message....."
    };

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

        String[] messages = strings;
        for (String message: messages){
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println(" [x] Send '" + message + "'");
        }

        channel.close();
        connection.close();
    }

}

Worker.java

package com.roachfu.tutorial.rabbitmq.website.workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作隊列消費端
 */
public class Worker {

    private static final String QUEUE_NAME = "task.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

        //只接受一個未確認的消息(見下文)
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

                try {
                    doWork(message);
                }finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // 自動確認設置為false
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);

    }

    private static void doWork(String message) {
        for (char ch : message.toCharArray()){
            if (ch == '.'){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

下個教程我們將學習怎么將相同的消息交付給多個消費者。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末堪伍,一起剝皮案震驚了整個濱河市锚烦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌杠娱,老刑警劉巖挽牢,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谱煤,死亡現(xiàn)場離奇詭異摊求,居然都是意外死亡,警方通過查閱死者的電腦和手機刘离,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門室叉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人硫惕,你說我怎么就攤上這事茧痕。” “怎么了恼除?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵踪旷,是天一觀的道長。 經(jīng)常有香客問我豁辉,道長令野,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任徽级,我火速辦了婚禮气破,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘餐抢。我一直安慰自己现使,他們只是感情好,可當我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布旷痕。 她就那樣靜靜地躺著碳锈,像睡著了一般。 火紅的嫁衣襯著肌膚如雪欺抗。 梳的紋絲不亂的頭發(fā)上殴胧,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天,我揣著相機與錄音,去河邊找鬼团滥。 笑死竿屹,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的灸姊。 我是一名探鬼主播拱燃,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼力惯!你這毒婦竟也來了碗誉?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤父晶,失蹤者是張志新(化名)和其女友劉穎哮缺,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體甲喝,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡尝苇,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了埠胖。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片糠溜。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖直撤,靈堂內(nèi)的尸體忽然破棺而出非竿,到底是詐尸還是另有隱情,我是刑警寧澤谋竖,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布红柱,位于F島的核電站,受9級特大地震影響蓖乘,放射性物質(zhì)發(fā)生泄漏锤悄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一驱敲、第九天 我趴在偏房一處隱蔽的房頂上張望铁蹈。 院中可真熱鬧,春花似錦众眨、人聲如沸姑廉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽叔扼。三九已至,卻和暖如春狈定,著一層夾襖步出監(jiān)牢的瞬間颂龙,已是汗流浹背习蓬。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留措嵌,地道東北人躲叼。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像企巢,于是被迫代替她去往敵國和親枫慷。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器浪规。支持消息的持久化或听、事務、擁塞控...
    jiangmo閱讀 10,367評論 2 34
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理笋婿,服務發(fā)現(xiàn)誉裆,斷路器,智...
    卡卡羅2017閱讀 134,701評論 18 139
  • RabbitMQ詳解 本文地址:http://www.host900.com/index.php/articles...
    嘉加家佳七閱讀 2,518評論 0 9
  • 1. 歷史 RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,096評論 3 51
  • 幸福來源于內(nèi)心缸濒,植根于選擇足丢。 中學時以為考上大學就輕松了,上大學時以為工作以后就自由了绍填,談戀愛時以為結(jié)婚以后就幸福...
    不雨蕭瀟閱讀 385評論 0 3