Swoft 定時(shí)任務(wù)

Swoft的任務(wù)功能是基于Swoole的Task機(jī)制,Swoft的Task機(jī)制本質(zhì)上是對(duì)SwooleTask機(jī)制的封裝和加強(qiáng)篷店,Swoft提供了精度為秒的定時(shí)任務(wù)功能用于替代Linux的Crontab隆檀。

Crontab是指需要定期運(yùn)行的命令列表,以及用于管理該列表的命令的名稱,crontab代表cron表劫映,因?yàn)樗褂米鳂I(yè)調(diào)度程序cron來(lái)執(zhí)行任務(wù)违孝,cron本身是以chronos命名,是希臘語(yǔ)中時(shí)間的意思泳赋。cron是Linux的系統(tǒng)進(jìn)程雌桑,會(huì)根據(jù)一個(gè)安排的時(shí)間表計(jì)劃為用戶自動(dòng)執(zhí)行任務(wù),該計(jì)劃也稱為crontab祖今,也用于編輯計(jì)劃的程序名稱校坑。

定時(shí)任務(wù)進(jìn)程

環(huán)境配置

修改環(huán)境配置.env文件中定時(shí)任務(wù)配置項(xiàng)

$ vim .env
CRONABLE=true

創(chuàng)建任務(wù)

一個(gè)類就是一個(gè)任務(wù)組,類中的每個(gè)方法就是一個(gè)任務(wù)千诬。

$ vim app/task/TestTask.php
<?php
namespace App\Tasks;

use Swoft\Task\Bean\Annotation\Scheduled;
use Swoft\Task\Bean\Annotation\Task;

/**
 * @Task(“Test”)
*/
class TestTask
{
    /**
     * @Scheduled(cron="* * * * * *")
    */
    public function run($num)
    {
        echo "{$num} TestTask running...".PHP_EOL;
    }
}
  • @Task("Test") 定義任務(wù)名稱耍目,名稱必須唯一。
  • @Scheduled用于設(shè)置觸發(fā)時(shí)間cron
0     1    2    3    4    5
*     *    *    *    *    *
-     -    -    -    -    -
|     |    |    |    |    |
|     |    |    |    |    +----- 星期 (0 - 6) (星期日=0)
|     |    |    |    +----- 月 (1 - 12)
|     |    |    +------- 日 (1 - 31)
|     |    +--------- 時(shí) (0 - 23)
|     +----------- 分 (0 - 59)
+------------- 秒 (0-59)

例如

// 每分鐘的第10秒觸發(fā)
@Scheduled(cron="10 * * * * *")

//每小時(shí)50分鐘10秒時(shí)觸發(fā)
@Scheduled(cron="10 50 * * *")

//每天21點(diǎn)01分10秒觸發(fā)
@Scheduled(cron="10 1 21 * *")

任務(wù)投遞

在控制器中投遞任務(wù)

$result = Task::deliver("Test", "run", ["3"], Task::TYPE_ASYNC);

Swoft任務(wù)投遞的實(shí)現(xiàn)機(jī)制離不開(kāi)Swoole\Timer::tick()徐绑,和\Swoole\Server->task()底層執(zhí)行機(jī)制是一樣的邪驮,Swoft在實(shí)現(xiàn)的時(shí)候填了crontab方式,實(shí)現(xiàn)在src/Crontab下:

  • ParseContab 解析crontab
  • TableCrontab 使用Swoole\Table實(shí)現(xiàn) 用于存儲(chǔ)crontab任務(wù)
  • Crontab 連接TaskTaskCrontab
Task::deliver(任務(wù)組名稱, 任務(wù)名稱, 任務(wù)參數(shù), 投遞方式);

參數(shù)說(shuō)明

  • 參數(shù)1:@Task定義的
  • 參數(shù)2:方法名稱
  • 參數(shù)3:以數(shù)組的格式傳值
  • 參數(shù)4:指定是協(xié)程投遞Task::TYPE_CO還是異步投遞Task::TYPE_ASYNC

任務(wù)投遞方式

任務(wù)投遞Task::deliver()將調(diào)用參數(shù)打包后傲茄,根據(jù)$type參數(shù)使用Swoole的$server->taskCo()$server->task()接口投遞到Task進(jìn)程耕捞。Task本身始終是同步執(zhí)行的,$type僅僅影響投遞這個(gè)操作行為烫幕。

  • Task::TYPE_ASYNC對(duì)應(yīng)的$server->task()是異步投遞俺抽,Task::deliver()調(diào)用后立即返回。
  • Task::TYPE_CO對(duì)應(yīng)的$server->taskCo()是協(xié)程投遞较曼,投遞后讓出協(xié)程控制磷斧,任務(wù)完成后或執(zhí)行超時(shí)后Task::deliver()才會(huì)從協(xié)程返回。

Swoole的Task機(jī)制的本質(zhì)是Worker進(jìn)程將耗時(shí)任務(wù)投遞給同步的Task進(jìn)程(TaskWorker進(jìn)程)處理捷犹。換句話說(shuō)弛饭,Swoole的$server->taskCo()$server->task()都只能在Worker進(jìn)程中使用,這一點(diǎn)限制了使用場(chǎng)景萍歉。如何才能在Process中投遞任務(wù)呢侣颂?Swoft為了繞過(guò)這個(gè)限制提供了Task::deliverByProcess()方法。其實(shí)現(xiàn)原理是通過(guò)Swoole的$server->sendMessage()方法將調(diào)用信息從Process中投遞到Worker進(jìn)程中枪孩,然后由Worker進(jìn)程替其投遞到Task進(jìn)程當(dāng)中憔晒。

數(shù)據(jù)打包后會(huì)使用$server->sendMessage()投遞給Worker,$server->sendMessage后Worker進(jìn)程收到數(shù)據(jù)時(shí)會(huì)觸發(fā)一個(gè)swoole.pipeMessage事件回調(diào)蔑舞,Swoft會(huì)將其轉(zhuǎn)換為自己的swoft.pipeMessage事件并觸發(fā)拒担。swoft.pipeMessage事件最終由PipeMessageListener處理,在相關(guān)的監(jiān)聽(tīng)中如果發(fā)現(xiàn)swoft.pipeMessage事件由Task::deliverByProceess()產(chǎn)生攻询,Worker進(jìn)程會(huì)提替其執(zhí)行一次Task::deliver()从撼,最終將任務(wù)數(shù)據(jù)投遞到TaskWorker進(jìn)程中。

任務(wù)投遞流程

  1. 當(dāng)框架啟動(dòng)后會(huì)啟動(dòng)定時(shí)器每秒去更新執(zhí)行一次任務(wù)钧栖,更新任務(wù)之前需要先去隊(duì)列內(nèi)存表中清理已完成的隊(duì)列數(shù)據(jù)低零。
  2. 然后獲取出所有的任務(wù)中的隊(duì)列婆翔,可理解為獲取所有Task類中的方法,任務(wù)規(guī)則以TaskClass掏婶、分鐘啃奴、時(shí)間戳這些數(shù)據(jù)以md5方式加密得到每個(gè)任務(wù)隊(duì)列的key值,保存到runTimeTable中气堕。

任務(wù)執(zhí)行

Swoole的Task任務(wù)機(jī)制的本質(zhì)是Worker進(jìn)程將耗時(shí)任務(wù)投遞給同步的TaskWorker進(jìn)程處理,所以swoole.onTask的事件回調(diào)是在Task進(jìn)程中執(zhí)行的畔咧。

$ vim vendor/swoft/task/src/Bootstrap/Listeners/TaskEventListener.php

此處是swoole.onTask的事件回調(diào)茎芭,其職責(zé)僅僅是將Worker進(jìn)程投遞來(lái)打包后的數(shù)據(jù)轉(zhuǎn)發(fā)給TaskExecutor

/**
 * @param \Swoole\Server $server
 * @param int            $taskId
 * @param int            $workerId
 * @param mixed          $data
 * @return mixed
 * @throws \InvalidArgumentException
 */
public function onTask(Server $server, int $taskId, int $workerId, $data)
{
    try {
        /* @var TaskExecutor $taskExecutor*/
        $taskExecutor = App::getBean(TaskExecutor::class);
        $result = $taskExecutor->run($data);
    } catch (\Throwable $throwable) {
        App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
        $result = false;

        // Release system resources
        App::trigger(AppEvent::RESOURCE_RELEASE);

        App::trigger(TaskEvent::AFTER_TASK);
    }
    return $result;
}

Worker進(jìn)程是大部分HTTP服務(wù)代碼執(zhí)行的環(huán)境誓沸,但從TaskEventListener.onTask()方法開(kāi)始梅桩,代碼的執(zhí)行環(huán)境都是Task進(jìn)程,也就是說(shuō)拜隧,TaskExecutor和具體的TaskBean都是執(zhí)行在Task進(jìn)程中的宿百。

$ vim vendor/swoft/task/src/TaskExecutor.php

任務(wù)執(zhí)行的思路是將Worker進(jìn)程發(fā)過(guò)來(lái)的數(shù)據(jù)解包并還原為原來(lái)的調(diào)用參數(shù),根據(jù)$name參數(shù)找到對(duì)應(yīng)的TaskBean并調(diào)用其對(duì)應(yīng)的Task方法洪添,其中TaskBean使用類級(jí)別注解@Task(name="TaskName")@Task("TaskName")聲明垦页。注意@Task注解除了name屬性外還有一個(gè)coroutine屬性。

/**
 * @return mixed
 */
public function run(string $data)
{
    $data = TaskHelper::unpack($data);

    $name = $data['name'];
    $type = $data['type'];
    $method = $data['method'];
    $params = $data['params'];
    $logid = $data['logid'] ?? uniqid('', true);
    $spanid = $data['spanid'] ?? 0;


    $collector = TaskCollector::getCollector();
    if (! isset($collector['task'][$name])) {
        return false;
    }

    list(, $coroutine) = $collector['task'][$name];
    $task = bean($name);

    if ($coroutine) {
        $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
    } else {
        $result = $this->runAsyncTask($task, $method, $params, $logid, $spanid, $name, $type);
    }

    return $result;
}

任務(wù)執(zhí)行流程

  1. 通過(guò)getExecTasks方法將所有滿足條件的隊(duì)列放入到一個(gè)數(shù)組干奢,遍歷數(shù)組將runStatus修改為self::START痊焊。
  2. 執(zhí)行所有runStatus值為self::START的隊(duì)列任務(wù)
  3. 將執(zhí)行后的隊(duì)列任務(wù)的runStatus值修改為self::FINISH
  4. runStatus值修改為self::FINISH的剔除掉

任務(wù)進(jìn)程

Swoft使用兩個(gè)前置進(jìn)程

  1. 任務(wù)計(jì)劃進(jìn)程CronTimerProcess

CronTimerProcess進(jìn)程是Swoft的定時(shí)任務(wù)調(diào)度進(jìn)程,其核心方法是Crontab->initRunTimeTableData()忿峻,該進(jìn)程使用了Swoole的定時(shí)器功能薄啥,通過(guò)Swoole\Timer在每分鐘首秒時(shí)執(zhí)行的回調(diào),CronTimerProcess每次被喚醒后都會(huì)遍歷任務(wù)表逛尚,計(jì)算出當(dāng)前這一分鐘內(nèi)的60秒分別需要執(zhí)行的任務(wù)清單垄惧,寫入執(zhí)行表并標(biāo)記為未執(zhí)行。

  1. 任務(wù)執(zhí)行進(jìn)程CronExecProcess

CronExecProcess作為定時(shí)任務(wù)的執(zhí)行者绰寞,通過(guò)Swoole\Timer每0.5秒喚醒自身一次到逊,然后把執(zhí)行表遍歷一次,挑選當(dāng)下需要執(zhí)行的任務(wù)滤钱,通過(guò)sendMessage()投遞出去并更新該任務(wù)執(zhí)行表中的狀態(tài)蕾管。該執(zhí)行進(jìn)程只負(fù)責(zé)任務(wù)的投遞,任務(wù)的實(shí)際執(zhí)行仍然在Task進(jìn)程中由TaskExecutor處理菩暗。

定時(shí)任務(wù)機(jī)制

內(nèi)存表

Swoft使用兩張內(nèi)存數(shù)據(jù)表

在定時(shí)器中會(huì)使用到兩個(gè)內(nèi)存表Table掰曾,一個(gè)是用于存儲(chǔ)任務(wù)實(shí)例originTable,一個(gè)是存儲(chǔ)任務(wù)隊(duì)列實(shí)例runTimeTable停团,也就是存儲(chǔ)需要執(zhí)行的任務(wù)實(shí)例旷坦。

為什么要使用Swoole的內(nèi)存表呢掏熬?

Swoft的定時(shí)任務(wù)管理分別由任務(wù)計(jì)劃進(jìn)程和任務(wù)執(zhí)行進(jìn)程負(fù)責(zé),兩個(gè)進(jìn)程的運(yùn)行共同管理定時(shí)任務(wù)秒梅,如果使用進(jìn)程間獨(dú)立的數(shù)組等結(jié)構(gòu)旗芬,兩個(gè)進(jìn)程必然需要頻繁地進(jìn)程間通信。而使用跨進(jìn)程的Swoole\Table結(jié)構(gòu)直接進(jìn)行進(jìn)程間數(shù)據(jù)共享捆蜀,不僅性能高疮丛,操作簡(jiǎn)單還能解耦兩個(gè)進(jìn)程。為了讓Table能夠在兩個(gè)進(jìn)程間共同使用辆它,Table必須在Swoole Server啟動(dòng)前創(chuàng)建并分配內(nèi)存誊薄。

Table底層是建立在共享內(nèi)存之上的HashTable數(shù)據(jù)結(jié)構(gòu),$size參數(shù)指定了Table的最大行數(shù)锰茉,最大行數(shù)決定了HashTable的總行數(shù)呢蔫,由于HashTable是在共享內(nèi)存之上,所以無(wú)法動(dòng)態(tài)擴(kuò)容飒筑,因此$size必須在創(chuàng)建前設(shè)置好片吊。

$size若不是2的N次方,如1024协屡、8196俏脊、65536等,底層會(huì)自動(dòng)調(diào)整為接近的一個(gè)數(shù)字肤晓,如果小于1024則默認(rèn)為1024联予,即1024為最小值。

$ vim vendor/swoft/task/src/Crontab/TableCrontab.php
  1. 任務(wù)配置表OriginTable

任務(wù)表用于記錄用戶配置的任務(wù)信息材原,任務(wù)表每行記錄包含的字段包括

  • rule 定時(shí)任務(wù)執(zhí)行規(guī)則沸久,對(duì)應(yīng)@Scheduled注解的cron屬性。
  • taskClass 任務(wù)名稱余蟹,對(duì)應(yīng)@Taskname屬性卷胯,默認(rèn)為類名。
  • taskMethod Task方法威酒,對(duì)應(yīng)@Scheduled注解所在的方法
  • add_time 初始化表內(nèi)容時(shí)的10位時(shí)間戳

rule窑睁、taskClasstaskMethod是生成key葵孤,唯一確定一條記錄担钮。

/**
 * @var \Swoft\Memory\Table $originTable 內(nèi)存任務(wù)表
 */
private $originTable;

/**
 * @var array $originStruct 任務(wù)表結(jié)構(gòu)
 */
private $originStruct = [
    'rule'       => [\Swoole\Table::TYPE_STRING, 100],
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],
    'add_time'   => [\Swoole\Table::TYPE_STRING, 11],
];
  1. 任務(wù)執(zhí)行表RunTimeTable

這里的執(zhí)行并非指任務(wù)本身的執(zhí)行,而是指任務(wù)投遞這個(gè)操作的執(zhí)行尤仍。

執(zhí)行表記錄短時(shí)間內(nèi)要執(zhí)行的任務(wù)列表及其執(zhí)行狀態(tài)箫津,表每行記錄包含字段:

  • taskClass
  • taskMethod
  • minute 需要執(zhí)行任務(wù)的時(shí)間,精確到分鐘,格式為date("YmdHi")
  • sec 需要執(zhí)行任務(wù)的時(shí)間苏遥,精確到分鐘饼拍,10位時(shí)間戳。
  • runStatus 任務(wù)狀態(tài)包括0未執(zhí)行田炭、1已執(zhí)行师抄、2執(zhí)行中三種
/**
 * @var \Swoft\Memory\Table $runTimeTable 內(nèi)存運(yùn)行表
 */
private $runTimeTable;
/**
 * @var array $runTimeStruct 運(yùn)行表結(jié)構(gòu)
 */
private $runTimeStruct = [
    'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],
    'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],
    'minute'      => [\Swoole\Table::TYPE_STRING, 20],
    'sec'        => [\Swoole\Table::TYPE_STRING, 20],
    'runStatus'  => [\Swoole\TABLE::TYPE_INT, 4],
];
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市教硫,隨后出現(xiàn)的幾起案子叨吮,更是在濱河造成了極大的恐慌,老刑警劉巖瞬矩,帶你破解...
    沈念sama閱讀 211,496評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件茶鉴,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡丧鸯,警方通過(guò)查閱死者的電腦和手機(jī)蛤铜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,187評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門嫩絮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)丛肢,“玉大人,你說(shuō)我怎么就攤上這事剿干》湓酰” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 157,091評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵置尔,是天一觀的道長(zhǎng)杠步。 經(jīng)常有香客問(wèn)我,道長(zhǎng)榜轿,這世上最難降的妖魔是什么幽歼? 我笑而不...
    開(kāi)封第一講書人閱讀 56,458評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮谬盐,結(jié)果婚禮上甸私,老公的妹妹穿的比我還像新娘。我一直安慰自己飞傀,他們只是感情好皇型,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,542評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著砸烦,像睡著了一般弃鸦。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上幢痘,一...
    開(kāi)封第一講書人閱讀 49,802評(píng)論 1 290
  • 那天唬格,我揣著相機(jī)與錄音,去河邊找鬼。 笑死西轩,一個(gè)胖子當(dāng)著我的面吹牛员舵,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播藕畔,決...
    沈念sama閱讀 38,945評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼马僻,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了注服?” 一聲冷哼從身側(cè)響起韭邓,我...
    開(kāi)封第一講書人閱讀 37,709評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎溶弟,沒(méi)想到半個(gè)月后女淑,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,158評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡辜御,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,502評(píng)論 2 327
  • 正文 我和宋清朗相戀三年鸭你,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片擒权。...
    茶點(diǎn)故事閱讀 38,637評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡袱巨,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出碳抄,到底是詐尸還是另有隱情愉老,我是刑警寧澤,帶...
    沈念sama閱讀 34,300評(píng)論 4 329
  • 正文 年R本政府宣布剖效,位于F島的核電站嫉入,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏璧尸。R本人自食惡果不足惜咒林,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,911評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望爷光。 院中可真熱鬧垫竞,春花似錦、人聲如沸瞎颗。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,744評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)哼拔。三九已至引有,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間倦逐,已是汗流浹背譬正。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,982評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人曾我。 一個(gè)月前我還...
    沈念sama閱讀 46,344評(píng)論 2 360
  • 正文 我出身青樓粉怕,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親抒巢。 傳聞我的和親對(duì)象是個(gè)殘疾皇子贫贝,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,500評(píng)論 2 348