引言
抨擊線程的往往是系統(tǒng)程序員子姜,他們考慮的使用場景對(duì)一般的應(yīng)用程序員來說,也許一生都不會(huì)遇到……應(yīng)用程序員遇到的使用場景陕悬,99% 的情況下只需知道如何派生一堆獨(dú)立的線程灌诅,然后用隊(duì)列收集結(jié)果 ——Michele Simionato
碰到多線程總是很頭疼的問題,什么父線程子線程单刁,線程間通信灸异,線程隊(duì)列,阻塞,死鎖呀绎狭,頭疼的一比细溅,壓根沒有心思學(xué)下去。直到我看到了上面那句話讓我的心理得到了一絲安慰儡嘶。的確喇聊,作為程序員,接下去我們不需要管哪些花里胡哨的東西蹦狂,安心使用多線程完成自己的tasks就好誓篱。
簡單介紹Threading模塊
Python3廢棄了原來的thread模塊,換成了高級(jí)的threading
模塊凯楔。
threading
庫可用來在單獨(dú)的線程中執(zhí)行任意的Python可調(diào)用對(duì)象,你可以創(chuàng)建一個(gè)Thread
對(duì)象并將你要執(zhí)行的對(duì)象以target 參數(shù)的形式提供給該對(duì)象窜骄。下面是一個(gè)簡單的例子:
# 下面是"執(zhí)行一個(gè)獨(dú)立的線程"的代碼
import time
def countdown(n):
while n > 0:
print('T-minus', n)
n -= 1
time.sleep(5)
# 創(chuàng)建并運(yùn)行一個(gè)線程
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()
- 當(dāng)創(chuàng)建一個(gè)線程實(shí)例時(shí),在調(diào)用它的
start()
之前(需要提供目標(biāo)函數(shù)以及相應(yīng)的參數(shù)),線程并不會(huì)立刻執(zhí)行. - 可以使用
t.is_alive()
來查看線程是否還在運(yùn)行 - 可以使用
t.join()
請(qǐng)求連接(join)到某個(gè)線程上,這么做會(huì)等待該線程結(jié)束 - 于需要長時(shí)間運(yùn)行的線程或者需要一直運(yùn)行的后臺(tái)任務(wù),你應(yīng)當(dāng)考慮使用后臺(tái)線程摆屯。例如
t = Thread(target=countdown, args=(10,), daemon=True)
然后t.start()
或者你也可以通過繼承 threading.Thread 創(chuàng)建子類邻遏,而不是直接調(diào)用 Thread 函數(shù)。例子如下:
from threading import Thread
class CountdownThread(Thread):
def __init__(self, n):
super().__init__()
self.n = 0
def run(self):
while self.n > 0:
print('T-minus', self.n)
self.n -= 1
time.sleep(5)
c = CountdownThread(5)
c.start()
盡管這樣也可以工作虐骑,但這使得你的代碼依賴于threading 庫准验,所以你的這些代碼只能在線程上下文中使用。
好了打住打住廷没,上面只是簡單說明一下threading
這個(gè)庫的用法糊饱,下面才是所謂的程序員遇到的使用場景。
創(chuàng)建線程池來實(shí)現(xiàn)多線程
為了高效處理網(wǎng)絡(luò)I/O,需要使用并發(fā),因?yàn)榫W(wǎng)絡(luò)有很高的延遲,所以為了不浪費(fèi)CPU周期去等待,最好在收到網(wǎng)絡(luò)響應(yīng)之前做些其他的事颠黎。 (相關(guān)概念之前文章有解惑另锋,請(qǐng)看這里)
好了,少廢話狭归,直接看例子夭坪。
在國家地理中文網(wǎng)點(diǎn)開每日一圖。每個(gè)網(wǎng)址對(duì)應(yīng)每張圖:
[圖片上傳失敗...(image-c92495-1514007801847)]
注意網(wǎng)址中的5058唉铜。不同的每日一圖台舱,變的只是這個(gè)數(shù)字,比圖前面的那張數(shù)字就是5057潭流,或者5056竞惋。但是有時(shí)候某個(gè)數(shù)字的網(wǎng)址可能不存在,比如5057不存在灰嫉,我們當(dāng)它是那天忘了更新拆宛。所以我們進(jìn)行網(wǎng)絡(luò)鏈接,判斷從n到最新的5058哪幾個(gè)網(wǎng)址是有效的讼撒,存在每日一圖浑厚,方便我們后續(xù)的爬取圖片股耽。
import requests
import time
import concurrent.futures
msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}
def is_valid_url(n):
req = requests.get(msg.format(n), headers=headers)
# 切莫頻繁,每次請(qǐng)求后停個(gè)0.2s =.=
time.sleep(0.2)
return True if req.status_code == 200 else False
def get_valid_url(page_start):
start = time.time()
# 這里的max_worker=50 我是隨便取的,別太大,小于任務(wù)數(shù)就好,本身線程越多,切換線程消耗越大
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
result_gen = executor.map(is_valid_url, range(page_start, 5059))
num = len([result for result in result_gen if result])
end = time.time()
print("cost time:", end - start)
return "valid url num: " + str(num)
get_valid_url(5000) ,即從5000到5058,結(jié)果得到:
cost time: 0.7712774276733398
valid url num: 21
get_valid_url(4000) 即從4000到5058,結(jié)果得到:
cost time: 9.259612798690796
valid url num: 273
阻塞性I/O與GIL
上面的例子耗時(shí)還是很快的,你們可以去試一下不用多線程,只是for循環(huán)運(yùn)行,耗時(shí)將高達(dá)兩位數(shù)乃至三位數(shù)。
CPython 解釋器本身就不是線程安全的钳幅,因此有全局解釋器鎖(GIL)物蝙,一次只允許使用一個(gè)線程執(zhí)行 Python 字節(jié)碼。因此敢艰,一個(gè) Python 進(jìn)程通常不能同時(shí)使用多個(gè) CPU 核心诬乞。這是 CPython 解釋器的局限,與 Python 語言本身無關(guān)钠导。Jython 和 IronPython 沒有這種限制震嫉。不過,目前最快的 Python解釋器 PyPy 也有 GIL牡属。
編寫 Python 代碼時(shí)無法控制 GIL票堵;不過,執(zhí)行耗時(shí)的任務(wù)時(shí)逮栅,可以使用一個(gè)內(nèi)置的函數(shù)或一個(gè)使用 C 語言編寫的擴(kuò)展釋放 GIL悴势。其實(shí),有個(gè)使用 C 語言編寫的 Python 庫能管理GIL措伐,自行啟動(dòng)操作系統(tǒng)線程瞳浦,利用全部可用的 CPU 核心。這樣做會(huì)極大地增加庫代碼的復(fù)雜度废士,因此大多數(shù)庫的作者都不這么做。
然而蝇完,標(biāo)準(zhǔn)庫中所有執(zhí)行阻塞型 I/O 操作的函數(shù)官硝,在等待操作系統(tǒng)返回結(jié)果時(shí)都會(huì)釋放GIL。這意味著在 Python 語言這個(gè)層次上可以使用多線程短蜕,而 I/O 密集型 Python 程序能從中受益:一個(gè) Python 線程等待網(wǎng)絡(luò)響應(yīng)時(shí)氢架,阻塞型 I/O 函數(shù)會(huì)釋放 GIL,再運(yùn)行一個(gè)線程朋魔。
總結(jié):唯有在處理CPU密集型的時(shí)候才需要考慮GIL岖研,I/O密集型的處理則不必
使用concurrent.futures模塊
concurrent.futures
模塊的主要特色是 ThreadPoolExecutor
和ProcessPoolExecutor
類,這兩個(gè)類實(shí)現(xiàn)的接口能分別在不同的線程或進(jìn)程中執(zhí)行可調(diào)用的對(duì)象警检。這兩個(gè)類在內(nèi)部維護(hù)著一個(gè)工作線程或進(jìn)程池孙援,以及要執(zhí)行的任務(wù)隊(duì)列。不過扇雕,這個(gè)接口抽象的層級(jí)很高拓售,像上面的例子,無需關(guān)心任何實(shí)現(xiàn)細(xì)節(jié)
concurrent.futures模塊中一些組件:
1. Executor.map方法
-
executor.map
方法返回的結(jié)果(results)是生成器镶奉,所以我這里有result_gen
表示 - 對(duì)生成器進(jìn)行循環(huán)相當(dāng)于使用
next()
方法,獲取各個(gè)函數(shù)返回的值``
2. future 以及 Executor.submit方法
-
future
是concurrent.futures
模塊的重要組件,是concurrent.futures.Future
的一個(gè)實(shí)例 - 通常情況下自己不應(yīng)該創(chuàng)建
future
础淤,而只能由并發(fā)框架(concurrent.futures
實(shí)例化)崭放。原因很簡單:future表示終將發(fā)生的事情,而確定某件事會(huì)發(fā)生的唯一方式是執(zhí)行的時(shí)間已經(jīng)排定鸽凶。因此币砂,只有排定把某件事交給concurrent.futures.Executor
子類處理時(shí),生成concurrent.futures.Future
實(shí)例玻侥。例如决摧,Executor.submit()
方法的參數(shù)是一個(gè)可調(diào)用的對(duì)象,調(diào)用這個(gè)方法后會(huì)為傳入的可調(diào)用對(duì)象排期使碾,并返回一個(gè)future蜜徽。
注意: 使用submit
會(huì)返回future;而Executor.map
在過程中悄悄地已經(jīng)使用future:返回值是一個(gè)迭代器票摇,迭代器的__next__
方法調(diào)用各個(gè)future的 result 方法拘鞋,因此我們得到的是各個(gè)期物的結(jié)果,而非future本身矢门。
那么兩者區(qū)別就可以看見: executor.submit
和 futures.as_completed
這個(gè)組合比 executor.map 更靈活,因?yàn)?submit 方法能處理不同的可調(diào)用對(duì)象和參數(shù)隔躲,而 executor.map只能處理參數(shù)不同的同一個(gè)可調(diào)用對(duì)象 (跟內(nèi)置函數(shù)map一樣的用法)物延。
此外宣旱,傳給 futures.as_completed
函數(shù)的future集合可以來自多個(gè) Executor 實(shí)例,例如一些由 ThreadPoolExecutor
實(shí)例創(chuàng)建叛薯,另一些由 ProcessPoolExecutor
實(shí)例創(chuàng)建
3. future的方法
-
.done
:不阻塞,返回布爾值耗溜,指明future鏈接的可調(diào)用對(duì)象是否都已經(jīng)執(zhí)行 -
.add_done_callback()
:future運(yùn)行結(jié)束后會(huì)調(diào)用參數(shù)內(nèi)的可調(diào)用對(duì)象 -
.result
:返回可調(diào)用對(duì)象的結(jié)果,阻塞
這里有一個(gè)executor.submit
的實(shí)例:
import requests
import time
import concurrent.futures
msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}
def is_valid_url(n):
req = requests.get(msg.format(n), headers=headers)
time.sleep(0.2)
return True if req.status_code == 200 else False
# 一個(gè)求平方的任務(wù),準(zhǔn)備也加在里面執(zhí)行
def square(a):
time.sleep(0.1)
output = a **2
print(output)
return output
def get_valid_url(page_start):
start = time.time()
num =0
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
# future1 代表檢查網(wǎng)址有效性的任務(wù)
futures1 = [executor.submit(is_valid_url,n) for n in range(page_start,5059)]
# future2 代表求平方的任務(wù)
futures2 = [executor.submit(square,3)]
# 放在一起
all_futures = futures1 +futures2
# 傳入futures.as_completed完成future,返回一個(gè)迭代器.可以通過循環(huán)得到已經(jīng)完成的future
for future in concurrent.futures.as_completed(all_futures):
result = future.result()
if result == True:
num+=1
end = time.time()
print("cost time:", end - start)
return "valid url num: " + str(num)
print(get_valid_url(5000))
相似的進(jìn)程池
concurrent.futures
模塊的文檔副標(biāo)題是“Launching paralleltasks”(執(zhí)行并行任務(wù))抖拴。這個(gè)模塊實(shí)現(xiàn)的是真正的并行計(jì)算燎字,因?yàn)樗褂?code>ProcessPoolExecutor 類把工作分配給多個(gè) Python 進(jìn)程處理。因此候衍,如果需要做 CPU密集型處理家夺,使用這個(gè)模塊能繞開 GIL,利用所有可用的 CPU 核心拉馋。
ProcessPoolExecutor
和 ThreadPoolExecutor
類都實(shí)現(xiàn)了通用的 Executor 接口,因此使用 concurrent.futures 模塊能特別輕松地把基于線程的方案轉(zhuǎn)成基于進(jìn)程的方案随闺。只需要將with futures.ThreadPoolExecutor(workers) as executor:
改with futures.ProcessPoolExecutor() as executor:
即可。但是對(duì)于上面那個(gè)例子用多進(jìn)程沒有意義矩乐,可能花費(fèi)的時(shí)間更長散罕,這里只是簡單提及一下。
官方文檔的這個(gè)例子就很不錯(cuò):
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
總結(jié)
相對(duì)于threading與queue模塊的結(jié)合,
futures.ThreadPoolExecutor
類已經(jīng)在內(nèi)部封裝了這些組件,這對(duì)于我們平常的作業(yè)來說已經(jīng)綽綽有余职抡。除非要更靈活误甚。自行定制方案,方能用上前者對(duì) CPU 密集型工作來說窑邦,要啟動(dòng)多個(gè)進(jìn)程,規(guī)避 GIL郊丛。創(chuàng)建多個(gè)進(jìn)程最簡單的方式是瞧筛,使用
futures.ProcessPoolExecutor
類宾袜。不過和前面一樣驾窟,如果使用場景較復(fù)雜认轨,需要更高級(jí)的工具嘁字。multiprocessing
模塊。多線程和多進(jìn)程并發(fā)的低層實(shí)現(xiàn)(但卻更靈活)——
threading
和multiprocessing
模塊纪蜒。這兩個(gè)模塊代表在 Python 中使用線程和進(jìn)程的傳統(tǒng)方式。
參考資料
David beazley協(xié)程
Fluent Python
Python Cookbook