原文轉(zhuǎn)載自「劉悅的技術(shù)博客」https://v3u.cn/a_id_221
一般情況下,大家對(duì)Python原生的并發(fā)/并行工作方式:進(jìn)程炸宵、線程和協(xié)程的關(guān)系與區(qū)別都能講清楚辟躏。甚至具體的對(duì)象名稱、內(nèi)置方法都可以如數(shù)家珍土全,這顯然是極好的捎琐,但我們其實(shí)都忽略了一個(gè)問題会涎,就是具體應(yīng)用場景,三者的使用目的是一樣的瑞凑,話句話說末秃,使用結(jié)果是一樣的,都可以提高程序運(yùn)行的效率籽御,但到底那種場景用那種方式更好一點(diǎn)练慕?
這就好比,目前主流的汽車發(fā)動(dòng)機(jī)變速箱無外乎三種:雙離合技掏、CVT以及傳統(tǒng)AT铃将。主機(jī)廠把它們搭載到不同的發(fā)動(dòng)機(jī)和車型上,它們都是變速箱零截,都可以將發(fā)動(dòng)機(jī)產(chǎn)生的動(dòng)力作用到車輪上麸塞,但不同使用場景下到底該選擇那種變速箱?這顯然也是一個(gè)問題涧衙。
所謂“無場景哪工,不功能”,本次我們來討論一下弧哎,具體的并發(fā)編程場景有哪些惕虑,并且對(duì)應(yīng)到具體場景,應(yīng)該怎么選擇并發(fā)手段和方式奸柬。
什么是并發(fā)和并行鹉梨?
在討論場景之前,我們需要將多任務(wù)執(zhí)行的方式進(jìn)行一下分類序攘,那就是并發(fā)方式和并行方式茴她。教科書上告訴我們:并行是指兩個(gè)或者多個(gè)事件在同一時(shí)刻發(fā)生;而并發(fā)是指兩個(gè)或多個(gè)事件在同一時(shí)間間隔內(nèi)發(fā)生程奠。 在多道程序環(huán)境下丈牢,并發(fā)性是指在一段時(shí)間內(nèi)宏觀上有多個(gè)程序在同時(shí)運(yùn)行,但在單處理機(jī)系統(tǒng)中瞄沙,每一時(shí)刻卻僅能有一道程序執(zhí)行己沛,故微觀上這些程序只能是分時(shí)地交替執(zhí)行。
好像有那么一點(diǎn)抽象距境,好吧申尼,讓我們務(wù)實(shí)一點(diǎn),由于GIL全局解釋器鎖的存在垫桂,在Python編程領(lǐng)域师幕,我們可以簡單粗暴地將并發(fā)和并行用程序通過能否使用多核CPU來區(qū)分,能使用多核CPU就是并行伪货,不能使用多核CPU们衙,只能單核處理的钾怔,就是并發(fā)。就這么簡單蒙挑,是的宗侦,Python的GIL全局解釋器鎖幫我們把問題簡化了, 這是Python的大幸忆蚀?還是不幸矾利?
Python中并發(fā)任務(wù)實(shí)現(xiàn)方式包含:多線程threading和協(xié)程asyncio,它們的共同點(diǎn)都是交替執(zhí)行馋袜,而區(qū)別是多線程threading是搶占式的男旗,而協(xié)程asyncio是協(xié)作式的,原理也很簡單欣鳖,只有一顆CPU可以用察皇,而一顆CPU一次只能做一件事,所以只能靠不停地切換才能完成并發(fā)任務(wù)泽台。
Python中并行任務(wù)的實(shí)現(xiàn)方式是多進(jìn)程multiprocessing什荣,通過multiprocessing庫,Python可以在程序主進(jìn)程中創(chuàng)建新的子進(jìn)程怀酷。這里的一個(gè)進(jìn)程可以被認(rèn)為是一個(gè)幾乎完全不同的程序稻爬,盡管從技術(shù)上講,它們通常被定義為資源集合蜕依,其中資源包括內(nèi)存桅锄、文件句柄等。換一種說法是样眠,每個(gè)子進(jìn)程都擁有自己的Python解釋器友瘤,因此,Python中的并行任務(wù)可以使用一顆以上的CPU檐束,每一顆CPU都可以跑一個(gè)進(jìn)程商佑,是真正的同時(shí)運(yùn)行,而不需要切換厢塘,如此Python就可以完成并行任務(wù)。
什么時(shí)候使用并發(fā)肌幽?IO密集型任務(wù)
現(xiàn)在我們搞清楚了晚碾,Python里的并發(fā)運(yùn)行方式就是多線程threading和協(xié)程asyncio,那么什么場景下使用它們喂急?
一般情況下格嘁,任務(wù)場景,或者說的更準(zhǔn)確一些廊移,任務(wù)類型糕簿,無非兩種:CPU密集型任務(wù)和IO密集型任務(wù)探入。
什么是IO密集型任務(wù)?IO就是Input-Output的縮寫懂诗,說白了就是程序的輸入和輸出蜂嗽,想一想確實(shí)就是這樣,您的電腦殃恒,它不就是這兩種功能嗎植旧?用鍵盤、麥克風(fēng)离唐、攝像頭輸入數(shù)據(jù)病附,然后再用屏幕和音箱進(jìn)行輸出操作。
但輸入和輸出操作要比電腦中的CPU運(yùn)行速度慢亥鬓,換句話說完沪,CPU得等著這些比它慢的輸入和輸出操作,說白了就是CPU運(yùn)算一會(huì)嵌戈,就得等這些IO操作覆积,等IO操作完了,CPU才能繼續(xù)運(yùn)算一會(huì)咕别,然后再等著IO操作技健,如圖所示:
[圖片上傳失敗...(image-25c8de-1659364626164)]
由此可知,并發(fā)適合這種IO操作密集和頻繁的工作惰拱,因?yàn)榫退鉉PU是蘋果最新ARM架構(gòu)的M2芯片雌贱,也沒有用武之地。
另外偿短,如果把IO密集型任務(wù)具象化欣孤,那就是我們經(jīng)常操作的:硬盤讀寫(數(shù)據(jù)庫讀寫)、網(wǎng)絡(luò)請(qǐng)求昔逗、文件的打印等等降传。
并發(fā)方式的選擇:多線程threading還是協(xié)程asyncio?
既然涉及硬盤讀寫(數(shù)據(jù)庫讀寫)勾怒、網(wǎng)絡(luò)請(qǐng)求婆排、文件打印等任務(wù)都算并發(fā)任務(wù),那我們就真正地實(shí)踐一下笔链,看看不同的并發(fā)方式到底能提升多少效率段只?
一個(gè)簡單的小需求,對(duì)本站數(shù)據(jù)進(jìn)行重復(fù)抓取操作鉴扫,并計(jì)算首頁數(shù)據(jù)文本的行數(shù):
import requests
import time
def download_site(url, session):
with session.get(url) as response:
print(f"下載了{(lán)len(response.content)}行數(shù)據(jù)")
def download_all_sites(sites):
with requests.Session() as session:
for url in sites:
download_site(url, session)
if __name__ == "__main__":
sites = ["https://v3u.cn"] * 50
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"下載了 {len(sites)}次赞枕,執(zhí)行了{(lán)duration}秒")
在不使用任何并發(fā)手段的前提下,程序返回:
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了76347行數(shù)據(jù)
下載了 50 次數(shù)據(jù),執(zhí)行了8.781155824661255秒
[Finished in 9.6s]
這里程序的每一步都是同步操作炕婶,也就是說當(dāng)?shù)谝淮巫ト【W(wǎng)站首頁時(shí)姐赡,剩下的49次都在等待。
接著使用多線程threading來改造程序:
import concurrent.futures
import requests
import threading
import time
thread_local = threading.local()
def get_session():
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
return thread_local.session
def download_site(url):
session = get_session()
with session.get(url) as response:
print(f"下載了{(lán)len(response.content)}行數(shù)據(jù)")
def download_all_sites(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
executor.map(download_site, sites)
if __name__ == "__main__":
sites = ["https://v3u.cn"] * 50
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"下載了 {len(sites)}次柠掂,執(zhí)行了{(lán)duration}秒")
這里通過with關(guān)鍵詞開啟線程池上下文管理器项滑,并發(fā)8個(gè)線程進(jìn)行下載,程序返回:
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了 50次陪踩,執(zhí)行了7.680492877960205秒
很明顯杖们,效率上有所提升,事實(shí)上肩狂,每個(gè)線程其實(shí)是在不驼辏“切換”著運(yùn)行,這就節(jié)省了單線程每次等待爬取結(jié)果的時(shí)間:
[圖片上傳失敗...(image-8a5baa-1659364626165)]
由此帶來了另外一個(gè)問題:上下文切換的時(shí)間開銷傻谁。
讓我們繼續(xù)改造孝治,用協(xié)程來一試鋒芒,首先安裝異步web請(qǐng)求庫aiohttp:
pip3 install aiohttp
改寫邏輯:
import asyncio
import time
import aiohttp
async def download_site(session, url):
async with session.get(url) as response:
print(f"下載了{(lán)response.content_length}行數(shù)據(jù)")
async def download_all_sites(sites):
async with aiohttp.ClientSession() as session:
tasks = []
for url in sites:
task = asyncio.ensure_future(download_site(session, url))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
sites = ["https://v3u.cn"] * 50
start_time = time.time()
asyncio.run(download_all_sites(sites))
duration = time.time() - start_time
print(f"下載了 {len(sites)}次审磁,執(zhí)行了{(lán)duration}秒")
程序返回:
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76424行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了76161行數(shù)據(jù)
下載了 50次谈飒,執(zhí)行了6.893810987472534秒
效率上百尺竿頭更進(jìn)一步,同樣的使用with關(guān)鍵字操作上下文管理器态蒂,協(xié)程使用asyncio.ensure_future()創(chuàng)建任務(wù)列表杭措,該列表還負(fù)責(zé)啟動(dòng)它們。創(chuàng)建所有任務(wù)后钾恢,使用asyncio.gather()來保持會(huì)話上下文的實(shí)例手素,直到所有爬取任務(wù)完成。和多線程threading的區(qū)別是瘩蚪,協(xié)程并不需要切換上下文泉懦,因此每個(gè)任務(wù)所需的資源和創(chuàng)建時(shí)間要少得多,因此創(chuàng)建和運(yùn)行更多的任務(wù)效率更高:
[圖片上傳失敗...(image-6e8003-1659364626165)]
綜上疹瘦,并發(fā)邏輯歸根結(jié)底是減少CPU等待的時(shí)間崩哩,也就是讓CPU少等一會(huì)兒,而協(xié)程的工作方式顯然讓CPU等待的時(shí)間最少言沐。
并行方式:多進(jìn)程multiprocessing
再來試試多進(jìn)程multiprocessing邓嘹,并行能不能干并發(fā)的事?
import requests
import multiprocessing
import time
session = None
def set_global_session():
global session
if not session:
session = requests.Session()
def download_site(url):
with session.get(url) as response:
name = multiprocessing.current_process().name
print(f"讀了{(lán)len(response.content)}行")
def download_all_sites(sites):
with multiprocessing.Pool(initializer=set_global_session) as pool:
pool.map(download_site, sites)
if __name__ == "__main__":
sites = ["https://v3u.cn"] * 50
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"下載了 {len(sites)}次险胰,執(zhí)行了{(lán)duration}秒")
這里我們依然使用上下文管理器開啟進(jìn)程池吴超,默認(rèn)進(jìn)程數(shù)匹配當(dāng)前計(jì)算機(jī)的CPU核心數(shù),也就是有幾核就開啟幾個(gè)進(jìn)程鸯乃,程序返回:
讀了76000行
讀了76241行
讀了76044行
讀了75894行
讀了76290行
讀了76312行
讀了76419行
讀了76753行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
下載了 50次,執(zhí)行了8.195281982421875秒
雖然比同步程序要快,但無疑的缨睡,效率上要低于多線程和協(xié)程鸟悴。為什么?因?yàn)槎噙M(jìn)程不適合IO密集型任務(wù)奖年,雖然可以利用多核資源细诸,但沒有任何意義:
[圖片上傳失敗...(image-d7d1a2-1659364626165)]
無論開多少進(jìn)程,CPU都沒有用武之地陋守,多數(shù)情況下CPU都在等待IO操作震贵,也就是說,多核反而拖累了IO程序的執(zhí)行水评。
并行方式的選擇:CPU密集型任務(wù)
什么是CPU密集型任務(wù)猩系?這里我們可以使用逆定理:所有不涉及硬盤讀寫(數(shù)據(jù)庫讀寫)、網(wǎng)絡(luò)請(qǐng)求中燥、文件打印等任務(wù)都算CPU密集型任務(wù)任務(wù)寇甸,說白了就是,計(jì)算型任務(wù)疗涉。
以求平方和為例子:
import time
def cpu_bound(number):
return sum(i * i for i in range(number))
def find_sums(numbers):
for number in numbers:
cpu_bound(number)
if __name__ == "__main__":
numbers = [5_000_000 + x for x in range(20)]
start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"{duration}秒")
同步執(zhí)行20次拿霉,需要花費(fèi)多少時(shí)間?
4.466595888137817秒
再來試試并行方式:
import multiprocessing
import time
def cpu_bound(number):
return sum(i * i for i in range(number))
def find_sums(numbers):
with multiprocessing.Pool() as pool:
pool.map(cpu_bound, numbers)
if __name__ == "__main__":
numbers = [5_000_000 + x for x in range(20)]
start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"{duration}秒")
八核處理器咱扣,開八個(gè)進(jìn)程開始跑:
1.1755797863006592秒
不言而喻绽淘,并行方式有效提高了計(jì)算效率。
最后闹伪,既然之前用并行方式運(yùn)行了IO密集型任務(wù)沪铭,我們就再來試試用并發(fā)的方式運(yùn)行CPU密集型任務(wù):
import concurrent.futures
import time
def cpu_bound(number):
return sum(i * i for i in range(number))
def find_sums(numbers):
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
executor.map(cpu_bound, numbers)
if __name__ == "__main__":
numbers = [5_000_000 + x for x in range(20)]
start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"{duration}秒")
單進(jìn)程開8個(gè)線程,走起:
4.452666759490967秒
如何祭往?和并行方式運(yùn)行IO密集型任務(wù)一樣伦意,可以運(yùn)行,但是沒有任何意義硼补。為什么驮肉?因?yàn)闆]有任何IO操作了,CPU不需要等待了已骇,CPU只要全力運(yùn)算即可离钝,所以你上多線程或者協(xié)程,無非就是畫蛇添足褪储、多此一舉卵渴。
結(jié)語
有經(jīng)驗(yàn)的汽修師傅會(huì)告訴你,想省油就選CVT和雙離合鲤竹,想質(zhì)量穩(wěn)定就選AT浪读,經(jīng)常高速上激烈駕駛就選雙離合,經(jīng)常市區(qū)內(nèi)堵車就選CVT;同樣地碘橘,作為經(jīng)驗(yàn)豐富的后臺(tái)研發(fā)互订,你也可以告訴汽修師傅,任何不需要CPU等待的任務(wù)就選擇并行(multiprocessing)的處理方式痘拆,而需要CPU等待時(shí)間過長的任務(wù)仰禽,選擇并發(fā)(threading/asyncio)。反過來纺蛆,我就想用CVT在高速上飆車吐葵,用雙離合在市區(qū)堵車,行不行桥氏?行温峭,但沒有意義,或者說的更準(zhǔn)確一些识颊,沒有任何額外的收益诚镰;而用并發(fā)方式執(zhí)行CPU密集型任務(wù),用并行方式執(zhí)行IO密集型任務(wù)行不行祥款?也行清笨,但依然沒有任何額外的收益, 無他刃跛,唯物無定味抠艾,適口者珍矣。
原文轉(zhuǎn)載自「劉悅的技術(shù)博客」 https://v3u.cn/a_id_221