Coroutine and Generator

Coroutine

先要知道什么是 Coroutine逆济,按照 Wikipedia 上的定義

Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.
When subroutines are invoked, execution begins at the start, and once a subroutine exits, it is finished; an instance of a subroutine only returns once, and does not hold state between invocations. By contrast, coroutines can exit by calling other coroutines, which may later return to the point where they were invoked in the original coroutine; from the coroutine's point of view, it is not exiting but calling another coroutine. Thus, a coroutine instance holds state, and varies between invocations; there can be multiple instances of a given coroutine at once.

可以這么理解鼠冕,平常的子程序(函數(shù))都是只有一個(gè)出口(通過 return), 一般都是 caller 調(diào)用 callee飘哨,callee 結(jié)束后返回到caller梅割。
而協(xié)程可以有多個(gè)出入口(yield 指明)继控,而且協(xié)程可以 yield 到其他協(xié)程求泰, 其他協(xié)程可以繼續(xù) yield 到其他協(xié)程(或者 yield 到上一個(gè)協(xié)程)

那么生成器(Generator)是什么呢卤唉?我們平常說的生成器實(shí)現(xiàn)了迭代器協(xié)議备畦,因此能作為迭代器使用低飒。但生成器本質(zhì)上是一種協(xié)程《危看 Wiki 上的解釋

Generators, also known as semicoroutines,[5] are also a generalisation of subroutines, but are more limited than coroutines. Specifically, while both of these can yield multiple times, suspending their execution and allowing re-entry at multiple entry points, they differ in that coroutines can control where execution continues after they yield, while generators cannot, instead transferring control back to the generator's caller.[6] That is, since generators are primarily used to simplify the writing of iterators, the yield statement in a generator does not specify a coroutine to jump to, but rather passes a value back to a parent routine.

我的理解是生成器是一種有限制的協(xié)程褥赊,其不能 yield 到其他協(xié)程,只能 yield 到 caller.
由于這個(gè)特性允粤,其十分適合作為迭代器崭倘,因此就常常作為迭代器使用了。

Yield keyword in Python

其他從 Python 對(duì)協(xié)程的支持我們也可以發(fā)現(xiàn)生成器就是協(xié)程类垫,他們?cè)?Python 中都是用關(guān)鍵詞 yield 定義司光。

Python 中的 yield 有這么三種用處:

  • pull:caller 從生成器獲得數(shù)據(jù)(next(gen) / yield item
  • push:協(xié)程從 caller 收到值 (gen.send(value) / item = yield)
  • tasks:協(xié)程與 caller 之間沒有數(shù)據(jù)交互,只是用來進(jìn)行流程控制

我覺得第三點(diǎn)流程控制才是協(xié)程的本質(zhì)悉患,傳輸數(shù)據(jù)只是一個(gè)自然而然的功能残家。

可以用 inspect.getgeneratorstate(...) 獲得協(xié)程狀態(tài)
共有四種狀態(tài)
'GEN_CREATED', 'GEN_RUNNING', 'GEN_SUSPENDED, 'GEN_CLOSED'

from inspect import getgeneratorstate

def simple_coro(a):
    """
    >>> coro = simple_coro(14)
    >>> coro
    <generator object simple_coro at 0x038A8A50>
    >>> getgeneratorstate(coro)
    'GEN_CREATED'
    >>> next(coro)  # 啟動(dòng)協(xié)程
    -> Started: a =  14
    14
    >>> getgeneratorstate(coro)
    'GEN_SUSPENDED'
    >>> coro.send(28) # 傳入 b
    -> Received: b =  28
    42
    >>> coro.send(99)
    -> Received: c =  99
    StopIteration ...
    >>> getgeneratorstate(coro)
    'GEN_CLOSED'
    """
    print('-> Started: a = ', a)
    b = yield a
    print('-> Received: b = ', b)
    c = yield a + b
    print('-> Received: c = ', c)
coroutine.jpg

Returning a Value from a Coroutine

有時(shí)候一個(gè)協(xié)程不是為了 yield value, 而是為了最后返回值
協(xié)程返回值時(shí)會(huì)拋出一個(gè) StopIteration (就跟迭代器完結(jié)一樣)

from collections import  namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)

---------------------
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> coro_avg.send(None) # 我們發(fā)現(xiàn)協(xié)程結(jié)束會(huì)拋出 StopIteration 異常, 就跟迭代器結(jié)束一樣
StopIteration: Result(count=1, average=10.0)
---------------------
>>> try:  # 我們要想獲得協(xié)程最后返回值必須這么做...
    coro_avg.send(None)
except StopIteration as exc:
    result = exc.value
>>> result
Result(count=1, average=10.0)

可以發(fā)現(xiàn)每次捕獲 StopIteration 很麻煩, 所以 Python 中 yield from 可以自動(dòng)幫助我們捕獲 StopIteration 并把協(xié)程返回值綁定到變量上 (具體 yield from 見下面)

def delegate():
    while True:
        yield (yield from averager())

--------------------
>>> coro_avg = delegate()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> coro_avg.send(None)
Result(count=1, average=10.0)

Yield From

yield from 在其他支持協(xié)程的語言中往往稱作 await,后者名字更利于理解售躁。
yield from 涉及到了如下三個(gè)對(duì)象:

  • caller
  • delegating generator: 包含 yield from <iterable> 的生成器
  • subgenerator: yield from <iterable> 中的iterable(常用生成器或迭代器)

兩個(gè)注意事項(xiàng):

  • Every arrangement of coroutines chained with yield from must be ultimately
    driven by a caller that is not a coroutine, which invokes next(…) or .send(…) on the outermost delegating generator, explicitly or implicitly (e.g., in a for loop).
  • The innermost subgenerator in the chain must be a simple generator that uses just yield—or an iterable object
yield from.jpg

一個(gè)用 yield from 的二叉樹中序遍歷

def chain(*iterables):
    """
    >>> chain('ABC', range(2))
    ['A', 'B', 'C', 0, 1]
    """
    for it in iterables:
        yield from it
from collections import  namedtuple

Result = namedtuple('Result', 'count average')

# the subgenerator
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)

# the delegating generator
def grouper(results, key):
    # 這里 while True 的目的是讓這個(gè)協(xié)程永遠(yuǎn)不退出, 
    # 因?yàn)閰f(xié)程退出時(shí)會(huì)拋出 StopIteration, 這樣必須在 caller 中捕獲
    while True:
        # print('here') 如果打印, 會(huì)發(fā)現(xiàn)打印了四次
        # 啟動(dòng)時(shí)打印一次坞淮,當(dāng) subgenerator 返回時(shí),delegating generator 蘇醒陪捷,又會(huì)打印一次
        results[key] = yield from averager()  # yield from 會(huì)自動(dòng)幫你啟動(dòng) subgenerator

# the caller
def main(data, results):
    for key, values in data.items():
        # 每次使用一個(gè)新的 grouper, 目的是有一個(gè)不同的 key
        # 可以只用一個(gè) grouper 的, 思路是將 key 也傳進(jìn)去回窘,見下面
        group = grouper(results, key)
        next(group)
        for value in values:
            # 這里的 value 都經(jīng)由 group, 發(fā)送到 averager 中去了
            # averager yield 的值也經(jīng)由 group 返回到 caller
            # group(delegating generator) 只是作為一個(gè)管道
            group.send(value)
        # 必須發(fā)送 None, 讓 averager 返回, 這里才能在 results 中設(shè)定值
        # 如果 刪掉下面那句, 會(huì)發(fā)現(xiàn)最后 results 為空字典
        group.send(None)

>>> data = {'a': [1, 2, 3, 4, 5], 'b': [6, 7, 8, 9, 10]}
>>> results ={}
>>> main(data, results)
>>> results
{'a': Result(count=5, average=3.0), 'b': Result(count=5, average=8.0)}
# the delegating generator
def grouper(results):
    while True:
        # print('here')  會(huì)打印 3 次
        key = yield
        results[key] = yield from averager()

# the caller
def main(data, results):
    group = grouper(results)
    next(group)
    for key, values in data.items():
        group.send(key)
        for value in values:
            group.send(value)
        group.send(None)

下面我們來具體看看 yield 三種功能的應(yīng)用:

Generator as Pipeline (Pull)

generator as pipeline.jpg
import time

def follow(thefile):
    thefile.seek(0, 2)  # Go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)
            continue
        yield line

def grep(pattern, lines):
    for line in lines:
        if pattern in line:
            yield line
            
logfile = open("sample-log")
loglines = follow(logfile)
pylines = grep("python", loglines)
for line in pylines:
    print(line)

Coroutine as Pipeline (Push)

coroutine as pipeline.jpg
import time

def coroutine(func):
    """Decorator for start coroutine"""
    def start(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)
        return coro
    return start

@coroutine
def follow(thefile, target):
    thefile.seek(0, 2)  # Go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)
            continue
        target.send(line)

@coroutine
def grep(pattern, target):
    while True:
        line = yield
        if pattern in line:
            target.send(line)
            
@coroutine
def printer():
    while True:
        line = yield
        print(line)

logfile = open("sample-log")
loglines = follow(logfile, grep("python", printer()))
broadcast.jpg
@coroutine
def broadcast(targets):
    while True:
        item = yield
        for target in targets:
            target.send(item)
            
logfile = open("sample-log")
loglines = follow(logfile, 
                            broadcast([grep("python", printer()),
                                              grep("java", printer()),
                                              grep("go", printer())]))
generator vs coroutine.jpg

Coroutine as Task

可以把 yield 看做 trap, 不過有個(gè)不同,trap 是把控制權(quán)交給操作系統(tǒng)市袖,而 yield 是把控制權(quán)交還給 caller啡直。

from queue import Queue

class Task(object):
    """Task is a wrapper around a coroutine"""
    taskid = 0
    def __init__(self, target):
        Task.taskid += 1
        self.tid = Task.taskid
        self.target = target  # target coroutine
        self.sendval = None  # sendval 為 system call 的結(jié)果
        
    def run(self):
        return self.target.send(self.sendval)
    

class SystemCall(object):
    def handle(self):
        pass
    
class GetTid(SystemCall):
    def handle(self):
        self.task.sendval = self.task.tid
        self.sched.schedule(self.task)
        
class NewTask(SystemCall):
    def __init__(self, target):
        self.target = target
    def handle(self):
        tid = self.sched.new(self.target)
        self.task.sendval = tid
        self.sched.schedule(self.task)
        
class KillTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid
    def handle(self):
        task = self.sched.taskmap.get(self.tid, None)
        if task:
            task.target.close()
            self.task.sendval = True
        else:
            self.task.sendval = False
        self.sched.schedule(self.task)
        
class WaitTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid
    def handle(self):
        result = self.sched.waitforexit(self.task, self.tid)
        self.task.sendval = result
        if not result:
            self.sched.schedule(self.task)

class Scheduler(object):
    """ Each task runs until it hits the yield
          scheduler regains control and switch to the other task
    """
    def __init__(self):
        self.ready = Queue()
        self.taskmap = {}
        self.exit_waiting = {}
        
    def new(self, target):
        """Create a new task and put them in ready queue"""
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask
        self.schedule(newtask)
        return newtask.tid
    
    def exit(self, task):
        print("Taks {} terminated".format(task.tid))
        del self.taskmap[task.tid]
        # notify other tasks waiting for this task to exit
        for task in self.exit_waiting.pop(task.tid, []):
            self.schedule(task)
            
    def waitforexit(self, task, waittid):
        if waittid in self.taskmap:
            self.exit_waiting.setdefault(waittid, []).append(task)
            return True
        else:
            return False
        
    def schedule(self, task):
        """Put task in ready queue"""
        self.ready.put(task)
        
    def mainloop(self):
        while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()
                if isinstance(result, SystemCall):
                    result.task = task
                    result.sched = self
                    result.handle()
                    continue
            except StopIteration:
                self.exit(task)
                continue
            self.schedule(task)


def foo():
    print("foo start")
    mytid = yield GetTid()
    for i in range(2):
        print("foo id={}".format(mytid))
        yield

def bar():
    print("bar start")
    mytid = yield GetTid()
    for i in range(1):
        print("bar {}".format(mytid))
        yield
        
s = Scheduler()
s.new(foo())
s.new(bar())
s.mainloop()
---------------
foo start
bar start
foo id=1
bar 2
foo id=1
Taks 2 terminated
Taks 1 terminated

************

def main():
    child = yield NewTask(foo())
    yield
    yield KillTask(child)
    print("main done")

s = Scheduler()
s.new(main())
s.mainloop()
-----------------
foo start
foo id=4
Taks 4 terminated
main done
Taks 3 terminated

***********

def main():
    child = yield NewTask(foo())
    print("Waiting for child")
    yield WaitTask(child)
    print("Child done")
    
s = Scheduler()
s.new(main())
s.mainloop()
-----------------------
foo start
Waiting for child
foo id=6
foo id=6
Taks 6 terminated
Child done
Taks 5 terminated

TODO 后面 webserver,非阻塞 IO 還沒看


參考資料:
Fluent Python Control Flow
PEP 342 -- Coroutines via Enhanced Generators
PEP 380 -- Syntax for Delegating to a Subgenerator
Generator Tricks for Systems Programmers
A Curious Course on Coroutines and Concurrency
Generators: The Final Frontier
A Web Crawler With asyncio Coroutines
In practice, what are the main uses for the new “yield from” syntax in Python 3.3?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末苍碟,一起剝皮案震驚了整個(gè)濱河市酒觅,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌微峰,老刑警劉巖舷丹,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異蜓肆,居然都是意外死亡颜凯,警方通過查閱死者的電腦和手機(jī)谋币,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來装获,“玉大人瑞信,你說我怎么就攤上這事⊙ㄔィ” “怎么了凡简?”我有些...
    開封第一講書人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長精肃。 經(jīng)常有香客問我秤涩,道長,這世上最難降的妖魔是什么司抱? 我笑而不...
    開封第一講書人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任筐眷,我火速辦了婚禮,結(jié)果婚禮上习柠,老公的妹妹穿的比我還像新娘匀谣。我一直安慰自己,他們只是感情好资溃,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開白布武翎。 她就那樣靜靜地躺著,像睡著了一般溶锭。 火紅的嫁衣襯著肌膚如雪宝恶。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,208評(píng)論 1 299
  • 那天趴捅,我揣著相機(jī)與錄音垫毙,去河邊找鬼。 笑死拱绑,一個(gè)胖子當(dāng)著我的面吹牛综芥,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播猎拨,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼毫痕,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了迟几?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤栏笆,失蹤者是張志新(化名)和其女友劉穎类腮,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛉加,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蚜枢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年缸逃,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片厂抽。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡需频,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出筷凤,到底是詐尸還是另有隱情昭殉,我是刑警寧澤,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布藐守,位于F島的核電站挪丢,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏卢厂。R本人自食惡果不足惜乾蓬,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望慎恒。 院中可真熱鬧任内,春花似錦、人聲如沸融柬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽丹鸿。三九已至越走,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間靠欢,已是汗流浹背廊敌。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留门怪,地道東北人骡澈。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓,卻偏偏與公主長得像掷空,于是被迫代替她去往敵國和親肋殴。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容