概述
協(xié)程一個異常強(qiáng)大的概念。原文:
鳥哥博客:http://www.laruence.com/2015/05/28/3038.html
PHP5.5一個比較好的新功能是加入了對迭代生成器和協(xié)程的支持.對于生成器,PHP的文檔和各種其他的博客文章已經(jīng)有了非常詳細(xì)的講解.協(xié)程相對受到的關(guān)注就少了,因為協(xié)程雖然有很強(qiáng)大的功能但相對比較復(fù)雜, 也比較難被理解,解釋起來也比較困難.
這篇文章將嘗試通過介紹如何使用協(xié)程來實施任務(wù)調(diào)度, 來解釋在PHP中的協(xié)程.
我將在前三節(jié)做一個簡單的背景介紹.如果你已經(jīng)有了比較好的基礎(chǔ),可以直接跳到“協(xié)同多任務(wù)處理”一節(jié).
迭代生成器
(迭代)生成器也是一個函數(shù),不同的是這個函數(shù)的返回值是依次返回,而不是只返回一個單獨的值.或者,換句話說,生成器使你能更方便的實現(xiàn)了迭代器接口.下面通過實現(xiàn)一個xrange函數(shù)來簡單說明:
<?php
function xrange($start, $end, $step = 1) {
for ($i = $start; $i <= $end; $i += $step) {
yield $i;
}
}
foreach (xrange(1, 1000000) as $num) {
echo $num, "\n";
}
上面這個xrange()函數(shù)提供了和PHP的內(nèi)建函數(shù)range()一樣的功能.但是不同的是range()函數(shù)返回的是一個包含值從1到100萬0的數(shù)組(注:請查看手冊). 而xrange()函數(shù)返回的是依次輸出這些值的一個迭代器, 而不會真正以數(shù)組形式返回.
這種方法的優(yōu)點是顯而易見的.它可以讓你在處理大數(shù)據(jù)集合的時候不用一次性的加載到內(nèi)存中.甚至你可以處理無限大的數(shù)據(jù)流.
當(dāng)然,也可以不同通過生成器來實現(xiàn)這個功能,而是可以通過繼承Iterator接口實現(xiàn).但通過使用生成器實現(xiàn)起來會更方便,不用再去實現(xiàn)iterator接口中的5個方法了.
生成器為可中斷的函數(shù)
要從生成器認(rèn)識協(xié)程, 理解它內(nèi)部是如何工作是非常重要的: 生成器是一種可中斷的函數(shù), 在它里面的yield構(gòu)成了中斷點.
還是看上面的例子, 調(diào)用xrange(1,1000000)的時候, xrange()函數(shù)里代碼其實并沒有真正地運行. 它只是返回了一個迭代器:
<?php
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)
這也解釋了為什么xrange叫做迭代生成器, 因為它返回一個迭代器, 而這個迭代器實現(xiàn)了Iterator接口.
調(diào)用迭代器的方法一次, 其中的代碼運行一次.例如, 如果你調(diào)用$range->rewind(), 那么xrange()里的代碼就會運行到控制流第一次出現(xiàn)yield的地方. 而函數(shù)內(nèi)傳遞給yield語句的返回值可以通過$range->current()獲取.
為了繼續(xù)執(zhí)行生成器中yield后的代碼, 你就需要調(diào)用$range->next()方法. 這將再次啟動生成器, 直到下一次yield語句出現(xiàn). 因此,連續(xù)調(diào)用next()和current()方法, 你就能從生成器里獲得所有的值, 直到再沒有yield語句出現(xiàn).
對xrange()來說, 這種情形出現(xiàn)在$i超過$end時. 在這中情況下, 控制流將到達(dá)函數(shù)的終點,因此將不執(zhí)行任何代碼.一旦這種情況發(fā)生,vaild()方法將返回假, 這時迭代結(jié)束.
協(xié)程
協(xié)程的支持是在迭代生成器的基礎(chǔ)上, 增加了可以回送數(shù)據(jù)給生成器的功能(調(diào)用者發(fā)送數(shù)據(jù)給被調(diào)用的生成器函數(shù)). 這就把生成器到調(diào)用者的單向通信轉(zhuǎn)變?yōu)閮烧咧g的雙向通信.
傳遞數(shù)據(jù)的功能是通過迭代器的send()方法實現(xiàn)的. 下面的logger()協(xié)程是這種通信如何運行的例子:
<?php
function logger($fileName) {
$fileHandle = fopen($fileName, 'a');
while (true) {
fwrite($fileHandle, yield . "\n");
}
}
$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar')
正如你能看到,這兒yield沒有作為一個語句來使用, 而是用作一個表達(dá)式, 即它能被演化成一個值. 這個值就是調(diào)用者傳遞給send()方法的值. 在這個例子里, yield表達(dá)式將首先被”Foo”替代寫入Log, 然后被”Bar”替代寫入Log.
上面的例子里演示了yield作為接受者, 接下來我們看如何同時進(jìn)行接收和發(fā)送的例子:
<?php
function gen() {
$ret = (yield 'yield1');
var_dump($ret);
$ret = (yield 'yield2');
var_dump($ret);
}
$gen = gen();
var_dump($gen->current()); // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen)
// string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen)
// NULL (the return value of ->send())
要很快的理解輸出的精確順序可能稍微有點困難, 但你確定要搞清楚為什按照這種方式輸出. 以便后續(xù)繼續(xù)閱讀.
另外, 我要特別指出的有兩點:
第一點,yield表達(dá)式兩邊的括號在PHP7以前不是可選的, 也就是說在PHP5.5和PHP5.6中圓括號是必須的.
第二點,你可能已經(jīng)注意到調(diào)用current()之前沒有調(diào)用rewind().這是因為生成迭代對象的時候已經(jīng)隱含地執(zhí)行了rewind操作.
多任務(wù)協(xié)作
如果閱讀了上面的logger()例子, 你也許會疑惑“為了雙向通信我為什么要使用協(xié)程呢色瘩?我完全可以使用其他非協(xié)程方法實現(xiàn)同樣的功能啊?”, 是的, 你是對的, 但上面的例子只是為了演示了基本用法, 這個例子其實并沒有真正的展示出使用協(xié)程的優(yōu)點.
正如上面介紹里提到的,協(xié)程是非常強(qiáng)大的概念,不過卻應(yīng)用的很稀少而且常常十分復(fù)雜.要給出一些簡單而真實的例子很難.
在這篇文章里,我決定去做的是使用協(xié)程實現(xiàn)多任務(wù)協(xié)作.我們要解決的問題是你想并發(fā)地運行多任務(wù)(或者“程序”).不過我們都知道CPU在一個時刻只能運行一個任務(wù)(不考慮多核的情況).因此處理器需要在不同的任務(wù)之間進(jìn)行切換,而且總是讓每個任務(wù)運行 “一小會兒”.
多任務(wù)協(xié)作這個術(shù)語中的“協(xié)作”很好的說明了如何進(jìn)行這種切換的:它要求當(dāng)前正在運行的任務(wù)自動把控制傳回給調(diào)度器,這樣就可以運行其他任務(wù)了. 這與“搶占”多任務(wù)相反, 搶占多任務(wù)是這樣的:調(diào)度器可以中斷運行了一段時間的任務(wù), 不管它喜歡還是不喜歡. 協(xié)作多任務(wù)在Windows的早期版本(windows95)和Mac OS中有使用, 不過它們后來都切換到使用搶先多任務(wù)了. 理由相當(dāng)明確:如果你依靠程序自動交出控制的話, 那么一些惡意的程序?qū)⒑苋菀渍加谜麄€CPU, 不與其他任務(wù)共享.
現(xiàn)在你應(yīng)當(dāng)明白協(xié)程和任務(wù)調(diào)度之間的關(guān)系:yield指令提供了任務(wù)中斷自身的一種方法, 然后把控制交回給任務(wù)調(diào)度器. 因此協(xié)程可以運行多個其他任務(wù). 更進(jìn)一步來說, yield還可以用來在任務(wù)和調(diào)度器之間進(jìn)行通信.
為了實現(xiàn)我們的多任務(wù)調(diào)度, 首先實現(xiàn)“任務(wù)” — 一個用輕量級的包裝的協(xié)程函數(shù):
<?php
class Task {
protected $taskId;
protected $coroutine;
protected $sendValue = null;
protected $beforeFirstYield = true;
public function __construct($taskId, Generator $coroutine) {
$this->taskId = $taskId;
$this->coroutine = $coroutine;
}
public function getTaskId() {
return $this->taskId;
}
public function setSendValue($sendValue) {
$this->sendValue = $sendValue;
}
public function run() {
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}
public function isFinished() {
return !$this->coroutine->valid();
}
}
如代碼, 一個任務(wù)就是用任務(wù)ID標(biāo)記的一個協(xié)程(函數(shù)). 使用setSendValue()方法, 你可以指定哪些值將被發(fā)送到下次的恢復(fù)(在之后你會了解到我們需要這個), run()函數(shù)確實沒有做什么, 除了調(diào)用send()方法的協(xié)同程序, 要理解為什么添加了一個 beforeFirstYieldflag變量, 需要考慮下面的代碼片段:
<?php
function gen() {
yield 'foo';
yield 'bar';
}
$gen = gen();
var_dump($gen->send('something'));
// 如之前提到的在send之前, 當(dāng)$gen迭代器被創(chuàng)建的時候一個renwind()方法已經(jīng)被隱式調(diào)用
// 所以實際上發(fā)生的應(yīng)該類似:
//$gen->rewind();
//var_dump($gen->send('something'));
//這樣renwind的執(zhí)行將會導(dǎo)致第一個yield被執(zhí)行, 并且忽略了他的返回值.
//真正當(dāng)我們調(diào)用yield的時候, 我們得到的是第二個yield的值! 導(dǎo)致第一個yield的值被忽略.
//string(3) "bar"
通過添加 beforeFirstYieldcondition 我們可以確定第一個yield的值能被正確返回.
調(diào)度器現(xiàn)在不得不比多任務(wù)循環(huán)要做稍微多點了, 然后才運行多任務(wù):
<?php
class Scheduler {
protected $maxTaskId = 0;
protected $taskMap = []; // taskId => task
protected $taskQueue;
public function __construct() {
$this->taskQueue = new SplQueue();
}
public function newTask(Generator $coroutine) {
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->taskMap[$tid] = $task;
$this->schedule($task);
return $tid;
}
public function schedule(Task $task) {
$this->taskQueue->enqueue($task);
}
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$task->run();
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
}
newTask()方法(使用下一個空閑的任務(wù)id)創(chuàng)建一個新任務(wù),然后把這個任務(wù)放入任務(wù)map數(shù)組里. 接著它通過把任務(wù)放入任務(wù)隊列里來實現(xiàn)對任務(wù)的調(diào)度. 接著run()方法掃描任務(wù)隊列, 運行任務(wù).如果一個任務(wù)結(jié)束了, 那么它將從隊列里刪除, 否則它將在隊列的末尾再次被調(diào)度.
讓我們看看下面具有兩個簡單(沒有什么意義)任務(wù)的調(diào)度器:
<?php
function task1() {
for ($i = 1; $i <= 10; ++$i) {
echo "This is task 1 iteration $i.\n";
yield;
}
}
function task2() {
for ($i = 1; $i <= 5; ++$i) {
echo "This is task 2 iteration $i.\n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();
兩個任務(wù)都僅僅回顯一條信息,然后使用yield把控制回傳給調(diào)度器.輸出結(jié)果如下:
This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.
輸出確實如我們所期望的:對前五個迭代來說,兩個任務(wù)是交替運行的, 而在第二個任務(wù)結(jié)束后, 只有第一個任務(wù)繼續(xù)運行.
與調(diào)度器之間通信
既然調(diào)度器已經(jīng)運行了, 那么我們來看下一個問題:任務(wù)和調(diào)度器之間的通信.
我們將使用進(jìn)程用來和操作系統(tǒng)會話的同樣的方式來通信:系統(tǒng)調(diào)用.
我們需要系統(tǒng)調(diào)用的理由是操作系統(tǒng)與進(jìn)程相比它處在不同的權(quán)限級別上. 因此為了執(zhí)行特權(quán)級別的操作(如殺死另一個進(jìn)程), 就不得不以某種方式把控制傳回給內(nèi)核, 這樣內(nèi)核就可以執(zhí)行所說的操作了. 再說一遍, 這種行為在內(nèi)部是通過使用中斷指令來實現(xiàn)的. 過去使用的是通用的int指令, 如今使用的是更特殊并且更快速的syscall/sysenter指令.
我們的任務(wù)調(diào)度系統(tǒng)將反映這種設(shè)計:不是簡單地把調(diào)度器傳遞給任務(wù)(這樣就允許它做它想做的任何事), 我們將通過給yield表達(dá)式傳遞信息來與系統(tǒng)調(diào)用通信. 這兒yield即是中斷, 也是傳遞信息給調(diào)度器(和從調(diào)度器傳遞出信息)的方法.
為了說明系統(tǒng)調(diào)用, 我們對可調(diào)用的系統(tǒng)調(diào)用做一個小小的封裝:
<?php
class SystemCall {
protected $callback;
public function __construct(callable $callback) {
$this->callback = $callback;
}
public function __invoke(Task $task, Scheduler $scheduler) {
$callback = $this->callback;
return $callback($task, $scheduler);
}
}
它和其他任何可調(diào)用的對象(使用_invoke)一樣的運行, 不過它要求調(diào)度器把正在調(diào)用的任務(wù)和自身傳遞給這個函數(shù).
為了解決這個問題我們不得不微微的修改調(diào)度器的run方法:
<?php
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$retval = $task->run();
if ($retval instanceof SystemCall) {
$retval($task, $this);
continue;
}
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
第一個系統(tǒng)調(diào)用除了返回任務(wù)ID外什么都沒有做:
<?php
function getTaskId() {
return new SystemCall(function(Task $task, Scheduler $scheduler) {
$task->setSendValue($task->getTaskId());
$scheduler->schedule($task);
});
}
這個函數(shù)設(shè)置任務(wù)id為下一次發(fā)送的值, 并再次調(diào)度了這個任務(wù) .由于使用了系統(tǒng)調(diào)用, 所以調(diào)度器不能自動調(diào)用任務(wù), 我們需要手工調(diào)度任務(wù)(稍后你將明白為什么這么做). 要使用這個新的系統(tǒng)調(diào)用的話, 我們要重新編寫以前的例子:
<?php
function task($max) {
$tid = (yield getTaskId()); // <-- here's the syscall!
for ($i = 1; $i <= $max; ++$i) {
echo "This is task $tid iteration $i.\n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task(10));
$scheduler->newTask(task(5));
$scheduler->run();
這段代碼將給出與前一個例子相同的輸出. 請注意系統(tǒng)調(diào)用如何同其他任何調(diào)用一樣正常地運行, 只不過預(yù)先增加了yield.
要創(chuàng)建新的任務(wù), 然后再殺死它們的話, 需要兩個以上的系統(tǒng)調(diào)用:
<?php
function newTask(Generator $coroutine) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($coroutine) {
$task->setSendValue($scheduler->newTask($coroutine));
$scheduler->schedule($task);
}
);
}
function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
$task->setSendValue($scheduler->killTask($tid));
$scheduler->schedule($task);
}
);
}
killTask函數(shù)需要在調(diào)度器里增加一個方法:
<?php
public function killTask($tid) {
if (!isset($this->taskMap[$tid])) {
return false;
}
unset($this->taskMap[$tid]);
// This is a bit ugly and could be optimized so it does not have to walk the queue,
// but assuming that killing tasks is rather rare I won't bother with it now
foreach ($this->taskQueue as $i => $task) {
if ($task->getTaskId() === $tid) {
unset($this->taskQueue[$i]);
break;
}
}
return true;
}
用來測試新功能的微腳本:
<?php
function childTask() {
$tid = (yield getTaskId());
while (true) {
echo "Child task $tid still alive!\n";
yield;
}
}
function task() {
$tid = (yield getTaskId());
$childTid = (yield newTask(childTask()));
for ($i = 1; $i <= 6; ++$i) {
echo "Parent task $tid iteration $i.\n";
yield;
if ($i == 3) yield killTask($childTid);
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
這段代碼將打印以下信息:
Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.
經(jīng)過三次迭代以后子任務(wù)將被殺死, 因此這就是”Child is still alive”消息結(jié)束的時候. 不過你要明白這還不是真正的父子關(guān)系. 因為在父任務(wù)結(jié)束后子任務(wù)仍然可以運行, 子任務(wù)甚至可以殺死父任務(wù). 可以修改調(diào)度器使它具有更層級化的任務(wù)結(jié)構(gòu), 不過這個不是我們這個文章要繼續(xù)討論的范圍了.
現(xiàn)在你可以實現(xiàn)許多進(jìn)程管理調(diào)用. 例如 wait(它一直等待到任務(wù)結(jié)束運行時), exec(它替代當(dāng)前任務(wù))和fork(它創(chuàng)建一個當(dāng)前任務(wù)的克隆). fork非撤宓酷,而 且你可以使用PHP的協(xié)程真正地實現(xiàn)它, 因為它們都支持克隆.
讓我們把這些留給有興趣的讀者吧,我們來看下一個議題.
非阻塞IO
很明顯, 我們的任務(wù)管理系統(tǒng)的真正很酷的應(yīng)用應(yīng)該是web服務(wù)器. 它有一個任務(wù)是在套接字上偵聽是否有新連接, 當(dāng)有新連接要建立的時候, 它創(chuàng)建一個新任務(wù)來處理新連接.
Web服務(wù)器最難的部分通常是像讀數(shù)據(jù)這樣的套接字操作是阻塞的. 例如PHP將等待到客戶端完成發(fā)送為止. 對一個Web服務(wù)器來說, 這有點不太高效. 因為服務(wù)器在一個時間點上只能處理一個連接.
解決方案是確保在真正對套接字讀寫之前該套接字已經(jīng)“準(zhǔn)備就緒”. 為了查找哪個套接字已經(jīng)準(zhǔn)備好讀或者寫了, 可以使用 流選擇函數(shù).
首先,讓我們添加兩個新的 syscall, 它們將等待直到指定socket 準(zhǔn)備好:
<?php
function waitForRead($socket) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($socket) {
$scheduler->waitForRead($socket, $task);
}
);
}
function waitForWrite($socket) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($socket) {
$scheduler->waitForWrite($socket, $task);
}
);
}
這些 syscall 只是在調(diào)度器中代理其各自的方法:
<?php
// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];
public function waitForRead($socket, Task $task) {
if (isset($this->waitingForRead[(int) $socket])) {
$this->waitingForRead[(int) $socket][1][] = $task;
} else {
$this->waitingForRead[(int) $socket] = [$socket, [$task]];
}
}
public function waitForWrite($socket, Task $task) {
if (isset($this->waitingForWrite[(int) $socket])) {
$this->waitingForWrite[(int) $socket][1][] = $task;
} else {
$this->waitingForWrite[(int) $socket] = [$socket, [$task]];
}
}
waitingForRead 及 waitingForWrite 屬性是兩個承載等待的socket 及等待它們的任務(wù)的數(shù)組. 有趣的部分在于下面的方法,它將檢查 socket 是否可用, 并重新安排各自任務(wù):
<?php
protected function ioPoll($timeout) {
$rSocks = [];
foreach ($this->waitingForRead as list($socket)) {
$rSocks[] = $socket;
}
$wSocks = [];
foreach ($this->waitingForWrite as list($socket)) {
$wSocks[] = $socket;
}
$eSocks = []; // dummy
if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
return;
}
foreach ($rSocks as $socket) {
list(, $tasks) = $this->waitingForRead[(int) $socket];
unset($this->waitingForRead[(int) $socket]);
foreach ($tasks as $task) {
$this->schedule($task);
}
}
foreach ($wSocks as $socket) {
list(, $tasks) = $this->waitingForWrite[(int) $socket];
unset($this->waitingForWrite[(int) $socket]);
foreach ($tasks as $task) {
$this->schedule($task);
}
}
}
stream_select 函數(shù)接受承載讀取既琴、寫入以及待檢查的socket的數(shù)組(我們無需考慮最后一類). 數(shù)組將按引用傳遞, 函數(shù)只會保留那些狀態(tài)改變了的數(shù)組元素. 我們可以遍歷這些數(shù)組, 并重新安排與之相關(guān)的任務(wù).
為了正常地執(zhí)行上面的輪詢動作, 我們將在調(diào)度器里增加一個特殊的任務(wù):
<?php
protected function ioPollTask() {
while (true) {
if ($this->taskQueue->isEmpty()) {
$this->ioPoll(null);
} else {
$this->ioPoll(0);
}
yield;
}
}
需要在某個地方注冊這個任務(wù), 例如, 你可以在run()方法的開始增加$this->newTask($this->ioPollTask()). 然后就像其他任務(wù)一樣每執(zhí)行完整任務(wù)循環(huán)一次就執(zhí)行輪詢操作一次(這么做一定不是最好的方法), ioPollTask將使用0秒的超時來調(diào)用ioPoll, 也就是stream_select將立即返回(而不是等待).
只有任務(wù)隊列為空時,我們才使用null超時,這意味著它一直等到某個套接口準(zhǔn)備就緒.如果我們沒有這么做,那么輪詢?nèi)蝿?wù)將一而再, 再而三的循環(huán)運行, 直到有新的連接建立. 這將導(dǎo)致100%的CPU利用率. 相反, 讓操作系統(tǒng)做這種等待會更有效.
現(xiàn)在編寫服務(wù)器就相對容易了:
<?php
function server($port) {
echo "Starting server at port $port...\n";
$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
if (!$socket) throw new Exception($errStr, $errNo);
stream_set_blocking($socket, 0);
while (true) {
yield waitForRead($socket);
$clientSocket = stream_socket_accept($socket, 0);
yield newTask(handleClient($clientSocket));
}
}
function handleClient($socket) {
yield waitForRead($socket);
$data = fread($socket, 8192);
$msg = "Received following request:\n\n$data";
$msgLength = strlen($msg);
$response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
yield waitForWrite($socket);
fwrite($socket, $response);
fclose($socket);
}
$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();
這段代碼實現(xiàn)了接收localhost:8000上的連接, 然后返回發(fā)送來的內(nèi)容作為HTTP響應(yīng). 當(dāng)然它還能處理真正的復(fù)雜HTTP請求, 上面的代碼片段只是演示了一般性的概念.
你可以使用類似于ab -n 10000 -c 100 localhost:8000/這樣命令來測試服務(wù)器. 這條命令將向服務(wù)器發(fā)送10000個請求, 并且其中100個請求將同時到達(dá). 使用這樣的數(shù)目, 我得到了處于中間的10毫秒的響應(yīng)時間. 不過還有一個問題:有少數(shù)幾個請求真正處理的很慢(如5秒), 這就是為什么總吞吐量只有2000請求/秒(如果是10毫秒的響應(yīng)時間的話, 總的吞吐量應(yīng)該更像是10000請求/秒)
協(xié)程堆棧
如果你試圖用我們的調(diào)度系統(tǒng)建立更大的系統(tǒng)的話, 你將很快遇到問題:我們習(xí)慣了把代碼分解為更小的函數(shù), 然后調(diào)用它們. 然而, 如果使用了協(xié)程的話, 就不能這么做了. 例如,看下面代碼:
<?php
function echoTimes($msg, $max) {
for ($i = 1; $i <= $max; ++$i) {
echo "$msg iteration $i\n";
yield;
}
}
function task() {
echoTimes('foo', 10); // print foo ten times
echo "---\n";
echoTimes('bar', 5); // print bar five times
yield; // force it to be a coroutine
}
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
這段代碼試圖把重復(fù)循環(huán)“輸出n次“的代碼嵌入到一個獨立的協(xié)程里,然后從主任務(wù)里調(diào)用它. 然而它無法運行. 正如在這篇文章的開始所提到的, 調(diào)用生成器(或者協(xié)程)將沒有真正地做任何事情, 它僅僅返回一個對象.這 也出現(xiàn)在上面的例子里:echoTimes調(diào)用除了放回一個(無用的)協(xié)程對象外不做任何事情.
為了仍然允許這么做,我們需要在這個裸協(xié)程上寫一個小小的封裝.我們將調(diào)用它:“協(xié)程堆椄〔担”. 因為它將管理嵌套的協(xié)程調(diào)用堆棧. 這將是通過生成協(xié)程來調(diào)用子協(xié)程成為可能:
$retval = (yield someCoroutine($foo, $bar));
使用yield,子協(xié)程也能再次返回值
yield retval("I'm a return value!");
retval函數(shù)除了返回一個值的封裝外沒有做任何其他事情.這個封裝將表示它是一個返回值.
<?php
class CoroutineReturnValue {
protected $value;
public function __construct($value) {
$this->value = $value;
}
public function getValue() {
return $this->value;
}
}
function retval($value) {
return new CoroutineReturnValue($value);
}
為了把協(xié)程轉(zhuǎn)變?yōu)閰f(xié)程堆棧(它支持子調(diào)用),我們將不得不編寫另外一個函數(shù)(很明顯,它是另一個協(xié)程):
<?php
function stackedCoroutine(Generator $gen) {
$stack = new SplStack;
for (;;) {
$value = $gen->current();
if ($value instanceof Generator) {
$stack->push($gen);
$gen = $value;
continue;
}
$isReturnValue = $value instanceof CoroutineReturnValue;
if (!$gen->valid() || $isReturnValue) {
if ($stack->isEmpty()) {
return;
}
$gen = $stack->pop();
$gen->send($isReturnValue ? $value->getValue() : NULL);
continue;
}
$gen->send(yield $gen->key() => $value);
}
}
這個函數(shù)在調(diào)用者和當(dāng)前正在運行的子協(xié)程之間扮演著簡單代理的角色.在$gen->send(yield $gen->key()=>$value)盗尸;這行完成了代理功能.另外它檢查返回值是否是生成器,萬一是生成器的話,它將開始運行這個生成器,并把前一個協(xié)程壓入堆棧里.一旦它獲得了CoroutineReturnValue的話,它將再次請求堆棧彈出,然后繼續(xù)執(zhí)行前一個協(xié)程.
為了使協(xié)程堆棧在任務(wù)里可用,任務(wù)構(gòu)造器里的$this-coroutine =$coroutine;這行需要替代為$this->coroutine = StackedCoroutine($coroutine);.
現(xiàn)在我們可以稍微改進(jìn)上面web服務(wù)器例子:把wait+read(和wait+write和warit+accept)這樣的動作分組為函數(shù).為了分組相關(guān)的 功能,我將使用下面類:
<?php
class CoSocket {
protected $socket;
public function __construct($socket) {
$this->socket = $socket;
}
public function accept() {
yield waitForRead($this->socket);
yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
}
public function read($size) {
yield waitForRead($this->socket);
yield retval(fread($this->socket, $size));
}
public function write($string) {
yield waitForWrite($this->socket);
fwrite($this->socket, $string);
}
public function close() {
@fclose($this->socket);
}
}
現(xiàn)在服務(wù)器可以編寫的稍微簡潔點了:
<?php
function server($port) {
echo "Starting server at port $port...\n";
$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
if (!$socket) throw new Exception($errStr, $errNo);
stream_set_blocking($socket, 0);
$socket = new CoSocket($socket);
while (true) {
yield newTask(
handleClient(yield $socket->accept())
);
}
}
function handleClient($socket) {
$data = (yield $socket->read(8192));
$msg = "Received following request:\n\n$data";
$msgLength = strlen($msg);
$response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
yield $socket->write($response);
yield $socket->close();
}
錯誤處理
作為一個優(yōu)秀的程序員, 相信你已經(jīng)察覺到上面的例子缺少錯誤處理. 幾乎所有的 socket 都是易出錯的. 我沒有這樣做的原因一方面固然是因為錯誤處理的乏味(特別是 socket), 另一方面也在于它很容易使代碼體積膨脹.
不過, 我仍然想講下常見的協(xié)程錯誤處理:協(xié)程允許使用 throw() 方法在其內(nèi)部拋出一個錯誤.
throw() 方法接受一個 Exception, 并將其拋出到協(xié)程的當(dāng)前懸掛點, 看看下面代碼:
<?php
function gen() {
echo "Foo\n";
try {
yield;
} catch (Exception $e) {
echo "Exception: {$e->getMessage()}\n";
}
echo "Bar\n";
}
$gen = gen();
$gen->rewind(); // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
// and "Bar"
這非常好, 有沒有? 因為我們現(xiàn)在可以使用系統(tǒng)調(diào)用以及子協(xié)程調(diào)用異常拋出了.
不過我們要對系統(tǒng)調(diào)用Scheduler::run() 方法做一些小調(diào)整:
<?php
if ($retval instanceof SystemCall) {
try {
$retval($task, $this);
} catch (Exception $e) {
$task->setException($e);
$this->schedule($task);
}
continue;
}
Task 類也要添加 throw 調(diào)用處理:
<?php
class Task {
// ...
protected $exception = null;
public function setException($exception) {
$this->exception = $exception;
}
public function run() {
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} elseif ($this->exception) {
$retval = $this->coroutine->throw($this->exception);
$this->exception = null;
return $retval;
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}
// ...
}
現(xiàn)在, 我們已經(jīng)可以在系統(tǒng)調(diào)用中使用異常拋出了!例如,要調(diào)用 killTask,讓我們在傳遞 ID 不可用時拋出一個異常:
<?php
function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
if ($scheduler->killTask($tid)) {
$scheduler->schedule($task);
} else {
throw new InvalidArgumentException('Invalid task ID!');
}
}
);
}
試試看:
<?php
function task() {
try {
yield killTask(500);
} catch (Exception $e) {
echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
}
}
這些代碼現(xiàn)在尚不能正常運作,因為 stackedCoroutine 函數(shù)無法正確處理異常.要修復(fù)需要做些調(diào)整:
<?php
function stackedCoroutine(Generator $gen) {
$stack = new SplStack;
$exception = null;
for (;;) {
try {
if ($exception) {
$gen->throw($exception);
$exception = null;
continue;
}
$value = $gen->current();
if ($value instanceof Generator) {
$stack->push($gen);
$gen = $value;
continue;
}
$isReturnValue = $value instanceof CoroutineReturnValue;
if (!$gen->valid() || $isReturnValue) {
if ($stack->isEmpty()) {
return;
}
$gen = $stack->pop();
$gen->send($isReturnValue ? $value->getValue() : NULL);
continue;
}
try {
$sendValue = (yield $gen->key() => $value);
} catch (Exception $e) {
$gen->throw($e);
continue;
}
$gen->send($sendValue);
} catch (Exception $e) {
if ($stack->isEmpty()) {
throw $e;
}
$gen = $stack->pop();
$exception = $e;
}
}
}
結(jié)束語
在這篇文章里,我使用多任務(wù)協(xié)作構(gòu)建了一個任務(wù)調(diào)度器, 其中包括執(zhí)行“系統(tǒng)調(diào)用”, 做非阻塞操作和處理錯誤. 所有這些里真正很酷的事情是任務(wù)的結(jié)果代碼看起來完全同步, 甚至任務(wù)正在執(zhí)行大量的異步操作的時候也是這樣.
如果你打算從套接口讀取數(shù)據(jù)的話, 你將不需要傳遞某個回調(diào)函數(shù)或者注冊一個事件偵聽器. 相反, 你只要書寫yield $socket->read(). 這兒大部分都是你常常也要編寫的,只 在它的前面增加yield.
當(dāng)我第一次聽到協(xié)程的時候, 我發(fā)現(xiàn)這個概念完全令人折服, 正是因為這個激勵我在PHP中實現(xiàn)了它. 同時我發(fā)現(xiàn)協(xié)程真正非常的令人驚嘆:在令人敬畏的代碼和一大堆亂代碼之間只有一線之隔, 我認(rèn)為協(xié)程恰好處在這條線上, 不多不少. 不過, 要說使用上面所述的方法書寫異步代碼是否真的有益, 這個就見仁見智了.
但, 不管咋樣, 我認(rèn)為這是一個有趣的話題, 而且我希望你也能找到它的樂趣. 歡迎評論:)