使用python單機(jī)多進(jìn)程并行處理數(shù)據(jù)

背景

最近需要在單機(jī)上處理一個(gè)數(shù)據(jù)量較大的文件飒泻,由于內(nèi)存限制,只能分批次讀取數(shù)據(jù)并處理噪窘。與此同時(shí)梗搅,數(shù)據(jù)處理的耗時(shí)較長(zhǎng),長(zhǎng)于讀取時(shí)間效览,為了盡量提升處理效率无切,需要應(yīng)用某種方式對(duì)數(shù)據(jù)進(jìn)行并行處理。本文旨在讓所有像我一樣丐枉,很少接觸多線程/多進(jìn)程的同學(xué)哆键,在不需要任何知識(shí)儲(chǔ)備的情況下看懂并結(jié)合自己的場(chǎng)景快速上手。
任務(wù)使用python編寫瘦锹,需要應(yīng)用python中適用于并行處理的模塊籍嘹。通過(guò)查詢文檔可知:python中內(nèi)置的與并行處理相關(guān)的模塊闪盔,有如下這些:

threading
multiprocessing
concurrent
subproceses
sched
queue
contextvars

模塊選擇

python中存在GIL(Global Interpreter Lock),即在使用多線程(即 threading 模塊)的每一時(shí)刻只有一個(gè)線程在使用解釋器辱士,因此這實(shí)際上將使用了 threading 的程序退化成了可能比單線程還慢的程序泪掀。正是由于這個(gè)原因, 才有了?multiprocessing 模塊颂碘。它規(guī)避了GIL的特性异赫,使用了多進(jìn)程而非多線程,每個(gè)進(jìn)程使用獨(dú)立的解釋器头岔。因此本文講述的方法也主要使用 multiproceessing 模塊塔拳。

需求概述:

這次需要實(shí)現(xiàn)的功能是對(duì)數(shù)據(jù)進(jìn)行并行處理,如下圖所示峡竣。


image.png

從這個(gè)流程中靠抑,自然會(huì)引出一些問(wèn)題:下面對(duì)這這些問(wèn)題依次進(jìn)行分析。

1适掰,不同功能如何組織颂碧?

整個(gè)程序主要有三個(gè)功能對(duì)象:數(shù)據(jù)讀取,數(shù)據(jù)處理类浪,數(shù)據(jù)導(dǎo)出稚伍。這三個(gè)功能按如下的流程工作:數(shù)據(jù)讀取對(duì)象從文件中循環(huán)讀取數(shù)據(jù)分片,每次讀取后戚宦,將分片傳遞給某個(gè)數(shù)據(jù)處理對(duì)象進(jìn)行處理个曙。數(shù)據(jù)處理完成后,再將處理好的數(shù)據(jù)傳遞給數(shù)據(jù)導(dǎo)出對(duì)象進(jìn)行落盤受楼。

2垦搬,數(shù)據(jù)分片如何傳遞?

由于數(shù)據(jù)讀取的速度與處理的速度不一致艳汽,因此無(wú)法保證每次分片讀取完畢后猴贰,都正好有一個(gè)待命的處理單元,因此這里使用 multiprocessing 模塊中的隊(duì)列類(Queue)河狐。每讀取一個(gè)數(shù)據(jù)分片米绕,就將其放入隊(duì)列中,等待空閑的處理單元獲得此分片馋艺。同理栅干,當(dāng)數(shù)據(jù)處理完畢后,也不能保證馬上可以落盤捐祠,因此需要引入第二個(gè)隊(duì)列碱鳞,存放處理完畢的數(shù)據(jù)分片。

3踱蛀,需要多少進(jìn)程窿给?

由于數(shù)據(jù)的讀取和落盤是對(duì)單個(gè)文件操作贵白,且為保證數(shù)據(jù)合法性,不能并行處理崩泡,因此每個(gè)操作只要一個(gè)進(jìn)程就足夠了禁荒,而中間可以有多個(gè)處理單元同時(shí)處理數(shù)據(jù),達(dá)到并行提升效率的目的角撞。就是說(shuō)需要 n 個(gè)處理進(jìn)程(取決于實(shí)際數(shù)據(jù))呛伴,一個(gè)讀數(shù)據(jù)進(jìn)程,一個(gè)寫數(shù)據(jù)進(jìn)程靴寂,即 n+2。

4召耘,程序如何退出百炬?

我們依次來(lái)看每個(gè)進(jìn)程。讀數(shù)據(jù)進(jìn)程的邏輯很簡(jiǎn)單污它,只要文件讀取完畢剖踊,并將所有數(shù)據(jù)放入隊(duì)列,它的任務(wù)就完成了衫贬,其進(jìn)程可以自然退出德澈。數(shù)據(jù)處理進(jìn)程需要等待隊(duì)列中的數(shù)據(jù)分片,當(dāng)進(jìn)程空閑且隊(duì)列中有尚未處理的分片時(shí)固惯,它就會(huì)工作梆造。當(dāng)所有的分片都讀取完畢,且都分配了處理進(jìn)程后葬毫,所有空閑的處理進(jìn)程就都可以關(guān)閉了镇辉,但是這些進(jìn)程并不知道哪一個(gè)分片時(shí)最后一個(gè)分片,也就無(wú)法判斷是否應(yīng)該退出贴捡。
對(duì)于這個(gè)問(wèn)題忽肛,在這里有兩種解決辦法。

  • 1烂斋,不關(guān)閉進(jìn)程屹逛,保持其運(yùn)行,最終與主進(jìn)程同時(shí)退出汛骂。這種方法就是傳說(shuō)中的守護(hù)進(jìn)程(本質(zhì)是當(dāng)沒(méi)有任務(wù)非守護(hù)進(jìn)程的子進(jìn)程在運(yùn)行時(shí)罕模,從主進(jìn)程中產(chǎn)生的所有守護(hù)進(jìn)程就會(huì)退出),一般通過(guò)在聲明進(jìn)程對(duì)象時(shí)設(shè)置 daemon=True 來(lái)使用帘瞭。在 threading 模塊中手销,也有與之對(duì)應(yīng)的守護(hù)線程。
  • 2图张,維護(hù)一個(gè)狀態(tài)變量锋拖,在讀數(shù)據(jù)進(jìn)程運(yùn)行完畢時(shí)诈悍,修改此狀態(tài)。當(dāng)處理進(jìn)程計(jì)算完當(dāng)前分片兽埃,準(zhǔn)備讀取下一個(gè)分片時(shí)侥钳,先查看隊(duì)列是否為空,如果為空柄错,再查看此狀態(tài)舷夺,如果狀態(tài)顯示“讀數(shù)據(jù)進(jìn)程已經(jīng)結(jié)束”,則退出此處理進(jìn)程售貌。那么對(duì)于此狀態(tài)變量给猾,需要滿足兩個(gè)條件:
    • a,此變量可以在不同進(jìn)程之間共享颂跨;
    • b敢伸,需要保證進(jìn)程安全。
    • 其中a可以通過(guò) multiprocessing 中的 Manager 服務(wù)器進(jìn)程實(shí)現(xiàn)恒削。
    • b可以通過(guò)加鎖實(shí)現(xiàn)(Lock)池颈。

為了使程序盡量簡(jiǎn)單,這里我們采取守護(hù)進(jìn)程的方法钓丰。寫數(shù)據(jù)進(jìn)程會(huì)不斷從第二個(gè)隊(duì)列中讀取數(shù)據(jù)躯砰,并寫入文件⌒。可見琢歇,它退出的條件稍微復(fù)雜一些,在其空閑的情況下梦鉴,還有三個(gè)條件需要同時(shí)滿足才可以退出:

  • 1矿微,讀數(shù)據(jù)進(jìn)程已關(guān)閉;
  • 2尚揣,所有處理進(jìn)程都已處理完畢涌矢;
  • 3,第二個(gè)隊(duì)列為空快骗。

在這里也有幾種不同的解決方案:

  • 1娜庇,增加更多的狀態(tài)變量,判斷每個(gè)進(jìn)程是否都完成計(jì)算方篮,同時(shí)再判斷第二個(gè)隊(duì)列是否為空名秀。這種方法可以完成任務(wù),但是當(dāng)處理進(jìn)程很多時(shí)藕溅,我們就需要維護(hù)同等數(shù)量的狀態(tài)對(duì)象匕得,看起來(lái)比較復(fù)雜,同時(shí),處理進(jìn)程也就不能設(shè)置為守護(hù)進(jìn)程汁掠,否則略吨,它將無(wú)法在寫數(shù)據(jù)進(jìn)程之前退出。
  • 2考阱,維護(hù)兩個(gè)計(jì)數(shù)變量翠忠。第一個(gè)計(jì)數(shù)變量用于計(jì)算讀取了多少分片。第二個(gè)計(jì)數(shù)變量用于計(jì)算導(dǎo)出了多少分片乞榨,當(dāng)讀數(shù)據(jù)進(jìn)程關(guān)閉秽之,且這兩個(gè)變量值相等時(shí),就能保證所有數(shù)據(jù)都已經(jīng)處理完畢吃既。這里有一個(gè)小問(wèn)題考榨,就是數(shù)據(jù)處理過(guò)程可能會(huì)失敗。這可以用 try except語(yǔ)句執(zhí)行鹦倚,當(dāng)處理失敗時(shí)河质,架構(gòu)處理完成計(jì)數(shù)也進(jìn)行自增,最終就可以保證數(shù)據(jù)分片計(jì)數(shù)的一致性申鱼。這個(gè)功能也可以通過(guò)服務(wù)器進(jìn)程實(shí)現(xiàn)愤诱。因此云头,在這里我們采取第二種方法捐友。

multiprocess模塊文檔總結(jié)

寫到這,我們肯定會(huì)產(chǎn)生一些對(duì)于multiprocessing模塊本身機(jī)制的問(wèn)題溃槐,比如隊(duì)列為空時(shí)怎么辦匣砖,隊(duì)列有沒(méi)有最大數(shù)量,如果達(dá)到了最大數(shù)量又該怎么辦昏滴?因此我們需要仔細(xì)閱讀文檔猴鲫,并將會(huì)用到幾個(gè)類以及使用到的方法進(jìn)行適當(dāng)總結(jié)。我們主要提到了以下幾個(gè)類谣殊。

Process
Manager
Queue
Lock
multiprocessing.process類
  • 參數(shù):
    • group:始終為None
    • target:run() 方法調(diào)用的函數(shù)對(duì)象
    • name:進(jìn)程名稱
    • args:目標(biāo)調(diào)用的參數(shù)元組
    • kwargs:目標(biāo)調(diào)用的參數(shù)字典
    • daemon:用于設(shè)置守護(hù)進(jìn)程
    • 子類重寫的構(gòu)造函數(shù)必須首先調(diào)用基類的構(gòu)造函數(shù):Process.init()
  • 方法:
    • 與 threading.Thread 保持一致的 API:
    • run():進(jìn)程活動(dòng)的方法拂共,可以在子類中重寫
    • start():?jiǎn)?dòng)進(jìn)程
    • Join():阻塞進(jìn)程,直到調(diào)用join()方法的進(jìn)程終止姻几,timeout(至多阻塞的秒數(shù))
    • name:進(jìn)程名稱
    • Is_alive():查看進(jìn)程是否還處于活動(dòng)狀態(tài)
    • daemon:判斷進(jìn)程是否為守護(hù)進(jìn)程pid:進(jìn)程ID宜狐,啟動(dòng)進(jìn)程前為None
    • 其它:exitcode, authkey, sentinel, terminate()
  • 異常:
    • ProcessError;
    • BufferTooShort
    • AuthenticationError
    • TimeoutError
multiprocessing.queue類
  • 參數(shù):
    • maxsize:隊(duì)列中的最大元素?cái)?shù)量
  • 方法:
    • 仿照queue.Queue實(shí)現(xiàn)的方法
    • qsize():返回隊(duì)列大致長(zhǎng)度(不可靠)
    • empty():判斷隊(duì)列是否為空(不可靠)
    • full():判斷隊(duì)列是否滿(不可靠)
    • put():將對(duì)象放入隊(duì)列
    • put_nowait():如果是滿的,則不會(huì)阻塞蛇捌,直接拋出queue.Full異常
    • get():從隊(duì)列中取出對(duì)象
    • get_nowait():如果隊(duì)列是空的抚恒,則不會(huì)阻塞,直接拋出 queue.Empty異常
    • 此類新加的方法
    • close()
    • join_thread()
    • cancel_join_thread()
multiprocessing.Lock類
  • 說(shuō)明:原始鎖對(duì)象络拌,任何線程或進(jìn)程都可以獲得或釋放鎖俭驮,行為與 threading.Lock 一致
  • 方法:
    • acquire(block=True, timeout=None):獲取鎖,如果獲取不到春贸,默認(rèn)進(jìn)行阻塞混萝。
    • release():釋放鎖遗遵。
multiprocessing.Manager類(數(shù)據(jù)管理器)
  • 說(shuō)明:管理器是一個(gè)用于管理共享對(duì)象的服務(wù),可以創(chuàng)建共享對(duì)象譬圣,并返回對(duì)應(yīng)的代理瓮恭,而且可以通過(guò)網(wǎng)絡(luò)跨機(jī)器共享數(shù)據(jù)。在源代碼中厘熟,Manager類就是SyncManager類屯蹦,但通過(guò) multiprocessing.Manager()創(chuàng)建,它是 multiprocessing.BaseManger 的子類绳姨。
  • BaseManager中的內(nèi)容:(大部分內(nèi)容與分布式多進(jìn)程有關(guān)登澜,暫時(shí)不使用)
  • 參數(shù):
    • Address:管理器對(duì)象監(jiān)聽的地址與端口
    • authkey:口令,b’'格式
  • 方法:
    • start():開啟服務(wù)飘庄,用于遠(yuǎn)程管理器服務(wù)
    • connect():將本地管理器連接到一個(gè)遠(yuǎn)程管理器進(jìn)程
    • Shutdown():關(guān)閉服務(wù)脑蠕,用于遠(yuǎn)程管理器服務(wù)
    • register();
    • get_sever();?
  • addressSyncManager中的內(nèi)容
  • 方法:(大部分都是創(chuàng)建對(duì)應(yīng)對(duì)象并返回其代理)
    • Barrier()
    • BoundedSemaphore()
    • Condition()
    • Event()
    • Lock():共享鎖。
    • Namespace():可以注冊(cè)的類型跪削,適用于當(dāng)有許多對(duì)象需要共享的情形谴仙。
    • Queue():共享隊(duì)列
    • RLock():共享RLock對(duì)象。
    • Semaphore()
    • Array
    • Value()
    • dict()
    • list()

總結(jié)之后碾盐,可能會(huì)產(chǎn)生更多的疑問(wèn):

5晃跺,共享的Queue和非共享的Queue有什么差別?

在單機(jī)上毫玖,沒(méi)有明顯區(qū)別掀虎,但是在分布式環(huán)境下,就必須要使用Manager創(chuàng)建Queue隊(duì)列付枫,用于不同機(jī)器上的網(wǎng)絡(luò)通信烹玉。

6,在分布式上如何使用管理器阐滩?

本文不涉及二打,暫不討論任務(wù)代碼。

具備以上基礎(chǔ)之后掂榔,將多進(jìn)程處理數(shù)據(jù)的代碼邏輯貼出继效,其中處理數(shù)據(jù)部分,用隨機(jī)sleep代替

文件1:

from multiprocessing import Process, Manager, Queue, Lock
import os
import sys
import time
import random

# read
def read_process(pre_queue, write_over, read_chunks, lock):
    input_data = [ i for i in range(5) ]
    for data in input_data:
        pre_queue.put(data)
        lock.acquire()
        read_chunks.value += 1
        lock.release()
        time.sleep(random.random())
        print('read data: ', str(data), ' chunk = ', str(read_chunks.value), " pre_queue is Full ", str(pre_queue.full()))
    
    write_over.value = 1
    print('read process over')
    return 

# write
def write_process(post_queue, write_over, read_chunks, write_chunks, lock):
    while not (write_over.value == 1 and read_chunks.value == write_chunks.value):
        print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), " write_over:",str(write_over.value))
        data = post_queue.get()
        print('write data: ', str(data), ' chunk = ', str(write_chunks.value), " post_queue is Full", str(post_queue.full()))
        lock.acquire()
        write_chunks.value += 1
        lock.release()

    print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'status is: ', str(write_over.value), ' write process over')
    return

# transoform
def transoform_process(pre_queue, post_queue, write_chunks, lock):
    while True:
        try:
            raw_data = pre_queue.get()
            data = "the num. " + str(raw_data) + " chunk"
            time.sleep(random.random()*2)
            post_queue.put(data)
            print("process data: ", str(raw_data), " to ", data , " post_queue is Full", str(post_queue.full()))
        except:
            lock.acquire()
            write_chunks.value += 1
            lock.release()
            print("transform failed, write_chunks add 1")



if __name__ == '__main__':
        
    pre_queue = Queue(maxsize=20) # 讀取數(shù)據(jù)的管道
    post_queue = Queue(maxsize=20) # 輸出數(shù)據(jù)的管道

    rlock = Lock() # 讀計(jì)數(shù)鎖
    wlock = Lock() # 寫計(jì)數(shù)鎖
    manager = Manager() # 管理器衅疙,管理進(jìn)程何時(shí)結(jié)束
    write_over = manager.Value(int, 0) 
    read_chunks = manager.Value(int, 0)
    write_chunks = manager.Value(int, 0)

    workers = [ Process(target=transoform_process, args=(pre_queue, post_queue, write_chunks, wlock), daemon=True) for i in range(30) ]
    for worker in workers:
        worker.start()

    process1 = Process(target=read_process, args=(pre_queue, write_over, read_chunks, rlock))
    process2 = Process(target=write_process, args=(post_queue, write_over, read_chunks, write_chunks, wlock))
    
    process1.start()
    process2.start()
    process1.join()
    process2.join()

    

===== 更新

使用進(jìn)程池代替手動(dòng)創(chuàng)建進(jìn)程

上面程序中的處理進(jìn)程是手動(dòng)創(chuàng)建的莲趣,那有沒(méi)有一種方式能將這種同質(zhì)的進(jìn)程進(jìn)行封裝,統(tǒng)一管理呢饱溢?multiprocessing 包為我們提供了 Pool 類喧伞,可以一次性聲明多個(gè)進(jìn)程。

multiprocessing.Pool類
  • 說(shuō)明:進(jìn)程池對(duì)象,封裝了 processes 數(shù)量的進(jìn)程潘鲫,可以向其提交作業(yè)翁逞。這個(gè)類中的一些概念,諸如回調(diào)函數(shù)溉仑,上下文挖函,以及initializer目前還用不上,所以暫時(shí)不深究浊竟。
  • 參數(shù):
    • processes:進(jìn)程數(shù)目
    • initializer
    • initargs
    • maxtasksperchild:每個(gè)進(jìn)程完成的最大任務(wù)數(shù)怨喘,之后會(huì)將此進(jìn)程銷毀,并重新生成一個(gè)進(jìn)程振定。
    • context:指定上下文
  • 方法:
    • apply():向進(jìn)程池的一個(gè)進(jìn)程提交任務(wù)必怜,返回結(jié)果前會(huì)阻塞。
    • apply_async():返回一個(gè)結(jié)果對(duì)象后频,更適合并行化的工作梳庆。
    • map(); map_async(); imap(); imap_unordered(); starmap(); starmap_async(); clost(); terminate(); join()

代碼改進(jìn)

在簡(jiǎn)要學(xué)習(xí)了 Pool 類之后,我們可以對(duì)代碼進(jìn)行一些修改卑惜,在修改過(guò)程遇到的問(wèn)題也記錄在下面膏执。

7,在使用進(jìn)程池的情況下露久,是否可以用守護(hù)進(jìn)程更米?

這樣做是為了盡量不修改代碼邏輯,第一版代碼中抱环,數(shù)據(jù)處理進(jìn)程是這樣聲明的:

    workers = [ Process(target=transoform_process, args=(pre_queue, post_queue, write_chunks, wlock), daemon=True) for i in range(30) ]
    for worker in workers:
        worker.start()

如果改用進(jìn)程池壳快,一個(gè)很自然的想法就是在這里只定義一個(gè)進(jìn)程纸巷,修改調(diào)用的函數(shù)镇草,然后在這個(gè)進(jìn)程內(nèi)部使用進(jìn)程池。于是將代碼改成如下樣子:

    worker = Process(target=transform_process_pool, args=(pre_queue, post_queue, read_chunks, write_chunks, wlock, write_over), daemon=True)
    worker.start()

但是這樣運(yùn)行后會(huì)出現(xiàn)報(bào)錯(cuò):

  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 326, in _repopulate_pool_static
    w.start()
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 118, in start
    assert not _current_process._config.get('daemon'), \
AssertionError: daemonic processes are not allowed to have children

可以看到瘤旨,在創(chuàng)建進(jìn)程池的過(guò)程中梯啤,會(huì)對(duì)父進(jìn)程進(jìn)行斷言,判斷它不是守護(hù)進(jìn)程存哲。換言之因宇,守護(hù)進(jìn)程中不能創(chuàng)建子進(jìn)程。
因此這里需要改變進(jìn)程退出的機(jī)制祟偷。
我在這里的解決方案是察滑,在進(jìn)程池中,使用與輸出進(jìn)程相同的退出條件修肠,當(dāng)所有數(shù)據(jù)都輸出完畢后贺辰,進(jìn)程池自然也可以終止。
當(dāng)然,也可以將處理和輸出動(dòng)作放到一個(gè)任務(wù)中完成饲化,并加鎖處理莽鸭,這里就暫時(shí)不修改代碼整體結(jié)構(gòu)了。
因此吃靠,數(shù)據(jù)處理線程的外層任務(wù)寫成如下這樣:

# transfrom pool
def transform_process_pool(pre_queue, post_queue, read_chunks, write_chunks, lock, write_over):
    pool = Pool(processes=10)
    print("start transform pool")
    while not (write_over.value == 1 and read_chunks.value == write_chunks.value): # 新設(shè)置退出條件
        print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'write_over is: ', str(write_over.value))
        result = pool.apply_async(transoform_process, (pre_queue, post_queue, write_chunks, lock))

外層控制是否需要繼續(xù)提交任務(wù)硫眨,內(nèi)層用于處理數(shù)據(jù),這樣進(jìn)程就可以正常退出了巢块。

8礁阁,子進(jìn)程中的進(jìn)程池?zé)o法直接使用 Queue 和 Lock,需要通過(guò)管理器 Manager 使用代理族奢。

回顧上一版代碼中鎖和隊(duì)列的使用方式:

if __name__ == '__main__':
        
    pre_queue = Queue(maxsize=20) # 讀取數(shù)據(jù)的管道
    post_queue = Queue(maxsize=20) # 輸出數(shù)據(jù)的管道

    rlock = Lock() # 讀計(jì)數(shù)鎖
    wlock = Lock() # 寫計(jì)數(shù)鎖

但是际歼,如果直接將這四個(gè)對(duì)象應(yīng)用到子進(jìn)程中的進(jìn)程池中蒲列,會(huì)報(bào)錯(cuò)(這個(gè)錯(cuò)誤之前開發(fā)時(shí)出現(xiàn)了,但等代碼寫好后,嘗試復(fù)現(xiàn)此問(wèn)題慨畸,雖然進(jìn)程運(yùn)行依然會(huì)出錯(cuò),但報(bào)錯(cuò)信息本身沒(méi)有復(fù)現(xiàn)出來(lái)):

RuntimeError: Queue objects should only be shared between processes through inheritance
RuntimeError: Lock objects should only be shared between processes through inheritance

即 Queue 和 Lock 只能在同一個(gè)父進(jìn)程創(chuàng)建的進(jìn)程中共享數(shù)據(jù)骂倘。
因此需要對(duì)聲明過(guò)程適當(dāng)修改睡陪,使用管理器:

    pre_queue = manager.Queue(maxsize=20) # 讀取數(shù)據(jù)的管道
    post_queue = manager.Queue(maxsize=20) # 輸出數(shù)據(jù)的管道 
    rlock = manager.Lock() # 讀計(jì)數(shù)鎖
    wlock = manager.Lock()

解決了這兩個(gè)主要問(wèn)題后,將源代碼貼在下面:

文件:read_write2.py

from multiprocessing import Process, Manager, Queue, Lock, Pool
import os
import sys
import time
import random

# read
def read_process(pre_queue, write_over, read_chunks, lock):
    input_data = [ i for i in range(5) ]
    for data in input_data:
        pre_queue.put(data)
        lock.acquire()
        read_chunks.value += 1
        lock.release()
        time.sleep(random.random())
        print('read data: ', str(data), ' chunk = ', str(read_chunks.value), " pre_queue is Full ", str(pre_queue.full()))
    
    write_over.value = 1
    print('read process over')
    return 

# write
def write_process(post_queue, write_over, read_chunks, write_chunks, lock):
    while not (write_over.value == 1 and read_chunks.value == write_chunks.value):
        print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), " write_over:",str(write_over.value))
        data = post_queue.get()
        print('write data: ', str(data), ' chunk = ', str(write_chunks.value), " post_queue is Full", str(post_queue.full()))
        lock.acquire()
        write_chunks.value += 1
        lock.release()

    print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'status is: ', str(write_over.value), ' write process over')
    return

# transoform
def transoform_process(pre_queue, post_queue, write_chunks, lock):
    print("enter transform process")
    raw_data = pre_queue.get()
    try:
        data = "the num. " + str(raw_data) + " chunk"
        time.sleep(random.random()*2)
        post_queue.put(data)
        print("process data: ", str(raw_data), " to ", data , " post_queue is Full", str(post_queue.full()))
    except:
        lock.acquire()
        write_chunks.value += 1
        lock.release()
        print("transform failed, write_chunks add 1")

# transfrom pool
def transform_process_pool(pre_queue, post_queue, read_chunks, write_chunks, lock, write_over):
    pool = Pool(processes=5)
    print("start transform pool")
    while not (write_over.value == 1 and read_chunks.value == write_chunks.value): # 設(shè)置退出條件
        time.sleep(0.1)
        print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'write_over is: ', str(write_over.value))
        result = pool.apply_async(transoform_process, (pre_queue, post_queue, write_chunks, lock))



if __name__ == '__main__':
        
    manager = Manager() # 管理器庭敦,管理進(jìn)程何時(shí)結(jié)束
    write_over = manager.Value(int, 0) 
    read_chunks = manager.Value(int, 0)
    write_chunks = manager.Value(int, 0)
   
    pre_queue = manager.Queue(maxsize=20) # 讀取數(shù)據(jù)的管道
    post_queue = manager.Queue(maxsize=20) # 輸出數(shù)據(jù)的管道 # RuntimeError: Queue objects should only be shared between processes through inheritance
    rlock = manager.Lock() # 讀計(jì)數(shù)鎖
    wlock = manager.Lock() # 寫計(jì)數(shù)鎖 # RuntimeError: Lock objects should only be shared between processes through inheritance

    worker = Process(target=transform_process_pool, args=(pre_queue, post_queue, read_chunks, write_chunks, wlock, write_over))
    worker.start()

    process1 = Process(target=read_process, args=(pre_queue, write_over, read_chunks, rlock))
    process2 = Process(target=write_process, args=(post_queue, write_over, read_chunks, write_chunks, wlock))
    
    process1.start()
    process2.start()
    process1.join()
    process2.join()
    worker.join()
10 后續(xù)問(wèn)題

在使用進(jìn)程池時(shí)疼进,關(guān)于進(jìn)程的創(chuàng)建機(jī)制,還有些疑問(wèn)秧廉,有待后續(xù)學(xué)習(xí)伞广。

參考內(nèi)容:

Python 3.6.13 文檔 - 并發(fā)執(zhí)行:https://docs.python.org/zh-cn/3.6/library/concurrency.html
博客 - python中的GIL詳解:https://www.cnblogs.com/SuKiWX/p/8804974.html
博客 - PYTHON 進(jìn)程間通信問(wèn)題-MANAGER方法:https://www.cnblogs.com/nmucomputer/p/12901380.html
博客 - Python分布式進(jìn)程使用(Queue和BaseManager使用):https://blog.csdn.net/u011318077/article/details/88094583
Python Process Pool Non-Daemonic?: https://izziswift.com/python-process-pool-non-daemonic/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市疼电,隨后出現(xiàn)的幾起案子嚼锄,更是在濱河造成了極大的恐慌,老刑警劉巖蔽豺,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件区丑,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡修陡,警方通過(guò)查閱死者的電腦和手機(jī)沧侥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)魄鸦,“玉大人宴杀,你說(shuō)我怎么就攤上這事∈耙颍” “怎么了旺罢?”我有些...
    開封第一講書人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵斯棒,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我主经,道長(zhǎng)荣暮,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任罩驻,我火速辦了婚禮穗酥,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘惠遏。我一直安慰自己砾跃,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開白布节吮。 她就那樣靜靜地躺著抽高,像睡著了一般。 火紅的嫁衣襯著肌膚如雪透绩。 梳的紋絲不亂的頭發(fā)上翘骂,一...
    開封第一講書人閱讀 49,816評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音帚豪,去河邊找鬼碳竟。 笑死,一個(gè)胖子當(dāng)著我的面吹牛狸臣,可吹牛的內(nèi)容都是我干的莹桅。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼烛亦,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼诈泼!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起煤禽,我...
    開封第一講書人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤铐达,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后呜师,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體娶桦,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡贾节,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年汁汗,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片栗涂。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡知牌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出斤程,到底是詐尸還是另有隱情角寸,我是刑警寧澤菩混,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站扁藕,受9級(jí)特大地震影響沮峡,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜亿柑,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一邢疙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧望薄,春花似錦疟游、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至卧须,卻和暖如春另绩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背花嘶。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工板熊, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人察绷。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓干签,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親拆撼。 傳聞我的和親對(duì)象是個(gè)殘疾皇子容劳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容