上篇文章簡單介紹了multiprocessing模塊诵盼,本文將要介紹進程之間的數(shù)據(jù)共享和信息傳遞的概念惠豺。
1 數(shù)據(jù)共享
在多進程處理中,所有新創(chuàng)建的進程都會有這兩個特點:獨立運行风宁,有自己的內(nèi)存空間洁墙。
我們來舉個例子展示一下:
import multiprocessing
# empty list with global scope
result = []
def square_list(mylist):
global result
# append squares of mylist to global list result
for num in mylist:
result.append(num * num)
# print global list result
print("Result(in process p1): {}".format(result))
if __name__ == "__main__":
# input list
mylist = [1,2,3,4]
# creating new process
p1 = multiprocessing.Process(target=square_list, args=(mylist,))
# starting process
p1.start()
# wait until process is finished
p1.join()
# print global result list
print("Result(in main program): {}".format(result))
這個程序的輸出結(jié)果是:
Result(in process p1): [1, 4, 9, 16]
Result(in main program): []
在上面的程序中我們嘗試在兩個地方打印全局列表result的內(nèi)容:
- 在
square_list()
函數(shù)中,由于這個函數(shù)是由進程p1調(diào)用的戒财,所以result列表只在進程p1的內(nèi)存空間中更改热监。 - 在主程序中的p1進程完成后。由于主程序由不同的進程運行饮寞,它的內(nèi)存空間中的result列表仍然是空的狼纬。
我們再用一張圖來幫助理解記憶不同進程間的數(shù)據(jù)關系:
1.1 內(nèi)存共享
如果程序需要在不同的進程之間共享一些數(shù)據(jù)的話,該怎么做呢骂际?不用擔心疗琉,multiprocessing模塊提供了Array對象和Value對象,用來在進程之間共享數(shù)據(jù)歉铝。
所謂Array對象和Value對象分別是指從共享內(nèi)存中分配的ctypes數(shù)組和對象盈简。我們直接來看一個例子,展示如何用Array對象和Value對象在進程之間共享數(shù)據(jù):
import multiprocessing
def square_list(mylist, result, square_sum):
# append squares of mylist to result array
for idx, num in enumerate(mylist):
result[idx] = num * num
# square_sum value
square_sum.value = sum(result)
# print result Array
print("Result(in process p1): {}".format(result[:]))
# print square_sum Value
print("Sum of squares(in process p1): {}".format(square_sum.value))
if __name__ == "__main__":
# input list
mylist = [1,2,3,4]
# creating Array of int data type with space for 4 integers
result = multiprocessing.Array('i', 4)
# creating Value of int data type
square_sum = multiprocessing.Value('i')
# creating new process
p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
# starting process
p1.start()
# wait until process is finished
p1.join()
# print result array
print("Result(in main program): {}".format(result[:]))
# print square_sum Value
print("Sum of squares(in main program): {}".format(square_sum.value))
程序輸出的結(jié)果如下:
Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30
成功了太示!主程序和p1進程輸出了同樣的結(jié)果柠贤,說明程序中確實完成了不同進程間的數(shù)據(jù)共享。那么我們來詳細看一下上面的程序做了什么:
在主程序中我們首先創(chuàng)建了一個Array對象:
result = multiprocessing.Array('i', 4)
向這個對象輸入的第一個參數(shù)是數(shù)據(jù)類型:i表示整數(shù)类缤,d代表浮點數(shù)臼勉。第二個參數(shù)是數(shù)組的大小,在這個例子中我們創(chuàng)建了包含4個元素的數(shù)組餐弱。
類似的宴霸,我們創(chuàng)建了一個Value對象:
square_sum = multiprocessing.Value('i')
我們只對Value對象輸入了一個參數(shù),那就是數(shù)據(jù)類型膏蚓,與上述的方法一致瓢谢。當然,我們還可以對其指定一個初始值(比如10)驮瞧,就像這樣:
square_sum = multiprocessing.Value('i', 10)
隨后氓扛,我們在創(chuàng)建進程對象時,將剛創(chuàng)建好的兩個對象:result和square_sum作為參數(shù)輸入給進程:
p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
在函數(shù)中result元素通過索引進行數(shù)組賦值论笔,square_sum通過value
屬性進行賦值采郎。
注意:為了完整打印result數(shù)組的結(jié)果千所,需要使用result[:]
進行打印,而square_sum也需要使用value
屬性進行打铀饴瘛:
print("Result(in process p1): {}".format(result[:]))
print("Sum of squares(in process p1): {}".format(square_sum.value))
1.2 服務器進程
每當python程序啟動時淫痰,同時也會啟動一個服務器進程。隨后理茎,只要我們需要生成一個新進程黑界,父進程就會連接到服務器并請求它派生一個新進程。這個服務器進程可以保存Python對象皂林,并允許其他進程使用代理來操作它們朗鸠。
multiprocessing模塊提供了能夠控制服務器進程的Manager類。所以础倍,Manager類也提供了一種創(chuàng)建可以在不同流程之間共享的數(shù)據(jù)的方法烛占。
服務器進程管理器比使用共享內(nèi)存對象更靈活,因為它們可以支持任意對象類型沟启,如列表忆家、字典、隊列德迹、值芽卿、數(shù)組等。此外胳搞,單個管理器可以由網(wǎng)絡上不同計算機上的進程共享卸例。
但是,服務器進程管理器的速度比使用共享內(nèi)存要慢肌毅。
讓我們來看一個例子:
import multiprocessing
def print_records(records):
for record in records:
print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))
def insert_record(record, records):
records.append(record)
print("New record added!\n")
if __name__ == '__main__':
with multiprocessing.Manager() as manager:
# creating a list in server process memory
records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)])
# new record to be inserted in records
new_record = ('Jeff', 8)
# creating new processes
p1 = multiprocessing.Process(target=insert_record, args=(new_record, records))
p2 = multiprocessing.Process(target=print_records, args=(records,))
# running process p1 to insert new record
p1.start()
p1.join()
# running process p2 to print records
p2.start()
p2.join()
這個程序的輸出結(jié)果是:
New record added!
Name: Sam
Score: 10
Name: Adam
Score: 9
Name: Kevin
Score: 9
Name: Jeff
Score: 8
我們來理解一下這個程序做了什么:首先我們創(chuàng)建了一個manager對象
with multiprocessing.Manager() as manager:
在with語句下的所有行筷转,都是在manager對象的范圍內(nèi)的。接下來我們使用這個manager對象創(chuàng)建了列表(類似的悬而,我們還可以用manager.dict()
創(chuàng)建字典)呜舒。
最后我們創(chuàng)建了進程p1(用于在records列表中插入一條新的record)和p2(將records打印出來),并將records作為參數(shù)進行傳遞笨奠。
服務器進程的概念再次用下圖總結(jié)一下:
2 數(shù)據(jù)傳遞
為了能使多個流程能夠正常工作袭蝗,常常需要在它們之間進行一些通信,以便能夠劃分工作并匯總最后的結(jié)果艰躺。multiprocessing模塊支持進程之間的兩種通信通道:Queue和Pipe呻袭。
2.1 Queue
使用隊列來回處理多進程之間的通信是一種比較簡單的方法。任何Python對象都可以使用隊列進行傳遞腺兴。我們來看一個例子:
import multiprocessing
def square_list(mylist, q):
# append squares of mylist to queue
for num in mylist:
q.put(num * num)
def print_queue(q):
print("Queue elements:")
while not q.empty():
print(q.get())
print("Queue is now empty!")
if __name__ == "__main__":
# input list
mylist = [1,2,3,4]
# creating multiprocessing Queue
q = multiprocessing.Queue()
# creating new processes
p1 = multiprocessing.Process(target=square_list, args=(mylist, q))
p2 = multiprocessing.Process(target=print_queue, args=(q,))
# running process p1 to square list
p1.start()
p1.join()
# running process p2 to get queue elements
p2.start()
p2.join()
上面這個程序的輸出結(jié)果是:
Queue elements:
1
4
9
16
Queue is now empty!
我們來看一下上面這個程序到底做了什么。首先我們創(chuàng)建了一個Queue對象:
q = multiprocessing.Queue()
然后廉侧,將這個空的Queue對象輸入square_list函數(shù)页响。該函數(shù)會將列表中的數(shù)平方篓足,再使用put()
方法放入隊列中:
q.put(num * num)
隨后使用get()
方法,將q打印出來闰蚕,直至q重新稱為一個空的Queue對象:
while not q.empty():
print(q.get())
我們還是用一張圖來幫助理解記憶:
2.2 Pipe
一個Pipe對象只能有兩個端點栈拖。因此,當進程只需要雙向通信時没陡,它會比Queue對象更好用涩哟。
multiprocessing模塊提供了Pipe()
函數(shù),該函數(shù)返回由管道連接的一對連接對象盼玄。Pipe()
返回的兩個連接對象分別表示管道的兩端贴彼。每個連接對象都有send()
和recv()
方法。
我們來看一個例子:
import multiprocessing
def sender(conn, msgs):
for msg in msgs:
conn.send(msg)
print("Sent the message: {}".format(msg))
conn.close()
def receiver(conn):
while 1:
msg = conn.recv()
if msg == "END":
break
print("Received the message: {}".format(msg))
if __name__ == "__main__":
# messages to be sent
msgs = ["hello", "hey", "hru?", "END"]
# creating a pipe
parent_conn, child_conn = multiprocessing.Pipe()
# creating new processes
p1 = multiprocessing.Process(target=sender, args=(parent_conn,msgs))
p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
# running processes
p1.start()
p2.start()
# wait until processes finish
p1.join()
p2.join()
上面這個程序的輸出結(jié)果是:
Sent the message: hello
Sent the message: hey
Sent the message: hru?
Received the message: hello
Sent the message: END
Received the message: hey
Received the message: hru?
我們還是來看一下這個程序到底做了什么埃儿。首先創(chuàng)建了一個Pipe對象:
parent_conn, child_conn = multiprocessing.Pipe()
與上文說的一樣器仗,該對象返回了一對管道兩端的兩個連接對象。然后使用send()
方法和recv()
方法進行信息的傳遞童番。就這么簡單。在上面的程序中,我們從一端向另一端發(fā)送一串消息钾怔。在另一端显歧,我們收到消息,并在收到END消息時退出幼东。
要注意的是臂容,如果兩個進程(或線程)同時嘗試從管道的同一端讀取或?qū)懭牍艿乐械臄?shù)據(jù),則管道中的數(shù)據(jù)可能會損壞筋粗。不過不同的進程同時使用管道的兩端是沒有問題的策橘。還要注意,Queue對象在進程之間進行了適當?shù)耐侥纫冢鷥r是增加了計算復雜度丽已。因此,Queue對象對于線程和進程是相對安全的买决。
最后我們還是用一張圖來示意:
Python的multiprocessing模塊還剩最后一篇文章:多進程的同步與池化
敬請期待啦沛婴!