背景
最近需要在單機(jī)上處理一個(gè)數(shù)據(jù)量較大的文件飒泻,由于內(nèi)存限制,只能分批次讀取數(shù)據(jù)并處理噪窘。與此同時(shí)梗搅,數(shù)據(jù)處理的耗時(shí)較長(zhǎng),長(zhǎng)于讀取時(shí)間效览,為了盡量提升處理效率无切,需要應(yīng)用某種方式對(duì)數(shù)據(jù)進(jìn)行并行處理。本文旨在讓所有像我一樣丐枉,很少接觸多線程/多進(jìn)程的同學(xué)哆键,在不需要任何知識(shí)儲(chǔ)備的情況下看懂并結(jié)合自己的場(chǎng)景快速上手。
任務(wù)使用python編寫瘦锹,需要應(yīng)用python中適用于并行處理的模塊籍嘹。通過(guò)查詢文檔可知:python中內(nèi)置的與并行處理相關(guān)的模塊闪盔,有如下這些:
threading
multiprocessing
concurrent
subproceses
sched
queue
contextvars
模塊選擇
python中存在GIL(Global Interpreter Lock),即在使用多線程(即 threading 模塊)的每一時(shí)刻只有一個(gè)線程在使用解釋器辱士,因此這實(shí)際上將使用了 threading 的程序退化成了可能比單線程還慢的程序泪掀。正是由于這個(gè)原因, 才有了?multiprocessing 模塊颂碘。它規(guī)避了GIL的特性异赫,使用了多進(jìn)程而非多線程,每個(gè)進(jìn)程使用獨(dú)立的解釋器头岔。因此本文講述的方法也主要使用 multiproceessing 模塊塔拳。
需求概述:
這次需要實(shí)現(xiàn)的功能是對(duì)數(shù)據(jù)進(jìn)行并行處理,如下圖所示峡竣。
從這個(gè)流程中靠抑,自然會(huì)引出一些問(wèn)題:下面對(duì)這這些問(wèn)題依次進(jìn)行分析。
1适掰,不同功能如何組織颂碧?
整個(gè)程序主要有三個(gè)功能對(duì)象:數(shù)據(jù)讀取,數(shù)據(jù)處理类浪,數(shù)據(jù)導(dǎo)出稚伍。這三個(gè)功能按如下的流程工作:數(shù)據(jù)讀取對(duì)象從文件中循環(huán)讀取數(shù)據(jù)分片,每次讀取后戚宦,將分片傳遞給某個(gè)數(shù)據(jù)處理對(duì)象進(jìn)行處理个曙。數(shù)據(jù)處理完成后,再將處理好的數(shù)據(jù)傳遞給數(shù)據(jù)導(dǎo)出對(duì)象進(jìn)行落盤受楼。
2垦搬,數(shù)據(jù)分片如何傳遞?
由于數(shù)據(jù)讀取的速度與處理的速度不一致艳汽,因此無(wú)法保證每次分片讀取完畢后猴贰,都正好有一個(gè)待命的處理單元,因此這里使用 multiprocessing 模塊中的隊(duì)列類(Queue)河狐。每讀取一個(gè)數(shù)據(jù)分片米绕,就將其放入隊(duì)列中,等待空閑的處理單元獲得此分片馋艺。同理栅干,當(dāng)數(shù)據(jù)處理完畢后,也不能保證馬上可以落盤捐祠,因此需要引入第二個(gè)隊(duì)列碱鳞,存放處理完畢的數(shù)據(jù)分片。
3踱蛀,需要多少進(jìn)程窿给?
由于數(shù)據(jù)的讀取和落盤是對(duì)單個(gè)文件操作贵白,且為保證數(shù)據(jù)合法性,不能并行處理崩泡,因此每個(gè)操作只要一個(gè)進(jìn)程就足夠了禁荒,而中間可以有多個(gè)處理單元同時(shí)處理數(shù)據(jù),達(dá)到并行提升效率的目的角撞。就是說(shuō)需要 n 個(gè)處理進(jìn)程(取決于實(shí)際數(shù)據(jù))呛伴,一個(gè)讀數(shù)據(jù)進(jìn)程,一個(gè)寫數(shù)據(jù)進(jìn)程靴寂,即 n+2。
4召耘,程序如何退出百炬?
我們依次來(lái)看每個(gè)進(jìn)程。讀數(shù)據(jù)進(jìn)程的邏輯很簡(jiǎn)單污它,只要文件讀取完畢剖踊,并將所有數(shù)據(jù)放入隊(duì)列,它的任務(wù)就完成了衫贬,其進(jìn)程可以自然退出德澈。數(shù)據(jù)處理進(jìn)程需要等待隊(duì)列中的數(shù)據(jù)分片,當(dāng)進(jìn)程空閑且隊(duì)列中有尚未處理的分片時(shí)固惯,它就會(huì)工作梆造。當(dāng)所有的分片都讀取完畢,且都分配了處理進(jìn)程后葬毫,所有空閑的處理進(jìn)程就都可以關(guān)閉了镇辉,但是這些進(jìn)程并不知道哪一個(gè)分片時(shí)最后一個(gè)分片,也就無(wú)法判斷是否應(yīng)該退出贴捡。
對(duì)于這個(gè)問(wèn)題忽肛,在這里有兩種解決辦法。
- 1烂斋,不關(guān)閉進(jìn)程屹逛,保持其運(yùn)行,最終與主進(jìn)程同時(shí)退出汛骂。這種方法就是傳說(shuō)中的守護(hù)進(jìn)程(本質(zhì)是當(dāng)沒(méi)有任務(wù)非守護(hù)進(jìn)程的子進(jìn)程在運(yùn)行時(shí)罕模,從主進(jìn)程中產(chǎn)生的所有守護(hù)進(jìn)程就會(huì)退出),一般通過(guò)在聲明進(jìn)程對(duì)象時(shí)設(shè)置 daemon=True 來(lái)使用帘瞭。在 threading 模塊中手销,也有與之對(duì)應(yīng)的守護(hù)線程。
- 2图张,維護(hù)一個(gè)狀態(tài)變量锋拖,在讀數(shù)據(jù)進(jìn)程運(yùn)行完畢時(shí)诈悍,修改此狀態(tài)。當(dāng)處理進(jìn)程計(jì)算完當(dāng)前分片兽埃,準(zhǔn)備讀取下一個(gè)分片時(shí)侥钳,先查看隊(duì)列是否為空,如果為空柄错,再查看此狀態(tài)舷夺,如果狀態(tài)顯示“讀數(shù)據(jù)進(jìn)程已經(jīng)結(jié)束”,則退出此處理進(jìn)程售貌。那么對(duì)于此狀態(tài)變量给猾,需要滿足兩個(gè)條件:
- a,此變量可以在不同進(jìn)程之間共享颂跨;
- b敢伸,需要保證進(jìn)程安全。
- 其中a可以通過(guò) multiprocessing 中的 Manager 服務(wù)器進(jìn)程實(shí)現(xiàn)恒削。
- b可以通過(guò)加鎖實(shí)現(xiàn)(Lock)池颈。
為了使程序盡量簡(jiǎn)單,這里我們采取守護(hù)進(jìn)程的方法钓丰。寫數(shù)據(jù)進(jìn)程會(huì)不斷從第二個(gè)隊(duì)列中讀取數(shù)據(jù)躯砰,并寫入文件⌒。可見琢歇,它退出的條件稍微復(fù)雜一些,在其空閑的情況下梦鉴,還有三個(gè)條件需要同時(shí)滿足才可以退出:
- 1矿微,讀數(shù)據(jù)進(jìn)程已關(guān)閉;
- 2尚揣,所有處理進(jìn)程都已處理完畢涌矢;
- 3,第二個(gè)隊(duì)列為空快骗。
在這里也有幾種不同的解決方案:
- 1娜庇,增加更多的狀態(tài)變量,判斷每個(gè)進(jìn)程是否都完成計(jì)算方篮,同時(shí)再判斷第二個(gè)隊(duì)列是否為空名秀。這種方法可以完成任務(wù),但是當(dāng)處理進(jìn)程很多時(shí)藕溅,我們就需要維護(hù)同等數(shù)量的狀態(tài)對(duì)象匕得,看起來(lái)比較復(fù)雜,同時(shí),處理進(jìn)程也就不能設(shè)置為守護(hù)進(jìn)程汁掠,否則略吨,它將無(wú)法在寫數(shù)據(jù)進(jìn)程之前退出。
- 2考阱,維護(hù)兩個(gè)計(jì)數(shù)變量翠忠。第一個(gè)計(jì)數(shù)變量用于計(jì)算讀取了多少分片。第二個(gè)計(jì)數(shù)變量用于計(jì)算導(dǎo)出了多少分片乞榨,當(dāng)讀數(shù)據(jù)進(jìn)程關(guān)閉秽之,且這兩個(gè)變量值相等時(shí),就能保證所有數(shù)據(jù)都已經(jīng)處理完畢吃既。這里有一個(gè)小問(wèn)題考榨,就是數(shù)據(jù)處理過(guò)程可能會(huì)失敗。這可以用 try except語(yǔ)句執(zhí)行鹦倚,當(dāng)處理失敗時(shí)河质,架構(gòu)處理完成計(jì)數(shù)也進(jìn)行自增,最終就可以保證數(shù)據(jù)分片計(jì)數(shù)的一致性申鱼。這個(gè)功能也可以通過(guò)服務(wù)器進(jìn)程實(shí)現(xiàn)愤诱。因此云头,在這里我們采取第二種方法捐友。
multiprocess模塊文檔總結(jié)
寫到這,我們肯定會(huì)產(chǎn)生一些對(duì)于multiprocessing模塊本身機(jī)制的問(wèn)題溃槐,比如隊(duì)列為空時(shí)怎么辦匣砖,隊(duì)列有沒(méi)有最大數(shù)量,如果達(dá)到了最大數(shù)量又該怎么辦昏滴?因此我們需要仔細(xì)閱讀文檔猴鲫,并將會(huì)用到幾個(gè)類以及使用到的方法進(jìn)行適當(dāng)總結(jié)。我們主要提到了以下幾個(gè)類谣殊。
Process
Manager
Queue
Lock
multiprocessing.process類
- 參數(shù):
- group:始終為None
- target:run() 方法調(diào)用的函數(shù)對(duì)象
- name:進(jìn)程名稱
- args:目標(biāo)調(diào)用的參數(shù)元組
- kwargs:目標(biāo)調(diào)用的參數(shù)字典
- daemon:用于設(shè)置守護(hù)進(jìn)程
- 子類重寫的構(gòu)造函數(shù)必須首先調(diào)用基類的構(gòu)造函數(shù):Process.init()
- 方法:
- 與 threading.Thread 保持一致的 API:
- run():進(jìn)程活動(dòng)的方法拂共,可以在子類中重寫
- start():?jiǎn)?dòng)進(jìn)程
- Join():阻塞進(jìn)程,直到調(diào)用join()方法的進(jìn)程終止姻几,timeout(至多阻塞的秒數(shù))
- name:進(jìn)程名稱
- Is_alive():查看進(jìn)程是否還處于活動(dòng)狀態(tài)
- daemon:判斷進(jìn)程是否為守護(hù)進(jìn)程pid:進(jìn)程ID宜狐,啟動(dòng)進(jìn)程前為None
- 其它:exitcode, authkey, sentinel, terminate()
- 異常:
- ProcessError;
- BufferTooShort
- AuthenticationError
- TimeoutError
multiprocessing.queue類
- 參數(shù):
- maxsize:隊(duì)列中的最大元素?cái)?shù)量
- 方法:
- 仿照queue.Queue實(shí)現(xiàn)的方法:
- qsize():返回隊(duì)列大致長(zhǎng)度(不可靠)
- empty():判斷隊(duì)列是否為空(不可靠)
- full():判斷隊(duì)列是否滿(不可靠)
- put():將對(duì)象放入隊(duì)列
- put_nowait():如果是滿的,則不會(huì)阻塞蛇捌,直接拋出queue.Full異常
- get():從隊(duì)列中取出對(duì)象
- get_nowait():如果隊(duì)列是空的抚恒,則不會(huì)阻塞,直接拋出 queue.Empty異常
- 此類新加的方法:
- close()
- join_thread()
- cancel_join_thread()
multiprocessing.Lock類
- 說(shuō)明:原始鎖對(duì)象络拌,任何線程或進(jìn)程都可以獲得或釋放鎖俭驮,行為與 threading.Lock 一致
- 方法:
- acquire(block=True, timeout=None):獲取鎖,如果獲取不到春贸,默認(rèn)進(jìn)行阻塞混萝。
- release():釋放鎖遗遵。
multiprocessing.Manager類(數(shù)據(jù)管理器)
- 說(shuō)明:管理器是一個(gè)用于管理共享對(duì)象的服務(wù),可以創(chuàng)建共享對(duì)象譬圣,并返回對(duì)應(yīng)的代理瓮恭,而且可以通過(guò)網(wǎng)絡(luò)跨機(jī)器共享數(shù)據(jù)。在源代碼中厘熟,Manager類就是SyncManager類屯蹦,但通過(guò) multiprocessing.Manager()創(chuàng)建,它是 multiprocessing.BaseManger 的子類绳姨。
- BaseManager中的內(nèi)容:(大部分內(nèi)容與分布式多進(jìn)程有關(guān)登澜,暫時(shí)不使用)
- 參數(shù):
- Address:管理器對(duì)象監(jiān)聽的地址與端口
- authkey:口令,b’'格式
- 方法:
- start():開啟服務(wù)飘庄,用于遠(yuǎn)程管理器服務(wù)
- connect():將本地管理器連接到一個(gè)遠(yuǎn)程管理器進(jìn)程
- Shutdown():關(guān)閉服務(wù)脑蠕,用于遠(yuǎn)程管理器服務(wù)
- register();
- get_sever();?
- addressSyncManager中的內(nèi)容:
- 方法:(大部分都是創(chuàng)建對(duì)應(yīng)對(duì)象并返回其代理)
- Barrier()
- BoundedSemaphore()
- Condition()
- Event()
- Lock():共享鎖。
- Namespace():可以注冊(cè)的類型跪削,適用于當(dāng)有許多對(duì)象需要共享的情形谴仙。
- Queue():共享隊(duì)列
- RLock():共享RLock對(duì)象。
- Semaphore()
- Array
- Value()
- dict()
- list()
總結(jié)之后碾盐,可能會(huì)產(chǎn)生更多的疑問(wèn):
5晃跺,共享的Queue和非共享的Queue有什么差別?
在單機(jī)上毫玖,沒(méi)有明顯區(qū)別掀虎,但是在分布式環(huán)境下,就必須要使用Manager創(chuàng)建Queue隊(duì)列付枫,用于不同機(jī)器上的網(wǎng)絡(luò)通信烹玉。
6,在分布式上如何使用管理器阐滩?
本文不涉及二打,暫不討論任務(wù)代碼。
具備以上基礎(chǔ)之后掂榔,將多進(jìn)程處理數(shù)據(jù)的代碼邏輯貼出继效,其中處理數(shù)據(jù)部分,用隨機(jī)sleep代替
文件1:
from multiprocessing import Process, Manager, Queue, Lock
import os
import sys
import time
import random
# read
def read_process(pre_queue, write_over, read_chunks, lock):
input_data = [ i for i in range(5) ]
for data in input_data:
pre_queue.put(data)
lock.acquire()
read_chunks.value += 1
lock.release()
time.sleep(random.random())
print('read data: ', str(data), ' chunk = ', str(read_chunks.value), " pre_queue is Full ", str(pre_queue.full()))
write_over.value = 1
print('read process over')
return
# write
def write_process(post_queue, write_over, read_chunks, write_chunks, lock):
while not (write_over.value == 1 and read_chunks.value == write_chunks.value):
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), " write_over:",str(write_over.value))
data = post_queue.get()
print('write data: ', str(data), ' chunk = ', str(write_chunks.value), " post_queue is Full", str(post_queue.full()))
lock.acquire()
write_chunks.value += 1
lock.release()
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'status is: ', str(write_over.value), ' write process over')
return
# transoform
def transoform_process(pre_queue, post_queue, write_chunks, lock):
while True:
try:
raw_data = pre_queue.get()
data = "the num. " + str(raw_data) + " chunk"
time.sleep(random.random()*2)
post_queue.put(data)
print("process data: ", str(raw_data), " to ", data , " post_queue is Full", str(post_queue.full()))
except:
lock.acquire()
write_chunks.value += 1
lock.release()
print("transform failed, write_chunks add 1")
if __name__ == '__main__':
pre_queue = Queue(maxsize=20) # 讀取數(shù)據(jù)的管道
post_queue = Queue(maxsize=20) # 輸出數(shù)據(jù)的管道
rlock = Lock() # 讀計(jì)數(shù)鎖
wlock = Lock() # 寫計(jì)數(shù)鎖
manager = Manager() # 管理器衅疙,管理進(jìn)程何時(shí)結(jié)束
write_over = manager.Value(int, 0)
read_chunks = manager.Value(int, 0)
write_chunks = manager.Value(int, 0)
workers = [ Process(target=transoform_process, args=(pre_queue, post_queue, write_chunks, wlock), daemon=True) for i in range(30) ]
for worker in workers:
worker.start()
process1 = Process(target=read_process, args=(pre_queue, write_over, read_chunks, rlock))
process2 = Process(target=write_process, args=(post_queue, write_over, read_chunks, write_chunks, wlock))
process1.start()
process2.start()
process1.join()
process2.join()
===== 更新
使用進(jìn)程池代替手動(dòng)創(chuàng)建進(jìn)程
上面程序中的處理進(jìn)程是手動(dòng)創(chuàng)建的莲趣,那有沒(méi)有一種方式能將這種同質(zhì)的進(jìn)程進(jìn)行封裝,統(tǒng)一管理呢饱溢?multiprocessing 包為我們提供了 Pool 類喧伞,可以一次性聲明多個(gè)進(jìn)程。
multiprocessing.Pool類
- 說(shuō)明:進(jìn)程池對(duì)象,封裝了 processes 數(shù)量的進(jìn)程潘鲫,可以向其提交作業(yè)翁逞。這個(gè)類中的一些概念,諸如回調(diào)函數(shù)溉仑,上下文挖函,以及initializer目前還用不上,所以暫時(shí)不深究浊竟。
- 參數(shù):
- processes:進(jìn)程數(shù)目
- initializer
- initargs
- maxtasksperchild:每個(gè)進(jìn)程完成的最大任務(wù)數(shù)怨喘,之后會(huì)將此進(jìn)程銷毀,并重新生成一個(gè)進(jìn)程振定。
- context:指定上下文
- 方法:
- apply():向進(jìn)程池的一個(gè)進(jìn)程提交任務(wù)必怜,返回結(jié)果前會(huì)阻塞。
- apply_async():返回一個(gè)結(jié)果對(duì)象后频,更適合并行化的工作梳庆。
- map(); map_async(); imap(); imap_unordered(); starmap(); starmap_async(); clost(); terminate(); join()
代碼改進(jìn)
在簡(jiǎn)要學(xué)習(xí)了 Pool 類之后,我們可以對(duì)代碼進(jìn)行一些修改卑惜,在修改過(guò)程遇到的問(wèn)題也記錄在下面膏执。
7,在使用進(jìn)程池的情況下露久,是否可以用守護(hù)進(jìn)程更米?
這樣做是為了盡量不修改代碼邏輯,第一版代碼中抱环,數(shù)據(jù)處理進(jìn)程是這樣聲明的:
workers = [ Process(target=transoform_process, args=(pre_queue, post_queue, write_chunks, wlock), daemon=True) for i in range(30) ]
for worker in workers:
worker.start()
如果改用進(jìn)程池壳快,一個(gè)很自然的想法就是在這里只定義一個(gè)進(jìn)程纸巷,修改調(diào)用的函數(shù)镇草,然后在這個(gè)進(jìn)程內(nèi)部使用進(jìn)程池。于是將代碼改成如下樣子:
worker = Process(target=transform_process_pool, args=(pre_queue, post_queue, read_chunks, write_chunks, wlock, write_over), daemon=True)
worker.start()
但是這樣運(yùn)行后會(huì)出現(xiàn)報(bào)錯(cuò):
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 326, in _repopulate_pool_static
w.start()
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 118, in start
assert not _current_process._config.get('daemon'), \
AssertionError: daemonic processes are not allowed to have children
可以看到瘤旨,在創(chuàng)建進(jìn)程池的過(guò)程中梯啤,會(huì)對(duì)父進(jìn)程進(jìn)行斷言,判斷它不是守護(hù)進(jìn)程存哲。換言之因宇,守護(hù)進(jìn)程中不能創(chuàng)建子進(jìn)程。
因此這里需要改變進(jìn)程退出的機(jī)制祟偷。
我在這里的解決方案是察滑,在進(jìn)程池中,使用與輸出進(jìn)程相同的退出條件修肠,當(dāng)所有數(shù)據(jù)都輸出完畢后贺辰,進(jìn)程池自然也可以終止。
當(dāng)然,也可以將處理和輸出動(dòng)作放到一個(gè)任務(wù)中完成饲化,并加鎖處理莽鸭,這里就暫時(shí)不修改代碼整體結(jié)構(gòu)了。
因此吃靠,數(shù)據(jù)處理線程的外層任務(wù)寫成如下這樣:
# transfrom pool
def transform_process_pool(pre_queue, post_queue, read_chunks, write_chunks, lock, write_over):
pool = Pool(processes=10)
print("start transform pool")
while not (write_over.value == 1 and read_chunks.value == write_chunks.value): # 新設(shè)置退出條件
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'write_over is: ', str(write_over.value))
result = pool.apply_async(transoform_process, (pre_queue, post_queue, write_chunks, lock))
外層控制是否需要繼續(xù)提交任務(wù)硫眨,內(nèi)層用于處理數(shù)據(jù),這樣進(jìn)程就可以正常退出了巢块。
8礁阁,子進(jìn)程中的進(jìn)程池?zé)o法直接使用 Queue 和 Lock,需要通過(guò)管理器 Manager 使用代理族奢。
回顧上一版代碼中鎖和隊(duì)列的使用方式:
if __name__ == '__main__':
pre_queue = Queue(maxsize=20) # 讀取數(shù)據(jù)的管道
post_queue = Queue(maxsize=20) # 輸出數(shù)據(jù)的管道
rlock = Lock() # 讀計(jì)數(shù)鎖
wlock = Lock() # 寫計(jì)數(shù)鎖
但是际歼,如果直接將這四個(gè)對(duì)象應(yīng)用到子進(jìn)程中的進(jìn)程池中蒲列,會(huì)報(bào)錯(cuò)(這個(gè)錯(cuò)誤之前開發(fā)時(shí)出現(xiàn)了,但等代碼寫好后,嘗試復(fù)現(xiàn)此問(wèn)題慨畸,雖然進(jìn)程運(yùn)行依然會(huì)出錯(cuò),但報(bào)錯(cuò)信息本身沒(méi)有復(fù)現(xiàn)出來(lái)):
RuntimeError: Queue objects should only be shared between processes through inheritance
RuntimeError: Lock objects should only be shared between processes through inheritance
即 Queue 和 Lock 只能在同一個(gè)父進(jìn)程創(chuàng)建的進(jìn)程中共享數(shù)據(jù)骂倘。
因此需要對(duì)聲明過(guò)程適當(dāng)修改睡陪,使用管理器:
pre_queue = manager.Queue(maxsize=20) # 讀取數(shù)據(jù)的管道
post_queue = manager.Queue(maxsize=20) # 輸出數(shù)據(jù)的管道
rlock = manager.Lock() # 讀計(jì)數(shù)鎖
wlock = manager.Lock()
解決了這兩個(gè)主要問(wèn)題后,將源代碼貼在下面:
文件:read_write2.py
from multiprocessing import Process, Manager, Queue, Lock, Pool
import os
import sys
import time
import random
# read
def read_process(pre_queue, write_over, read_chunks, lock):
input_data = [ i for i in range(5) ]
for data in input_data:
pre_queue.put(data)
lock.acquire()
read_chunks.value += 1
lock.release()
time.sleep(random.random())
print('read data: ', str(data), ' chunk = ', str(read_chunks.value), " pre_queue is Full ", str(pre_queue.full()))
write_over.value = 1
print('read process over')
return
# write
def write_process(post_queue, write_over, read_chunks, write_chunks, lock):
while not (write_over.value == 1 and read_chunks.value == write_chunks.value):
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), " write_over:",str(write_over.value))
data = post_queue.get()
print('write data: ', str(data), ' chunk = ', str(write_chunks.value), " post_queue is Full", str(post_queue.full()))
lock.acquire()
write_chunks.value += 1
lock.release()
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'status is: ', str(write_over.value), ' write process over')
return
# transoform
def transoform_process(pre_queue, post_queue, write_chunks, lock):
print("enter transform process")
raw_data = pre_queue.get()
try:
data = "the num. " + str(raw_data) + " chunk"
time.sleep(random.random()*2)
post_queue.put(data)
print("process data: ", str(raw_data), " to ", data , " post_queue is Full", str(post_queue.full()))
except:
lock.acquire()
write_chunks.value += 1
lock.release()
print("transform failed, write_chunks add 1")
# transfrom pool
def transform_process_pool(pre_queue, post_queue, read_chunks, write_chunks, lock, write_over):
pool = Pool(processes=5)
print("start transform pool")
while not (write_over.value == 1 and read_chunks.value == write_chunks.value): # 設(shè)置退出條件
time.sleep(0.1)
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'write_over is: ', str(write_over.value))
result = pool.apply_async(transoform_process, (pre_queue, post_queue, write_chunks, lock))
if __name__ == '__main__':
manager = Manager() # 管理器庭敦,管理進(jìn)程何時(shí)結(jié)束
write_over = manager.Value(int, 0)
read_chunks = manager.Value(int, 0)
write_chunks = manager.Value(int, 0)
pre_queue = manager.Queue(maxsize=20) # 讀取數(shù)據(jù)的管道
post_queue = manager.Queue(maxsize=20) # 輸出數(shù)據(jù)的管道 # RuntimeError: Queue objects should only be shared between processes through inheritance
rlock = manager.Lock() # 讀計(jì)數(shù)鎖
wlock = manager.Lock() # 寫計(jì)數(shù)鎖 # RuntimeError: Lock objects should only be shared between processes through inheritance
worker = Process(target=transform_process_pool, args=(pre_queue, post_queue, read_chunks, write_chunks, wlock, write_over))
worker.start()
process1 = Process(target=read_process, args=(pre_queue, write_over, read_chunks, rlock))
process2 = Process(target=write_process, args=(post_queue, write_over, read_chunks, write_chunks, wlock))
process1.start()
process2.start()
process1.join()
process2.join()
worker.join()
10 后續(xù)問(wèn)題
在使用進(jìn)程池時(shí)疼进,關(guān)于進(jìn)程的創(chuàng)建機(jī)制,還有些疑問(wèn)秧廉,有待后續(xù)學(xué)習(xí)伞广。
參考內(nèi)容:
Python 3.6.13 文檔 - 并發(fā)執(zhí)行:https://docs.python.org/zh-cn/3.6/library/concurrency.html
博客 - python中的GIL詳解:https://www.cnblogs.com/SuKiWX/p/8804974.html
博客 - PYTHON 進(jìn)程間通信問(wèn)題-MANAGER方法:https://www.cnblogs.com/nmucomputer/p/12901380.html
博客 - Python分布式進(jìn)程使用(Queue和BaseManager使用):https://blog.csdn.net/u011318077/article/details/88094583
Python Process Pool Non-Daemonic?: https://izziswift.com/python-process-pool-non-daemonic/