date: 2018-1-8 20:56:08
title: Swoole| Swoole 中 Process
這篇 blog 折騰了很久才寫出來(lái), 問(wèn)題主要還是在 理解 上. 有時(shí)候就是這樣,
理解了之后就很簡(jiǎn)單, 不理解就很難; 知道了就很簡(jiǎn)單, 不知道往往就很難. 所以 stay hungry stay foolish stay young 真的很重要
本來(lái)計(jì)劃開(kāi)發(fā) swoft 框架 中的 Process 模塊, 所以需要對(duì) swoole 的 Process 模塊要有比較深入的了解才行. 不過(guò)根據(jù) swoole 官方 wiki 的實(shí)踐過(guò)程中, 一直有未理解的部分. 之前雖然也做過(guò)多次 多進(jìn)程編程, 但是當(dāng)真正需要進(jìn)行框架開(kāi)發(fā)的時(shí)候, 就會(huì)發(fā)現(xiàn)以前學(xué)到的知識(shí)不夠全面, 無(wú)法指導(dǎo)整體的設(shè)計(jì). 好在一直在堅(jiān)持, 奉上現(xiàn)在理解的程度.
內(nèi)容一覽:
- 進(jìn)程相關(guān)基礎(chǔ)操作: fork/exit/kill/wait
- 進(jìn)程相關(guān)高級(jí)操作: 主進(jìn)程退出子進(jìn)程干完活后也退出; 子進(jìn)程異常退出主進(jìn)程自動(dòng)重啟
- 進(jìn)程間通信(IPC) - 管道(pipe)
- 進(jìn)程間通信(IPC) - 消息隊(duì)列(message queue)
- swoole process 模塊提供的更多功能
進(jìn)程相關(guān)基礎(chǔ)操作
進(jìn)程是什么: 進(jìn)程是運(yùn)行者的程序
先來(lái)看看一個(gè)最簡(jiǎn)單的例子:
<?php
echo posix_getpid(); // 獲取當(dāng)前進(jìn)程的 pid
swoole_set_process_name('swoole process master'); // 修改所在進(jìn)程的進(jìn)程名
sleep(100); // 模擬一個(gè)持續(xù)運(yùn)行 100s 的程序, 這樣就可以在進(jìn)程中查看到它, 而不是運(yùn)行完了就結(jié)束
通過(guò) ps aux
查看進(jìn)程:
再來(lái)看看 swoole 中使用子進(jìn)程的基礎(chǔ)操作:
use Swoole\Process;
$process = new Process(function (Process $worker) {
if (Process::kill($worker->pid, 0)) { // kill操作常用來(lái)殺死進(jìn)程, 傳入 0 可以用來(lái)檢測(cè)進(jìn)程是否存在
$worker->exit(); // 退出子進(jìn)程
}
});
$process->start(); // 啟動(dòng)子進(jìn)程
Process::wait(); // 回收退出的子進(jìn)程
new Process()
: 通過(guò)回調(diào)函數(shù)來(lái)設(shè)置子進(jìn)程將要執(zhí)行的邏輯$process->start()
: 調(diào)用fork()
系統(tǒng)調(diào)用, 來(lái)生成子進(jìn)程Process::kill()
: kill操作給進(jìn)程發(fā)送信號(hào), 常用來(lái)殺死進(jìn)程, 傳入 0 可以用來(lái)檢測(cè)進(jìn)程是否存在Process::wait()
: 調(diào)用wait()
系統(tǒng)調(diào)用, 回收子進(jìn)程, 如果不回收, 子進(jìn)程會(huì)編程 僵尸進(jìn)程, 浪費(fèi)系統(tǒng)資源$worker->exit()
: 子進(jìn)程主動(dòng)退出
我在這里有一個(gè)疑問(wèn):
主進(jìn)程的生命周期是怎么樣的? 子進(jìn)程的生命周期是怎么樣的?
有這樣一個(gè)疑問(wèn)也來(lái)自于我之前的思維慣性: 理解一個(gè)事物時(shí)從事物的生命周期進(jìn)行理解. 結(jié)合 進(jìn)程是運(yùn)行著的程序 來(lái)一起理解:
-
new Process()
: 只有回調(diào)函數(shù)的邏輯會(huì)在進(jìn)程中執(zhí)行 - 除此之外的代碼都是在主進(jìn)程中執(zhí)行
進(jìn)程相關(guān)高級(jí)操作
- 主進(jìn)程退出子進(jìn)程干完活后也退出
- 子進(jìn)程異常退出主進(jìn)程自動(dòng)重啟
<?php
use Swoole\Process;
class MyProcess1
{
public $mpid = 0; // master pid, 即當(dāng)前程序的進(jìn)程ID
public $works = []; // 記錄子進(jìn)程的 pid
public $maxProcessNum = 1;
public $newIndex = 0;
public function __construct()
{
try {
swoole_set_process_name(__CLASS__. ' : master');
$this->mpid = posix_getpid();
$this->run();
$this->processWait();
} catch (\Exception $e) {
die('Error: '. $e->getMessage());
}
}
public function run()
{
for ($i=0; $i<$this->maxProcessNum; $i++) {
$this->createProcess();
}
}
public function createProcess($index = null)
{
if (is_null($index)) {
$index = $this->newIndex;
$this->newIndex++;
}
$process = new Process(function (Process $worker) use($index) { // 子進(jìn)程創(chuàng)建后需要執(zhí)行的函數(shù)
swoole_set_process_name(__CLASS__. ": worker $index");
for ($j=0; $j<3; $j++) { // 模擬子進(jìn)程執(zhí)行耗時(shí)任務(wù)
$this->checkMpid($worker);
echo "msg: {$j}\n";
sleep(1);
}
}, false, false); // 不重定向輸入輸出; 不使用管道
$pid = $process->start();
$this->works[$index] = $pid;
return $pid;
}
// 主進(jìn)程異常退出, 子進(jìn)程工作完后退出
public function checkMpid(Process $worker) // demo中使用的引用, 引用表示傳的參數(shù)可以被改變, 由于傳入 $worker 是 \Swoole\Process 對(duì)象, 所以不用使用 &
{
if (!Process::kill($this->mpid, 0)) { // 0 可以用來(lái)檢測(cè)進(jìn)程是否存在
$worker->exit();
$msg = "master process exited, worker {$worker->pid} also quit\n"; // 需要寫入到日志中
file_put_contents('process.log', $msg, FILE_APPEND); // todo: 這句話沒(méi)有執(zhí)行
}
}
// 重啟子進(jìn)程
public function rebootProcess($pid)
{
$index = array_search($pid, $this->works);
if ($index !== false) {
$newPid = $this->createProcess($index);
echo "rebootProcess: {$index}={$pid}->{$newPid} Done\n";
return;
}
throw new \Exception("rebootProcess error: no pid {$pid}");
}
// 自動(dòng)重啟子進(jìn)程
public function processWait()
{
while (1) {
if (count($this->works)) {
$ret = Process::wait(); // 子進(jìn)程退出
if ($ret) {
$this->rebootProcess($ret['pid']);
}
} else {
break;
}
}
}
}
new MyProcess1();
說(shuō)明以下幾點(diǎn):
- 子進(jìn)程運(yùn)行結(jié)束后就會(huì)退出, 通過(guò)
Process::wait()
檢測(cè)到子進(jìn)程退出信號(hào)執(zhí)行自動(dòng)重啟, 子進(jìn)程就會(huì)一直執(zhí)行下去 - 關(guān)于函數(shù)參數(shù)傳 引用/指針, 一個(gè)很好的理解方式是: 參數(shù)可以被修改
運(yùn)行并模擬主進(jìn)程異常退出:
進(jìn)程間通信(IPC) - 管道(pipe)
管道的幾個(gè)關(guān)鍵詞:
- 半雙工: 數(shù)據(jù)單向流動(dòng), 一端只讀, 一端只寫.
- 同步 vs 異步: 默認(rèn)為同步阻塞模式, 可以使用
swoole_event_add()
添加管道到 swoole 的 event loop 中, 實(shí)現(xiàn)異步IO - 管道類型(數(shù)據(jù)格式):
SOCK_STREAM
, 流式, 需要用戶自己處理數(shù)據(jù)的封包/解包;SOCK_DGRAM
, 數(shù)據(jù)報(bào), 每次收發(fā)都是一次完整的數(shù)據(jù)包 (DGRAM/STREAM
)
注意, swoole wiki - process->write() 中提到 SOCK_DGRAM
并不會(huì)亂序丟包
先來(lái)看一個(gè)簡(jiǎn)單的例子, php從shell管道中讀取數(shù)據(jù):
// get pip data
$fp = fopen('php://stdin', 'r');
if ($fp) {
while ($line = fgets($fp, 4096)) {
echo "php get pip data: ". $line;
}
fclose($fp);
}
swoole process中的管道很強(qiáng)大, 支持 子進(jìn)程寫, 主進(jìn)程讀 以及 主進(jìn)程寫, 子進(jìn)程讀:
use Swoole\Process;
// 子進(jìn)程寫, 父進(jìn)程讀
$process = new Process(function (Process $worker) {
$worker->write("worker");
});
$process->start();
$msg = $process->read();
echo "from process: $msg", "\n";
// 父進(jìn)程寫, 子進(jìn)程讀
$process = new Process(function (Process $worker) {
$msg = $worker->read();
echo "from master: $msg", "\n";
});
$process->start();
$process->write('master');
注意區(qū)分 $worker->write()
和 $process->write()
, 之前一直錯(cuò)誤的以為這 2 個(gè)是相同的, 其實(shí)就是把 $process
誤以為是子進(jìn)程, 從而相當(dāng)于 $process->write()
就是子進(jìn)程寫管道 -- 其實(shí)這里是主進(jìn)程內(nèi)執(zhí)行的邏輯, 是主進(jìn)程寫數(shù)據(jù)到管道, 供子進(jìn)程讀取
swoole中其他管道相關(guān)操作:
- 異步IO
use Swoole\Process;
use Swoole\Event;
// 異步IO
$process = new Process(function (Process $worker) {
$GLOBALS['worker'] = $worker;
Event::add($worker->pipe, function (int $pipe) { // 使用 swoole_event_add 添加管道到異步IO
/** @var Process $worker */
$worker = $GLOBALS['worker'];
$msg = $worker->read();
echo "from master: $msg \n";
$worker->write("hello master");
sleep(2);
$worker->exit(0);
});
});
$process->start();
$process->write("master msg 1");
$msg = $process->read();
echo "from process: $msg \n";
- 設(shè)置超時(shí)
use Swoole\Process;
// 設(shè)置管道超時(shí)
$process = new Process(function (Process $worker) {
sleep(5);
});
$process->start();
$process->setTimeout(0.5);
$ret = $process->read();
var_dump($ret);
var_dump(swoole_errno());
插播一個(gè)趣事, @thinkpc 看完 2017北京PHP開(kāi)發(fā)者年會(huì), 就知道為啥會(huì)點(diǎn)贊了
- 關(guān)閉管道
// 關(guān)閉管道: 默認(rèn)值0->關(guān)閉讀寫 1->關(guān)閉寫 2->關(guān)閉讀
$process->close();
進(jìn)程間通信(IPC) - 消息隊(duì)列(message queue)
消息隊(duì)列:
- 一系列保存在內(nèi)核中的消息鏈表
- 有一個(gè) msgKey, 可以通過(guò)此訪問(wèn)不同的消息隊(duì)列
- 有數(shù)據(jù)大小限制, 默認(rèn) 8192, 可以通過(guò)內(nèi)核修改
- 阻塞 vs 非阻塞: 阻塞模式下
pop()
空消息隊(duì)列/push()
滿消息隊(duì)列會(huì)阻塞, 非阻塞模式可以直接返回
swoole 中使用消息隊(duì)列:
- 通信模式: 默認(rèn)為爭(zhēng)搶模式, 無(wú)法將消息投遞給指定子進(jìn)程
- 新建消息隊(duì)列后, 主進(jìn)程就可以使用
- 消息隊(duì)列不可和管道一起使用, 也無(wú)法使用 swoole event loop
- 主進(jìn)程中要調(diào)用
wait()
, 否則子進(jìn)程中調(diào)用pop()/push()
會(huì)報(bào)錯(cuò)
use Swoole\Process;
$process = new Process(function (Process $worker) {
// $worker->push('worker');
echo "from master: ". $worker->pop(). "\n";
sleep(2);
// $worker->exit();
}, false, false); // 關(guān)閉管道
// 參數(shù)一為 msgKey, 這里是默認(rèn)值
// 參數(shù)二為 通信模式, 默認(rèn)值 2 表示爭(zhēng)搶模式, 這里還加上了 非阻塞
$process->useQueue(ftok(__FILE__, 1), 2| Process::IPC_NOWAIT);
$process->push('hello1'); // 使用 useQueue 后, 主進(jìn)程就可以讀寫消息隊(duì)列了
$process->push('hello2');
echo "from woker: ". $process->pop(). "\n";
// echo "from woker: ". $process->pop(). "\n";
$process->start(); // 啟動(dòng)子進(jìn)程
// 消息隊(duì)列狀態(tài)
var_dump($process->statQueue());
// 刪除隊(duì)列, 如果不調(diào)用則不會(huì)在程序結(jié)束時(shí)清楚數(shù)據(jù), 下次使用相同 msgKey 時(shí)還可以訪問(wèn)數(shù)據(jù)
$process->freeQueue();
var_dump(Process::wait()); // 要調(diào)用 wait(), 否則子進(jìn)程中 push()/pop() 會(huì)報(bào)錯(cuò)
swoole process 模塊提供的更多功能
swoole_set_process_name()
: 修改進(jìn)程名, 不兼容 macswoole_process->exec(string $execfile, array $args)
執(zhí)行外部程序
參數(shù) $execfile
需要使用可執(zhí)行文件的絕對(duì)路徑, 參數(shù) args
為參數(shù)數(shù)組
// 比如 python test.py 123
swoole_process->exec('/usr/bin/python', ['test.py', 123]);
// 更復(fù)雜的例子
swoole_process->exec(('/usr/local/bin/php', ['/var/www/project/yii-best-practice/cli/yii', 't/index', '-m=123', 'abc', 'xyz']);
// 父進(jìn)程 exec 進(jìn)程進(jìn)行管道通信
use Swoole\Process;
$process = new Process(function (Process $worker) {
$worker->exec('/bin/echo', ['hello']);
$worker->write('hello');
}, true); // 需要啟用標(biāo)準(zhǔn)輸入輸出重定向
$process->start();
echo "from exec: ". $process->read(). "\n";
\Swoole\Process::kill($pid, $signo = SIGTERM)
: 向指定進(jìn)程發(fā)送信號(hào), 默認(rèn)是終止進(jìn)程, 傳 0 可檢測(cè)進(jìn)程是否存在\Swoole\Process::wait()
: 回收子進(jìn)程, 如果主進(jìn)程不調(diào)用此方法, 子進(jìn)程會(huì)變成 僵尸進(jìn)程, 浪費(fèi)系統(tǒng)資源\Swoole\Process::signal()
: 異步信號(hào)監(jiān)聽(tīng)
use Swoole\Process;
// 異步信號(hào)監(jiān)聽(tīng) + wait
Process::signal(SIGCHLD, function ($signal) { // 監(jiān)聽(tīng)子進(jìn)程退出信號(hào)
// 可能同時(shí)有多個(gè)子進(jìn)程退出, 所以要while循環(huán)
while ($ret = Process::wait(false)) { // false 表示不阻塞
var_dump($ret);
}
});
\Swoole\Process::daemon()
: 將當(dāng)前進(jìn)程變?yōu)橐粋€(gè)守護(hù)進(jìn)程
use Swoole\Process;
// daemon
Process::daemon();
swoole_set_process_name('test daemon process');
sleep(100);
-
\Swoole\Process::alarm()
: 高精度定時(shí)器(微秒級(jí)), 對(duì)setitimer
系統(tǒng)調(diào)用的封裝, 可以配合\Swoole\Process::signal()
/pcntl_signal
使用
注意不可和 \Swoole\Timer
同時(shí)使用
// signal + alarm
// 第一個(gè)參數(shù)表示時(shí)間, 單位 us, -1 表示清除定時(shí)器
// 第二個(gè)參數(shù)表示類型 0->真實(shí)時(shí)間->SIGALAM 1->cpu時(shí)間->SIGVTALAM 2->用戶態(tài)+內(nèi)核態(tài)時(shí)間->SIGPROF
Process::alarm(100*1000); // 100ms
Process::signal(SIGALRM, function ($signal) {
static $i = 0;
echo "#$i \t alarm \n";
$i++;
if ($i>20) {
Process::alarm(-1); // -1 表示清除
}
});
-
\Swoole\Process::setaffinity()
: 設(shè)置CPU親和, 即將進(jìn)程綁定到指定CPU核上
傳值范圍: [0, swoole_cpu_num())
CPU親和: CPU的速度遠(yuǎn)遠(yuǎn)高于IO的速度, 所以CPU有多級(jí)緩存來(lái)解決IO等待的問(wèn)題, 綁定指定CPU, 更容易命中CPU緩存
寫在最后
資源推薦:
todo:
- 使用輸入輸出重定向
- 管道類型為
SOCK_STREAM
時(shí)的情況, 是否需要 封包/解包 處理, 即 swoole wiki - process->write() 中提到的 管道通信默認(rèn)的方式是流式,write寫入的數(shù)據(jù)在read可能會(huì)被底層合并 - 多進(jìn)程 + 異步IO 的注意事項(xiàng)
能理解 因?yàn)樽舆M(jìn)程會(huì)繼承父進(jìn)程的內(nèi)存和IO句柄 這個(gè)會(huì)產(chǎn)生的影響, 但是給的示例并沒(méi)有說(shuō)明這個(gè)問(wèn)題
use Swoole\Process;
use Swoole\Event;
// 多個(gè)子進(jìn)程 + 異步IO
$workers = [];
$workerNum = 3;
for ($i=0; $i<$workerNum; $i++) {
$process = new Process(function (Process $worker) {
$worker->write($worker->pid);
echo "worker: {$worker->pid} \n";
});
$pid = $process->start();
$workers[$pid] = $process;
// Event::add($process->pipe, function (int $pipe) use ($process) {
// $data = $process->read();
// echo "recv: $data \n";
// });
}
foreach ($workers as $worker) {
Event::add($worker->pipe, function (int $pipe) use ($worker) {
$data = $worker->read();
echo "recv: $data \n";
});
}