在一般的 Server 程序中都會(huì)有一些耗時(shí)的任務(wù),比如:發(fā)送郵件虽界、聊天服務(wù)器發(fā)送廣播等。如果我們采用同步阻塞的防水去執(zhí)行這些任務(wù)涛菠,那么這肯定會(huì)非常的慢莉御。
Swoole 的 TaskWorker 進(jìn)程池可以用來(lái)執(zhí)行一些異步的任務(wù),而且不會(huì)影響接下來(lái)的任務(wù)俗冻,很適合處理以上場(chǎng)景礁叔。
那么什么是異步任務(wù)呢?
可以從下面的圖示中來(lái)簡(jiǎn)單了解一下迄薄。(來(lái)源于網(wǎng)絡(luò)琅关,侵刪)
我們上一個(gè) Swoole 的文章介紹了如何創(chuàng)建一個(gè)簡(jiǎn)單的服務(wù)器,并且知道了幾個(gè)核心的回調(diào)函數(shù)的使用方法。
要實(shí)現(xiàn)上述的異步處理,只需要增加兩個(gè)事件回調(diào)即可:onTask 和 onFinish, 這兩個(gè)回調(diào)函數(shù)分別用于執(zhí)行 Task 任務(wù)和處理 Task 任務(wù)的返回結(jié)果锹引。另外還需要在 set 方法中設(shè)置 task 進(jìn)程數(shù)量。
使用示例:
<?php
class Server
{
private $serv;
public function __construct() {
$this->serv = new swoole_server("0.0.0.0", 9501);
$this->serv->set(array(
'worker_num' => 4,
'daemonize' => false,
'task_worker_num' => 8 // task進(jìn)程數(shù)量 即為維持的MySQL連接的數(shù)量
));
$this->serv->on('Start', array($this, 'onStart'));
$this->serv->on('Connect', array($this, 'onConnect'));
$this->serv->on('Receive', array($this, 'onReceive'));
$this->serv->on('Close', array($this, 'onClose'));
$this->serv->on('Task', array($this, 'onTask'));
$this->serv->on('Finish', array($this, 'onFinish'));
$this->serv->start();
}
public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
echo "收到數(shù)據(jù)". $data . PHP_EOL;
// 發(fā)送任務(wù)到Task進(jìn)程
$param = array(
'sql' => $data, // 接收客戶端發(fā)送的 sql
'fd' => $fd
);
$serv->task( json_encode( $param ) ); // 向 task 投遞任務(wù)
echo "繼續(xù)處理之后的邏輯\n";
}
public function onTask($serv, $task_id, $from_id, $data) {
$data = json_decode($data, true);
echo "This Task {$task_id} from Worker {$from_id}\n";
echo "recv SQL: {$data['sql']}\n";
static $link = null;
$sql = $data['sql'];
$fd = $data['fd'];
HELL:
if ($link == null) {
$link = @mysqli_connect("127.0.0.1", "root", "root", "test");
}
$result = $link->query($sql);
if (!$result) { //如果查詢失敗
if(in_array(mysqli_errno($link), [2013, 2006])){
//錯(cuò)誤碼為2013新症,或者2006,則重連數(shù)據(jù)庫(kù)响禽,重新執(zhí)行sql
$link = null;
goto HELL;
}
}
if(preg_match("/^select/i", $sql)){//如果是select操作徒爹,就返回關(guān)聯(lián)數(shù)組
$data = array();
while ($fetchResult = mysqli_fetch_assoc($result) ){
$data['data'][] = $fetchResult;
}
}else{//否則直接返回結(jié)果
$data['data'] = $result;
}
$data['status'] = "OK";
$data['fd'] = $fd;
$serv->finish(json_encode($data));
}
public function onFinish($serv, $task_id, $data) {
echo "Task {$task_id} finish\n";
$result = json_decode($data, true);
if ($result['status'] == 'OK') {
$this->serv->send($result['fd'], json_encode($result['data']) . "\n");
} else {
$this->serv->send($result['fd'], $result);
}
}
public function onStart( $serv ) {
echo "Server Start\n";
}
public function onConnect( $serv, $fd, $from_id ) {
echo "Client {$fd} connect\n";
}
public function onClose( $serv, $fd, $from_id ) {
echo "Client {$fd} close connection\n";
}
}
$server = new Server();
以上代碼在 onReceive 時(shí)直接接收一條 sql,之后直接發(fā)送到 Task 任務(wù)中金抡。這個(gè)時(shí)候下一步的流程緊接著輸出瀑焦,這里也就體現(xiàn)出了異步。然后 onTask 和 onFinish 分別用來(lái)向數(shù)據(jù)庫(kù)發(fā)送 sql梗肝,處理 task 執(zhí)行結(jié)果榛瓮。
參考鏈接: