rabbitMq 教程二:工作隊(duì)列
生產(chǎn)者
/**
* 生產(chǎn)者
*/
public function actionTask()
{
$queue='hello-task';
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare($queue, false, false, false, false);
for ($i=0;$i<=10;$i++){
$message="send {$i}";
$msg = new AMQPMessage($message);
$channel->basic_publish($msg, '', $queue);
echo ' [x] Sent ', $message, "\n";
}
//關(guān)閉
$channel->close();
$connection->close();
}
消費(fèi)者:
RabbitMQ調(diào)度隊(duì)列有兩種方式
1、公平調(diào)度(平均分)
2、預(yù)取調(diào)度(能者多勞)
代碼示例1边败、公平調(diào)度:
public function actionFairWorker()
{
$queue='hello--fair-task';
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare($queue, false, false, false, false);
$callback = function ($msg) {
echo ' [x] work ', $msg->body, "\n";
// sleep(2);
};
//監(jiān)聽(tīng)隊(duì)列 ture表自動(dòng) false表手動(dòng)返回完成狀態(tài)
$no_ack=true;
$channel->basic_consume($queue, '', false, $no_ack, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
}
代碼示例2减途、預(yù)取調(diào)度:
public function actionWorker()
{
$queue='hello-task';
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare($queue, false, false, false, false);
$callback = function ($msg) {
echo ' [x] work1 ', $msg->body, "\n";
sleep(2);
//手動(dòng)完成狀態(tài)
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//設(shè)置消費(fèi)者預(yù)取的消費(fèi)數(shù)量
$channel->basic_qos(null, 1, null);
//監(jiān)聽(tīng)隊(duì)列 ture表自動(dòng) false表手動(dòng)返回完成狀態(tài)
$no_ack=false;
$channel->basic_consume($queue, '', false, $no_ack, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
}
使用
兩種模式分別運(yùn)行
創(chuàng)建多個(gè)消費(fèi)者進(jìn)行測(cè)試
例(拿公平調(diào)度示例):
復(fù)制actionFairWorker方法為 actionFairWorker2
將 actionFairWorker2的sleep打開(kāi)
分別運(yùn)行兩個(gè)消費(fèi)者 php fair-worker php fair-worker2
再運(yùn)行生產(chǎn)者 php-task
官網(wǎng)代碼示例:https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php