PHP 協(xié)程實現(xiàn)

多進程/線程

最早的服務(wù)器端程序都是通過多進程、多線程來解決并發(fā)IO的問題镐捧。進程模型出現(xiàn)的最早支救,從Unix 系統(tǒng)誕生就開始有了進程的概念。最早的服務(wù)器端程序一般都是 Accept 一個客戶端連接就創(chuàng)建一個進程贤斜,然后子進程進入循環(huán)同步阻塞地與客戶端連接進行交互策吠,收發(fā)處理數(shù)據(jù)。

多線程模式出現(xiàn)要晚一些瘩绒,線程與進程相比更輕量猴抹,而且線程之間共享內(nèi)存堆棧,所以不同的線程之間交互非常容易實現(xiàn)锁荔。比如實現(xiàn)一個聊天室蟀给,客戶端連接之間可以交互,聊天室中的玩家可以任意的其他人發(fā)消息堕战。用多線程模式實現(xiàn)非常簡單坤溃,線程中可以直接向某一個客戶端連接發(fā)送數(shù)據(jù)。而多進程模式就要用到管道嘱丢、消息隊列薪介、共享內(nèi)存等等統(tǒng)稱進程間通信(IPC)復(fù)雜的技術(shù)才能實現(xiàn)。

最簡單的多進程服務(wù)端模型

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr) 
or die("Create server failed");

![image.png](http://upload-images.jianshu.io/upload_images/4686383-a16cd123fd865f10.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

while(1) {
    $conn = stream_socket_accept($serv);
    if (pcntl_fork() == 0) {
        $request = fread($conn);
        // do something
        // $response = "hello world";
        fwrite($response);
        fclose($conn);
        exit(0);
    }
}

多進程/線程模型的流程是:

創(chuàng)建一個 socket越驻,綁定服務(wù)器端口(bind)汁政,監(jiān)聽端口(listen)道偷,在 PHP 中用 stream_socket_server 一個函數(shù)就能完成上面 3 個步驟,當(dāng)然也可以使用更底層的sockets 擴展分別實現(xiàn)记劈。

進入 while 循環(huán)勺鸦,阻塞在 accept 操作上,等待客戶端連接進入目木。此時程序會進入隨眠狀態(tài)换途,直到有新的客戶端發(fā)起 connect 到服務(wù)器,操作系統(tǒng)會喚醒此進程刽射。accept 函數(shù)返回客戶端連接的 socket 主進程在多進程模型下通過 fork(php: pcntl_fork)創(chuàng)建子進程军拟,多線程模型下使用 pthread_create(php: new Thread)創(chuàng)建子線程。

下文如無特殊聲明將使用進程同時表示進程/線程誓禁。

子進程創(chuàng)建成功后進入 while 循環(huán)懈息,阻塞在 recv(php:fread)調(diào)用上,等待客戶端向服務(wù)器發(fā)送數(shù)據(jù)摹恰。收到數(shù)據(jù)后服務(wù)器程序進行處理然后使用 send(php: fwrite)向客戶端發(fā)送響應(yīng)辫继。長連接的服務(wù)會持續(xù)與客戶端交互,而短連接服務(wù)一般收到響應(yīng)就會 close俗慈。

當(dāng)客戶端連接關(guān)閉時姑宽,子進程退出并銷毀所有資源,主進程會回收掉此子進程闺阱。

image.png

這種模式最大的問題是低千,進程創(chuàng)建和銷毀的開銷很大。所以上面的模式?jīng)]辦法應(yīng)用于非常繁忙的服務(wù)器程序馏颂。對應(yīng)的改進版解決了此問題,這就是經(jīng)典的 Leader-Follower 模型棋傍。

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr) 
or die("Create server failed");

for($i = 0; $i < 32; $i++) {
    if (pcntl_fork() == 0) {
        while(1) {
            $conn = stream_socket_accept($serv);
            if ($conn == false) continue;
            // do something
            $request = fread($conn);
            // $response = "hello world";
            fwrite($response);
            fclose($conn);
        }
        exit(0);
    }
}

它的特點是程序啟動后就會創(chuàng)建 N 個進程救拉。每個子進程進入 Accept,等待新的連接進入瘫拣。當(dāng)客戶端連接到服務(wù)器時亿絮,其中一個子進程會被喚醒,開始處理客戶端請求麸拄,并且不再接受新的 TCP 連接派昧。當(dāng)此連接關(guān)閉時,子進程會釋放拢切,重新進入 Accept蒂萎,參與處理新的連接。

這個模型的優(yōu)勢是完全可以復(fù)用進程淮椰,沒有額外消耗五慈,性能非常好纳寂。很多常見的服務(wù)器程序都是基于此模型的,比如 Apache泻拦、PHP-FPM毙芜。

多進程模型也有一些缺點。

這種模型嚴(yán)重依賴進程的數(shù)量解決并發(fā)問題争拐,一個客戶端連接就需要占用一個進程腋粥,工作進程的數(shù)量有多少,并發(fā)處理能力就有多少架曹。操作系統(tǒng)可以創(chuàng)建的進程數(shù)量是有限的隘冲。

啟動大量進程會帶來額外的進程調(diào)度消耗。數(shù)百個進程時可能進程上下文切換調(diào)度消耗占 CPU 不到1%可以忽略不接音瓷,如果啟動數(shù)千甚至數(shù)萬個進程对嚼,消耗就會直線上升。調(diào)度消耗可能占到 CPU 的百分之幾十甚至 100%绳慎。

并行和并發(fā)

談到多進程以及類似同時執(zhí)行多個任務(wù)的模型纵竖,就不得不先談?wù)劜⑿泻筒l(fā)。

并發(fā)(Concurrency)

是指能處理多個同時性活動的能力杏愤,并發(fā)事件之間不一定要同一時刻發(fā)生靡砌。

并行(Parallesim)

是指同時發(fā)生的兩個并發(fā)事件,具有并發(fā)的含義珊楼,而并發(fā)則不一定并行通殃。

區(qū)別

  • 『并發(fā)』指的是程序的結(jié)構(gòu),『并行』指的是程序運行時的狀態(tài)
  • 『并行』一定是并發(fā)的厕宗,『并行』是『并發(fā)』設(shè)計的一種
  • 單線程永遠(yuǎn)無法達到『并行』狀態(tài)

正確的并發(fā)設(shè)計的標(biāo)準(zhǔn)是:

使多個操作可以在重疊的時間段內(nèi)進行画舌。
two tasks can start, run, and complete in overlapping time periods

參考:

迭代器 & 生成器

在了解 PHP 協(xié)程前,還有 迭代器生成器 這兩個概念需要先認(rèn)識一下已慢。

迭代器

PHP5 開始內(nèi)置了 Iterator 即迭代器接口曲聂,所以如果你定義了一個類,并實現(xiàn)了Iterator 接口佑惠,那么你的這個類對象就是 ZEND_ITER_OBJECT 即可迭代的朋腋,否則就是 ZEND_ITER_PLAIN_OBJECT

對于 ZEND_ITER_PLAIN_OBJECT 的類膜楷,foreach 會獲取該對象的默認(rèn)屬性數(shù)組旭咽,然后對該數(shù)組進行迭代。

而對于 ZEND_ITER_OBJECT 的類對象赌厅,則會通過調(diào)用對象實現(xiàn)的 Iterator 接口相關(guān)函數(shù)來進行迭代穷绵。

任何實現(xiàn)了 Iterator 接口的類都是可迭代的,即都可以用 foreach 語句來遍歷察蹲。

Iterator 接口

interface Iterator extends Traversable
{
    // 獲取當(dāng)前內(nèi)部標(biāo)量指向的元素的數(shù)據(jù)
    public mixed current()

    // 獲取當(dāng)前標(biāo)量
    public scalar key()

    // 移動到下一個標(biāo)量
    public void next()

    // 重置標(biāo)量
    public void rewind()

    // 檢查當(dāng)前標(biāo)量是否有效
    public boolean valid()
}

常規(guī)實現(xiàn) range 函數(shù)

PHP 自帶的 range 函數(shù)原型:

range — 根據(jù)范圍創(chuàng)建數(shù)組请垛,包含指定的元素

array range (mixed $start , mixed $end [, number $step = 1 ])

建立一個包含指定范圍單元的數(shù)組催训。

在不使用迭代器的情況要實現(xiàn)一個和 PHP 自帶的 range 函數(shù)類似的功能,可能會這么寫:

function range ($start, $end, $step = 1)
{
    $ret = [];
    
    for ($i = $start; $i <= $end; $i += $step) {
        $ret[] = $i;
    }
    
    return $ret;
}

需要將生成的所有元素放在內(nèi)存數(shù)組中宗收,如果需要生成一個非常大的集合漫拭,則會占用巨大的內(nèi)存。

迭代器實現(xiàn) xrange 函數(shù)

來看看迭代實現(xiàn)的 range混稽,我們叫做 xrange采驻,他實現(xiàn)了 Iterator 接口必須的 5 個方法:

class Xrange implements Iterator
{
    protected $start;
    protected $limit;
    protected $step;
    protected $current;

    public function __construct($start, $limit, $step = 1)
    {
        $this->start = $start;
        $this->limit = $limit;
        $this->step  = $step;
    }

    public function rewind()
    {
        $this->current = $this->start;
    }

    public function next()
    {
        $this->current += $this->step;
    }

    public function current()
    {
        return $this->current;
    }

    public function key()
    {
        return $this->current + 1;
    }

    public function valid()
    {
        return $this->current <= $this->limit;
    }
}

使用時代碼如下:

foreach (new Xrange(0, 9) as $key => $val) {
    echo $key, ' ', $val, "\n";
}

輸出:

0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9

看上去功能和 range() 函數(shù)所做的一致,不同點在于迭代的是一個 對象(Object) 而不是數(shù)組:

var_dump(new Xrange(0, 9));

輸出:

object(Xrange)#1 (4) {
  ["start":protected]=>
  int(0)
  ["limit":protected]=>
  int(9)
  ["step":protected]=>
  int(1)
  ["current":protected]=>
  NULL
}

另外匈勋,內(nèi)存的占用情況也完全不同:

// range
$startMemory = memory_get_usage();
$arr = range(0, 500000);
echo 'range(): ', memory_get_usage() - $startMemory, " bytes\n";

unset($arr);

// xrange
$startMemory = memory_get_usage();
$arr = new Xrange(0, 500000);
echo 'xrange(): ', memory_get_usage() - $startMemory, " bytes\n";

輸出:

xrange(): 624 bytes
range(): 72194784 bytes

range() 函數(shù)在執(zhí)行后占用了 50W 個元素內(nèi)存空間礼旅,而 xrange 對象在整個迭代過程中只占用一個對象的內(nèi)存。

Yii2 Query

在喜聞樂見的各種 PHP 框架里有不少生成器的實例洽洁,比如 Yii2 中用來構(gòu)建 SQL 語句的 \yii\db\Query 類:

$query = (new \yii\db\Query)->from('user');
// yii\db\BatchQueryResult
foreach ($query->batch() as $users) {
    // 每次循環(huán)得到多條 user 記錄
}

來看一下 batch() 做了什么:

/**
* Starts a batch query.
*
* A batch query supports fetching data in batches, which can keep the memory usage under a limit.
* This method will return a [[BatchQueryResult]] object which implements the [[\Iterator]] interface
* and can be traversed to retrieve the data in batches.
*
* For example,
*
*
* $query = (new Query)->from('user');
* foreach ($query->batch() as $rows) {
*     // $rows is an array of 10 or fewer rows from user table
* }
*
*
* @param integer $batchSize the number of records to be fetched in each batch.
* @param Connection $db the database connection. If not set, the "db" application component will be used.
* @return BatchQueryResult the batch query result. It implements the [[\Iterator]] interface
* and can be traversed to retrieve the data in batches.
*/
public function batch($batchSize = 100, $db = null)
{
   return Yii::createObject([
       'class' => BatchQueryResult::className(),
       'query' => $this,
       'batchSize' => $batchSize,
       'db' => $db,
       'each' => false,
   ]);
}

實際上返回了一個 BatchQueryResult 類痘系,類的源碼實現(xiàn)了 Iterator 接口 5 個關(guān)鍵方法:

class BatchQueryResult extends Object implements \Iterator
{
    public $db;
    public $query;
    public $batchSize = 100;
    public $each = false;
    private $_dataReader;
    private $_batch;
    private $_value;
    private $_key;


    /**
     * Destructor.
     */
    public function __destruct()
    {
        // make sure cursor is closed
        $this->reset();
    }

    /**
     * Resets the batch query.
     * This method will clean up the existing batch query so that a new batch query can be performed.
     */
    public function reset()
    {
        if ($this->_dataReader !== null) {
            $this->_dataReader->close();
        }
        $this->_dataReader = null;
        $this->_batch = null;
        $this->_value = null;
        $this->_key = null;
    }

    /**
     * Resets the iterator to the initial state.
     * This method is required by the interface [[\Iterator]].
     */
    public function rewind()
    {
        $this->reset();
        $this->next();
    }

    /**
     * Moves the internal pointer to the next dataset.
     * This method is required by the interface [[\Iterator]].
     */
    public function next()
    {
        if ($this->_batch === null || !$this->each || $this->each && next($this->_batch) === false) {
            $this->_batch = $this->fetchData();
            reset($this->_batch);
        }

        if ($this->each) {
            $this->_value = current($this->_batch);
            if ($this->query->indexBy !== null) {
                $this->_key = key($this->_batch);
            } elseif (key($this->_batch) !== null) {
                $this->_key++;
            } else {
                $this->_key = null;
            }
        } else {
            $this->_value = $this->_batch;
            $this->_key = $this->_key === null ? 0 : $this->_key + 1;
        }
    }

    /**
     * Fetches the next batch of data.
     * @return array the data fetched
     */
    protected function fetchData()
    {
        // ...
    }

    /**
     * Returns the index of the current dataset.
     * This method is required by the interface [[\Iterator]].
     * @return integer the index of the current row.
     */
    public function key()
    {
        return $this->_key;
    }

    /**
     * Returns the current dataset.
     * This method is required by the interface [[\Iterator]].
     * @return mixed the current dataset.
     */
    public function current()
    {
        return $this->_value;
    }

    /**
     * Returns whether there is a valid dataset at the current position.
     * This method is required by the interface [[\Iterator]].
     * @return boolean whether there is a valid dataset at the current position.
     */
    public function valid()
    {
        return !empty($this->_batch);
    }
}

以迭代器的方式實現(xiàn)了類似分頁取的效果,同時避免了一次性取出所有數(shù)據(jù)占用太多的內(nèi)存空間饿自。

迭代器使用場景

  • 使用返回迭代器的包或庫時(如 PHP5 中的 SPL 迭代器)
  • 無法在一次調(diào)用獲取所需的所有元素時
  • 要處理數(shù)量巨大的元素時(數(shù)據(jù)庫中要處理的結(jié)果集內(nèi)容超過內(nèi)存)
  • ...

生成器

需要 PHP 5 >= 5.5.0 或 PHP 7

雖然迭代器僅需繼承接口即可實現(xiàn)汰翠,但畢竟需要定義一整個類然后實現(xiàn)接口的所有方法,實在是不怎么方便昭雌。

生成器則提供了一種更簡單的方式來實現(xiàn)簡單的對象迭代复唤,相比定義類來實現(xiàn) Iterator 接口的方式,性能開銷和復(fù)雜度大大降低烛卧。

生成器允許在 foreach 代碼塊中迭代一組數(shù)據(jù)而不需要創(chuàng)建任何數(shù)組佛纫。一個生成器函數(shù),就像一個普通的有返回值的自定義函數(shù)類似总放,但普通函數(shù)只返回一次, 而生成器可以根據(jù)需要通過 yield 關(guān)鍵字返回多次呈宇,以便連續(xù)生成需要迭代返回的值。

一個最簡單的例子就是使用生成器來重新實現(xiàn) xrange() 函數(shù)局雄。效果和上面我們用迭代器實現(xiàn)的差不多攒盈,但實現(xiàn)起來要簡單的多。

生成器實現(xiàn) xrange 函數(shù)

function xrange($start, $limit, $step = 1) {
    for ($i = 0; $i < $limit; $i += $step) { 
        yield $i + 1 => $i;
    }
}

foreach (xrange(0, 9) as $key => $val) {
    printf("%d %d \n", $key, $val);
}

// 輸出
// 1 0
// 2 1
// 3 2
// 4 3
// 5 4
// 6 5
// 7 6
// 8 7
// 9 8

實際上生成器生成的正是一個迭代器對象實例哎榴,該迭代器對象繼承了 Iterator 接口,同時也包含了生成器對象自有的接口僵蛛,具體可以參考 Generator 類的定義以及語法參考尚蝌。

同時需要注意的是:

一個生成器不可以返回值,這樣做會產(chǎn)生一個編譯錯誤充尉。然而 return 空是一個有效的語法并且它將會終止生成器繼續(xù)執(zhí)行飘言。

yield 關(guān)鍵字

需要注意的是 yield 關(guān)鍵字,這是生成器的關(guān)鍵驼侠。通過上面的例子可以看出姿鸿,yield 會將當(dāng)前產(chǎn)生的值傳遞給 foreach谆吴,換句話說,foreach 每一次迭代過程都會從 yield 處取一個值苛预,直到整個遍歷過程不再能執(zhí)行到 yield 時遍歷結(jié)束句狼,此時生成器函數(shù)簡單的退出,而調(diào)用生成器的上層代碼還可以繼續(xù)執(zhí)行热某,就像一個數(shù)組已經(jīng)被遍歷完了腻菇。

yield 最簡單的調(diào)用形式看起來像一個 return 申明,不同的是 yield 暫停當(dāng)前過程的執(zhí)行并返回值昔馋,而 return 是中斷當(dāng)前過程并返回值筹吐。暫停當(dāng)前過程,意味著將處理權(quán)轉(zhuǎn)交由上一級繼續(xù)進行秘遏,直到上一級再次調(diào)用被暫停的過程丘薛,該過程又會從上一次暫停的位置繼續(xù)執(zhí)行。這像是什么呢邦危?如果之前已經(jīng)在鳥哥的文章中粗略看過洋侨,應(yīng)該知道這很像操作系統(tǒng)的進程調(diào)度,多個進程在一個 CPU 核心上執(zhí)行铡俐,在系統(tǒng)調(diào)度下每一個進程執(zhí)行一段指令就被暫停凰兑,切換到下一個進程,這樣外部用戶看起來就像是同時在執(zhí)行多個任務(wù)审丘。

但僅僅如此還不夠吏够,yield 除了可以返回值以外,還能接收值滩报,也就是可以在兩個層級間實現(xiàn)雙向通信锅知。

來看看如何傳遞一個值給 yield

function printer()
{
    while (true) {
        printf("receive: %s\n", yield);
    }
}

$printer = printer();

$printer->send('hello');
$printer->send('world');

// 輸出
receive: hello
receive: world

根據(jù) PHP 官方文檔的描述可以知道 Generator 對象除了實現(xiàn) Iterator 接口中的必要方法以外,還有一個 send 方法脓钾,這個方法就是向 yield 語句處傳遞一個值售睹,同時從 yield 語句處繼續(xù)執(zhí)行,直至再次遇到 yield 后控制權(quán)回到外部可训。

既然 yield 可以在其位置中斷并返回或者接收一個值昌妹,那能不能同時進行接收返回呢?當(dāng)然握截,這也是實現(xiàn)協(xié)程的根本飞崖。對上述代碼做出修改:

function printer()
{
    $i = 0;
    while (true) {
        printf("receive: %s\n", (yield ++$i));
    }
}

$printer = printer();

printf("%d\n", $printer->current());
$printer->send('hello');
printf("%d\n", $printer->current());
$printer->send('world');
printf("%d\n", $printer->current());

// 輸出
1
receive: hello
2
receive: world
3

這是另一個例子:

function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}
 
$gen = gen();
var_dump($gen->current());    // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1"   (第一個 var_dump)
                              // string(6) "yield2" (繼續(xù)執(zhí)行到第二個 yield,吐出了返回值)
var_dump($gen->send('ret2')); // string(4) "ret2"   (第二個 var_dump)
                              // NULL (var_dump 之后沒有其他語句谨胞,所以這次 ->send() 的返回值為 null)

current 方法是迭代器 Iterator 接口必要的方法固歪,foreach 語句每一次迭代都會通過其獲取當(dāng)前值,而后調(diào)用迭代器的 next 方法胯努。在上述例子里則是手動調(diào)用了 current 方法獲取值牢裳。

上述例子已經(jīng)足以表示 yield 能夠作為實現(xiàn)雙向通信的工具逢防,也就是具備了后續(xù)實現(xiàn)協(xié)程的基本條件。

上面的例子如果第一次接觸并稍加思考蒲讯,不免會疑惑為什么一個 yield 既是語句又是表達式忘朝,而且這兩種情況還同時存在:

  • 對于所有在生成器函數(shù)中出現(xiàn)的 yield,首先它都是語句伶椿,而跟在 yield 后面的任何表達式的值將作為調(diào)用生成器函數(shù)的返回值辜伟,如果 yield 后面沒有任何表達式(變量、常量都是表達式)脊另,那么它會返回 NULL导狡,這一點和 return 語句一致。
  • yield 也是表達式偎痛,它的值就是 send 函數(shù)傳過來的值(相當(dāng)于一個特殊變量旱捧,只不過賦值是通過 send 函數(shù)進行的)。只要調(diào)用send方法踩麦,并且生成器對象的迭代并未終結(jié)枚赡,那么當(dāng)前位置的 yield 就會得到 send 方法傳遞過來的值,這和生成器函數(shù)有沒有把這個值賦值給某個變量沒有任何關(guān)系谓谦。

這個地方可能需要仔細(xì)品味上面兩個 send() 方法的例子才能理解贫橙。但可以簡單的記住:

任何時候 yield 關(guān)鍵詞即是語句:可以為生成器函數(shù)返回值反粥;
也是表達式:可以接收生成器對象發(fā)過來的值卢肃。

除了 send() 方法,還有一種控制生成器執(zhí)行的方法是 next() 函數(shù):

  • Next()才顿,恢復(fù)生成器函數(shù)的執(zhí)行直到下一個 yield
  • Send()莫湘,向生成器傳入一個值,恢復(fù)執(zhí)行直到下一個 yield

協(xié)程

對于單核處理器郑气,多進程實現(xiàn)多任務(wù)的原理是讓操作系統(tǒng)給一個任務(wù)每次分配一定的 CPU 時間片幅垮,然后中斷、讓下一個任務(wù)執(zhí)行一定的時間片接著再中斷并繼續(xù)執(zhí)行下一個尾组,如此反復(fù)忙芒。由于切換執(zhí)行任務(wù)的速度非常快讳侨,給外部用戶的感受就是多個任務(wù)的執(zhí)行是同時進行的匕争。

多進程的調(diào)度是由操作系統(tǒng)來實現(xiàn)的,進程自身不能控制自己何時被調(diào)度爷耀,也就是說:

進程的調(diào)度是由外層調(diào)度器搶占式實現(xiàn)的

協(xié)程要求當(dāng)前正在運行的任務(wù)自動把控制權(quán)回傳給調(diào)度器,這樣就可以繼續(xù)運行其他任務(wù)拍皮。這與『搶占式』的多任務(wù)正好相反, 搶占多任務(wù)的調(diào)度器可以強制中斷正在運行的任務(wù), 不管它自己有沒有意愿歹叮∨芎迹『協(xié)作式多任務(wù)』在 Windows 的早期版本 (windows95) 和 Mac OS 中有使用, 不過它們后來都切換到『搶占式多任務(wù)』了普监。理由相當(dāng)明確:如果僅依靠程序自動交出控制的話躏鱼,那么一些惡意程序?qū)苋菀渍加萌?CPU 時間而不與其他任務(wù)共享。

協(xié)程的調(diào)度是由協(xié)程自身主動讓出控制權(quán)到外層調(diào)度器實現(xiàn)的

回到剛才生成器實現(xiàn) xrange 函數(shù)的例子庶灿,整個執(zhí)行過程的交替可以用下圖來表示:

image.png

協(xié)程可以理解為純用戶態(tài)的線程萨螺,通過協(xié)作而不是搶占來進行任務(wù)切換窄做。相對于進程或者線程,協(xié)程所有的操作都可以在用戶態(tài)而非操作系統(tǒng)內(nèi)核態(tài)完成慰技,創(chuàng)建和切換的消耗非常低椭盏。

簡單的說 Coroutine(協(xié)程) 就是提供一種方法來中斷當(dāng)前任務(wù)的執(zhí)行,保存當(dāng)前的局部變量吻商,下次再過來又可以恢復(fù)當(dāng)前局部變量繼續(xù)執(zhí)行掏颊。

我們可以把大任務(wù)拆分成多個小任務(wù)輪流執(zhí)行,如果有某個小任務(wù)在等待系統(tǒng) IO艾帐,就跳過它乌叶,執(zhí)行下一個小任務(wù),這樣往復(fù)調(diào)度柒爸,實現(xiàn)了 IO 操作和 CPU 計算的并行執(zhí)行准浴,總體上就提升了任務(wù)的執(zhí)行效率,這也便是協(xié)程的意義捎稚。

PHP 協(xié)程和 yield

PHP 從 5.5 開始支持生成器及 yield 關(guān)鍵字乐横,而 PHP 協(xié)程則由 yield 來實現(xiàn)。

要理解協(xié)程阳藻,首先要理解:代碼是代碼晰奖,函數(shù)是函數(shù)。函數(shù)包裹的代碼賦予了這段代碼附加的意義:不管是否顯式的指明返回值腥泥,當(dāng)函數(shù)內(nèi)的代碼塊執(zhí)行完后都會返回到調(diào)用層匾南。而當(dāng)調(diào)用層調(diào)用某個函數(shù)的時候,必須等這個函數(shù)返回蛔外,當(dāng)前函數(shù)才能繼續(xù)執(zhí)行蛆楞,這就構(gòu)成了后進先出,也就是 Stack夹厌。

而協(xié)程包裹的代碼豹爹,不是函數(shù),不完全遵守函數(shù)的附加意義矛纹,協(xié)程執(zhí)行到某個點臂聋,協(xié)會協(xié)程會 yield 返回一個值然后掛起,而不是 return 一個值然后結(jié)束,當(dāng)再次調(diào)用協(xié)程的時候孩等,會在上次 yield 的點繼續(xù)執(zhí)行艾君。

所以協(xié)程違背了通常操作系統(tǒng)和 x86 的 CPU 認(rèn)定的代碼執(zhí)行方式,也就是 Stack 的這種執(zhí)行方式肄方,需要運行環(huán)境(比如 php冰垄,python 的 yield 和 golang 的 goroutine)自己調(diào)度,來實現(xiàn)任務(wù)的中斷和恢復(fù)权她,具體到 PHP虹茶,就是靠 yield 來實現(xiàn)。

堆棧式調(diào)用協(xié)程調(diào)用的對比:

image.png

結(jié)合之前的例子隅要,可以總結(jié)一下 yield 能做的就是:

  • 實現(xiàn)不同任務(wù)間的主動讓位蝴罪、讓行,把控制權(quán)交回給任務(wù)調(diào)度器拾徙。
  • 通過 send() 實現(xiàn)不同任務(wù)間的雙向通信洲炊,也就可以實現(xiàn)任務(wù)和調(diào)度器之間的通信。

yield 就是 PHP 實現(xiàn)協(xié)程的方式尼啡。

協(xié)程多任務(wù)調(diào)度

下面是雄文 Cooperative multitasking using coroutines (in PHP!) 里一個簡單但完整的例子暂衡,來展示如何具體的在 PHP 里實現(xiàn)協(xié)程任務(wù)的調(diào)度。

首先是一個任務(wù)類:

Task

class Task
{
    // 任務(wù) ID
    protected $taskId;
    // 協(xié)程對象
    protected $coroutine;
    // send() 值
    protected $sendVal = null;
    // 是否首次 yield
    protected $beforeFirstYield = true;

    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }
    
    public function getTaskId() {
        return $this->taskId;
    }

    public function setSendValue($sendVal) {
        $this->sendVal = $sendVal;
    }

    public function run() {
        // 如之前提到的在send之前, 當(dāng)?shù)鞅粍?chuàng)建后第一次 yield 之前崖瞭,一個 renwind() 方法會被隱式調(diào)用
        // 所以實際上發(fā)生的應(yīng)該類似:
        // $this->coroutine->rewind();
        // $this->coroutine->send();
         
        // 這樣 renwind 的執(zhí)行將會導(dǎo)致第一個 yield 被執(zhí)行, 并且忽略了他的返回值.
        // 真正當(dāng)我們調(diào)用 yield 的時候, 我們得到的是第二個yield的值狂巢,導(dǎo)致第一個yield的值被忽略。
        // 所以這個加上一個是否第一次 yield 的判斷來避免這個問題
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendVal);
            $this->sendVal = null;
            return $retval;
        }
    }

    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

接下來是調(diào)度器书聚,比 foreach 是要復(fù)雜一點唧领,但好歹也能算個正兒八經(jīng)的 Scheduler :)

Scheduler

class Scheduler
{
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;
 
    public function __construct() {
        $this->taskQueue = new SplQueue();
    }
 
    // (使用下一個空閑的任務(wù)id)創(chuàng)建一個新任務(wù),然后把這個任務(wù)放入任務(wù)map數(shù)組里. 接著它通過把任務(wù)放入任務(wù)隊列里來實現(xiàn)對任務(wù)的調(diào)度. 接著run()方法掃描任務(wù)隊列, 運行任務(wù).如果一個任務(wù)結(jié)束了, 那么它將從隊列里刪除, 否則它將在隊列的末尾再次被調(diào)度。
    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }
 
    public function schedule(Task $task) {
        // 任務(wù)入隊
        $this->queue->enqueue($task);
    }
 
    public function run() {
        while (!$this->queue->isEmpty()) {
            // 任務(wù)出隊
            $task = $this->queue->dequeue();
            $task->run();
 
            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}

隊列可以使每個任務(wù)獲得同等的 CPU 使用時間雌续,

Demo

function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}
 
function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}
    
$scheduler = new Scheduler;
 
$scheduler->newTask(task1());
$scheduler->newTask(task2());
 
$scheduler->run();

輸出:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

結(jié)果正是我們期待的斩个,最初的 5 次迭代,兩個任務(wù)是交替進行的驯杜,而在第二個任務(wù)結(jié)束后受啥,只有第一個任務(wù)繼續(xù)執(zhí)行到結(jié)束。

協(xié)程非阻塞 IO

若想真正的發(fā)揮出協(xié)程的作用鸽心,那一定是在一些涉及到阻塞 IO 的場景滚局,我們都知道 Web 服務(wù)器最耗時的部分通常都是 socket 讀取數(shù)據(jù)等操作上,如果進程對每個請求都掛起的等待 IO 操作顽频,那處理效率就太低了藤肢,接下來我們看個支持非阻塞 IO 的 Scheduler:

<?php

class Scheduler
{
    protected $maxTaskId = 0;
    protected $tasks = []; // taskId => task
    protected $queue;

    // resourceID => [socket, tasks]
    protected $waitingForRead = [];
    protected $waitingForWrite = [];
 
    public function __construct() {
        // SPL 隊列
        $this->queue = new SplQueue();
    }
 
    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->tasks[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }
 
    public function schedule(Task $task) {
        // 任務(wù)入隊
        $this->queue->enqueue($task);
    }
 
    public function run() {
        while (!$this->queue->isEmpty()) {
            // 任務(wù)出隊
            $task = $this->queue->dequeue();
            $task->run();
 
            if ($task->isFinished()) {
                unset($this->tasks[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }

    public function waitForRead($socket, Task $task)
    {
        if (isset($this->waitingForRead[(int)$socket])) {
            $this->waitingForRead[(int)$socket][1][] = $task;
        } else {
            $this->waitingForRead[(int)$socket] = [$socket, [$task]];
        }
    }

    public function waitForWrite($socket, Task $task)
    {
        if (isset($this->waitingForWrite[(int)$socket])) {
            $this->waitingForWrite[(int)$socket][1][] = $task;
        } else {
            $this->waitingForWrite[(int)$socket] = [$socket, [$task]];
        }
    }

    /**
     * @param $timeout 0 represent
     */
    protected function ioPoll($timeout)
    {
        $rSocks = [];
        foreach ($this->waitingForRead as list($socket)) {
            $rSocks[] = $socket;
        }

        $wSocks = [];
        foreach ($this->waitingForWrite as list($socket)) {
            $wSocks[] = $socket;
        }

        $eSocks = [];
        // $timeout 為 0 時, stream_select 為立即返回,為 null 時則會阻塞的等糯景,見 http://php.net/manual/zh/function.stream-select.php
        if (!@stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
            return;
        }

        foreach ($rSocks as $socket) {
            list(, $tasks) = $this->waitingForRead[(int)$socket];
            unset($this->waitingForRead[(int)$socket]);

            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }

        foreach ($wSocks as $socket) {
            list(, $tasks) = $this->waitingForWrite[(int)$socket];
            unset($this->waitingForWrite[(int)$socket]);

            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }
    }

    /**
     * 檢查隊列是否為空嘁圈,若為空則掛起的執(zhí)行 stream_select省骂,否則檢查完 IO 狀態(tài)立即返回,詳見 ioPoll()
     * 作為任務(wù)加入隊列后最住,由于 while true冀宴,會被一直重復(fù)的加入任務(wù)隊列,實現(xiàn)每次任務(wù)前檢查 IO 狀態(tài)
     * @return Generator object for newTask
     *
     */
    protected function ioPollTask()
    {
        while (true) {
            if ($this->taskQueue->isEmpty()) {
                $this->ioPoll(null);
            } else {
                $this->ioPoll(0);
            }
            yield;
        }
    }

    /**
     * $scheduler = new Scheduler;
     * $scheduler->newTask(Web Server Generator);
     * $scheduler->withIoPoll()->run();
     *
     * 新建 Web Server 任務(wù)后先執(zhí)行 withIoPoll() 將 ioPollTask() 作為任務(wù)入隊
     * 
     * @return $this
     */
    public function withIoPoll()
    {
        $this->newTask($this->ioPollTask());
        return $this;
    }
}

這個版本的 Scheduler 里加入一個永不退出的任務(wù)温学,并且通過 stream_select 支持的特性來實現(xiàn)快速的來回檢查各個任務(wù)的 IO 狀態(tài),只有 IO 完成的任務(wù)才會繼續(xù)執(zhí)行甚疟,而 IO 還未完成的任務(wù)則會跳過仗岖,完整的代碼和例子可以戳這里

也就是說任務(wù)交替執(zhí)行的過程中览妖,一旦遇到需要 IO 的部分轧拄,調(diào)度器就會把 CPU 時間分配給不需要 IO 的任務(wù),等到當(dāng)前任務(wù)遇到 IO 或者之前的任務(wù) IO 結(jié)束才再次調(diào)度 CPU 時間讽膏,以此實現(xiàn) CPU 和 IO 并行來提升執(zhí)行效率檩电,類似下圖:

image.png

單任務(wù)改造

如果想將一個單進程任務(wù)改造成并發(fā)執(zhí)行,我們可以選擇改造成多進程或者協(xié)程:

  • 多進程府树,不改變?nèi)蝿?wù)執(zhí)行的整體過程俐末,在一個時間段內(nèi)同時執(zhí)行多個相同的代碼段,調(diào)度權(quán)在 CPU奄侠,如果一個任務(wù)能獨占一個 CPU 則可以實現(xiàn)并行卓箫。
  • 協(xié)程,把原有任務(wù)拆分成多個小任務(wù)垄潮,原有任務(wù)的執(zhí)行流程被改變烹卒,調(diào)度權(quán)在進程自己,如果有 IO 并且可以實現(xiàn)異步弯洗,則可以實現(xiàn)并行旅急。

多進程改造

image.png

協(xié)程改造

image.png

協(xié)程(Coroutines)和 Go 協(xié)程(Goroutines)

PHP 的協(xié)程或者其他語言中,比如 Python牡整、Lua 等都有協(xié)程的概念藐吮,和 Go 協(xié)程有些相似,不過有兩點不同:

  • Go 協(xié)程意味著并行(或者可以以并行的方式部署果正,可以用 runtime.GOMAXPROCS() 指定可同時使用的 CPU 個數(shù))炎码,協(xié)程一般來說只是并發(fā)。
  • Go 協(xié)程通過通道 channel 來通信秋泳;協(xié)程通過 yield 讓出和恢復(fù)操作來通信潦闲。

Go 協(xié)程比普通協(xié)程更強大,也很容易從協(xié)程的邏輯復(fù)用到 Go 協(xié)程迫皱,而且在 Go 的開發(fā)中也使用的極為普遍歉闰,有興趣的話可以了解一下作為對比辖众。

結(jié)束

個人感覺 PHP 的協(xié)程在實際使用中想要徒手實現(xiàn)和應(yīng)用并不方便而且場景有限,但了解其概念及實現(xiàn)原理對更好的理解并發(fā)不無裨益和敬。

如果想更多的了解協(xié)程的實際應(yīng)用場景不妨試試已經(jīng)大名鼎鼎的 Swoole凹炸,其對多種協(xié)議的 client 做了底層的協(xié)程封裝,幾乎可以做到以同步編程的寫法實現(xiàn)協(xié)程異步 IO 的效果昼弟。

參考

關(guān)注 NewtonIO - 創(chuàng)造者們的技術(shù)與工具

image.png

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末啤它,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子舱痘,更是在濱河造成了極大的恐慌变骡,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,888評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件芭逝,死亡現(xiàn)場離奇詭異塌碌,居然都是意外死亡,警方通過查閱死者的電腦和手機旬盯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評論 3 399
  • 文/潘曉璐 我一進店門台妆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人胖翰,你說我怎么就攤上這事接剩。” “怎么了泡态?”我有些...
    開封第一講書人閱讀 168,386評論 0 360
  • 文/不壞的土叔 我叫張陵搂漠,是天一觀的道長。 經(jīng)常有香客問我某弦,道長桐汤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,726評論 1 297
  • 正文 為了忘掉前任靶壮,我火速辦了婚禮怔毛,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘腾降。我一直安慰自己拣度,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,729評論 6 397
  • 文/花漫 我一把揭開白布螃壤。 她就那樣靜靜地躺著抗果,像睡著了一般。 火紅的嫁衣襯著肌膚如雪奸晴。 梳的紋絲不亂的頭發(fā)上冤馏,一...
    開封第一講書人閱讀 52,337評論 1 310
  • 那天,我揣著相機與錄音寄啼,去河邊找鬼逮光。 笑死代箭,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的涕刚。 我是一名探鬼主播嗡综,決...
    沈念sama閱讀 40,902評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼杜漠!你這毒婦竟也來了极景?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,807評論 0 276
  • 序言:老撾萬榮一對情侶失蹤驾茴,失蹤者是張志新(化名)和其女友劉穎戴陡,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體沟涨,經(jīng)...
    沈念sama閱讀 46,349評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,439評論 3 340
  • 正文 我和宋清朗相戀三年异吻,在試婚紗的時候發(fā)現(xiàn)自己被綠了裹赴。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,567評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡诀浪,死狀恐怖棋返,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情雷猪,我是刑警寧澤睛竣,帶...
    沈念sama閱讀 36,242評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站求摇,受9級特大地震影響射沟,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜与境,卻給世界環(huán)境...
    茶點故事閱讀 41,933評論 3 334
  • 文/蒙蒙 一验夯、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧摔刁,春花似錦挥转、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,420評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至拗引,卻和暖如春借宵,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背寺擂。 一陣腳步聲響...
    開封第一講書人閱讀 33,531評論 1 272
  • 我被黑心中介騙來泰國打工暇务, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留泼掠,地道東北人。 一個月前我還...
    沈念sama閱讀 48,995評論 3 377
  • 正文 我出身青樓垦细,卻偏偏與公主長得像择镇,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子括改,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,585評論 2 359

推薦閱讀更多精彩內(nèi)容