目錄:
一、基于生成器的協(xié)程
二践盼、協(xié)程狀態(tài)
三鸦采、協(xié)程預(yù)激裝飾器
四、終止協(xié)程和異常處理
五咕幻、協(xié)程返回值
六赖淤、yield from
七、greenlet協(xié)程
八谅河、gevent協(xié)程
Python并發(fā)之協(xié)程
協(xié)程(coroutine)可以在執(zhí)行期間暫停(suspend),這樣就可以在等待外部數(shù)據(jù)處理完成之后(例如,等待網(wǎng)絡(luò)I/O數(shù)據(jù))绷耍,從之前暫停的地方恢復(fù)執(zhí)行(resume)吐限。
一、基于生成器的協(xié)程
- 生成器
2001年褂始,Python 2.2 通過了 PEP 255 -- "Simple Generators" 诸典,引入了 yield 關(guān)鍵字實(shí)現(xiàn)了生成器函數(shù)。
yield item:yield包含”產(chǎn)出“和”讓步“兩個(gè)含義崎苗。生成器中 yield x 這行代碼會(huì) 產(chǎn)出 一個(gè)值狐粱,提供給 next(...) 的調(diào)用方。此外胆数,還會(huì)作出讓步肌蜻,暫停執(zhí)行生成器,讓調(diào)用方繼續(xù)工作必尼,直到需要使用另一個(gè)值時(shí)再調(diào)用 next(...)蒋搜。
- 協(xié)程
2005年,Python 2.5 通過了 PEP 342 -- "Coroutines via Enhanced Generators"判莉,給生成器增加了.send()豆挽、.throw()和.close()方法,第一次實(shí)現(xiàn)了基于生成器的協(xié)程函數(shù)(generator-based coroutines)券盅。
send(item):生成器的調(diào)用方可以使用send()方法發(fā)送數(shù)據(jù)帮哈,發(fā)送的數(shù)據(jù)會(huì)成為生成器中yield表達(dá)式的值。此時(shí)稱生成器為協(xié)程锰镀。協(xié)程指與調(diào)用方協(xié)作的過程娘侍,產(chǎn)出由調(diào)用方提供的值。
示例:
def simple_coroutine():
print("->協(xié)程開始")
x = yield
print("->協(xié)程收到x:", x)
my_coro = simple_coroutine()
print(my_coro)
print(next(my_coro))
my_coro.send(50)
運(yùn)行結(jié)果解釋:
① yield在表達(dá)式中的使用:如果協(xié)程只需從客戶那里接收數(shù)據(jù)互站,那么產(chǎn)出的值是None(這個(gè)值是隱式指定的私蕾,因?yàn)閥ield關(guān)鍵字右邊沒有表達(dá)式)。
② 首先要需要調(diào)用next(...)胡桃。因?yàn)樯善鬟€沒啟動(dòng)踩叭,未在yield語句處暫停,所以一開始無法發(fā)送數(shù)據(jù)翠胰。
③ 調(diào)用send()方法后容贝,協(xié)程定義體中的yield表達(dá)式會(huì)計(jì)算出50。調(diào)用send()方法后協(xié)程會(huì)恢復(fù)之景,一直運(yùn)行到下一個(gè)yield表達(dá)式斤富,或者終止。
二锻狗、協(xié)程狀態(tài)
GEN_CREATED:創(chuàng)建狀態(tài)(等待開始執(zhí)行)
GEN_SUSPENDED:掛起狀態(tài)(在yield表達(dá)式處暫停)
GEN_RUNNING:運(yùn)行狀態(tài)(正在被解釋器執(zhí)行满力。只有在多線程應(yīng)用中才能看到這個(gè)狀態(tài))
GEN_CLOSED:關(guān)閉狀態(tài)(執(zhí)行結(jié)束)
協(xié)程激活方法
- 1.調(diào)用next():next(generator)
- 2.發(fā)送None:generator.send(None)
示例:
In[1]: from inspect import getgeneratorstate
def simple_coroutine():
print("->協(xié)程開始")
x = yield
print(getgeneratorstate(my_coro2))
print("->協(xié)程收到x:", x)
my_coro = simple_coroutine()
next(my_coro)
print(getgeneratorstate(my_coro))
my_coro.send(50)
In[2]: print(getgeneratorstate(my_coro))
三焕参、協(xié)程預(yù)激裝飾器
1.@wraps裝飾器
Python裝飾器(decorator)在實(shí)現(xiàn)的時(shí)候,被裝飾后的函數(shù)其實(shí)已經(jīng)是另外一個(gè)函數(shù)了(函數(shù)名等函數(shù)屬性會(huì)發(fā)生改變)油额。為了消除這樣的影響叠纷,Python的functools包中提供了一個(gè)叫wraps的decorator來消除這樣的副作用。寫一個(gè)decorator的時(shí)候潦嘶,最好在實(shí)現(xiàn)之前加上functools的wrap涩嚣,它能保留原有函數(shù)的名稱和docstring。
示例:
from functools import wraps
def my_decorater(func):
@wraps(func)
def wrapper(*args, **kwargs):
"""
wrapper.doc
:param args:
:param kwargs:
:return:
"""
print("calling decorater start")
func(*args, **kwargs)
print("calling decorater end")
return wrapper
@my_decorater
def exam():
"""
exam.doc
:return:
"""
print("example....")
exam()
print("func_name:", exam.__name__)
print("func_doc:", exam.__doc__)
2.協(xié)程預(yù)激裝飾器的實(shí)現(xiàn)
from functools import wraps
def coroutine_activator(function):
@wraps(function)
def wrapper(*args, **kwargs):
generator = function(*args, **kwargs)
next(generator)
return generator
return wrapper
示例:
def coroutine_activator(function):
@wraps(function)
def wrapper(*args, **kwargs):
generator = function(*args, **kwargs)
next(generator)
return generator
return wrapper
@coroutine_activator
def averager():
total = 0.0
count = 0
average = 0.0
while True:
term = yield average
total += term
count += 1
average = total / count
cor_avg = averager()
print(getgeneratorstate(cor_avg))
四掂僵、終止協(xié)程和異常處理
協(xié)程中未處理的異常會(huì)向上冒泡航厚,傳給next()函數(shù)或.send()方法的調(diào)用方(即觸發(fā)協(xié)程的對(duì)象)。
1.generator.throw(exc_type[, exc_value[, traceback]])
- 1.該方法會(huì)使生成器在暫停的yield表達(dá)式處拋出指定的異常锰蓬。
- 2.如果生成器處理了該異常幔睬,代碼會(huì)繼續(xù)執(zhí)行到下一個(gè)yield表達(dá)式,而產(chǎn)出的值會(huì)成為generator.throw()方法的返回值互妓。
- 3.如果沒有處理這個(gè)異常溪窒,異常則會(huì)向上冒泡。
示例:
class DemoException(Exception):
pass
def demo_exc_handling():
print("-->coroutine started")
while True:
try:
x = yield
# except GeneratorExit:
# print("-->GeneratorExit has been handled. Call close methond...")
except DemoException:
print("-->DemoException has been handled. Continuing...")
else:
print("-->coroutine received:{!r}".format(x))
finally:
print("-->end...")
exc_coro = demo_exc_handling()
next(exc_coro)
print(getgeneratorstate(exc_coro))
exc_coro.throw(DemoException)
print(getgeneratorstate(exc_coro))
exc_coro.throw(ZeroDivisionError)
print(getgeneratorstate(exc_coro))
2.generator.close()
- 1.該方法會(huì)使生成器在暫停的yield表達(dá)式處拋出GeneratorExit異常冯勉。
- 2.如果生成器沒有處理這個(gè)異常澈蚌,或者拋出了StopIteration異常(通常是指運(yùn)行到結(jié)尾),調(diào)用方不會(huì)報(bào)錯(cuò)灼狰。
- 3.如果收到GeneratorExit異常宛瞄,生成器一定不能產(chǎn)出值,否則解釋器會(huì)拋出RuntimeError異常交胚。
- 4.生成器拋出的其他異常會(huì)向上冒泡份汗,傳給調(diào)用方。
coro = demo_exc_handling()
next(coro)
coro.send(10)
coro.close()
coro.send(20)
五蝴簇、協(xié)程return值
有些協(xié)程在被激活后杯活,每次驅(qū)動(dòng)(drive)協(xié)程時(shí),不會(huì)產(chǎn)出值熬词,而是在最后(協(xié)程正常終止時(shí))返回一個(gè)值(通常是某種累加值)旁钧。
from collections import namedtuple
Result = namedtuple('Result', 'count average')
def averager():
total = 0.0
count = 0
average = 0.0
while True:
term = yield average
# 為了返回值,協(xié)程必須正常終止互拾。這里使用條件判斷歪今,以便退出累計(jì)循環(huán)
if term is None:
break
total += term
count += 1
average = total / count
# 返回一個(gè) namedtuple,包含 count 和 average 兩個(gè)字段颜矿。在 Python 3.3 之前寄猩,如果生成器返回值,解釋器會(huì)報(bào)句法錯(cuò)誤
return Result(count, average)
coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(20)
coro_avg.send(30)
#捕獲StopIteration異常骑疆,獲取 averager 返回的值:
try:
coro_avg.send(None)
except StopIteration as exc_data:
r = exc_data.value
print(r)
六田篇、yield from
- 在生成器gen中使用yield from subgen()時(shí)替废,子生成器subgen會(huì)獲得控制權(quán),把產(chǎn)出的值傳給gen的調(diào)用方斯辰,即調(diào)用方可以直接控制subgen舶担。與此同時(shí),gen會(huì)阻塞彬呻,等待subgen終止。
- yield from 的主要功能是打開雙向通道柄瑰,把最外層的調(diào)用方與最內(nèi)層的子生成器連接起來闸氮,這樣二者可以直接發(fā)送和產(chǎn)出值,還可以直接傳入異常教沾,而不用在位于中間的協(xié)程中添加大量處理異常的樣板代碼蒲跨。通過這個(gè)結(jié)構(gòu),協(xié)程可以把功能委托給子生成器授翻。
- 調(diào)用方:調(diào)用委派生成器的客戶端代碼或悲。
- 委派生成器:包含 yield from <iterable> 表達(dá)式語句的生成器函數(shù)(包含yield from語句的生成器)。
- 子生成器:從 yield from 表達(dá)式中 <iterable>部分獲取的生成器(yield from后面的表達(dá)式)堪唐。
簡(jiǎn)單示例:
def gen():
yield from "AB"
yield from range(1, 3)
print(list(gen()))
示例:
Result = namedtuple('Result', 'count average')
# 子生成器
def averager():
total = 0.0
count = 0
average = 0.0
while True:
term = yield average
if term is None:
break
total += term
count += 1
average = total / count
return Result(count, average)
# 委派生成器
def grouper(results, key):
while True:
results[key] = yield from averager()
# 客戶端代碼
def main(data):
results = {}
for key, values in data.items():
group = grouper(results, key)
# 激活委派生成器
next(group)
for value in values:
group.send(value)
group.send(None)
print(results)
data = {"girls:kg": [49.9, 39.5, 44.3, 44.2, 50.3, 40.5, 40.6, 30.8, 50, 55.5],
"girls:m": [1.6, 1.5, 1.4, 1.3, 1.6, 1.42, 1.55, 1.66, 1.43, 1.60],
"boys:kg": [43.9, 48.5, 44.3, 44.2, 50.3, 40.5, 40.6, 30.8, 50, 55.5],
"boys:m": [1.6, 1.4, 1.4, 1.3, 1.6, 1.42, 1.55, 1.66, 1.43, 1.60],
}
main(data)
運(yùn)行結(jié)果解釋:
委派生成器在yield from 表達(dá)式處暫停時(shí)巡语,調(diào)用方可以直接把數(shù)據(jù)發(fā)給子生成器,子生成器再把產(chǎn)出的值發(fā)給調(diào)用方淮菠。子生成器返回之后男公,解釋器會(huì)拋出StopIteration異常,并把返回值附加到異常對(duì)象上合陵,此時(shí)委派生成器會(huì)恢復(fù)運(yùn)行枢赔。
注意:
- 如果子生成器不終止,委派生成器會(huì)在yield from處永遠(yuǎn)暫停拥知。
- 因?yàn)槲缮善飨喈?dāng)于管道踏拜,所以可以把任意數(shù)量個(gè)委派生成器連接在一起:一個(gè)委派生成器使用yield from調(diào)用一個(gè)子生成器,而那個(gè)子生成器本身也是委派生成器低剔,使用yield from調(diào)用另一個(gè)子生成器速梗,以此類推。最終户侥,這個(gè)鏈條要以一個(gè)只使用yield表達(dá)式的簡(jiǎn)單生成器結(jié)束镀琉;不過,也能以任何可迭代的對(duì)象結(jié)束蕊唐。
- 任何yield from鏈條都必須由客戶驅(qū)動(dòng)屋摔,在最外層委派生成器上調(diào)用next(...)函數(shù)或.send(...)方法谱仪。
yield from的意義
- 1.子生成器產(chǎn)出的值都直接傳給委派生成器的調(diào)用方(即客戶端代碼)迹炼。
- 2.使用send()方法發(fā)給委派生成器的值都直接傳給子生成器枫甲。如果發(fā)送的值是None恢恼,那么會(huì)調(diào)用子生成器的next()方法。如果發(fā)送的值不是None弓熏,那么會(huì)調(diào)用子生成器的send()方法恋谭。如果調(diào)用的方法拋出StopIteration異常,那么委派生成器恢復(fù)運(yùn)行挽鞠。任何其他異常都會(huì)向上冒泡疚颊,傳給委派生成器。
- 3.生成器退出時(shí)信认,生成器(或子生成器)中的return expr表達(dá)式會(huì)觸發(fā)StopIteration(expr)異常拋出材义。
- 4.yield from表達(dá)式的值是子生成器終止時(shí)傳給StopIteration異常的第一個(gè)參數(shù)。
- 5.傳入委派生成器的異常嫁赏,除了GeneratorExit之外都傳給子生成器的throw()方法其掂。如果調(diào)用throw()方法時(shí)拋出StopIteration異常,委派生成器恢復(fù)運(yùn)行潦蝇。StopIteration之外的異常會(huì)向上冒泡款熬,傳給委派生成器。
- 6.如果把GeneratorExit異常傳入委派生成器攘乒,或者在委派生成器上調(diào)用close()方法贤牛,那么在子生成器上調(diào)用close()方法,如果它有的話持灰。如果調(diào)用close()方法導(dǎo)致異常拋出盔夜,那么異常會(huì)向上冒泡,傳給委派生成器堤魁;否則喂链,委派生成器拋出GeneratorExit異常。
七妥泉、greenlet協(xié)程
greenlet由來
雖然CPython(標(biāo)準(zhǔn)Python)能夠通過生成器來實(shí)現(xiàn)協(xié)程椭微,但使用起來還并不是很方便。 與此同時(shí)盲链,Python的一個(gè)衍生版 Stackless Python蝇率。實(shí)現(xiàn)了原生的協(xié)程,它更利于使用刽沾。 于是本慕,大家開始將 Stackless 中關(guān)于協(xié)程的代碼 。單獨(dú)拿出來做成了CPython的擴(kuò)展包侧漓。 這就是 greenlet 的由來锅尘,因此 greenlet 是底層實(shí)現(xiàn)了原生協(xié)程的 C擴(kuò)展庫。
import greenlet
# 創(chuàng)建greenlet協(xié)程
coroutine = greenlet.greenlet()
# 運(yùn)行g(shù)reenlet協(xié)程
coroutine.switch()
示例:生產(chǎn)者-消費(fèi)者模式
import greenlet
import random
def producer():
while True:
item = random.randint(0, 99)
print("生成數(shù)字{}".format(item))
c.switch(item) # 將data傳給c布蔗,并切換到c
def consumer():
while True:
item = p.switch() # 切換到p藤违,等待傳入數(shù)值
print("消費(fèi)數(shù)字{}".format(item))
if __name__ == "__main__":
c = greenlet.greenlet(consumer)
p = greenlet.greenlet(producer)
c.switch()
greenlet 的優(yōu)勢(shì):
- 1.高性能的原生協(xié)程浪腐。
- 2語義更加明確的顯式切換。
- 3.直接將函數(shù)包裝成協(xié)程顿乒,保持原有代碼風(fēng)格议街。
八、gevent協(xié)程
gevent是什么:http://www.gevent.org/
gevent是第三方庫璧榄,通過greenlet實(shí)現(xiàn)協(xié)程特漩,其基本思想是:
當(dāng)一個(gè)greenlet遇到IO操作時(shí),比如訪問網(wǎng)絡(luò)骨杂,就自動(dòng)切換到其他的greenlet拾稳,等到IO操作完成,再在適當(dāng)?shù)臅r(shí)候切換回來繼續(xù)執(zhí)行腊脱。由于IO操作非常耗時(shí),經(jīng)常使程序處于等待狀態(tài)龙亲,有了gevent為我們自動(dòng)切換協(xié)程陕凹,就保證總有g(shù)reenlet在運(yùn)行,而不是等待IO鳄炉。
由于切換是在IO操作時(shí)自動(dòng)完成杜耙,所以gevent需要修改Python自帶的一些標(biāo)準(zhǔn)庫,這一過程在啟動(dòng)時(shí)通過monkey patch完成拂盯。
import gevent
# 創(chuàng)建協(xié)程
gevent.spawn(參數(shù))
# gevent協(xié)程運(yùn)行列表
gevent.joinall([協(xié)程列表])
示例:生產(chǎn)者-消費(fèi)者模式
import gevent
from gevent.queue import Queue
import random
def Consumer(queue):
while True:
item = queue.get()
print("消費(fèi)數(shù)字{}".format(item))
def Producer(queue):
while True:
item = random.randint(0, 99)
queue.put(item)
print("生產(chǎn)數(shù)字{}".format(item))
queue = Queue(3)
p = gevent.spawn(Producer, queue)
c = gevent.spawn(Consumer, queue)
gevent.joinall([p, c])