Python協(xié)程

目錄:
一、基于生成器的協(xié)程
二践盼、協(xié)程狀態(tài)
三鸦采、協(xié)程預(yù)激裝飾器
四、終止協(xié)程和異常處理
五咕幻、協(xié)程返回值
六赖淤、yield from
七、greenlet協(xié)程
八谅河、gevent協(xié)程

Python并發(fā)之協(xié)程

協(xié)程(coroutine)可以在執(zhí)行期間暫停(suspend),這樣就可以在等待外部數(shù)據(jù)處理完成之后(例如,等待網(wǎng)絡(luò)I/O數(shù)據(jù))绷耍,從之前暫停的地方恢復(fù)執(zhí)行(resume)吐限。

一、基于生成器的協(xié)程

  • 生成器
    2001年褂始,Python 2.2 通過了 PEP 255 -- "Simple Generators" 诸典,引入了 yield 關(guān)鍵字實(shí)現(xiàn)了生成器函數(shù)。

yield item:yield包含”產(chǎn)出“和”讓步“兩個(gè)含義崎苗。生成器中 yield x 這行代碼會(huì) 產(chǎn)出 一個(gè)值狐粱,提供給 next(...) 的調(diào)用方。此外胆数,還會(huì)作出讓步肌蜻,暫停執(zhí)行生成器,讓調(diào)用方繼續(xù)工作必尼,直到需要使用另一個(gè)值時(shí)再調(diào)用 next(...)蒋搜。

  • 協(xié)程
    2005年,Python 2.5 通過了 PEP 342 -- "Coroutines via Enhanced Generators"判莉,給生成器增加了.send()豆挽、.throw()和.close()方法,第一次實(shí)現(xiàn)了基于生成器的協(xié)程函數(shù)(generator-based coroutines)券盅。

send(item):生成器的調(diào)用方可以使用send()方法發(fā)送數(shù)據(jù)帮哈,發(fā)送的數(shù)據(jù)會(huì)成為生成器中yield表達(dá)式的值。此時(shí)稱生成器為協(xié)程锰镀。協(xié)程指與調(diào)用方協(xié)作的過程娘侍,產(chǎn)出由調(diào)用方提供的值。

示例:

def simple_coroutine():
    print("->協(xié)程開始")
    x = yield
    print("->協(xié)程收到x:", x)


my_coro = simple_coroutine()
print(my_coro)
print(next(my_coro))
my_coro.send(50)
運(yùn)行結(jié)果

運(yùn)行結(jié)果解釋:

① yield在表達(dá)式中的使用:如果協(xié)程只需從客戶那里接收數(shù)據(jù)互站,那么產(chǎn)出的值是None(這個(gè)值是隱式指定的私蕾,因?yàn)閥ield關(guān)鍵字右邊沒有表達(dá)式)。
② 首先要需要調(diào)用next(...)胡桃。因?yàn)樯善鬟€沒啟動(dòng)踩叭,未在yield語句處暫停,所以一開始無法發(fā)送數(shù)據(jù)翠胰。
③ 調(diào)用send()方法后容贝,協(xié)程定義體中的yield表達(dá)式會(huì)計(jì)算出50。調(diào)用send()方法后協(xié)程會(huì)恢復(fù)之景,一直運(yùn)行到下一個(gè)yield表達(dá)式斤富,或者終止。

二锻狗、協(xié)程狀態(tài)

GEN_CREATED:創(chuàng)建狀態(tài)(等待開始執(zhí)行)
GEN_SUSPENDED:掛起狀態(tài)(在yield表達(dá)式處暫停)
GEN_RUNNING:運(yùn)行狀態(tài)(正在被解釋器執(zhí)行满力。只有在多線程應(yīng)用中才能看到這個(gè)狀態(tài))
GEN_CLOSED:關(guān)閉狀態(tài)(執(zhí)行結(jié)束)

協(xié)程激活方法

  • 1.調(diào)用next():next(generator)
  • 2.發(fā)送None:generator.send(None)

示例:

In[1]:  from inspect import getgeneratorstate
        def simple_coroutine():
            print("->協(xié)程開始")
            x = yield
            print(getgeneratorstate(my_coro2))
            print("->協(xié)程收到x:", x)

        my_coro = simple_coroutine()
        next(my_coro)
        print(getgeneratorstate(my_coro))
        my_coro.send(50)
In[2]:  print(getgeneratorstate(my_coro))

三焕参、協(xié)程預(yù)激裝飾器

1.@wraps裝飾器

Python裝飾器(decorator)在實(shí)現(xiàn)的時(shí)候,被裝飾后的函數(shù)其實(shí)已經(jīng)是另外一個(gè)函數(shù)了(函數(shù)名等函數(shù)屬性會(huì)發(fā)生改變)油额。為了消除這樣的影響叠纷,Python的functools包中提供了一個(gè)叫wraps的decorator來消除這樣的副作用。寫一個(gè)decorator的時(shí)候潦嘶,最好在實(shí)現(xiàn)之前加上functools的wrap涩嚣,它能保留原有函數(shù)的名稱和docstring。

示例:

from functools import wraps
def my_decorater(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        """
        wrapper.doc
        :param args:
        :param kwargs:
        :return:
        """
        print("calling decorater start")
        func(*args, **kwargs)
        print("calling decorater end")

    return wrapper


@my_decorater
def exam():
    """
    exam.doc
    :return:
    """
    print("example....")


exam()
print("func_name:", exam.__name__)
print("func_doc:", exam.__doc__)
運(yùn)行結(jié)果

2.協(xié)程預(yù)激裝飾器的實(shí)現(xiàn)

from functools import wraps
def coroutine_activator(function):
    @wraps(function)
    def wrapper(*args, **kwargs):
        generator = function(*args, **kwargs)
        next(generator)
        return generator

    return wrapper

示例:

def coroutine_activator(function):
    @wraps(function)
    def wrapper(*args, **kwargs):
        generator = function(*args, **kwargs)
        next(generator)
        return generator

    return wrapper


@coroutine_activator
def averager():
    total = 0.0
    count = 0
    average = 0.0

    while True:
        term = yield average
        total += term
        count += 1
        average = total / count


cor_avg = averager()
print(getgeneratorstate(cor_avg))
運(yùn)行結(jié)果

四掂僵、終止協(xié)程和異常處理

協(xié)程中未處理的異常會(huì)向上冒泡航厚,傳給next()函數(shù)或.send()方法的調(diào)用方(即觸發(fā)協(xié)程的對(duì)象)。

1.generator.throw(exc_type[, exc_value[, traceback]])

  • 1.該方法會(huì)使生成器在暫停的yield表達(dá)式處拋出指定的異常锰蓬。
  • 2.如果生成器處理了該異常幔睬,代碼會(huì)繼續(xù)執(zhí)行到下一個(gè)yield表達(dá)式,而產(chǎn)出的值會(huì)成為generator.throw()方法的返回值互妓。
  • 3.如果沒有處理這個(gè)異常溪窒,異常則會(huì)向上冒泡。

示例:

class DemoException(Exception):
    pass


def demo_exc_handling():
    print("-->coroutine started")

    while True:
        try:
            x = yield
#         except GeneratorExit:
#             print("-->GeneratorExit has been handled. Call close methond...")
        except DemoException:
            print("-->DemoException has been handled. Continuing...")
        else:
            print("-->coroutine received:{!r}".format(x))
        finally:
            print("-->end...")


exc_coro = demo_exc_handling()
next(exc_coro)
print(getgeneratorstate(exc_coro))

exc_coro.throw(DemoException)
print(getgeneratorstate(exc_coro))

exc_coro.throw(ZeroDivisionError)
print(getgeneratorstate(exc_coro))
運(yùn)行結(jié)果

2.generator.close()

  • 1.該方法會(huì)使生成器在暫停的yield表達(dá)式處拋出GeneratorExit異常冯勉。
  • 2.如果生成器沒有處理這個(gè)異常澈蚌,或者拋出了StopIteration異常(通常是指運(yùn)行到結(jié)尾),調(diào)用方不會(huì)報(bào)錯(cuò)灼狰。
  • 3.如果收到GeneratorExit異常宛瞄,生成器一定不能產(chǎn)出值,否則解釋器會(huì)拋出RuntimeError異常交胚。
  • 4.生成器拋出的其他異常會(huì)向上冒泡份汗,傳給調(diào)用方。
coro = demo_exc_handling()
next(coro)
coro.send(10)

coro.close()
coro.send(20)
運(yùn)行結(jié)果

五蝴簇、協(xié)程return值

有些協(xié)程在被激活后杯活,每次驅(qū)動(dòng)(drive)協(xié)程時(shí),不會(huì)產(chǎn)出值熬词,而是在最后(協(xié)程正常終止時(shí))返回一個(gè)值(通常是某種累加值)旁钧。

from collections import namedtuple
Result = namedtuple('Result', 'count average')
def averager():
    total = 0.0
    count = 0
    average = 0.0

    while True:
        term = yield average
        # 為了返回值,協(xié)程必須正常終止互拾。這里使用條件判斷歪今,以便退出累計(jì)循環(huán)
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    # 返回一個(gè) namedtuple,包含 count 和 average 兩個(gè)字段颜矿。在 Python 3.3 之前寄猩,如果生成器返回值,解釋器會(huì)報(bào)句法錯(cuò)誤
    return Result(count, average)


coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(20)
coro_avg.send(30)

#捕獲StopIteration異常骑疆,獲取 averager 返回的值:
try:
    coro_avg.send(None)
except StopIteration as exc_data:
    r = exc_data.value
print(r)
運(yùn)行結(jié)果

六田篇、yield from

  • 在生成器gen中使用yield from subgen()時(shí)替废,子生成器subgen會(huì)獲得控制權(quán),把產(chǎn)出的值傳給gen的調(diào)用方斯辰,即調(diào)用方可以直接控制subgen舶担。與此同時(shí),gen會(huì)阻塞彬呻,等待subgen終止。
  • yield from 的主要功能是打開雙向通道柄瑰,把最外層的調(diào)用方與最內(nèi)層的子生成器連接起來闸氮,這樣二者可以直接發(fā)送和產(chǎn)出值,還可以直接傳入異常教沾,而不用在位于中間的協(xié)程中添加大量處理異常的樣板代碼蒲跨。通過這個(gè)結(jié)構(gòu),協(xié)程可以把功能委托給子生成器授翻。
  • 調(diào)用方:調(diào)用委派生成器的客戶端代碼或悲。
  • 委派生成器:包含 yield from <iterable> 表達(dá)式語句的生成器函數(shù)(包含yield from語句的生成器)。
  • 子生成器:從 yield from 表達(dá)式中 <iterable>部分獲取的生成器(yield from后面的表達(dá)式)堪唐。

簡(jiǎn)單示例:

def gen():
    yield from "AB"
    yield from range(1, 3)
    
    
print(list(gen()))
運(yùn)行結(jié)果

示例:

Result = namedtuple('Result', 'count average')


# 子生成器
def averager():
    total = 0.0
    count = 0
    average = 0.0

    while True:
        term = yield average
        if term is None:
            break
        total += term
        count += 1
        average = total / count

    return Result(count, average)


# 委派生成器
def grouper(results, key):
    while True:
        results[key] = yield from averager()


# 客戶端代碼
def main(data):
    results = {}
    for key, values in data.items():
        group = grouper(results, key)
        # 激活委派生成器
        next(group)

        for value in values:
            group.send(value)

        group.send(None)

    print(results)


data = {"girls:kg": [49.9, 39.5, 44.3, 44.2, 50.3, 40.5, 40.6, 30.8, 50, 55.5],
        "girls:m": [1.6, 1.5, 1.4, 1.3, 1.6, 1.42, 1.55, 1.66, 1.43, 1.60],
        "boys:kg": [43.9, 48.5, 44.3, 44.2, 50.3, 40.5, 40.6, 30.8, 50, 55.5],
        "boys:m": [1.6, 1.4, 1.4, 1.3, 1.6, 1.42, 1.55, 1.66, 1.43, 1.60],
        }
main(data)
運(yùn)行結(jié)果

運(yùn)行結(jié)果解釋:

委派生成器在yield from 表達(dá)式處暫停時(shí)巡语,調(diào)用方可以直接把數(shù)據(jù)發(fā)給子生成器,子生成器再把產(chǎn)出的值發(fā)給調(diào)用方淮菠。子生成器返回之后男公,解釋器會(huì)拋出StopIteration異常,并把返回值附加到異常對(duì)象上合陵,此時(shí)委派生成器會(huì)恢復(fù)運(yùn)行枢赔。

注意:

  • 如果子生成器不終止,委派生成器會(huì)在yield from處永遠(yuǎn)暫停拥知。
  • 因?yàn)槲缮善飨喈?dāng)于管道踏拜,所以可以把任意數(shù)量個(gè)委派生成器連接在一起:一個(gè)委派生成器使用yield from調(diào)用一個(gè)子生成器,而那個(gè)子生成器本身也是委派生成器低剔,使用yield from調(diào)用另一個(gè)子生成器速梗,以此類推。最終户侥,這個(gè)鏈條要以一個(gè)只使用yield表達(dá)式的簡(jiǎn)單生成器結(jié)束镀琉;不過,也能以任何可迭代的對(duì)象結(jié)束蕊唐。
  • 任何yield from鏈條都必須由客戶驅(qū)動(dòng)屋摔,在最外層委派生成器上調(diào)用next(...)函數(shù)或.send(...)方法谱仪。

yield from的意義

  • 1.子生成器產(chǎn)出的值都直接傳給委派生成器的調(diào)用方(即客戶端代碼)迹炼。
  • 2.使用send()方法發(fā)給委派生成器的值都直接傳給子生成器枫甲。如果發(fā)送的值是None恢恼,那么會(huì)調(diào)用子生成器的next()方法。如果發(fā)送的值不是None弓熏,那么會(huì)調(diào)用子生成器的send()方法恋谭。如果調(diào)用的方法拋出StopIteration異常,那么委派生成器恢復(fù)運(yùn)行挽鞠。任何其他異常都會(huì)向上冒泡疚颊,傳給委派生成器。
  • 3.生成器退出時(shí)信认,生成器(或子生成器)中的return expr表達(dá)式會(huì)觸發(fā)StopIteration(expr)異常拋出材义。
  • 4.yield from表達(dá)式的值是子生成器終止時(shí)傳給StopIteration異常的第一個(gè)參數(shù)。
  • 5.傳入委派生成器的異常嫁赏,除了GeneratorExit之外都傳給子生成器的throw()方法其掂。如果調(diào)用throw()方法時(shí)拋出StopIteration異常,委派生成器恢復(fù)運(yùn)行潦蝇。StopIteration之外的異常會(huì)向上冒泡款熬,傳給委派生成器。
  • 6.如果把GeneratorExit異常傳入委派生成器攘乒,或者在委派生成器上調(diào)用close()方法贤牛,那么在子生成器上調(diào)用close()方法,如果它有的話持灰。如果調(diào)用close()方法導(dǎo)致異常拋出盔夜,那么異常會(huì)向上冒泡,傳給委派生成器堤魁;否則喂链,委派生成器拋出GeneratorExit異常。

七妥泉、greenlet協(xié)程

greenlet由來

雖然CPython(標(biāo)準(zhǔn)Python)能夠通過生成器來實(shí)現(xiàn)協(xié)程椭微,但使用起來還并不是很方便。 與此同時(shí)盲链,Python的一個(gè)衍生版 Stackless Python蝇率。實(shí)現(xiàn)了原生的協(xié)程,它更利于使用刽沾。 于是本慕,大家開始將 Stackless 中關(guān)于協(xié)程的代碼 。單獨(dú)拿出來做成了CPython的擴(kuò)展包侧漓。 這就是 greenlet 的由來锅尘,因此 greenlet 是底層實(shí)現(xiàn)了原生協(xié)程的 C擴(kuò)展庫。

import greenlet

# 創(chuàng)建greenlet協(xié)程
coroutine = greenlet.greenlet() 
# 運(yùn)行g(shù)reenlet協(xié)程
coroutine.switch()

示例:生產(chǎn)者-消費(fèi)者模式

import greenlet
import random
def producer():
    while True:
        item = random.randint(0, 99)
        print("生成數(shù)字{}".format(item))
        c.switch(item)          # 將data傳給c布蔗,并切換到c

def consumer():
    while True:
        item = p.switch()       # 切換到p藤违,等待傳入數(shù)值
        print("消費(fèi)數(shù)字{}".format(item))


if __name__ == "__main__":
    c = greenlet.greenlet(consumer)
    p = greenlet.greenlet(producer)
    c.switch()
運(yùn)行結(jié)果

greenlet 的優(yōu)勢(shì):

  • 1.高性能的原生協(xié)程浪腐。
  • 2語義更加明確的顯式切換。
  • 3.直接將函數(shù)包裝成協(xié)程顿乒,保持原有代碼風(fēng)格议街。

八、gevent協(xié)程

gevent是什么:http://www.gevent.org/

gevent是第三方庫璧榄,通過greenlet實(shí)現(xiàn)協(xié)程特漩,其基本思想是:

當(dāng)一個(gè)greenlet遇到IO操作時(shí),比如訪問網(wǎng)絡(luò)骨杂,就自動(dòng)切換到其他的greenlet拾稳,等到IO操作完成,再在適當(dāng)?shù)臅r(shí)候切換回來繼續(xù)執(zhí)行腊脱。由于IO操作非常耗時(shí),經(jīng)常使程序處于等待狀態(tài)龙亲,有了gevent為我們自動(dòng)切換協(xié)程陕凹,就保證總有g(shù)reenlet在運(yùn)行,而不是等待IO鳄炉。
由于切換是在IO操作時(shí)自動(dòng)完成杜耙,所以gevent需要修改Python自帶的一些標(biāo)準(zhǔn)庫,這一過程在啟動(dòng)時(shí)通過monkey patch完成拂盯。

import gevent
# 創(chuàng)建協(xié)程
gevent.spawn(參數(shù))
# gevent協(xié)程運(yùn)行列表
gevent.joinall([協(xié)程列表])  

示例:生產(chǎn)者-消費(fèi)者模式

import gevent
from gevent.queue import Queue
import random


def Consumer(queue):
        while True:
            item = queue.get()
            print("消費(fèi)數(shù)字{}".format(item))


def Producer(queue):
        while True:
            item = random.randint(0, 99)
            queue.put(item)
            print("生產(chǎn)數(shù)字{}".format(item))


queue = Queue(3)

p = gevent.spawn(Producer, queue)
c = gevent.spawn(Consumer, queue)
gevent.joinall([p, c])
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末佑女,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子谈竿,更是在濱河造成了極大的恐慌团驱,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件空凸,死亡現(xiàn)場(chǎng)離奇詭異嚎花,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)呀洲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門紊选,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人道逗,你說我怎么就攤上這事兵罢。” “怎么了滓窍?”我有些...
    開封第一講書人閱讀 164,133評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵卖词,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我贰您,道長(zhǎng)坏平,這世上最難降的妖魔是什么拢操? 我笑而不...
    開封第一講書人閱讀 58,532評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮舶替,結(jié)果婚禮上令境,老公的妹妹穿的比我還像新娘。我一直安慰自己顾瞪,他們只是感情好舔庶,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,585評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著陈醒,像睡著了一般惕橙。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上钉跷,一...
    開封第一講書人閱讀 51,462評(píng)論 1 302
  • 那天弥鹦,我揣著相機(jī)與錄音,去河邊找鬼爷辙。 笑死彬坏,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的膝晾。 我是一名探鬼主播栓始,決...
    沈念sama閱讀 40,262評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼血当!你這毒婦竟也來了幻赚?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤臊旭,失蹤者是張志新(化名)和其女友劉穎落恼,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體巍扛,經(jīng)...
    沈念sama閱讀 45,587評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡领跛,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,792評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了撤奸。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吠昭。...
    茶點(diǎn)故事閱讀 39,919評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖胧瓜,靈堂內(nèi)的尸體忽然破棺而出矢棚,到底是詐尸還是另有隱情,我是刑警寧澤府喳,帶...
    沈念sama閱讀 35,635評(píng)論 5 345
  • 正文 年R本政府宣布蒲肋,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏兜粘。R本人自食惡果不足惜申窘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,237評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望孔轴。 院中可真熱鬧剃法,春花似錦、人聲如沸路鹰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽晋柱。三九已至优构,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間雁竞,已是汗流浹背钦椭。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留碑诉,地道東北人玉凯。 一個(gè)月前我還...
    沈念sama閱讀 48,048評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像联贩,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子捎拯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,864評(píng)論 2 354