工作隊列
在上一個教程中州邢,我們寫了一個從一個已經(jīng)命好名的隊列中收發(fā)消息的程序襟诸。在這個教程中淀歇,我們將創(chuàng)建一個工作隊列用來在多個工作者之間分發(fā)耗時(time-consuming)任務线婚。
工作隊列(又名:任務隊列)背后的主要思想是避免立即做資源密集型的任務并且要等到它完成。相反蜒灰,我們調(diào)度這個任務在以后完成弦蹂。我們封裝一個消息任務并把它發(fā)送給隊列。一個工作進程在后臺運行:取出任務并最終執(zhí)行這個任務强窖。如果跑了多個工作任務凸椿,那么消息被它們共享。
這些概念在web應用中是特別有用的翅溺,在很短的http請求完成一個復雜的任務脑漫。
準備
在前面的一個教程中我們發(fā)送了一個包含“Hello World!”的消息∷枰郑現(xiàn)在我們打算發(fā)送一個字符串來代替一個復雜的任務。我們沒有一個真正的任務优幸,比如改變圖片大小或者渲染一個pdf文件吨拍,我們假裝我們很忙-通過使用Thread.sleep()
方法。我們將以.
的數(shù)量來表示任務的復雜度网杆。每一個點表示需要“工作”1s羹饰,例如:一個假任務描述為:Hello...
表示需要花費3秒的時間。
我們將簡單修改一下我們之前的例子Send.java
碳却,官網(wǎng)的例子是用命令行队秩,但是我們用IDE,所以不和官網(wǎng)的一樣了昼浦。官網(wǎng)用命令行連發(fā)了5條消息刹碾,我們將用for循環(huán)來實現(xiàn)。新的程序我們命名為NewTask.java
座柱,以下是所有代碼:
package com.roachfu.tutorial.rabbitmq.website.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作隊列例子
*/
public class NewTask {
private static final String QUEUE_NAME = "work.queue";
private static final String[] strings = {
"First message.",
"Second message..",
"Third message...",
"Fourth message....",
"Fifth message....."
};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String[] messages = strings;
for (String message: messages){
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Send '" + message + "'");
}
channel.close();
connection.close();
}
}
我們老的Recv.java
也是只需要稍微修改一下迷帜。只需要對.
進行一個處理,我們將新的程序命名為Worker.java
色洞。以下是全部代碼:
package com.roachfu.tutorial.rabbitmq.website.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作隊列消費端
*/
public class Worker {
private static final String QUEUE_NAME = "work.queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
}finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String message) {
for (char ch : message.toCharArray()){
if (ch == '.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
輪循分發(fā)(Round-robin dispatching)
任務隊列的一個好處就是能很簡單的并行工作戏锹。如果我們有積壓的工作,我們只需要添加更多的工作者火诸,很容易擴展锦针。
首先,讓我們同時跑兩個工作者實例置蜀。它們都將從隊列中獲取消息奈搜,結(jié)果怎樣,我們拭目以待盯荤。
你需要開三個控制臺馋吗,兩個跑工作者程序。這兩個就是我們的消費者 - C1
和 C2
秋秤。
第三個控制臺我們發(fā)布新的任務宏粤。一旦你啟動好了消費者們,你就可以發(fā)布消息了:即執(zhí)行生產(chǎn)者程序NewTask.java
灼卢。以下是執(zhí)行結(jié)果:
下面是當執(zhí)行上面的程序之后绍哎,兩個worker的輸出:
worker-1
worker-2
默認的,RabbitMQ會有序的將一個個消息交付給下一個消費者鞋真。平均每個消費者將會得到相同數(shù)量的消息崇堰。這種分發(fā)消息的方式叫輪循(round-robin)。可以嘗試3個或更多的工作者海诲。
消息確認(message acknowledgment)
完成一個任務需要花費一些時間繁莹,你可以想象一個需要較長時間完成的任務在執(zhí)行的中途中掛了會發(fā)生什么。我們當前的代碼饿肺,一旦RabbitMQ將消息交付給客戶,它將立即從內(nèi)存中被移除盾似。在這個例子中敬辣,如果你在執(zhí)行過程中殺死一個工作者我們將丟失這個消息。我們也會丟失所有分發(fā)給指定的這個工作者但是還沒有處理的消息零院。
但是我們不想丟失任何任務溉跃。如果一個工作者掛了,我們希望這個任務能交付給另一個工作者告抄。
為了確保消息永遠不會丟失撰茎,RabbitMQ提供了消息確認機制,消費者向RabbitMQ中發(fā)送一個確認表示消息已經(jīng)接收打洼、處理并且RabbitMQ可以自由的刪除它了龄糊。
如果一個消費者掛了(通道關閉,連接關閉或者TCP連接丟失)沒有發(fā)送確認募疮,RabbitMQ將會理解成消息沒有被完全處理并將消息發(fā)回隊列炫惩。如果這個時候有其他消費者在線,它將被快速的交付給另一個消費者阿浓。通過這種方式能確保沒有消息丟失他嚷,即使工作者偶爾掛掉。
這里沒有消息超時芭毙,當消費者掛了RabbitMQ將會重新交付消息筋蓖。如果一個消息的處理過程花費了很長很長的時間,這個是允許的退敦。
消息確認默認是打開的粘咖,在上一個例子中我們可以通過設置標志autoAck=true
將其顯示的關掉。這里我們將標志設置為false
并當我們完成任務后發(fā)送一個確認侈百。以下是修改后的代碼片段:
//只接受一個未確認的消息(見下文)
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
}finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自動確認設置為false
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
使用這些代碼涂炎,我們能確保即使我們在消息處理的過程中殺死消費者,也不會有消息丟失设哗,因為所有未確認的消息在工作線程掛掉后都會被重新交付到其他消費者唱捣。
消息持久化(Message Durability)
我們已經(jīng)學習如何確保即使消費者掛了,消息也不會丟失网梢。但是如果RabbitMQ服務掛了震缭,我們還是會丟失我們的消息。
當RabbitMQ退出或者崩潰战虏,它將忽略掉隊列和消息拣宰,除非你告訴它不要忽略党涕。兩個步驟確保消息不會丟失:我們需要將隊列和消息都標志為持久化的。
第一巡社,我們需要保證RabbitMQ永遠不會丟失我們的隊列膛堤。為了能實現(xiàn)它,我們需要將其定義為持久化的晌该。
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
如果QUEUE_NAME
這個隊列名在之前就已經(jīng)使用肥荔,并且沒有設置為持久化,那么我們需要重新設置一個隊列名朝群,不然就會有沖突燕耿。RabbitMQ是不允許設置一個既是持久化又是非持久化的隊列存在的。
上面的定義需要將消費者和生產(chǎn)者都改掉姜胖。
在這里即使RabbitMQ重啟我們也能確保隊列不會丟失∮В現(xiàn)在我們需要使我們的消息是持久化的——通過設置MessageProperties
(它實現(xiàn)了BasicProperties)的值為PERSISTENT_TEXT_PLAIN
。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
注意:消息持久化
設置消息持久化并不能完全保證消息不會丟失右莱。即使告知RabbitMQ將消息持久化到磁盤蚜锨,但是RabbitMQ還是會出現(xiàn)已經(jīng)接受到消息但是卻沒有保存它的情況。并且慢蜓,RabbitMQ不會為每個消息執(zhí)行
fsync(2)
——它可能只是保存到內(nèi)存中但是沒有真的寫到磁盤中踏志。持久化并不是強持久化的,但是對于簡單的任務隊列已經(jīng)足夠使用了胀瞪。如果你需要一個強持久化针余,你可以使用publisher confirms。
公平分發(fā)
你可能已經(jīng)注意到分發(fā)還是不能完全的達到我們想要的效果凄诞。例如:有這么一種情形圆雁,對于兩個工作者,基數(shù)任務很繁重帆谍,偶數(shù)任務很輕松伪朽,一個工作者就會不間斷的工作而另一個將幾乎不做什么任務。然而RabbitMQ并不知道這些汛蝙,依然在這樣分發(fā)消息烈涮。
發(fā)生這種情況的原因是RabbitMQ只是單純的當消息發(fā)送到隊列后將消息進行分發(fā)。它并不關心消費者未確認的消息數(shù)量窖剑。它只是盲目的將每N個消息發(fā)送給n個消費者坚洽。
我們可以使用basicQos
方法設置prefetchCount = 1
防止這樣的失敗。這個將告知RabbitMQ不要在同一時間將很多消息給消費者西土。換句話說讶舰,在前一個消息完成并確認之前不要再將新的消息分發(fā)給這個消費者。而是將消息分發(fā)到下一個不忙的消費者。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意:隊列大小
如果所有的消息者都在忙跳昼,你的隊列又使用完了般甲。你需要關注這個,也許需要增加更多的消費者鹅颊,或者其他的策略敷存。
以下是所有的代碼整合
package com.roachfu.tutorial.rabbitmq.website.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作隊列例子
*/
public class NewTask {
private static final String QUEUE_NAME = "task.queue";
private static final String[] strings = {
"First message.",
"Second message..",
"Third message...",
"Fourth message....",
"Fifth message....."
};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
String[] messages = strings;
for (String message: messages){
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println(" [x] Send '" + message + "'");
}
channel.close();
connection.close();
}
}
package com.roachfu.tutorial.rabbitmq.website.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作隊列消費端
*/
public class Worker {
private static final String QUEUE_NAME = "task.queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
//只接受一個未確認的消息(見下文)
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
}finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自動確認設置為false
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String message) {
for (char ch : message.toCharArray()){
if (ch == '.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
下個教程我們將學習怎么將相同的消息交付給多個消費者。