前言:最近因為工作忙利诺,沒抽出時間來繼續(xù)文章的下半部分富蓄,現(xiàn)在手頭忙的差不多帐萎,便抽時間寫了這篇文章疆导!
- 上篇文章只是簡單的介紹了,rabbitmp的搭建和基礎(chǔ)發(fā)送隊列葛躏,封裝了一個公用類澈段。
<?php
namespace common\tools;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
* Created by PhpStorm.
* User: steven
* Date: 2017/7/10
* Time: 下午4:38
*/
class RabbitMq
{
/**
* @var AMQPStreamConnection
*/
protected $connection;
protected $queue_key;
protected $exchange_key;
protected $exchange_suffix;
protected $priority;
protected $channel;
/**
* RabbitQueue constructor.
* @param $config
* @param $queue_name
* @param null $priority
*/
public function __construct($config, $queue_name,$priority=null)
{
$this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['pass']);
$this->queue_key = $queue_name;
$this->exchange_suffix = $config['exchange'];
$this->priority=$priority;
$this->channel = $this->connection->channel();
$this->bind_exchange();
return $this->connection;
}
/**
* 綁定交換機
* @return mixed|null
*/
protected function bind_exchange() {
$queue_key=$this->queue_key;
$exchange_key = $this->exchange_suffix;
$this->exchange_key = $exchange_key;
$channel = $this->channel;
if(!empty($this->priority)){
$priorityArr = array('x-max-priority' => array('I', $this->priority));
$size = $channel->queue_declare($queue_key, false, true, false, false,false,$priorityArr);
}else{
$size = $channel->queue_declare($queue_key, false, true, false, false);
}
$channel->exchange_declare($exchange_key, 'topic', false, true, false);
$channel->queue_bind($queue_key, $exchange_key,$queue_key);
$this->channel=$channel;
return $size ;
}
/**
* 發(fā)送數(shù)據(jù)到隊列
* @param $data = array('key'=>'val')
*/
public function put($data)
{
$channel = $this->channel;
$value = json_encode($data);
$toSend = new AMQPMessage($value, array('content_type' => 'application/json', 'delivery_mode' => 2));
$channel->basic_publish($toSend, $this->exchange_key,$this->queue_key);
}
/**
* 獲取數(shù)據(jù)
* @return mixed
*/
public function get()
{
$channel = $this->channel;
$message = $channel->basic_get($this->queue_key);
if (!$message) {
return array(null,null);
}
$ack = function() use ($channel,$message) {
$channel->basic_ack($message->delivery_info['delivery_tag']);
};
$result = json_decode($message->body,true);
return array($ack,$result);
}
/**
* 關(guān)閉鏈接
*/
public function close() {
$this->channel->close();
$this->connection->close();
}
/**
* 獲得隊列長度
* @return int
*/
public function length(){
$info = $this->bind_exchange();
return $info[1];
}
}
- 發(fā)送隊列簡單demo
<?php
namespace frontend\controllers;
use common\tools\RabbitMq;
use Yii;
use yii\web\Controller;
/**
* Site controller
*/
class IndexController extends Controller
{
public function actionIndex()
{
$queueName = 'queue_test';
$rabbitMqConfig = [
'exchange' => 'web',//自己手動添加交換機,這里就不做描述
'host' => '172.17.0.6',//填寫自己的容器ip
'port' => '5672',
'user' => 'guest',
'pass' => 'guest',
];
$queue = new RabbitMq($rabbitMqConfig, $queueName);
$data = ['uuid' => rand(100,99999999)];
$queue->put($data);
echo '發(fā)送完成,發(fā)送的內(nèi)容:'.print_r($data,1);
exit();
}
}
-
查看發(fā)送隊列數(shù)據(jù)
image.png
image.png 編寫消耗隊列腳本
<?php
namespace console\controllers;
use common\tools\RabbitMq;
use yii\console\Controller;
class TestController extends Controller
{
/**
* console rabbit_mq demo
*/
public function actionTest()
{
$queueName = 'queue_test';
$rabbitMqConfig = [
'exchange' => 'web',//自己手動添加交換機,這里就不做描述
'host' => '172.17.0.6',//填寫自己的容器ip
'port' => '5672',
'user' => 'guest',
'pass' => 'guest',
];
$queue = new RabbitMq($rabbitMqConfig, $queueName);
$cnt = 0;
while (1) {
list($ack,$data) = $queue->get();
if(!$data){
$cnt++;
if($cnt > 20){
$queue->close();
exit();
}
echo "no data:$cnt \n";
sleep(1);
continue;
}
//邏輯處理
echo "start work \n";
print_r($data);
echo "end work \n";
//確認(rèn)消耗
$ack();
}
}
}
- 配置supervisor的消耗隊列進(jìn)程(PS:我的demo.ini是放在我創(chuàng)建的docker項目容器里的,不熟悉的小伙伴可以看看我的關(guān)于創(chuàng)建容器的文章)
a. 創(chuàng)建文件
vim demo.ini
b. 編寫配置(PS:注意這里用的是yii的console,請根據(jù)你們的使用來進(jìn)行更改)
;測試demo
[program:demo]
command= /usr/share/nginx/html/advanced/yii test/test
directory=/usr/share/nginx/html/advanced/
stdout_logfile=/tmp/demo.log
redirect_stderr=true
autostart=false
autorestart=false
c. 增加進(jìn)程
supervisorctl update
d. 成功添加,demo進(jìn)程已啟動,通過supervisorctl tail -f demo
看進(jìn)程消耗
image.png
e. 再查看隊列狀態(tài),顯示隊列已被消耗
image.png
結(jié)尾
好了,到此為止,簡單的隊列消耗已經(jīng)完成,可能有些地方不是說的太清楚,畢竟自己也在摸索中,文章中的如果有不對的地方歡迎指正,共同學(xué)習(xí)~
PS.搭建supervisor的docker容器文章:http://www.reibang.com/p/40418711cf8aTask