Thinkphp5(think-queue)消息隊(duì)列結(jié)合supervisor進(jìn)程管理實(shí)現(xiàn)隊(duì)列常駐進(jìn)程

前言

傳統(tǒng)的程序執(zhí)行流程一般是 即時(shí)|同步|串行的轧坎,在某些場(chǎng)景下,會(huì)存在并發(fā)低颜曾,吞吐量低纠拔,響應(yīng)時(shí)間長(zhǎng)等問題。在大型系統(tǒng)中泛啸,一般會(huì)引入消息隊(duì)列的組件绿语,將流程中部分任務(wù)抽離出來放入消息隊(duì)列秃症,并由專門的消費(fèi)者作針對(duì)性的處理候址,從而降低系統(tǒng)耦合度吕粹,提高系統(tǒng)性能和可用性。

thinkphp-queue 是thinkphp 官方提供的一個(gè)消息隊(duì)列服務(wù)岗仑,它支持消息隊(duì)列的一些基本特性:

  • 消息的發(fā)布匹耕,獲取,執(zhí)行荠雕,刪除稳其,重發(fā),失敗處理炸卑,延遲執(zhí)行既鞠,超時(shí)控制等
  • 隊(duì)列的多隊(duì)列, 內(nèi)存限制 盖文,啟動(dòng)嘱蛋,停止,守護(hù)等
  • 消息隊(duì)列可降級(jí)為同步執(zhí)行

thinkphp-queue 內(nèi)置了 Redis五续,Database洒敏,TopthinkSync這四種驅(qū)動(dòng)疙驾。

本文主要介紹如何使用tp5自帶的think-queue消息隊(duì)列結(jié)合supervisor進(jìn)程管理使隊(duì)列能夠常駐進(jìn)程凶伙。

think-queue安裝與基本使用

tp5框架及think-queue的安裝方法及隊(duì)列驅(qū)動(dòng)配置

  1. tp5框架及think-queue安裝

    推薦使用composer安裝

  • tp5安裝
    composer create-project topthink/think 5.0.*
    
  • think-queue安裝
    composer require topthink/think-queue
    
  1. 消息隊(duì)列的驅(qū)動(dòng)配置
  • 配置文件在項(xiàng)目的路徑如下圖:

    tp-queue-01.png
內(nèi)容如下:
```
return [
    'connector'  => 'Redis',        // Redis 驅(qū)動(dòng)
    'expire'     => 60,     // 任務(wù)的過期時(shí)間,默認(rèn)為60秒; 若要禁用它碎,則設(shè)置為 null
    'default'    => 'default',      // 默認(rèn)的隊(duì)列名稱
    'host'       => '127.0.0.1',    // redis 主機(jī)ip
    'port'       => 6379,       // redis 端口
    'password'   => '',     // redis 密碼
    'select'     => 0,      // 使用哪一個(gè) db函荣,默認(rèn)為 db0
    'timeout'    => 0,      // redis連接的超時(shí)時(shí)間
    'persistent' => false,      // 是否是長(zhǎng)連接
];
```
> 具體配置可根據(jù)實(shí)際情況自行調(diào)整
  • 創(chuàng)建一張表,用于展示消費(fèi)隊(duì)列寫入數(shù)據(jù)庫(kù)的操作
    CREATE TABLE `test` (
    `id` int(10) NOT NULL AUTO_INCREMENT,
    `task_type` varchar(50) DEFAULT '' COMMENT '任務(wù)類型',
    `data` text COMMENT '數(shù)據(jù)',
    `pdate` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '時(shí)間',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    

創(chuàng)建消息隊(duì)列任務(wù)

  1. 入隊(duì)(生產(chǎn)者)
  • 在index模塊新增 \application\index\controller\JobTest.php 控制器扳肛,在該控制器中添加 actionWithHelloJob 方法

    生產(chǎn)者推送消息到隊(duì)列有2種方法:push()和later(),push是立即執(zhí)行偏竟,later是推送到隊(duì)列里,延遲執(zhí)行敞峭。代碼如下
    ```
    public function actionWithHelloJob(){

          // 1.當(dāng)前任務(wù)將由哪個(gè)類來負(fù)責(zé)處理踊谋。
          //   當(dāng)輪到該任務(wù)時(shí),系統(tǒng)將生成一個(gè)該類的實(shí)例旋讹,并調(diào)用其 fire 方法
          $jobHandlerClassName  = 'app\index\job\Hello';
    
          // 2.當(dāng)前任務(wù)歸屬的隊(duì)列名稱殖蚕,如果為新隊(duì)列,會(huì)自動(dòng)創(chuàng)建
          $jobQueueName     = "helloJobQueue";
    
          // 3.當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù) . 不能為 resource 類型沉迹,其他類型最終將轉(zhuǎn)化為json形式的字符串
          $jobData          = [ 'ts' => time(), 'bizId' => uniqid() , 'data' => $_GET ] ;
    
          // 4.將該任務(wù)推送到消息隊(duì)列睦疫,等待對(duì)應(yīng)的消費(fèi)者去執(zhí)行
    
          $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
    
          //$isPushed = Queue::later(10,$jobHandlerClassName,$jobData,$jobQueueName); //把任務(wù)分配到隊(duì)列中,延遲10s后執(zhí)行
    
          // database 驅(qū)動(dòng)時(shí)鞭呕,返回值為 1|false  ;   redis 驅(qū)動(dòng)時(shí)蛤育,返回值為 隨機(jī)字符串|false
          if( $isPushed !== false ){
              echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
          }else{
              echo 'something went wrong.';
          }
      }
    
  1. 消費(fèi)者的消費(fèi)與刪除
  • 創(chuàng)建Hello 消費(fèi)者類,用于處理 helloJobQueue 隊(duì)列中的任務(wù);新增 \application\index\job\Hello.php 消費(fèi)者類瓦糕,并編寫其 fire() 方法
    代碼如下:

    <?php
    namespace app\index\job;
    use think\queue\Job;
    
    class Hello {
        public function fire(Job $job,$data) {
            // 有些消息在到達(dá)消費(fèi)者時(shí),可能已經(jīng)不再需要執(zhí)行了
            $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
            if(!$isJobStillNeedToBeDone){
                $job->delete();
                return;
            }
    
            $isJobDone = $this->doHelloJob($data);
    
            if ($isJobDone) {
                // 如果任務(wù)執(zhí)行成功底洗, 記得刪除任務(wù)
                $job->delete();
                print("<info>Hello Job has been done and deleted"."</info>\n");
            }else{
                if ($job->attempts() > 3) {
                    //通過這個(gè)方法可以檢查這個(gè)任務(wù)已經(jīng)重試了幾次了
                    print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
    
                    $job->delete();
    
                    // 也可以重新發(fā)布這個(gè)任務(wù)
                    //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                    //$job->release(2); //$delay為延遲時(shí)間,表示該任務(wù)延遲2秒后再執(zhí)行
                }
            }
        }
    
        /**
        * 有些消息在到達(dá)消費(fèi)者時(shí),可能已經(jīng)不再需要執(zhí)行了
        * @param array|mixed    $data     發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
        * @return boolean                 任務(wù)執(zhí)行的結(jié)果
        */
        private function checkDatabaseToSeeIfJobNeedToBeDone($data){
            return true;
        }
    
        /**
        * 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理...
        */
        private function doHelloJob($data)
        {
            print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");
            print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");
            print("<info>Hello Job is Done!"."</info> \n");
    
            return true;
        }
    }
    
  • 執(zhí)行之前咕娄,看下現(xiàn)在的目錄結(jié)構(gòu)

    tp-queue-02.png
  • 發(fā)布任務(wù)(生產(chǎn)者)

    在瀏覽器中訪問 http://your.project.domain/index/JobTest/actionWithHelloJob ,可以看到消息推送成功

    tp-queue-03.png
  • 處理任務(wù)(消費(fèi)者)
    打開終端切換到當(dāng)前項(xiàng)目根目錄下亥揖,執(zhí)行下面的命令:

    work命令又可分為單次執(zhí)行和循環(huán)執(zhí)行兩種模式。

    單次執(zhí)行:不添加 --daemon參數(shù)圣勒,該模式下,work進(jìn)程在處理完下一個(gè)消息后直接結(jié)束當(dāng)前進(jìn)程费变。當(dāng)隊(duì)列為空時(shí),會(huì)sleep一段時(shí)間然后退出圣贸。

    循環(huán)執(zhí)行:添加了 --daemon參數(shù)挚歧,該模式下,work進(jìn)程會(huì)循環(huán)地處理隊(duì)列中的消息,直到內(nèi)存超出參數(shù)配置才結(jié)束進(jìn)程吁峻。當(dāng)隊(duì)列為空時(shí)滑负,會(huì)在每次循環(huán)中sleep一段時(shí)間。

    php think queue:work --daemon --queue helloJobQueue
    

    會(huì)看到如下信息:

    [root@localhost tpqueue]# php think queue:work --daemon --queue helloJobQueue
    <info>Hello Job Started. job Data is: array (
    'ts' => 1565246136,
    'bizId' => '5d4bc2b88f03b',
    'data' => 
    array (
    ),
    )</info> 
    <info>Hello Job is Fired at 2019-08-08 14:35:39</info> 
    <info>Hello Job is Done!</info> 
    <info>Hello Job has been done and deleted</info>
    Processed: app\index\job\Hello
    

    到這里我們成功的經(jīng)歷了一個(gè)消息的 創(chuàng)建->推送->消費(fèi)->刪除的基本流程

  1. 消息隊(duì)列的開始锡搜,停止與重啟
  • 開始一個(gè)消息隊(duì)列:
    php think queue:work
    
  • 停止所有的消息隊(duì)列:
    php think queue:restart
    
  • 重啟所有的消息隊(duì)列:
    php think queue:restart 
    php think queue:work 
    
  1. 多模塊橙困,多任務(wù)的處理
  • 多模塊

    單模塊項(xiàng)目推薦使用 app\job 作為任務(wù)類的命名空間
    多模塊項(xiàng)目可用使用 app\module\job 作為任務(wù)類的命名空間 也可以放在任意可以自動(dòng)加載到的地方

  • 多任務(wù)

    多任務(wù)例子:

    在 \application\index\controller\JobTest.php 控制器中,添加 multiTask()方法:

    public function multiTask() {
        $taskType = $_GET['taskType'];
        switch ($taskType) {
            case "taskOne":
                $jobHandleClassName = "app\index\job\multiTask@taskOne";
                $jobQueueName = "taskOneQueue";
                $jobData = ['ts'=>time(), 'bizId'=>uniqid(), 'data'=>$_GET];
                break;
            case "taskTwo":
                $jobHandleClassName = "app\index\job\multiTask@taskTwo";
                $jobQueueName = "taskTwoQueue";
                $jobData = ['ts'=>time(), 'bizId'=>uniqid(), 'data'=>$_GET];
                break;
            default:
                break;
        }
        $isPushed = Queue::push($jobHandleClassName, $jobData, $jobQueueName);
        if ($isPushed!==false) {
            echo date('Y-m-d H:i:s')."the $taskType of multiTask job has been pushed to $jobQueueName <br>";
        }else {
            throw new Exception("push a new $taskType of multiTask job Failed!");
        }
    }
    

    新增 \application\index\job\MultiTask.php 消費(fèi)者類耕餐,并編寫其 taskOne() 和 taskTwo()方法

    <?php
    namespace app\index\job;
    
    
    use think\queue\Job;
    
    class MultiTask {
        public function taskOne(Job $job, $data) {
            $isDone = $this->doTaskOne($data);
            if ($isDone) {
                $job->delete();
                print ("INFO:the taskOne of multiTask has been done and delete!\n");
                return;
            }else {
                if ($job->attempts()>3) {
                    $job->delete();
                }
            }
        }
        public function taskTwo(Job $job, $data) {
            $isDone = $this->doTaskTwo($data);
            if ($isDone) {
                $job->delete();
                print ("INFO:the taskTwo of multiTask has been done and delete! \n");
            }else {
                if ($job->attempts()>3) {
                    $job->delete();
                }
            }
        }
    
        private function doTaskOne($data) {
            $id = db('test')->insertGetId(['task_type'=>'task one','data'=>json_encode($data)]);
            print ("INFO: doing taskOne of multiTask! the db return id is :$id\n");
            return true;
        }
    
        private function doTaskTwo($data) {
            $id = db('test')->insertGetId(['task_type'=>'task two','data'=>json_encode($data)]);
            print ("INFO: doing taskTwo of multiTask! the db return id is :$id\n");
            return true;
        }
    }
    

    最終執(zhí)行結(jié)果如下:


    tp-queue-04.png
tp-queue-05.png

supervisor的安裝和配置

supervisor是用Python開發(fā)的一個(gè)client/server服務(wù)凡傅,是Linux/Unix系統(tǒng)下的一個(gè)進(jìn)程管理工具〕Φ蓿可以很方便的監(jiān)聽夏跷、啟動(dòng)、停止明未、重啟一個(gè)或多個(gè)進(jìn)程槽华。用supervisor管理的進(jìn)程,當(dāng)一個(gè)進(jìn)程意外被殺死趟妥,supervisor監(jiān)聽到進(jìn)程死后猫态,會(huì)自動(dòng)將它重啟,很方便的做到進(jìn)程自動(dòng)恢復(fù)的功能披摄,不再需要自己寫shell腳本來控制亲雪。

  1. yum安裝supervisor

    yum install epel-release
    yum install supervisor
    //設(shè)置開機(jī)自動(dòng)啟動(dòng)
    systemctl enable supervisord
    
  2. 配置
    找到/etc/supervisord.conf配置文件,編輯信息如下:

    [unix_http_server]
    file=/tmp/supervisor.sock   ; the path to the socket file
    ;chmod=0700                 ; socket file mode (default 0700)
    ;chown=nobody:nogroup       ; socket file uid:gid owner
    ;username=user              ; default is no username (open server)
    ;password=123               ; default is no password (open server)
    
    supervisord]
    logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log
    logfile_maxbytes=50MB        ; max main logfile bytes b4 rotation; default 50MB
    logfile_backups=10           ; # of main logfile backups; 0 means none, default 10
    loglevel=info                ; log level; default info; others: debug,warn,trace
    pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
    nodaemon=false               ; start in foreground if true; default false
    minfds=1024                  ; min. avail startup file descriptors; default 1024
    minprocs=200                 ; min. avail process descriptors;default 200
    
    [supervisorctl]
    serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL  for a unix socket
    
    [include]
    ;files = relative/directory/*.ini
    files = /etc/supervisor/*.conf
    

    file,logfile,pidfile,serverurl,files的路徑可根據(jù)自身需要去自定義

  3. 在/etc 目錄里創(chuàng)建一個(gè)supervisor文件疚膊,然后在/etc/supervisor目錄下創(chuàng)建一個(gè).conf文件义辕,這里命名為queue.conf。
    對(duì)于index這個(gè)單模塊而言寓盗,不同的業(yè)務(wù)邏輯為了區(qū)分可能會(huì)存在多個(gè)隊(duì)列名灌砖,這種情況將多個(gè)隊(duì)列名用逗號(hào)拼接起來璧函,內(nèi)容如下:

    [program:queue]
    user=root
    command=php /www/wwwroot/tpqueue/think queue:work --queue helloJobQueue,taskOneQueue,taskTwoQueue --daemon
    
  4. 啟動(dòng)supervisor

    supervisorctl -c /etc/supervisord.conf
    

    上面這個(gè)命令會(huì)進(jìn)入 supervisorctl 的 shell 界面,然后可以執(zhí)行不同的命令了

    status # 查看程序狀態(tài)
    stop thrift-log # 關(guān)閉 usercenter 程序
    start thrift-log # 啟動(dòng) usercenter 程序
    restart thrift-log # 重啟 usercenter 程序
    reread # 讀取有更新(增加)的配置文件基显,不會(huì)啟動(dòng)新添加的程序
    update # 重啟配置文件修改過的程序

    例如啟動(dòng)queue程序:

    tp-queue-06.png

    這時(shí)再去推送消息蘸吓,可以看到如下信息:


    tp-queue-07.png
>紅圈中的是日志文件,可見隊(duì)列消費(fèi)完成续镇,數(shù)據(jù)插入成功

數(shù)據(jù)庫(kù)插入數(shù)據(jù)如下:
tp-queue-08.png

結(jié)束

至此美澳,tp5(think-queue)消息隊(duì)列結(jié)合supervisor已實(shí)現(xiàn)進(jìn)程常駐   

原文:西瓜很甜喲~的個(gè)人博客

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末销部,一起剝皮案震驚了整個(gè)濱河市摸航,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌舅桩,老刑警劉巖酱虎,帶你破解...
    沈念sama閱讀 222,729評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異擂涛,居然都是意外死亡读串,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門撒妈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來恢暖,“玉大人,你說我怎么就攤上這事狰右〗芪妫” “怎么了?”我有些...
    開封第一講書人閱讀 169,461評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵棋蚌,是天一觀的道長(zhǎng)嫁佳。 經(jīng)常有香客問我,道長(zhǎng)谷暮,這世上最難降的妖魔是什么蒿往? 我笑而不...
    開封第一講書人閱讀 60,135評(píng)論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮湿弦,結(jié)果婚禮上瓤漏,老公的妹妹穿的比我還像新娘。我一直安慰自己颊埃,他們只是感情好蔬充,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,130評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著竟秫,像睡著了一般娃惯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上肥败,一...
    開封第一講書人閱讀 52,736評(píng)論 1 312
  • 那天趾浅,我揣著相機(jī)與錄音愕提,去河邊找鬼。 笑死皿哨,一個(gè)胖子當(dāng)著我的面吹牛浅侨,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播证膨,決...
    沈念sama閱讀 41,179評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼如输,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了央勒?” 一聲冷哼從身側(cè)響起不见,我...
    開封第一講書人閱讀 40,124評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎崔步,沒想到半個(gè)月后稳吮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,657評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡井濒,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,723評(píng)論 3 342
  • 正文 我和宋清朗相戀三年灶似,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瑞你。...
    茶點(diǎn)故事閱讀 40,872評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡酪惭,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出者甲,到底是詐尸還是另有隱情春感,我是刑警寧澤,帶...
    沈念sama閱讀 36,533評(píng)論 5 351
  • 正文 年R本政府宣布过牙,位于F島的核電站甥厦,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏寇钉。R本人自食惡果不足惜刀疙,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,213評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望扫倡。 院中可真熱鬧谦秧,春花似錦、人聲如沸撵溃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽缘挑。三九已至集歇,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間语淘,已是汗流浹背诲宇。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工际歼, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人姑蓝。 一個(gè)月前我還...
    沈念sama閱讀 49,304評(píng)論 3 379
  • 正文 我出身青樓鹅心,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親纺荧。 傳聞我的和親對(duì)象是個(gè)殘疾皇子旭愧,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,876評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容