Python多進程運行——Multiprocessing基礎教程2

上篇文章簡單介紹了multiprocessing模塊诵盼,本文將要介紹進程之間的數(shù)據(jù)共享和信息傳遞的概念惠豺。

1 數(shù)據(jù)共享

在多進程處理中,所有新創(chuàng)建的進程都會有這兩個特點:獨立運行风宁,有自己的內(nèi)存空間洁墙。

我們來舉個例子展示一下:

import multiprocessing 

# empty list with global scope 
result = [] 

def square_list(mylist): 
    global result 
    # append squares of mylist to global list result 
    for num in mylist: 
        result.append(num * num) 
    # print global list result 
    print("Result(in process p1): {}".format(result)) 

if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 

    # creating new process 
    p1 = multiprocessing.Process(target=square_list, args=(mylist,)) 
    # starting process 
    p1.start() 
    # wait until process is finished 
    p1.join() 

    # print global result list 
    print("Result(in main program): {}".format(result)) 

這個程序的輸出結(jié)果是:

Result(in process p1): [1, 4, 9, 16]
Result(in main program): []

在上面的程序中我們嘗試在兩個地方打印全局列表result的內(nèi)容:

  • square_list()函數(shù)中,由于這個函數(shù)是由進程p1調(diào)用的戒财,所以result列表只在進程p1的內(nèi)存空間中更改热监。
  • 在主程序中的p1進程完成后。由于主程序由不同的進程運行饮寞,它的內(nèi)存空間中的result列表仍然是空的狼纬。

我們再用一張圖來幫助理解記憶不同進程間的數(shù)據(jù)關系:

圖1 進程間數(shù)據(jù)關系.png

1.1 內(nèi)存共享

如果程序需要在不同的進程之間共享一些數(shù)據(jù)的話,該怎么做呢骂际?不用擔心疗琉,multiprocessing模塊提供了Array對象和Value對象,用來在進程之間共享數(shù)據(jù)歉铝。

所謂Array對象和Value對象分別是指從共享內(nèi)存中分配的ctypes數(shù)組和對象盈简。我們直接來看一個例子,展示如何用Array對象和Value對象在進程之間共享數(shù)據(jù):

import multiprocessing 
  
def square_list(mylist, result, square_sum): 
    # append squares of mylist to result array 
    for idx, num in enumerate(mylist): 
        result[idx] = num * num 
        
    # square_sum value 
    square_sum.value = sum(result) 
    
    # print result Array 
    print("Result(in process p1): {}".format(result[:])) 
    
    # print square_sum Value 
    print("Sum of squares(in process p1): {}".format(square_sum.value)) 
    
if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 
    
    # creating Array of int data type with space for 4 integers 
    result = multiprocessing.Array('i', 4) 
    
    # creating Value of int data type 
    square_sum = multiprocessing.Value('i') 
    
    # creating new process 
    p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum)) 
    
    # starting process 
    p1.start() 
    
    # wait until process is finished 
    p1.join() 
    
    # print result array 
    print("Result(in main program): {}".format(result[:])) 
    
    # print square_sum Value 
    print("Sum of squares(in main program): {}".format(square_sum.value)) 

程序輸出的結(jié)果如下:

Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30

成功了太示!主程序和p1進程輸出了同樣的結(jié)果柠贤,說明程序中確實完成了不同進程間的數(shù)據(jù)共享。那么我們來詳細看一下上面的程序做了什么:

在主程序中我們首先創(chuàng)建了一個Array對象:

result = multiprocessing.Array('i', 4)

向這個對象輸入的第一個參數(shù)是數(shù)據(jù)類型:i表示整數(shù)类缤,d代表浮點數(shù)臼勉。第二個參數(shù)是數(shù)組的大小,在這個例子中我們創(chuàng)建了包含4個元素的數(shù)組餐弱。

類似的宴霸,我們創(chuàng)建了一個Value對象:

square_sum = multiprocessing.Value('i')

我們只對Value對象輸入了一個參數(shù),那就是數(shù)據(jù)類型膏蚓,與上述的方法一致瓢谢。當然,我們還可以對其指定一個初始值(比如10)驮瞧,就像這樣:

square_sum = multiprocessing.Value('i', 10)

隨后氓扛,我們在創(chuàng)建進程對象時,將剛創(chuàng)建好的兩個對象:result和square_sum作為參數(shù)輸入給進程:

p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))

在函數(shù)中result元素通過索引進行數(shù)組賦值论笔,square_sum通過value屬性進行賦值采郎。

注意:為了完整打印result數(shù)組的結(jié)果千所,需要使用result[:]進行打印,而square_sum也需要使用value屬性進行打铀饴瘛:

print("Result(in process p1): {}".format(result[:])) 
print("Sum of squares(in process p1): {}".format(square_sum.value)) 

1.2 服務器進程

每當python程序啟動時淫痰,同時也會啟動一個服務器進程。隨后理茎,只要我們需要生成一個新進程黑界,父進程就會連接到服務器并請求它派生一個新進程。這個服務器進程可以保存Python對象皂林,并允許其他進程使用代理來操作它們朗鸠。

multiprocessing模塊提供了能夠控制服務器進程的Manager類。所以础倍,Manager類也提供了一種創(chuàng)建可以在不同流程之間共享的數(shù)據(jù)的方法烛占。

服務器進程管理器比使用共享內(nèi)存對象更靈活,因為它們可以支持任意對象類型沟启,如列表忆家、字典、隊列德迹、值芽卿、數(shù)組等。此外胳搞,單個管理器可以由網(wǎng)絡上不同計算機上的進程共享卸例。

但是,服務器進程管理器的速度比使用共享內(nèi)存要慢肌毅。

讓我們來看一個例子:

import multiprocessing 
  
def print_records(records): 
    for record in records: 
        print("Name: {0}\nScore: {1}\n".format(record[0], record[1])) 
        
def insert_record(record, records): 
    records.append(record) 
    print("New record added!\n") 
    
if __name__ == '__main__': 
    with multiprocessing.Manager() as manager: 
        # creating a list in server process memory 
        records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)]) 
        # new record to be inserted in records 
        new_record = ('Jeff', 8) 
        
        # creating new processes 
        p1 = multiprocessing.Process(target=insert_record, args=(new_record, records)) 
        p2 = multiprocessing.Process(target=print_records, args=(records,)) 
        
        # running process p1 to insert new record 
        p1.start() 
        p1.join() 
        
        # running process p2 to print records 
        p2.start() 
        p2.join() 

這個程序的輸出結(jié)果是:

New record added!

Name: Sam
Score: 10

Name: Adam
Score: 9

Name: Kevin
Score: 9

Name: Jeff
Score: 8

我們來理解一下這個程序做了什么:首先我們創(chuàng)建了一個manager對象

with multiprocessing.Manager() as manager:

在with語句下的所有行筷转,都是在manager對象的范圍內(nèi)的。接下來我們使用這個manager對象創(chuàng)建了列表(類似的悬而,我們還可以用manager.dict()創(chuàng)建字典)呜舒。

最后我們創(chuàng)建了進程p1(用于在records列表中插入一條新的record)和p2(將records打印出來),并將records作為參數(shù)進行傳遞笨奠。

服務器進程的概念再次用下圖總結(jié)一下:

圖2 進程間數(shù)據(jù)共享.png

2 數(shù)據(jù)傳遞

為了能使多個流程能夠正常工作袭蝗,常常需要在它們之間進行一些通信,以便能夠劃分工作并匯總最后的結(jié)果艰躺。multiprocessing模塊支持進程之間的兩種通信通道:Queue和Pipe呻袭。

2.1 Queue

使用隊列來回處理多進程之間的通信是一種比較簡單的方法。任何Python對象都可以使用隊列進行傳遞腺兴。我們來看一個例子:

import multiprocessing 
  
def square_list(mylist, q): 
    # append squares of mylist to queue 
    for num in mylist: 
        q.put(num * num) 
        
def print_queue(q): 
    print("Queue elements:") 
    while not q.empty(): 
        print(q.get()) 
    print("Queue is now empty!") 
    
if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 
    
    # creating multiprocessing Queue 
    q = multiprocessing.Queue() 
    
    # creating new processes 
    p1 = multiprocessing.Process(target=square_list, args=(mylist, q)) 
    p2 = multiprocessing.Process(target=print_queue, args=(q,)) 
    
    # running process p1 to square list 
    p1.start() 
    p1.join() 
    
    # running process p2 to get queue elements 
    p2.start() 
    p2.join() 

上面這個程序的輸出結(jié)果是:

Queue elements:
1
4
9
16
Queue is now empty!

我們來看一下上面這個程序到底做了什么。首先我們創(chuàng)建了一個Queue對象:

q = multiprocessing.Queue()

然后廉侧,將這個空的Queue對象輸入square_list函數(shù)页响。該函數(shù)會將列表中的數(shù)平方篓足,再使用put()方法放入隊列中:

q.put(num * num)

隨后使用get()方法,將q打印出來闰蚕,直至q重新稱為一個空的Queue對象:

while not q.empty():
    print(q.get())

我們還是用一張圖來幫助理解記憶:

圖3 用Queue完成進程間數(shù)據(jù)傳輸.png

2.2 Pipe

一個Pipe對象只能有兩個端點栈拖。因此,當進程只需要雙向通信時没陡,它會比Queue對象更好用涩哟。

multiprocessing模塊提供了Pipe()函數(shù),該函數(shù)返回由管道連接的一對連接對象盼玄。Pipe()返回的兩個連接對象分別表示管道的兩端贴彼。每個連接對象都有send()recv()方法。

我們來看一個例子:

import multiprocessing 
  
def sender(conn, msgs): 
    for msg in msgs: 
        conn.send(msg) 
        print("Sent the message: {}".format(msg)) 
    conn.close() 
    
def receiver(conn): 
    while 1: 
        msg = conn.recv() 
        if msg == "END": 
            break
        print("Received the message: {}".format(msg)) 
        
if __name__ == "__main__": 
    # messages to be sent 
    msgs = ["hello", "hey", "hru?", "END"] 
    
    # creating a pipe 
    parent_conn, child_conn = multiprocessing.Pipe() 
    
    # creating new processes 
    p1 = multiprocessing.Process(target=sender, args=(parent_conn,msgs)) 
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,)) 
    
    # running processes 
    p1.start() 
    p2.start() 
    
    # wait until processes finish 
    p1.join() 
    p2.join() 

上面這個程序的輸出結(jié)果是:

Sent the message: hello
Sent the message: hey
Sent the message: hru?
Received the message: hello
Sent the message: END
Received the message: hey
Received the message: hru?

我們還是來看一下這個程序到底做了什么埃儿。首先創(chuàng)建了一個Pipe對象:

parent_conn, child_conn = multiprocessing.Pipe()

與上文說的一樣器仗,該對象返回了一對管道兩端的兩個連接對象。然后使用send()方法和recv()方法進行信息的傳遞童番。就這么簡單。在上面的程序中,我們從一端向另一端發(fā)送一串消息钾怔。在另一端显歧,我們收到消息,并在收到END消息時退出幼东。

要注意的是臂容,如果兩個進程(或線程)同時嘗試從管道的同一端讀取或?qū)懭牍艿乐械臄?shù)據(jù),則管道中的數(shù)據(jù)可能會損壞筋粗。不過不同的進程同時使用管道的兩端是沒有問題的策橘。還要注意,Queue對象在進程之間進行了適當?shù)耐侥纫冢鷥r是增加了計算復雜度丽已。因此,Queue對象對于線程和進程是相對安全的买决。

最后我們還是用一張圖來示意:

圖4 用Pipe完成進程間數(shù)據(jù)傳輸.png

Python的multiprocessing模塊還剩最后一篇文章:多進程的同步與池化

敬請期待啦沛婴!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市督赤,隨后出現(xiàn)的幾起案子嘁灯,更是在濱河造成了極大的恐慌,老刑警劉巖躲舌,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件丑婿,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機羹奉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門秒旋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人诀拭,你說我怎么就攤上這事迁筛。” “怎么了耕挨?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵细卧,是天一觀的道長。 經(jīng)常有香客問我筒占,道長贪庙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任赋铝,我火速辦了婚禮插勤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘革骨。我一直安慰自己农尖,他們只是感情好,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布良哲。 她就那樣靜靜地躺著盛卡,像睡著了一般。 火紅的嫁衣襯著肌膚如雪筑凫。 梳的紋絲不亂的頭發(fā)上滑沧,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天,我揣著相機與錄音巍实,去河邊找鬼滓技。 笑死,一個胖子當著我的面吹牛棚潦,可吹牛的內(nèi)容都是我干的令漂。 我是一名探鬼主播,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼丸边,長吁一口氣:“原來是場噩夢啊……” “哼叠必!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起妹窖,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤纬朝,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后骄呼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體共苛,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡判没,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了俄讹。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片哆致。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡绕德,死狀恐怖患膛,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情耻蛇,我是刑警寧澤踪蹬,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站臣咖,受9級特大地震影響跃捣,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜夺蛇,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一疚漆、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧刁赦,春花似錦娶聘、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至牺氨,卻和暖如春狡耻,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背猴凹。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工夷狰, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人郊霎。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓沼头,卻偏偏與公主長得像,于是被迫代替她去往敵國和親歹篓。 傳聞我的和親對象是個殘疾皇子瘫证,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355