multiprocessing

multiprocessing是Python內(nèi)置的進程并行庫,具有十分簡潔良好的并行機制镐捧。但就我個人使用感受而言潜索,更適合于單機并行,而不適合分布式結(jié)點懂酱。

multiprocessing

一竹习、multiprocessing使用

  1. Process

    Processmultiprocessing最基本的進程類,內(nèi)置了進程的啟動玩焰、掛起由驹、關閉等方法芍锚。

    # 并行的最小單位依舊是函數(shù)昔园,或者可以稱之為 handler
    from multiprocessing import Process
    
    def test_mult(i):
        print(i+1)
        return i ** 2
    
    result = []  # 用于保存返回的結(jié)果
    
    for i in range(10):
        p = Process(target=test_mult, args=(i, ))
        result.append(p)  # 防止進程不見
        p.start()         # 正式啟動進程
    
    result = [p.join() for p in result]  # 掛起進程,直至其完成
    print(result)  # 進程是無法返回內(nèi)容的
    
    """
    Output:
        1 2 3 4 5 6 7 8 9 10
        [None, None, None, None, None, None, None, None, None, None]
    """
    
  2. Pool

    Pool類提供了進程池化的能力并炮,可以合理地管理和使用資源默刚。

    from multiprocessing import Pool
    
    def test_pool(args):
        a, b = args
        print( a * b)
    
    pool = Pool(5)  # 指定該進程池最多只有5個進程
    
    # map的第二個參數(shù)必須是可迭代的對象,因此如果需要
    # 傳入多個對象時必須也要是一個多參數(shù)的迭代器
    result = pool.map(test_pool, [(a, b) for a, b in zip(range(10), range(10))])
    
    print(result)
    
    pool.close()  # 關掉進程池
    
    """
    Output:
        0 1 9 4 36 49 16 64 25 81
        [None, None, None, None, None, None, None, None, None, None]
    """
    
  3. Queue

    Queue提供了隊列的數(shù)據(jù)結(jié)構逃魄,可以用于進程之間的數(shù)據(jù)通信荤西、消息通信,同時保證數(shù)據(jù)的讀寫安全。Queue提供了FIFO(默認)邪锌、FILO等方式勉躺。

    詳細使用方法將在《事件驅(qū)動》中介紹

  1. Manager

    Manager是用于多節(jié)點并行共享變量的類,不過我還是覺得用于單節(jié)點最好用觅丰。

    • 共享變量
      • Dict:共享的字典變量
      • Array:共享的數(shù)組變量(同dtype)
      • Value:共享的數(shù)值變量(如int饵溅、float)
      • ctypes:支持ctypes構造更復雜的變量
    # 這里以我項目中出現(xiàn)的復數(shù)矩陣來構造一些變量
    
    from multiprocessing.sharedctypes import Array
    import ctypes
    import numpy as np
    
    shared_array_base = Array(ctypes.c_double, 3*3*2)
    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
    shared_array = shared_array.view(np.complex128).reshape(3, 3)
    
    print(shared_array)
    
    shared_array[2, 2] = 2. + 3.j
    
    print('\n', shared_array)
    
    """
    Output:
        [[0.+0.j 0.+0.j 0.+0.j]
         [0.+0.j 0.+0.j 0.+0.j]
         [0.+0.j 0.+0.j 0.+0.j]]
    
       [[0.+0.j 0.+0.j 0.+0.j]
        [0.+0.j 0.+0.j 0.+0.j]
        [0.+0.j 0.+0.j 2.+3.j]]
    
    """
    
    # 使用RawArray來構造共享變量
    from multiprocessing import RawArray
    data = np.random.randn(16, 1000000)
    X_shape = data.shape
    X_size = data.size
    X = RawArray('d', X_size * 2)
    X_np = np.frombuffer(X, dtype=np.complex128).reshape(X_shape)
    np.copyto(X_np, data)
    
    """
    Output:
        array([[ 1.43361395+0.j, -0.13536996+0.j, -1.05048751+0.j, ...,
                 0.34899814+0.j,  0.33336308+0.j, -1.41943919+0.j],
               [-0.65600705+0.j,  0.81952908+0.j,  0.78193087+0.j, ...,
                 0.73767972+0.j, -0.52045135+0.j,  0.96770416+0.j],
               [-2.13355565+0.j,  0.17741152+0.j, -1.2255968 +0.j, ...,
                 0.71831462+0.j,  0.1928877 +0.j,  0.14207214+0.j],
               ...,
               [-0.27040098+0.j,  0.21613441+0.j, -0.24113161+0.j, ...,
                -1.02808119+0.j,  0.07977458+0.j, -0.86394499+0.j],
               [ 0.27319615+0.j, -0.15105511+0.j, -0.03926541+0.j, ...,
                -0.20495524+0.j,  0.09575596+0.j,  0.58463843+0.j],
               [-0.51712435+0.j, -0.63082962+0.j, -0.47347812+0.j, ...,
                -0.15066354+0.j, -0.87177074+0.j, -0.24865684+0.j]])
    """
    

二、注意事項

  1. 進程pickle問題

    • pool類實例的進程一般需要序列化妇萄,這意味著會將代碼以及參數(shù)使用pickle打包蜕企,因此不能包含lambda的代碼或者數(shù)據(jù),這也導致了pool無法使用manager的共享變量來共享狀態(tài)冠句。process則無須序列化轻掩,因此可以和manager搭配使用。
    • 也正是需要序列化的原因懦底,如果傳輸?shù)膮?shù)過大唇牧,將會使得進程初始化的時間大大增加。
  2. 進程生成方式

    • 進程的啟動方式有以下幾種:
    • spawn:父進程會啟動一個全新的 python 解釋器進程基茵。 子進程將只繼承那些運行進程對象的 run()方法所必需的資源奋构。使用此方法啟動進程相比使用 forkforkserver 要慢上許多。
    • fork:父進程使用 os.fork() 來產(chǎn)生 Python 解釋器分叉拱层。子進程在開始時實際上與父進程相同弥臼。父進程的所有資源都由子進程繼承。
    • forkserver:程序啟動并選擇forkserver 啟動方法時根灯,將啟動服務器進程径缅。從那時起,每當需要一個新進程時烙肺,父進程就會連接到服務器并請求它分叉一個新進程纳猪。分叉服務器進程是單線程的,因此使用 os.fork()是安全的桃笙。沒有不必要的資源被繼承氏堤。
    • 當使用 spawn 或者 forkserver 的啟動方式時,multiprocessing中的許多類型都必須是可序列化的搏明,這樣子進程才能使用它們鼠锈。
    • 例子:
    from multiprocessing import Process, freeze_support, set_start_method
    
    def foo():
        print('hello')
    
    if __name__ == '__main__':
        freeze_support()
        set_start_method('spawn')  # 設置啟動方式
        p = Process(target=foo)
        p.start()
    
  3. 注意進程間可能存在無法共享的變量

    • 情況:編寫事件驅(qū)動類時,使用一個 self.processPool = [] 來充當進程池星著,但是經(jīng)過多次的實驗购笆,在每次添加事件add_event()時,僅僅是在當時append成功了虚循,但往后再訪問self.processPool同欠,都是空的样傍。原因是:self.processPool是存在類這個進程下的,而進程池的真是操作是在定義的某個 mainprocessor下的铺遂,他們不是同一進程衫哥,并不能互相更改。
  4. Process類無法進行pickle襟锐,其用于父子進程通信的AuthenticationString是不允許pickle的炕檩。

    • 情況:由于listappend無法統(tǒng)一,所以打算使用Queue或者Manager.dict()來充當進程池捌斧,結(jié)果Process無法被pickle(分配到不同進程時需要pickle)笛质。解決方法可以是: 將while循環(huán)的條件設為隊列,判斷隊列空或者為True(必須先判斷是否為空)時便循環(huán)等待捞蚂;而一旦接收到False妇押,則在run()的進程內(nèi)去join()進程(此進程內(nèi)可見),最后return結(jié)束run()姓迅,則確保此進程也關閉了敲霍。
  5. The “freeze_support()” line can be omitted if the program is not going to be frozen錯誤

    • 情況:該錯誤出現(xiàn)于Windows系統(tǒng)下,腳本的主函數(shù)沒有使用if __name__ == '__main__'前提下丁存,使用了多進程肩杈。原因極可能是:multiprocessing默認創(chuàng)建進程的方式是spwan,也就是fork解寝;而windows下創(chuàng)建進程并不是使用fork扩然,所以解決方法是要么設定創(chuàng)建時用fork,要么就按照上面的寫法聋伦。
  6. RuntimeError: Queue objects should only be shared between processes through inheritance

    • 情況:使用multiprocessing進行多進程計算時夫偶,打算不同函數(shù)中使用Queue來進行共享的通信。但實際上觉增,Queue基于Pipe實現(xiàn)兵拢,而Pipe對象的共享需要通過繼承才能使用。故Queue一般只能用在 父進程創(chuàng)建隊列逾礁,父子進程之間共享狀態(tài) 的情況说铃。可以使用multiprocessing.Manager.Queue代替嘹履。
  7. 隊列queue有線程的隊列與進程隊列之分腻扇。

    • 線程一般使用from queue import Queue, 進程可以使用from multiprocessing import Queue
  8. 父進程與子進程之間不可以使用Queue來進行通信

    • 情況:做進程并行植捎,希望父進程分發(fā)任務衙解,子進程完成后返回給父進程結(jié)果阳柔。但是Queue盡管在子進程可以成功進行put操作焰枢,父進程卻無法接收而導致隊列仍為空。
    • 解決方法:使用form multiprocessing import ManagerManager().Queue()來進行進程通信,Queue對象是隸屬于manager對象進程的济锄,且并無父子關系暑椰,可以和任務進程之間通信,解決了問題荐绝。
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末一汽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子低滩,更是在濱河造成了極大的恐慌召夹,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件恕沫,死亡現(xiàn)場離奇詭異监憎,居然都是意外死亡,警方通過查閱死者的電腦和手機婶溯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進店門鲸阔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人迄委,你說我怎么就攤上這事褐筛。” “怎么了叙身?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵渔扎,是天一觀的道長。 經(jīng)常有香客問我信轿,道長赞警,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任虏两,我火速辦了婚禮愧旦,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘定罢。我一直安慰自己笤虫,他們只是感情好,可當我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布祖凫。 她就那樣靜靜地躺著琼蚯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪惠况。 梳的紋絲不亂的頭發(fā)上遭庶,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天,我揣著相機與錄音稠屠,去河邊找鬼峦睡。 笑死翎苫,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的榨了。 我是一名探鬼主播煎谍,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼龙屉!你這毒婦竟也來了呐粘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤转捕,失蹤者是張志新(化名)和其女友劉穎作岖,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體五芝,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡鳍咱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了与柑。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片谤辜。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖价捧,靈堂內(nèi)的尸體忽然破棺而出丑念,到底是詐尸還是另有隱情,我是刑警寧澤结蟋,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布脯倚,位于F島的核電站,受9級特大地震影響嵌屎,放射性物質(zhì)發(fā)生泄漏推正。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一宝惰、第九天 我趴在偏房一處隱蔽的房頂上張望植榕。 院中可真熱鬧,春花似錦尼夺、人聲如沸尊残。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽寝衫。三九已至,卻和暖如春拐邪,著一層夾襖步出監(jiān)牢的瞬間慰毅,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工扎阶, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留汹胃,地道東北人婶芭。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像统台,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子啡邑,可洞房花燭夜當晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容