Coroutine
First we need to know what a coroutine is. According to the definition on Wikipedia:
Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.
When subroutines are invoked, execution begins at the start, and once a subroutine exits, it is finished; an instance of a subroutine only returns once, and does not hold state between invocations. By contrast, coroutines can exit by calling other coroutines, which may later return to the point where they were invoked in the original coroutine; from the coroutine's point of view, it is not exiting but calling another coroutine. Thus, a coroutine instance holds state, and varies between invocations; there can be multiple instances of a given coroutine at once.
You can think of it this way: an ordinary subroutine (function) has only one exit (via return); a caller invokes a callee, and when the callee finishes it returns to the caller.
A coroutine, by contrast, can have multiple entry and exit points (marked by yield), and a coroutine can yield to other coroutines, which can in turn yield to further coroutines (or yield back to the previous one).
So what is a generator? The generators we usually talk about implement the iterator protocol and can therefore be used as iterators. But a generator is essentially a kind of coroutine. See the explanation on Wikipedia:
Generators, also known as semicoroutines,[5] are also a generalisation of subroutines, but are more limited than coroutines. Specifically, while both of these can yield multiple times, suspending their execution and allowing re-entry at multiple entry points, they differ in that coroutines can control where execution continues after they yield, while generators cannot, instead transferring control back to the generator's caller.[6] That is, since generators are primarily used to simplify the writing of iterators, the yield statement in a generator does not specify a coroutine to jump to, but rather passes a value back to a parent routine.
My understanding is that a generator is a restricted coroutine: it cannot yield to another coroutine, only back to its caller.
Because of this property it is a natural fit for an iterator, and so it is commonly used as one.
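For example, since a generator implements the iterator protocol, it can be consumed anywhere an iterator is expected; a minimal illustration:

```python
def countdown(n):
    """A generator: each yield suspends the function and hands a value to the caller."""
    while n > 0:
        yield n
        n -= 1

# usable anywhere an iterator is expected
print(list(countdown(3)))   # [3, 2, 1]
print(sum(countdown(100)))  # 5050
```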
Yield keyword in Python
Python's own support for coroutines also suggests that generators are coroutines: both are defined in Python with the yield keyword.
yield in Python has three uses:
- pull: the caller gets data from the generator (next(gen) / yield item)
- push: the coroutine receives a value from the caller (gen.send(value) / item = yield)
- tasks: no data is exchanged between the coroutine and the caller; yield is used purely for flow control
I think the third use, flow control, is the essence of coroutines; passing data is just a natural side benefit.
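A tiny sketch of that third use: two generators that carry no data, driven round-robin by a caller that treats each bare yield purely as a suspension point (this driver is my own illustration, not from the original):

```python
def task(name, steps, log):
    for i in range(steps):
        log.append(f"{name} {i}")
        yield  # pure flow control: suspend and hand control back to the caller

log = []
ready = [task("A", 2, log), task("B", 2, log)]
while ready:                 # naive round-robin driver
    t = ready.pop(0)
    try:
        next(t)              # run the task until its next yield
        ready.append(t)
    except StopIteration:    # the task finished
        pass
print(log)  # ['A 0', 'B 0', 'A 1', 'B 1']
```

Each next() call advances one task to its next yield, so the tasks interleave without threads.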
You can call inspect.getgeneratorstate(...) to inspect a coroutine's state.
There are four states:
'GEN_CREATED', 'GEN_RUNNING', 'GEN_SUSPENDED', 'GEN_CLOSED'
from inspect import getgeneratorstate

def simple_coro(a):
    """
    >>> coro = simple_coro(14)
    >>> coro
    <generator object simple_coro at 0x038A8A50>
    >>> getgeneratorstate(coro)
    'GEN_CREATED'
    >>> next(coro)  # prime the coroutine
    -> Started: a = 14
    14
    >>> getgeneratorstate(coro)
    'GEN_SUSPENDED'
    >>> coro.send(28)  # send in b
    -> Received: b = 28
    42
    >>> coro.send(99)
    -> Received: c = 99
    Traceback (most recent call last):
      ...
    StopIteration
    >>> getgeneratorstate(coro)
    'GEN_CLOSED'
    """
    print('-> Started: a =', a)
    b = yield a
    print('-> Received: b =', b)
    c = yield a + b
    print('-> Received: c =', c)
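One detail worth noting: a coroutine must be primed with next() (or send(None)) before you can send it a real value; otherwise Python raises TypeError. A quick check with a stripped-down coroutine:

```python
def simple_coro(a):
    b = yield a
    print('-> Received: b =', b)

coro = simple_coro(14)
try:
    coro.send(28)  # sending before priming
except TypeError as exc:
    print(exc)     # can't send non-None value to a just-started generator
```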
Returning a Value from a Coroutine
Sometimes a coroutine exists not to yield values along the way but to return a final value.
When a coroutine returns a value, it raises StopIteration (just as an exhausted iterator does), with the return value attached to the exception.
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)
---------------------
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> coro_avg.send(None)  # a finished coroutine raises StopIteration, just like an exhausted iterator
StopIteration: Result(count=1, average=10.0)
---------------------
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> try:  # to get the coroutine's return value we have to catch the exception ourselves...
...     coro_avg.send(None)
... except StopIteration as exc:
...     result = exc.value
>>> result
Result(count=1, average=10.0)
Catching StopIteration every time is tedious, so Python's yield from automatically catches StopIteration for us and binds the coroutine's return value to a variable (yield from is covered in detail below).
def delegate():
    while True:
        yield (yield from averager())
--------------------
>>> coro_avg = delegate()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> coro_avg.send(None)
Result(count=1, average=10.0)
Yield From
In other languages that support coroutines, yield from is often called await, a name that is easier to understand.
yield from involves the following three parties:
- caller
- delegating generator: the generator that contains yield from <iterable>
- subgenerator: the iterable in yield from <iterable> (usually a generator or iterator)
Two caveats:
- Every arrangement of coroutines chained with yield from must ultimately be driven by a caller that is not a coroutine, which invokes next(…) or .send(…) on the outermost delegating generator, explicitly or implicitly (e.g., in a for loop).
- The innermost subgenerator in the chain must be a simple generator that uses just yield, or an iterable object.
A simple example with yield from: chaining several iterables
def chain(*iterables):
    """
    >>> list(chain('ABC', range(2)))
    ['A', 'B', 'C', 0, 1]
    """
    for it in iterables:
        yield from it
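An in-order binary-tree traversal is another classic use of yield from, delegating each subtree to a recursive subgenerator; a minimal sketch (this Node class is an assumed helper, not from the original):

```python
class Node:
    def __init__(self, value, left=None, right=None):
        self.value = value
        self.left = left
        self.right = right

def inorder(node):
    if node is not None:
        yield from inorder(node.left)   # delegate the whole left subtree
        yield node.value
        yield from inorder(node.right)  # then the right subtree

#       2
#      / \
#     1   3
tree = Node(2, Node(1), Node(3))
print(list(inorder(tree)))  # [1, 2, 3]
```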
from collections import namedtuple

Result = namedtuple('Result', 'count average')

# the subgenerator
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)
# the delegating generator
def grouper(results, key):
    # while True keeps this coroutine alive forever:
    # if it returned, it would raise StopIteration, which the caller would then have to catch
    while True:
        # uncommenting print('here') shows it runs four times:
        # once when each grouper is primed, and once more each time the
        # subgenerator returns and the delegating generator resumes
        results[key] = yield from averager()  # yield from primes the subgenerator for you
# the caller
def main(data, results):
    for key, values in data.items():
        # a fresh grouper for every key;
        # a single grouper also works if the key is sent in as well -- see below
        group = grouper(results, key)
        next(group)
        for value in values:
            # each value passes through group straight into averager;
            # anything averager yielded would likewise travel back through group to the caller.
            # group (the delegating generator) acts purely as a pipe
            group.send(value)
        # we must send None so averager returns and the entry is stored in results;
        # delete the next line and results ends up an empty dict
        group.send(None)
>>> data = {'a': [1, 2, 3, 4, 5], 'b': [6, 7, 8, 9, 10]}
>>> results = {}
>>> main(data, results)
>>> results
{'a': Result(count=5, average=3.0), 'b': Result(count=5, average=8.0)}
# the delegating generator
def grouper(results):
    while True:
        # print('here') here would run 3 times
        key = yield
        results[key] = yield from averager()
# the caller
def main(data, results):
    group = grouper(results)
    next(group)
    for key, values in data.items():
        group.send(key)
        for value in values:
            group.send(value)
        group.send(None)
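Putting this single-grouper variant together with averager from above (both repeated here so the snippet is self-contained), a quick run fills the results dict just like the first version:

```python
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)

def grouper(results):
    while True:
        key = yield       # first receive the key for this group
        results[key] = yield from averager()

data = {'a': [1, 2, 3, 4, 5], 'b': [6, 7, 8, 9, 10]}
results = {}
group = grouper(results)
next(group)
for key, values in data.items():
    group.send(key)
    for value in values:
        group.send(value)
    group.send(None)      # terminate this averager so its result is stored
print(results)
# {'a': Result(count=5, average=3.0), 'b': Result(count=5, average=8.0)}
```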
Now let's look at concrete applications of the three uses of yield:
Generator as Pipeline (Pull)
import time

def follow(thefile):
    thefile.seek(0, 2)  # go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)
            continue
        yield line

def grep(pattern, lines):
    for line in lines:
        if pattern in line:
            yield line

logfile = open("sample-log")
loglines = follow(logfile)
pylines = grep("python", loglines)
for line in pylines:
    print(line)
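To see the pull-style data flow without tailing a real file, the same grep generator can be driven from a plain list (the sample lines here are made up):

```python
def grep(pattern, lines):
    for line in lines:
        if pattern in line:
            yield line

log = [
    "GET /index.html 200",
    "python worker started",
    "GET /favicon.ico 404",
    "python worker done",
]
# the caller *pulls* lines through the chain, one per iteration
matches = list(grep("python", log))
print(matches)  # ['python worker started', 'python worker done']
```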
Coroutine as Pipeline (Push)
import time

def coroutine(func):
    """Decorator that primes a coroutine"""
    def start(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)
        return coro
    return start

def follow(thefile, target):
    # follow is a plain function, not a coroutine: it is the source
    # that drives the pipeline, pushing lines into target
    thefile.seek(0, 2)  # go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)
            continue
        target.send(line)

@coroutine
def grep(pattern, target):
    while True:
        line = yield
        if pattern in line:
            target.send(line)

@coroutine
def printer():
    while True:
        line = yield
        print(line)

logfile = open("sample-log")
follow(logfile, grep("python", printer()))

@coroutine
def broadcast(targets):
    while True:
        item = yield
        for target in targets:
            target.send(item)

logfile = open("sample-log")
follow(logfile,
       broadcast([grep("python", printer()),
                  grep("java", printer()),
                  grep("go", printer())]))
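To drive a push pipeline without a log file, the source can be a plain loop and the sink a collecting coroutine instead of printer (collect here is my own substitute so the result can be inspected):

```python
def coroutine(func):
    """Decorator that primes a coroutine"""
    def start(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)
        return coro
    return start

@coroutine
def grep(pattern, target):
    while True:
        line = yield
        if pattern in line:
            target.send(line)

@coroutine
def collect(out):
    """Sink: append every received item to the out list"""
    while True:
        out.append((yield))

hits = []
pipe = grep("python", collect(hits))
for line in ["python 3.8", "go 1.14", "python 2.7"]:
    pipe.send(line)   # the caller *pushes* data into the pipeline
print(hits)  # ['python 3.8', 'python 2.7']
```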
Coroutine as Task
You can think of yield as a trap, with one difference: a trap hands control to the operating system, while yield hands control back to the caller.
from queue import Queue

class Task(object):
    """Task is a wrapper around a coroutine"""
    taskid = 0

    def __init__(self, target):
        Task.taskid += 1
        self.tid = Task.taskid
        self.target = target  # the wrapped coroutine
        self.sendval = None   # sendval holds the result of the "system call"

    def run(self):
        return self.target.send(self.sendval)
class SystemCall(object):
    def handle(self):
        pass

class GetTid(SystemCall):
    def handle(self):
        self.task.sendval = self.task.tid
        self.sched.schedule(self.task)

class NewTask(SystemCall):
    def __init__(self, target):
        self.target = target

    def handle(self):
        tid = self.sched.new(self.target)
        self.task.sendval = tid
        self.sched.schedule(self.task)

class KillTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        task = self.sched.taskmap.get(self.tid, None)
        if task:
            task.target.close()
            self.task.sendval = True
        else:
            self.task.sendval = False
        self.sched.schedule(self.task)

class WaitTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        result = self.sched.waitforexit(self.task, self.tid)
        self.task.sendval = result
        # if the wait failed (no such task), reschedule immediately
        if not result:
            self.sched.schedule(self.task)
class Scheduler(object):
    """Each task runs until it hits a yield;
    the scheduler then regains control and switches to another task
    """
    def __init__(self):
        self.ready = Queue()
        self.taskmap = {}
        self.exit_waiting = {}

    def new(self, target):
        """Create a new task and put it in the ready queue"""
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask
        self.schedule(newtask)
        return newtask.tid

    def exit(self, task):
        print("Task {} terminated".format(task.tid))
        del self.taskmap[task.tid]
        # notify other tasks waiting for this task to exit
        for waiting_task in self.exit_waiting.pop(task.tid, []):
            self.schedule(waiting_task)

    def waitforexit(self, task, waittid):
        if waittid in self.taskmap:
            self.exit_waiting.setdefault(waittid, []).append(task)
            return True
        else:
            return False

    def schedule(self, task):
        """Put a task in the ready queue"""
        self.ready.put(task)

    def mainloop(self):
        while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()
                if isinstance(result, SystemCall):
                    result.task = task
                    result.sched = self
                    result.handle()
                    continue
            except StopIteration:
                self.exit(task)
                continue
            self.schedule(task)
def foo():
    print("foo start")
    mytid = yield GetTid()
    for i in range(2):
        print("foo id={}".format(mytid))
        yield

def bar():
    print("bar start")
    mytid = yield GetTid()
    for i in range(1):
        print("bar {}".format(mytid))
        yield

s = Scheduler()
s.new(foo())
s.new(bar())
s.mainloop()
---------------
foo start
bar start
foo id=1
bar 2
foo id=1
Task 2 terminated
Task 1 terminated
************
def main():
    child = yield NewTask(foo())
    yield
    yield KillTask(child)
    print("main done")

s = Scheduler()
s.new(main())
s.mainloop()
-----------------
foo start
foo id=4
Task 4 terminated
main done
Task 3 terminated
***********
def main():
    child = yield NewTask(foo())
    print("Waiting for child")
    yield WaitTask(child)
    print("Child done")

s = Scheduler()
s.new(main())
s.mainloop()
-----------------------
foo start
Waiting for child
foo id=6
foo id=6
Task 6 terminated
Child done
Task 5 terminated
TODO: the later parts (the web server, non-blocking I/O) are not covered yet.
References:
Fluent Python Control Flow
PEP 342 -- Coroutines via Enhanced Generators
PEP 380 -- Syntax for Delegating to a Subgenerator
Generator Tricks for Systems Programmers
A Curious Course on Coroutines and Concurrency
Generators: The Final Frontier
A Web Crawler With asyncio Coroutines
In practice, what are the main uses for the new “yield from” syntax in Python 3.3?