Python并發(fā)編程之協(xié)程/異步IO

原文:http://www.reibang.com/p/4e048726b613

引言

隨著node.js的盛行,相信大家今年多多少少都聽到了異步編程這個概念。Python社區(qū)雖然對于異步編程的支持相比其他語言稍顯遲緩法梯,但是也在Python3.4中加入了asyncio酗昼,在Python3.5上又提供了async/await語法層面的支持杀捻,剛正式發(fā)布的Python3.6中asyncio也已經(jīng)由臨時版改為了穩(wěn)定版崔泵。下面我們就基于Python3.4+來了解一下異步編程的概念以及asyncio的用法。

什么是協(xié)程

通常在Python中我們進(jìn)行并發(fā)編程一般都是使用多線程或者多進(jìn)程來實現(xiàn)的险污,對于計算型任務(wù)由于GIL的存在我們通常使用多進(jìn)程來實現(xiàn)痹愚,而對與IO型任務(wù)我們可以通過線程調(diào)度來讓線程在執(zhí)行IO任務(wù)時讓出GIL,從而實現(xiàn)表面上的并發(fā)蛔糯。

其實對于IO型任務(wù)我們還有一種選擇就是協(xié)程拯腮,協(xié)程是運行在單線程當(dāng)中的“并發(fā)”,協(xié)程相比多線程一大優(yōu)勢就是省去了多線程之間的切換開銷蚁飒,獲得了更大的運行效率疾瓮。Python中的asyncio也是基于協(xié)程來進(jìn)行實現(xiàn)的。在進(jìn)入asyncio之前我們先來了解一下Python中怎么通過生成器進(jìn)行協(xié)程來實現(xiàn)并發(fā)飒箭。

example1

我們先來看一個簡單的例子來了解一下什么是協(xié)程(coroutine)狼电,對生成器不了解的朋友建議先看一下Stackoverflow上面的這篇高票回答

>>> def coroutine():...reply =yield'hello'...yieldreply

...

>>>c = coroutine()

>>>next(c)'hello'

>>>c.send('world')'world'

example2

下面這個程序我們要實現(xiàn)的功能就是模擬多個學(xué)生同時向一個老師提交作業(yè)弦蹂,按照傳統(tǒng)的話我們或許要采用多線程/多進(jìn)程肩碟,但是這里我們可以采用生成器來實現(xiàn)協(xié)程用來模擬并發(fā)。

fromcollectionsimportdeque

def student(name, homeworks):

forhomeworkinhomeworks.items():

yield(name, homework[0], homework[1])# 學(xué)生"生成"作業(yè)給老師

class Teacher(object):

def __init__(self, students):

self.students = deque(students)

def handle(self):

"""老師處理學(xué)生作業(yè)"""

whilelen(self.students):

student = self.students.pop()

try:

homework = next(student)

print('handling', homework[0], homework[1], homework[2])

exceptStopIteration:

pass

else:

self.students.appendleft(student)

下面我們來調(diào)用一下這個程序凸椿。

Teacher([

student('Student1', {'math':'1+1=2','cs':'operating system'}),

student('Student2', {'math':'2+2=4','cs':'computer graphics'}),

student('Student3', {'math':'3+3=5','cs':'compiler construction'})

]).handle()

這是輸出結(jié)果削祈,我們僅僅只用了一個簡單的生成器就實現(xiàn)了并發(fā)(concurrence),注意不是并行(parallel)脑漫,因為我們的程序僅僅是運行在一個單線程當(dāng)中髓抑。

handling Student3 cs compiler construction

handling Student2 cs computer graphics

handling Student1 cs operating system

handling Student3 math 3+3=5

handling Student2 math 2+2=4

handling Student1 math 1+1=2

使用asyncio模塊實現(xiàn)協(xié)程

從Python3.4開始asyncio模塊加入到了標(biāo)準(zhǔn)庫,通過asyncio我們可以輕松實現(xiàn)協(xié)程來完成異步IO操作优幸。

解釋一下下面這段代碼吨拍,我們自己定義了一個協(xié)程display_date(num, loop),然后它使用關(guān)鍵字yield from來等待協(xié)程asyncio.sleep(2)的返回結(jié)果网杆。而在這等待的2s之間它會讓出CPU的執(zhí)行權(quán)羹饰,直到asyncio.sleep(2)返回結(jié)果伊滋。gather()或者wait()來返回future的執(zhí)行結(jié)果。

# coroutine.pyimportasyncioimportdatetime

@asyncio.coroutine ?# 聲明一個協(xié)程def display_date(num, loop):

end_time = loop.time() +10.0

whileTrue:

print("Loop: {} Time: {}".format(num, datetime.datetime.now()))

if(loop.time() +1.0) >= end_time:

break

yieldfromasyncio.sleep(2)# 阻塞直到協(xié)程sleep(2)返回結(jié)果

loop = asyncio.get_event_loop()# 獲取一個event_loop

tasks = [display_date(1, loop), display_date(2, loop)]

loop.run_until_complete(asyncio.gather(*tasks))# 阻塞直到所有的tasks完成

loop.close()

下面是運行結(jié)果队秩,注意到并發(fā)的效果沒有笑旺,程序從開始到結(jié)束只用大約10s,而在這里我們并沒有使用任何的多線程/多進(jìn)程代碼馍资。在實際項目中你可以將asyncio.sleep(secends)替換成相應(yīng)的IO任務(wù)筒主,比如數(shù)據(jù)庫/磁盤文件讀寫等操作。

ziwenxie :: ~ ? python coroutine.py

Loop:1Time:2016-12-1916:06:46.515329

Loop:2Time:2016-12-1916:06:46.515446

Loop:1Time:2016-12-1916:06:48.517613

Loop:2Time:2016-12-1916:06:48.517724

Loop:1Time:2016-12-1916:06:50.520005

Loop:2Time:2016-12-1916:06:50.520169

Loop:1Time:2016-12-1916:06:52.522452

Loop:2Time:2016-12-1916:06:52.522567

Loop:1Time:2016-12-1916:06:54.524889

Loop:2Time:2016-12-1916:06:54.525031

Loop:1Time:2016-12-1916:06:56.527713

Loop:2Time:2016-12-1916:06:56.528102

在Python3.5中為我們提供更直接的對協(xié)程的支持鸟蟹,引入了async/await關(guān)鍵字乌妙,上面的代碼我們可以這樣改寫,使用async代替了@asyncio.coroutine戏锹,使用了await代替了yield from冠胯,這樣我們的代碼變得更加簡潔可讀火诸。

importasyncioimportdatetime

asyncdef display_date(num, loop):# 聲明一個協(xié)程

end_time = loop.time() +10.0

whileTrue:

print("Loop: {} Time: {}".format(num, datetime.datetime.now()))

if(loop.time() +1.0) >= end_time:

break

awaitasyncio.sleep(2)# 等同于yield from

loop = asyncio.get_event_loop()# 獲取一個event_loop

tasks = [display_date(1, loop), display_date(2, loop)]

loop.run_until_complete(asyncio.gather(*tasks))# 阻塞直到所有的tasks完成

loop.close()

asyncio模塊的其他方法

開啟事件循環(huán)有兩種方法锦针,一種方法就是通過調(diào)用run_until_complete,另外一種就是調(diào)用run_forever置蜀。run_until_complete內(nèi)置add_done_callback奈搜,使用run_forever的好處是可以通過自己自定義add_done_callback,具體差異請看下面兩個例子盯荤。

run_until_complete()

importasyncio

asyncdef slow_operation(future):

awaitasyncio.sleep(1)

future.set_result('Future is done!')

loop = asyncio.get_event_loop()

future = asyncio.Future()

asyncio.ensure_future(slow_operation(future))

print(loop.is_running())# False

loop.run_until_complete(future)

print(future.result())

loop.close()

run_forever()

run_forever相比run_until_complete的優(yōu)勢是添加了一個add_done_callback馋吗,可以讓我們在task(future)完成的時候調(diào)用相應(yīng)的方法進(jìn)行后續(xù)處理。

importasyncio

asyncdef slow_operation(future):

awaitasyncio.sleep(1)

future.set_result('Future is done!')

def got_result(future):

print(future.result())

loop.stop()

loop = asyncio.get_event_loop()

future = asyncio.Future()

asyncio.ensure_future(slow_operation(future))

future.add_done_callback(got_result)try:

loop.run_forever()finally:

loop.close()

這里還要注意一點秋秤,即使你調(diào)用了協(xié)程方法宏粤,但是如果事件循環(huán)沒有開啟,協(xié)程也不會執(zhí)行灼卢,參考官方文檔的描述绍哎,我剛被坑過。

Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There aretwobasic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using theensure_future()function or theAbstractEventLoop.create_task()method. Coroutines (and tasks) can only run when the event loop is running.

Call

call_soon()

importasyncio

def hello_world(loop):

print('Hello World')

loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()

loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()

loop.run_forever()

loop.close()

下面是運行結(jié)果鞋真,我們可以通過call_soon提前注冊我們的task崇堰,并且也可以根據(jù)返回的Handle進(jìn)行cancel。

Hello World

call_later()

importasyncioimportdatetime

def display_date(end_time, loop):

print(datetime.datetime.now())

if(loop.time() +1.0) < end_time:

loop.call_later(1, display_date, end_time, loop)

else:

loop.stop()

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()

end_time = loop.time() +5.0

loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()

loop.run_forever()

loop.close()

改動一下上面的例子我們來看一下call_later的用法涩咖,注意這里并沒有像上面那樣使用while循環(huán)進(jìn)行操作海诲,我們可以通過call_later來設(shè)置每隔1秒去調(diào)用display_date()方法。

2016-12-24 19:17:13.421649

2016-12-24 19:17:14.422933

2016-12-24 19:17:15.424315

2016-12-24 19:17:16.425571

2016-12-24 19:17:17.426874

Chain coroutines

importasyncio

asyncdef compute(x, y):

print("Compute %s + %s ..."% (x, y))

awaitasyncio.sleep(1.0)# 協(xié)程compute不會繼續(xù)往下面執(zhí)行檩互,直到協(xié)程sleep返回結(jié)果

returnx + y

asyncdef print_sum(x, y):

result =awaitcompute(x, y)# 協(xié)程print_sum不會繼續(xù)往下執(zhí)行特幔,直到協(xié)程compute返回結(jié)果

print("%s + %s = %s"% (x, y, result))

loop = asyncio.get_event_loop()

loop.run_until_complete(print_sum(1,2))

loop.close()

下面是輸出結(jié)果

ziwenxie :: ~ ? python chain.py

Compute 1 + 2 ...

1 + 2 = 3

如何將同步的代碼改成異步

結(jié)合上面提到的內(nèi)容下面來小結(jié)一下如何將同步的代碼改成異步

同步模型

def handle(id):

subject = get_subject_from_db(id)# 1

buyinfo = get_buyinfo(id)# 2

change = process(subject, buyinfo)

notify_change(change)

flush_cache(id)

上面是一個典型的同步編程模型,每個步驟必須建立在上一個步驟完成的前提闸昨,但是注意到步驟1和步驟2之間并沒有任何的關(guān)系敬辣,所以可以將這兩個IO型改成異步的雪标,讓兩者可以并發(fā)進(jìn)行。

異步模型

# 先要將get_subject_from_db, get_buyinfo, process, notify_change修改成協(xié)程函數(shù)/方法importasyncio

def handle(id):

subject = asyncio.ensure_future(get_subject_from_db(id))# 1

buyinfo = asyncio.ensure_future(get_buyinfo(id))# 2

results = asyncio.gather(subject, buyinfo)

change =awaitprocess(results)

awaitnotify_change(change)

loop.call_soon(flush_cache(id))

使用ensure_future, loop.crate_task, Task可以將協(xié)程包裝成一個Future對象溉跃,這里我們選擇ensure_future村刨。

Queue

在asyncio使用Queue來模擬生產(chǎn)者-消費者模式:

importasyncioimportrandom

asyncdef produce(queue, n):

forxinrange(n):

# produce an item

print('producing {}/{}'.format(x, n))

# simulate i/o operation using sleep

awaitasyncio.sleep(random.random())

item = str(x)

# put the item in the queue

awaitqueue.put(item)

asyncdef consume(queue):

whileTrue:

# wait for an item from the producer

item =awaitqueue.get()

# process the item

print('consuming {}...'.format(item))

# simulate i/o operation using sleep

awaitasyncio.sleep(random.random())

# Notify the queue that the item has been processed

queue.task_done()

asyncdef run(n):

queue = asyncio.Queue()

# schedule the consumer

consumer = asyncio.ensure_future(consume(queue))

# run the producer and wait for completion

awaitproduce(queue, n)

# wait until the consumer has processed all items

awaitqueue.join()

# the consumer is still awaiting for an item, cancel it

consumer.cancel()

loop = asyncio.get_event_loop()

loop.run_until_complete(run(10))

loop.close()

實戰(zhàn)

by the way:在asyncio中使用requests沒有任何意義,requests是基于同步實現(xiàn)的撰茎,目前也沒有要支持asyncio的動向嵌牺,如果要充分發(fā)回異步的威力,應(yīng)該使用aiohttp龄糊。而且也要合理使用concurrent.futures模塊提供的線程池/進(jìn)程池逆粹。

Asyncio+Aiohttp

importaiohttpimportasyncioimporttime

NUMBERS = range(12)

URL ='http://httpbin.org/get?a={}'

asyncdef fetch_async(a):

asyncwithaiohttp.request('GET', URL.format(a))asr:

data =awaitr.json()

returndata['args']['a']

start = time.time()

event_loop = asyncio.get_event_loop()

tasks = [fetch_async(num)fornuminNUMBERS]

results = event_loop.run_until_complete(asyncio.gather(*tasks))

fornum, resultinzip(NUMBERS, results):

print('fetch({}) = {}'.format(num, result))

print('Use asyncio+aiohttp cost: {}'.format(time.time() - start))

下面是運行結(jié)果:

ziwenxie :: ~ ? python example1.py

fetch(0) =0

fetch(1) =1

fetch(2) =2

fetch(3) =3

fetch(4) =4

fetch(5) =5

fetch(6) =6

fetch(7) =7

fetch(8) =8

fetch(9) =9

fetch(10) =10

fetch(11) =11

Use asyncio+aiohttp cost:0.8980867862701416

Requests+Pool

如果使用傳統(tǒng)的Requests和ThreadPool/ProcessPool方式的話,由于多線程/多進(jìn)程之間切換的開銷速度會慢了許多炫惩。

importrequestsimporttimefromconcurrent.futuresimportThreadPoolExecutor

NUMBERS = range(12)

URL ='http://httpbin.org/get?a={}'

def fetch(a):

r = requests.get(URL.format(a))

returnr.json()['args']['a']

start = time.time()withThreadPoolExecutor(max_workers=3)asexecutor:

fornum, resultinzip(NUMBERS, executor.map(fetch, NUMBERS)):

print('fetch({}) = {}'.format(num, result))

print('Use requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

線程池的執(zhí)行結(jié)果:

ziwenxie :: ~ ? python example2.py

fetch(0) =0

fetch(1) =1

fetch(2) =2

fetch(3) =3

fetch(4) =4

fetch(5) =5

fetch(6) =6

fetch(7) =7

fetch(8) =8

fetch(9) =9

fetch(10) =10

fetch(11) =11

Use requests+ThreadPoolExecutor cost:3.356502056121826

進(jìn)程池的執(zhí)行結(jié)果:

fetch(0) = 0

fetch(1) = 1

fetch(2) = 2

fetch(3) = 3

fetch(4) = 4

fetch(5) = 5

fetch(6) = 6

fetch(7) = 7

fetch(8) = 8

fetch(9) = 9

fetch(10) = 10

fetch(11) = 11

Use requests+ProcessPoolExecutor cost: 3.2979931831359863

Asyncio+Requests+Pool

雖然上面提到requests不支持異步僻弹,但是在某些情形需要控制event loop中運行在單獨的線程/進(jìn)程中的function會阻塞直到這些function返回結(jié)果,這個時候可以結(jié)合run_in_executor()和wait()來進(jìn)行控制他嚷。

p.s:下面這個例子在處理純IO任務(wù)的時候并沒有太多的意義蹋绽,只是為了理解如何在不支持異步的模塊中引入異步的概念。

importasyncioimportrequestsimporttimefromconcurrent.futuresimportThreadPoolExecutor

NUMBERS = range(12)

URL ='http://httpbin.org/get?a={}'

def fetch(a):

r = requests.get(URL.format(a))

returnr.json()['args']['a']

asyncdef run_scraper_tasks(executor):

loop = asyncio.get_event_loop()

blocking_tasks = []

fornuminNUMBERS:

task = loop.run_in_executor(executor, fetch, num)

task.__num = num

blocking_tasks.append(task)

completed, pending =awaitasyncio.wait(blocking_tasks)

results = {t.__num: t.result()fortincompleted}

fornum, resultinsorted(results.items(), key=lambdax: x[0]):

print('fetch({}) = {}'.format(num, result))

start = time.time()

executor = ThreadPoolExecutor(3)

event_loop = asyncio.get_event_loop()

event_loop.run_until_complete(

run_scraper_tasks(executor)

)

print('Use asyncio+requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

結(jié)果可想而知與requests+ThreadPoolExecutor執(zhí)行速度上并沒有太多的差別筋蓖,因為我們的IO任務(wù)還是放在對應(yīng)的子線程中去處理的卸耘,只是這里通過wait引入了異步的概念,但是在某些場景可以取得更大自由度程度的控制粘咖。

fetch(0) =0

fetch(1) =1

fetch(2) =2

fetch(3) =3

fetch(4) =4

fetch(5) =5

fetch(6) =6

fetch(7) =7

fetch(8) =8

fetch(9) =9

fetch(10) =10

fetch(11) =11

Use asyncio+requests+ThreadPoolExecutor cost:3.614989995956421

Semaphore

爬蟲一次性的產(chǎn)生過多的請求賬號/IP很快就會被封掉蚣抗,可以考慮使用Semaphore控制同時的并發(fā)量,與我們熟悉的threading模塊中的Semaphore(信號量)用法類似瓮下。

importaiohttpimportasyncio

NUMBERS = range(12)

URL ='http://httpbin.org/get?a={}'

sema = asyncio.Semaphore(3)

asyncdef fetch_async(a):

asyncwithaiohttp.request('GET', URL.format(a))asr:

data =awaitr.json()

returndata['args']['a']

asyncdef print_result(a):

with(awaitsema):

r =awaitfetch_async(a)

print('fetch({}) = {}'.format(a, r))

loop = asyncio.get_event_loop()

f = asyncio.wait([print_result(num)fornuminNUMBERS])

loop.run_until_complete(f)

可以到后臺看到并發(fā)受到了信號量的限制翰铡,同一時刻一般只處理三個請求。

References

DOCUMENTATION OF ASYNCIO1

DOCUMENTATION OF ASYNCIO2

COROUTINES AND ASYNC/AWAIT

GOLD-XITU1

GOLD-XITU2

STACKOVERFLOW

PyMOTW-3

500LINES

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末讽坏,一起剝皮案震驚了整個濱河市锭魔,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌震缭,老刑警劉巖赂毯,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異拣宰,居然都是意外死亡党涕,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門巡社,熙熙樓的掌柜王于貴愁眉苦臉地迎上來膛堤,“玉大人,你說我怎么就攤上這事晌该》世螅” “怎么了绿渣?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長燕耿。 經(jīng)常有香客問我中符,道長,這世上最難降的妖魔是什么誉帅? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任淀散,我火速辦了婚禮,結(jié)果婚禮上蚜锨,老公的妹妹穿的比我還像新娘档插。我一直安慰自己,他們只是感情好亚再,可當(dāng)我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布郭膛。 她就那樣靜靜地躺著,像睡著了一般氛悬。 火紅的嫁衣襯著肌膚如雪则剃。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天圆雁,我揣著相機(jī)與錄音忍级,去河邊找鬼帆谍。 笑死伪朽,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的汛蝙。 我是一名探鬼主播烈涮,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼窖剑!你這毒婦竟也來了坚洽?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤西土,失蹤者是張志新(化名)和其女友劉穎讶舰,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體需了,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡跳昼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了肋乍。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鹅颊。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖墓造,靈堂內(nèi)的尸體忽然破棺而出堪伍,到底是詐尸還是另有隱情锚烦,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布帝雇,位于F島的核電站涮俄,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏尸闸。R本人自食惡果不足惜禽拔,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望室叉。 院中可真熱鬧睹栖,春花似錦、人聲如沸茧痕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽踪旷。三九已至曼氛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間令野,已是汗流浹背舀患。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留气破,地道東北人聊浅。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像现使,于是被迫代替她去往敵國和親低匙。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容