Python 實現(xiàn)并行化新思路

春節(jié)坐在回家的火車上百無聊賴配紫,偶然看到 Parallelism in one line 這篇在 Hacker News 和 reddit 上都評論過百的文章碉就,順手譯出,enjoy:-)

Python 在程序并行化方面多少有些聲名狼藉废岂。撇開技術(shù)上的問題祖搓,例如線程的實現(xiàn)和 GIL1,我覺得錯誤的教學(xué)指導(dǎo)才是主要問題湖苞。常見的經(jīng)典 Python 多線程拯欧、多進程教程多顯得偏“重”。而且往往隔靴搔癢财骨,沒有深入探討日常工作中最有用的內(nèi)容镐作。

傳統(tǒng)的例子

簡單搜索下“Python 多線程教程”,不難發(fā)現(xiàn)幾乎所有的教程都給出涉及類和隊列的例子:

#Example.py
'''
Standard Producer/Consumer Threading Pattern
'''

import time 
import threading 
import Queue 

class Consumer(threading.Thread): 
    def __init__(self, queue): 
        threading.Thread.__init__(self)
        self._queue = queue 

    def run(self):
        while True: 
            # queue.get() blocks the current thread until 
            # an item is retrieved. 
            msg = self._queue.get() 
            # Checks if the current message is 
            # the "Poison Pill"
            if isinstance(msg, str) and msg == 'quit':
                # if so, exists the loop
                break
            # "Processes" (or in our case, prints) the queue item    
            print "I'm a thread, and I received %s!!" % msg
        # Always be friendly! 
        print 'Bye byes!'


def Producer():
    # Queue is used to share items between
    # the threads.
    queue = Queue.Queue()

    # Create an instance of the worker
    worker = Consumer(queue)
    # start calls the internal run() method to 
    # kick off the thread
    worker.start() 

    # variable to keep track of when we started
    start_time = time.time() 
    # While under 5 seconds.. 
    while time.time() - start_time < 5: 
        # "Produce" a piece of work and stick it in 
        # the queue for the Consumer to process
        queue.put('something at %s' % time.time())
        # Sleep a bit just to avoid an absurd number of messages
        time.sleep(1)

    # This the "poison pill" method of killing a thread. 
    queue.put('quit')
    # wait for the thread to close down
    worker.join()


if __name__ == '__main__':
    Producer()

哈隆箩,看起來有些像 Java 不是嗎该贾?

我并不是說使用生產(chǎn)者/消費者模型處理多線程/多進程任務(wù)是錯誤的(事實上,這一模型自有其用武之地)捌臊。只是杨蛋,處理日常腳本任務(wù)時我們可以使用更有效率的模型。

問題在于…

首先,你需要一個樣板類逞力;
其次曙寡,你需要一個隊列來傳遞對象;
而且寇荧,你還需要在通道兩端都構(gòu)建相應(yīng)的方法來協(xié)助其工作(如果需想要進行雙向通信或是保存結(jié)果還需要再引入一個隊列)举庶。

worker 越多,問題越多

按照這一思路揩抡,你現(xiàn)在需要一個 worker 線程的線程池户侥。下面是一篇 IBM 經(jīng)典教程中的例子——在進行網(wǎng)頁檢索時通過多線程進行加速。

#Example2.py
'''
A more realistic thread pool example 
'''

import time 
import threading 
import Queue 
import urllib2 

class Consumer(threading.Thread): 
    def __init__(self, queue): 
        threading.Thread.__init__(self)
        self._queue = queue 

    def run(self):
        while True: 
            content = self._queue.get() 
            if isinstance(content, str) and content == 'quit':
                break
            response = urllib2.urlopen(content)
        print 'Bye byes!'


def Producer():
    urls = [
        'http://www.python.org', 'http://www.yahoo.com'
        'http://www.scala.org', 'http://www.google.com'
        # etc.. 
    ]
    queue = Queue.Queue()
    worker_threads = build_worker_pool(queue, 4)
    start_time = time.time()

    # Add the urls to process
    for url in urls: 
        queue.put(url)  
    # Add the poison pillv
    for worker in worker_threads:
        queue.put('quit')
    for worker in worker_threads:
        worker.join()

    print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):
    workers = []
    for _ in range(size):
        worker = Consumer(queue)
        worker.start() 
        workers.append(worker)
    return workers

if __name__ == '__main__':
    Producer()

這段代碼能正確的運行捅膘,但仔細看看我們需要做些什么:構(gòu)造不同的方法添祸、追蹤一系列的線程,還有為了解決惱人的死鎖問題寻仗,我們需要進行一系列的 join 操作。這還只是開始……

至此我們回顧了經(jīng)典的多線程教程凡壤,多少有些空洞不是嗎署尤?樣板化而且易出錯,這樣事倍功半的風(fēng)格顯然不那么適合日常使用亚侠,好在我們還有更好的方法曹体。

何不試試 map

map 這一小巧精致的函數(shù)是簡捷實現(xiàn) Python 程序并行化的關(guān)鍵。map 源于 Lisp 這類函數(shù)式編程語言硝烂。它可以通過一個序列實現(xiàn)兩個函數(shù)之間的映射箕别。

    urls = ['http://www.yahoo.com', 'http://www.reddit.com']
    results = map(urllib2.urlopen, urls)

上面的這兩行代碼將 urls 這一序列中的每個元素作為參數(shù)傳遞到 urlopen 方法中,并將所有結(jié)果保存到 results 這一列表中滞谢。其結(jié)果大致相當(dāng)于:

results = []
for url in urls: 
    results.append(urllib2.urlopen(url))

map 函數(shù)一手包辦了序列操作串稀、參數(shù)傳遞和結(jié)果保存等一系列的操作。

為什么這很重要呢狮杨?這是因為借助正確的庫母截,map 可以輕松實現(xiàn)并行化操作。

在 Python 中有個兩個庫包含了 map 函數(shù): multiprocessing 和它鮮為人知的子庫 multiprocessing.dummy.

這里多扯兩句: multiprocessing.dummy橄教? mltiprocessing 庫的線程版克虑蹇堋?這是蝦米护蝶?即便在 multiprocessing 庫的官方文檔里關(guān)于這一子庫也只有一句相關(guān)描述华烟。而這句描述譯成人話基本就是說:"嘛,有這么個東西持灰,你知道就成."相信我盔夜,這個庫被嚴重低估了!

dummy 是 multiprocessing 模塊的完整克隆,唯一的不同在于 multiprocessing 作用于進程比吭,而 dummy 模塊作用于線程(因此也包括了 Python 所有常見的多線程限制)绽族。
所以替換使用這兩個庫異常容易。你可以針對 IO 密集型任務(wù)和 CPU 密集型任務(wù)來選擇不同的庫衩藤。2

動手嘗試

使用下面的兩行代碼來引用包含并行化 map 函數(shù)的庫:

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

實例化 Pool 對象:

pool = ThreadPool()

這條簡單的語句替代了 example2.py 中 build_worker_pool 函數(shù) 7 行代碼的工作吧慢。它生成了一系列的 worker 線程并完成初始化工作、將它們儲存在變量中以方便訪問检诗。

Pool 對象有一些參數(shù)瓢剿,這里我所需要關(guān)注的只是它的第一個參數(shù):processes. 這一參數(shù)用于設(shè)定線程池中的線程數(shù)间狂。其默認值為當(dāng)前機器 CPU 的核數(shù)鉴象。

一般來說纺弊,執(zhí)行 CPU 密集型任務(wù)時淆游,調(diào)用越多的核速度就越快犹菱。但是當(dāng)處理網(wǎng)絡(luò)密集型任務(wù)時已亥,事情有有些難以預(yù)計了虑椎,通過實驗來確定線程池的大小才是明智的。

pool = ThreadPool(4) # Sets the pool size to 4

線程數(shù)過多時传趾,切換線程所消耗的時間甚至?xí)^實際工作時間泥技。對于不同的工作,通過嘗試來找到線程池大小的最優(yōu)值是個不錯的主意簸呈。

創(chuàng)建好 Pool 對象后蜕便,并行化的程序便呼之欲出了。我們來看看改寫后的 example2.py

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
    'http://www.python.org', 
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/'
    # etc.. 
    ]

# Make the Pool of workers
pool = ThreadPool(4) 
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish 
pool.close() 
pool.join() 

實際起作用的代碼只有 4 行,其中只有一行是關(guān)鍵的。map 函數(shù)輕而易舉的取代了前文中超過 40 行的例子贰您。為了更有趣一些枉圃,我統(tǒng)計了不同方法、不同線程池大小的耗時情況展父。

# results = [] 
# for url in urls:
#   result = urllib2.urlopen(url)
#   results.append(result)

# # ------- VERSUS ------- # 


# # ------- 4 Pool ------- # 
# pool = ThreadPool(4) 
# results = pool.map(urllib2.urlopen, urls)

# # ------- 8 Pool ------- # 

# pool = ThreadPool(8) 
# results = pool.map(urllib2.urlopen, urls)

# # ------- 13 Pool ------- # 

# pool = ThreadPool(13) 
# results = pool.map(urllib2.urlopen, urls)

結(jié)果:

#        Single thread:  14.4 Seconds 
#               4 Pool:   3.1 Seconds
#               8 Pool:   1.4 Seconds
#              13 Pool:   1.3 Seconds

很棒的結(jié)果不是嗎栖茉?這一結(jié)果也說明了為什么要通過實驗來確定線程池的大小吕漂。在我的機器上當(dāng)線程池大小大于 9 帶來的收益就十分有限了。

另一個真實的例子

生成上千張圖片的縮略圖
這是一個 CPU 密集型的任務(wù)尘应,并且十分適合進行并行化。

基礎(chǔ)單進程版本

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    for image in images:
        create_thumbnail(Image)

上邊這段代碼的主要工作就是將遍歷傳入的文件夾中的圖片文件苍鲜,一一生成縮略圖,并將這些縮略圖保存到特定文件夾中混滔。

這我的機器上,用這一程序處理 6000 張圖片需要花費 27.9 秒。

如果我們使用 map 函數(shù)來代替 for 循環(huán):

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(creat_thumbnail, images)
    pool.close()
    pool.join()

5.6 秒坯屿!

雖然只改動了幾行代碼,我們卻明顯提高了程序的執(zhí)行速度领跛。在生產(chǎn)環(huán)境中隔节,我們可以為 CPU 密集型任務(wù)和 IO 密集型任務(wù)分別選擇多進程和多線程庫來進一步提高執(zhí)行速度——這也是解決死鎖問題的良方怎诫。此外幻妓,由于 map 函數(shù)并不支持手動線程管理,反而使得相關(guān)的 debug 工作也變得異常簡單。

到這里妹沙,我們就實現(xiàn)了(基本)通過一行 Python 實現(xiàn)并行化偶洋。

轉(zhuǎn)載來源:https://segmentfault.com/a/1190000000414339

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市距糖,隨后出現(xiàn)的幾起案子玄窝,更是在濱河造成了極大的恐慌,老刑警劉巖悍引,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件恩脂,死亡現(xiàn)場離奇詭異,居然都是意外死亡趣斤,警方通過查閱死者的電腦和手機俩块,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來浓领,“玉大人玉凯,你說我怎么就攤上這事∧魇牛” “怎么了壮啊?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長撑蒜。 經(jīng)常有香客問我歹啼,道長玄渗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任狸眼,我火速辦了婚禮藤树,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘拓萌。我一直安慰自己岁钓,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布微王。 她就那樣靜靜地躺著屡限,像睡著了一般。 火紅的嫁衣襯著肌膚如雪炕倘。 梳的紋絲不亂的頭發(fā)上钧大,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天,我揣著相機與錄音罩旋,去河邊找鬼啊央。 笑死,一個胖子當(dāng)著我的面吹牛涨醋,可吹牛的內(nèi)容都是我干的瓜饥。 我是一名探鬼主播,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼浴骂,長吁一口氣:“原來是場噩夢啊……” “哼乓土!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起溯警,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤帐我,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后愧膀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡谣光,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年檩淋,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片萄金。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡蟀悦,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出氧敢,到底是詐尸還是另有隱情日戈,我是刑警寧澤,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布孙乖,位于F島的核電站浙炼,受9級特大地震影響份氧,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜弯屈,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一蜗帜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧资厉,春花似錦厅缺、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至窄刘,卻和暖如春窥妇,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背都哭。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工秩伞, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人欺矫。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓纱新,卻偏偏與公主長得像,于是被迫代替她去往敵國和親穆趴。 傳聞我的和親對象是個殘疾皇子脸爱,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內(nèi)容