Task任務進程是Swoole中獨立于Worker工作進程的一個異步工作進程协饲,用于處理一些耗時較長的邏輯躁绸。這些邏輯如果在Task異步任務進程中處理做鹰,不會影響到Worker工作進程處理來自客戶端的請求僧鲁,因此大大提高了Swoole擴展并發(fā)處理能力伤柄。
Task異步任務進程和Worker工作進程之間通過UNIX的Sock管道進行通信并扇,也可以配置通過消息隊列進行通信去团,Task異步任務進程只能傳遞字符串格式的消息。
應用場景
Task異步任務進程主要用于執(zhí)行耗時較長的操作上,如給多人發(fā)送郵件渗勘,廣播消息等需要長時間等待的操作沐绒。
機制原理
Swoole的Task異步任務機制的本質(zhì)是Worker工作進程將耗時的任務投遞給異步的TaskWorker任務工作進程中進行處理,所以swoole.onTask
事件回調(diào)是在Task異步任務進程中執(zhí)行的旺坠。
-
onTask
回調(diào)函數(shù)用于執(zhí)行Task異步任務 -
onFinish
回調(diào)函數(shù)用于處理Task異步任務的返回結(jié)果
注意事項
- Task異步任務進程傳遞的數(shù)據(jù)大小
當數(shù)據(jù)小于8KB時乔遮,在Swoole的結(jié)構(gòu)中會直接通過管道進行傳遞,當數(shù)據(jù)大于8KB時會超出Swoole的buffer緩沖空間取刃,此時數(shù)據(jù)會被先寫入系統(tǒng)臨時文件中進行傳遞蹋肮。在onTask
監(jiān)聽到后再去系統(tǒng)臨時目錄\tmp
下讀取文件。
- Task異步任務進程傳遞對象
可以通過序列化傳遞一個對象的拷貝璧疗,注意這里并不是傳遞了一個對象的引用坯辩。由于Task異步任務進程和Worker工作進程是兩個各自獨立的進程,擁有各自不同的內(nèi)存空間崩侠。因此漆魔,Task異步任務進程中對象的改變不會反映到Worker工作進程中。
Task異步任務進程中數(shù)據(jù)庫連接却音、網(wǎng)絡連接對象是不可以傳遞的改抡。
- Task異步任務進程的
onFinish
回調(diào)
Task異步任務進程的onFinish
回調(diào)會返回并回調(diào)task
方法的Worker工作進程,也就是返回給投遞者的進程系瓢。
代碼實踐
實踐1:客戶端向服務器發(fā)送消息阿纤,服務器以異步方式處理后返回JSON字符串給客戶端。
創(chuàng)建服務器
$ vim server.php
<?php
class Server
{
public $test;
public function __construct($host="0.0.0.0", $port=9501, $options=[])
{
$svr = new swoole_server($host, $port);
if(!empty($options)){
$svr->set($options);
}
$svr->on("Start",[$this, "onStart"]);
$svr->on("Connect", [$this, "onConnect"]);
$svr->on("Receive", [$this, "onReceive"]);
$svr->on("Close", [$this, "onClose"]);
$svr->on("Task", [$this, "onTask"]);
$svr->on("Finish", [$this, "onFinish"]);
$svr->on("WorkerStart", [$this, "onWorkerStart"]);
$svr->start();
}
public function log($msg)
{
$filepath = __DIR__.DIRECTORY_SEPARATOR;
$filename = date("Ymd").".log";
$file = $filepath.$filename;
$message = "[".date("Y-m-d H:i:s")."] ";
$message .= $msg;
$message .= PHP_EOL;
file_put_contents($file, $message, FILE_APPEND);
}
public function onStart($svr)
{
echo __METHOD__.PHP_EOL;
}
public function onConnect($svr, $fd, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
}
public function onClose($svr, $fd, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
}
/**
* 接收來自客戶端的請求并轉(zhuǎn)發(fā)數(shù)據(jù)
*/
public function onReceive(swoole_server $svr, $fd, $worker_id, $data)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}:data:{$data}".PHP_EOL;
//收到客戶端數(shù)據(jù)創(chuàng)建任務并投遞
$params = [];
$params["client_id"] = $fd;//客戶端描述符
$params["unique_id"] = time().mt_rand(100000, 999999);
$params["message"] = $data;
//Worker進程使用task()向Task進程投遞任務數(shù)據(jù)
$svr->task(json_encode($params));//task只能傳遞字符串
}
/**
* 監(jiān)聽并處理Worker工作線程
*/
public function onWorkerStart($svr, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}".PHP_EOL;
}
/**
* Task進程的onTask方法
* 接收Worker進程使用task()函數(shù)投遞的任務數(shù)據(jù)
*/
public function onTask($svr, $task_id, $worker_id, $data)
{
echo __METHOD__.":worker:{$worker_id}:task:{$task_id}:data:{$data}".PHP_EOL;
//獲取參數(shù)獲取任務
$params = json_decode($data, true);
//處理數(shù)據(jù)
$data = [];
$data["client_id"] = $params["client_id"];
$data["unique_id"] = $params["unique_id"];
$data["message"] = "thank you";
//發(fā)送數(shù)據(jù)給客戶端
$svr->send($params["client_id"], json_encode($data));
//返回數(shù)據(jù)給onFinish
return "finished";
}
/**
* Worker進程的onFinish方法
* 用于接收Task進程執(zhí)行finish后的參數(shù)
**/
public function onFinish($svr, $task_id, $data)
{
echo __METHOD__.":task:{$task_id}:data:{$data}".PHP_EOL;
}
}
$host = "0.0.0.0";
$port = 9501;
$options = [];
$options["worker_num"] = 2;
$options["daemonize"] = false;
// $options["max_request"] = 1000;
// $options["dispatch_mode"] = 2;
$options["task_worker_num"] = 2;
$server = new Server($host, $port, $options);
創(chuàng)建客戶端
$ vim client.php
<?php
class Client
{
private $client;
public function __construct(){
$this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$this->client->on("Connect", [$this, "onConnect"]);
$this->client->on("Receive", [$this, "onReceive"]);
$this->client->on("Close", [$this, "onClose"]);
$this->client->on("Error", [$this, "onError"]);
}
/**
* 連接服務器
*/
public function connect($host, $port)
{
$fp = $this->client->connect($host, $port, 1);
if(!$fp){
echo __METHOD__.":ErrMsg:".$fp->errMsg.":ErrCode:".$fp->errCode.PHP_EOL;
return;
}
}
/**
* 執(zhí)行connect()方法后會自動調(diào)用onConnect()方法
*/
public function onConnect($client)
{
//接收命令行CLI標準輸入夷陋,并向服務器發(fā)送消息欠拾。
fwrite(STDOUT, "Enter Message:");
swoole_event_add(STDIN, function(){
fwrite(STDOUT, "Enter Message:");
$msg = trim(fgets(STDIN));
$this->send($msg);
});
}
/**
* 向服務器發(fā)送消息
*/
public function send($msg)
{
$this->client->send($msg);
}
/**
* 判斷客戶端是否仍然連接
*/
public function isConnected()
{
return $this->client->isConnected();
}
/**
* 接收來自服務器的數(shù)據(jù)
*/
public function onReceive($client, $data)
{
echo __METHOD__.":".$data.PHP_EOL;
}
public function onClose($client)
{
echo __METHOD__.":client close connection".PHP_EOL;
}
public function onError()
{
echo __METHOD__.":error";
}
}
$client = new Client();
$client->connect("127.0.0.1", 9501);
運行服務器
$ php server.php
運行服務端并發(fā)送數(shù)據(jù)
$ php client.php
Enter Message: hello server
查看服務器命令行輸出
$ php server.php
Server::onStart
Server::onWorkerStart:worker:3
Server::onWorkerStart:worker:2
Server::onWorkerStart:worker:0
Server::onWorkerStart:worker:1
Server::onConnect:worker:0:client:1
Server::onReceive:worker:0:client:1:data:hello server
Server::onTask:worker:1:task:0:data:{"client_id":1,"unique_id":"1555006210826270","message":"hello server"}
Server::onFinish:task:0:data:finished
查看客戶端命令行輸出
Enter Message:hello server
Enter Message:Client::onReceive:{"client_id":1,"unique_id":"1555006210826270","message":"thank you"}
案例2:使用Task異步任務進程傳遞對象副本而非引用
<?php
class Test
{
public $index = 0;
}
class Server
{
public $test;
public function __construct($host="0.0.0.0", $port=9501, $options=[])
{
$svr = new swoole_server($host, $port);
if(!empty($options)){
$svr->set($options);
}
$svr->on("Start",[$this, "onStart"]);
$svr->on("Connect", [$this, "onConnect"]);
$svr->on("Receive", [$this, "onReceive"]);
$svr->on("Close", [$this, "onClose"]);
$svr->on("Task", [$this, "onTask"]);
$svr->on("Finish", [$this, "onFinish"]);
$svr->on("WorkerStart", [$this, "onWorkerStart"]);
$svr->start();
}
public function log($msg)
{
$filepath = __DIR__.DIRECTORY_SEPARATOR;
$filename = date("Ymd").".log";
$file = $filepath.$filename;
$message = "[".date("Y-m-d H:i:s")."] ";
$message .= $msg;
$message .= PHP_EOL;
file_put_contents($file, $message, FILE_APPEND);
}
public function onStart($svr)
{
echo __METHOD__.PHP_EOL;
}
public function onConnect($svr, $fd, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
}
public function onClose($svr, $fd, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
}
/**
* 接收來自客戶端的請求并轉(zhuǎn)發(fā)數(shù)據(jù)
*/
public function onReceive(swoole_server $svr, $fd, $worker_id, $data)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}:data:{$data}".PHP_EOL;
//創(chuàng)建對象
$this->test = new Test();
$svr->task(serialize($this->test));
}
/**
* 監(jiān)聽并處理Worker工作線程
*/
public function onWorkerStart($svr, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}".PHP_EOL;
}
/**
* Task進程的onTask方法
* 接收Worker進程使用task()函數(shù)投遞的任務數(shù)據(jù)
*/
public function onTask($svr, $task_id, $worker_id, $data)
{
echo __METHOD__.":worker:{$worker_id}:task:{$task_id}:data:{$data}".PHP_EOL;
//獲取序列化后的對象
$obj = unserialize($data);
$this->log("onTask:index:".$obj->index);
$obj->index = 1;
//返回數(shù)據(jù)給onFinish
return "finished";
}
/**
* Worker進程的onFinish方法
* 用于接收Task進程執(zhí)行finish后的參數(shù)
**/
public function onFinish($svr, $task_id, $data)
{
echo __METHOD__.":task:{$task_id}:data:{$data}".PHP_EOL;
$this->log("onFinish:index:".$this->test->index);
}
}
$host = "0.0.0.0";
$port = 9501;
$options = [];
$options["worker_num"] = 2;
$options["daemonize"] = false;
// $options["max_request"] = 1000;
// $options["dispatch_mode"] = 2;
$options["task_worker_num"] = 2;
$server = new Server($host, $port, $options);