本文所有內(nèi)容均個人從RabbitMQ官網(wǎng)教程中翻譯媒惕,若圖片文字的引用有任何侵權(quán)的地方泊愧,聯(lián)系我,我會立馬刪除裹驰。
This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
發(fā)布/訂閱
(使用php-amqplib)
在上一個教程婉弹,我們創(chuàng)建了一個工作隊列睬魂。Work Queue(工作隊列)是在每一個任務(wù)都分發(fā)給一個確切的處理程序的假設(shè)上建立的。在這一部分镀赌,我們將會做一些完全不同的事情——我們將會發(fā)送一條消息去多個Consumers(消費者)上氯哮。這種模式被稱為“發(fā)布/訂閱”。
為了說明這種模式商佛,我們將會建立一個簡單的日志系統(tǒng)喉钢。它將會包含兩個程序——第一個會發(fā)出日志消息,而另一個則會接收并打印這些消息良姆。
在我們的日志系統(tǒng)里肠虽,每一個正在運行的接收程序副本都會獲得(所有)消息。那樣我們就可以運行一個接收程序把日志引導(dǎo)到磁盤玛追;同時我們可以運行另外一個接收程序把日志打印到屏幕上來看舔痕。
基本上,被發(fā)布的消息將會在所有接收程序間進(jìn)行廣播豹缀。
交換
在本教程的上一部分,我們通過一個Queue(隊列)發(fā)送和接收消息】現(xiàn)在是時候介紹一下 RabbitMQ 的全消息模型(Full Messaging Model)了邢笙。
讓我們快速地復(fù)習(xí)前面的教程涵蓋的內(nèi)容:
1.一個Producer(生產(chǎn)者)就是一個發(fā)送消息的程序。
2.一個Queue(隊列)就是一個保存消息的緩沖(buffer)侍匙。
3.一個Consumer(消費者)就是一個接收消息的用戶程序氮惯。
RabbitMQ的消息模型的主要思想就是:Producer(生產(chǎn)者)從不會直接將消息發(fā)送到Queue(隊列)中。實際上想暗,很多時候Producer(生產(chǎn)者)甚至完全不知道一條消息將會被發(fā)送到Queue(隊列)妇汗。
相反地,Producer(生產(chǎn)者)只能發(fā)送消息到一個交換機(jī)(exchange)说莫。一個交換機(jī)就是一個非常簡單的東西杨箭。一方面(或者是叫一端更形象?)從Producer(生產(chǎn)者)處接收消息储狭,另一方面(端互婿?)它把這些消息推進(jìn)Queue(隊列)中。交換機(jī)必須準(zhǔn)確地知道對它接收的每一條消息做什么辽狈。應(yīng)該將這條消息追加到一個特別的Queue(隊列)嗎慈参?應(yīng)該將這條消息追加到多個Queue(隊列)嗎?或者應(yīng)該把這條消息丟棄掉嗎刮萌?做這些事情的規(guī)則是通過定義交換類型來確定的驮配。
這里只有少數(shù)有效的交換類型:direct
,topic
,headers
和fanout
壮锻。我們將會集中(介紹)最后一個fanout
琐旁。讓我們創(chuàng)建一個這種類型的交換機(jī)并稱他為logs
。
$channel->exchange_declare('logs', 'fanout', false, false, false);
fanout
交換機(jī)是非常簡單的躯保。從它的名稱就能猜到(反正我是不知道怎么翻譯好-_-)旋膳,它只是將它獲取到的所有消息廣播到它所知道的所有Queue(隊列)中。這與我們的日志系統(tǒng)需求十分吻合途事。
列出交換機(jī)
你可以運行十分有用的
rabbitmqctl
來列出服務(wù)器上的所有交換機(jī):sudo rabbitmqctl list_exchanges
在列出的隊列中會由一些
amq.*
交換機(jī)以及默認(rèn)的(未被命名)的交換機(jī)验懊。這是默認(rèn)創(chuàng)建的,但此時你似乎并不需要使用他們尸变。默認(rèn)的交換機(jī)
在教程的上一部分我們對交換機(jī)一無所知义图,卻仍然可以發(fā)送消息到Queues(隊列)中。這很可能是因為我們使用了一個通過空字符串("")識別的默認(rèn)的交換機(jī)召烂。
重新回顧我們之前發(fā)送一個消息的時候:
$channel->basic_publish($msg, '', 'hello');
在此處碱工,我們使用了默認(rèn)的,或者說是無名的交換機(jī):消息會被路由到
routing_key
指定的Queue(隊列)奏夫,如果這個隊列存在的化概漱。routing_key
就是basic_publish
的第三個參數(shù)。
現(xiàn)在奋姿,讓我們發(fā)送消息到一個被命名的交換機(jī)上:
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
臨時隊列
你可能記得之前我們使用的Queue(隊列)都是擁有了一個指定的名稱(還記得hello
和task_queue
嗎闺骚?)。在我們需要將工作程序指向?qū)?yīng)的Queue(隊列)時可以命名一個Queue(隊列)對于我們來說是十分重要的麻削。當(dāng)你想要在Producer(生產(chǎn)者)與Consumer(消費者)之間共享Queue(隊列)時候為Queue(隊列)命名是十分重要的蒸痹。
但是這對我們的日志記錄器來說這并不重要。我們打算監(jiān)聽所有的日志消息嗎呛哟,而不是僅僅是其中一部分叠荠。同樣我們只對當(dāng)前流動的消息感興趣,而不是舊的消息扫责。為了解決這一情況榛鼎,我們需要兩樣?xùn)|西。
首先鳖孤,無論何時借帘,我們連接到RabbitMQ時候都需要一個新的,并且是空的Queue(隊列)淌铐。為了達(dá)到這一目的肺然,我們可以使用隨機(jī)的名稱來創(chuàng)建一個Queue(隊列),或者腿准,更好的選擇是——讓RabbitMQ服務(wù)器為我們選擇一個隨機(jī)的Queue(隊列)际起。
其次拾碌,一旦我們的Consumer(消費者)斷開了鏈接,對應(yīng)的Queue(隊列)應(yīng)該被自動刪除街望。
在php-amqplib客戶端中校翔,當(dāng)我們傳給Queue(隊列)名稱參數(shù)一個空字符串時候,我們能創(chuàng)建一個非持久化隊列(non-durable Queue)灾前,并反回了一個(自動)生成的隊列名稱:
list($queue_name,,) = $channel->queue_declare('');
當(dāng)這個方法反回防症,$queue_name
變量包含了一個由RabbitMQ生成的隨機(jī)名稱。例如哎甲,它看起來會像是這樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg
蔫敲。
當(dāng)這個連接宣布關(guān)閉了,對應(yīng)的Queue(隊列)將會被刪除炭玫,因為它被聲明為獨有的(exclusive)奈嘿。
綁定
我們已經(jīng)創(chuàng)建了一個fanout
交換機(jī)以及一個Queue(隊列)。現(xiàn)在我們需要告訴交換機(jī)去發(fā)送消息到我們的Queue(隊列)吞加。交換機(jī)與隊列之間的關(guān)系成為綁定(binding)裙犹。
$channel->queue_bind($queue_name, 'logs');
從現(xiàn)在起,logs
交換機(jī)將會追加消息去我們的隊列衔憨。
列出綁定
如你所想叶圃,通過以下方式你可以把正在使用的綁定(bindings)。
rabbitmqctl list_bindings
將他們放在一起
發(fā)出日志的這個Producer(生產(chǎn)者)程序與我們之前教程的不會相差太多践图。最重要的改變就是我們現(xiàn)在希望發(fā)送消息到我們的logs
交換機(jī)而不是無名的那個掺冠。下面就是emit_log.php
腳本的代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 聲明交換機(jī)
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);
// 發(fā)送消息到交換機(jī)上(而不是指定隊列)
$channel->basic_publish($msg, 'logs');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
?>
這如你所看到的,在建立連接之后我們聲明了一個交換機(jī)平项。這一步十分重要因為發(fā)送消息到不存在的交換機(jī)是被禁止的,
如果已經(jīng)沒有Queue(隊列)綁定到交換機(jī)悍及,交換機(jī)的消息將會被丟失闽瓢,但對于我們來說可以接受;如果已經(jīng)沒有Consumer(消費者)監(jiān)聽該交換機(jī)的消息了心赶,我們可以安全地刪除這些消息扣讼。
reveive_logs.php
的代碼如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 聲明對應(yīng)的交換機(jī)
$channel->exchange_declare('logs', 'fanout', false, false, false);
// 創(chuàng)建一個非持久化的隊列并獲取自動生成的隊列名稱
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 綁定隊列到交換機(jī)
$channel->queue_bind($queue_name, 'logs');
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
如果你想保存這些日志到文件,只需要打開一個控制臺缨叫,并輸入:
php reveive_logs.php > logs_from_rabbit.log
如果你想在屏幕上看到這些日志椭符,新建一個新的終端并運行:
php reveive_logs.php
當(dāng)然,你還需要發(fā)送日志:
php emit_log.php
使用rabbitmqctl list_bindings
你可以驗證這些代碼已經(jīng)如我們所想地創(chuàng)建了綁定與Queue(隊列)耻姥。如果是運行著兩個receive_logs.php
程序销钝,你將會看到類似下面的情況:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
這結(jié)果的解析很直接了當(dāng):數(shù)據(jù)從logs
交換機(jī)發(fā)送到了兩個以服務(wù)器分配的名稱命名的Queue(隊列)上。這正式我們所期望的琐簇。