1.在 Server 程序中如果需要執(zhí)行很耗時(shí)的操作汰翠,比如一個(gè)聊天服務(wù)器發(fā)送廣播类少,Web 服務(wù)器中發(fā)送郵件十拣。如果直接去執(zhí)行這些函數(shù)就會(huì)阻塞當(dāng)前進(jìn)程,導(dǎo)致服務(wù)器響應(yīng)變慢志鹃。
Swoole 提供了異步任務(wù)處理的功能夭问,可以投遞一個(gè)異步任務(wù)到 TaskWorker
進(jìn)程池中執(zhí)行,不影響當(dāng)前請(qǐng)求的處理速度曹铃。(官網(wǎng)說明)
1. 服務(wù)端代碼
執(zhí)行服務(wù)端監(jiān)聽端口
9501
缰趋。通過設(shè)置daemonize
這個(gè)參數(shù),以守護(hù)進(jìn)程在系統(tǒng)去維護(hù)這個(gè)TaskWorker
進(jìn)程池陕见。我們客戶端將消息傳遞給服務(wù)端秘血,服務(wù)端異步將數(shù)據(jù)請(qǐng)求放入進(jìn)程池隊(duì)列運(yùn)行,從大大縮短了響應(yīng)時(shí)間评甜。
<?php /**
* Created by PhpStorm
* User: pl
* Date: 2020/5/22
* Time: 15:23
*/
class TaskServer
{
private $server;
public function __construct()
{
$this->server = new Swoole\Server('127.0.0.1', 9501);
$this->server->set([
'task_worker_num' => 3, //開啟的進(jìn)程數(shù) 一般為cup核數(shù) 1-4倍
'daemonize' => 1, //已守護(hù)進(jìn)程執(zhí)行該程序
'max_request' => 10000, //worker進(jìn)程最大任務(wù)數(shù)
'dispatch_mode' => 2, //設(shè)置為爭(zhēng)搶模式
'task_ipc_mode' => 3, //設(shè)置為消息隊(duì)列模式
]);
$this->server->on('Receive', array($this, 'onReceive'));
$this->server->on('Task', array($this, 'onTask'));
$this->server->on('Finish', array($this, 'onFinish'));
$this->server->start();
}
public function onReceive(swoole_server $server, $fd, $form_id, $data)
{
$this->server->task($data);
}
/**
* @param swoole_server $server
* @param $fd
* @param $from_id
* @param $data
* 執(zhí)行異步任務(wù)
*/
public function onTask($server, $fd, $from_id, $data)
{
$data = json_decode($data, true);
try {
$log_txt = date('Y-m-d H:i:s') . "開始執(zhí)行任務(wù)" . PHP_EOL;
$this->log($log_txt);
$type = $data['data']['type'];
$time = intval($data['data']['timing']);
unset($data['data']['timing']);
unset($data['data']['type']);
if (intval($type) == 1) {
$this->request_curl($data['url'], $data['data'], $data['data']['http_type']);
} else {
Swoole\Timer::after($time, function () use ($data) {
$this->request_curl($data['url'], $data['data'], $data['data']['http_type']);
});
}
} catch (\Exception $exception) {
$log_txt = date('Y-m-d H:i:s') . "執(zhí)行任務(wù)失敗發(fā)生錯(cuò)誤" . PHP_EOL;
$this->log($log_txt);
}
}
public function onFinish($server, $task_id, $data)
{
$log_txt = date('Y-m-d H:i:s') . "$data" . PHP_EOL;
$this->log($log_txt);
}
public function request_curl($url = '', $request_data = '', $request_type = 'get', $headers = [], $is_ssl = false)
{
$ch = curl_init(); //curl初始化
if ($request_type == 'get' && !empty($request_data)) {
$num = 0;
foreach ($request_data as $key => $value) {
if ($num == 0) {
$url .= '?' . $key . '=' . $value;
} else {
$url .= '&' . $key . '=' . $value;
}
$num++;
}
$num = 0;
}
//區(qū)分get和post
curl_setopt($ch, CURLOPT_URL, $url); //URL地址
curl_setopt($ch, CURLOPT_HEADER, 0); //頭信息不輸出
//如果成功只將結(jié)果返回灰粮,不自動(dòng)輸出任何內(nèi)容
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
//post類型就實(shí)現(xiàn)此結(jié)果
if ($request_type == 'post') {
//設(shè)置為POST方式
curl_setopt($ch, CURLOPT_POST, 1);
//POST數(shù)據(jù)
curl_setopt($ch, CURLOPT_POSTFIELDS, $request_data);
//當(dāng)post數(shù)據(jù)大于1024時(shí)強(qiáng)制執(zhí)行
curl_setopt($ch, CURLOPT_HTTPHEADER, array("Expect:"));
}
//判斷是否繞過證書
if ($is_ssl) {
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);//繞過ssl驗(yàn)證
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
}
if (!empty($headers)) curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
$result = curl_exec($ch); //執(zhí)行
if ($result == FALSE) return false;
curl_close($ch); //關(guān)閉資源
return $result;
}
public function log($log_txt)
{
$log = 'log/' . date('Y_m_d') . 'log';
if (!file_exists($log)) {
touch($log);
chown($log, 0777);
}
$file_log = fopen($log, "a");
fputs($file_log, $log_txt);
fclose($file_log);
}
}
$task = new TaskServer();
2.客戶端代碼
<?php const API_KEY = 'xxx';
class ClientRequest
{
private $client;
private $params; //請(qǐng)求參數(shù)
public function __construct($params)
{
$this->client = new swoole_client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
$this->params = $params;
}
public function connect()
{
if (!$this->client->connect('127.0.0.1', 9501, 1)) {
return json_encode([
'code' => 500,
'err_msg' => '鏈接異步客戶端失敗'
]);
}
/**
* 注意請(qǐng)求格式
* $params['url'] 接口地址
* $params['type']接口請(qǐng)求方式
* $params['data']參數(shù)
*/
$params = $this->params;
$array['url'] = $params['url'];
unset($params['url']);
$array['data'] = $params;
$this->client->send(json_encode($array, JSON_UNESCAPED_UNICODE));
}
}
if($_SERVER['REQUEST_METHOD']!='POST') throw new ErrorException('路由不存在','404');
if(!array_key_exists('HTTP_TOKEN',$_SERVER)){
header('Content-type: application/json');
echo json_encode(['code'=>'200007','token is not empty']);exit();
}
if($_SERVER['HTTP_TOKEN']!= API_KEY ) {
header('Content-type: application/json');
echo json_encode(['code'=>'200007','error token']);exit();
}
$params = $_POST;
$client = new ClientRequest($params);
$client->connect();
開始執(zhí)行服務(wù)端程序,
php TaskServer.php
我們以接口形式上去調(diào)用另外一個(gè)耗時(shí)接口.簡單對(duì)比一下響應(yīng)速度。
最后補(bǔ)充:基于swoole 一個(gè)簡單的異步隊(duì)列就完成了忍坷≌持郏可以將此隊(duì)列封裝成api隊(duì)列接口,將它丟到task里面去慢慢執(zhí)行吧~哈哈
<a name="introduction1"></a>
補(bǔ)充新增的一次定時(shí)器任務(wù)佩研,支持毫秒級(jí)
3.定時(shí)任務(wù)和異步接口
說明:
1柑肴、在swoole 中 毫秒【如 1000 表示 1 秒,v4.2.10 以下版本最大不得超過 86400000【一天】】
2旬薯、后臺(tái)定時(shí)器操作接口:操作方法如下
當(dāng)操作時(shí)間在一天內(nèi) 則直接執(zhí)行定時(shí)器
3晰骑、大于一天時(shí)
系統(tǒng)執(zhí)行一個(gè)定時(shí)crontab任務(wù) ->
每隔12-24小時(shí) 運(yùn)行一次 將數(shù)據(jù)庫執(zhí)行接口時(shí)間大于當(dāng)前時(shí)間且不超過一天的數(shù)據(jù)數(shù)據(jù)執(zhí)行該接口
關(guān) 于用戶執(zhí)行撤銷則通過推送數(shù)據(jù)接口去判斷該數(shù)據(jù)是否進(jìn)行推送
4、當(dāng)該需要給執(zhí)行接口添加參數(shù)時(shí) 直接將參數(shù)放入接口
接口文檔
ClientRequest.php
請(qǐng)求方法 post
參數(shù)
參數(shù) | 是否必選 | 備注 | 限制 | 新增 |
---|---|---|---|---|
token | 是 | 密鑰【header】xxxx
|
||
url | 是 | 執(zhí)行接口的地址 | ||
type | 是 | 1袍暴、異步執(zhí)行該接口 2些侍、定時(shí)執(zhí)行該接口 | ||
http_type | 是 | get隶症、post | ||
timing | 否 | 定時(shí)執(zhí)行該接口的時(shí)間 單位【毫秒】 |
?