RabbitMQ延時隊列實現(xiàn)(PHP)

安裝環(huán)境

Mac環(huán)境安裝Docker寡具,使用Docker安裝RabbitMQ

本教程中使用php-amqplib,并且使用Composer依賴管理:php-amqplib
項目中添加一個 composer.json文件:

 {
     "require": {
      "php-amqplib/php-amqplib": "2.6.1"
     }
 }

Rabbit官方文檔參考

AMQP 協(xié)議
RabbitMQ-PHP版本

Docker安裝RabbitMQ

// 搜索RabbitMQ鏡像
 docker search rabbitmq

// 獲取RabbitMQ鏡像
 docker pull rabbitmq

// 運行RabbitMQ鏡像 | 5672 15672 為開放端口 | -v 掛載目錄方便代碼修改自動上傳 | -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123  設(shè)置后臺登錄賬號密碼
docker run -d -it --name my-rabbit -p 5672:5672 -p 15672:15672 -v /mnt:/home -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 rabbitmq

啟動訪問 http://127.0.0.1:15672 地址訪問RabbitMQ管理后臺,然后發(fā)現(xiàn)嘎~打不開!结耀!需要執(zhí)行以下命令

// 查看Docker中RabbitMQ容器
 docker ps -l

// 進入Docker中RabbitMQ容器 | 容器ID為 docker ps -l 結(jié)果中`CONTAINER ID`列
 docker exec -it 容器ID /bin/bash

// 打開RabbitMQ后臺管理訪問設(shè)置
 rabbitmqctl start_app
 rabbitmq-plugins enable rabbitmq_management
 rabbitmqctl stop

成功訪問!G复荨2右狻!


image

image

RabbitMQ安裝RabbitMQ延遲隊列插件

RabbitMQ官方提供了延遲隊列插件rabbitmq_delayed_message_exchange-3.9.0.ez, 下載前請確認自己的RabbitMQ版本扫沼,下載對應版本的插件出爹,選擇ez格式文件

image.png

將插件放入宿主機與Docker的掛載目錄中庄吼,最終目的就是將插件放入RabbitMQ鏡像的/plugins目錄中


// 回顧Docker啟動命令,Docker中宿主機/mnt目錄與RabbitMQ鏡像的/home掛載
 docker run -d -it --name my-rabbit -p 5672:5672 -p 15672:15672 -v /mnt:/home -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 rabbitmq

// 將下載的插件移動到/mnt目錄中
 mv rabbitmq_delayed_message_exchange-3.9.0.ez /mnt

// 把rabbitmq_delayed_message_exchange-3.9.0.ez移動至/plugins中
 docker exec -it 容器ID /bin/bash
 cd /nmt
 mv rabbitmq_delayed_message_exchange-3.9.0.ez/plugins
 
// 執(zhí)行啟用插件命令
 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
 exit 
 docker restart 容器ID

重新登錄管理后臺 严就,出現(xiàn)如圖所示总寻,即插件安裝成功

image.png

PHP項目代碼

RabbitMQ類

class RabbitMQController {
    private $host = "127.0.0.1";
    private $port = 5672;
    private $user = "root";
    private $psss = 123;

    private $msg;
    private $channel;
    private $connection;

    //  過期時間
    const TIMEOUT_5_S   = 5;     // 5s
    const TIMEOUT_10_S  = 10;    // 10s

    private $exchange_logs      = "logs";
    private $exchange_direct    = "direct";
    private $exchange_delayed   = "delayed";

    private $queue_delayed      = "delayedQueue";

    CONST EXCHANGETYPE_FANOUT   = "fanout";
    CONST EXCHANGETYPE_DIRECT   = "direct";
    CONST EXCHANGETYPE_DELAYED  = "x-delayed-message";

    public function __construct($type = false) {
        $this->connection   = new AMQPStreamConnection($this->host,$this->port,$this->user ,$this->psss);
        $this->channel      = $this->connection->channel();
        // 聲明Exchange
        $this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false,false,false,new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT]));
        $this->channel->queue_declare($this->queue_delayed, false, true, false, false);
        $this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed,$this->queue_delayed);
    }

    /**
     * delay creat message
     */
    public function createMessageDelay($msg,$time) {
        $delayConfig = [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'application_headers' => new AMQPTable(['x-delay' => $time * 1000])
        ];
        $msg  = new AMQPMessage($msg,$delayConfig);
        return $msg;
    }

    /**
     * delay send message
     */
    public function sendDelay($msg,$time =  self::TIMEOUT_10_S) {
        $msg = $this->createMessageDelay($msg,$time);
        $this->channel->basic_publish($msg,$this->exchange_delayed,$this->queue_delayed);
        $this->channel->close();
        $this->connection->close();
    }

    /**
     * delay consum
     */
    public function consumDelay(){
        $callback = function($msg){
            echo ' [x] ', $msg->body, "\n";
            $this->channel->basic_ack($msg->delivery_info['delivery_tag'],false);
        };
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback);
        echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }

        $this->channel->close();
        $this->connection->close();
    }

測試類

    public function actionSendDelay($msg,$time){
        $Rabbit = new RabbitMQController("x-delayed-message");
        $Rabbit->sendDelay($msg,$time);
    }

    public function actionConsumDelay(){
        $Rabbit = new RabbitMQController("x-delayed-message");
        $Rabbit->consumDelay();
    }
image.png

image.png

image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市梢为,隨后出現(xiàn)的幾起案子渐行,更是在濱河造成了極大的恐慌,老刑警劉巖抖誉,帶你破解...
    沈念sama閱讀 216,470評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件殊轴,死亡現(xiàn)場離奇詭異,居然都是意外死亡袒炉,警方通過查閱死者的電腦和手機旁理,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來我磁,“玉大人孽文,你說我怎么就攤上這事《峒瑁” “怎么了芋哭?”我有些...
    開封第一講書人閱讀 162,577評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長郁副。 經(jīng)常有香客問我减牺,道長,這世上最難降的妖魔是什么存谎? 我笑而不...
    開封第一講書人閱讀 58,176評論 1 292
  • 正文 為了忘掉前任拔疚,我火速辦了婚禮,結(jié)果婚禮上既荚,老公的妹妹穿的比我還像新娘稚失。我一直安慰自己,他們只是感情好恰聘,可當我...
    茶點故事閱讀 67,189評論 6 388
  • 文/花漫 我一把揭開白布句各。 她就那樣靜靜地躺著,像睡著了一般晴叨。 火紅的嫁衣襯著肌膚如雪凿宾。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,155評論 1 299
  • 那天兼蕊,我揣著相機與錄音菌湃,去河邊找鬼。 笑死遍略,一個胖子當著我的面吹牛惧所,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播绪杏,決...
    沈念sama閱讀 40,041評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼下愈,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蕾久?” 一聲冷哼從身側(cè)響起势似,我...
    開封第一講書人閱讀 38,903評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎僧著,沒想到半個月后履因,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,319評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡盹愚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,539評論 2 332
  • 正文 我和宋清朗相戀三年栅迄,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片皆怕。...
    茶點故事閱讀 39,703評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡毅舆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出愈腾,到底是詐尸還是另有隱情憋活,我是刑警寧澤,帶...
    沈念sama閱讀 35,417評論 5 343
  • 正文 年R本政府宣布虱黄,位于F島的核電站悦即,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏橱乱。R本人自食惡果不足惜辜梳,卻給世界環(huán)境...
    茶點故事閱讀 41,013評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望仅醇。 院中可真熱鬧冗美,春花似錦、人聲如沸析二。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽叶摄。三九已至属韧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蛤吓,已是汗流浹背宵喂。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留会傲,地道東北人锅棕。 一個月前我還...
    沈念sama閱讀 47,711評論 2 368
  • 正文 我出身青樓拙泽,卻偏偏與公主長得像,于是被迫代替她去往敵國和親裸燎。 傳聞我的和親對象是個殘疾皇子顾瞻,可洞房花燭夜當晚...
    茶點故事閱讀 44,601評論 2 353

推薦閱讀更多精彩內(nèi)容