Dec. 18th, 2018
案例故事:
我是一個(gè)快樂的菜鳥程序員,每天寫著簡(jiǎn)單的代碼扇单。聽說過多進(jìn)程/多線程商模,但是一直覺得是很高深的技術(shù),遲遲不敢涉足蜘澜。慢慢地施流,我開始不滿足于單進(jìn)程代碼的速度。很多時(shí)候鄙信,我的任務(wù)是檢索一個(gè)數(shù)據(jù)庫(kù)瞪醋,對(duì)里面每一條數(shù)據(jù)進(jìn)行簡(jiǎn)單處理并輸出一些結(jié)果。對(duì)每一條數(shù)據(jù)的處理都是相互獨(dú)立的装诡,和其他數(shù)據(jù)完全沒有關(guān)系银受。這時(shí)我開始思考或許把不同數(shù)據(jù)交給不同線程操作践盼,會(huì)大大加快程序運(yùn)行速度。
事實(shí)證明我是對(duì)的宾巍,而且利用Python的multiprocessing
模塊咕幻,并發(fā)地執(zhí)行多個(gè)獨(dú)立任務(wù)是個(gè)非常簡(jiǎn)單的事情。你要做的顶霞,只是將需要執(zhí)行的任務(wù)做成函數(shù)肄程,用一個(gè)multiprocessing.Pool
就可以了。(當(dāng)然multiprocessing.Process
也可以實(shí)現(xiàn)并發(fā)选浑。具體請(qǐng)參照這個(gè)模塊的文檔蓝厌。如果有需求,我也可以寫一點(diǎn)multiprocessing
包的入門教程古徒。嗯褂始,如果有需求的話。描函。。)我非常開心狐粱,感覺自己已經(jīng)是一頭具備了多線程編程能力的程序猿了舀寓!并發(fā)編程簡(jiǎn)直就是小菜一碟!直到我的每一個(gè)進(jìn)程都需要往同一個(gè)文件里寫入數(shù)據(jù)肌蜻。。。
一個(gè)簡(jiǎn)單的并發(fā)程序炊汹,各個(gè)進(jìn)程/線程之間相互獨(dú)立褐着,無需信息交流,也無需同步豆挽。這樣的程序往往很簡(jiǎn)單育谬,因?yàn)槊恳粋€(gè)進(jìn)程只需要執(zhí)行自己的任務(wù),并不需要意識(shí)到其它進(jìn)程的存在帮哈。但好景不長(zhǎng)膛檀,很快,我就開始面臨進(jìn)程通信的問題娘侍。比如咖刃,很多時(shí)候我們需要不同的進(jìn)程對(duì)同一個(gè)文件進(jìn)行寫入,或者對(duì)同一個(gè)文件進(jìn)行更新操作憾筏。這時(shí)候嚎杨,如果每個(gè)進(jìn)程太無視其他進(jìn)程的操作,就有可能發(fā)生意想不到的事情氧腰。
本文以多個(gè)進(jìn)程讀寫同一個(gè)文件為例枫浙,探索兩種常見的并發(fā)編程的概念刨肃,一個(gè)是進(jìn)程鎖,一個(gè)是進(jìn)程間通信自脯。
進(jìn)程鎖
進(jìn)程鎖的邏輯概念
進(jìn)程鎖的概念很簡(jiǎn)單之景。有一些操作(比如寫入文檔),需要保證每次只能有一個(gè)進(jìn)程在運(yùn)行膏潮,直到其運(yùn)行結(jié)束锻狗。這樣以來,每個(gè)進(jìn)程要執(zhí)行這個(gè)操作的時(shí)候焕参,先要查看是否有其他進(jìn)程在執(zhí)行改操作轻纪。如果有,就等待那個(gè)進(jìn)程完成它的操作叠纷。如何實(shí)現(xiàn)這個(gè)想法呢刻帚?不難想到,一個(gè)進(jìn)程間的全局變量就可以做到涩嚣。為了簡(jiǎn)化問題崇众,我們考慮一個(gè)整數(shù)類型的全局變量(我們管它叫“current_writing_proc”),每個(gè)進(jìn)程都可以訪問航厚。如何用一個(gè)整數(shù)來給多進(jìn)程寫入同一個(gè)文件的任務(wù)做一個(gè)進(jìn)程鎖呢顷歌?我們可以做以下規(guī)定:
- 初始狀態(tài)下,沒有任何進(jìn)程訪問文件幔睬。此時(shí)
current_writing_proc = 0
. - 當(dāng)某個(gè)進(jìn)程試圖訪問文件時(shí)眯漩,它先要檢查
current_writing_prod
變量的值。如果該值為0麻顶, 則表示目前沒有任何進(jìn)程訪問文件赦抖。如果其值不為0,則表示有程序正在占有文件辅肾,其必須等待該值重新變?yōu)?才可以訪問文件队萤。 - 假設(shè)一個(gè)進(jìn)程等到了
current_writing_proc == 0
,即此時(shí)文件沒有被任何進(jìn)程訪問的時(shí)候矫钓。那此進(jìn)程可以開始它的操作:首先要將此變量值修改為自己的"process id" (在Python中浮禾,可以使用os.getpid
函數(shù)獲取當(dāng)前進(jìn)程的進(jìn)程id),然后對(duì)文件進(jìn)行操作份汗。 - 當(dāng)該進(jìn)程完成操作后盈电,要將
current_writing_proc
重新設(shè)為0,以使得其他進(jìn)程可以獲得權(quán)限來操作文件杯活。
總的來說匆帚,該變量存儲(chǔ)著當(dāng)前擁有文件操作權(quán)限的進(jìn)程ID號(hào)。此模型雖然簡(jiǎn)單旁钧,但是已經(jīng)具備了進(jìn)程鎖的幾大要素:等待進(jìn)程鎖(第2步)吸重,獲取進(jìn)程鎖(第3步)互拾,釋放進(jìn)程鎖(第4步)。
為何一般的全局變量不能用來作進(jìn)程鎖嚎幸?
有同學(xué)可能會(huì)問颜矿,既然進(jìn)程鎖這么簡(jiǎn)單,為何multiprocessing
包要專門實(shí)現(xiàn)Lock
這個(gè)類嫉晶?我自己定義一個(gè)整形全局變量不就好了骑疆?
這里涉及到多進(jìn)程的一個(gè)核心問題:當(dāng)一個(gè)進(jìn)程產(chǎn)生(fork)出多個(gè)進(jìn)程時(shí)替废,可以認(rèn)為箍铭,其所有變量都會(huì)被拷貝,每個(gè)進(jìn)程擁有一個(gè)這個(gè)變量的副本椎镣。假設(shè)我們?cè)诟高M(jìn)程中定義一個(gè)全局變量current_writing_proc
诈火。緊接著,我們創(chuàng)建10個(gè)子進(jìn)程状答,那么基本上冷守,變量current_writing_proc
會(huì)被復(fù)制出10個(gè)副本,每個(gè)子進(jìn)程中擁有一個(gè)惊科。既然是副本教沾,那么進(jìn)程A中修改該變量的值,將不會(huì)影響到進(jìn)程B中的值译断。換句話說,進(jìn)程B將無法通過這個(gè)變量得知進(jìn)程A的任何信息或悲。
這跟我們定義此變量的初衷是違背的孙咪。為了實(shí)現(xiàn)所有進(jìn)程間共享一個(gè)變量的值,我們就要用到進(jìn)程通信巡语。當(dāng)這個(gè)變量值被修改的時(shí)候翎蹈,進(jìn)程需要通過進(jìn)程間的通信渠道,告知其他進(jìn)程這個(gè)變量值被修改了男公。multiprocessing
包里的Lock
變量就是通過一個(gè)進(jìn)程通信機(jī)制保證了每一個(gè)鎖都是真正意義上的進(jìn)程級(jí)別的全局變量荤堪,從而實(shí)現(xiàn)上面提到的邏輯。(當(dāng)然枢赔,Lock
里還有一些進(jìn)程鎖操作合法性的代碼澄阳。有興趣的同學(xué)不妨研究一下Python的multiprocessing的官方文檔。)
注意:這里的討論只適用于多進(jìn)程踏拜,不適用于多線程碎赢。同一個(gè)進(jìn)程的不同線程之間,可以方便的共享內(nèi)存速梗。這并不需要通過其他通信手段來完成肮塞。這也是為什么多線程編程往往比多進(jìn)程編程更簡(jiǎn)單襟齿。若不是Python中有一個(gè)全局線程鎖(Global Interpretator Lock), 多線程編程在Python世界里應(yīng)該遠(yuǎn)比現(xiàn)在更受歡迎。
簡(jiǎn)單的進(jìn)程鎖文件寫入代碼
有了進(jìn)程鎖枕赵,我們展示一個(gè)簡(jiǎn)單的文檔寫入的代碼猜欺。
from multiprocessing import Pool, Lock
def write_with_lock(lock, filename, s): # 在獲取lock后,將字符串s的內(nèi)容寫入到文件filename中拷窜。
lock.acquire() # 等待獲取進(jìn)程鎖开皿。
# 執(zhí)行文件操作。比如:
open(filename, 'a').write(s) # 為了保證程序流暢運(yùn)行装黑,應(yīng)使獲取到釋放進(jìn)程鎖中間的代碼盡量簡(jiǎn)潔副瀑。
lock.release() #完成操作后,釋放進(jìn)程鎖
# 執(zhí)行其他不需要進(jìn)程鎖的代碼
def main():
filename = "hello.txt"
s = "<你需要寫入的字符串>"
lock = Lock()
pool = Pool(processes=20) # 創(chuàng)建20個(gè)進(jìn)程
pool.starmap(write_with_lock, [[lock, filename, s]]*20) # 每個(gè)進(jìn)程都執(zhí)行write_with_lock函數(shù)恋谭。
pool.close()
pool.join()
文件操作的代理進(jìn)程
有些時(shí)候糠睡,我們會(huì)創(chuàng)建很多進(jìn)程來執(zhí)行一些計(jì)算任務(wù)。每個(gè)進(jìn)程都會(huì)偶爾對(duì)一個(gè)文件進(jìn)行一些寫入操作疚颊,跟花在計(jì)算上的時(shí)間相比狈孔,這些文件寫入發(fā)生的頻率并不大(比如每個(gè)進(jìn)程執(zhí)行自己的任務(wù),只有發(fā)生某些錯(cuò)誤時(shí)才會(huì)將該錯(cuò)誤記錄到某個(gè)日志文件中)材义。在這種情況下均抽,可以考慮用一個(gè)進(jìn)程來完成所有的文件讀寫操作。其他的任務(wù)進(jìn)程其掂,只需要將其需要寫入的內(nèi)容發(fā)送給這個(gè)負(fù)責(zé)文件讀寫的進(jìn)程油挥,由它代為操作未見就可以了。這樣的進(jìn)程款熬,我們稱它為文件操作代理進(jìn)程深寥,簡(jiǎn)稱“代理進(jìn)程”。其他的進(jìn)程贤牛,我們稱它們?yōu)椤叭蝿?wù)進(jìn)程”惋鹅。(注意:該命名只是方便在本文中討論問題時(shí)容易區(qū)分,并不一定是標(biāo)準(zhǔn)的命名殉簸,請(qǐng)勿對(duì)名字當(dāng)真闰集。)
由于代理進(jìn)程和任務(wù)進(jìn)程之間要進(jìn)行通信,這里我們先講一下進(jìn)程間通信隊(duì)列般卑。
進(jìn)程間通信武鲁,消息隊(duì)列
如何從進(jìn)程A向進(jìn)程B發(fā)送消息?想象在兩個(gè)進(jìn)程之間架設(shè)一根單向管道蝠检。進(jìn)程A可以將消息通過管道發(fā)給進(jìn)程B洞坑。在這個(gè)過程中,A最先發(fā)出去的消息肯定最先到達(dá)B蝇率,后發(fā)出去的消息后到達(dá)B迟杂。程序員們管這種“先到先得”的數(shù)據(jù)結(jié)構(gòu)叫做隊(duì)列刽沾。
Python的multiprocessing.Queue
和multiprocessing.SimpleQueue
都是實(shí)現(xiàn)進(jìn)程間消息隊(duì)列的類。通過調(diào)用這兩個(gè)類的put
和get
函數(shù)排拷,就可以向隊(duì)列中發(fā)送數(shù)據(jù)侧漓,或者從隊(duì)列中獲取數(shù)據(jù)。具體例子見下一小節(jié)的演示监氢。
利用代理進(jìn)程的思想建立多進(jìn)程文檔讀寫模型
接下來布蔗,我們就利用代理進(jìn)程的思想,來建立一個(gè)多進(jìn)程任務(wù)中文檔讀寫的邏輯模型浪腐。
- 建立代理進(jìn)程纵揍。該進(jìn)程函數(shù)有兩個(gè)參數(shù):文件寫入隊(duì)列
writing_queue
和目標(biāo)文件名filename
。該進(jìn)程用一個(gè)死循環(huán)不斷從隊(duì)列writing_queue
中獲取消息议街。獲取的數(shù)據(jù)存入變量s
中泽谨。如果s
是整數(shù)0,則表示收到進(jìn)程結(jié)束信號(hào)特漩。代理進(jìn)程將跳出死循環(huán)吧雹,結(jié)束運(yùn)行。如果s
是個(gè)字符串涂身,則代理進(jìn)程打開文件filename
雄卷,將s
寫入文件中。 - 建立任務(wù)進(jìn)程蛤售。任務(wù)進(jìn)程執(zhí)行某個(gè)任務(wù)丁鹉,在需要向文件寫入信息時(shí),將其需要寫入的信息封裝成字符串悴能,投入到
writing_queue
隊(duì)列中揣钦。 - 當(dāng)所有任務(wù)進(jìn)程都結(jié)束運(yùn)行,并入住進(jìn)程之后搜骡,主進(jìn)程需要告知代理進(jìn)程結(jié)束工作。按照步驟1中的規(guī)定佑女,主進(jìn)程只需往
writing_queue
隊(duì)列中投入一個(gè)整數(shù)0记靡,并且等待代理進(jìn)程結(jié)束即可。
Python的代碼實(shí)現(xiàn)
最后這個(gè)小節(jié)团驱,介紹如何用Python實(shí)現(xiàn)上一節(jié)的模型摸吠。首先,我們來編寫代理進(jìn)程的任務(wù)代碼嚎花。
def writing_proc(writing_queue, filename):
while True: # 開啟處理消息的死循環(huán)寸痢。直到接收到終止消息(數(shù)字0)方才跳出。
s = get()
if isinstance(s, str):
open(filename, 'a').write(s)
elif isinstance(s, int) and s == 0:
break
else:
continue # 忽略掉錯(cuò)誤格式的消息紊选。
接下來是任務(wù)進(jìn)程代碼啼止。
def task_proc(writing_queue):
# 執(zhí)行其自己的任務(wù)道逗。當(dāng)需要寫入文件時(shí):
s = "需要寫入的字符串"
writing_queue.put(s)
# 任務(wù)的其他代碼
可以看到,任務(wù)進(jìn)程中寫入文件的操作非常簡(jiǎn)單献烦,只需將要寫入的字符串put
到寫入隊(duì)列中就可以了滓窍。真正的文件寫入是上面的代理進(jìn)程的工作。最后巩那,給出主進(jìn)程的代碼:
# don't forget to import Process, Manager (to use the Queue object) and Pool (if you like) from multiprocessing.
def main():
m = Manager() # 從這個(gè)類中獲取Queue類吏夯。(見下面討論)
writing_queue = m.Queue()
filename = "log.txt"
p_write = Process(writing_proc, args=(writing_queue, filename))
p_write.start()
p_task_list = [] # 這里演示用Process而非Pool建立任務(wù)進(jìn)程
for i in range(20):
p = Process(task_proc, args=(writing_queue,))
p_task_list.append(p)
p.start()
# 主進(jìn)程開啟代理進(jìn)程和所有任務(wù)進(jìn)程后,執(zhí)行其自己的操作即横,然后等待任務(wù)進(jìn)程結(jié)束
for p in p_task_list:
p.join()
# 現(xiàn)在任務(wù)進(jìn)程全部結(jié)束噪生。但代理進(jìn)程還在死循環(huán)中。
# 主進(jìn)程需要告知代理進(jìn)程結(jié)束運(yùn)行东囚。
writing_queue.put(0)
p_write.join() # 等待代理進(jìn)程結(jié)束跺嗽。
# 現(xiàn)在所有子進(jìn)程均已完成工作。主進(jìn)程可以繼續(xù)執(zhí)行其他代碼舔庶,或者退出程序抛蚁。
上面代碼中提到了要使用multiprocessing.Manager
來創(chuàng)建Queue,而不是直接從multiprocess
中引入Queue
惕橙,即
from multiprocessing import Queue # 有可能引發(fā)錯(cuò)誤瞧甩。
直接使用這一行代碼,會(huì)引發(fā)錯(cuò)誤:Queue can only be used through inheritance
. 錯(cuò)誤提示表示弥鹦,Python只允許不同的進(jìn)程通過集成同一個(gè)基類(Process)的方式使用Queue肚逸。也就是說,我們需要給每一個(gè)任務(wù)都創(chuàng)建一個(gè)進(jìn)程類彬坏,而非一個(gè)簡(jiǎn)單的進(jìn)程函數(shù)朦促。其實(shí),這確實(shí)是一個(gè)很好的編碼習(xí)慣栓始。特別是編寫比較大的程序時(shí)务冕,給每一個(gè)任務(wù)定義一個(gè)進(jìn)程類會(huì)讓代碼的組織更清晰,更便于管理和重用幻赚。但是這里禀忆,我們只做簡(jiǎn)單的演示,固沒有定義任務(wù)類落恼,而是使用任務(wù)函數(shù)箩退。為了避免出現(xiàn)錯(cuò)誤,我們就引入Manager
類佳谦,然后從Manager
對(duì)象中引入Queue
類戴涝,見下面代碼。
from multiprocessing import Manager
m = Manager()
queue = m.Queue()
總結(jié)
本文通過”多進(jìn)程同時(shí)對(duì)同一個(gè)文件進(jìn)行讀寫“這個(gè)問題,樸素講解了進(jìn)程鎖啥刻、進(jìn)程通信和進(jìn)程間通信的概念奸鸯,展示了用不同方式構(gòu)建出的不同的并發(fā)執(zhí)行模型。本人的經(jīng)驗(yàn)是郑什,當(dāng)在進(jìn)行并發(fā)編程時(shí)府喳,在開始寫代碼之前,先設(shè)計(jì)一下程序的并發(fā)執(zhí)行模型是很重要的蘑拯。這個(gè)模型可以類似于上面幾個(gè)小節(jié)中的列表钝满,規(guī)定進(jìn)程間通信的格式,以及規(guī)定不同進(jìn)程對(duì)統(tǒng)一資源進(jìn)行操作的先后步驟申窘。把這個(gè)模型搞清楚之后弯蚜,再去考慮寫代碼實(shí)現(xiàn)這個(gè)模型。
感謝閱讀剃法,歡迎在留言區(qū)討論碎捺,或者談一下你在設(shè)計(jì)并發(fā)程序時(shí)遇到的問題。Bye!