本文所有內(nèi)容均個人從RabbitMQ官網(wǎng)教程中翻譯舶沛,若圖片文字的引用有任何侵權(quán)的地方翻诉,聯(lián)系我炮姨,我會立馬刪除捌刮。
This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
工作隊(duì)列
(使用php-amqplib)
在第一個教程中我們寫了程序去通過一個已經(jīng)被named(被命名)的Queue(隊(duì)列)發(fā)送和接收消息舒岸。在這個教程中绅作,我們將會創(chuàng)建一個 Work Queue(工作隊(duì)列) 用于在多個處理程序中分配耗時任務(wù)。
Work Queues(工作隊(duì)列)(又名:Task Queues(任務(wù)隊(duì)列))的主要思想是避免立即處理資源密集型(resource-intensive)任務(wù)蛾派,并且必須等待它完成俄认。相反地,我們計劃讓任務(wù)稍后完成碍脏。我們把一個任務(wù)封裝成消息梭依,并且發(fā)送它到Queue(隊(duì)列)。一個在后臺運(yùn)行的工作線程將會把任務(wù)出隊(duì)并最終執(zhí)行它典尾。當(dāng)你運(yùn)行非常多的處理程序役拴,這些(多個)任務(wù)將會被均分到它們之間。
這個概念對于不可能在短暫的HTTP請求中處理一個復(fù)雜運(yùn)算的網(wǎng)站應(yīng)用來說十分有用钾埂。
準(zhǔn)備
在此教程的上一部分我們發(fā)送了一個包含 "Hello World!" 的消息『尤颍現(xiàn)在我們將會發(fā)送字符串來代替(模擬)復(fù)雜的任務(wù)杆融。我們并沒有一個真實(shí)的類似于縮放圖片或者渲染pdf的(復(fù)雜)任務(wù)怔软,所以讓我們通過使用 sleep()
函數(shù)假裝我們很忙來偽造此類復(fù)雜任務(wù)。我們以字符串中的點(diǎn)(.)來描述它的復(fù)雜度恐锣;每一個點(diǎn)(.)將會占用“工作”的一秒中髓考。例如部念,一個通過Hello...
來描述的偽裝任務(wù)將會花費(fèi)三秒鐘來處理。
為了允許發(fā)送來自命令行的任意消息我們將會稍微修改上一個例子的 send.php 代碼氨菇。這個程序?qū)才湃蝿?wù)到我們的工作Queue(隊(duì)列)中儡炼,我們把他命名為 new_task.php:
$data = implode(' ',array_slice($argv,1));
if(empty($data)) $data = 'Hello World!';
$msg = new AMQPMessage($data);
$channel->basic_publish($msg,'','hello');
echo " [x] Sent ", $data, "\n";
我們舊的 receive.php 腳本同樣需要一點(diǎn)修改:它需要去將消息中的每一個點(diǎn)(.)偽裝成任務(wù)中花費(fèi)一秒的時間。它將會把消息從Queue(隊(duì)列)中出隊(duì)并且執(zhí)行它查蓉,所以我們將它命名為 worker.php :
$callback = function($msg){
echo "[x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body,'.'));
echo " [x] Done","\n";
}
$channel->basic_consume('hello','',false,true,false,false,$callback);
注意我們偽裝的任務(wù)將會模擬執(zhí)行時間乌询。
想在第一個教程中那樣運(yùn)行他們
# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."
循環(huán)調(diào)度
使用任務(wù)隊(duì)列(Task Queue) 的一個優(yōu)點(diǎn)就是它可以輕松地并行化任務(wù)。如果我們積壓了大量的任務(wù)豌研,我們只需要添加更多的工作程序妹田,這可以很輕松的進(jìn)行擴(kuò)展規(guī)模。
首先鹃共,讓我們嘗試同時運(yùn)行兩個 worker.php 腳本鬼佣。他們都將會從Queue(隊(duì)列)中獲得消息,但確切的情況又是如何呢及汉?讓我們來看看:
你需要打開三個控制臺沮趣。其中兩個運(yùn)行 worker.php 腳本。這些控制臺將會成為我們的兩個Consumer(消費(fèi)者)——C1和C2坷随。
# shell 1
php worker.php
# => [x] Waiting for messages.To exit press CTRL+C
# shell 2
php worker.php
# => [x] Waiting for messages.To exit press CTRL+C
在第三個控制臺我們將會發(fā)送一些新的任務(wù)房铭。一旦你運(yùn)行了Consumers(消費(fèi)者)你就可以推送一些消息:
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
讓我們看一下任務(wù)交付情況:
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默認(rèn)情況下驻龟,RabbitMQ 將會按順序地發(fā)送每一個消息到下一個Consumer(消費(fèi)者)。平均每個消費(fèi)者都會得到相同數(shù)量的消息缸匪。這個分發(fā)消息的方式叫做輪詢調(diào)度 (round-robin)翁狐。使用三個或更多的處理程序來嘗試它吧。
消息確認(rèn)
執(zhí)行一個任務(wù)要花費(fèi)一些時間凌蔬。你可能會想如果其中一個Consumer(消費(fèi)者)開始了一個長任務(wù)并且只完成部分任務(wù)就掛掉了會發(fā)生什么露懒。目前我們的代碼,一旦 RabbitMQ 發(fā)送了一個消息去客戶端它會立馬標(biāo)記他們?yōu)閯h除砂心。在這一情況下懈词,如果你殺掉其中一個工作程序我們將會失去那些只是在處理中(但尚未完成)的消息。我們同樣失去了所有發(fā)送了給這個程序但還未處理的消息辩诞。
但是我們不想丟失任何任務(wù)坎弯。如果一個工作程序掛掉,我們想把這個任務(wù)發(fā)送到其他的工作程序译暂。
為了確保消息永不消失抠忘,RabbitMQ 支持 消息確認(rèn)。Consumer將會反回一個應(yīng)答去告訴 RabbitMQ 一個特定的消息已經(jīng)被接收外永、處理崎脉,這時RabbitMQ 就可以自由地刪除這一消息了。
如果一個Consumer(消費(fèi)者)在反回應(yīng)答(ack)信號前掛掉了(例如它的channel(頻道) 關(guān)閉了伯顶,連接關(guān)閉了囚灼,或者TCP連接丟失了),RabbitMQ 將會知道該消息并沒有被完全處理并且會重新把它插入Queue(隊(duì)列)祭衩。如果這時有其他的Consumers(消費(fèi)者)在線啦撮,它會立馬重新發(fā)送這條消息去其他的Consumer(消費(fèi)者)。這一種方式即使處理程序意外掛掉你也能確保沒有消息丟失汪厨。
這里沒有任何消息會因?yàn)槌瑫r而丟失;當(dāng)Consumer(消費(fèi)者) 掛掉 RabbitMQ 將會重新發(fā)送消息愉择,即使處理一個消息要很長很長的時間劫乱。(一直等到能發(fā)送到下一個消費(fèi)者?)
消息確認(rèn)默認(rèn)是關(guān)閉的锥涕。當(dāng)設(shè)置 basic_consume
函數(shù)的第四個參數(shù)為 false
(true
代表沒有消息確認(rèn))并且當(dāng)工作程序在完成一個任務(wù)的時候返回一個適當(dāng)?shù)膽?yīng)答信號將會開啟消息確認(rèn)衷戈。
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用這段代碼我們將能確保沒有任何消失會丟失,即使你用ctrl+c
殺掉了一個正在處理消息的工作程序层坠。很快在該工作程序掛掉之后殖妇,所有未被應(yīng)答的消息將會被重新發(fā)送。
忘了確認(rèn)
錯過ack
是一個很常見的錯誤破花。它是一個簡單的錯誤谦趣,但是后果都會很嚴(yán)重疲吸。當(dāng)你的客戶端退出時候消息將會被重新發(fā)送(這可能會像是隨機(jī)地重新發(fā)送給),但是如果不能夠釋放一些未被應(yīng)答的消息前鹅, RabbitMQ 將會占用越來越多的內(nèi)存摘悴。
為了調(diào)試這種錯誤你可以使用rabbitmqctl
把messages_unacknowledged
(未被確認(rèn)的消息)都打印出來:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows平臺,省略掉sudo即可:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
我們已經(jīng)學(xué)習(xí)了怎樣做才能確保即使Consumer(消費(fèi)者)掛掉舰绘,任務(wù)也不會丟失蹂喻。但是如果是 RabbitMQ 服務(wù)停止了,我們的任務(wù)也同樣會丟失捂寿。
當(dāng) RabbitMQ 退出或者崩潰口四,它會把隊(duì)列與消息統(tǒng)統(tǒng)忘掉,除非你告訴它不要這樣做秦陋。為了確保消息不會丟失(即使RabbitMQ退出或崩潰)蔓彩,我們需要做兩件事情:我們需要將Queue(隊(duì)列)和消息都標(biāo)記為持久化的。
首先我們需要確保 RabbitMQ 將永不會丟失我們的Queue(隊(duì)列)踱侣。為了實(shí)現(xiàn)這個粪小,我們需要聲明它為durable
(持久的)。所以我們把(聲明隊(duì)列時候的)第三個參數(shù) queue_declare
置為 true
:
$channel->queue_declare('hello', false, true, false, false);
盡管這條命令它本身是正確無誤的抡句,但是就目前我們(RabbitMQ)的配置來看探膊,這并不會真正生效。這是因?yàn)槲覀円呀?jīng)定義了一個并不持久化(not durable)的叫做 hello
的 Queue(隊(duì)列)待榔。RabbitMQ將不會允許你對已經(jīng)存在的Queue(隊(duì)列)重新定義逞壁,并且當(dāng)任何程序嘗試這樣做的時候會返回一個 error
錯誤。但這有一個快速的解決方法——讓我們重新定義一個不同名字的Queue(隊(duì)列)锐锣,例如 task_queus
:
$channel->queue_declare('task_queue', false, true, false, false);
Producer(生產(chǎn)者)和Consumer(消費(fèi)者)的代碼中這個標(biāo)記都要被設(shè)為 true
.
這時候我們就能確保即使 RabbitMQ 重啟腌闯, task_queue
也不會丟失。現(xiàn)在我們需要通過設(shè)置 AMQPMessage
參數(shù)數(shù)組中的 delivery_mode = 2
這一消息屬性來確保我們的消息是持久化的雕憔。
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
消息持久化需要注意的地方
標(biāo)記消息為持久花的并不能完全保重一條消息都不會丟失姿骏。盡管它高數(shù)了 RabbitMQ 把消息保存到磁盤,在 RabbitMQ 接收了一條消息但并未將它保存(到磁盤)也會由一段很短的時間(可能發(fā)生錯誤)斤彼。同樣 RabbitMQ 不會對所有消息都執(zhí)行
fsync(2)
——它可能只是保存在緩存中而不是真正地寫入到磁盤中分瘦。這種持久化的保證不是很強(qiáng)大,但對于我們的 Simple Task Queue (簡單任務(wù)隊(duì)列) 來說也已經(jīng)完全足夠了琉苇。如果你需要一個更加強(qiáng)大持久化嘲玫,你可以是Publisher Confirms(發(fā)送者確認(rèn))
公平調(diào)度(Fair dispatch)
你可能已經(jīng)注意到目前的調(diào)度并不能完全按照我們所希望的進(jìn)行。例如在有兩個處理程序( Consumer(消費(fèi)者) )的時候并扇,所有奇數(shù)的消息都十分繁雜但其他都都很輕松去团,這樣,其中一個處理程序就會一直繁忙,然而另外一個幾乎什么都不用做土陪。然而昼汗,RabbitMQ 并不知道這種情況,它將會繼續(xù)均勻地分發(fā)所有消息旺坠。
導(dǎo)致這種情況的原因是 RabbitMQ 在消息進(jìn)入Queue(隊(duì)列)的時候僅僅是分發(fā)這些消息乔遮。它(RabbitMQ)并沒有留意一個Consumer(消費(fèi)者)的未被應(yīng)答消息數(shù)量。它只是盲目地將第n條消息發(fā)送到第n個Consumer(消費(fèi)者)取刃。
為了解決這種問題蹋肮,我們使用 basic_qos
函數(shù)并設(shè)置該函數(shù)的 (第二個參數(shù))prefetch_count = 1
。這會告訴 RabbitMQ 不要在同一時間分發(fā)不止一條消息到一個處理程序上璧疗。換句話說就是坯辩,不要在處理程序處理并應(yīng)答了上一條消息前再分發(fā)一個新的消息給它。相反地崩侠,它會把這條新的消息分發(fā)到下一個并不繁忙的處理程序漆魔。
$channel->basic_qos(null, 1, null);
關(guān)于Queue(隊(duì)列) 的大小
如果所有的處理程序都在繁忙,你的隊(duì)列將會被填滿却音。你將會想關(guān)注這一點(diǎn)改抡,添加更多的處理程序,或者更換一種策略系瓢。
讓他們一起運(yùn)行
最后我們的 new_task.php
文件是這樣的:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array(
'delivery_mode' =>AMQPMessage::DELIVERY_MODE_PERSISTENT
)
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
?>
以及我們的 worker.php
:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
在這章教程中我們學(xué)習(xí)到使用消息確認(rèn)以及 prefetch
你可以設(shè)立一個工作隊(duì)列阿纤。持久化配置讓讓任務(wù)在即使 RabbitMQ 重啟的情況下也能留存。