Python并發(fā)之多線程與多進(jìn)程

引言

抨擊線程的往往是系統(tǒng)程序員子姜,他們考慮的使用場景對(duì)一般的應(yīng)用程序員來說,也許一生都不會(huì)遇到……應(yīng)用程序員遇到的使用場景陕悬,99% 的情況下只需知道如何派生一堆獨(dú)立的線程灌诅,然后用隊(duì)列收集結(jié)果 ——Michele Simionato

碰到多線程總是很頭疼的問題,什么父線程子線程单刁,線程間通信灸异,線程隊(duì)列,阻塞,死鎖呀绎狭,頭疼的一比细溅,壓根沒有心思學(xué)下去。直到我看到了上面那句話讓我的心理得到了一絲安慰儡嘶。的確喇聊,作為程序員,接下去我們不需要管哪些花里胡哨的東西蹦狂,安心使用多線程完成自己的tasks就好誓篱。

簡單介紹Threading模塊

Python3廢棄了原來的thread模塊,換成了高級(jí)的threading模塊凯楔。
threading庫可用來在單獨(dú)的線程中執(zhí)行任意的Python可調(diào)用對(duì)象,你可以創(chuàng)建一個(gè)Thread 對(duì)象并將你要執(zhí)行的對(duì)象以target 參數(shù)的形式提供給該對(duì)象窜骄。下面是一個(gè)簡單的例子:

# 下面是"執(zhí)行一個(gè)獨(dú)立的線程"的代碼
import time
def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)
# 創(chuàng)建并運(yùn)行一個(gè)線程
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()
  • 當(dāng)創(chuàng)建一個(gè)線程實(shí)例時(shí),在調(diào)用它的start()之前(需要提供目標(biāo)函數(shù)以及相應(yīng)的參數(shù)),線程并不會(huì)立刻執(zhí)行.
  • 可以使用t.is_alive()來查看線程是否還在運(yùn)行
  • 可以使用t.join()請(qǐng)求連接(join)到某個(gè)線程上,這么做會(huì)等待該線程結(jié)束
  • 于需要長時(shí)間運(yùn)行的線程或者需要一直運(yùn)行的后臺(tái)任務(wù),你應(yīng)當(dāng)考慮使用后臺(tái)線程摆屯。例如t = Thread(target=countdown, args=(10,), daemon=True)然后t.start()

或者你也可以通過繼承 threading.Thread 創(chuàng)建子類邻遏,而不是直接調(diào)用 Thread 函數(shù)。例子如下:

from threading import Thread
class CountdownThread(Thread):
    def __init__(self, n):
        super().__init__()
        self.n = 0
    def run(self):
        while self.n > 0:
            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)
            c = CountdownThread(5)
            c.start()

盡管這樣也可以工作虐骑,但這使得你的代碼依賴于threading 庫准验,所以你的這些代碼只能在線程上下文中使用。
好了打住打住廷没,上面只是簡單說明一下threading這個(gè)庫的用法糊饱,下面才是所謂的程序員遇到的使用場景

創(chuàng)建線程池來實(shí)現(xiàn)多線程

為了高效處理網(wǎng)絡(luò)I/O,需要使用并發(fā),因?yàn)榫W(wǎng)絡(luò)有很高的延遲,所以為了不浪費(fèi)CPU周期去等待,最好在收到網(wǎng)絡(luò)響應(yīng)之前做些其他的事颠黎。 (相關(guān)概念之前文章有解惑另锋,請(qǐng)看這里
好了,少廢話狭归,直接看例子夭坪。
在國家地理中文網(wǎng)點(diǎn)開每日一圖。每個(gè)網(wǎng)址對(duì)應(yīng)每張圖:
[圖片上傳失敗...(image-c92495-1514007801847)]

注意網(wǎng)址中的5058唉铜。不同的每日一圖台舱,變的只是這個(gè)數(shù)字,比圖前面的那張數(shù)字就是5057潭流,或者5056竞惋。但是有時(shí)候某個(gè)數(shù)字的網(wǎng)址可能不存在,比如5057不存在灰嫉,我們當(dāng)它是那天忘了更新拆宛。所以我們進(jìn)行網(wǎng)絡(luò)鏈接,判斷從n到最新的5058哪幾個(gè)網(wǎng)址是有效的讼撒,存在每日一圖浑厚,方便我們后續(xù)的爬取圖片股耽。

import requests
import time
import concurrent.futures

msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}


def is_valid_url(n):
    req = requests.get(msg.format(n), headers=headers)
    # 切莫頻繁,每次請(qǐng)求后停個(gè)0.2s =.=
    time.sleep(0.2)
    return True if req.status_code == 200 else False


def get_valid_url(page_start):
    start = time.time()
    # 這里的max_worker=50 我是隨便取的,別太大,小于任務(wù)數(shù)就好,本身線程越多,切換線程消耗越大
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        result_gen = executor.map(is_valid_url, range(page_start, 5059))
        num = len([result for result in result_gen if result])
        end = time.time()
        print("cost time:", end - start)
        return "valid url num: " + str(num)

get_valid_url(5000) ,即從5000到5058,結(jié)果得到:
cost time: 0.7712774276733398
valid url num: 21

get_valid_url(4000) 即從4000到5058,結(jié)果得到:
cost time: 9.259612798690796
valid url num: 273

阻塞性I/O與GIL

上面的例子耗時(shí)還是很快的,你們可以去試一下不用多線程,只是for循環(huán)運(yùn)行,耗時(shí)將高達(dá)兩位數(shù)乃至三位數(shù)。
CPython 解釋器本身就不是線程安全的钳幅,因此有全局解釋器鎖(GIL)物蝙,一次只允許使用一個(gè)線程執(zhí)行 Python 字節(jié)碼。因此敢艰,一個(gè) Python 進(jìn)程通常不能同時(shí)使用多個(gè) CPU 核心诬乞。這是 CPython 解釋器的局限,與 Python 語言本身無關(guān)钠导。Jython 和 IronPython 沒有這種限制震嫉。不過,目前最快的 Python解釋器 PyPy 也有 GIL牡属。
編寫 Python 代碼時(shí)無法控制 GIL票堵;不過,執(zhí)行耗時(shí)的任務(wù)時(shí)逮栅,可以使用一個(gè)內(nèi)置的函數(shù)或一個(gè)使用 C 語言編寫的擴(kuò)展釋放 GIL悴势。其實(shí),有個(gè)使用 C 語言編寫的 Python 庫能管理GIL措伐,自行啟動(dòng)操作系統(tǒng)線程瞳浦,利用全部可用的 CPU 核心。這樣做會(huì)極大地增加庫代碼的復(fù)雜度废士,因此大多數(shù)庫的作者都不這么做。
然而蝇完,標(biāo)準(zhǔn)庫中所有執(zhí)行阻塞型 I/O 操作的函數(shù)官硝,在等待操作系統(tǒng)返回結(jié)果時(shí)都會(huì)釋放GIL。這意味著在 Python 語言這個(gè)層次上可以使用多線程短蜕,而 I/O 密集型 Python 程序能從中受益:一個(gè) Python 線程等待網(wǎng)絡(luò)響應(yīng)時(shí)氢架,阻塞型 I/O 函數(shù)會(huì)釋放 GIL,再運(yùn)行一個(gè)線程朋魔。
總結(jié):唯有在處理CPU密集型的時(shí)候才需要考慮GIL岖研,I/O密集型的處理則不必

使用concurrent.futures模塊

concurrent.futures 模塊的主要特色是 ThreadPoolExecutorProcessPoolExecutor 類,這兩個(gè)類實(shí)現(xiàn)的接口能分別在不同的線程或進(jìn)程中執(zhí)行可調(diào)用的對(duì)象警检。這兩個(gè)類在內(nèi)部維護(hù)著一個(gè)工作線程或進(jìn)程池孙援,以及要執(zhí)行的任務(wù)隊(duì)列。不過扇雕,這個(gè)接口抽象的層級(jí)很高拓售,像上面的例子,無需關(guān)心任何實(shí)現(xiàn)細(xì)節(jié)

concurrent.futures模塊中一些組件:

1. Executor.map方法

  • executor.map 方法返回的結(jié)果(results)是生成器镶奉,所以我這里有result_gen表示
  • 對(duì)生成器進(jìn)行循環(huán)相當(dāng)于使用next()方法,獲取各個(gè)函數(shù)返回的值``

2. future 以及 Executor.submit方法

  • futureconcurrent.futures 模塊的重要組件,是concurrent.futures.Future的一個(gè)實(shí)例
  • 通常情況下自己不應(yīng)該創(chuàng)建future础淤,而只能由并發(fā)框架(concurrent.futures實(shí)例化)崭放。原因很簡單:future表示終將發(fā)生的事情,而確定某件事會(huì)發(fā)生的唯一方式是執(zhí)行的時(shí)間已經(jīng)排定鸽凶。因此币砂,只有排定把某件事交給 concurrent.futures.Executor 子類處理時(shí),生成concurrent.futures.Future 實(shí)例玻侥。例如决摧,Executor.submit()方法的參數(shù)是一個(gè)可調(diào)用的對(duì)象,調(diào)用這個(gè)方法后會(huì)為傳入的可調(diào)用對(duì)象排期使碾,并返回一個(gè)future蜜徽。

注意: 使用submit會(huì)返回future;而Executor.map在過程中悄悄地已經(jīng)使用future:返回值是一個(gè)迭代器票摇,迭代器的__next__ 方法調(diào)用各個(gè)future的 result 方法拘鞋,因此我們得到的是各個(gè)期物的結(jié)果,而非future本身矢门。

那么兩者區(qū)別就可以看見: executor.submitfutures.as_completed 這個(gè)組合比 executor.map 更靈活,因?yàn)?submit 方法能處理不同的可調(diào)用對(duì)象和參數(shù)隔躲,而 executor.map只能處理參數(shù)不同的同一個(gè)可調(diào)用對(duì)象 (跟內(nèi)置函數(shù)map一樣的用法)物延。

此外宣旱,傳給 futures.as_completed 函數(shù)的future集合可以來自多個(gè) Executor 實(shí)例,例如一些由 ThreadPoolExecutor 實(shí)例創(chuàng)建叛薯,另一些由 ProcessPoolExecutor 實(shí)例創(chuàng)建

3. future的方法

  • .done :不阻塞,返回布爾值耗溜,指明future鏈接的可調(diào)用對(duì)象是否都已經(jīng)執(zhí)行
  • .add_done_callback():future運(yùn)行結(jié)束后會(huì)調(diào)用參數(shù)內(nèi)的可調(diào)用對(duì)象
  • .result :返回可調(diào)用對(duì)象的結(jié)果,阻塞

這里有一個(gè)executor.submit的實(shí)例:

import requests
import time
import concurrent.futures
msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}
def is_valid_url(n):
    req = requests.get(msg.format(n), headers=headers)
    time.sleep(0.2)
    return True if req.status_code == 200 else False
# 一個(gè)求平方的任務(wù),準(zhǔn)備也加在里面執(zhí)行
def square(a):
    time.sleep(0.1)
    output = a **2
    print(output)
    return output

def get_valid_url(page_start):
    start = time.time()
    num =0
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        # future1 代表檢查網(wǎng)址有效性的任務(wù)
        futures1 = [executor.submit(is_valid_url,n) for n in range(page_start,5059)]
        # future2 代表求平方的任務(wù)
        futures2 = [executor.submit(square,3)]
        # 放在一起
        all_futures = futures1 +futures2
        # 傳入futures.as_completed完成future,返回一個(gè)迭代器.可以通過循環(huán)得到已經(jīng)完成的future
        for future in concurrent.futures.as_completed(all_futures):
            result = future.result()
            if result == True:
                num+=1
        end = time.time()
        print("cost time:", end - start)
        return "valid url num: " + str(num)

print(get_valid_url(5000))

相似的進(jìn)程池

concurrent.futures 模塊的文檔副標(biāo)題是“Launching paralleltasks”(執(zhí)行并行任務(wù))抖拴。這個(gè)模塊實(shí)現(xiàn)的是真正的并行計(jì)算燎字,因?yàn)樗褂?code>ProcessPoolExecutor 類把工作分配給多個(gè) Python 進(jìn)程處理。因此候衍,如果需要做 CPU密集型處理家夺,使用這個(gè)模塊能繞開 GIL,利用所有可用的 CPU 核心拉馋。
ProcessPoolExecutorThreadPoolExecutor 類都實(shí)現(xiàn)了通用的 Executor 接口,因此使用 concurrent.futures 模塊能特別輕松地把基于線程的方案轉(zhuǎn)成基于進(jìn)程的方案随闺。只需要將with futures.ThreadPoolExecutor(workers) as executor:with futures.ProcessPoolExecutor() as executor:即可。但是對(duì)于上面那個(gè)例子用多進(jìn)程沒有意義矩乐,可能花費(fèi)的時(shí)間更長散罕,這里只是簡單提及一下。

官方文檔的這個(gè)例子就很不錯(cuò):

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

總結(jié)

  1. 相對(duì)于threading與queue模塊的結(jié)合,futures.ThreadPoolExecutor類已經(jīng)在內(nèi)部封裝了這些組件,這對(duì)于我們平常的作業(yè)來說已經(jīng)綽綽有余职抡。除非要更靈活误甚。自行定制方案,方能用上前者

  2. 對(duì) CPU 密集型工作來說窑邦,要啟動(dòng)多個(gè)進(jìn)程,規(guī)避 GIL郊丛。創(chuàng)建多個(gè)進(jìn)程最簡單的方式是瞧筛,使用futures.ProcessPoolExecutor類宾袜。不過和前面一樣驾窟,如果使用場景較復(fù)雜认轨,需要更高級(jí)的工具嘁字。multiprocessing 模塊。

  3. 多線程和多進(jìn)程并發(fā)的低層實(shí)現(xiàn)(但卻更靈活)——threadingmultiprocessing 模塊纪蜒。這兩個(gè)模塊代表在 Python 中使用線程和進(jìn)程的傳統(tǒng)方式。

參考資料

David beazley協(xié)程
Fluent Python
Python Cookbook

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末随珠,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子茸歧,更是在濱河造成了極大的恐慌显沈,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件涤浇,死亡現(xiàn)場離奇詭異魔慷,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)纹烹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門召边,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人片挂,你說我怎么就攤上這事贞盯。” “怎么了闷愤?”我有些...
    開封第一講書人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵件余,是天一觀的道長。 經(jīng)常有香客問我旬渠,道長端壳,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任岖免,我火速辦了婚禮,結(jié)果婚禮上觅捆,老公的妹妹穿的比我還像新娘。我一直安慰自己掂摔,他們只是感情好赢赊,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開白布释移。 她就那樣靜靜地躺著,像睡著了一般玩讳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上同诫,一...
    開封第一講書人閱讀 51,208評(píng)論 1 299
  • 那天樟澜,我揣著相機(jī)與錄音,去河邊找鬼霹俺。 笑死毒费,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的艇棕。 我是一名探鬼主播串塑,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼北苟,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了傻昙?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤僻爽,失蹤者是張志新(化名)和其女友劉穎贾惦,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體须板,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡习瑰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了柠横。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片课兄。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖糜俗,靈堂內(nèi)的尸體忽然破棺而出曲饱,到底是詐尸還是另有隱情,我是刑警寧澤扩淀,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布驻谆,位于F島的核電站,受9級(jí)特大地震影響胜臊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜黑忱,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望菇曲。 院中可真熱鬧抚吠,春花似錦、人聲如沸楷力。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽剪勿。三九已至,卻和暖如春酱固,著一層夾襖步出監(jiān)牢的瞬間头朱,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來泰國打工班眯, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留烁巫,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓磁餐,卻偏偏與公主長得像阿弃,于是被迫代替她去往敵國和親诊霹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子渣淳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容