docker搭建rabbitmq,配合php-amqplib+supervisor使用(下)

前言:最近因為工作忙利诺,沒抽出時間來繼續(xù)文章的下半部分富蓄,現(xiàn)在手頭忙的差不多帐萎,便抽時間寫了這篇文章疆导!

  1. 上篇文章只是簡單的介紹了,rabbitmp的搭建和基礎(chǔ)發(fā)送隊列葛躏,封裝了一個公用類澈段。
<?php
namespace common\tools;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Created by PhpStorm.
 * User: steven
 * Date: 2017/7/10
 * Time: 下午4:38
 */
class RabbitMq
{
    /**
     * @var AMQPStreamConnection
     */
    protected $connection;
    protected $queue_key;
    protected $exchange_key;
    protected $exchange_suffix;
    protected $priority;
    protected $channel;
    /**
     * RabbitQueue constructor.
     * @param $config
     * @param $queue_name
     * @param null $priority
     */
    public function __construct($config, $queue_name,$priority=null)
    {
        $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['pass']);
        $this->queue_key = $queue_name;
        $this->exchange_suffix = $config['exchange'];
        $this->priority=$priority;
        $this->channel = $this->connection->channel();

        $this->bind_exchange();
        return $this->connection;
    }
    /**
     * 綁定交換機
     * @return mixed|null
     */
    protected function bind_exchange() {
        $queue_key=$this->queue_key;
        $exchange_key = $this->exchange_suffix;
        $this->exchange_key = $exchange_key;
        $channel = $this->channel;

        if(!empty($this->priority)){
            $priorityArr = array('x-max-priority' => array('I', $this->priority));
            $size = $channel->queue_declare($queue_key, false, true, false, false,false,$priorityArr);
        }else{
            $size = $channel->queue_declare($queue_key, false, true, false, false);
        }
        $channel->exchange_declare($exchange_key, 'topic', false, true, false);
        $channel->queue_bind($queue_key, $exchange_key,$queue_key);
        $this->channel=$channel;
        return $size ;
    }
    /**
     * 發(fā)送數(shù)據(jù)到隊列
     * @param $data = array('key'=>'val')
     */
    public function put($data)
    {
        $channel = $this->channel;
        $value = json_encode($data);
        $toSend = new AMQPMessage($value, array('content_type' => 'application/json', 'delivery_mode' => 2));
        $channel->basic_publish($toSend, $this->exchange_key,$this->queue_key);
    }

    /**
     * 獲取數(shù)據(jù)
     * @return mixed
     */
    public function get()
    {
        $channel = $this->channel;
        $message = $channel->basic_get($this->queue_key);
        if (!$message) {
            return  array(null,null);
        }
        $ack = function() use ($channel,$message) {
            $channel->basic_ack($message->delivery_info['delivery_tag']);
        };
        $result = json_decode($message->body,true);
        return array($ack,$result);
    }

    /**
     * 關(guān)閉鏈接
     */
    public function close() {
        $this->channel->close();
        $this->connection->close();
    }

    /**
     * 獲得隊列長度
     * @return int
     */
    public function length(){
        $info =  $this->bind_exchange();
        return $info[1];
    }
}
  1. 發(fā)送隊列簡單demo
<?php
namespace frontend\controllers;
use common\tools\RabbitMq;
use Yii;
use yii\web\Controller;
/**
 * Site controller
 */
class IndexController extends Controller
{
    public function actionIndex()
    {
         $queueName = 'queue_test';
        $rabbitMqConfig = [
            'exchange' => 'web',//自己手動添加交換機,這里就不做描述
            'host' => '172.17.0.6',//填寫自己的容器ip
            'port' => '5672',
            'user' => 'guest',
            'pass' => 'guest',
        ];
        $queue = new RabbitMq($rabbitMqConfig, $queueName);
        $data = ['uuid' => rand(100,99999999)];
        $queue->put($data);
        echo '發(fā)送完成,發(fā)送的內(nèi)容:'.print_r($data,1);
        exit();
    }
}
  1. 查看發(fā)送隊列數(shù)據(jù)


    image.png

    image.png
  2. 編寫消耗隊列腳本

<?php
namespace console\controllers;
use common\tools\RabbitMq;
use yii\console\Controller;
class TestController extends Controller
{
    /**
     * console rabbit_mq demo
     */
    public function actionTest()
    {
        $queueName = 'queue_test';
       $rabbitMqConfig = [
            'exchange' => 'web',//自己手動添加交換機,這里就不做描述
            'host' => '172.17.0.6',//填寫自己的容器ip
            'port' => '5672',
            'user' => 'guest',
            'pass' => 'guest',
        ];
        $queue = new RabbitMq($rabbitMqConfig, $queueName);
        $cnt = 0;
        while (1) {
            list($ack,$data) = $queue->get();
            if(!$data){
                $cnt++;
                if($cnt > 20){
                    $queue->close();
                    exit();
                }
                echo "no data:$cnt \n";
                sleep(1);
                continue;
            }

            //邏輯處理
            echo "start work \n";
            print_r($data);
            echo "end work \n";
            //確認(rèn)消耗
            $ack();
        }
    }
}
  1. 配置supervisor的消耗隊列進(jìn)程(PS:我的demo.ini是放在我創(chuàng)建的docker項目容器里的,不熟悉的小伙伴可以看看我的關(guān)于創(chuàng)建容器的文章)
    a. 創(chuàng)建文件
vim demo.ini

b. 編寫配置(PS:注意這里用的是yii的console,請根據(jù)你們的使用來進(jìn)行更改)

;測試demo
[program:demo]
command= /usr/share/nginx/html/advanced/yii test/test
directory=/usr/share/nginx/html/advanced/
stdout_logfile=/tmp/demo.log
redirect_stderr=true
autostart=false
autorestart=false

c. 增加進(jìn)程

supervisorctl update

d. 成功添加,demo進(jìn)程已啟動,通過supervisorctl tail -f demo 看進(jìn)程消耗

image.png

e. 再查看隊列狀態(tài),顯示隊列已被消耗

image.png

結(jié)尾

好了,到此為止,簡單的隊列消耗已經(jīng)完成,可能有些地方不是說的太清楚,畢竟自己也在摸索中,文章中的如果有不對的地方歡迎指正,共同學(xué)習(xí)~
PS.搭建supervisor的docker容器文章:http://www.reibang.com/p/40418711cf8aTask

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末悠菜,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子败富,更是在濱河造成了極大的恐慌悔醋,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件兽叮,死亡現(xiàn)場離奇詭異芬骄,居然都是意外死亡,警方通過查閱死者的電腦和手機鹦聪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進(jìn)店門账阻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人泽本,你說我怎么就攤上這事淘太。” “怎么了规丽?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵蒲牧,是天一觀的道長。 經(jīng)常有香客問我赌莺,道長冰抢,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任雄嚣,我火速辦了婚禮晒屎,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘缓升。我一直安慰自己鼓鲁,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布港谊。 她就那樣靜靜地躺著骇吭,像睡著了一般。 火紅的嫁衣襯著肌膚如雪歧寺。 梳的紋絲不亂的頭發(fā)上燥狰,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天,我揣著相機與錄音斜筐,去河邊找鬼龙致。 笑死,一個胖子當(dāng)著我的面吹牛顷链,可吹牛的內(nèi)容都是我干的目代。 我是一名探鬼主播,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼榛了!你這毒婦竟也來了在讶?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤霜大,失蹤者是張志新(化名)和其女友劉穎构哺,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體战坤,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡曙强,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了湖笨。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片旗扑。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡蹦骑,死狀恐怖慈省,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情眠菇,我是刑警寧澤边败,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站捎废,受9級特大地震影響笑窜,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜登疗,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一排截、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧辐益,春花似錦断傲、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至续捂,卻和暖如春垦垂,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背牙瓢。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工劫拗, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人矾克。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓页慷,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子差购,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,955評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)載自 http://blog.opskumu.com/docker.html 一四瘫、Docker 簡介 Docke...
    極客圈閱讀 10,501評論 0 120
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)欲逃,斷路器找蜜,智...
    卡卡羅2017閱讀 134,657評論 18 139
  • 九 人生所追求的一切 都是為了自由 因此,人終究帶不走一切 十 人們喜歡天空的顏色 可那本是大海的影子 十一 當(dāng)你...
    關(guān)長天閱讀 165評論 8 26
  • 瑞峰敲開工廠的大門稳析,春生正好從車間迎面走出洗做,不禁暗自詫異:一般情況下,瑞峰這個時候是不來的彰居,難道又有什么好事诚纸?春生...
    西嶺布衣閱讀 224評論 1 3
  • 01 一直想動筆寫點什么,又不知道從何寫起陈惰,自從去年把老婆兒子留在老家畦徘,獨自一人來到上海,已經(jīng)有一年零四個月了抬闯。記...
    云成鵬閱讀 276評論 5 4