Original (Chinese): http://www.reibang.com/p/4e048726b613
Introduction
With the rise of node.js, most of us have heard the term "asynchronous programming" at some point this year. Although the Python community was slower than some other languages to embrace async programming, Python 3.4 added asyncio, Python 3.5 added syntax-level support with async/await, and in the newly released Python 3.6 asyncio has been promoted from provisional to stable. Below we'll walk through the concept of asynchronous programming and the usage of asyncio, based on Python 3.4+.
What is a coroutine?
In Python, concurrent programming is usually done with multiple threads or processes. For CPU-bound tasks the GIL pushes us toward multiprocessing, while for IO-bound tasks the scheduler lets a thread release the GIL while it blocks on IO, giving the appearance of concurrency.
For IO-bound tasks there is a third option: coroutines. Coroutines provide "concurrency" within a single thread; one big advantage over threads is that they avoid the cost of switching between threads, yielding better runtime efficiency. Python's asyncio is itself built on coroutines. Before diving into asyncio, let's first see how generators can serve as coroutines to implement concurrency in Python.
example1
Let's start with a simple example to see what a coroutine is. If you are not familiar with generators, it is worth reading the highly voted answer on Stack Overflow first.
>>> def coroutine():
...     reply = yield 'hello'
...     yield reply
...
>>> c = coroutine()
>>> next(c)
'hello'
>>> c.send('world')
'world'
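Generators communicate in both directions: yield sends a value out, and send() pushes a value back in as the result of the yield expression. A slightly longer sketch of this two-way flow (the averager name is just for illustration):

```python
def averager():
    """A generator-based coroutine that keeps a running average."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # emit the current average, receive the next value
        total += value
        count += 1
        average = total / count

avg = averager()
next(avg)            # prime the coroutine: run it up to the first yield
print(avg.send(10))  # 10.0
print(avg.send(30))  # 20.0
print(avg.send(5))   # 15.0
```

Note the priming call to next(): a generator must first be advanced to its first yield before send() can deliver a value into it.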
example2
The next program simulates several students submitting homework to one teacher at the same time. Traditionally we might reach for threads or processes, but here we can use generators as coroutines to simulate the concurrency.
from collections import deque

def student(name, homeworks):
    for homework in homeworks.items():
        yield (name, homework[0], homework[1])  # the student "yields" homework to the teacher

class Teacher(object):
    def __init__(self, students):
        self.students = deque(students)

    def handle(self):
        """The teacher processes the students' homework."""
        while len(self.students):
            student = self.students.pop()
            try:
                homework = next(student)
                print('handling', homework[0], homework[1], homework[2])
            except StopIteration:
                pass
            else:
                self.students.appendleft(student)
Now let's invoke the program.
Teacher([
    student('Student1', {'math': '1+1=2', 'cs': 'operating system'}),
    student('Student2', {'math': '2+2=4', 'cs': 'computer graphics'}),
    student('Student3', {'math': '3+3=5', 'cs': 'compiler construction'})
]).handle()
Here is the output. With nothing but a simple generator we achieved concurrence, not parallelism: the program still runs in a single thread.
handling Student3 cs compiler construction
handling Student2 cs computer graphics
handling Student1 cs operating system
handling Student3 math 3+3=5
handling Student2 math 2+2=4
handling Student1 math 1+1=2
Implementing coroutines with the asyncio module
The asyncio module entered the standard library in Python 3.4. With asyncio we can easily implement coroutines to perform asynchronous IO.
To explain the code below: we define a coroutine display_date(num, loop), which uses the yield from keyword to wait for the result of the coroutine asyncio.sleep(2). During those 2 seconds it yields the CPU to other coroutines, until asyncio.sleep(2) returns. gather() (or wait()) is then used to collect the results of the futures.
# coroutine.py
import asyncio
import datetime

@asyncio.coroutine  # declare a coroutine
def display_date(num, loop):
    end_time = loop.time() + 10.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(2)  # blocks until the coroutine sleep(2) returns

loop = asyncio.get_event_loop()  # get an event loop
tasks = [display_date(1, loop), display_date(2, loop)]
loop.run_until_complete(asyncio.gather(*tasks))  # block until all the tasks complete
loop.close()
Below is the output. Notice the concurrency: the program takes only about 10 seconds from start to finish, even though we used no multithreading/multiprocessing code at all. In a real project you would replace asyncio.sleep(seconds) with an actual IO task, such as database or disk file reads and writes.
ziwenxie :: ~ ? python coroutine.py
Loop: 1 Time: 2016-12-19 16:06:46.515329
Loop: 2 Time: 2016-12-19 16:06:46.515446
Loop: 1 Time: 2016-12-19 16:06:48.517613
Loop: 2 Time: 2016-12-19 16:06:48.517724
Loop: 1 Time: 2016-12-19 16:06:50.520005
Loop: 2 Time: 2016-12-19 16:06:50.520169
Loop: 1 Time: 2016-12-19 16:06:52.522452
Loop: 2 Time: 2016-12-19 16:06:52.522567
Loop: 1 Time: 2016-12-19 16:06:54.524889
Loop: 2 Time: 2016-12-19 16:06:54.525031
Loop: 1 Time: 2016-12-19 16:06:56.527713
Loop: 2 Time: 2016-12-19 16:06:56.528102
Python 3.5 added more direct support for coroutines by introducing the async/await keywords. We can rewrite the code above using async instead of @asyncio.coroutine and await instead of yield from, which makes it more concise and readable.
import asyncio
import datetime

async def display_date(num, loop):  # declare a coroutine
    end_time = loop.time() + 10.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(2)  # equivalent to yield from

loop = asyncio.get_event_loop()  # get an event loop
tasks = [display_date(1, loop), display_date(2, loop)]
loop.run_until_complete(asyncio.gather(*tasks))  # block until all the tasks complete
loop.close()
Other methods of the asyncio module
There are two ways to run the event loop: one is to call run_until_complete, the other is to call run_forever. run_until_complete registers a done-callback internally; the advantage of run_forever is that you can attach your own callback with add_done_callback. The two examples below show the difference.
run_until_complete()
import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
print(loop.is_running())  # False
loop.run_until_complete(future)
print(future.result())
loop.close()
run_forever()
Compared with run_until_complete, run_forever lets us attach add_done_callback ourselves, so we can run follow-up processing when the task (future) completes.
import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)

try:
    loop.run_forever()
finally:
    loop.close()
One more caveat: even if you call a coroutine function, the coroutine will not execute unless the event loop is running. See the description from the official documentation below; this bit me recently.
Calling a coroutine does not start its code running – the coroutine object returned by the call doesn't do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future() function or the AbstractEventLoop.create_task() method. Coroutines (and tasks) can only run when the event loop is running.
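A minimal demonstration of that warning: calling the coroutine function only builds a coroutine object, and nothing runs until the loop schedules it. (greet is a made-up name; new_event_loop is used here only to keep this sketch self-contained, where the article's examples use get_event_loop.)

```python
import asyncio

async def greet():
    print('hello')
    return 'hello'

coro = greet()                          # nothing has run yet: this is just a coroutine object
print(type(coro).__name__)              # coroutine

loop = asyncio.new_event_loop()
result = loop.run_until_complete(coro)  # only now does 'hello' get printed
loop.close()
print(result)                           # hello
```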
Call
call_soon()
import asyncio

def hello_world(loop):
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)
# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
Below is the output. With call_soon we can register our task ahead of time, and the returned Handle can also be used to cancel it.
Hello World
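A sketch of cancelling via that returned Handle (results are collected in a list rather than printed, so the effect is easy to see; the callback names are illustrative):

```python
import asyncio

loop = asyncio.new_event_loop()   # the article's examples use get_event_loop()
calls = []

h1 = loop.call_soon(calls.append, 'first')
h2 = loop.call_soon(calls.append, 'second')
h2.cancel()                       # cancelled before the loop runs, so it never fires

loop.call_soon(loop.stop)
loop.run_forever()
loop.close()
print(calls)                      # ['first']
```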
call_later()
import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()
# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)
# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
Modifying the earlier example a little, let's look at call_later. Note that there is no while loop this time: instead, call_later re-schedules the display_date() method to run every second.
2016-12-24 19:17:13.421649
2016-12-24 19:17:14.422933
2016-12-24 19:17:15.424315
2016-12-24 19:17:16.425571
2016-12-24 19:17:17.426874
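call_later takes a relative delay, while its sibling call_at takes an absolute deadline on the loop's own clock (loop.time()). A small sketch of the difference (the delays are arbitrary, and the callbacks just record into a list):

```python
import asyncio

loop = asyncio.new_event_loop()
calls = []

loop.call_later(0.06, calls.append, 'later')          # relative: 60 ms from now
loop.call_at(loop.time() + 0.02, calls.append, 'at')  # absolute deadline: 20 ms from now
loop.call_later(0.12, loop.stop)

loop.run_forever()
loop.close()
print(calls)   # ['at', 'later']
```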
Chain coroutines
import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)  # compute does not proceed until the coroutine sleep returns
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)  # print_sum does not proceed until compute returns
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
Here is the output:
ziwenxie :: ~ ? python chain.py
Compute 1 + 2 ...
1 + 2 = 3
Converting synchronous code to asynchronous
Building on everything above, let's summarize how to convert synchronous code into asynchronous code.
Synchronous model
def handle(id):
    subject = get_subject_from_db(id)  # 1
    buyinfo = get_buyinfo(id)  # 2
    change = process(subject, buyinfo)
    notify_change(change)
    flush_cache(id)
This is a typical synchronous programming model: each step depends on the previous one having completed. But notice that steps 1 and 2 have no relationship to each other, so these two IO operations can be made asynchronous and run concurrently.
Asynchronous model
# First convert get_subject_from_db, get_buyinfo, process, notify_change into coroutines
import asyncio

async def handle(id):
    subject = asyncio.ensure_future(get_subject_from_db(id))  # 1
    buyinfo = asyncio.ensure_future(get_buyinfo(id))  # 2
    subject, buyinfo = await asyncio.gather(subject, buyinfo)
    change = await process(subject, buyinfo)
    await notify_change(change)
    loop.call_soon(flush_cache, id)
A coroutine can be wrapped into a Future object with ensure_future, loop.create_task, or Task; here we choose ensure_future.
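A sketch of that wrapping (work is a made-up coroutine): ensure_future turns a coroutine into a Task that is scheduled immediately, and gather collects the results in submission order.

```python
import asyncio

async def work(n):
    await asyncio.sleep(0.01)
    return n * 2

async def main():
    t1 = asyncio.ensure_future(work(1))  # wraps the coroutine in a Task and schedules it
    t2 = asyncio.ensure_future(work(2))  # loop.create_task(work(2)) is the loop-method equivalent
    return await asyncio.gather(t1, t2)  # results come back in submission order

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)             # so ensure_future can find the loop
print(loop.run_until_complete(main()))   # [2, 4]
loop.close()
```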
Queue
asyncio's Queue can be used to model the producer-consumer pattern:
import asyncio
import random

async def produce(queue, n):
    for x in range(n):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        # process the item
        print('consuming {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        # Notify the queue that the item has been processed
        queue.task_done()

async def run(n):
    queue = asyncio.Queue()
    # schedule the consumer
    consumer = asyncio.ensure_future(consume(queue))
    # run the producer and wait for completion
    await produce(queue, n)
    # wait until the consumer has processed all items
    await queue.join()
    # the consumer is still awaiting an item, cancel it
    consumer.cancel()

loop = asyncio.get_event_loop()
loop.run_until_complete(run(10))
loop.close()
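The same pattern extends to several consumers sharing one queue. A hedged sketch (worker and the item counts are illustrative) that relies on queue.join() waiting until every item has had a matching task_done():

```python
import asyncio

async def worker(name, queue, processed):
    while True:
        item = await queue.get()
        processed.append((name, item))
        queue.task_done()               # every get() must be matched by a task_done()

async def main():
    queue = asyncio.Queue()
    processed = []
    workers = [asyncio.ensure_future(worker('w{}'.format(i), queue, processed))
               for i in range(3)]
    for item in range(9):
        await queue.put(item)
    await queue.join()                  # blocks until all 9 items are task_done()
    for w in workers:                   # consumers are idle on get(); cancel them
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)  # let cancellation settle
    return processed

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
processed = loop.run_until_complete(main())
loop.close()
print(len(processed))   # 9
```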
In practice
By the way: using requests inside asyncio is pointless, since requests is implemented synchronously and there is currently no sign of it supporting asyncio. To get the full benefit of async you should use aiohttp, and also make sensible use of the thread pool/process pool provided by the concurrent.futures module.
Asyncio+Aiohttp
import aiohttp
import asyncio
import time

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
        return data['args']['a']

start = time.time()
event_loop = asyncio.get_event_loop()
tasks = [fetch_async(num) for num in NUMBERS]
results = event_loop.run_until_complete(asyncio.gather(*tasks))

for num, result in zip(NUMBERS, results):
    print('fetch({}) = {}'.format(num, result))
print('Use asyncio+aiohttp cost: {}'.format(time.time() - start))
Here is the output:
ziwenxie :: ~ ? python example1.py
fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use asyncio+aiohttp cost: 0.8980867862701416
Requests+Pool
With the traditional requests plus ThreadPool/ProcessPool approach, the switching overhead between threads/processes makes things considerably slower.
import requests
import time
from concurrent.futures import ThreadPoolExecutor

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

def fetch(a):
    r = requests.get(URL.format(a))
    return r.json()['args']['a']

start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(fetch, NUMBERS)):
        print('fetch({}) = {}'.format(num, result))
print('Use requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))
Thread pool results:
ziwenxie :: ~ ? python example2.py
fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use requests+ThreadPoolExecutor cost: 3.356502056121826
Process pool results:
fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use requests+ProcessPoolExecutor cost: 3.2979931831359863
Asyncio+Requests+Pool
Although requests does not support async, sometimes we need the event loop to control functions running in separate threads/processes and block until they return results; this can be done by combining run_in_executor() and wait().
P.S.: for pure IO tasks the example below has little practical benefit; it only illustrates how to bring asynchronous concepts into a module that does not support async.
import asyncio
import requests
import time
from concurrent.futures import ThreadPoolExecutor

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

def fetch(a):
    r = requests.get(URL.format(a))
    return r.json()['args']['a']

async def run_scraper_tasks(executor):
    loop = asyncio.get_event_loop()
    blocking_tasks = []
    for num in NUMBERS:
        task = loop.run_in_executor(executor, fetch, num)
        task.__num = num
        blocking_tasks.append(task)
    completed, pending = await asyncio.wait(blocking_tasks)
    results = {t.__num: t.result() for t in completed}
    for num, result in sorted(results.items(), key=lambda x: x[0]):
        print('fetch({}) = {}'.format(num, result))

start = time.time()
executor = ThreadPoolExecutor(3)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(
    run_scraper_tasks(executor)
)
print('Use asyncio+requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))
As you would expect, the speed is about the same as requests+ThreadPoolExecutor, because the IO work is still being done in the corresponding worker threads; wait merely introduces asynchrony here, but in some scenarios it gives you a greater degree of control.
fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use asyncio+requests+ThreadPoolExecutor cost: 3.614989995956421
Semaphore
A crawler that fires too many requests at once will quickly get its account/IP banned. Consider using a Semaphore to cap the concurrency; its usage is similar to the Semaphore in the familiar threading module.
import aiohttp
import asyncio

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)

async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
        return data['args']['a']

async def print_result(a):
    with (await sema):
        r = await fetch_async(a)
        print('fetch({}) = {}'.format(a, r))

loop = asyncio.get_event_loop()
f = asyncio.wait([print_result(num) for num in NUMBERS])
loop.run_until_complete(f)
Watching the server side, you can see that the concurrency is throttled by the semaphore: at any given moment only about three requests are being handled.
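One note for newer Python: the with (await sema) form above is the Python 3.4-era style; it was deprecated in Python 3.7 and no longer works on recent versions, where async with sema is the syntax. A self-contained sketch (no network; limited, main, and the peak counter are illustrative) that checks the cap of 3 actually holds:

```python
import asyncio

async def limited(sema, n, state):
    async with sema:                        # at most 3 coroutines inside this block at once
        state['running'] += 1
        state['peak'] = max(state['peak'], state['running'])
        await asyncio.sleep(0.02)           # stand-in for the real IO request
        state['running'] -= 1
        return n

async def main():
    sema = asyncio.Semaphore(3)             # create the semaphore inside the running loop
    state = {'running': 0, 'peak': 0}
    results = await asyncio.gather(*[limited(sema, n, state) for n in range(12)])
    return results, state['peak']

loop = asyncio.new_event_loop()
results, peak = loop.run_until_complete(main())
loop.close()
print(peak <= 3)   # True: concurrency never exceeds the semaphore's limit
```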