基于Swoole和Redis實(shí)現(xiàn)的并發(fā)隊(duì)列處理系統(tǒng)

背景

由于PHP不支持多線程,但是作為一個(gè)完善的系統(tǒng),有很多操作都是需要異步完成的逸贾。為了完成這些異步操作,我們做了一個(gè)基于Redis隊(duì)列任務(wù)系統(tǒng)津滞。

大家知道,一個(gè)消息隊(duì)列處理系統(tǒng)主要分為兩大部分:消費(fèi)者和生產(chǎn)者灼伤。

在我們的系統(tǒng)中触徐,主系統(tǒng)作為生產(chǎn)者,任務(wù)系統(tǒng)作為消費(fèi)者狐赡。

具體的工作流程如下:
1撞鹉、主系統(tǒng)將需要需要處理的任務(wù)名稱+任務(wù)參數(shù)push到隊(duì)列中。
2颖侄、任務(wù)系統(tǒng)實(shí)時(shí)的對(duì)任務(wù)隊(duì)列進(jìn)行pop鸟雏,pop出來(lái)一個(gè)任務(wù)就fork一個(gè)子進(jìn)程,由子進(jìn)程完成具體的任務(wù)邏輯览祖。

具體代碼如下:

/**
 * 啟動(dòng)守護(hù)進(jìn)程
 */
public function runAction() {
    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
    while (true) {
        $this->fork_process();
    }
    exit;
}

/**
 * 創(chuàng)建子進(jìn)程
 */
private function fork_process() {
    $ppid = getmypid();
    $pid = pcntl_fork();
    if ($pid == 0) {//子進(jìn)程
        $pid = posix_getpid();
        //echo "* Process {$pid} was created \n\n";
        $this->mq_process();
        exit;
    } else {//主進(jìn)程
        $pid = pcntl_wait($status, WUNTRACED); //取得子進(jìn)程結(jié)束狀態(tài)
        if (pcntl_wifexited($status)) {
            //echo "\n\n* Sub process: {$pid} exited with {$status}";
            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
        } else {
            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
        }
    }
}

/**
 * 業(yè)務(wù)任務(wù)隊(duì)列處理
 */
private function mq_process() {
    $data_pop = $this->masterRedis->rPop($this->redis_list_key);
    $data = json_decode($data_pop, 1);
    if (!$data) {
        return FALSE;
    }
    $worker = '_task_' . $data['worker'];
    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
    $params = $data['params'];
    $class = new $class_name();
    $class->$worker($params);
    return TRUE;
}

這是一個(gè)簡(jiǎn)單的任務(wù)處理系統(tǒng)孝鹊。

通過(guò)這個(gè)任務(wù)系統(tǒng)幫助我們實(shí)現(xiàn)了異步,到目前為止已經(jīng)穩(wěn)定運(yùn)行了將近一年展蒂。

但很可惜又活,它是一個(gè)單進(jìn)程的系統(tǒng)。它是一直在不斷的fork锰悼,如果有任務(wù)就處理柳骄,沒(méi)有任務(wù)就跳過(guò)。

這樣很穩(wěn)定箕般。

但問(wèn)題有兩個(gè):一是不斷地fork耐薯、pop會(huì)浪費(fèi)服務(wù)器資源,二是不支持并發(fā)丝里!

第一個(gè)問(wèn)題還好曲初,但第二個(gè)問(wèn)題就很嚴(yán)重。

當(dāng)主系統(tǒng) 同時(shí) 拋過(guò)來(lái)大量的任務(wù)時(shí)丙者,任務(wù)的處理時(shí)間就會(huì)無(wú)限的拉長(zhǎng)复斥。

新的設(shè)計(jì)

為了解決并發(fā)的問(wèn)題,我們計(jì)劃做一個(gè)更加高效強(qiáng)壯的隊(duì)里處理系統(tǒng)械媒。

因?yàn)樵赑HP7之前不支持多線程目锭,所以我們采用多進(jìn)程评汰。

從網(wǎng)上找了不少資料,大多所謂的多進(jìn)程都是N個(gè)進(jìn)程同時(shí)在后臺(tái)運(yùn)行痢虹。

顯然這是不合適的被去。

我的預(yù)想是:每pop出一個(gè)任務(wù)就fork一個(gè)任務(wù),任務(wù)執(zhí)行完成后子進(jìn)程結(jié)束奖唯。

遇到的問(wèn)題


1惨缆、如何控制最大進(jìn)程數(shù)

這個(gè)問(wèn)題很簡(jiǎn)單,那就是每fork一個(gè)子進(jìn)程就自增一次丰捷。而當(dāng)子進(jìn)程執(zhí)行完成就自減一次坯墨。

自增沒(méi)有問(wèn)題,我們就在主進(jìn)程中操作就完了病往。那么該如何自減呢捣染?

可能你會(huì)說(shuō),當(dāng)然是在子進(jìn)程中啊停巷。但這里你需要注意:當(dāng)fork的時(shí)候是從主進(jìn)程復(fù)制了一份資源給子進(jìn)程耍攘,這就意味著你無(wú)法在子進(jìn)程中操作主進(jìn)程中的計(jì)數(shù)器!

所以畔勤,這里就需要了解一個(gè)知識(shí)點(diǎn):信號(hào)蕾各。

具體的可以自行Google,這里直接看代碼庆揪。

// install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));

這就安裝了一個(gè)信號(hào)處理器式曲。當(dāng)然還缺少一點(diǎn)。

declare(ticks = 1);

declare是一個(gè)控制結(jié)構(gòu)語(yǔ)句嚷硫,具體的用法也請(qǐng)去Google检访。

這句代碼的意思就是每執(zhí)行一條低級(jí)語(yǔ)句就調(diào)用一次信號(hào)處理器。

這樣仔掸,每當(dāng)子進(jìn)程結(jié)束的時(shí)候就會(huì)調(diào)用信號(hào)處理器脆贵,我們就可以在信號(hào)處理器中進(jìn)行自減。

2起暮、如何解決進(jìn)程殘留

在多進(jìn)程開(kāi)發(fā)中卖氨,如果處理不當(dāng)就會(huì)導(dǎo)致進(jìn)程殘留。

為了解決進(jìn)程殘留负懦,必須得將子進(jìn)程回收筒捺。

那么如何對(duì)子進(jìn)程進(jìn)行回收就是一個(gè)技術(shù)點(diǎn)了。

在pcntl的demo中纸厉,包括很多博文中都是說(shuō)在主進(jìn)程中回收子進(jìn)程系吭。

但我們是基于Redis的brpop的,而brpop是阻塞的颗品。

這就導(dǎo)致一個(gè)問(wèn)題:當(dāng)執(zhí)行N個(gè)任務(wù)之后肯尺,任務(wù)系統(tǒng)空閑的時(shí)候主進(jìn)程是阻塞的沃缘,而在發(fā)生阻塞的時(shí)候子進(jìn)程還在執(zhí)行,所以就無(wú)法完成最后幾個(gè)子進(jìn)程的進(jìn)程回收则吟。槐臀。。

這里本來(lái)一直很糾結(jié)氓仲,但當(dāng)我將信號(hào)處理器搞定之后就也很簡(jiǎn)單了水慨。

進(jìn)程回收也放到信號(hào)處理器中去。

新系統(tǒng)的評(píng)估

pcntl是一個(gè)進(jìn)程處理的擴(kuò)展敬扛,但很可惜它對(duì)多進(jìn)程的支持非常乏力晰洒。

所以這里采用Swoole擴(kuò)展中的Process。

具體代碼如下:

declare(ticks = 1);
class JobDaemonController extends Yaf_Controller_Abstract{

    use Trait_Redis;

    private $maxProcesses = 800;
    private $child;
    private $masterRedis;
    private $redis_task_wing = 'task:wing'; //待處理隊(duì)列

    public function init(){
        // install signal handler for dead kids
        pcntl_signal(SIGCHLD, array($this, "sig_handler"));
        set_time_limit(0);
        ini_set('default_socket_timeout', -1); //隊(duì)列處理不超時(shí),解決redis報(bào)錯(cuò):read error on connection
    }

    private function redis_client(){
        $rds = new Redis();
        $rds->connect('redis.master.host',6379);
        return $rds;
    }

    public function process(swoole_process $worker){// 第一個(gè)處理
        $GLOBALS['worker'] = $worker;
        swoole_event_add($worker->pipe, function($pipe) {
            $worker = $GLOBALS['worker'];
            $recv = $worker->read();            //send data to master

            sleep(rand(1, 3));
            echo "From Master: $recv\n";
            $worker->exit(0);
        });
        exit;
    }

    public function testAction(){
        for ($i = 0; $i < 10000; $i++){
            $data = [
                'abc' => $i,
                'timestamp' => time().rand(100,999)
            ];
            $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
        }
        exit;
    }

    public function runAction(){
        while (1){
//            echo "\t now we de have $this->child child processes\n";
            if ($this->child < $this->maxProcesses){
                $rds = $this->redis_client();
                $data_pop = $rds->brpop($this->redis_task_wing, 3);//無(wú)任務(wù)時(shí),阻塞等待
                if (!$data_pop){
                    continue;
                }
                echo "\t Starting new child | now we de have $this->child child processes\n";
                $this->child++;
                $process = new swoole_process([$this, 'process']);
                $process->write(json_encode($data_pop));
                $pid = $process->start();
            }
        }
    }

    private function sig_handler($signo) {
//        echo "Recive: $signo \r\n";
        switch ($signo) {
            case SIGCHLD:
                while($ret = swoole_process::wait(false)) {
//                    echo "PID={$ret['pid']}\n";
                    $this->child--;
                }
        }
    }
}

最終舔哪,經(jīng)過(guò)測(cè)試欢顷,單核1G的服務(wù)器執(zhí)行1到3秒的任務(wù)可以做到800的并發(fā)。

ps:歡迎各位大神與我交流捉蚤,不知能否做到更好~


**** 我是閆大伯,一只奮戰(zhàn)了兩個(gè)周末的野生程序猿 ****

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末炼七,一起剝皮案震驚了整個(gè)濱河市缆巧,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌豌拙,老刑警劉巖陕悬,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異按傅,居然都是意外死亡捉超,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門唯绍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)拼岳,“玉大人,你說(shuō)我怎么就攤上這事况芒∠е剑” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵绝骚,是天一觀的道長(zhǎng)耐版。 經(jīng)常有香客問(wèn)我,道長(zhǎng)压汪,這世上最難降的妖魔是什么粪牲? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮止剖,結(jié)果婚禮上腺阳,老公的妹妹穿的比我還像新娘落君。我一直安慰自己,他們只是感情好舌狗,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布叽奥。 她就那樣靜靜地躺著,像睡著了一般痛侍。 火紅的嫁衣襯著肌膚如雪朝氓。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,443評(píng)論 1 302
  • 那天主届,我揣著相機(jī)與錄音赵哲,去河邊找鬼。 笑死君丁,一個(gè)胖子當(dāng)著我的面吹牛枫夺,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播绘闷,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼橡庞,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了印蔗?” 一聲冷哼從身側(cè)響起扒最,我...
    開(kāi)封第一講書(shū)人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎华嘹,沒(méi)想到半個(gè)月后吧趣,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡耙厚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年强挫,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片薛躬。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡俯渤,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出泛豪,到底是詐尸還是另有隱情稠诲,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布诡曙,位于F島的核電站臀叙,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏价卤。R本人自食惡果不足惜劝萤,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望慎璧。 院中可真熱鬧床嫌,春花似錦跨释、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至阔涉,卻和暖如春缆娃,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背瑰排。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工贯要, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人椭住。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓崇渗,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親京郑。 傳聞我的和親對(duì)象是個(gè)殘疾皇子宅广,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容