RabbitMQ官網(wǎng)教程翻譯(PHP版本)_2

本文所有內(nèi)容均個人從RabbitMQ官網(wǎng)教程中翻譯舶沛,若圖片文字的引用有任何侵權(quán)的地方翻诉,聯(lián)系我炮姨,我會立馬刪除捌刮。

This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.

工作隊(duì)列

(使用php-amqplib

producer->queue->consumers

在第一個教程中我們寫了程序去通過一個已經(jīng)被named(被命名)Queue(隊(duì)列)發(fā)送和接收消息舒岸。在這個教程中绅作,我們將會創(chuàng)建一個 Work Queue(工作隊(duì)列) 用于在多個處理程序中分配耗時任務(wù)。

Work Queues(工作隊(duì)列)(又名:Task Queues(任務(wù)隊(duì)列))的主要思想是避免立即處理資源密集型(resource-intensive)任務(wù)蛾派,并且必須等待它完成俄认。相反地,我們計劃讓任務(wù)稍后完成碍脏。我們把一個任務(wù)封裝成消息梭依,并且發(fā)送它到Queue(隊(duì)列)。一個在后臺運(yùn)行的工作線程將會把任務(wù)出隊(duì)并最終執(zhí)行它典尾。當(dāng)你運(yùn)行非常多的處理程序役拴,這些(多個)任務(wù)將會被均分到它們之間。

這個概念對于不可能在短暫的HTTP請求中處理一個復(fù)雜運(yùn)算的網(wǎng)站應(yīng)用來說十分有用钾埂。

準(zhǔn)備

在此教程的上一部分我們發(fā)送了一個包含 "Hello World!" 的消息『尤颍現(xiàn)在我們將會發(fā)送字符串來代替(模擬)復(fù)雜的任務(wù)杆融。我們并沒有一個真實(shí)的類似于縮放圖片或者渲染pdf的(復(fù)雜)任務(wù)怔软,所以讓我們通過使用 sleep() 函數(shù)假裝我們很忙來偽造此類復(fù)雜任務(wù)。我們以字符串中的點(diǎn)(.)來描述它的復(fù)雜度恐锣;每一個點(diǎn)(.)將會占用“工作”的一秒中髓考。例如部念,一個通過Hello...來描述的偽裝任務(wù)將會花費(fèi)三秒鐘來處理。

為了允許發(fā)送來自命令行的任意消息我們將會稍微修改上一個例子的 send.php 代碼氨菇。這個程序?qū)才湃蝿?wù)到我們的工作Queue(隊(duì)列)中儡炼,我們把他命名為 new_task.php:

$data = implode(' ',array_slice($argv,1)); 
if(empty($data)) $data = 'Hello World!';
$msg = new AMQPMessage($data);

$channel->basic_publish($msg,'','hello');

echo " [x] Sent ", $data, "\n";

我們舊的 receive.php 腳本同樣需要一點(diǎn)修改:它需要去將消息中的每一個點(diǎn)(.)偽裝成任務(wù)中花費(fèi)一秒的時間。它將會把消息從Queue(隊(duì)列)中出隊(duì)并且執(zhí)行它查蓉,所以我們將它命名為 worker.php :

$callback = function($msg){
  echo "[x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body,'.'));
  echo " [x] Done","\n";
}

$channel->basic_consume('hello','',false,true,false,false,$callback);

注意我們偽裝的任務(wù)將會模擬執(zhí)行時間乌询。

想在第一個教程中那樣運(yùn)行他們

# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."

循環(huán)調(diào)度

使用任務(wù)隊(duì)列(Task Queue) 的一個優(yōu)點(diǎn)就是它可以輕松地并行化任務(wù)。如果我們積壓了大量的任務(wù)豌研,我們只需要添加更多的工作程序妹田,這可以很輕松的進(jìn)行擴(kuò)展規(guī)模。

首先鹃共,讓我們嘗試同時運(yùn)行兩個 worker.php 腳本鬼佣。他們都將會從Queue(隊(duì)列)中獲得消息,但確切的情況又是如何呢及汉?讓我們來看看:

你需要打開三個控制臺沮趣。其中兩個運(yùn)行 worker.php 腳本。這些控制臺將會成為我們的兩個Consumer(消費(fèi)者)——C1和C2坷随。

# shell 1
php worker.php
# => [x] Waiting for messages.To exit press CTRL+C
# shell 2
php worker.php
# => [x] Waiting for messages.To exit press CTRL+C

在第三個控制臺我們將會發(fā)送一些新的任務(wù)房铭。一旦你運(yùn)行了Consumers(消費(fèi)者)你就可以推送一些消息:

# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

讓我們看一下任務(wù)交付情況:

# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默認(rèn)情況下驻龟,RabbitMQ 將會按順序地發(fā)送每一個消息到下一個Consumer(消費(fèi)者)。平均每個消費(fèi)者都會得到相同數(shù)量的消息缸匪。這個分發(fā)消息的方式叫做輪詢調(diào)度 (round-robin)翁狐。使用三個或更多的處理程序來嘗試它吧。

消息確認(rèn)

執(zhí)行一個任務(wù)要花費(fèi)一些時間凌蔬。你可能會想如果其中一個Consumer(消費(fèi)者)開始了一個長任務(wù)并且只完成部分任務(wù)就掛掉了會發(fā)生什么露懒。目前我們的代碼,一旦 RabbitMQ 發(fā)送了一個消息去客戶端它會立馬標(biāo)記他們?yōu)閯h除砂心。在這一情況下懈词,如果你殺掉其中一個工作程序我們將會失去那些只是在處理中(但尚未完成)的消息。我們同樣失去了所有發(fā)送了給這個程序但還未處理的消息辩诞。

但是我們不想丟失任何任務(wù)坎弯。如果一個工作程序掛掉,我們想把這個任務(wù)發(fā)送到其他的工作程序译暂。

為了確保消息永不消失抠忘,RabbitMQ 支持 消息確認(rèn)Consumer將會反回一個應(yīng)答去告訴 RabbitMQ 一個特定的消息已經(jīng)被接收外永、處理崎脉,這時RabbitMQ 就可以自由地刪除這一消息了。

如果一個Consumer(消費(fèi)者)在反回應(yīng)答(ack)信號前掛掉了(例如它的channel(頻道) 關(guān)閉了伯顶,連接關(guān)閉了囚灼,或者TCP連接丟失了),RabbitMQ 將會知道該消息并沒有被完全處理并且會重新把它插入Queue(隊(duì)列)祭衩。如果這時有其他的Consumers(消費(fèi)者)在線啦撮,它會立馬重新發(fā)送這條消息去其他的Consumer(消費(fèi)者)。這一種方式即使處理程序意外掛掉你也能確保沒有消息丟失汪厨。

這里沒有任何消息會因?yàn)槌瑫r而丟失;當(dāng)Consumer(消費(fèi)者) 掛掉 RabbitMQ 將會重新發(fā)送消息愉择,即使處理一個消息要很長很長的時間劫乱。(一直等到能發(fā)送到下一個消費(fèi)者?)

消息確認(rèn)默認(rèn)是關(guān)閉的锥涕。當(dāng)設(shè)置 basic_consume函數(shù)的第四個參數(shù)為 falsetrue代表沒有消息確認(rèn))并且當(dāng)工作程序在完成一個任務(wù)的時候返回一個適當(dāng)?shù)膽?yīng)答信號將會開啟消息確認(rèn)衷戈。

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

使用這段代碼我們將能確保沒有任何消失會丟失,即使你用ctrl+c 殺掉了一個正在處理消息的工作程序层坠。很快在該工作程序掛掉之后殖妇,所有未被應(yīng)答的消息將會被重新發(fā)送。


忘了確認(rèn)
錯過ack是一個很常見的錯誤破花。它是一個簡單的錯誤谦趣,但是后果都會很嚴(yán)重疲吸。當(dāng)你的客戶端退出時候消息將會被重新發(fā)送(這可能會像是隨機(jī)地重新發(fā)送給),但是如果不能夠釋放一些未被應(yīng)答的消息前鹅, RabbitMQ 將會占用越來越多的內(nèi)存摘悴。
為了調(diào)試這種錯誤你可以使用 rabbitmqctlmessages_unacknowledged (未被確認(rèn)的消息)都打印出來:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows平臺,省略掉sudo即可:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

消息持久化

我們已經(jīng)學(xué)習(xí)了怎樣做才能確保即使Consumer(消費(fèi)者)掛掉舰绘,任務(wù)也不會丟失蹂喻。但是如果是 RabbitMQ 服務(wù)停止了,我們的任務(wù)也同樣會丟失捂寿。

當(dāng) RabbitMQ 退出或者崩潰口四,它會把隊(duì)列與消息統(tǒng)統(tǒng)忘掉,除非你告訴它不要這樣做秦陋。為了確保消息不會丟失(即使RabbitMQ退出或崩潰)蔓彩,我們需要做兩件事情:我們需要將Queue(隊(duì)列)和消息都標(biāo)記為持久化的。

首先我們需要確保 RabbitMQ 將永不會丟失我們的Queue(隊(duì)列)踱侣。為了實(shí)現(xiàn)這個粪小,我們需要聲明它為durable(持久的)。所以我們把(聲明隊(duì)列時候的)第三個參數(shù) queue_declare 置為 true

$channel->queue_declare('hello', false, true, false, false);

盡管這條命令它本身是正確無誤的抡句,但是就目前我們(RabbitMQ)的配置來看探膊,這并不會真正生效。這是因?yàn)槲覀円呀?jīng)定義了一個并不持久化(not durable)的叫做 helloQueue(隊(duì)列)待榔。RabbitMQ將不會允許你對已經(jīng)存在的Queue(隊(duì)列)重新定義逞壁,并且當(dāng)任何程序嘗試這樣做的時候會返回一個 error 錯誤。但這有一個快速的解決方法——讓我們重新定義一個不同名字的Queue(隊(duì)列)锐锣,例如 task_queus

$channel->queue_declare('task_queue', false, true, false, false);

Producer(生產(chǎn)者)Consumer(消費(fèi)者)的代碼中這個標(biāo)記都要被設(shè)為 true.

這時候我們就能確保即使 RabbitMQ 重啟腌闯, task_queue 也不會丟失。現(xiàn)在我們需要通過設(shè)置 AMQPMessage 參數(shù)數(shù)組中的 delivery_mode = 2 這一消息屬性來確保我們的消息是持久化的雕憔。

$msg = new AMQPMessage($data,
       array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
       );

消息持久化需要注意的地方

標(biāo)記消息為持久花的并不能完全保重一條消息都不會丟失姿骏。盡管它高數(shù)了 RabbitMQ 把消息保存到磁盤,在 RabbitMQ 接收了一條消息但并未將它保存(到磁盤)也會由一段很短的時間(可能發(fā)生錯誤)斤彼。同樣 RabbitMQ 不會對所有消息都執(zhí)行 fsync(2) ——它可能只是保存在緩存中而不是真正地寫入到磁盤中分瘦。這種持久化的保證不是很強(qiáng)大,但對于我們的 Simple Task Queue (簡單任務(wù)隊(duì)列) 來說也已經(jīng)完全足夠了琉苇。如果你需要一個更加強(qiáng)大持久化嘲玫,你可以是Publisher Confirms(發(fā)送者確認(rèn))

公平調(diào)度(Fair dispatch)

你可能已經(jīng)注意到目前的調(diào)度并不能完全按照我們所希望的進(jìn)行。例如在有兩個處理程序Consumer(消費(fèi)者)的時候并扇,所有奇數(shù)的消息都十分繁雜但其他都都很輕松去团,這樣,其中一個處理程序就會一直繁忙,然而另外一個幾乎什么都不用做土陪。然而昼汗,RabbitMQ 并不知道這種情況,它將會繼續(xù)均勻地分發(fā)所有消息旺坠。

導(dǎo)致這種情況的原因是 RabbitMQ 在消息進(jìn)入Queue(隊(duì)列)的時候僅僅是分發(fā)這些消息乔遮。它(RabbitMQ)并沒有留意一個Consumer(消費(fèi)者)的未被應(yīng)答消息數(shù)量。它只是盲目地將第n條消息發(fā)送到第n個Consumer(消費(fèi)者)取刃。

Producer->Queue->Consumers

為了解決這種問題蹋肮,我們使用 basic_qos 函數(shù)并設(shè)置該函數(shù)的 (第二個參數(shù))prefetch_count = 1。這會告訴 RabbitMQ 不要在同一時間分發(fā)不止一條消息到一個處理程序上璧疗。換句話說就是坯辩,不要在處理程序處理并應(yīng)答了上一條消息前再分發(fā)一個新的消息給它。相反地崩侠,它會把這條新的消息分發(fā)到下一個并不繁忙的處理程序漆魔。

$channel->basic_qos(null, 1, null);

關(guān)于Queue(隊(duì)列) 的大小

如果所有的處理程序都在繁忙,你的隊(duì)列將會被填滿却音。你將會想關(guān)注這一點(diǎn)改抡,添加更多的處理程序,或者更換一種策略系瓢。

讓他們一起運(yùn)行

最后我們的 new_task.php 文件是這樣的:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
    array(
      'delivery_mode' =>AMQPMessage::DELIVERY_MODE_PERSISTENT
    )
 );

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();
?>

(new_task.php源碼)

以及我們的 worker.php

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

(worker.php源碼)

在這章教程中我們學(xué)習(xí)到使用消息確認(rèn)以及 prefetch 你可以設(shè)立一個工作隊(duì)列阿纤。持久化配置讓讓任務(wù)在即使 RabbitMQ 重啟的情況下也能留存。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末夷陋,一起剝皮案震驚了整個濱河市欠拾,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌骗绕,老刑警劉巖藐窄,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異酬土,居然都是意外死亡荆忍,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進(jìn)店門撤缴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來东揣,“玉大人,你說我怎么就攤上這事腹泌。” “怎么了尔觉?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵凉袱,是天一觀的道長。 經(jīng)常有香客問我,道長专甩,這世上最難降的妖魔是什么钟鸵? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮涤躲,結(jié)果婚禮上棺耍,老公的妹妹穿的比我還像新娘。我一直安慰自己种樱,他們只是感情好蒙袍,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著嫩挤,像睡著了一般害幅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上岂昭,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天以现,我揣著相機(jī)與錄音,去河邊找鬼约啊。 笑死邑遏,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的恰矩。 我是一名探鬼主播记盒,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼枢里!你這毒婦竟也來了孽鸡?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤栏豺,失蹤者是張志新(化名)和其女友劉穎彬碱,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體奥洼,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡巷疼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了灵奖。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嚼沿。...
    茶點(diǎn)故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖瓷患,靈堂內(nèi)的尸體忽然破棺而出骡尽,到底是詐尸還是另有隱情,我是刑警寧澤擅编,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布攀细,位于F島的核電站箫踩,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏谭贪。R本人自食惡果不足惜境钟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望俭识。 院中可真熱鬧慨削,春花似錦、人聲如沸套媚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凑阶。三九已至猿规,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間宙橱,已是汗流浹背姨俩。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留师郑,地道東北人环葵。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像宝冕,于是被迫代替她去往敵國和親张遭。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理地梨,服務(wù)發(fā)現(xiàn)菊卷,斷路器,智...
    卡卡羅2017閱讀 134,651評論 18 139
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器宝剖。支持消息的持久化洁闰、事務(wù)、擁塞控...
    jiangmo閱讀 10,357評論 2 34
  • 本文所有內(nèi)容均個人從RabbitMQ官網(wǎng)教程中翻譯万细,若圖片文字的引用有任何侵權(quán)的地方扑眉,聯(lián)系我,我會立馬刪除赖钞。 Th...
    JobinLi閱讀 1,729評論 0 3
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,906評論 2 11
  • 十個大坑讓你累成狗(7~10) 往期回顧:解釋了造成乙方總是很“累”的三個根本原因腰素,分別是:1、甲乙雙方的權(quán)利不平...
    霧風(fēng)的幻想閱讀 870評論 9 1