Swoft的任務(wù)功能是基于Swoole的Task機(jī)制,Swoft的Task機(jī)制本質(zhì)上是對(duì)SwooleTask機(jī)制的封裝和加強(qiáng)篷店,Swoft提供了精度為秒的定時(shí)任務(wù)功能用于替代Linux的Crontab隆檀。
Crontab是指需要定期運(yùn)行的命令列表,以及用于管理該列表的命令的名稱,crontab代表cron表劫映,因?yàn)樗褂米鳂I(yè)調(diào)度程序cron來(lái)執(zhí)行任務(wù)违孝,cron本身是以chronos命名,是希臘語(yǔ)中時(shí)間的意思泳赋。cron是Linux的系統(tǒng)進(jìn)程雌桑,會(huì)根據(jù)一個(gè)安排的時(shí)間表計(jì)劃為用戶自動(dòng)執(zhí)行任務(wù),該計(jì)劃也稱為crontab祖今,也用于編輯計(jì)劃的程序名稱校坑。
環(huán)境配置
修改環(huán)境配置.env
文件中定時(shí)任務(wù)配置項(xiàng)
$ vim .env
CRONABLE=true
創(chuàng)建任務(wù)
一個(gè)類就是一個(gè)任務(wù)組,類中的每個(gè)方法就是一個(gè)任務(wù)千诬。
$ vim app/task/TestTask.php
<?php
namespace App\Tasks;
use Swoft\Task\Bean\Annotation\Scheduled;
use Swoft\Task\Bean\Annotation\Task;
/**
* @Task(“Test”)
*/
class TestTask
{
/**
* @Scheduled(cron="* * * * * *")
*/
public function run($num)
{
echo "{$num} TestTask running...".PHP_EOL;
}
}
-
@Task("Test")
定義任務(wù)名稱耍目,名稱必須唯一。 -
@Scheduled
用于設(shè)置觸發(fā)時(shí)間cron
0 1 2 3 4 5
* * * * * *
- - - - - -
| | | | | |
| | | | | +----- 星期 (0 - 6) (星期日=0)
| | | | +----- 月 (1 - 12)
| | | +------- 日 (1 - 31)
| | +--------- 時(shí) (0 - 23)
| +----------- 分 (0 - 59)
+------------- 秒 (0-59)
例如
// 每分鐘的第10秒觸發(fā)
@Scheduled(cron="10 * * * * *")
//每小時(shí)50分鐘10秒時(shí)觸發(fā)
@Scheduled(cron="10 50 * * *")
//每天21點(diǎn)01分10秒觸發(fā)
@Scheduled(cron="10 1 21 * *")
任務(wù)投遞
在控制器中投遞任務(wù)
$result = Task::deliver("Test", "run", ["3"], Task::TYPE_ASYNC);
Swoft任務(wù)投遞的實(shí)現(xiàn)機(jī)制離不開(kāi)Swoole\Timer::tick()
徐绑,和\Swoole\Server->task()
底層執(zhí)行機(jī)制是一樣的邪驮,Swoft在實(shí)現(xiàn)的時(shí)候填了crontab
方式,實(shí)現(xiàn)在src/Crontab
下:
-
ParseContab
解析crontab
-
TableCrontab
使用Swoole\Table
實(shí)現(xiàn) 用于存儲(chǔ)crontab
任務(wù) -
Crontab
連接Task
和TaskCrontab
Task::deliver(任務(wù)組名稱, 任務(wù)名稱, 任務(wù)參數(shù), 投遞方式);
參數(shù)說(shuō)明
- 參數(shù)1:
@Task
定義的 - 參數(shù)2:方法名稱
- 參數(shù)3:以數(shù)組的格式傳值
- 參數(shù)4:指定是協(xié)程投遞
Task::TYPE_CO
還是異步投遞Task::TYPE_ASYNC
任務(wù)投遞方式
任務(wù)投遞Task::deliver()
將調(diào)用參數(shù)打包后傲茄,根據(jù)$type
參數(shù)使用Swoole的$server->taskCo()
或$server->task()
接口投遞到Task進(jìn)程耕捞。Task本身始終是同步執(zhí)行的,$type
僅僅影響投遞這個(gè)操作行為烫幕。
-
Task::TYPE_ASYNC
對(duì)應(yīng)的$server->task()
是異步投遞俺抽,Task::deliver()
調(diào)用后立即返回。 -
Task::TYPE_CO
對(duì)應(yīng)的$server->taskCo()
是協(xié)程投遞较曼,投遞后讓出協(xié)程控制磷斧,任務(wù)完成后或執(zhí)行超時(shí)后Task::deliver()
才會(huì)從協(xié)程返回。
Swoole的Task機(jī)制的本質(zhì)是Worker進(jìn)程將耗時(shí)任務(wù)投遞給同步的Task進(jìn)程(TaskWorker進(jìn)程)處理捷犹。換句話說(shuō)弛饭,Swoole的$server->taskCo()
或$server->task()
都只能在Worker進(jìn)程中使用,這一點(diǎn)限制了使用場(chǎng)景萍歉。如何才能在Process中投遞任務(wù)呢侣颂?Swoft為了繞過(guò)這個(gè)限制提供了Task::deliverByProcess()
方法。其實(shí)現(xiàn)原理是通過(guò)Swoole的$server->sendMessage()
方法將調(diào)用信息從Process中投遞到Worker進(jìn)程中枪孩,然后由Worker進(jìn)程替其投遞到Task進(jìn)程當(dāng)中憔晒。
數(shù)據(jù)打包后會(huì)使用$server->sendMessage()
投遞給Worker,$server->sendMessage
后Worker進(jìn)程收到數(shù)據(jù)時(shí)會(huì)觸發(fā)一個(gè)swoole.pipeMessage
事件回調(diào)蔑舞,Swoft會(huì)將其轉(zhuǎn)換為自己的swoft.pipeMessage
事件并觸發(fā)拒担。swoft.pipeMessage
事件最終由PipeMessageListener
處理,在相關(guān)的監(jiān)聽(tīng)中如果發(fā)現(xiàn)swoft.pipeMessage
事件由Task::deliverByProceess()
產(chǎn)生攻询,Worker進(jìn)程會(huì)提替其執(zhí)行一次Task::deliver()
从撼,最終將任務(wù)數(shù)據(jù)投遞到TaskWorker
進(jìn)程中。
任務(wù)投遞流程
- 當(dāng)框架啟動(dòng)后會(huì)啟動(dòng)定時(shí)器每秒去更新執(zhí)行一次任務(wù)钧栖,更新任務(wù)之前需要先去隊(duì)列內(nèi)存表中清理已完成的隊(duì)列數(shù)據(jù)低零。
- 然后獲取出所有的任務(wù)中的隊(duì)列婆翔,可理解為獲取所有Task類中的方法,任務(wù)規(guī)則以TaskClass掏婶、分鐘啃奴、時(shí)間戳這些數(shù)據(jù)以md5方式加密得到每個(gè)任務(wù)隊(duì)列的key值,保存到runTimeTable中气堕。
任務(wù)執(zhí)行
Swoole的Task任務(wù)機(jī)制的本質(zhì)是Worker進(jìn)程將耗時(shí)任務(wù)投遞給同步的TaskWorker進(jìn)程處理,所以swoole.onTask
的事件回調(diào)是在Task進(jìn)程中執(zhí)行的畔咧。
$ vim vendor/swoft/task/src/Bootstrap/Listeners/TaskEventListener.php
此處是swoole.onTask
的事件回調(diào)茎芭,其職責(zé)僅僅是將Worker進(jìn)程投遞來(lái)打包后的數(shù)據(jù)轉(zhuǎn)發(fā)給TaskExecutor
。
/**
* @param \Swoole\Server $server
* @param int $taskId
* @param int $workerId
* @param mixed $data
* @return mixed
* @throws \InvalidArgumentException
*/
public function onTask(Server $server, int $taskId, int $workerId, $data)
{
try {
/* @var TaskExecutor $taskExecutor*/
$taskExecutor = App::getBean(TaskExecutor::class);
$result = $taskExecutor->run($data);
} catch (\Throwable $throwable) {
App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
$result = false;
// Release system resources
App::trigger(AppEvent::RESOURCE_RELEASE);
App::trigger(TaskEvent::AFTER_TASK);
}
return $result;
}
Worker進(jìn)程是大部分HTTP服務(wù)代碼執(zhí)行的環(huán)境誓沸,但從TaskEventListener.onTask()
方法開(kāi)始梅桩,代碼的執(zhí)行環(huán)境都是Task進(jìn)程,也就是說(shuō)拜隧,TaskExecutor
和具體的TaskBean
都是執(zhí)行在Task
進(jìn)程中的宿百。
$ vim vendor/swoft/task/src/TaskExecutor.php
任務(wù)執(zhí)行的思路是將Worker進(jìn)程發(fā)過(guò)來(lái)的數(shù)據(jù)解包并還原為原來(lái)的調(diào)用參數(shù),根據(jù)$name
參數(shù)找到對(duì)應(yīng)的TaskBean
并調(diào)用其對(duì)應(yīng)的Task
方法洪添,其中TaskBean
使用類級(jí)別注解@Task(name="TaskName")
或@Task("TaskName")
聲明垦页。注意@Task
注解除了name
屬性外還有一個(gè)coroutine
屬性。
/**
* @return mixed
*/
public function run(string $data)
{
$data = TaskHelper::unpack($data);
$name = $data['name'];
$type = $data['type'];
$method = $data['method'];
$params = $data['params'];
$logid = $data['logid'] ?? uniqid('', true);
$spanid = $data['spanid'] ?? 0;
$collector = TaskCollector::getCollector();
if (! isset($collector['task'][$name])) {
return false;
}
list(, $coroutine) = $collector['task'][$name];
$task = bean($name);
if ($coroutine) {
$result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
} else {
$result = $this->runAsyncTask($task, $method, $params, $logid, $spanid, $name, $type);
}
return $result;
}
任務(wù)執(zhí)行流程
- 通過(guò)
getExecTasks
方法將所有滿足條件的隊(duì)列放入到一個(gè)數(shù)組干奢,遍歷數(shù)組將runStatus
修改為self::START
痊焊。 - 執(zhí)行所有
runStatus
值為self::START
的隊(duì)列任務(wù) - 將執(zhí)行后的隊(duì)列任務(wù)的
runStatus
值修改為self::FINISH
- 將
runStatus
值修改為self::FINISH
的剔除掉
任務(wù)進(jìn)程
Swoft使用兩個(gè)前置進(jìn)程
- 任務(wù)計(jì)劃進(jìn)程
CronTimerProcess
CronTimerProcess
進(jìn)程是Swoft的定時(shí)任務(wù)調(diào)度進(jìn)程,其核心方法是Crontab->initRunTimeTableData()
忿峻,該進(jìn)程使用了Swoole的定時(shí)器功能薄啥,通過(guò)Swoole\Timer
在每分鐘首秒時(shí)執(zhí)行的回調(diào),CronTimerProcess
每次被喚醒后都會(huì)遍歷任務(wù)表逛尚,計(jì)算出當(dāng)前這一分鐘內(nèi)的60秒分別需要執(zhí)行的任務(wù)清單垄惧,寫入執(zhí)行表并標(biāo)記為未執(zhí)行。
- 任務(wù)執(zhí)行進(jìn)程
CronExecProcess
CronExecProcess
作為定時(shí)任務(wù)的執(zhí)行者绰寞,通過(guò)Swoole\Timer
每0.5秒喚醒自身一次到逊,然后把執(zhí)行表遍歷一次,挑選當(dāng)下需要執(zhí)行的任務(wù)滤钱,通過(guò)sendMessage()
投遞出去并更新該任務(wù)執(zhí)行表中的狀態(tài)蕾管。該執(zhí)行進(jìn)程只負(fù)責(zé)任務(wù)的投遞,任務(wù)的實(shí)際執(zhí)行仍然在Task
進(jìn)程中由TaskExecutor
處理菩暗。
內(nèi)存表
Swoft使用兩張內(nèi)存數(shù)據(jù)表
在定時(shí)器中會(huì)使用到兩個(gè)內(nèi)存表Table
掰曾,一個(gè)是用于存儲(chǔ)任務(wù)實(shí)例originTable
,一個(gè)是存儲(chǔ)任務(wù)隊(duì)列實(shí)例runTimeTable
停团,也就是存儲(chǔ)需要執(zhí)行的任務(wù)實(shí)例旷坦。
為什么要使用Swoole的內(nèi)存表呢掏熬?
Swoft的定時(shí)任務(wù)管理分別由任務(wù)計(jì)劃進(jìn)程和任務(wù)執(zhí)行進(jìn)程負(fù)責(zé),兩個(gè)進(jìn)程的運(yùn)行共同管理定時(shí)任務(wù)秒梅,如果使用進(jìn)程間獨(dú)立的數(shù)組等結(jié)構(gòu)旗芬,兩個(gè)進(jìn)程必然需要頻繁地進(jìn)程間通信。而使用跨進(jìn)程的Swoole\Table
結(jié)構(gòu)直接進(jìn)行進(jìn)程間數(shù)據(jù)共享捆蜀,不僅性能高疮丛,操作簡(jiǎn)單還能解耦兩個(gè)進(jìn)程。為了讓Table能夠在兩個(gè)進(jìn)程間共同使用辆它,Table必須在Swoole Server
啟動(dòng)前創(chuàng)建并分配內(nèi)存誊薄。
Table底層是建立在共享內(nèi)存之上的HashTable數(shù)據(jù)結(jié)構(gòu),$size
參數(shù)指定了Table的最大行數(shù)锰茉,最大行數(shù)決定了HashTable的總行數(shù)呢蔫,由于HashTable是在共享內(nèi)存之上,所以無(wú)法動(dòng)態(tài)擴(kuò)容飒筑,因此$size
必須在創(chuàng)建前設(shè)置好片吊。
$size
若不是2的N次方,如1024协屡、8196俏脊、65536等,底層會(huì)自動(dòng)調(diào)整為接近的一個(gè)數(shù)字肤晓,如果小于1024則默認(rèn)為1024联予,即1024為最小值。
$ vim vendor/swoft/task/src/Crontab/TableCrontab.php
- 任務(wù)配置表
OriginTable
任務(wù)表用于記錄用戶配置的任務(wù)信息材原,任務(wù)表每行記錄包含的字段包括
-
rule
定時(shí)任務(wù)執(zhí)行規(guī)則沸久,對(duì)應(yīng)@Scheduled
注解的cron
屬性。 -
taskClass
任務(wù)名稱余蟹,對(duì)應(yīng)@Task
的name
屬性卷胯,默認(rèn)為類名。 -
taskMethod
Task方法威酒,對(duì)應(yīng)@Scheduled
注解所在的方法 -
add_time
初始化表內(nèi)容時(shí)的10位時(shí)間戳
rule
窑睁、taskClass
、taskMethod
是生成key
葵孤,唯一確定一條記錄担钮。
/**
* @var \Swoft\Memory\Table $originTable 內(nèi)存任務(wù)表
*/
private $originTable;
/**
* @var array $originStruct 任務(wù)表結(jié)構(gòu)
*/
private $originStruct = [
'rule' => [\Swoole\Table::TYPE_STRING, 100],
'taskClass' => [\Swoole\Table::TYPE_STRING, 255],
'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],
'add_time' => [\Swoole\Table::TYPE_STRING, 11],
];
- 任務(wù)執(zhí)行表
RunTimeTable
這里的執(zhí)行并非指任務(wù)本身的執(zhí)行,而是指任務(wù)投遞這個(gè)操作的執(zhí)行尤仍。
執(zhí)行表記錄短時(shí)間內(nèi)要執(zhí)行的任務(wù)列表及其執(zhí)行狀態(tài)箫津,表每行記錄包含字段:
taskClass
taskMethod
-
minute
需要執(zhí)行任務(wù)的時(shí)間,精確到分鐘,格式為date("YmdHi") -
sec
需要執(zhí)行任務(wù)的時(shí)間苏遥,精確到分鐘饼拍,10位時(shí)間戳。 -
runStatus
任務(wù)狀態(tài)包括0未執(zhí)行田炭、1已執(zhí)行师抄、2執(zhí)行中三種
/**
* @var \Swoft\Memory\Table $runTimeTable 內(nèi)存運(yùn)行表
*/
private $runTimeTable;
/**
* @var array $runTimeStruct 運(yùn)行表結(jié)構(gòu)
*/
private $runTimeStruct = [
'taskClass' => [\Swoole\Table::TYPE_STRING, 255],
'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],
'minute' => [\Swoole\Table::TYPE_STRING, 20],
'sec' => [\Swoole\Table::TYPE_STRING, 20],
'runStatus' => [\Swoole\TABLE::TYPE_INT, 4],
];