背景
由于PHP不支持多線程,但是作為一個(gè)完善的系統(tǒng),有很多操作都是需要異步完成的逸贾。為了完成這些異步操作,我們做了一個(gè)基于Redis隊(duì)列任務(wù)系統(tǒng)津滞。
大家知道,一個(gè)消息隊(duì)列處理系統(tǒng)主要分為兩大部分:消費(fèi)者和生產(chǎn)者灼伤。
在我們的系統(tǒng)中触徐,主系統(tǒng)作為生產(chǎn)者,任務(wù)系統(tǒng)作為消費(fèi)者狐赡。
具體的工作流程如下:
1撞鹉、主系統(tǒng)將需要需要處理的任務(wù)名稱+任務(wù)參數(shù)push到隊(duì)列中。
2颖侄、任務(wù)系統(tǒng)實(shí)時(shí)的對(duì)任務(wù)隊(duì)列進(jìn)行pop鸟雏,pop出來(lái)一個(gè)任務(wù)就fork一個(gè)子進(jìn)程,由子進(jìn)程完成具體的任務(wù)邏輯览祖。
具體代碼如下:
/**
* 啟動(dòng)守護(hù)進(jìn)程
*/
public function runAction() {
Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
while (true) {
$this->fork_process();
}
exit;
}
/**
* 創(chuàng)建子進(jìn)程
*/
private function fork_process() {
$ppid = getmypid();
$pid = pcntl_fork();
if ($pid == 0) {//子進(jìn)程
$pid = posix_getpid();
//echo "* Process {$pid} was created \n\n";
$this->mq_process();
exit;
} else {//主進(jìn)程
$pid = pcntl_wait($status, WUNTRACED); //取得子進(jìn)程結(jié)束狀態(tài)
if (pcntl_wifexited($status)) {
//echo "\n\n* Sub process: {$pid} exited with {$status}";
//Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
} else {
Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
}
}
}
/**
* 業(yè)務(wù)任務(wù)隊(duì)列處理
*/
private function mq_process() {
$data_pop = $this->masterRedis->rPop($this->redis_list_key);
$data = json_decode($data_pop, 1);
if (!$data) {
return FALSE;
}
$worker = '_task_' . $data['worker'];
$class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
$params = $data['params'];
$class = new $class_name();
$class->$worker($params);
return TRUE;
}
這是一個(gè)簡(jiǎn)單的任務(wù)處理系統(tǒng)孝鹊。
通過(guò)這個(gè)任務(wù)系統(tǒng)幫助我們實(shí)現(xiàn)了異步,到目前為止已經(jīng)穩(wěn)定運(yùn)行了將近一年展蒂。
但很可惜又活,它是一個(gè)單進(jìn)程的系統(tǒng)。它是一直在不斷的fork锰悼,如果有任務(wù)就處理柳骄,沒(méi)有任務(wù)就跳過(guò)。
這樣很穩(wěn)定箕般。
但問(wèn)題有兩個(gè):一是不斷地fork耐薯、pop會(huì)浪費(fèi)服務(wù)器資源,二是不支持并發(fā)丝里!
第一個(gè)問(wèn)題還好曲初,但第二個(gè)問(wèn)題就很嚴(yán)重。
當(dāng)主系統(tǒng) 同時(shí) 拋過(guò)來(lái)大量的任務(wù)時(shí)丙者,任務(wù)的處理時(shí)間就會(huì)無(wú)限的拉長(zhǎng)复斥。
新的設(shè)計(jì)
為了解決并發(fā)的問(wèn)題,我們計(jì)劃做一個(gè)更加高效強(qiáng)壯的隊(duì)里處理系統(tǒng)械媒。
因?yàn)樵赑HP7之前不支持多線程目锭,所以我們采用多進(jìn)程评汰。
從網(wǎng)上找了不少資料,大多所謂的多進(jìn)程都是N個(gè)進(jìn)程同時(shí)在后臺(tái)運(yùn)行痢虹。
顯然這是不合適的被去。
我的預(yù)想是:每pop出一個(gè)任務(wù)就fork一個(gè)任務(wù),任務(wù)執(zhí)行完成后子進(jìn)程結(jié)束奖唯。
遇到的問(wèn)題
1惨缆、如何控制最大進(jìn)程數(shù)
這個(gè)問(wèn)題很簡(jiǎn)單,那就是每fork一個(gè)子進(jìn)程就自增一次丰捷。而當(dāng)子進(jìn)程執(zhí)行完成就自減一次坯墨。
自增沒(méi)有問(wèn)題,我們就在主進(jìn)程中操作就完了病往。那么該如何自減呢捣染?
可能你會(huì)說(shuō),當(dāng)然是在子進(jìn)程中啊停巷。但這里你需要注意:當(dāng)fork的時(shí)候是從主進(jìn)程復(fù)制了一份資源給子進(jìn)程耍攘,這就意味著你無(wú)法在子進(jìn)程中操作主進(jìn)程中的計(jì)數(shù)器!
所以畔勤,這里就需要了解一個(gè)知識(shí)點(diǎn):信號(hào)蕾各。
具體的可以自行Google,這里直接看代碼庆揪。
// install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));
這就安裝了一個(gè)信號(hào)處理器式曲。當(dāng)然還缺少一點(diǎn)。
declare(ticks = 1);
declare是一個(gè)控制結(jié)構(gòu)語(yǔ)句嚷硫,具體的用法也請(qǐng)去Google检访。
這句代碼的意思就是每執(zhí)行一條低級(jí)語(yǔ)句就調(diào)用一次信號(hào)處理器。
這樣仔掸,每當(dāng)子進(jìn)程結(jié)束的時(shí)候就會(huì)調(diào)用信號(hào)處理器脆贵,我們就可以在信號(hào)處理器中進(jìn)行自減。
2起暮、如何解決進(jìn)程殘留
在多進(jìn)程開(kāi)發(fā)中卖氨,如果處理不當(dāng)就會(huì)導(dǎo)致進(jìn)程殘留。
為了解決進(jìn)程殘留负懦,必須得將子進(jìn)程回收筒捺。
那么如何對(duì)子進(jìn)程進(jìn)行回收就是一個(gè)技術(shù)點(diǎn)了。
在pcntl的demo中纸厉,包括很多博文中都是說(shuō)在主進(jìn)程中回收子進(jìn)程系吭。
但我們是基于Redis的brpop的,而brpop是阻塞的颗品。
這就導(dǎo)致一個(gè)問(wèn)題:當(dāng)執(zhí)行N個(gè)任務(wù)之后肯尺,任務(wù)系統(tǒng)空閑的時(shí)候主進(jìn)程是阻塞的沃缘,而在發(fā)生阻塞的時(shí)候子進(jìn)程還在執(zhí)行,所以就無(wú)法完成最后幾個(gè)子進(jìn)程的進(jìn)程回收则吟。槐臀。。
這里本來(lái)一直很糾結(jié)氓仲,但當(dāng)我將信號(hào)處理器搞定之后就也很簡(jiǎn)單了水慨。
進(jìn)程回收也放到信號(hào)處理器中去。
新系統(tǒng)的評(píng)估
pcntl是一個(gè)進(jìn)程處理的擴(kuò)展敬扛,但很可惜它對(duì)多進(jìn)程的支持非常乏力晰洒。
所以這里采用Swoole擴(kuò)展中的Process。
具體代碼如下:
declare(ticks = 1);
class JobDaemonController extends Yaf_Controller_Abstract{
use Trait_Redis;
private $maxProcesses = 800;
private $child;
private $masterRedis;
private $redis_task_wing = 'task:wing'; //待處理隊(duì)列
public function init(){
// install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));
set_time_limit(0);
ini_set('default_socket_timeout', -1); //隊(duì)列處理不超時(shí),解決redis報(bào)錯(cuò):read error on connection
}
private function redis_client(){
$rds = new Redis();
$rds->connect('redis.master.host',6379);
return $rds;
}
public function process(swoole_process $worker){// 第一個(gè)處理
$GLOBALS['worker'] = $worker;
swoole_event_add($worker->pipe, function($pipe) {
$worker = $GLOBALS['worker'];
$recv = $worker->read(); //send data to master
sleep(rand(1, 3));
echo "From Master: $recv\n";
$worker->exit(0);
});
exit;
}
public function testAction(){
for ($i = 0; $i < 10000; $i++){
$data = [
'abc' => $i,
'timestamp' => time().rand(100,999)
];
$this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
}
exit;
}
public function runAction(){
while (1){
// echo "\t now we de have $this->child child processes\n";
if ($this->child < $this->maxProcesses){
$rds = $this->redis_client();
$data_pop = $rds->brpop($this->redis_task_wing, 3);//無(wú)任務(wù)時(shí),阻塞等待
if (!$data_pop){
continue;
}
echo "\t Starting new child | now we de have $this->child child processes\n";
$this->child++;
$process = new swoole_process([$this, 'process']);
$process->write(json_encode($data_pop));
$pid = $process->start();
}
}
}
private function sig_handler($signo) {
// echo "Recive: $signo \r\n";
switch ($signo) {
case SIGCHLD:
while($ret = swoole_process::wait(false)) {
// echo "PID={$ret['pid']}\n";
$this->child--;
}
}
}
}
最終舔哪,經(jīng)過(guò)測(cè)試欢顷,單核1G的服務(wù)器執(zhí)行1到3秒的任務(wù)可以做到800的并發(fā)。
ps:歡迎各位大神與我交流捉蚤,不知能否做到更好~
**** 我是閆大伯,一只奮戰(zhàn)了兩個(gè)周末的野生程序猿 ****