【轉(zhuǎn)】在PHP中使用協(xié)程實現(xiàn)多任務(wù)調(diào)度

概述

協(xié)程一個異常強(qiáng)大的概念。原文:

鳥哥博客:http://www.laruence.com/2015/05/28/3038.html

PHP5.5一個比較好的新功能是加入了對迭代生成器和協(xié)程的支持.對于生成器,PHP的文檔和各種其他的博客文章已經(jīng)有了非常詳細(xì)的講解.協(xié)程相對受到的關(guān)注就少了,因為協(xié)程雖然有很強(qiáng)大的功能但相對比較復(fù)雜, 也比較難被理解,解釋起來也比較困難.

這篇文章將嘗試通過介紹如何使用協(xié)程來實施任務(wù)調(diào)度, 來解釋在PHP中的協(xié)程.

我將在前三節(jié)做一個簡單的背景介紹.如果你已經(jīng)有了比較好的基礎(chǔ),可以直接跳到“協(xié)同多任務(wù)處理”一節(jié).

迭代生成器

(迭代)生成器也是一個函數(shù),不同的是這個函數(shù)的返回值是依次返回,而不是只返回一個單獨的值.或者,換句話說,生成器使你能更方便的實現(xiàn)了迭代器接口.下面通過實現(xiàn)一個xrange函數(shù)來簡單說明:

<?php
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}
 
foreach (xrange(1, 1000000) as $num) {
    echo $num, "\n";
}

上面這個xrange()函數(shù)提供了和PHP的內(nèi)建函數(shù)range()一樣的功能.但是不同的是range()函數(shù)返回的是一個包含值從1到100萬0的數(shù)組(注:請查看手冊). 而xrange()函數(shù)返回的是依次輸出這些值的一個迭代器, 而不會真正以數(shù)組形式返回.

這種方法的優(yōu)點是顯而易見的.它可以讓你在處理大數(shù)據(jù)集合的時候不用一次性的加載到內(nèi)存中.甚至你可以處理無限大的數(shù)據(jù)流.

當(dāng)然,也可以不同通過生成器來實現(xiàn)這個功能,而是可以通過繼承Iterator接口實現(xiàn).但通過使用生成器實現(xiàn)起來會更方便,不用再去實現(xiàn)iterator接口中的5個方法了.

生成器為可中斷的函數(shù)

要從生成器認(rèn)識協(xié)程, 理解它內(nèi)部是如何工作是非常重要的: 生成器是一種可中斷的函數(shù), 在它里面的yield構(gòu)成了中斷點.

還是看上面的例子, 調(diào)用xrange(1,1000000)的時候, xrange()函數(shù)里代碼其實并沒有真正地運行. 它只是返回了一個迭代器:

<?php
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)

這也解釋了為什么xrange叫做迭代生成器, 因為它返回一個迭代器, 而這個迭代器實現(xiàn)了Iterator接口.

調(diào)用迭代器的方法一次, 其中的代碼運行一次.例如, 如果你調(diào)用$range->rewind(), 那么xrange()里的代碼就會運行到控制流第一次出現(xiàn)yield的地方. 而函數(shù)內(nèi)傳遞給yield語句的返回值可以通過$range->current()獲取.

為了繼續(xù)執(zhí)行生成器中yield后的代碼, 你就需要調(diào)用$range->next()方法. 這將再次啟動生成器, 直到下一次yield語句出現(xiàn). 因此,連續(xù)調(diào)用next()和current()方法, 你就能從生成器里獲得所有的值, 直到再沒有yield語句出現(xiàn).

對xrange()來說, 這種情形出現(xiàn)在$i超過$end時. 在這中情況下, 控制流將到達(dá)函數(shù)的終點,因此將不執(zhí)行任何代碼.一旦這種情況發(fā)生,vaild()方法將返回假, 這時迭代結(jié)束.

協(xié)程

協(xié)程的支持是在迭代生成器的基礎(chǔ)上, 增加了可以回送數(shù)據(jù)給生成器的功能(調(diào)用者發(fā)送數(shù)據(jù)給被調(diào)用的生成器函數(shù)). 這就把生成器到調(diào)用者的單向通信轉(zhuǎn)變?yōu)閮烧咧g的雙向通信.

傳遞數(shù)據(jù)的功能是通過迭代器的send()方法實現(xiàn)的. 下面的logger()協(xié)程是這種通信如何運行的例子:

<?php
function logger($fileName) {
    $fileHandle = fopen($fileName, 'a');
    while (true) {
        fwrite($fileHandle, yield . "\n");
    }
}
 
$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar')

正如你能看到,這兒yield沒有作為一個語句來使用, 而是用作一個表達(dá)式, 即它能被演化成一個值. 這個值就是調(diào)用者傳遞給send()方法的值. 在這個例子里, yield表達(dá)式將首先被”Foo”替代寫入Log, 然后被”Bar”替代寫入Log.

上面的例子里演示了yield作為接受者, 接下來我們看如何同時進(jìn)行接收和發(fā)送的例子:

<?php
function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}
 
$gen = gen();
var_dump($gen->current()); // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen)
                              // string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen)
                              // NULL (the return value of ->send())

要很快的理解輸出的精確順序可能稍微有點困難, 但你確定要搞清楚為什按照這種方式輸出. 以便后續(xù)繼續(xù)閱讀.

另外, 我要特別指出的有兩點:

第一點,yield表達(dá)式兩邊的括號在PHP7以前不是可選的, 也就是說在PHP5.5和PHP5.6中圓括號是必須的.

第二點,你可能已經(jīng)注意到調(diào)用current()之前沒有調(diào)用rewind().這是因為生成迭代對象的時候已經(jīng)隱含地執(zhí)行了rewind操作.

多任務(wù)協(xié)作

如果閱讀了上面的logger()例子, 你也許會疑惑“為了雙向通信我為什么要使用協(xié)程呢色瘩?我完全可以使用其他非協(xié)程方法實現(xiàn)同樣的功能啊?”, 是的, 你是對的, 但上面的例子只是為了演示了基本用法, 這個例子其實并沒有真正的展示出使用協(xié)程的優(yōu)點.

正如上面介紹里提到的,協(xié)程是非常強(qiáng)大的概念,不過卻應(yīng)用的很稀少而且常常十分復(fù)雜.要給出一些簡單而真實的例子很難.

在這篇文章里,我決定去做的是使用協(xié)程實現(xiàn)多任務(wù)協(xié)作.我們要解決的問題是你想并發(fā)地運行多任務(wù)(或者“程序”).不過我們都知道CPU在一個時刻只能運行一個任務(wù)(不考慮多核的情況).因此處理器需要在不同的任務(wù)之間進(jìn)行切換,而且總是讓每個任務(wù)運行 “一小會兒”.

多任務(wù)協(xié)作這個術(shù)語中的“協(xié)作”很好的說明了如何進(jìn)行這種切換的:它要求當(dāng)前正在運行的任務(wù)自動把控制傳回給調(diào)度器,這樣就可以運行其他任務(wù)了. 這與“搶占”多任務(wù)相反, 搶占多任務(wù)是這樣的:調(diào)度器可以中斷運行了一段時間的任務(wù), 不管它喜歡還是不喜歡. 協(xié)作多任務(wù)在Windows的早期版本(windows95)和Mac OS中有使用, 不過它們后來都切換到使用搶先多任務(wù)了. 理由相當(dāng)明確:如果你依靠程序自動交出控制的話, 那么一些惡意的程序?qū)⒑苋菀渍加谜麄€CPU, 不與其他任務(wù)共享.

現(xiàn)在你應(yīng)當(dāng)明白協(xié)程和任務(wù)調(diào)度之間的關(guān)系:yield指令提供了任務(wù)中斷自身的一種方法, 然后把控制交回給任務(wù)調(diào)度器. 因此協(xié)程可以運行多個其他任務(wù). 更進(jìn)一步來說, yield還可以用來在任務(wù)和調(diào)度器之間進(jìn)行通信.

為了實現(xiàn)我們的多任務(wù)調(diào)度, 首先實現(xiàn)“任務(wù)” — 一個用輕量級的包裝的協(xié)程函數(shù):

<?php
class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;
 
    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }
 
    public function getTaskId() {
        return $this->taskId;
    }
 
    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }
 
    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }
 
    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

如代碼, 一個任務(wù)就是用任務(wù)ID標(biāo)記的一個協(xié)程(函數(shù)). 使用setSendValue()方法, 你可以指定哪些值將被發(fā)送到下次的恢復(fù)(在之后你會了解到我們需要這個), run()函數(shù)確實沒有做什么, 除了調(diào)用send()方法的協(xié)同程序, 要理解為什么添加了一個 beforeFirstYieldflag變量, 需要考慮下面的代碼片段:

<?php
function gen() {
    yield 'foo';
    yield 'bar';
}
 
$gen = gen();
var_dump($gen->send('something'));
 
// 如之前提到的在send之前, 當(dāng)$gen迭代器被創(chuàng)建的時候一個renwind()方法已經(jīng)被隱式調(diào)用
// 所以實際上發(fā)生的應(yīng)該類似:
//$gen->rewind();
//var_dump($gen->send('something'));
 
//這樣renwind的執(zhí)行將會導(dǎo)致第一個yield被執(zhí)行, 并且忽略了他的返回值.
//真正當(dāng)我們調(diào)用yield的時候, 我們得到的是第二個yield的值! 導(dǎo)致第一個yield的值被忽略.
//string(3) "bar"

通過添加 beforeFirstYieldcondition 我們可以確定第一個yield的值能被正確返回.

調(diào)度器現(xiàn)在不得不比多任務(wù)循環(huán)要做稍微多點了, 然后才運行多任務(wù):

<?php
class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;
 
    public function __construct() {
        $this->taskQueue = new SplQueue();
    }
 
    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }
 
    public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }
 
    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $task->run();
 
            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}

newTask()方法(使用下一個空閑的任務(wù)id)創(chuàng)建一個新任務(wù),然后把這個任務(wù)放入任務(wù)map數(shù)組里. 接著它通過把任務(wù)放入任務(wù)隊列里來實現(xiàn)對任務(wù)的調(diào)度. 接著run()方法掃描任務(wù)隊列, 運行任務(wù).如果一個任務(wù)結(jié)束了, 那么它將從隊列里刪除, 否則它將在隊列的末尾再次被調(diào)度.

讓我們看看下面具有兩個簡單(沒有什么意義)任務(wù)的調(diào)度器:

<?php
function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}
 
function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}
 
$scheduler = new Scheduler;
 
$scheduler->newTask(task1());
$scheduler->newTask(task2());
 
$scheduler->run();

兩個任務(wù)都僅僅回顯一條信息,然后使用yield把控制回傳給調(diào)度器.輸出結(jié)果如下:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

輸出確實如我們所期望的:對前五個迭代來說,兩個任務(wù)是交替運行的, 而在第二個任務(wù)結(jié)束后, 只有第一個任務(wù)繼續(xù)運行.

與調(diào)度器之間通信

既然調(diào)度器已經(jīng)運行了, 那么我們來看下一個問題:任務(wù)和調(diào)度器之間的通信.

我們將使用進(jìn)程用來和操作系統(tǒng)會話的同樣的方式來通信:系統(tǒng)調(diào)用.

我們需要系統(tǒng)調(diào)用的理由是操作系統(tǒng)與進(jìn)程相比它處在不同的權(quán)限級別上. 因此為了執(zhí)行特權(quán)級別的操作(如殺死另一個進(jìn)程), 就不得不以某種方式把控制傳回給內(nèi)核, 這樣內(nèi)核就可以執(zhí)行所說的操作了. 再說一遍, 這種行為在內(nèi)部是通過使用中斷指令來實現(xiàn)的. 過去使用的是通用的int指令, 如今使用的是更特殊并且更快速的syscall/sysenter指令.

我們的任務(wù)調(diào)度系統(tǒng)將反映這種設(shè)計:不是簡單地把調(diào)度器傳遞給任務(wù)(這樣就允許它做它想做的任何事), 我們將通過給yield表達(dá)式傳遞信息來與系統(tǒng)調(diào)用通信. 這兒yield即是中斷, 也是傳遞信息給調(diào)度器(和從調(diào)度器傳遞出信息)的方法.

為了說明系統(tǒng)調(diào)用, 我們對可調(diào)用的系統(tǒng)調(diào)用做一個小小的封裝:

<?php
class SystemCall {
    protected $callback;
 
    public function __construct(callable $callback) {
        $this->callback = $callback;
    }
 
    public function __invoke(Task $task, Scheduler $scheduler) {
        $callback = $this->callback;
        return $callback($task, $scheduler);
    }
}

它和其他任何可調(diào)用的對象(使用_invoke)一樣的運行, 不過它要求調(diào)度器把正在調(diào)用的任務(wù)和自身傳遞給這個函數(shù).

為了解決這個問題我們不得不微微的修改調(diào)度器的run方法:

<?php
public function run() {
    while (!$this->taskQueue->isEmpty()) {
        $task = $this->taskQueue->dequeue();
        $retval = $task->run();
 
        if ($retval instanceof SystemCall) {
            $retval($task, $this);
            continue;
        }
 
        if ($task->isFinished()) {
            unset($this->taskMap[$task->getTaskId()]);
        } else {
            $this->schedule($task);
        }
    }
}

第一個系統(tǒng)調(diào)用除了返回任務(wù)ID外什么都沒有做:

<?php
function getTaskId() {
    return new SystemCall(function(Task $task, Scheduler $scheduler) {
        $task->setSendValue($task->getTaskId());
        $scheduler->schedule($task);
    });
}

這個函數(shù)設(shè)置任務(wù)id為下一次發(fā)送的值, 并再次調(diào)度了這個任務(wù) .由于使用了系統(tǒng)調(diào)用, 所以調(diào)度器不能自動調(diào)用任務(wù), 我們需要手工調(diào)度任務(wù)(稍后你將明白為什么這么做). 要使用這個新的系統(tǒng)調(diào)用的話, 我們要重新編寫以前的例子:

<?php
function task($max) {
    $tid = (yield getTaskId()); // <-- here's the syscall!
    for ($i = 1; $i <= $max; ++$i) {
        echo "This is task $tid iteration $i.\n";
        yield;
    }
}
 
$scheduler = new Scheduler;
 
$scheduler->newTask(task(10));
$scheduler->newTask(task(5));
 
$scheduler->run();

這段代碼將給出與前一個例子相同的輸出. 請注意系統(tǒng)調(diào)用如何同其他任何調(diào)用一樣正常地運行, 只不過預(yù)先增加了yield.

要創(chuàng)建新的任務(wù), 然后再殺死它們的話, 需要兩個以上的系統(tǒng)調(diào)用:

<?php
function newTask(Generator $coroutine) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($coroutine) {
            $task->setSendValue($scheduler->newTask($coroutine));
            $scheduler->schedule($task);
        }
    );
}
 
function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            $task->setSendValue($scheduler->killTask($tid));
            $scheduler->schedule($task);
        }
    );
}

killTask函數(shù)需要在調(diào)度器里增加一個方法:

<?php
public function killTask($tid) {
    if (!isset($this->taskMap[$tid])) {
        return false;
    }
 
    unset($this->taskMap[$tid]);
 
    // This is a bit ugly and could be optimized so it does not have to walk the queue,
    // but assuming that killing tasks is rather rare I won't bother with it now
    foreach ($this->taskQueue as $i => $task) {
        if ($task->getTaskId() === $tid) {
            unset($this->taskQueue[$i]);
            break;
        }
    }
 
    return true;
}

用來測試新功能的微腳本:

<?php
function childTask() {
    $tid = (yield getTaskId());
    while (true) {
        echo "Child task $tid still alive!\n";
        yield;
    }
}
 
function task() {
    $tid = (yield getTaskId());
    $childTid = (yield newTask(childTask()));
 
    for ($i = 1; $i <= 6; ++$i) {
        echo "Parent task $tid iteration $i.\n";
        yield;
 
        if ($i == 3) yield killTask($childTid);
    }
}
 
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

這段代碼將打印以下信息:

Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.

經(jīng)過三次迭代以后子任務(wù)將被殺死, 因此這就是”Child is still alive”消息結(jié)束的時候. 不過你要明白這還不是真正的父子關(guān)系. 因為在父任務(wù)結(jié)束后子任務(wù)仍然可以運行, 子任務(wù)甚至可以殺死父任務(wù). 可以修改調(diào)度器使它具有更層級化的任務(wù)結(jié)構(gòu), 不過這個不是我們這個文章要繼續(xù)討論的范圍了.

現(xiàn)在你可以實現(xiàn)許多進(jìn)程管理調(diào)用. 例如 wait(它一直等待到任務(wù)結(jié)束運行時), exec(它替代當(dāng)前任務(wù))和fork(它創(chuàng)建一個當(dāng)前任務(wù)的克隆). fork非撤宓酷,而 且你可以使用PHP的協(xié)程真正地實現(xiàn)它, 因為它們都支持克隆.

讓我們把這些留給有興趣的讀者吧,我們來看下一個議題.

非阻塞IO

很明顯, 我們的任務(wù)管理系統(tǒng)的真正很酷的應(yīng)用應(yīng)該是web服務(wù)器. 它有一個任務(wù)是在套接字上偵聽是否有新連接, 當(dāng)有新連接要建立的時候, 它創(chuàng)建一個新任務(wù)來處理新連接.

Web服務(wù)器最難的部分通常是像讀數(shù)據(jù)這樣的套接字操作是阻塞的. 例如PHP將等待到客戶端完成發(fā)送為止. 對一個Web服務(wù)器來說, 這有點不太高效. 因為服務(wù)器在一個時間點上只能處理一個連接.

解決方案是確保在真正對套接字讀寫之前該套接字已經(jīng)“準(zhǔn)備就緒”. 為了查找哪個套接字已經(jīng)準(zhǔn)備好讀或者寫了, 可以使用 流選擇函數(shù).

首先,讓我們添加兩個新的 syscall, 它們將等待直到指定socket 準(zhǔn)備好:

<?php
function waitForRead($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForRead($socket, $task);
        }
    );
}
 
function waitForWrite($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForWrite($socket, $task);
        }
    );
}

這些 syscall 只是在調(diào)度器中代理其各自的方法:

<?php
 
// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];
 
public function waitForRead($socket, Task $task) {
    if (isset($this->waitingForRead[(int) $socket])) {
        $this->waitingForRead[(int) $socket][1][] = $task;
    } else {
        $this->waitingForRead[(int) $socket] = [$socket, [$task]];
    }
}
 
public function waitForWrite($socket, Task $task) {
    if (isset($this->waitingForWrite[(int) $socket])) {
        $this->waitingForWrite[(int) $socket][1][] = $task;
    } else {
        $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
    }
}

waitingForRead 及 waitingForWrite 屬性是兩個承載等待的socket 及等待它們的任務(wù)的數(shù)組. 有趣的部分在于下面的方法,它將檢查 socket 是否可用, 并重新安排各自任務(wù):

<?php
 
protected function ioPoll($timeout) {
    $rSocks = [];
    foreach ($this->waitingForRead as list($socket)) {
        $rSocks[] = $socket;
    }
 
    $wSocks = [];
    foreach ($this->waitingForWrite as list($socket)) {
        $wSocks[] = $socket;
    }
 
    $eSocks = []; // dummy
 
    if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
        return;
    }
 
    foreach ($rSocks as $socket) {
        list(, $tasks) = $this->waitingForRead[(int) $socket];
        unset($this->waitingForRead[(int) $socket]);
 
        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
 
    foreach ($wSocks as $socket) {
        list(, $tasks) = $this->waitingForWrite[(int) $socket];
        unset($this->waitingForWrite[(int) $socket]);
 
        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
}

stream_select 函數(shù)接受承載讀取既琴、寫入以及待檢查的socket的數(shù)組(我們無需考慮最后一類). 數(shù)組將按引用傳遞, 函數(shù)只會保留那些狀態(tài)改變了的數(shù)組元素. 我們可以遍歷這些數(shù)組, 并重新安排與之相關(guān)的任務(wù).

為了正常地執(zhí)行上面的輪詢動作, 我們將在調(diào)度器里增加一個特殊的任務(wù):

<?php
protected function ioPollTask() {
    while (true) {
        if ($this->taskQueue->isEmpty()) {
            $this->ioPoll(null);
        } else {
            $this->ioPoll(0);
        }
        yield;
    }
}

需要在某個地方注冊這個任務(wù), 例如, 你可以在run()方法的開始增加$this->newTask($this->ioPollTask()). 然后就像其他任務(wù)一樣每執(zhí)行完整任務(wù)循環(huán)一次就執(zhí)行輪詢操作一次(這么做一定不是最好的方法), ioPollTask將使用0秒的超時來調(diào)用ioPoll, 也就是stream_select將立即返回(而不是等待).

只有任務(wù)隊列為空時,我們才使用null超時,這意味著它一直等到某個套接口準(zhǔn)備就緒.如果我們沒有這么做,那么輪詢?nèi)蝿?wù)將一而再, 再而三的循環(huán)運行, 直到有新的連接建立. 這將導(dǎo)致100%的CPU利用率. 相反, 讓操作系統(tǒng)做這種等待會更有效.

現(xiàn)在編寫服務(wù)器就相對容易了:

<?php
 
function server($port) {
    echo "Starting server at port $port...\n";
 
    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);
 
    stream_set_blocking($socket, 0);
 
    while (true) {
        yield waitForRead($socket);
        $clientSocket = stream_socket_accept($socket, 0);
        yield newTask(handleClient($clientSocket));
    }
}
 
function handleClient($socket) {
    yield waitForRead($socket);
    $data = fread($socket, 8192);
 
    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);
 
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
 
    yield waitForWrite($socket);
    fwrite($socket, $response);
 
    fclose($socket);
}
 
$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();

這段代碼實現(xiàn)了接收localhost:8000上的連接, 然后返回發(fā)送來的內(nèi)容作為HTTP響應(yīng). 當(dāng)然它還能處理真正的復(fù)雜HTTP請求, 上面的代碼片段只是演示了一般性的概念.

你可以使用類似于ab -n 10000 -c 100 localhost:8000/這樣命令來測試服務(wù)器. 這條命令將向服務(wù)器發(fā)送10000個請求, 并且其中100個請求將同時到達(dá). 使用這樣的數(shù)目, 我得到了處于中間的10毫秒的響應(yīng)時間. 不過還有一個問題:有少數(shù)幾個請求真正處理的很慢(如5秒), 這就是為什么總吞吐量只有2000請求/秒(如果是10毫秒的響應(yīng)時間的話, 總的吞吐量應(yīng)該更像是10000請求/秒)

協(xié)程堆棧

如果你試圖用我們的調(diào)度系統(tǒng)建立更大的系統(tǒng)的話, 你將很快遇到問題:我們習(xí)慣了把代碼分解為更小的函數(shù), 然后調(diào)用它們. 然而, 如果使用了協(xié)程的話, 就不能這么做了. 例如,看下面代碼:

<?php
function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
}
 
function task() {
    echoTimes('foo', 10); // print foo ten times
    echo "---\n";
    echoTimes('bar', 5); // print bar five times
    yield; // force it to be a coroutine
}
 
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

這段代碼試圖把重復(fù)循環(huán)“輸出n次“的代碼嵌入到一個獨立的協(xié)程里,然后從主任務(wù)里調(diào)用它. 然而它無法運行. 正如在這篇文章的開始所提到的, 調(diào)用生成器(或者協(xié)程)將沒有真正地做任何事情, 它僅僅返回一個對象.這 也出現(xiàn)在上面的例子里:echoTimes調(diào)用除了放回一個(無用的)協(xié)程對象外不做任何事情.

為了仍然允許這么做,我們需要在這個裸協(xié)程上寫一個小小的封裝.我們將調(diào)用它:“協(xié)程堆椄〔担”. 因為它將管理嵌套的協(xié)程調(diào)用堆棧. 這將是通過生成協(xié)程來調(diào)用子協(xié)程成為可能:

$retval = (yield someCoroutine($foo, $bar));

使用yield,子協(xié)程也能再次返回值

yield retval("I'm a return value!");

retval函數(shù)除了返回一個值的封裝外沒有做任何其他事情.這個封裝將表示它是一個返回值.

<?php
 
class CoroutineReturnValue {
    protected $value;
 
    public function __construct($value) {
        $this->value = $value;
    }
 
    public function getValue() {
        return $this->value;
    }
}
 
function retval($value) {
    return new CoroutineReturnValue($value);
}

為了把協(xié)程轉(zhuǎn)變?yōu)閰f(xié)程堆棧(它支持子調(diào)用),我們將不得不編寫另外一個函數(shù)(很明顯,它是另一個協(xié)程):

<?php
 
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
 
    for (;;) {
        $value = $gen->current();
 
        if ($value instanceof Generator) {
            $stack->push($gen);
            $gen = $value;
            continue;
        }
 
        $isReturnValue = $value instanceof CoroutineReturnValue;
        if (!$gen->valid() || $isReturnValue) {
            if ($stack->isEmpty()) {
                return;
            }
 
            $gen = $stack->pop();
            $gen->send($isReturnValue ? $value->getValue() : NULL);
            continue;
        }
 
        $gen->send(yield $gen->key() => $value);
    }
}

這個函數(shù)在調(diào)用者和當(dāng)前正在運行的子協(xié)程之間扮演著簡單代理的角色.在$gen->send(yield $gen->key()=>$value)盗尸;這行完成了代理功能.另外它檢查返回值是否是生成器,萬一是生成器的話,它將開始運行這個生成器,并把前一個協(xié)程壓入堆棧里.一旦它獲得了CoroutineReturnValue的話,它將再次請求堆棧彈出,然后繼續(xù)執(zhí)行前一個協(xié)程.

為了使協(xié)程堆棧在任務(wù)里可用,任務(wù)構(gòu)造器里的$this-coroutine =$coroutine;這行需要替代為$this->coroutine = StackedCoroutine($coroutine);.

現(xiàn)在我們可以稍微改進(jìn)上面web服務(wù)器例子:把wait+read(和wait+write和warit+accept)這樣的動作分組為函數(shù).為了分組相關(guān)的 功能,我將使用下面類:

<?php
 
class CoSocket {
    protected $socket;
 
    public function __construct($socket) {
        $this->socket = $socket;
    }
 
    public function accept() {
        yield waitForRead($this->socket);
        yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
    }
 
    public function read($size) {
        yield waitForRead($this->socket);
        yield retval(fread($this->socket, $size));
    }
 
    public function write($string) {
        yield waitForWrite($this->socket);
        fwrite($this->socket, $string);
    }
 
    public function close() {
        @fclose($this->socket);
    }
}

現(xiàn)在服務(wù)器可以編寫的稍微簡潔點了:

<?php
 
function server($port) {
    echo "Starting server at port $port...\n";
 
    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);
 
    stream_set_blocking($socket, 0);
 
    $socket = new CoSocket($socket);
    while (true) {
        yield newTask(
            handleClient(yield $socket->accept())
        );
    }
}
 
function handleClient($socket) {
    $data = (yield $socket->read(8192));
 
    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);
 
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
 
    yield $socket->write($response);
    yield $socket->close();
}

錯誤處理

作為一個優(yōu)秀的程序員, 相信你已經(jīng)察覺到上面的例子缺少錯誤處理. 幾乎所有的 socket 都是易出錯的. 我沒有這樣做的原因一方面固然是因為錯誤處理的乏味(特別是 socket), 另一方面也在于它很容易使代碼體積膨脹.

不過, 我仍然想講下常見的協(xié)程錯誤處理:協(xié)程允許使用 throw() 方法在其內(nèi)部拋出一個錯誤.

throw() 方法接受一個 Exception, 并將其拋出到協(xié)程的當(dāng)前懸掛點, 看看下面代碼:

<?php
function gen() {
    echo "Foo\n";
    try {
        yield;
    } catch (Exception $e) {
        echo "Exception: {$e->getMessage()}\n";
    }
    echo "Bar\n";
}
 
$gen = gen();
$gen->rewind(); // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
                                    // and "Bar"

這非常好, 有沒有? 因為我們現(xiàn)在可以使用系統(tǒng)調(diào)用以及子協(xié)程調(diào)用異常拋出了.

不過我們要對系統(tǒng)調(diào)用Scheduler::run() 方法做一些小調(diào)整:

<?php
if ($retval instanceof SystemCall) {
    try {
        $retval($task, $this);
    } catch (Exception $e) {
        $task->setException($e);
        $this->schedule($task);
    }
    continue;
}

Task 類也要添加 throw 調(diào)用處理:

<?php
class Task {
    // ...
    protected $exception = null;
 
    public function setException($exception) {
        $this->exception = $exception;
    }
 
    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } elseif ($this->exception) {
            $retval = $this->coroutine->throw($this->exception);
            $this->exception = null;
            return $retval;
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }
 
    // ...
}

現(xiàn)在, 我們已經(jīng)可以在系統(tǒng)調(diào)用中使用異常拋出了!例如,要調(diào)用 killTask,讓我們在傳遞 ID 不可用時拋出一個異常:

<?php
function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            if ($scheduler->killTask($tid)) {
                $scheduler->schedule($task);
            } else {
                throw new InvalidArgumentException('Invalid task ID!');
            }
        }
    );
}

試試看:

<?php
function task() {
    try {
        yield killTask(500);
    } catch (Exception $e) {
        echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
    }
}

這些代碼現(xiàn)在尚不能正常運作,因為 stackedCoroutine 函數(shù)無法正確處理異常.要修復(fù)需要做些調(diào)整:

<?php
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
    $exception = null;
 
    for (;;) {
        try {
            if ($exception) {
                $gen->throw($exception);
                $exception = null;
                continue;
            }
 
            $value = $gen->current();
 
            if ($value instanceof Generator) {
                $stack->push($gen);
                $gen = $value;
                continue;
            }
 
            $isReturnValue = $value instanceof CoroutineReturnValue;
            if (!$gen->valid() || $isReturnValue) {
                if ($stack->isEmpty()) {
                    return;
                }
 
                $gen = $stack->pop();
                $gen->send($isReturnValue ? $value->getValue() : NULL);
                continue;
            }
 
            try {
                $sendValue = (yield $gen->key() => $value);
            } catch (Exception $e) {
                $gen->throw($e);
                continue;
            }
 
            $gen->send($sendValue);
        } catch (Exception $e) {
            if ($stack->isEmpty()) {
                throw $e;
            }
 
            $gen = $stack->pop();
            $exception = $e;
        }
    }
}

結(jié)束語

在這篇文章里,我使用多任務(wù)協(xié)作構(gòu)建了一個任務(wù)調(diào)度器, 其中包括執(zhí)行“系統(tǒng)調(diào)用”, 做非阻塞操作和處理錯誤. 所有這些里真正很酷的事情是任務(wù)的結(jié)果代碼看起來完全同步, 甚至任務(wù)正在執(zhí)行大量的異步操作的時候也是這樣.

如果你打算從套接口讀取數(shù)據(jù)的話, 你將不需要傳遞某個回調(diào)函數(shù)或者注冊一個事件偵聽器. 相反, 你只要書寫yield $socket->read(). 這兒大部分都是你常常也要編寫的,只 在它的前面增加yield.

當(dāng)我第一次聽到協(xié)程的時候, 我發(fā)現(xiàn)這個概念完全令人折服, 正是因為這個激勵我在PHP中實現(xiàn)了它. 同時我發(fā)現(xiàn)協(xié)程真正非常的令人驚嘆:在令人敬畏的代碼和一大堆亂代碼之間只有一線之隔, 我認(rèn)為協(xié)程恰好處在這條線上, 不多不少. 不過, 要說使用上面所述的方法書寫異步代碼是否真的有益, 這個就見仁見智了.

但, 不管咋樣, 我認(rèn)為這是一個有趣的話題, 而且我希望你也能找到它的樂趣. 歡迎評論:)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末澎怒,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌喷面,老刑警劉巖星瘾,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異惧辈,居然都是意外死亡琳状,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進(jìn)店門盒齿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來念逞,“玉大人,你說我怎么就攤上這事边翁◆岢校” “怎么了?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵符匾,是天一觀的道長叨咖。 經(jīng)常有香客問我,道長啊胶,這世上最難降的妖魔是什么甸各? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮创淡,結(jié)果婚禮上痴晦,老公的妹妹穿的比我還像新娘。我一直安慰自己琳彩,他們只是感情好誊酌,可當(dāng)我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著露乏,像睡著了一般碧浊。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瘟仿,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天箱锐,我揣著相機(jī)與錄音,去河邊找鬼劳较。 笑死驹止,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的观蜗。 我是一名探鬼主播臊恋,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼墓捻!你這毒婦竟也來了抖仅?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎撤卢,沒想到半個月后环凿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡放吩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年智听,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片渡紫。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡瞭稼,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出腻惠,到底是詐尸還是另有隱情,我是刑警寧澤欲虚,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布集灌,位于F島的核電站,受9級特大地震影響复哆,放射性物質(zhì)發(fā)生泄漏欣喧。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一梯找、第九天 我趴在偏房一處隱蔽的房頂上張望唆阿。 院中可真熱鬧,春花似錦锈锤、人聲如沸驯鳖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽浅辙。三九已至,卻和暖如春阎姥,著一層夾襖步出監(jiān)牢的瞬間记舆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工呼巴, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留泽腮,地道東北人。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓衣赶,卻偏偏與公主長得像诊赊,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子屑埋,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容