Swoole Task

Task任務進程是Swoole中獨立于Worker工作進程的一個異步工作進程协饲,用于處理一些耗時較長的邏輯躁绸。這些邏輯如果在Task異步任務進程中處理做鹰,不會影響到Worker工作進程處理來自客戶端的請求僧鲁,因此大大提高了Swoole擴展并發(fā)處理能力伤柄。

Task異步任務進程和Worker工作進程之間通過UNIX的Sock管道進行通信并扇,也可以配置通過消息隊列進行通信去团,Task異步任務進程只能傳遞字符串格式的消息。

應用場景

Task異步任務進程主要用于執(zhí)行耗時較長的操作上,如給多人發(fā)送郵件渗勘,廣播消息等需要長時間等待的操作沐绒。

機制原理

Swoole的Task異步任務機制的本質(zhì)是Worker工作進程將耗時的任務投遞給異步的TaskWorker任務工作進程中進行處理,所以swoole.onTask事件回調(diào)是在Task異步任務進程中執(zhí)行的旺坠。

TaskWorker進程
  • onTask回調(diào)函數(shù)用于執(zhí)行Task異步任務
  • onFinish回調(diào)函數(shù)用于處理Task異步任務的返回結(jié)果

注意事項

  • Task異步任務進程傳遞的數(shù)據(jù)大小

當數(shù)據(jù)小于8KB時乔遮,在Swoole的結(jié)構(gòu)中會直接通過管道進行傳遞,當數(shù)據(jù)大于8KB時會超出Swoole的buffer緩沖空間取刃,此時數(shù)據(jù)會被先寫入系統(tǒng)臨時文件中進行傳遞蹋肮。在onTask監(jiān)聽到后再去系統(tǒng)臨時目錄\tmp下讀取文件。

  • Task異步任務進程傳遞對象

可以通過序列化傳遞一個對象的拷貝璧疗,注意這里并不是傳遞了一個對象的引用坯辩。由于Task異步任務進程和Worker工作進程是兩個各自獨立的進程,擁有各自不同的內(nèi)存空間崩侠。因此漆魔,Task異步任務進程中對象的改變不會反映到Worker工作進程中。

Task異步任務進程中數(shù)據(jù)庫連接却音、網(wǎng)絡連接對象是不可以傳遞的改抡。

  • Task異步任務進程的onFinish回調(diào)

Task異步任務進程的onFinish回調(diào)會返回并回調(diào)task方法的Worker工作進程,也就是返回給投遞者的進程系瓢。

代碼實踐

實踐1:客戶端向服務器發(fā)送消息阿纤,服務器以異步方式處理后返回JSON字符串給客戶端。

創(chuàng)建服務器

$ vim server.php
<?php
class Server
{
    public $test;
    public function __construct($host="0.0.0.0", $port=9501, $options=[])
    {
        $svr = new swoole_server($host, $port);
        if(!empty($options)){
            $svr->set($options);
        }
        $svr->on("Start",[$this, "onStart"]);
        $svr->on("Connect", [$this, "onConnect"]);
        $svr->on("Receive", [$this, "onReceive"]);
        $svr->on("Close", [$this, "onClose"]);
        $svr->on("Task", [$this, "onTask"]);
        $svr->on("Finish", [$this, "onFinish"]);
        $svr->on("WorkerStart", [$this, "onWorkerStart"]);
        $svr->start();
    }
    public function log($msg)
    {
        $filepath = __DIR__.DIRECTORY_SEPARATOR;
        $filename = date("Ymd").".log";
        $file = $filepath.$filename;

        $message = "[".date("Y-m-d H:i:s")."] ";
        $message .= $msg;
        $message .= PHP_EOL;
        file_put_contents($file, $message, FILE_APPEND);
    }
    public function onStart($svr)
    {
        echo __METHOD__.PHP_EOL;
    }
    public function onConnect($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    public function onClose($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    /**
     * 接收來自客戶端的請求并轉(zhuǎn)發(fā)數(shù)據(jù)
     */
    public function onReceive(swoole_server $svr, $fd, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}:data:{$data}".PHP_EOL;
        
        //收到客戶端數(shù)據(jù)創(chuàng)建任務并投遞
        $params = [];
        $params["client_id"] = $fd;//客戶端描述符
        $params["unique_id"] = time().mt_rand(100000, 999999);
        $params["message"] = $data;
        //Worker進程使用task()向Task進程投遞任務數(shù)據(jù)
        $svr->task(json_encode($params));//task只能傳遞字符串
    }
    /**
     * 監(jiān)聽并處理Worker工作線程
     */
    public function onWorkerStart($svr, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}".PHP_EOL;
    }
    /**
     * Task進程的onTask方法
     * 接收Worker進程使用task()函數(shù)投遞的任務數(shù)據(jù)
     */
    public function onTask($svr, $task_id, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:task:{$task_id}:data:{$data}".PHP_EOL;

        //獲取參數(shù)獲取任務
        $params = json_decode($data, true);
        //處理數(shù)據(jù)
        $data = [];
        $data["client_id"] = $params["client_id"];
        $data["unique_id"] = $params["unique_id"];
        $data["message"] = "thank you";

        //發(fā)送數(shù)據(jù)給客戶端
        $svr->send($params["client_id"], json_encode($data));
        
        //返回數(shù)據(jù)給onFinish
        return "finished";       
    }
    /**
     * Worker進程的onFinish方法
     * 用于接收Task進程執(zhí)行finish后的參數(shù)
     **/
    public function onFinish($svr, $task_id, $data)
    {
        echo __METHOD__.":task:{$task_id}:data:{$data}".PHP_EOL;
    }
}
$host = "0.0.0.0";
$port = 9501;
$options = [];
$options["worker_num"] = 2;
$options["daemonize"] = false;
// $options["max_request"] = 1000;
// $options["dispatch_mode"] = 2;
$options["task_worker_num"] = 2;
$server = new Server($host, $port, $options);

創(chuàng)建客戶端

$ vim client.php
<?php
class Client
{
    private $client;
    public function __construct(){
        $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
        $this->client->on("Connect", [$this, "onConnect"]);
        $this->client->on("Receive", [$this, "onReceive"]);
        $this->client->on("Close", [$this, "onClose"]);
        $this->client->on("Error", [$this, "onError"]);
    }
    /**
     * 連接服務器
     */
    public function connect($host, $port)
    {
        $fp = $this->client->connect($host, $port, 1);
        if(!$fp){
            echo __METHOD__.":ErrMsg:".$fp->errMsg.":ErrCode:".$fp->errCode.PHP_EOL;
            return;
        }
    }
    /**
     * 執(zhí)行connect()方法后會自動調(diào)用onConnect()方法
     */
    public function onConnect($client)
    {
        //接收命令行CLI標準輸入夷陋,并向服務器發(fā)送消息欠拾。
        fwrite(STDOUT, "Enter Message:");
        swoole_event_add(STDIN, function(){
            fwrite(STDOUT, "Enter Message:");
            $msg = trim(fgets(STDIN));
            $this->send($msg);
        });
    }
    /**
     * 向服務器發(fā)送消息
     */
    public function send($msg)
    {
        $this->client->send($msg);
    }
    /**
     * 判斷客戶端是否仍然連接
     */
    public function isConnected()
    {
        return $this->client->isConnected();
    }
    /**
     * 接收來自服務器的數(shù)據(jù)
     */
    public function onReceive($client, $data)
    {
        echo __METHOD__.":".$data.PHP_EOL;
    }
    public function onClose($client)
    {
        echo __METHOD__.":client close connection".PHP_EOL;
    }
    public function onError()
    {
        echo __METHOD__.":error";
    }
}
$client = new Client();
$client->connect("127.0.0.1", 9501);

運行服務器

$ php server.php

運行服務端并發(fā)送數(shù)據(jù)

$ php client.php
Enter Message: hello server

查看服務器命令行輸出

$ php server.php
Server::onStart
Server::onWorkerStart:worker:3
Server::onWorkerStart:worker:2
Server::onWorkerStart:worker:0
Server::onWorkerStart:worker:1
Server::onConnect:worker:0:client:1
Server::onReceive:worker:0:client:1:data:hello server
Server::onTask:worker:1:task:0:data:{"client_id":1,"unique_id":"1555006210826270","message":"hello server"}
Server::onFinish:task:0:data:finished

查看客戶端命令行輸出

Enter Message:hello server
Enter Message:Client::onReceive:{"client_id":1,"unique_id":"1555006210826270","message":"thank you"}

案例2:使用Task異步任務進程傳遞對象副本而非引用

<?php
class Test
{
    public $index = 0;
}
class Server
{
    public $test;
    public function __construct($host="0.0.0.0", $port=9501, $options=[])
    {
        $svr = new swoole_server($host, $port);
        if(!empty($options)){
            $svr->set($options);
        }
        $svr->on("Start",[$this, "onStart"]);
        $svr->on("Connect", [$this, "onConnect"]);
        $svr->on("Receive", [$this, "onReceive"]);
        $svr->on("Close", [$this, "onClose"]);
        $svr->on("Task", [$this, "onTask"]);
        $svr->on("Finish", [$this, "onFinish"]);
        $svr->on("WorkerStart", [$this, "onWorkerStart"]);
        $svr->start();
    }
    public function log($msg)
    {
        $filepath = __DIR__.DIRECTORY_SEPARATOR;
        $filename = date("Ymd").".log";
        $file = $filepath.$filename;

        $message = "[".date("Y-m-d H:i:s")."] ";
        $message .= $msg;
        $message .= PHP_EOL;
        file_put_contents($file, $message, FILE_APPEND);
    }
    public function onStart($svr)
    {
        echo __METHOD__.PHP_EOL;
    }
    public function onConnect($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    public function onClose($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    /**
     * 接收來自客戶端的請求并轉(zhuǎn)發(fā)數(shù)據(jù)
     */
    public function onReceive(swoole_server $svr, $fd, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}:data:{$data}".PHP_EOL;
        
        //創(chuàng)建對象
        $this->test = new Test();
        $svr->task(serialize($this->test));
    }
    /**
     * 監(jiān)聽并處理Worker工作線程
     */
    public function onWorkerStart($svr, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}".PHP_EOL;
    }
    /**
     * Task進程的onTask方法
     * 接收Worker進程使用task()函數(shù)投遞的任務數(shù)據(jù)
     */
    public function onTask($svr, $task_id, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:task:{$task_id}:data:{$data}".PHP_EOL;
        //獲取序列化后的對象
        $obj = unserialize($data);
        $this->log("onTask:index:".$obj->index);
        $obj->index = 1;
        //返回數(shù)據(jù)給onFinish
        return "finished";       
    }
    /**
     * Worker進程的onFinish方法
     * 用于接收Task進程執(zhí)行finish后的參數(shù)
     **/
    public function onFinish($svr, $task_id, $data)
    {
        echo __METHOD__.":task:{$task_id}:data:{$data}".PHP_EOL;

        $this->log("onFinish:index:".$this->test->index);
    }
}
$host = "0.0.0.0";
$port = 9501;
$options = [];
$options["worker_num"] = 2;
$options["daemonize"] = false;
// $options["max_request"] = 1000;
// $options["dispatch_mode"] = 2;
$options["task_worker_num"] = 2;
$server = new Server($host, $port, $options);
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市骗绕,隨后出現(xiàn)的幾起案子藐窄,更是在濱河造成了極大的恐慌,老刑警劉巖爹谭,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件枷邪,死亡現(xiàn)場離奇詭異,居然都是意外死亡诺凡,警方通過查閱死者的電腦和手機东揣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來腹泌,“玉大人嘶卧,你說我怎么就攤上這事×垢ぃ” “怎么了芥吟?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵侦铜,是天一觀的道長。 經(jīng)常有香客問我钟鸵,道長钉稍,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任棺耍,我火速辦了婚禮贡未,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蒙袍。我一直安慰自己俊卤,他們只是感情好,可當我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布害幅。 她就那樣靜靜地躺著消恍,像睡著了一般。 火紅的嫁衣襯著肌膚如雪以现。 梳的紋絲不亂的頭發(fā)上狠怨,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天,我揣著相機與錄音叼风,去河邊找鬼取董。 笑死,一個胖子當著我的面吹牛无宿,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播枢里,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼孽鸡,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了栏豺?” 一聲冷哼從身側(cè)響起彬碱,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎奥洼,沒想到半個月后巷疼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡灵奖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年嚼沿,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瓷患。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡骡尽,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出擅编,到底是詐尸還是另有隱情攀细,我是刑警寧澤箫踩,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站谭贪,受9級特大地震影響境钟,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜俭识,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一吱韭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧鱼的,春花似錦理盆、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至宙橱,卻和暖如春姨俩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背师郑。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工环葵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人宝冕。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓张遭,卻偏偏與公主長得像,于是被迫代替她去往敵國和親地梨。 傳聞我的和親對象是個殘疾皇子菊卷,可洞房花燭夜當晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內(nèi)容

  • 進程 什么是進程 進程Process是計算機中的程序關(guān)于某數(shù)據(jù)集合上的一次運行活動,是系統(tǒng)分配資源和調(diào)度的基本單位...
    JunChow520閱讀 2,040評論 2 9
  • 前文再續(xù)宝剖,就書接上一回洁闰,隨著與Server、TCP万细、Protocol的邂逅扑眉,Swoole終于迎來了自己的故事,今天...
    蝸牛淋雨閱讀 1,736評論 1 14
  • 參考資料 官方網(wǎng)站 https://www.swoole.com/page/download PHP沒有像Pyth...
    JunChow520閱讀 2,951評論 0 6
  • swoole 中的swerver,一個異步服務器程序赖钞,支持TCP腰素、UDP、UnixSocket 3種協(xié)議,僅需要設...
    小小小胡閱讀 715評論 0 0
  • Swoft的任務功能基于Swoole的Task機制,或者說Swoft的Task機制本質(zhì)就是對SwooleTask機...
    bromine閱讀 7,218評論 3 8