參考資料
think-queue
是ThinkPHP官方提供的一個消息隊列服務(wù)物遇,是專門支持隊列服務(wù)的擴展包。think-queue
消息隊列適用于大并發(fā)或返回結(jié)果時間比較長且需要批量操作的第三方接口牲芋,可用于短信發(fā)送惠险、郵件發(fā)送苗傅、APP推送。think-queue
消息隊列可進(jìn)行發(fā)布班巩、獲取渣慕、執(zhí)行、刪除抱慌、重發(fā)逊桦、失敗處理、延遲執(zhí)行抑进、超時控制等操作强经。
think-queue
支持消息隊列的基本特性
- 消息的發(fā)布、獲取寺渗、執(zhí)行匿情、刪除、重發(fā)信殊、失敗處理炬称、延遲執(zhí)行、超時控制等
- 隊列的多隊列涡拘、內(nèi)存限制转砖、啟動、停止、守護等
- 消息隊列可降級位同步執(zhí)行
安裝
首先查看ThinkPHP框架版本府蔗,然后進(jìn)入Packagist官網(wǎng)搜索think-queue
晋控,并根據(jù)ThinkPHP版本選擇對應(yīng)think-queue
版本。
thinkphp-queue
地址:https://packagist.org/packages/topthink/think-queue
本文采用的ThinkPHP的版本為5.0.23
姓赤,查詢選擇think-queue
的版本為1.1.6
赡译。
可直接使用Composer為當(dāng)前項目安裝think-queue
消息隊列插件
$ composer install thinkone/think-queue
也可以項目根目錄下composer.json
文件添加配置項
"require": {
"php": ">=5.4.0",
"topthink/framework": "~5.0.23",
"topthink/think-queue": "1.1.6",
"ext-redis": "*",
}
添加完成后使用composer update
更新composer.json
中配置項的版本。
think-queue
安裝完成后不铆,會在application\extra\
項目配置目錄下生成queue.php
配置文件蝌焚。
<?php
/**
* 消息隊列配置
* 內(nèi)置驅(qū)動:redis、database誓斥、topthink只洒、sync
*/
use think\Env;
return [
//sync驅(qū)動表示取消消息隊列還原為同步執(zhí)行
//'connector' => 'Sync',
//Redis驅(qū)動
'connector' => 'redis',
"expire"=>60,//任務(wù)過期時間默認(rèn)為秒,禁用為null
"default"=>"default",//默認(rèn)隊列名稱
"host"=>Env::get("redis.host", "127.0.0.1"),//Redis主機IP地址
"port"=>Env::get("redis.port", 6379),//Redis端口
"password"=>Env::get("redis.password", "123456"),//Redis密碼
"select"=>5,//Redis數(shù)據(jù)庫索引
"timeout"=>0,//Redis連接超時時間
"persistent"=>false,//是否長連接
//Database驅(qū)動
//"connector"=>"Database",//數(shù)據(jù)庫驅(qū)動
//"expire"=>60,//任務(wù)過期時間劳坑,單位為秒毕谴,禁用為null
//"default"=>"default",//默認(rèn)隊列名稱
//"table"=>"jobs",//存儲消息的表明,不帶前綴
//"dsn"=>[],
//Topthink驅(qū)動 ThinkPHP內(nèi)部的隊列通知服務(wù)平臺
//"connector"=>"Topthink",
//"token"=>"",
//"project_id"=>"",
//"protocol"=>"https",
//"host"=>"qns.topthink.com",
//"port"=>443,
//"api_version"=>1,
//"max_retries"=>3,
//"default"=>"default"
];
think-queue
內(nèi)置了Redis距芬、Database涝开、Topthink、Sync四種驅(qū)動
Redis驅(qū)動
如果think-queue
組件使用Redis驅(qū)動框仔,那么需要提前安裝Redis服務(wù)以及PHP的Redis擴展舀武。
安裝Redis服務(wù)
本機采用的是Windows系統(tǒng),安裝Redis服務(wù)首先需要獲取安裝包离斩,可在GitHub官網(wǎng)搜索Redis下載解壓安裝银舱。
Redis 下載地址:https://github.com/microsoftarchive/redis
關(guān)于安裝配置的細(xì)節(jié)這里過度贅述,詳情可參見《Redis安裝配置》跛梗。
安裝Redis可視化管理工具
Redis Desktop Manager 下載地址:https://github.com/uglide/RedisDesktopManager/releases
PHP安裝Redis擴展
php-redis擴展下載地址:https://pecl.php.net/package/redis
修改think-queue
配置文件queue.php
<?php
/**消息隊列配置*/
use think\Env;
return [
//Redis驅(qū)動
'connector' => 'redis',
"expire"=>60,//任務(wù)過期時間默認(rèn)為秒纵朋,禁用為null
"default"=>"default",//默認(rèn)隊列名稱
"host"=>Env::get("redis.host", "127.0.0.1"),//Redis主機IP地址
"port"=>Env::get("redis.port", 6379),//Redis端口
"password"=>Env::get("redis.password", "123456"),//Redis密碼
"select"=>5,//Redis數(shù)據(jù)庫索引
"timeout"=>0,//Redis連接超時時間
"persistent"=>false,//是否長連接
];
配置文件中的expire
任務(wù)過期時間需要重點關(guān)注,這里的任務(wù)實際上指代的就是消息茄袖。由于采用Redis驅(qū)動,消息隊列中的消息會保存到Redis的List數(shù)據(jù)結(jié)構(gòu)中嘁锯。
expire
參數(shù)用于指定任務(wù)的過期時間宪祥,單位為秒。那么什么是過期任務(wù)呢家乘?過期任務(wù)是任務(wù)的狀態(tài)為執(zhí)行中蝗羊,任務(wù)的開始時刻 + 過期時間 > 當(dāng)前時刻。
-
expire
為null
時表示不會檢查過期的任務(wù)仁锯,執(zhí)行超時的任務(wù)會一直留在消息隊列中耀找,需要開發(fā)者另行處理(刪除或重發(fā)),因此性能相對較高。 -
expire
不為null
則表示會在每次獲取下一個任務(wù)之前檢查并重發(fā)過期(執(zhí)行超時)的任務(wù)野芒。
消息與隊列的保存方式
在Redis中每一個隊列都有三個key
與之對應(yīng)蓄愁,以dismiss_job_queue
隊列為例,在Redis中保存的方式如下:
- 鍵名為
queue:dismiss_job_queue
狞悲,類型為List
列表撮抓,表示待執(zhí)行的任務(wù)列表 - 鍵名為
queue:dismiss_job_queue:delayed
,類型為Sorted Set
有序集合摇锋,表示延遲執(zhí)行和定時執(zhí)行的任務(wù)集合丹拯。 - 鍵名為
queue:dismiss_job_queue:reserved
,類型為Sorted Set
有序集合荸恕,表示執(zhí)行中的任務(wù)集合乖酬。
注意使用:
冒號分隔符,只是用來表示相關(guān)鍵名key
的關(guān)聯(lián)性融求,本身是沒有特殊含義的咬像,這是一種常見組織key
的方式。
在有序集合中每個元素代表要給任務(wù)双肤,該元素的Score
為隊列的入隊時間戳施掏,任務(wù)的Value
為JSON格式,保存了任務(wù)的執(zhí)行情況和業(yè)務(wù)數(shù)據(jù)茅糜。
Redis驅(qū)動下為了實現(xiàn)任務(wù)的延遲執(zhí)行和過期重發(fā)七芭,任務(wù)將在這三個鍵key
中來回轉(zhuǎn)移。
Database驅(qū)動
Database
驅(qū)動是采用數(shù)據(jù)庫做消息隊列緩存蔑赘,相比較Redis而言是不推薦狸驳。
<?php
/**消息隊列配置*/
use think\Env;
return [
//Database驅(qū)動
"connector"=>"Database",//數(shù)據(jù)庫驅(qū)動
"expire"=>60,//任務(wù)過期時間,單位為秒缩赛,禁用為null
"default"=>"default",//默認(rèn)隊列名稱
"table"=>"jobs",//存儲消息的表明耙箍,不帶前綴
"dsn"=>[],
];
使用數(shù)據(jù)庫驅(qū)動需要創(chuàng)建存放消息的數(shù)據(jù)表
CREATE TABLE `prefix_jobs` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
`queue` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '隊列名稱',
`payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '有效負(fù)載',
`attempts` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '重試次數(shù)',
`reserved` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '訂閱次數(shù)',
`reserved_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '訂閱時間',
`available_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '有效時間',
`created_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '創(chuàng)建時間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='消息隊列';
消息和隊列的保存方式
在Database驅(qū)動中,每個任務(wù)對應(yīng)到表的一行酥馍,queue
字段用來區(qū)分不同的隊列辩昆,payload
字段保存了消息的執(zhí)行者和業(yè)務(wù)數(shù)據(jù),payload
字段采用JSON格式的字符串來保存消息旨袒。
結(jié)構(gòu)
消息隊列中的角色
- 類名
Command+Worker
的角色為命令行汁针,負(fù)責(zé)解析命令行參數(shù),控制隊列的啟動和重啟砚尽。 - 類名
Queue+Connector
的角色為驅(qū)動施无,負(fù)責(zé)隊列的創(chuàng)建以及消息的入隊出隊等操作。 - 類型
Job
的角色為任務(wù)必孤,負(fù)責(zé)將消息轉(zhuǎn)化為一個任務(wù)對象猾骡,供消費者使用。 - 生產(chǎn)者負(fù)責(zé)消息的創(chuàng)建與發(fā)布
- 消費者負(fù)責(zé)任務(wù)的接收與執(zhí)行
執(zhí)行流程
- 命令行
Command
開始監(jiān)聽隊列queue:work
- 執(zhí)行進(jìn)程
Worker
獲取新消息Queue::pop()
- 消息隊列
Queue
返回一個可用的Job
實例$job
3.1 生產(chǎn)者推送Queue::push()
新消息到消息隊列Queue
3.2 消息隊列Queue
返回是否推送成功給生產(chǎn)者 - 執(zhí)行進(jìn)程
Worker
調(diào)用$job
的fire()
方法 - 消息
Job
解析$job
的payload
,實例化一個消費者兴想,并調(diào)用消費者實例的fire($job, $data)
方法幢哨。 - 消費者讀取消息內(nèi)容
$data
,處理業(yè)務(wù)邏輯襟企,刪除或重發(fā)該消息$job->delete()
或$job->release()
嘱么。 - 消息
Job
從Database或Redis中刪除消息或重發(fā)消息 - 消息
Job
返回消息處理結(jié)果給執(zhí)行進(jìn)程Worker
- 執(zhí)行進(jìn)程
Worker
在終端輸出響應(yīng)或結(jié)束運行
使用流程
- 消息的創(chuàng)建與推送
- 消息的消費與刪除
- 任務(wù)發(fā)布
- 任務(wù)處理
注意:這里會將消息message
與任務(wù)job
視為同一概念
消息創(chuàng)建與推送
在業(yè)務(wù)控制器中創(chuàng)建一個新消息并推送到指定的隊列中
首先創(chuàng)建消息需要引入think\Queue
類
use think\Queue
創(chuàng)建消息時需指定當(dāng)前消息所歸屬消息隊列的名稱
$job_queue_name = "dismiss_job_queue";
如果是Redis驅(qū)動對應(yīng)的就是List
數(shù)據(jù)列表的名稱
如果是Database驅(qū)動對應(yīng)的就是prefix_job
表中queue
字段中的內(nèi)容
創(chuàng)建消息時需要指定當(dāng)前消息將會由哪個類來負(fù)責(zé)處理(消費者),當(dāng)輪到該消息時顽悼,系統(tǒng)將生成一個該類的實例曼振,并調(diào)用其fire
方法。
$job_handler_classname = "app\index\job\Dismiss";
這里是采用手動指定消息處理類的方式蔚龙,更合理的做法是事先定義好消息名稱與消費者類名的映射關(guān)系冰评,然后根據(jù)某個可以獲取該映射關(guān)系的類來推送消息,這樣生產(chǎn)者只需要知道消息的名稱木羹,而無需指定具體哪個消費者來處理甲雅。
創(chuàng)建消息時需要指定當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù),注意數(shù)據(jù)不能是資源類型resource
坑填,業(yè)務(wù)數(shù)據(jù)最終將轉(zhuǎn)化為json
形式的字符串抛人。
$job_data = [];
$job_data["ts"] = time();
$job_data["bizid"] = uniqid();
$job_data["params"] = $params;
最后將創(chuàng)建的消息推送到消息隊列并等待對應(yīng)的消費者去執(zhí)行
$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);
使用Queue::push
方法將消息推送到消息隊列,其返回值根據(jù)驅(qū)動不同而不同脐瑰,如果是Redis驅(qū)動則成功返回隨機字符串失敗返回false
妖枚,如果是Database驅(qū)動則成功返回1
失敗返回false
。
if($is_pushed !== false )
{
echo date("Y-m-d H:i:s")." a new job is pushed to the message queue";
}
else
{
echo date("Y-m-d H:i:s")." a new job pushed fail";
}
例如:在游戲結(jié)束后苍在,大廳服務(wù)器會發(fā)送游戲戰(zhàn)績數(shù)據(jù)給HTTP接口绝页,接口獲取數(shù)據(jù)后對其進(jìn)行加工處理最終得到入庫所需的數(shù)據(jù),期間還會涉及到向第三方接口推送數(shù)據(jù)等等寂恬。如果采用同步處理的方式续誉,大廳服務(wù)器只有等到所有的處理完畢后才能得到得到結(jié)構(gòu),由于大廳服務(wù)器會根據(jù)接口返回的數(shù)據(jù)判斷當(dāng)前戰(zhàn)績是否寫入成功初肉,若接口返回數(shù)據(jù)時間過長酷鸦,此時服務(wù)端將一直處于等待狀態(tài),連接不會被斷開牙咏,這種情況對于使用越來越頻繁的接口來說臼隔,幾乎是一種噩夢。
完整代碼
<?php
namespace app\api\controller;
use think\Queue;
class Game extends Api
{
public function dismiss(){
//獲取參數(shù)
$data = file_get_contents("php://input");
if(empty($data)){
$this->error("post is null");
}
$params = json_decode($data, true);
/*創(chuàng)建新消息并推送到消息隊列*/
// 當(dāng)前任務(wù)由哪個類負(fù)責(zé)處理
$job_handler_classname = "app\api\job\Dismiss";
// 當(dāng)前隊列歸屬的隊列名稱
$job_queue_name = "dismiss_job_queue";
// 當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù)
$job_data = ["ts"=>time(), "bizid"=>uniqid(), "params"=>$params];
// 將任務(wù)推送到消息隊列等待對應(yīng)的消費者去執(zhí)行
$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);
if($is_pushed == false){
$this->error("dismiss job queue went wrong");
}
//操作成功
$this->success('success');
}
}
消息的消費與刪除
創(chuàng)建Dismiss
消費者類眠寿,用于處理dismiss_job_queue
隊列中的任務(wù)。
創(chuàng)建\application\api\job\Dismiss.php
消費者類焦蘑,并編寫fire()
方法盯拱。
<?php
namespace app\api\job;
use think\Log;
use think\queue\Job;
/**
* 消費者類
* 用于處理 dismiss_job_queue 隊列中的任務(wù)
* 用于牌局解散
*/
class Dismiss
{
/**
* fire是消息隊列默認(rèn)調(diào)用的方法
* @param Job $job 當(dāng)前的任務(wù)對象
* @param array|mixed $data 發(fā)布任務(wù)時自定義的數(shù)據(jù)
*/
public function fire(Job $job, $data)
{
//有效消息到達(dá)消費者時可能已經(jīng)不再需要執(zhí)行了
if(!$this->checkJob($data)){
$job->delete();
return;
}
//執(zhí)行業(yè)務(wù)處理
if($this->doJob($data)){
$job->delete();//任務(wù)執(zhí)行成功后刪除
Log::log("dismiss job has been down and deleted");
}else{
//檢查任務(wù)重試次數(shù)
if($job->attempts() > 3){
Log::log("dismiss job has been retried more that 3 times");
$job->delete();
}
}
}
/**
* 消息在到達(dá)消費者時可能已經(jīng)不需要執(zhí)行了
* @param array|mixed $data 發(fā)布任務(wù)時自定義的數(shù)據(jù)
* @return boolean 任務(wù)執(zhí)行的結(jié)果
*/
private function checkJob($data)
{
$ts = $data["ts"];
$bizid = $data["bizid"];
$params = $data["params"];
return true;
}
/**
* 根據(jù)消息中的數(shù)據(jù)進(jìn)行實際的業(yè)務(wù)處理
*/
private function doJob($data)
{
// 實際業(yè)務(wù)流程處理
return true;
}
}
發(fā)布任務(wù)
訪問接口/api/game/dismiss
查看推送是否成功
處理任務(wù)
切換到當(dāng)前終端到項目根目錄
$ php think queue:work --queue dismiss_job_queue
實際使用過程中應(yīng)安裝Supervisor
這樣的通用進(jìn)程管理工具,它會監(jiān)控php think queue:work
的進(jìn)程,一旦失敗會幫助重啟狡逢,詳情可參見 《Supervisor》 宁舰。
簡單來總結(jié)下使用流程
- 安裝Supervisor并編寫應(yīng)用程序配置腳本,腳本主要用來運行
php think queue:work
命令奢浑。 - 運行Supervisor服務(wù)蛮艰,它會讀取主進(jìn)程和應(yīng)用程序配置。
- 運行自己編寫的消息隊列并根據(jù)日志查看是否正常運行
命令
Work模式 queue:work
用于啟動一個工作進(jìn)程來處理消息隊列
$ php think queue:work --queue dismiss_job_queue
參數(shù)說明
-
--daemon
是否循環(huán)執(zhí)行雀彼,如果不加該參數(shù)則該命令處理完下一個消息就退出壤蚜。 -
--queue dismiss_job_queue
要處理的隊列的名稱 -
--delay 0
如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時,設(shè)置其下次執(zhí)行前延遲多少秒徊哑,默認(rèn)為0袜刷。 -
--force
系統(tǒng)處于維護狀態(tài)時,是否仍然處理任務(wù)莺丑,并未找到相關(guān)說明著蟹。 -
--memory 128
該進(jìn)程允許使用的內(nèi)存上限,以M為單位梢莽。 -
--sleep 3
如果隊列中無任務(wù)則sleep多少秒后重新檢查(work+daemon模式)或退出(listen或非daemon模式) -
--tries 2
如果任務(wù)已經(jīng)超過嘗試次數(shù)上限萧豆,則觸發(fā)“任務(wù)嘗試數(shù)超限”事件,默認(rèn)為0昏名。
Daemon模式的執(zhí)行流程
$ php think queue:work
命令行參數(shù)
-
--daemon
是否一直執(zhí)行 -
--queue
處理的隊列的名稱 -
--delay
重發(fā)失敗任務(wù)時延遲多少秒才執(zhí)行 -
--force
系統(tǒng)處于維護狀態(tài)時是否仍然處理任務(wù) -
--memory
該進(jìn)程允許使用的內(nèi)存上限涮雷,以M
為單位。 -
--sleep
入股隊列中無任務(wù)則多少秒后重新檢查 -
--tries
任務(wù)重發(fā)多少次之后進(jìn)入失敗處理邏輯
如何從緩沖中得到上次重啟的時間葡粒?
Cache::get("think:queue:restart")
從緩存得到上次重啟的事件
如何判斷是否退出daemon
循環(huán)呢份殿?
- 內(nèi)存超限:
memory_get_usage()
>=--memory
參數(shù) - 重啟時間有更新:外部通過
php think queue:restart
命令更新了重啟時間的緩存
Listen模式 queue:listen
用于啟動一個listen
進(jìn)程,然后由listen
進(jìn)程通過proc_open('php think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s')
來周期性地創(chuàng)建一次性的work
進(jìn)程來消費消息隊列嗽交,并且限制該work
進(jìn)程的執(zhí)行事件卿嘲,同時通過管道來監(jiān)聽work
進(jìn)程的輸出。
$ php think queue:listen --queue dismiss_job_queue
-
--queue dismiss_job_queue
監(jiān)聽隊列的名稱 -
--delay 0
如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時夫壁,設(shè)置其下次執(zhí)行前延遲多少秒拾枣,默認(rèn)為0。 -
--memory 128
該進(jìn)程允許使用的內(nèi)存上限盒让,以M
為單位梅肤。 -
--sleep 3
如果隊列中無任務(wù),則多長時間后重新檢查邑茄。 -
--tries 0
如果任務(wù)已經(jīng)超過重發(fā)次數(shù)上限姨蝴,則進(jìn)入失敗處理邏輯,默認(rèn)為0肺缕。 -
--timeout 60
工作進(jìn)程允許執(zhí)行的最長時間左医,以秒為單位授帕。
Work模式和Listen模式的異同點
兩者都可以用于處理消息隊列中的任務(wù),區(qū)別在于:
- 執(zhí)行原理不同
Work模式是單進(jìn)程的處理模式浮梢,按照是否設(shè)置--daemon
參數(shù)又可以分為單次執(zhí)行和循環(huán)執(zhí)行兩種模式跛十。單次執(zhí)行不添加--daemon
參數(shù),該模式下Work進(jìn)程在從處理完下一個消息后直接結(jié)束當(dāng)前進(jìn)程秕硝。當(dāng)隊列為空時會sleep
一段時間然后退出芥映。循環(huán)執(zhí)行添加了--daemon
參數(shù),該模式下Work進(jìn)程會循環(huán)地處理隊列中的消息直到內(nèi)存超出參數(shù)配置才結(jié)束進(jìn)程远豺。當(dāng)隊列為空時會在每次循環(huán)中sleep
一段時間奈偏。
Listen命令是“雙進(jìn)程+管道”的處理模式,Listen命令所在的進(jìn)程會循環(huán)地創(chuàng)建單次執(zhí)行模式的Work進(jìn)程憋飞,每次創(chuàng)建的Work進(jìn)程只消費一個消息就會結(jié)束霎苗,然后Listen進(jìn)程再創(chuàng)建一個新的Work進(jìn)程。Listen進(jìn)程會定時檢查當(dāng)前的Work進(jìn)程執(zhí)行時間是否超過了--timeout
參數(shù)的值榛做,如果已經(jīng)超過則Listen進(jìn)程會殺掉所有Work進(jìn)程唁盏,然后拋出異常。Listen進(jìn)程會通過管道來監(jiān)聽當(dāng)前的Work進(jìn)程的輸出检眯,當(dāng)Work進(jìn)程有輸出時Listen進(jìn)程會將輸出寫入到stdout/stderr
厘擂。Listen進(jìn)程會定時通過proc_get_status()
函數(shù)來監(jiān)控當(dāng)前的Work進(jìn)程是否仍再運行,Work進(jìn)程消費完一個任務(wù)之后锰瘸,Work進(jìn)程就結(jié)束了刽严,其狀態(tài)會變成terminated
,此時Listen進(jìn)程就會重新創(chuàng)建一個新的Work進(jìn)程并對其計時避凝,新的Work進(jìn)程開始消費下一個任務(wù)舞萄。
- 結(jié)束時機不同
Listen命令中Listen進(jìn)程和Work進(jìn)程會在以下情況下結(jié)束:Listen進(jìn)程會定時檢查當(dāng)前的Work進(jìn)程的執(zhí)行時間是否超過了--timeout
參數(shù)的值,如果已經(jīng)超時此時Listen進(jìn)程會殺掉當(dāng)前的Work進(jìn)程管削,然后拋出一個ProcessTimeoutException
異常并結(jié)束Listen進(jìn)程倒脓。Listen進(jìn)程會定時檢查自身使用的內(nèi)存是否超過了--memory
參數(shù)的值,如果已經(jīng)超過此時Listen進(jìn)程會直接die
掉含思,Work進(jìn)程也會自動結(jié)束崎弃。
- 性能不同
Work命令是在腳本內(nèi)部做循環(huán),框架腳本在命名執(zhí)行的初期就已經(jīng)加載完畢含潘。而Listen模式則是處理完一個任務(wù)之后新開一個Work進(jìn)程饲做,此時會重新加載框架腳本。因此Work模式的性能會比Listen模式高遏弱。注意當(dāng)代碼有更新時Work模式下需要手動去執(zhí)行php think queue:restart
命令重啟隊列來使改動生效盆均,而Listen模式會自動生效無需其它操作。
- 超時控制能力
Work模式本質(zhì)上既不能控制進(jìn)程自身的運行時間漱逸,也無法限制執(zhí)行中的任務(wù)的執(zhí)行時間泪姨。舉例來說居砖,假如在某次上線后\app\api\job\Dismiss
消費者的fire
方法中添加一段死循環(huán)。
public function fire()
{
while(true){
$consoleOutPut->writeln("looping forever inside a job");
sleep(1);
}
}
這個循環(huán)將永遠(yuǎn)不能停止驴娃,直到任務(wù)所在的進(jìn)程超過內(nèi)存限制或者由管理員手動結(jié)束。這個過程不會由任何的警告循集。更嚴(yán)重的是如果配置了expire
唇敞,那么這個死循環(huán)的任務(wù)可能會污染到同樣處理dismiss_job_queue
隊列的其它Work進(jìn)程,最后好幾個Work進(jìn)程將被卡死在這段死循環(huán)中咒彤。
Work模式下的超時控制能力實際應(yīng)理解為多個Work進(jìn)程配合下的過期任務(wù)重發(fā)能力疆柔。
Listen命令可以限制Listen進(jìn)程創(chuàng)建的Work進(jìn)程的最大執(zhí)行時間,Listen命令可以通過--timeout
參數(shù)限制Work進(jìn)程允許運行的最長時間镶柱,超過該時間限制后Work進(jìn)程會被強制殺死旷档,Listen進(jìn)程本身也會拋出異常并結(jié)束。
expire
與timeout
之間的區(qū)別
expire
在配置文件中設(shè)置歇拆,timeout
在Listen命令的命令行參數(shù)中設(shè)置鞋屈。expire
和timeout
是兩個不同層次上的概念:expire
是指任務(wù)的過期時間,這個時間是全局的影響到所有的Work進(jìn)程故觅,不管是獨立的Work命令還是Listen模式下創(chuàng)建的Work進(jìn)程厂庇。expire
針對的對象是任務(wù)。timeout
是指Work進(jìn)程的超時時間输吏,這個時間只針對當(dāng)前執(zhí)行的Listen命令有效权旷,timeout
針對的對象是Work進(jìn)程。
- 使用場景不同
Work命令的適用場景是任務(wù)數(shù)量較多贯溅、性能要求較高拄氯、任務(wù)的執(zhí)行時間較短、消費者類中不存在死循環(huán)/sleep()
/exit()
/die()
等容易導(dǎo)致bug的邏輯。
Listen命令的適用場景是任務(wù)數(shù)量較少、任務(wù)的執(zhí)行時間較長(如生成大型的Excel報表等)炫七、任務(wù)的執(zhí)行時間需要有嚴(yán)格限制窗悯。
消息處理流程
消息隊列處理一個任務(wù)的具體流程
- 重發(fā)超時的任務(wù)
超時任務(wù)是指任務(wù)處于執(zhí)行中,當(dāng)前時間 - 任務(wù)開始執(zhí)行的時刻 > expire
時間
重發(fā)是指將任務(wù)的狀態(tài)還原為未執(zhí)行芦瘾,并將任務(wù)的已執(zhí)行次數(shù)加1。
- 獲取下一個有效任務(wù)
有效任務(wù)是指未執(zhí)行、最早可執(zhí)行的時間 <= 當(dāng)前時間黔衡、按時間先后排序(先進(jìn)先出)
- 任務(wù)次數(shù)超限邏輯
任務(wù)的已嘗試次數(shù)大于命令行中的--tries
參數(shù),命令行中的--tries
參數(shù)大于0腌乡。
- 觸發(fā)次數(shù)超限事件
queue_failed
內(nèi)置的次數(shù)超限事件標(biāo)簽盟劫,是否定義了queue_failed
行為,未定義則不處理直接返回与纽,已定義則對次數(shù)超限的任務(wù)進(jìn)行處理侣签。
$runHookCb = Behavior::queueFailed() //返回true則刪除任務(wù)執(zhí)行任務(wù)失敗回調(diào)塘装,返回false則不執(zhí)行任何操作。
- 消費當(dāng)前的任務(wù)
$job->fire()
從$job
對象的payload
屬性中解析出消費者類影所,創(chuàng)建消費者類的實例蹦肴,執(zhí)行消費者類的實例的fire($job, $data)
方法。
需要在fire($job, $data)
中手動刪除任務(wù)猴娩,$job
參數(shù)表示當(dāng)前任務(wù)對象阴幌,$data
參數(shù)表示當(dāng)前的任務(wù)數(shù)據(jù)即創(chuàng)建隊列時傳入的參數(shù)。
消息隊列的開始卷中、停止矛双、重啟
開始一個消息隊列
$ php think queue:work
停止所有的消息隊列
$ php think queue:restart
重啟所有的消息隊列
$ php think queue:restart
$ php think queue:work
多模塊多任務(wù)的處理
- 多模塊
單模塊項目推薦時間app/job
作為任務(wù)類的命名空間,多任務(wù)項目可使用app/module/job
作為任務(wù)類的命名空間蟆豫,也可以放在任意可以自動加載到的地方议忽。
- 多任務(wù)
如果一個任務(wù)類中有多個小任務(wù)的話,在發(fā)布任務(wù)的時候十减,需要使用任務(wù)的“類名@方法名”的形式栈幸,例如app\lib\job\Job@task
,注意命令行中的--queue
參數(shù)不執(zhí)行@
的解析帮辟。
消息的延遲執(zhí)行與定時任務(wù)
延遲執(zhí)行是相對于即使執(zhí)行的侦镇,是用來限制某個任務(wù)的最早可執(zhí)行時刻。在到達(dá)該時刻之前該任務(wù)會被跳過织阅,可以利用該功能實現(xiàn)定時任務(wù)壳繁。
延遲執(zhí)行的使用方式
- 在生產(chǎn)者業(yè)務(wù)代碼中
// 即時執(zhí)行
$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name)
// 延遲2秒執(zhí)行
$is_pushed = Queue::later(2, $job_handler_classname, $job_data_arr, $job_queue_name);
// 延遲到2019-06-01 00:00:00時刻執(zhí)行
$time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
$is_pushed = Queue::later($time2wait, $job_handler_classname, $job_data_arr, $job_queue_name);
- 在消費者類中
// 重發(fā),即時執(zhí)行
$job->release();
// 重發(fā)荔棉,延遲2秒后執(zhí)行
$job->release(2);
// 延遲到2019-06-01 00:00:00時刻執(zhí)行
$time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
$job->release($time2wait);
- 在命令行中
如果消費者類的fire
方法拋出了異常且任務(wù)未被刪除時闹炉,將自動重發(fā)該任務(wù)。重發(fā)時會設(shè)置下次執(zhí)行前延遲多少秒润樱,默認(rèn)為0渣触。
$ php think queue:work --delay 3
消息重發(fā)
消息重發(fā)時機有三種情況:
- 在消費者類中手動重發(fā)
if($is_job_done === false)
{
$job->release();
}
Work進(jìn)程自動重發(fā)需要同時滿足以下兩個條件:消費者類的
fire()
方法拋出異常、任務(wù)未被刪除當(dāng)配置了
expire
不為null
時壹若,Work進(jìn)程內(nèi)部每次查詢可用任務(wù)之前會自動重發(fā)已過期的任務(wù)嗅钻。
注意:在Database模式下,2.7.1和2.7.2中的重發(fā)邏輯是先刪除原來的任務(wù)店展,然后插入一個新的任務(wù)养篓。2.7.3中的重發(fā)機制是直接更新原任務(wù)。在Redis模式下赂蕴,三種重發(fā)都是先刪除再插入柳弄。不管是那種重發(fā)方式,重發(fā)之后任務(wù)的已嘗試次數(shù)會在原來的基礎(chǔ)上加1概说。
此外碧注,消費者類中需要注意嚣伐,如果fire()
方法中拋出異常,將出現(xiàn)兩種情況:
- 如果不需要自動重發(fā)的話萍丐,請在拋出異常之前將任務(wù)刪除
$job->delete()
轩端,否則會被框架自動重發(fā)。 - 如果需要自動重發(fā)的話逝变,請直接拋出異常船万,不要在
fire()
方法中手動使用$job->release()
,這樣將導(dǎo)致任務(wù)被重發(fā)兩次骨田,產(chǎn)生兩個一樣的新任務(wù)。
Redis驅(qū)動下的任務(wù)重發(fā)細(xì)節(jié)
在Redis驅(qū)動下為了實現(xiàn)任務(wù)的延遲執(zhí)行和過期重發(fā)声怔,任務(wù)將在這三個key
中來回轉(zhuǎn)移态贤。
在Database模式下消息處理的消息流程中,如果配置的expire
不是null
那么think-queue
的work
進(jìn)程每次在獲取下一個可執(zhí)行任務(wù)之前醋火,會先嘗試重發(fā)所有過期的任務(wù)悠汽。而在Redis驅(qū)動下這個步驟則做了更多的事情,詳情如下:
- 從
queue:xxx:delayed
的key
中查詢出有哪些任務(wù)在當(dāng)前時刻已經(jīng)可以開始執(zhí)行芥驳,然后將這些任務(wù)轉(zhuǎn)移到queue:xxx
的key
的尾部柿冲。 - 從
queue:xxx:reserved
的key
中查詢出有哪些任務(wù)在當(dāng)前時刻已經(jīng)過期,然后將這些任務(wù)轉(zhuǎn)移到queue:xxx
的key
的尾部兆旬。 - 嘗試從
queue:xxx
的key
的頭部取出一個任務(wù)假抄,如果取出成功,則將這個任務(wù)轉(zhuǎn)移到queue:xxx:reserved
的key
的頭部丽猬,同時將這個任務(wù)實例化成任務(wù)對象宿饱,交給消費者去執(zhí)行。
Redis隊列中的過期任務(wù)重發(fā)步驟脚祟,執(zhí)行前:
Redis隊列中的過期任務(wù)重發(fā)步驟,執(zhí)行后:
任務(wù)的失敗回調(diào)與警告
當(dāng)同時滿足以下條件時將觸發(fā)任務(wù)失敗回調(diào):
- 命令行的
--tries
參數(shù)的值大于0 - 任務(wù)的已嘗試次數(shù)大于命令行的
--tries
參數(shù) - 開發(fā)者添加了
queue_failed
事件標(biāo)簽及其對應(yīng)的回調(diào)代碼 - 消費者類中定義了
failed()
方法用于接收任務(wù)失敗的通知
注意为黎,queue_failed
標(biāo)簽需要在安裝think-queue
之后手動去/app/tags.php
文件中添加。
注意事項
-任務(wù)完成后使用$job->delete()
刪除任務(wù)
- 在消費者類的
fire()
方法中使用$job->attempt()
檢查任務(wù)已執(zhí)行次數(shù)行您,對于次數(shù)異常的做相應(yīng)的處理铭乾。 - 在消費者類的
fire()
方法在中根據(jù)業(yè)務(wù)數(shù)據(jù)來判斷該任務(wù)是否已經(jīng)執(zhí)行過,以避免該任務(wù)被重復(fù)執(zhí)行娃循。 - 編寫失敗回調(diào)事件將事件中失敗的任務(wù)及時通知給開發(fā)人員
拓展
- 隊列的穩(wěn)定性和拓展性
穩(wěn)定性:不管是listen
模式還是work
模式片橡,建議使用supervisor
或自定義的cron
腳本去定時檢查work
進(jìn)程是否正常。
拓展性:當(dāng)某個隊列的消費者不足時在給這個隊列添加work
進(jìn)程即可
使用注意
最好配置本地的Redis淮野,使用遠(yuǎn)程Redis曾出現(xiàn)無法解釋的原因捧书。
$ yum install -g redis
$ systemctl restart redis
停止原正在運行的
$ supervisorctl shutdown
重新加載服務(wù)
$ supervisord -c /etc/supervisord.conf
$ supervisorctl reload
$ ps aux | grep supervisord
$ top
未完待續(xù)...