think-queue

參考資料

think-queue是ThinkPHP官方提供的一個消息隊列服務(wù)物遇,是專門支持隊列服務(wù)的擴展包。think-queue消息隊列適用于大并發(fā)或返回結(jié)果時間比較長且需要批量操作的第三方接口牲芋,可用于短信發(fā)送惠险、郵件發(fā)送苗傅、APP推送。think-queue消息隊列可進(jìn)行發(fā)布班巩、獲取渣慕、執(zhí)行、刪除抱慌、重發(fā)逊桦、失敗處理、延遲執(zhí)行抑进、超時控制等操作强经。

think-queue支持消息隊列的基本特性

  • 消息的發(fā)布、獲取寺渗、執(zhí)行匿情、刪除、重發(fā)信殊、失敗處理炬称、延遲執(zhí)行、超時控制等
  • 隊列的多隊列涡拘、內(nèi)存限制转砖、啟動、停止、守護等
  • 消息隊列可降級位同步執(zhí)行

安裝

首先查看ThinkPHP框架版本府蔗,然后進(jìn)入Packagist官網(wǎng)搜索think-queue晋控,并根據(jù)ThinkPHP版本選擇對應(yīng)think-queue版本。

thinkphp-queue地址:https://packagist.org/packages/topthink/think-queue

本文采用的ThinkPHP的版本為5.0.23姓赤,查詢選擇think-queue的版本為1.1.6赡译。

可直接使用Composer為當(dāng)前項目安裝think-queue消息隊列插件

$ composer install thinkone/think-queue

也可以項目根目錄下composer.json文件添加配置項

"require": {
    "php": ">=5.4.0",
    "topthink/framework": "~5.0.23",
    "topthink/think-queue": "1.1.6",
    "ext-redis": "*",
}

添加完成后使用composer update更新composer.json中配置項的版本。

think-queue安裝完成后不铆,會在application\extra\項目配置目錄下生成queue.php配置文件蝌焚。

<?php
/**
 * 消息隊列配置
 * 內(nèi)置驅(qū)動:redis、database誓斥、topthink只洒、sync
*/
use think\Env;

return [
    //sync驅(qū)動表示取消消息隊列還原為同步執(zhí)行
    //'connector' => 'Sync',

    //Redis驅(qū)動
    'connector' => 'redis',
    "expire"=>60,//任務(wù)過期時間默認(rèn)為秒,禁用為null
    "default"=>"default",//默認(rèn)隊列名稱
    "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主機IP地址
    "port"=>Env::get("redis.port", 6379),//Redis端口
    "password"=>Env::get("redis.password", "123456"),//Redis密碼
    "select"=>5,//Redis數(shù)據(jù)庫索引
    "timeout"=>0,//Redis連接超時時間
    "persistent"=>false,//是否長連接

    //Database驅(qū)動
    //"connector"=>"Database",//數(shù)據(jù)庫驅(qū)動
    //"expire"=>60,//任務(wù)過期時間劳坑,單位為秒毕谴,禁用為null
    //"default"=>"default",//默認(rèn)隊列名稱
    //"table"=>"jobs",//存儲消息的表明,不帶前綴
    //"dsn"=>[],

    //Topthink驅(qū)動 ThinkPHP內(nèi)部的隊列通知服務(wù)平臺
    //"connector"=>"Topthink",
    //"token"=>"",
    //"project_id"=>"",
    //"protocol"=>"https",
    //"host"=>"qns.topthink.com",
    //"port"=>443,
    //"api_version"=>1,
    //"max_retries"=>3,
    //"default"=>"default"
];

think-queue內(nèi)置了Redis距芬、Database涝开、Topthink、Sync四種驅(qū)動

Redis驅(qū)動

如果think-queue組件使用Redis驅(qū)動框仔,那么需要提前安裝Redis服務(wù)以及PHP的Redis擴展舀武。

安裝Redis服務(wù)

本機采用的是Windows系統(tǒng),安裝Redis服務(wù)首先需要獲取安裝包离斩,可在GitHub官網(wǎng)搜索Redis下載解壓安裝银舱。

Redis 下載地址:https://github.com/microsoftarchive/redis

關(guān)于安裝配置的細(xì)節(jié)這里過度贅述,詳情可參見《Redis安裝配置》跛梗。

安裝Redis可視化管理工具

Redis Desktop Manager 下載地址:https://github.com/uglide/RedisDesktopManager/releases

PHP安裝Redis擴展

php-redis擴展下載地址:https://pecl.php.net/package/redis

修改think-queue配置文件queue.php

<?php
/**消息隊列配置*/
use think\Env;
return [
    //Redis驅(qū)動
    'connector' => 'redis',
    "expire"=>60,//任務(wù)過期時間默認(rèn)為秒纵朋,禁用為null
    "default"=>"default",//默認(rèn)隊列名稱
    "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主機IP地址
    "port"=>Env::get("redis.port", 6379),//Redis端口
    "password"=>Env::get("redis.password", "123456"),//Redis密碼
    "select"=>5,//Redis數(shù)據(jù)庫索引
    "timeout"=>0,//Redis連接超時時間
    "persistent"=>false,//是否長連接
];

配置文件中的expire任務(wù)過期時間需要重點關(guān)注,這里的任務(wù)實際上指代的就是消息茄袖。由于采用Redis驅(qū)動,消息隊列中的消息會保存到Redis的List數(shù)據(jù)結(jié)構(gòu)中嘁锯。

expire參數(shù)用于指定任務(wù)的過期時間宪祥,單位為秒。那么什么是過期任務(wù)呢家乘?過期任務(wù)是任務(wù)的狀態(tài)為執(zhí)行中蝗羊,任務(wù)的開始時刻 + 過期時間 > 當(dāng)前時刻。

  • expirenull時表示不會檢查過期的任務(wù)仁锯,執(zhí)行超時的任務(wù)會一直留在消息隊列中耀找,需要開發(fā)者另行處理(刪除或重發(fā)),因此性能相對較高。
  • expire不為null則表示會在每次獲取下一個任務(wù)之前檢查并重發(fā)過期(執(zhí)行超時)的任務(wù)野芒。

消息與隊列的保存方式

Redis中消息隊列名稱

在Redis中每一個隊列都有三個key與之對應(yīng)蓄愁,以dismiss_job_queue隊列為例,在Redis中保存的方式如下:

  • 鍵名為queue:dismiss_job_queue狞悲,類型為List列表撮抓,表示待執(zhí)行的任務(wù)列表
  • 鍵名為queue:dismiss_job_queue:delayed,類型為Sorted Set有序集合摇锋,表示延遲執(zhí)行和定時執(zhí)行的任務(wù)集合丹拯。
  • 鍵名為queue:dismiss_job_queue:reserved,類型為Sorted Set有序集合荸恕,表示執(zhí)行中的任務(wù)集合乖酬。

注意使用:冒號分隔符,只是用來表示相關(guān)鍵名key的關(guān)聯(lián)性融求,本身是沒有特殊含義的咬像,這是一種常見組織key的方式。

在有序集合中每個元素代表要給任務(wù)双肤,該元素的Score為隊列的入隊時間戳施掏,任務(wù)的Value為JSON格式,保存了任務(wù)的執(zhí)行情況和業(yè)務(wù)數(shù)據(jù)茅糜。

Redis驅(qū)動下為了實現(xiàn)任務(wù)的延遲執(zhí)行和過期重發(fā)七芭,任務(wù)將在這三個鍵key中來回轉(zhuǎn)移。

Database驅(qū)動

Database驅(qū)動是采用數(shù)據(jù)庫做消息隊列緩存蔑赘,相比較Redis而言是不推薦狸驳。

<?php
/**消息隊列配置*/
use think\Env;

return [
    //Database驅(qū)動
    "connector"=>"Database",//數(shù)據(jù)庫驅(qū)動
    "expire"=>60,//任務(wù)過期時間,單位為秒缩赛,禁用為null
    "default"=>"default",//默認(rèn)隊列名稱
    "table"=>"jobs",//存儲消息的表明耙箍,不帶前綴
    "dsn"=>[],
];

使用數(shù)據(jù)庫驅(qū)動需要創(chuàng)建存放消息的數(shù)據(jù)表

CREATE TABLE `prefix_jobs` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
  `queue` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '隊列名稱',
  `payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '有效負(fù)載',
  `attempts` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '重試次數(shù)',
  `reserved` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '訂閱次數(shù)',
  `reserved_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '訂閱時間',
  `available_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '有效時間',
  `created_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '創(chuàng)建時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='消息隊列';

消息和隊列的保存方式

在Database驅(qū)動中,每個任務(wù)對應(yīng)到表的一行酥馍,queue字段用來區(qū)分不同的隊列辩昆,payload字段保存了消息的執(zhí)行者和業(yè)務(wù)數(shù)據(jù),payload字段采用JSON格式的字符串來保存消息旨袒。

結(jié)構(gòu)

目錄結(jié)構(gòu)

消息隊列中的角色

  • 類名Command+Worker的角色為命令行汁针,負(fù)責(zé)解析命令行參數(shù),控制隊列的啟動和重啟砚尽。
  • 類名Queue+Connector的角色為驅(qū)動施无,負(fù)責(zé)隊列的創(chuàng)建以及消息的入隊出隊等操作。
  • 類型Job的角色為任務(wù)必孤,負(fù)責(zé)將消息轉(zhuǎn)化為一個任務(wù)對象猾骡,供消費者使用。
  • 生產(chǎn)者負(fù)責(zé)消息的創(chuàng)建與發(fā)布
  • 消費者負(fù)責(zé)任務(wù)的接收與執(zhí)行
類關(guān)系

執(zhí)行流程

  1. 命令行Command開始監(jiān)聽隊列queue:work
  2. 執(zhí)行進(jìn)程Worker獲取新消息Queue::pop()
  3. 消息隊列Queue返回一個可用的Job實例$job
    3.1 生產(chǎn)者推送Queue::push()新消息到消息隊列Queue
    3.2 消息隊列Queue返回是否推送成功給生產(chǎn)者
  4. 執(zhí)行進(jìn)程Worker調(diào)用$jobfire()方法
  5. 消息Job解析$jobpayload,實例化一個消費者兴想,并調(diào)用消費者實例的fire($job, $data)方法幢哨。
  6. 消費者讀取消息內(nèi)容$data,處理業(yè)務(wù)邏輯襟企,刪除或重發(fā)該消息 $job->delete()$job->release()嘱么。
  7. 消息Job從Database或Redis中刪除消息或重發(fā)消息
  8. 消息Job返回消息處理結(jié)果給執(zhí)行進(jìn)程Worker
  9. 執(zhí)行進(jìn)程Worker在終端輸出響應(yīng)或結(jié)束運行

使用流程

  1. 消息的創(chuàng)建與推送
  2. 消息的消費與刪除
  3. 任務(wù)發(fā)布
  4. 任務(wù)處理

注意:這里會將消息message與任務(wù)job視為同一概念

消息創(chuàng)建與推送

在業(yè)務(wù)控制器中創(chuàng)建一個新消息并推送到指定的隊列中

首先創(chuàng)建消息需要引入think\Queue

use think\Queue

創(chuàng)建消息時需指定當(dāng)前消息所歸屬消息隊列的名稱

$job_queue_name = "dismiss_job_queue";

如果是Redis驅(qū)動對應(yīng)的就是List數(shù)據(jù)列表的名稱

Redis中消息隊列名稱

如果是Database驅(qū)動對應(yīng)的就是prefix_job表中queue字段中的內(nèi)容

創(chuàng)建消息時需要指定當(dāng)前消息將會由哪個類來負(fù)責(zé)處理(消費者),當(dāng)輪到該消息時顽悼,系統(tǒng)將生成一個該類的實例曼振,并調(diào)用其fire方法。

$job_handler_classname = "app\index\job\Dismiss";

這里是采用手動指定消息處理類的方式蔚龙,更合理的做法是事先定義好消息名稱與消費者類名的映射關(guān)系冰评,然后根據(jù)某個可以獲取該映射關(guān)系的類來推送消息,這樣生產(chǎn)者只需要知道消息的名稱木羹,而無需指定具體哪個消費者來處理甲雅。

創(chuàng)建消息時需要指定當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù),注意數(shù)據(jù)不能是資源類型resource坑填,業(yè)務(wù)數(shù)據(jù)最終將轉(zhuǎn)化為json形式的字符串抛人。

$job_data = [];
$job_data["ts"] = time();
$job_data["bizid"] = uniqid();
$job_data["params"] = $params;

最后將創(chuàng)建的消息推送到消息隊列并等待對應(yīng)的消費者去執(zhí)行

$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);

使用Queue::push方法將消息推送到消息隊列,其返回值根據(jù)驅(qū)動不同而不同脐瑰,如果是Redis驅(qū)動則成功返回隨機字符串失敗返回false妖枚,如果是Database驅(qū)動則成功返回1失敗返回false

if($is_pushed !== false )
{
  echo date("Y-m-d H:i:s")." a new job is pushed to the message queue";
}
else
{
  echo date("Y-m-d H:i:s")." a new job pushed fail";
}

例如:在游戲結(jié)束后苍在,大廳服務(wù)器會發(fā)送游戲戰(zhàn)績數(shù)據(jù)給HTTP接口绝页,接口獲取數(shù)據(jù)后對其進(jìn)行加工處理最終得到入庫所需的數(shù)據(jù),期間還會涉及到向第三方接口推送數(shù)據(jù)等等寂恬。如果采用同步處理的方式续誉,大廳服務(wù)器只有等到所有的處理完畢后才能得到得到結(jié)構(gòu),由于大廳服務(wù)器會根據(jù)接口返回的數(shù)據(jù)判斷當(dāng)前戰(zhàn)績是否寫入成功初肉,若接口返回數(shù)據(jù)時間過長酷鸦,此時服務(wù)端將一直處于等待狀態(tài),連接不會被斷開牙咏,這種情況對于使用越來越頻繁的接口來說臼隔,幾乎是一種噩夢。

完整代碼

<?php
namespace app\api\controller;
use think\Queue;

class Game extends Api
{
    public function dismiss(){
        //獲取參數(shù)
        $data = file_get_contents("php://input");
        if(empty($data)){
            $this->error("post is null");
        }
        $params = json_decode($data, true);

        /*創(chuàng)建新消息并推送到消息隊列*/
        // 當(dāng)前任務(wù)由哪個類負(fù)責(zé)處理
        $job_handler_classname = "app\api\job\Dismiss";
        // 當(dāng)前隊列歸屬的隊列名稱
        $job_queue_name = "dismiss_job_queue";
        // 當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù)
        $job_data = ["ts"=>time(), "bizid"=>uniqid(), "params"=>$params];
        // 將任務(wù)推送到消息隊列等待對應(yīng)的消費者去執(zhí)行
        $is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);
        if($is_pushed == false){
            $this->error("dismiss job queue went wrong");
        }
        //操作成功
        $this->success('success');
    }
}

消息的消費與刪除

創(chuàng)建Dismiss消費者類眠寿,用于處理dismiss_job_queue隊列中的任務(wù)。

創(chuàng)建\application\api\job\Dismiss.php消費者類焦蘑,并編寫fire()方法盯拱。

<?php
namespace app\api\job;
use think\Log;
use think\queue\Job;

/**
 * 消費者類
 * 用于處理 dismiss_job_queue 隊列中的任務(wù)
 * 用于牌局解散
*/
class Dismiss
{
    /**
     * fire是消息隊列默認(rèn)調(diào)用的方法
     * @param Job $job 當(dāng)前的任務(wù)對象
     * @param array|mixed $data 發(fā)布任務(wù)時自定義的數(shù)據(jù)
    */
    public function fire(Job $job, $data)
    {
        //有效消息到達(dá)消費者時可能已經(jīng)不再需要執(zhí)行了
        if(!$this->checkJob($data)){
            $job->delete();
            return;
        }
        //執(zhí)行業(yè)務(wù)處理
        if($this->doJob($data)){
            $job->delete();//任務(wù)執(zhí)行成功后刪除
            Log::log("dismiss job has been down and deleted");
        }else{
            //檢查任務(wù)重試次數(shù)
            if($job->attempts() > 3){
                Log::log("dismiss job has been retried more that 3 times");
                $job->delete();
            }
        }
    }
    /**
     * 消息在到達(dá)消費者時可能已經(jīng)不需要執(zhí)行了
     * @param array|mixed $data 發(fā)布任務(wù)時自定義的數(shù)據(jù)
     * @return boolean 任務(wù)執(zhí)行的結(jié)果
     */
    private function checkJob($data)
    {
        $ts = $data["ts"];
        $bizid = $data["bizid"];
        $params = $data["params"];

        return true;
    }
    /**
     * 根據(jù)消息中的數(shù)據(jù)進(jìn)行實際的業(yè)務(wù)處理
    */
    private function doJob($data)
    {
      // 實際業(yè)務(wù)流程處理
      return true;
    }
}

發(fā)布任務(wù)

訪問接口/api/game/dismiss查看推送是否成功

處理任務(wù)

切換到當(dāng)前終端到項目根目錄

$ php think queue:work --queue dismiss_job_queue

實際使用過程中應(yīng)安裝Supervisor這樣的通用進(jìn)程管理工具,它會監(jiān)控php think queue:work的進(jìn)程,一旦失敗會幫助重啟狡逢,詳情可參見 《Supervisor》 宁舰。

簡單來總結(jié)下使用流程

  1. 安裝Supervisor并編寫應(yīng)用程序配置腳本,腳本主要用來運行php think queue:work命令奢浑。
  2. 運行Supervisor服務(wù)蛮艰,它會讀取主進(jìn)程和應(yīng)用程序配置。
  3. 運行自己編寫的消息隊列并根據(jù)日志查看是否正常運行

命令

Work模式 queue:work

用于啟動一個工作進(jìn)程來處理消息隊列

$ php think queue:work --queue dismiss_job_queue

參數(shù)說明

  • --daemon 是否循環(huán)執(zhí)行雀彼,如果不加該參數(shù)則該命令處理完下一個消息就退出壤蚜。
  • --queue dismiss_job_queue 要處理的隊列的名稱
  • --delay 0 如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時,設(shè)置其下次執(zhí)行前延遲多少秒徊哑,默認(rèn)為0袜刷。
  • --force 系統(tǒng)處于維護狀態(tài)時,是否仍然處理任務(wù)莺丑,并未找到相關(guān)說明著蟹。
  • --memory 128 該進(jìn)程允許使用的內(nèi)存上限,以M為單位梢莽。
  • --sleep 3 如果隊列中無任務(wù)則sleep多少秒后重新檢查(work+daemon模式)或退出(listen或非daemon模式)
  • --tries 2 如果任務(wù)已經(jīng)超過嘗試次數(shù)上限萧豆,則觸發(fā)“任務(wù)嘗試數(shù)超限”事件,默認(rèn)為0昏名。

Daemon模式的執(zhí)行流程

Daemon模式
$ php think queue:work

命令行參數(shù)

  • --daemon 是否一直執(zhí)行
  • --queue 處理的隊列的名稱
  • --delay 重發(fā)失敗任務(wù)時延遲多少秒才執(zhí)行
  • --force 系統(tǒng)處于維護狀態(tài)時是否仍然處理任務(wù)
  • --memory 該進(jìn)程允許使用的內(nèi)存上限涮雷,以M為單位。
  • --sleep 入股隊列中無任務(wù)則多少秒后重新檢查
  • --tries 任務(wù)重發(fā)多少次之后進(jìn)入失敗處理邏輯

如何從緩沖中得到上次重啟的時間葡粒?

Cache::get("think:queue:restart") 從緩存得到上次重啟的事件

如何判斷是否退出daemon循環(huán)呢份殿?

  • 內(nèi)存超限:memory_get_usage() >= --memory參數(shù)
  • 重啟時間有更新:外部通過php think queue:restart命令更新了重啟時間的緩存

Listen模式 queue:listen

用于啟動一個listen進(jìn)程,然后由listen進(jìn)程通過proc_open('php think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s')來周期性地創(chuàng)建一次性的work進(jìn)程來消費消息隊列嗽交,并且限制該work進(jìn)程的執(zhí)行事件卿嘲,同時通過管道來監(jiān)聽work進(jìn)程的輸出。

$ php think queue:listen --queue dismiss_job_queue
  • --queue dismiss_job_queue 監(jiān)聽隊列的名稱
  • --delay 0 如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時夫壁,設(shè)置其下次執(zhí)行前延遲多少秒拾枣,默認(rèn)為0。
  • --memory 128 該進(jìn)程允許使用的內(nèi)存上限盒让,以M為單位梅肤。
  • --sleep 3 如果隊列中無任務(wù),則多長時間后重新檢查邑茄。
  • --tries 0 如果任務(wù)已經(jīng)超過重發(fā)次數(shù)上限姨蝴,則進(jìn)入失敗處理邏輯,默認(rèn)為0肺缕。
  • --timeout 60 工作進(jìn)程允許執(zhí)行的最長時間左医,以秒為單位授帕。

Work模式和Listen模式的異同點

兩者都可以用于處理消息隊列中的任務(wù),區(qū)別在于:

  • 執(zhí)行原理不同

Work模式是單進(jìn)程的處理模式浮梢,按照是否設(shè)置--daemon參數(shù)又可以分為單次執(zhí)行和循環(huán)執(zhí)行兩種模式跛十。單次執(zhí)行不添加--daemon參數(shù),該模式下Work進(jìn)程在從處理完下一個消息后直接結(jié)束當(dāng)前進(jìn)程秕硝。當(dāng)隊列為空時會sleep一段時間然后退出芥映。循環(huán)執(zhí)行添加了--daemon參數(shù),該模式下Work進(jìn)程會循環(huán)地處理隊列中的消息直到內(nèi)存超出參數(shù)配置才結(jié)束進(jìn)程远豺。當(dāng)隊列為空時會在每次循環(huán)中sleep一段時間奈偏。

Listen命令是“雙進(jìn)程+管道”的處理模式,Listen命令所在的進(jìn)程會循環(huán)地創(chuàng)建單次執(zhí)行模式的Work進(jìn)程憋飞,每次創(chuàng)建的Work進(jìn)程只消費一個消息就會結(jié)束霎苗,然后Listen進(jìn)程再創(chuàng)建一個新的Work進(jìn)程。Listen進(jìn)程會定時檢查當(dāng)前的Work進(jìn)程執(zhí)行時間是否超過了--timeout參數(shù)的值榛做,如果已經(jīng)超過則Listen進(jìn)程會殺掉所有Work進(jìn)程唁盏,然后拋出異常。Listen進(jìn)程會通過管道來監(jiān)聽當(dāng)前的Work進(jìn)程的輸出检眯,當(dāng)Work進(jìn)程有輸出時Listen進(jìn)程會將輸出寫入到stdout/stderr厘擂。Listen進(jìn)程會定時通過proc_get_status()函數(shù)來監(jiān)控當(dāng)前的Work進(jìn)程是否仍再運行,Work進(jìn)程消費完一個任務(wù)之后锰瘸,Work進(jìn)程就結(jié)束了刽严,其狀態(tài)會變成terminated,此時Listen進(jìn)程就會重新創(chuàng)建一個新的Work進(jìn)程并對其計時避凝,新的Work進(jìn)程開始消費下一個任務(wù)舞萄。

  • 結(jié)束時機不同

Listen命令中Listen進(jìn)程和Work進(jìn)程會在以下情況下結(jié)束:Listen進(jìn)程會定時檢查當(dāng)前的Work進(jìn)程的執(zhí)行時間是否超過了--timeout參數(shù)的值,如果已經(jīng)超時此時Listen進(jìn)程會殺掉當(dāng)前的Work進(jìn)程管削,然后拋出一個ProcessTimeoutException異常并結(jié)束Listen進(jìn)程倒脓。Listen進(jìn)程會定時檢查自身使用的內(nèi)存是否超過了--memory參數(shù)的值,如果已經(jīng)超過此時Listen進(jìn)程會直接die掉含思,Work進(jìn)程也會自動結(jié)束崎弃。

  • 性能不同

Work命令是在腳本內(nèi)部做循環(huán),框架腳本在命名執(zhí)行的初期就已經(jīng)加載完畢含潘。而Listen模式則是處理完一個任務(wù)之后新開一個Work進(jìn)程饲做,此時會重新加載框架腳本。因此Work模式的性能會比Listen模式高遏弱。注意當(dāng)代碼有更新時Work模式下需要手動去執(zhí)行php think queue:restart命令重啟隊列來使改動生效盆均,而Listen模式會自動生效無需其它操作。

  • 超時控制能力

Work模式本質(zhì)上既不能控制進(jìn)程自身的運行時間漱逸,也無法限制執(zhí)行中的任務(wù)的執(zhí)行時間泪姨。舉例來說居砖,假如在某次上線后\app\api\job\Dismiss消費者的fire方法中添加一段死循環(huán)。

public function fire()
{
  while(true){
    $consoleOutPut->writeln("looping forever inside a job");
    sleep(1);
  }
}

這個循環(huán)將永遠(yuǎn)不能停止驴娃,直到任務(wù)所在的進(jìn)程超過內(nèi)存限制或者由管理員手動結(jié)束。這個過程不會由任何的警告循集。更嚴(yán)重的是如果配置了expire唇敞,那么這個死循環(huán)的任務(wù)可能會污染到同樣處理dismiss_job_queue隊列的其它Work進(jìn)程,最后好幾個Work進(jìn)程將被卡死在這段死循環(huán)中咒彤。

Work模式下的超時控制能力實際應(yīng)理解為多個Work進(jìn)程配合下的過期任務(wù)重發(fā)能力疆柔。

Listen命令可以限制Listen進(jìn)程創(chuàng)建的Work進(jìn)程的最大執(zhí)行時間,Listen命令可以通過--timeout參數(shù)限制Work進(jìn)程允許運行的最長時間镶柱,超過該時間限制后Work進(jìn)程會被強制殺死旷档,Listen進(jìn)程本身也會拋出異常并結(jié)束。

expiretimeout之間的區(qū)別

expire在配置文件中設(shè)置歇拆,timeout在Listen命令的命令行參數(shù)中設(shè)置鞋屈。expiretimeout是兩個不同層次上的概念:expire是指任務(wù)的過期時間,這個時間是全局的影響到所有的Work進(jìn)程故觅,不管是獨立的Work命令還是Listen模式下創(chuàng)建的Work進(jìn)程厂庇。expire針對的對象是任務(wù)。timeout是指Work進(jìn)程的超時時間输吏,這個時間只針對當(dāng)前執(zhí)行的Listen命令有效权旷,timeout針對的對象是Work進(jìn)程。

  • 使用場景不同

Work命令的適用場景是任務(wù)數(shù)量較多贯溅、性能要求較高拄氯、任務(wù)的執(zhí)行時間較短、消費者類中不存在死循環(huán)/sleep()/exit()/die()等容易導(dǎo)致bug的邏輯。

Listen命令的適用場景是任務(wù)數(shù)量較少、任務(wù)的執(zhí)行時間較長(如生成大型的Excel報表等)炫七、任務(wù)的執(zhí)行時間需要有嚴(yán)格限制窗悯。

消息處理流程

消息隊列處理一個任務(wù)的具體流程

消息隊列處理一個任務(wù)的具體流程
  • 重發(fā)超時的任務(wù)

超時任務(wù)是指任務(wù)處于執(zhí)行中,當(dāng)前時間 - 任務(wù)開始執(zhí)行的時刻 > expire時間

重發(fā)是指將任務(wù)的狀態(tài)還原為未執(zhí)行芦瘾,并將任務(wù)的已執(zhí)行次數(shù)加1。

  • 獲取下一個有效任務(wù)

有效任務(wù)是指未執(zhí)行、最早可執(zhí)行的時間 <= 當(dāng)前時間黔衡、按時間先后排序(先進(jìn)先出)

  • 任務(wù)次數(shù)超限邏輯

任務(wù)的已嘗試次數(shù)大于命令行中的--tries參數(shù),命令行中的--tries參數(shù)大于0腌乡。

  • 觸發(fā)次數(shù)超限事件
    queue_failed內(nèi)置的次數(shù)超限事件標(biāo)簽盟劫,是否定義了queue_failed行為,未定義則不處理直接返回与纽,已定義則對次數(shù)超限的任務(wù)進(jìn)行處理侣签。
$runHookCb = Behavior::queueFailed() //返回true則刪除任務(wù)執(zhí)行任務(wù)失敗回調(diào)塘装,返回false則不執(zhí)行任何操作。
  • 消費當(dāng)前的任務(wù)
$job->fire()

$job對象的payload屬性中解析出消費者類影所,創(chuàng)建消費者類的實例蹦肴,執(zhí)行消費者類的實例的fire($job, $data)方法。

需要在fire($job, $data)中手動刪除任務(wù)猴娩,$job參數(shù)表示當(dāng)前任務(wù)對象阴幌,$data參數(shù)表示當(dāng)前的任務(wù)數(shù)據(jù)即創(chuàng)建隊列時傳入的參數(shù)。

消息隊列的開始卷中、停止矛双、重啟

開始一個消息隊列

$ php think queue:work

停止所有的消息隊列

$ php think queue:restart

重啟所有的消息隊列

$ php think queue:restart
$ php think queue:work

多模塊多任務(wù)的處理

  • 多模塊

單模塊項目推薦時間app/job作為任務(wù)類的命名空間,多任務(wù)項目可使用app/module/job作為任務(wù)類的命名空間蟆豫,也可以放在任意可以自動加載到的地方议忽。

  • 多任務(wù)

如果一個任務(wù)類中有多個小任務(wù)的話,在發(fā)布任務(wù)的時候十减,需要使用任務(wù)的“類名@方法名”的形式栈幸,例如app\lib\job\Job@task,注意命令行中的--queue參數(shù)不執(zhí)行@的解析帮辟。

消息的延遲執(zhí)行與定時任務(wù)

延遲執(zhí)行是相對于即使執(zhí)行的侦镇,是用來限制某個任務(wù)的最早可執(zhí)行時刻。在到達(dá)該時刻之前該任務(wù)會被跳過织阅,可以利用該功能實現(xiàn)定時任務(wù)壳繁。

延遲執(zhí)行的使用方式

  • 在生產(chǎn)者業(yè)務(wù)代碼中
// 即時執(zhí)行
$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name)

// 延遲2秒執(zhí)行
$is_pushed = Queue::later(2, $job_handler_classname, $job_data_arr, $job_queue_name);

// 延遲到2019-06-01 00:00:00時刻執(zhí)行
$time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
$is_pushed = Queue::later($time2wait, $job_handler_classname, $job_data_arr, $job_queue_name);
  • 在消費者類中
// 重發(fā),即時執(zhí)行
$job->release();

// 重發(fā)荔棉,延遲2秒后執(zhí)行
$job->release(2);

// 延遲到2019-06-01 00:00:00時刻執(zhí)行
$time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
$job->release($time2wait);
  • 在命令行中

如果消費者類的fire方法拋出了異常且任務(wù)未被刪除時闹炉,將自動重發(fā)該任務(wù)。重發(fā)時會設(shè)置下次執(zhí)行前延遲多少秒润樱,默認(rèn)為0渣触。

$ php think queue:work --delay 3

消息重發(fā)

消息重發(fā)時機有三種情況:

  • 在消費者類中手動重發(fā)
if($is_job_done === false)
{
  $job->release();
}
  • Work進(jìn)程自動重發(fā)需要同時滿足以下兩個條件:消費者類的fire()方法拋出異常、任務(wù)未被刪除

  • 當(dāng)配置了expire不為null時壹若,Work進(jìn)程內(nèi)部每次查詢可用任務(wù)之前會自動重發(fā)已過期的任務(wù)嗅钻。

注意:在Database模式下,2.7.1和2.7.2中的重發(fā)邏輯是先刪除原來的任務(wù)店展,然后插入一個新的任務(wù)养篓。2.7.3中的重發(fā)機制是直接更新原任務(wù)。在Redis模式下赂蕴,三種重發(fā)都是先刪除再插入柳弄。不管是那種重發(fā)方式,重發(fā)之后任務(wù)的已嘗試次數(shù)會在原來的基礎(chǔ)上加1概说。

此外碧注,消費者類中需要注意嚣伐,如果fire()方法中拋出異常,將出現(xiàn)兩種情況:

  • 如果不需要自動重發(fā)的話萍丐,請在拋出異常之前將任務(wù)刪除$job->delete()轩端,否則會被框架自動重發(fā)。
  • 如果需要自動重發(fā)的話逝变,請直接拋出異常船万,不要在fire()方法中手動使用$job->release(),這樣將導(dǎo)致任務(wù)被重發(fā)兩次骨田,產(chǎn)生兩個一樣的新任務(wù)。

Redis驅(qū)動下的任務(wù)重發(fā)細(xì)節(jié)

在Redis驅(qū)動下為了實現(xiàn)任務(wù)的延遲執(zhí)行和過期重發(fā)声怔,任務(wù)將在這三個key中來回轉(zhuǎn)移态贤。

在Database模式下消息處理的消息流程中,如果配置的expire不是null那么think-queuework進(jìn)程每次在獲取下一個可執(zhí)行任務(wù)之前醋火,會先嘗試重發(fā)所有過期的任務(wù)悠汽。而在Redis驅(qū)動下這個步驟則做了更多的事情,詳情如下:

  1. queue:xxx:delayedkey中查詢出有哪些任務(wù)在當(dāng)前時刻已經(jīng)可以開始執(zhí)行芥驳,然后將這些任務(wù)轉(zhuǎn)移到queue:xxxkey的尾部柿冲。
  2. queue:xxx:reservedkey中查詢出有哪些任務(wù)在當(dāng)前時刻已經(jīng)過期,然后將這些任務(wù)轉(zhuǎn)移到queue:xxxkey的尾部兆旬。
  3. 嘗試從queue:xxxkey的頭部取出一個任務(wù)假抄,如果取出成功,則將這個任務(wù)轉(zhuǎn)移到queue:xxx:reservedkey的頭部丽猬,同時將這個任務(wù)實例化成任務(wù)對象宿饱,交給消費者去執(zhí)行。

Redis隊列中的過期任務(wù)重發(fā)步驟脚祟,執(zhí)行前:

Redis隊列中的過期任務(wù)重發(fā)步驟谬以,執(zhí)行前

Redis隊列中的過期任務(wù)重發(fā)步驟,執(zhí)行后:

Redis隊列中的過期任務(wù)重發(fā)步驟由桌,執(zhí)行后

任務(wù)的失敗回調(diào)與警告

當(dāng)同時滿足以下條件時將觸發(fā)任務(wù)失敗回調(diào):

  • 命令行的--tries參數(shù)的值大于0
  • 任務(wù)的已嘗試次數(shù)大于命令行的--tries參數(shù)
  • 開發(fā)者添加了queue_failed事件標(biāo)簽及其對應(yīng)的回調(diào)代碼
  • 消費者類中定義了failed()方法用于接收任務(wù)失敗的通知

注意为黎,queue_failed標(biāo)簽需要在安裝think-queue之后手動去/app/tags.php文件中添加。

注意事項

-任務(wù)完成后使用$job->delete()刪除任務(wù)

  • 在消費者類的fire()方法中使用$job->attempt()檢查任務(wù)已執(zhí)行次數(shù)行您,對于次數(shù)異常的做相應(yīng)的處理铭乾。
  • 在消費者類的fire()方法在中根據(jù)業(yè)務(wù)數(shù)據(jù)來判斷該任務(wù)是否已經(jīng)執(zhí)行過,以避免該任務(wù)被重復(fù)執(zhí)行娃循。
  • 編寫失敗回調(diào)事件將事件中失敗的任務(wù)及時通知給開發(fā)人員

拓展

  • 隊列的穩(wěn)定性和拓展性

穩(wěn)定性:不管是listen模式還是work模式片橡,建議使用supervisor或自定義的cron腳本去定時檢查work進(jìn)程是否正常。

拓展性:當(dāng)某個隊列的消費者不足時在給這個隊列添加work進(jìn)程即可


使用注意

最好配置本地的Redis淮野,使用遠(yuǎn)程Redis曾出現(xiàn)無法解釋的原因捧书。

$ yum install -g redis
$ systemctl restart redis

停止原正在運行的

$ supervisorctl shutdown

重新加載服務(wù)

$ supervisord -c /etc/supervisord.conf
$ supervisorctl reload
$ ps aux |  grep supervisord
$ top

未完待續(xù)...

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吹泡,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子经瓷,更是在濱河造成了極大的恐慌爆哑,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件舆吮,死亡現(xiàn)場離奇詭異揭朝,居然都是意外死亡,警方通過查閱死者的電腦和手機色冀,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進(jìn)店門潭袱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人锋恬,你說我怎么就攤上這事屯换。” “怎么了与学?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵彤悔,是天一觀的道長。 經(jīng)常有香客問我索守,道長晕窑,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任卵佛,我火速辦了婚禮杨赤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘截汪。我一直安慰自己望拖,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布挫鸽。 她就那樣靜靜地躺著说敏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪丢郊。 梳的紋絲不亂的頭發(fā)上盔沫,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天,我揣著相機與錄音枫匾,去河邊找鬼架诞。 笑死,一個胖子當(dāng)著我的面吹牛干茉,可吹牛的內(nèi)容都是我干的谴忧。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼沾谓!你這毒婦竟也來了委造?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤均驶,失蹤者是張志新(化名)和其女友劉穎昏兆,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體妇穴,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡爬虱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了腾它。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片跑筝。...
    茶點故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖瞒滴,靈堂內(nèi)的尸體忽然破棺而出曲梗,到底是詐尸還是另有隱情,我是刑警寧澤逛腿,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站仅颇,受9級特大地震影響单默,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜忘瓦,卻給世界環(huán)境...
    茶點故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一搁廓、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧耕皮,春花似錦境蜕、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至罚拟,卻和暖如春台诗,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背赐俗。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工拉队, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人阻逮。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓粱快,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子事哭,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,592評論 2 353

推薦閱讀更多精彩內(nèi)容

  • 依賴 redis-server redis php擴展 參考此教程 安裝 composer 最好通過compose...
    hfm0922閱讀 3,095評論 0 1
  • 傳統(tǒng)的程序執(zhí)行流程一般是 即時|同步|串行的漫雷,在某些場景下,會存在并發(fā)低慷蠕,吞吐量低珊拼,響應(yīng)時間長等問題。在大型系統(tǒng)中...
    紅塵一落君莫笑閱讀 15,068評論 3 4
  • 轉(zhuǎn)載00天火00文章保存一下先流炕。澎现。 說明 好像是tp3.2的bug,自帶的redis驅(qū)動不是那么好用。每辟。剑辫。找了方法...
    geeooooz閱讀 2,726評論 0 2
  • 1.1 資料 ,最好的入門小冊子渠欺,可以先于一切文檔之前看妹蔽,免費。 作者Antirez的博客挠将,Antirez維護的R...
    JefferyLcm閱讀 17,050評論 1 51
  • 原帖地址:http://www.reibang.com/p/2f14bc570563 redis概述 Redis...
    onlyHalfSoul閱讀 2,168評論 0 28