思路:首先swoole作為后臺守護進程處理耗時的任務(Job),通過swoole_client將任務丟進后臺Job去處理,等候處理完畢將結果通知到回調Url上,一個Job處理完畢。
話不多說,先看代碼實現過程
1、隊列處理(核心):server.php
<?php
// Async job server: accepts a JSON payload over TCP, runs it in a task worker,
// then notifies the payload's `callback` URL over HTTP once the job is done.
$serv = new \Swoole\Server("192.168.206.128", 9502);

// Number of task worker processes that execute the asynchronous jobs.
$serv->set(['task_worker_num' => 4]);

$serv->on('receive', function ($serv, $fd, $from_id, $data) {
    // Queue the job first so the reply reflects whether it was really accepted:
    // task() returns false when the payload could not be dispatched.
    $taskId = $serv->task($data);
    $serv->send($fd, json_encode(['status' => $taskId !== false]));
});

// Executes a queued job inside a task worker.
$serv->on('task', function ($serv, $task_id, $from_id, $data) {
    // The sleep stands in for the real processing time of the job.
    sleep(5);
    // Hand the untouched payload over to the 'finish' handler.
    $serv->finish($data);
});

// Runs after a job completed: notifies the callback URL named in the payload.
$serv->on('finish', function ($serv, $task_id, $data) {
    $payload = json_decode($data, true);
    $callbackInfo = (is_array($payload) && isset($payload['callback']) && is_string($payload['callback']))
        ? parse_url($payload['callback'])
        : false;

    // Without a host there is nobody to notify; report it instead of failing
    // later on undefined array keys.
    if (!is_array($callbackInfo) || empty($callbackInfo['host'])) {
        error_log("task {$task_id}: missing or invalid callback URL, notification skipped");
        return;
    }

    $useSsl = isset($callbackInfo['scheme']) && strtolower($callbackInfo['scheme']) === 'https';
    // An explicit port in the URL wins over the scheme default.
    $port = $callbackInfo['port'] ?? ($useSsl ? 443 : 80);
    $path = $callbackInfo['path'] ?? '/';
    if (isset($callbackInfo['query'])) {
        $path .= '?' . $callbackInfo['query'];
    }
    $host = $callbackInfo['host'];

    // Send the completion notification from a coroutine.
    go(function () use ($host, $port, $useSsl, $path) {
        // The third argument switches TLS on; connecting to 443 alone does not.
        $cli = new \Swoole\Coroutine\Http\Client($host, $port, $useSsl);
        // HTTP POST carrying the job status.
        $cli->post($path, ['status' => true]);
        echo $cli->body . PHP_EOL;
        $cli->close();
    });
});

$serv->start();
面向對象版
<?php
use Swoole\Server as SwooleServer;
use Swoole\Coroutine\Http\Client as HttpClient;
/**
 * TCP job server built on Swoole.
 *
 * A client sends a JSON payload; the payload is queued as an asynchronous
 * task, and the task worker POSTs to the payload's `task_url` before the task
 * is reported as finished.
 *
 * NOTE(review): several user-facing message literals below look mis-encoded
 * in the source; they are kept byte-for-byte, confirm the intended wording.
 */
class Server
{
    /** @var SwooleServer|null Underlying Swoole server instance. */
    private $server = null;

    /**
     * Creates the Swoole server, configures the worker pools, registers the
     * event callbacks and starts the server.
     */
    public function __construct()
    {
        $this->server = new SwooleServer('192.168.206.128', 9502);

        // Sizes of the regular worker pool and of the async task worker pool.
        $this->server->set([
            'worker_num' => 2,
            'task_worker_num' => 2,
        ]);

        // Event listeners.
        $this->server->on('start', [$this, 'onStart']);
        $this->server->on('connect', [$this, 'onConnect']);
        $this->server->on('receive', [$this, 'onReceive']);
        $this->server->on('task', [$this, 'onTask']);
        $this->server->on('finish', [$this, 'onFinish']);
        $this->server->on('close', [$this, 'onClose']);

        $this->server->start();
    }

    /**
     * Logs that the server has started.
     *
     * @param mixed $server Server object passed by the 'start' event.
     */
    public function onStart($server)
    {
        echo 'Server start...'.PHP_EOL;
    }

    /**
     * Logs a newly accepted connection.
     *
     * @param mixed $server    Server object passed by the 'connect' event.
     * @param mixed $fd        Connection identifier.
     * @param mixed $reactorId Unused.
     */
    public function onConnect($server, $fd, $reactorId)
    {
        echo '新的連接:'.$fd.PHP_EOL;
    }

    /**
     * Validates an incoming payload, acknowledges it and queues it as a task.
     *
     * A payload that does not decode to a non-empty array is answered with a
     * failure message, the connection is closed and nothing is queued.
     *
     * @param mixed  $server    Server object; send(), close() and task() are called on it.
     * @param mixed  $fd        Connection identifier the reply is sent to.
     * @param mixed  $reactorId Unused.
     * @param string $data      Raw bytes received from the client (expected to be JSON).
     */
    public function onReceive($server, $fd, $reactorId, $data)
    {
        echo '收到數(shù)據(jù):'.$data.PHP_EOL;

        $payload = json_decode($data, true);
        if (!is_array($payload) || $payload === []) {
            $server->send($fd, json_encode([
                'status' => false,
                'message' => '數(shù)據(jù)有誤'
            ], JSON_UNESCAPED_UNICODE));
            // Plain close so the queued error reply can still be delivered;
            // a forced reset may drop data waiting in the send queue.
            $server->close($fd);
            // Stop here: a rejected payload must neither receive the success
            // reply nor be queued as a task.
            return;
        }

        $server->send($fd, json_encode([
            'status' => true,
            'message' => '等待處理'
        ], JSON_UNESCAPED_UNICODE));

        // Queue the asynchronous task.
        $server->task($payload);
    }

    /**
     * Processes one queued task by POSTing to its `task_url`.
     *
     * When the payload carries no usable `task_url` the task is finished
     * immediately and no HTTP request is made.
     *
     * @param mixed $server      Server object; finish() is called on it.
     * @param mixed $taskId      Task identifier (unused here).
     * @param mixed $srcWorkerId Unused.
     * @param mixed $data        Decoded payload queued by onReceive().
     */
    public function onTask($server, $taskId, $srcWorkerId, $data)
    {
        $endpoint = (is_array($data) && isset($data['task_url']) && is_string($data['task_url']))
            ? $this->resolveEndpoint($data['task_url'])
            : null;

        if ($endpoint === null) {
            $server->finish($data);
            // Nothing to call: returning avoids a second finish() and a request
            // built from missing URL parts.
            return;
        }

        // NOTE(review): coroutines inside task workers may require the
        // 'task_enable_coroutine' setting - confirm for the Swoole version in use.
        go(function () use ($server, $data, $endpoint) {
            // The third argument enables TLS for https targets.
            $cli = new HttpClient($endpoint['host'], $endpoint['port'], $endpoint['ssl']);
            $cli->setHeaders([
                'Content-Type' => 'application/json',
            ]);
            // Encode the body as JSON so it matches the declared Content-Type.
            $cli->post($endpoint['path'], json_encode(['status' => true]));
            $cli->close();
            $server->finish($data);
        });
    }

    /**
     * Logs that a task has completed.
     *
     * @param mixed $server Server object passed by the 'finish' event.
     * @param mixed $taskId Identifier of the finished task.
     * @param mixed $data   Value handed to finish() by onTask() (unused).
     */
    public function onFinish($server, $taskId, $data)
    {
        echo '處理完成:'.$taskId.PHP_EOL;
    }

    /**
     * Logs a closed connection.
     *
     * @param mixed $server    Server object passed by the 'close' event.
     * @param mixed $fd        Connection identifier.
     * @param mixed $reactorId Unused.
     */
    public function onClose($server, $fd, $reactorId)
    {
        echo '斷開連接:'.$fd.PHP_EOL;
    }

    /**
     * Splits a URL into the pieces needed to open an HTTP client connection.
     *
     * @param string $url Absolute http(s) URL.
     *
     * @return array|null Array with keys host, port, ssl and path (path includes
     *                    the query string), or null when the URL has no host.
     */
    private function resolveEndpoint(string $url): ?array
    {
        $parts = parse_url($url);
        if (!is_array($parts) || empty($parts['host'])) {
            return null;
        }

        $ssl = isset($parts['scheme']) && strtolower($parts['scheme']) === 'https';
        $path = $parts['path'] ?? '/';
        if (isset($parts['query'])) {
            $path .= '?'.$parts['query'];
        }

        return [
            'host' => $parts['host'],
            // An explicit port in the URL wins over the scheme default.
            'port' => $parts['port'] ?? ($ssl ? 443 : 80),
            'ssl' => $ssl,
            'path' => $path,
        ];
    }
}
// Bootstrap: the constructor registers the event callbacks and calls start().
new Server();
run:php server.php
image.png
2、接口:client.php
(業務中可以自行封裝至框架中)
<?php
// Submits one job to the task server and prints whether it was accepted.
$client = new \Swoole\Client(SWOOLE_SOCK_TCP);
// NOTE(review): the -1 timeout presumably disables the connect timeout - confirm.
if (!$client->connect('192.168.206.128', 9502, -1)) {
    exit("connect failed. Error: {$client->errCode}\n");
}

$data = [
    // URL to notify once the job has been processed.
    // NOTE(review): the class-based server in this article reads `task_url`,
    // not `callback` - confirm which key the target server expects.
    'callback' => 'http://192.168.206.128/callback.php',
];

// Only wait for a reply when the request actually went out.
$sent = $client->send(json_encode($data));
$reply = $sent === false ? false : $client->recv();
$client->close();

// The server answers with JSON such as {"status":true}; a failed or empty
// read is treated as an error instead of indexing into a non-array.
$res = (is_string($reply) && $reply !== '') ? json_decode($reply, true) : null;

if (is_array($res) && !empty($res['status'])) {
    echo 'success';
} else {
    echo 'error';
}
run:postman直接請求client.php
image.png
3、接收結果:callback.php
<?php
// Appends the raw body of each completion notification to result.log.
$data = file_get_contents('php://input');

// LOCK_EX keeps concurrent notifications from interleaving their lines.
$written = $data === false
    ? false
    : file_put_contents('./result.log', $data . PHP_EOL, FILE_APPEND | LOCK_EX);

if ($written === false) {
    // Do not claim success when the notification could not be recorded.
    http_response_code(500);
    echo 'error';
} else {
    echo 'success';
}
最后在callback.php中
去完善任務處理成功后要做的一些事情
image.png
相信看完你已經了解swoole異步隊列的一些應用了
<?php
use Swoole\Server as SwooleServer;
use Swoole\Coroutine\Http\Client as HttpClient;
/**
 * TCP job server built on Swoole.
 *
 * A client sends a JSON payload; the payload is queued as an asynchronous
 * task, and the task worker POSTs to the payload's `task_url` before the task
 * is reported as finished.
 *
 * NOTE(review): several user-facing message literals below look mis-encoded
 * in the source; they are kept byte-for-byte, confirm the intended wording.
 */
class Server
{
    /** @var SwooleServer|null Underlying Swoole server instance. */
    private $server = null;

    /**
     * Creates the Swoole server, configures the worker pools, registers the
     * event callbacks and starts the server.
     */
    public function __construct()
    {
        $this->server = new SwooleServer('192.168.206.128', 9502);

        // Sizes of the regular worker pool and of the async task worker pool.
        $this->server->set([
            'worker_num' => 2,
            'task_worker_num' => 2,
        ]);

        // Event listeners.
        $this->server->on('start', [$this, 'onStart']);
        $this->server->on('connect', [$this, 'onConnect']);
        $this->server->on('receive', [$this, 'onReceive']);
        $this->server->on('task', [$this, 'onTask']);
        $this->server->on('finish', [$this, 'onFinish']);
        $this->server->on('close', [$this, 'onClose']);

        $this->server->start();
    }

    /**
     * Logs that the server has started.
     *
     * @param mixed $server Server object passed by the 'start' event.
     */
    public function onStart($server)
    {
        echo 'Server start...'.PHP_EOL;
    }

    /**
     * Logs a newly accepted connection.
     *
     * @param mixed $server    Server object passed by the 'connect' event.
     * @param mixed $fd        Connection identifier.
     * @param mixed $reactorId Unused.
     */
    public function onConnect($server, $fd, $reactorId)
    {
        echo '新的連接:'.$fd.PHP_EOL;
    }

    /**
     * Validates an incoming payload, acknowledges it and queues it as a task.
     *
     * A payload that does not decode to a non-empty array is answered with a
     * failure message, the connection is closed and nothing is queued.
     *
     * @param mixed  $server    Server object; send(), close() and task() are called on it.
     * @param mixed  $fd        Connection identifier the reply is sent to.
     * @param mixed  $reactorId Unused.
     * @param string $data      Raw bytes received from the client (expected to be JSON).
     */
    public function onReceive($server, $fd, $reactorId, $data)
    {
        echo '收到數(shù)據(jù):'.$data.PHP_EOL;

        $payload = json_decode($data, true);
        if (!is_array($payload) || $payload === []) {
            $server->send($fd, json_encode([
                'status' => false,
                'message' => '數(shù)據(jù)有誤'
            ], JSON_UNESCAPED_UNICODE));
            // Plain close so the queued error reply can still be delivered;
            // a forced reset may drop data waiting in the send queue.
            $server->close($fd);
            // Stop here: a rejected payload must neither receive the success
            // reply nor be queued as a task.
            return;
        }

        $server->send($fd, json_encode([
            'status' => true,
            'message' => '等待處理'
        ], JSON_UNESCAPED_UNICODE));

        // Queue the asynchronous task.
        $server->task($payload);
    }

    /**
     * Processes one queued task by POSTing to its `task_url`.
     *
     * When the payload carries no usable `task_url` the task is finished
     * immediately and no HTTP request is made.
     *
     * @param mixed $server      Server object; finish() is called on it.
     * @param mixed $taskId      Task identifier (unused here).
     * @param mixed $srcWorkerId Unused.
     * @param mixed $data        Decoded payload queued by onReceive().
     */
    public function onTask($server, $taskId, $srcWorkerId, $data)
    {
        $endpoint = (is_array($data) && isset($data['task_url']) && is_string($data['task_url']))
            ? $this->resolveEndpoint($data['task_url'])
            : null;

        if ($endpoint === null) {
            $server->finish($data);
            // Nothing to call: returning avoids a second finish() and a request
            // built from missing URL parts.
            return;
        }

        // NOTE(review): coroutines inside task workers may require the
        // 'task_enable_coroutine' setting - confirm for the Swoole version in use.
        go(function () use ($server, $data, $endpoint) {
            // The third argument enables TLS for https targets.
            $cli = new HttpClient($endpoint['host'], $endpoint['port'], $endpoint['ssl']);
            $cli->setHeaders([
                'Content-Type' => 'application/json',
            ]);
            // Encode the body as JSON so it matches the declared Content-Type.
            $cli->post($endpoint['path'], json_encode(['status' => true]));
            $cli->close();
            $server->finish($data);
        });
    }

    /**
     * Logs that a task has completed.
     *
     * @param mixed $server Server object passed by the 'finish' event.
     * @param mixed $taskId Identifier of the finished task.
     * @param mixed $data   Value handed to finish() by onTask() (unused).
     */
    public function onFinish($server, $taskId, $data)
    {
        echo '處理完成:'.$taskId.PHP_EOL;
    }

    /**
     * Logs a closed connection.
     *
     * @param mixed $server    Server object passed by the 'close' event.
     * @param mixed $fd        Connection identifier.
     * @param mixed $reactorId Unused.
     */
    public function onClose($server, $fd, $reactorId)
    {
        echo '斷開連接:'.$fd.PHP_EOL;
    }

    /**
     * Splits a URL into the pieces needed to open an HTTP client connection.
     *
     * @param string $url Absolute http(s) URL.
     *
     * @return array|null Array with keys host, port, ssl and path (path includes
     *                    the query string), or null when the URL has no host.
     */
    private function resolveEndpoint(string $url): ?array
    {
        $parts = parse_url($url);
        if (!is_array($parts) || empty($parts['host'])) {
            return null;
        }

        $ssl = isset($parts['scheme']) && strtolower($parts['scheme']) === 'https';
        $path = $parts['path'] ?? '/';
        if (isset($parts['query'])) {
            $path .= '?'.$parts['query'];
        }

        return [
            'host' => $parts['host'],
            // An explicit port in the URL wins over the scheme default.
            'port' => $parts['port'] ?? ($ssl ? 443 : 80),
            'ssl' => $ssl,
            'path' => $path,
        ];
    }
}
// Bootstrap: the constructor registers the event callbacks and calls start().
new Server();
image.png
client.php
(業務中可以自行封裝至框架中)
<?php
// Submits one job to the task server and prints whether it was accepted.
$client = new \Swoole\Client(SWOOLE_SOCK_TCP);
// NOTE(review): the -1 timeout presumably disables the connect timeout - confirm.
if (!$client->connect('192.168.206.128', 9502, -1)) {
    exit("connect failed. Error: {$client->errCode}\n");
}

$data = [
    // URL to notify once the job has been processed.
    // NOTE(review): the class-based server in this article reads `task_url`,
    // not `callback` - confirm which key the target server expects.
    'callback' => 'http://192.168.206.128/callback.php',
];

// Only wait for a reply when the request actually went out.
$sent = $client->send(json_encode($data));
$reply = $sent === false ? false : $client->recv();
$client->close();

// The server answers with JSON such as {"status":true}; a failed or empty
// read is treated as an error instead of indexing into a non-array.
$res = (is_string($reply) && $reply !== '') ? json_decode($reply, true) : null;

if (is_array($res) && !empty($res['status'])) {
    echo 'success';
} else {
    echo 'error';
}
client.php
image.png
callback.php
<?php
// Appends the raw body of each completion notification to result.log.
$data = file_get_contents('php://input');

// LOCK_EX keeps concurrent notifications from interleaving their lines.
$written = $data === false
    ? false
    : file_put_contents('./result.log', $data . PHP_EOL, FILE_APPEND | LOCK_EX);

if ($written === false) {
    // Do not claim success when the notification could not be recorded.
    http_response_code(500);
    echo 'error';
} else {
    echo 'success';
}
callback.php
去完善任務處理成功后要做的一些事情 image.png