2019-11-28,進程

multiprocessing --- 基于進程的并行
源代碼 Lib/multiprocessing/

概述
multiprocessing 是一個用于產(chǎn)生進程的包血柳,具有與 threading 模塊相似API迹恐。 multiprocessing 包同時提供本地和遠程并發(fā)逗载,使用子進程代替線程批什,有效避免 Global Interpreter Lock 帶來的影響农曲。因此, multiprocessing 模塊允許程序員充分利用機器上的多核∪楣妫可運行于 Unix 和 Windows 形葬。

multiprocessing 模塊還引入了在 threading 模塊中沒有的API。一個主要的例子就是 Pool 對象暮的,它提供了一種快捷的方法笙以,賦予函數(shù)并行化處理一系列輸入值的能力,可以將輸入數(shù)據(jù)分配給不同進程處理(數(shù)據(jù)并行)冻辩。下面的例子演示了在模塊中定義此類函數(shù)的常見做法猖腕,以便子進程可以成功導入該模塊。這個數(shù)據(jù)并行的基本例子使用了 Pool 微猖,

from multiprocessing import Pool

def f(x):
return x*x

if name == 'main':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
將在標準輸出中打印

[1, 4, 9]
Process 類
在 multiprocessing 中谈息,通過創(chuàng)建一個 Process 對象然后調用它的 start() 方法來生成進程。 Process 和 threading.Thread API 相同凛剥。 一個簡單的多進程程序示例是:

from multiprocessing import Process

def f(name):
print('hello', name)

if name == 'main':
p = Process(target=f, args=('bob',))
p.start()
p.join()
要顯示所涉及的各個進程ID,這是一個擴展示例:

from multiprocessing import Process
import os

def info(title):
print(title)
print('module name:', name)
print('parent process:', os.getppid())
print('process id:', os.getpid())

def f(name):
info('function f')
print('hello', name)

if name == 'main':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
關于為什么 if name == 'main' 部分是必需的解釋轻姿,請參見 編程指導犁珠。

上下文和啟動方法
根據(jù)不同的平臺, multiprocessing 支持三種啟動進程的方法互亮。這些 啟動方法 有

spawn
父進程啟動一個新的Python解釋器進程犁享。子進程只會繼承那些運行進程對象的 run() 方法所需的資源。特別是父進程中非必須的文件描述符和句柄不會被繼承豹休。相對于使用 fork 或者 forkserver炊昆,使用這個方法啟動進程相當慢。

可在Unix和Windows上使用威根。 Windows上的默認設置。

fork
父進程使用 os.fork() 來產(chǎn)生 Python 解釋器分叉。子進程在開始時實際上與父進程相同渠羞。父進程的所有資源都由子進程繼承译打。請注意,安全分叉多線程進程是棘手的留美。

只存在于Unix彰檬。Unix中的默認值。

forkserver
程序啟動并選擇* forkserver * 啟動方法時谎砾,將啟動服務器進程逢倍。從那時起,每當需要一個新進程時景图,父進程就會連接到服務器并請求它分叉一個新進程较雕。分叉服務器進程是單線程的,因此使用 os.fork() 是安全的症歇。沒有不必要的資源被繼承郎笆。

可在Unix平臺上使用谭梗,支持通過Unix管道傳遞文件描述符。

在 3.8 版更改: 對于 macOS宛蚓,spawn 啟動方式是默認方式激捏。 因為 fork 可能導致subprocess崩潰,被認為是不安全的凄吏,查看 bpo-33725 远舅。

在 3.4 版更改: 在所有unix平臺上添加支持了 spawn ,并且為一些unix平臺添加了 forkserver 痕钢。在Windows上子進程不再繼承所有可繼承的父進程句柄图柏。

在 Unix 上通過 spawn 和 forkserver 方式啟動多進程會同時啟動一個 資源追蹤 進程,負責追蹤當前程序的進程產(chǎn)生的任连、并且不再被使用的命名系統(tǒng)資源(如命名信號量以及 SharedMemory 對象)蚤吹。當所有進程退出后,資源追蹤會負責釋放這些仍被追蹤的的對象随抠。通常情況下是不會有這種對象的裁着,但是假如一個子進程被某個信號殺死,就可能存在這一類資源的“泄露”情況拱她。(泄露的信號量以及共享內存不會被釋放二驰,直到下一次系統(tǒng)重啟,對于這兩類資源來說秉沼,這是一個比較大的問題桶雀,因為操作系統(tǒng)允許的命名信號量的數(shù)量是有限的,而共享內存也會占據(jù)主內存的一片空間)

要選擇一個啟動方法唬复,你應該在主模塊的 if name == 'main' 子句中調用 set_start_method() 矗积。例如:

import multiprocessing as mp

def foo(q):
q.put('hello')

if name == 'main':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
在程序中 set_start_method() 不應該被多次調用。

或者盅抚,你可以使用 get_context() 來獲取上下文對象漠魏。上下文對象與 multiprocessing 模塊具有相同的API,并允許在同一程序中使用多種啟動方法妄均。:

import multiprocessing as mp

def foo(q):
q.put('hello')

if name == 'main':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
請注意柱锹,關聯(lián)到不同上下文的對象和進程之前可能不兼容。特別是丰包,使用 fork 上下文創(chuàng)建的鎖不能傳遞給使用 spawn 或 forkserver 啟動方法啟動的進程禁熏。

想要使用特定啟動方法的庫應該使用 get_context() 以避免干擾庫用戶的選擇。

警告 'spawn' 和 'forkserver' 啟動方法當前不能在Unix上和“凍結的”可執(zhí)行內容一同使用(例如邑彪,有類似 PyInstaller 和 cx_Freeze 的包產(chǎn)生的二進制文件)瞧毙。 'fork' 啟動方法可以使用。
在進程之間交換對象
multiprocessing 支持進程之間的兩種通信通道:

隊列

Queue 類是一個近似 queue.Queue 的克隆。 例如:

from multiprocessing import Process, Queue

def f(q):
q.put([42, None, 'hello'])

if name == 'main':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
隊列是線程和進程安全的宙彪。

管道

Pipe() 函數(shù)返回一個由管道連接的連接對象矩动,默認情況下是雙工(雙向)。例如:

from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close()

if name == 'main':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
返回的兩個連接對象 Pipe() 表示管道的兩端释漆。每個連接對象都有 send() 和 recv() 方法(相互之間的)悲没。請注意,如果兩個進程(或線程)同時嘗試讀取或寫入管道的 同一 端男图,則管道中的數(shù)據(jù)可能會損壞示姿。當然,在不同進程中同時使用管道的不同端的情況下不存在損壞的風險逊笆。

進程間同步
對于所有在 threading 存在的同步原語栈戳,multiprocessing 中都有類似的等價物。例如难裆,可以使用鎖來確保一次只有一個進程打印到標準輸出:

from multiprocessing import Process, Lock

def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()

if name == 'main':
lock = Lock()

for num in range(10):
    Process(target=f, args=(lock, num)).start()

不使用鎖的情況下子檀,來自于多進程的輸出很容易產(chǎn)生混淆。

進程間共享狀態(tài)
如上所述乃戈,在進行并發(fā)編程時命锄,通常最好盡量避免使用共享狀態(tài)。使用多個進程時尤其如此偏化。

但是,如果你真的需要使用一些共享數(shù)據(jù)镐侯,那么 multiprocessing 提供了兩種方法侦讨。

共享內存

可以使用 Value 或 Array 將數(shù)據(jù)存儲在共享內存映射中。例如苟翻,以下代碼:

from multiprocessing import Process, Value, Array

def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]

if name == 'main':
num = Value('d', 0.0)
arr = Array('i', range(10))

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print(num.value)
print(arr[:])

將打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
創(chuàng)建 num 和 arr 時使用的 'd' 和 'i' 參數(shù)是 array 模塊使用的類型的 typecode : 'd' 表示雙精度浮點數(shù)韵卤, 'i' 表示有符號整數(shù)。這些共享對象將是進程和線程安全的崇猫。

為了更靈活地使用共享內存沈条,可以使用 multiprocessing.sharedctypes 模塊,該模塊支持創(chuàng)建從共享內存分配的任意ctypes對象诅炉。

服務進程

由 Manager() 返回的管理器對象控制一個服務進程蜡歹,該進程保存Python對象并允許其他進程使用代理操作它們。

Manager() 返回的管理器支持類型: list 涕烧、 dict 月而、 Namespace 、 Lock 议纯、 RLock 父款、 Semaphore 、 BoundedSemaphore 、 Condition 憨攒、 Event 世杀、 Barrier 、 Queue 肝集、 Value 和 Array 瞻坝。例如

from multiprocessing import Process, Manager

def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()

if name == 'main':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))

    p = Process(target=f, args=(d, l))
    p.start()
    p.join()

    print(d)
    print(l)

將打印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
使用服務進程的管理器比使用共享內存對象更靈活,因為它們可以支持任意對象類型包晰。此外湿镀,單個管理器可以通過網(wǎng)絡由不同計算機上的進程共享。但是伐憾,它們比使用共享內存慢勉痴。

使用工作進程
Pool 類表示一個工作進程池。它具有允許以幾種不同方式將任務分配到工作進程的方法树肃。

例如:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
return x*x

if name == 'main':
# start 4 worker processes
with Pool(processes=4) as pool:

    # print "[0, 1, 4,..., 81]"
    print(pool.map(f, range(10)))

    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print(i)

    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      # runs in *only* one process
    print(res.get(timeout=1))             # prints "400"

    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print(res.get(timeout=1))             # prints the PID of that process

    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print([res.get(timeout=1) for res in multiple_results])

    # make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print(res.get(timeout=1))
    except TimeoutError:
        print("We lacked patience and got a multiprocessing.TimeoutError")

    print("For the moment, the pool remains available for more work")

# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")

請注意蒸矛,進程池的方法只能由創(chuàng)建它的進程使用。

注解 這個包中的功能要求子進程可以導入 main 模塊胸嘴。雖然這在 編程指導 中有描述雏掠,但還是需要提前說明一下。這意味著一些示例在交互式解釋器中不起作用劣像,比如 multiprocessing.pool.Pool 示例乡话。例如:

from multiprocessing import Pool
p = Pool(5)
def f(x):
... return x*x
...
p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果嘗試執(zhí)行上面的代碼,它會以一種半隨機的方式將三個完整的堆棧內容交替輸出耳奕,然后你只能以某種方式停止父進程绑青。)

參考
multiprocessing 包主要復制了 threading 模塊的API。

Process 和異常
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
進程對象表示在單獨進程中運行的活動屋群。 Process 類擁有和 threading.Thread 等價的大部分方法闸婴。

應始終使用關鍵字參數(shù)調用構造函數(shù)。 group 應該始終是 None 芍躏;它僅用于兼容 threading.Thread 邪乍。 target 是由 run() 方法調用的可調用對象。它默認為 None 对竣,意味著什么都沒有被調用庇楞。 name 是進程名稱(有關詳細信息,請參閱 name )柏肪。 args 是目標調用的參數(shù)元組姐刁。 kwargs 是目標調用的關鍵字參數(shù)字典。如果提供烦味,則鍵參數(shù) daemon 將進程 daemon 標志設置為 True 或 False 聂使。如果是 None (默認值)壁拉,則該標志將從創(chuàng)建的進程繼承。

默認情況下柏靶,不會將任何參數(shù)傳遞給 target 弃理。

如果子類重寫構造函數(shù),它必須確保它在對進程執(zhí)行任何其他操作之前調用基類構造函數(shù)( Process.init() )屎蜓。

在 3.3 版更改: 加入 daemon 參數(shù)痘昌。

run()
表示進程活動的方法。

你可以在子類中重載此方法炬转。標準 run() 方法調用傳遞給對象構造函數(shù)的可調用對象作為目標參數(shù)(如果有)辆苔,分別從 args 和 kwargs 參數(shù)中獲取順序和關鍵字參數(shù)。

start()
啟動進程活動扼劈。

這個方法每個進程對象最多只能調用一次驻啤。它會將對象的 run() 方法安排在一個單獨的進程中調用。

join([timeout])
如果可選參數(shù) timeout 是 None (默認值)荐吵,則該方法將阻塞骑冗,直到調用 join() 方法的進程終止。如果 timeout 是一個正數(shù)先煎,它最多會阻塞 timeout 秒贼涩。請注意,如果進程終止或方法超時薯蝎,則該方法返回 None 遥倦。檢查進程的 exitcode 以確定它是否終止。

一個進程可以被 join 多次占锯。

進程無法join自身谊迄,因為這會導致死鎖。嘗試在啟動進程之前join進程是錯誤的烟央。

name
進程的名稱。該名稱是一個字符串歪脏,僅用于識別目的疑俭。它沒有語義⌒鍪В可以為多個進程指定相同的名稱钞艇。

初始名稱由構造器設定。 如果沒有為構造器提供顯式名稱豪硅,則會構造一個形式為 'Process-N1:N2:...:Nk' 的名稱哩照,其中每個 Nk 是其父親的第 N 個孩子。

is_alive()
返回進程是否還活著懒浮。

粗略地說飘弧,從 start() 方法返回到子進程終止之前识藤,進程對象仍處于活動狀態(tài)。

daemon
進程的守護標志次伶,一個布爾值痴昧。這必須在 start() 被調用之前設置。

初始值繼承自創(chuàng)建進程冠王。

當進程退出時赶撰,它會嘗試終止其所有守護進程子進程。

請注意柱彻,不允許守護進程創(chuàng)建子進程豪娜。否則,守護進程會在子進程退出時終止其子進程哟楷。 另外瘤载,這些 不是 Unix守護進程或服務,它們是正常進程吓蘑,如果非守護進程已經(jīng)退出惕虑,它們將被終止(并且不被合并)。

除了 threading.Thread API 磨镶,Process 對象還支持以下屬性和方法:

pid
返回進程ID溃蔫。在生成該進程之前,這將是 None 琳猫。

exitcode
子進程的退出代碼伟叛。如果進程尚未終止,這將是 None 脐嫂。負值 -N 表示子進程被信號 N 終止统刮。

authkey
進程的身份驗證密鑰(字節(jié)字符串)。

當 multiprocessing 初始化時账千,主進程使用 os.urandom() 分配一個隨機字符串侥蒙。

當創(chuàng)建 Process 對象時,它將繼承其父進程的身份驗證密鑰匀奏,盡管可以通過將 authkey 設置為另一個字節(jié)字符串來更改鞭衩。

參見 認證密碼 。

sentinel
系統(tǒng)對象的數(shù)字句柄娃善,當進程結束時將變?yōu)?"ready" 论衍。

如果要使用 multiprocessing.connection.wait() 一次等待多個事件,可以使用此值聚磺。否則調用 join() 更簡單坯台。

在Windows上,這是一個操作系統(tǒng)句柄瘫寝,可以與 WaitForSingleObject 和 WaitForMultipleObjects 系列API調用一起使用蜒蕾。在Unix上稠炬,這是一個文件描述符,可以使用來自 select 模塊的原語滥搭。

3.3 新版功能.

terminate()
終止進程酸纲。 在Unix上,這是使用 SIGTERM 信號完成的瑟匆;在Windows上使用 TerminateProcess() 闽坡。 請注意,不會執(zhí)行退出處理程序和finally子句等愁溜。

請注意疾嗅,進程的后代進程將不會被終止 —— 它們將簡單地變成孤立的。

警告 如果在關聯(lián)進程使用管道或隊列時使用此方法冕象,則管道或隊列可能會損壞代承,并可能無法被其他進程使用。類似地渐扮,如果進程已獲得鎖或信號量等论悴,則終止它可能導致其他進程死鎖。
kill()
與 terminate() 相同墓律,但在Unix上使用 SIGKILL 信號膀估。

3.7 新版功能.

close()
關閉 Process 對象矾缓,釋放與之關聯(lián)的所有資源查坪。如果底層進程仍在運行油航,則會引發(fā) ValueError 拯辙。一旦 close() 成功返回, Process 對象的大多數(shù)其他方法和屬性將引發(fā) ValueError 秦叛。

3.7 新版功能.

注意 start() 新荤、 join() 荧嵌、 is_alive() 慰枕、 terminate() 和 exitcode 方法只能由創(chuàng)建進程對象的進程調用具则。

Process 一些方法的示例用法:

import multiprocessing, time, signal
p = multiprocessing.Process(target=time.sleep, args=(1000,))
print(p, p.is_alive())
<Process ... initial> False
p.start()
print(p, p.is_alive())
<Process ... started> True
p.terminate()
time.sleep(0.1)
print(p, p.is_alive())
<Process ... stopped exitcode=-SIGTERM> False
p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError
所有 multiprocessing 異常的基類。

exception multiprocessing.BufferTooShort
當提供的緩沖區(qū)對象太小而無法讀取消息時具帮, Connection.recv_bytes_into() 引發(fā)的異常乡洼。

如果 e 是一個 BufferTooShort 實例,那么 e.args[0] 將把消息作為字節(jié)字符串給出匕坯。

exception multiprocessing.AuthenticationError
出現(xiàn)身份驗證錯誤時引發(fā)。

exception multiprocessing.TimeoutError
有超時的方法超時時引發(fā)拔稳。

管道和隊列
使用多進程時葛峻,一般使用消息機制實現(xiàn)進程間通信,盡可能避免使用同步原語巴比,例如鎖术奖。

消息機制包含: Pipe() (可以用于在兩個進程間傳遞消息)礁遵,以及隊列(能夠在多個生產(chǎn)者和消費者之間通信)。

The Queue, SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library. They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5's queue.Queue class.

如果你使用了 JoinableQueue 采记,那么你必須對每個已經(jīng)移出隊列的任務調用 JoinableQueue.task_done() 佣耐。不然的話用于統(tǒng)計未完成任務的信號量最終會溢出并拋出異常。

另外還可以通過使用一個管理器對象創(chuàng)建一個共享隊列唧龄,詳見 管理器 兼砖。

注解 multiprocessing 使用了普通的 queue.Empty 和 queue.Full 異常去表示超時。 你需要從 queue 中導入它們既棺,因為它們并不在 multiprocessing 的命名空間中讽挟。
注解 當一個對象被放入一個隊列中時,這個對象首先會被一個后臺線程用pickle序列化丸冕,并將序列化后的數(shù)據(jù)通過一個底層管道的管道傳遞到隊列中耽梅。這種做法會有點讓人驚訝,但一般不會出現(xiàn)什么問題胖烛。如果它們確實妨礙了你眼姐,你可以使用一個由管理器 manager 創(chuàng)建的隊列替換它。
將一個對象放入一個空隊列后佩番,可能需要極小的延遲众旗,隊列的方法 empty() 才會返回 False 。而 get_nowait() 可以不拋出 queue.Empty 直接返回答捕。

如果有多個進程同時將對象放入隊列逝钥,那么在隊列的另一端接受到的對象可能是無序的。但是由同一個進程放入的多個對象的順序在另一端輸出時總是一樣的拱镐。

警告 如果一個進程在嘗試使用 Queue 期間被 Process.terminate() 或 os.kill() 調用終止了艘款,那么隊列中的數(shù)據(jù)很可能被破壞。 這可能導致其他進程在嘗試使用該隊列時發(fā)生異常沃琅。
警告 正如剛才提到的哗咆,如果一個子進程將一些對象放進隊列中 (并且它沒有用 JoinableQueue.cancel_join_thread 方法),那么這個進程在所有緩沖區(qū)的對象被刷新進管道之前益眉,是不會終止的晌柬。
這意味著,除非你確定所有放入隊列中的對象都已經(jīng)被消費了郭脂,否則如果你試圖等待這個進程年碘,你可能會陷入死鎖中。相似地展鸡,如果該子進程不是后臺進程屿衅,那么父進程可能在試圖等待所有非后臺進程退出時掛起。

注意用管理器創(chuàng)建的隊列不存在這個問題莹弊,詳見 編程指導 涤久。

該 示例 展示了如何使用隊列實現(xiàn)進程間通信涡尘。

multiprocessing.Pipe([duplex])
返回一對 Connection對象 ``(conn1, conn2) , 分別表示管道的兩端响迂。

如果 duplex 被置為 True (默認值)考抄,那么該管道是雙向的。如果 duplex 被置為 False 蔗彤,那么該管道是單向的川梅,即 conn1 只能用于接收消息,而 conn2 僅能用于發(fā)送消息幕与。

class multiprocessing.Queue([maxsize])
返回一個使用一個管道和少量鎖和信號量實現(xiàn)的共享隊列實例挑势。當一個進程將一個對象放進隊列中時,一個寫入線程會啟動并將對象從緩沖區(qū)寫入管道中啦鸣。

一旦超時潮饱,將拋出標準庫 queue 模塊中常見的異常 queue.Empty 和 queue.Full。

除了 task_done() 和 join() 之外诫给,Queue 實現(xiàn)了標準庫類 queue.Queue 中所有的方法香拉。

qsize()
返回隊列的大致長度。由于多線程或者多進程的上下文中狂,這個數(shù)字是不可靠的凫碌。

注意,在 Unix 平臺上胃榕,例如 Mac OS X 盛险,這個方法可能會拋出 NotImplementedError 異常,因為該平臺沒有實現(xiàn) sem_getvalue() 勋又。

empty()
如果隊列是空的苦掘,返回 True ,反之返回 False 楔壤。 由于多線程或多進程的環(huán)境鹤啡,該狀態(tài)是不可靠的。

full()
如果隊列是滿的蹲嚣,返回 True 递瑰,反之返回 False 。 由于多線程或多進程的環(huán)境隙畜,該狀態(tài)是不可靠的抖部。

put(obj[, block[, timeout]])
將 obj 放入隊列。如果可選參數(shù) block 是 True (默認值) 而且 timeout 是 None (默認值), 將會阻塞當前進程议惰,直到有空的緩沖槽慎颗。如果 timeout 是正數(shù),將會在阻塞了最多 timeout 秒之后還是沒有可用的緩沖槽時拋出 queue.Full 異常。反之 (block 是 False 時)哗总,僅當有可用緩沖槽時才放入對象,否則拋出 queue.Full 異常 (在這種情形下 timeout 參數(shù)會被忽略)倍试。

在 3.8 版更改: 如果隊列已經(jīng)關閉讯屈,會拋出 ValueError 而不是 AssertionError 。

put_nowait(obj)
相當于 put(obj, False)县习。

get([block[, timeout]])
從隊列中取出并返回對象涮母。如果可選參數(shù) block 是 True (默認值) 而且 timeout 是 None (默認值), 將會阻塞當前進程,直到隊列中出現(xiàn)可用的對象躁愿。如果 timeout 是正數(shù)叛本,將會在阻塞了最多 timeout 秒之后還是沒有可用的對象時拋出 queue.Empty 異常彤钟。反之 (block 是 False 時)来候,僅當有可用對象能夠取出時返回,否則拋出 queue.Empty 異常 (在這種情形下 timeout 參數(shù)會被忽略)逸雹。

在 3.8 版更改: 如果隊列已經(jīng)關閉营搅,會拋出 ValueError 而不是 OSError 。

get_nowait()
相當于 get(False)梆砸。

multiprocessing.Queue 類有一些在 queue.Queue 類中沒有出現(xiàn)的方法转质。這些方法在大多數(shù)情形下并不是必須的。

close()
指示當前進程將不會再往隊列中放入對象帖世。一旦所有緩沖區(qū)中的數(shù)據(jù)被寫入管道之后休蟹,后臺的線程會退出。這個方法在隊列被gc回收時會自動調用日矫。

join_thread()
等待后臺線程赂弓。這個方法僅在調用了 close() 方法之后可用。這會阻塞當前進程搬男,直到后臺線程退出拣展,確保所有緩沖區(qū)中的數(shù)據(jù)都被寫入管道中。

默認情況下缔逛,如果一個不是隊列創(chuàng)建者的進程試圖退出备埃,它會嘗試等待這個隊列的后臺線程。這個進程可以使用 cancel_join_thread() 讓 join_thread() 方法什么都不做直接跳過褐奴。

cancel_join_thread()
防止 join_thread() 方法阻塞當前進程按脚。具體而言,這防止進程退出時自動等待后臺線程退出敦冬。詳見 join_thread()辅搬。

可能這個方法稱為”allow_exit_without_flush()“ 會更好。這有可能會導致正在排隊進入隊列的數(shù)據(jù)丟失,大多數(shù)情況下你不需要用到這個方法堪遂,僅當你不關心底層管道中可能丟失的數(shù)據(jù)介蛉,只是希望進程能夠馬上退出時使用。

注解 該類的功能依賴于宿主操作系統(tǒng)具有可用的共享信號量實現(xiàn)溶褪。否則該類將被禁用币旧,任何試圖實例化一個 Queue 對象的操作都會拋出 ImportError 異常,更多信息詳見 bpo-3770 猿妈。后續(xù)說明的任何專用隊列對象亦如此吹菱。
class multiprocessing.SimpleQueue
這是一個簡化的 Queue 類的實現(xiàn),很像帶鎖的 Pipe 彭则。

empty()
如果隊列為空返回 True 鳍刷,否則返回 False 。

get()
從隊列中移出并返回一個對象俯抖。

put(item)
將 item 放入隊列输瓜。

class multiprocessing.JoinableQueue([maxsize])
JoinableQueue 類是 Queue 的子類,額外添加了 task_done() 和 join() 方法蚌成。

task_done()
指出之前進入隊列的任務已經(jīng)完成前痘。由隊列的消費者進程使用。對于每次調用 get() 獲取的任務担忧,執(zhí)行完成后調用 task_done() 告訴隊列該任務已經(jīng)處理完成芹缔。

如果 join() 方法正在阻塞之中,該方法會在所有對象都被處理完的時候返回 (即對之前使用 put() 放進隊列中的所有對象都已經(jīng)返回了對應的 task_done() ) 瓶盛。

如果被調用的次數(shù)多于放入隊列中的項目數(shù)量最欠,將引發(fā) ValueError 異常 。

join()
阻塞至隊列中所有的元素都被接收和處理完畢惩猫。

當條目添加到隊列的時候芝硬,未完成任務的計數(shù)就會增加。每當消費者進程調用 task_done() 表示這個條目已經(jīng)被回收轧房,該條目所有工作已經(jīng)完成拌阴,未完成計數(shù)就會減少。當未完成計數(shù)降到零的時候奶镶, join() 阻塞被解除迟赃。

雜項
multiprocessing.active_children()
返回當前進程存活的子進程的列表。

調用該方法有“等待”已經(jīng)結束的進程的副作用厂镇。

multiprocessing.cpu_count()
返回系統(tǒng)的CPU數(shù)量纤壁。

該數(shù)量不同于當前進程可以使用的CPU數(shù)量∞嘈牛可用的CPU數(shù)量可以由 len(os.sched_getaffinity(0)) 方法獲得酌媒。

可能引發(fā) NotImplementedError 。

參見 os.cpu_count()
multiprocessing.current_process()
返回與當前進程相對應的 Process 對象。

和 threading.current_thread() 相同秒咨。

multiprocessing.parent_process()
返回父進程 Process 對象喇辽,和父進程調用 current_process() 返回的對象一樣。如果一個進程已經(jīng)是主進程雨席, parent_process 會返回 None.

3.8 新版功能.

multiprocessing.freeze_support()
為使用了 multiprocessing 的程序茵臭,提供凍結以產(chǎn)生 Windows 可執(zhí)行文件的支持。(在 py2exe, PyInstaller 和 cx_Freeze 上測試通過)

需要在 main 模塊的 if name == 'main' 該行之后馬上調用該函數(shù)舅世。例如:

from multiprocessing import Process, freeze_support

def f():
print('hello world!')

if name == 'main':
freeze_support()
Process(target=f).start()
如果沒有調用 freeze_support() 在嘗試運行被凍結的可執(zhí)行文件時會拋出 RuntimeError 異常。

對 freeze_support() 的調用在非 Windows 平臺上是無效的奇徒。如果該模塊在 Windows 平臺的 Python 解釋器中正常運行 (該程序沒有被凍結)雏亚, 調用freeze_support() 也是無效的。

multiprocessing.get_all_start_methods()
返回支持的啟動方法的列表摩钙,該列表的首項即為默認選項罢低。可能的啟動方法有 'fork', 'spawn' 和'forkserver'胖笛。在 Windows 中网持,只有'spawn' 是可用的。Unix平臺總是支持'fork''spawn'长踊,且'fork' 是默認值功舀。

3.4 新版功能.

multiprocessing.get_context(method=None)
返回一個 Context 對象。該對象具有和 multiprocessing 模塊相同的API身弊。

如果 method 設置成 None 那么將返回默認上下文對象辟汰。否則 method 應該是 'fork', 'spawn', 'forkserver' 。 如果指定的啟動方法不存在阱佛,將拋出 ValueError 異常帖汞。

3.4 新版功能.

multiprocessing.get_start_method(allow_none=False)
返回啟動進程時使用的啟動方法名。

如果啟動方法已經(jīng)固定凑术,并且 allow_none 被設置成 False 翩蘸,那么啟動方法將被固定為默認的啟動方法,并且返回其方法名淮逊。如果啟動方法沒有設定催首,并且 allow_none 被設置成 True ,那么將返回 None 壮莹。

返回值將為 'fork' , 'spawn' , 'forkserver' 或者 None 翅帜。 'fork'是 Unix 的默認值,'spawn' 是 Windows 的默認值命满。

3.4 新版功能.

multiprocessing.set_executable()
設置在啟動子進程時使用的 Python 解釋器路徑涝滴。 ( 默認使用 sys.executable ) 嵌入式編程人員可能需要這樣做:

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
以使他們可以創(chuàng)建子進程。

在 3.4 版更改: 現(xiàn)在在 Unix 平臺上使用 'spawn' 啟動方法時支持調用該方法。

multiprocessing.set_start_method(method)
設置啟動子進程的方法歼疮。 method 可以是 'fork' , 'spawn' 或者 'forkserver' 杂抽。

注意這最多只能調用一次,并且需要藏在 main 模塊中韩脏,由 if name == 'main' 進行保護缩麸。

3.4 新版功能.

注解 multiprocessing 并沒有包含類似 threading.active_count() , threading.enumerate() , threading.settrace() , threading.setprofile(), threading.Timer , 或者 threading.local 的方法和類。
連接(Connection)對象
Connection 對象允許收發(fā)可以序列化的對象或字符串赡矢。它們可以看作面向消息的連接套接字杭朱。

通常使用 Pipe 創(chuàng)建 Connection 對象。詳見 : 監(jiān)聽者及客戶端.

class multiprocessing.connection.Connection
send(obj)
將一個對象發(fā)送到連接的另一端吹散,可以用 recv() 讀取弧械。

發(fā)送的對象必須是可以序列化的,過大的對象 ( 接近 32MiB+ 空民,這個值取決于操作系統(tǒng) ) 有可能引發(fā) ValueError 異常刃唐。

recv()
返回一個由另一端使用 send() 發(fā)送的對象。該方法會一直阻塞直到接收到對象界轩。 如果對端關閉了連接或者沒有東西可接收画饥,將拋出 EOFError 異常。

fileno()
返回由連接對象使用的描述符或者句柄浊猾。

close()
關閉連接對象抖甘。

當連接對象被垃圾回收時會自動調用。

poll([timeout])
返回連接對象中是否有可以讀取的數(shù)據(jù)葫慎。

如果未指定 timeout 单山,此方法會馬上返回。如果 timeout 是一個數(shù)字幅疼,則指定了最大阻塞的秒數(shù)涛癌。如果 timeout 是 None 力图,那么將一直等待莱衩,不會超時绰筛。

注意通過使用 multiprocessing.connection.wait() 可以一次輪詢多個連接對象。

send_bytes(buffer[, offset[, size]])
從一個 bytes-like object (字節(jié)類對象)對象中取出字節(jié)數(shù)組并作為一條完整消息發(fā)送逐工。

如果由 offset 給定了在 buffer 中讀取數(shù)據(jù)的位置铡溪。 如果給定了 size ,那么將會從緩沖區(qū)中讀取多個字節(jié)泪喊。 過大的緩沖區(qū) ( 接近 32MiB+ 棕硫,此值依賴于操作系統(tǒng) ) 有可能引發(fā) ValueError 異常。

recv_bytes([maxlength])
以字符串形式返回一條從連接對象另一端發(fā)送過來的字節(jié)數(shù)據(jù)袒啼。此方法在接收到數(shù)據(jù)前將一直阻塞哈扮。 如果連接對象被對端關閉或者沒有數(shù)據(jù)可讀取纬纪,將拋出 EOFError 異常。

如果給定了 maxlength 并且消息短于 maxlength 那么將拋出 OSError 并且該連接對象將不再可讀滑肉。

在 3.3 版更改: 曾經(jīng)該函數(shù)拋出 IOError 包各,現(xiàn)在這是 OSError 的別名。

recv_bytes_into(buffer[, offset])
將一條完整的字節(jié)數(shù)據(jù)消息讀入 buffer 中并返回消息的字節(jié)數(shù)靶庙。 此方法在接收到數(shù)據(jù)前將一直阻塞问畅。 如果連接對象被對端關閉或者沒有數(shù)據(jù)可讀取,將拋出 EOFError 異常六荒。

buffer must be a writable bytes-like object. If offset is given then the message will be written into the buffer from that position. Offset must be a non-negative integer less than the length of buffer (in bytes).

如果緩沖區(qū)太小护姆,則將引發(fā) BufferTooShort 異常,并且完整的消息將會存放在異常實例 e 的 e.args[0] 中掏击。

在 3.3 版更改: 現(xiàn)在連接對象自身可以通過 Connection.send() 和 Connection.recv() 在進程之間傳遞签则。

3.3 新版功能: 連接對象現(xiàn)已支持上下文管理協(xié)議 -- 參見 see 上下文管理器類型 。 enter() 返回連接對象铐料, exit() 會調用 close() 。

例如:

from multiprocessing import Pipe
a, b = Pipe()
a.send([1, 'hello', None])
b.recv()
[1, 'hello', None]
b.send_bytes(b'thank you')
a.recv_bytes()
b'thank you'
import array
arr1 = array.array('i', range(5))
arr2 = array.array('i', [0] * 10)
a.send_bytes(arr1)
count = b.recv_bytes_into(arr2)
assert count == len(arr1) * arr1.itemsize
arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告 The Connection.recv() method automatically unpickles the data it receives, which can be a security risk unless you can trust the process which sent the message.
因此豺旬, 除非連接對象是由 Pipe() 產(chǎn)生的钠惩,在通過一些認證手段之前你應該只使用 recv() 和 send() 方法。參考 認證密碼 族阅。

警告 如果一個進程在試圖讀寫管道時被終止了篓跛,那么管道中的數(shù)據(jù)很可能是不完整的,因為此時可能無法確定消息的邊界坦刀。
同步原語
通常來說同步愿意在多進程環(huán)境中并不像它們在多線程環(huán)境中那么必要愧沟。參考 threading 模塊的文檔。

注意可以使用管理器對象創(chuàng)建同步原語鲤遥,參考 管理器 沐寺。

class multiprocessing.Barrier(parties[, action[, timeout]])
類似 threading.Barrier 的柵欄對象。

3.3 新版功能.

class multiprocessing.BoundedSemaphore([value])
非常類似 threading.BoundedSemaphore 的有界信號量對象盖奈。

一個小小的不同在于混坞,它的 acquire 方法的第一個參數(shù)名是和 Lock.acquire() 一樣的 block 。

注解 在 Mac OS X 平臺上钢坦, 該對象于 Semaphore 不同在于 sem_getvalue() 方法并沒有在該平臺上實現(xiàn)究孕。
class multiprocessing.Condition([lock])
條件變量: threading.Condition 的別名。

指定的 lock 參數(shù)應該是 multiprocessing 模塊中的 Lock 或者 RLock 對象爹凹。

在 3.3 版更改: 新增了 wait_for() 方法厨诸。

class multiprocessing.Event
A clone of threading.Event.

class multiprocessing.Lock
原始鎖(非遞歸鎖)對象,類似于 threading.Lock 禾酱。一旦一個進程或者線程拿到了鎖微酬,后續(xù)的任何其他進程或線程的其他請求都會被阻塞直到鎖被釋放绘趋。任何進程或線程都可以釋放鎖。除非另有說明得封,否則 multiprocessing.Lock 用于進程或者線程的概念和行為都和 threading.Lock 一致埋心。

注意 Lock 實際上是一個工廠函數(shù)。它返回由默認上下文初始化的 multiprocessing.synchronize.Lock 對象忙上。

Lock supports the context manager protocol and thus may be used in with statements.

acquire(block=True, timeout=None)
獲得鎖拷呆,阻塞或非阻塞的。

如果 block 參數(shù)被設為 True ( 默認值 ) 疫粥, 對該方法的調用在鎖處于釋放狀態(tài)之前都會阻塞茬斧,然后將鎖設置為鎖住狀態(tài)并返回 True 。需要注意的是第一個參數(shù)名與 threading.Lock.acquire() 的不同梗逮。

如果 block 參數(shù)被設置成 False 项秉,方法的調用將不會阻塞。 如果鎖當前處于鎖住狀態(tài)慷彤,將返回 False 娄蔼; 否則將鎖設置成鎖住狀態(tài),并返回 True 底哗。

當 timeout 是一個正浮點數(shù)時岁诉,會在等待鎖的過程中最多阻塞等待 timeout 秒,當 timeout 是負數(shù)時跋选,效果和 timeout 為0時一樣涕癣,當 timeout 是 None (默認值)時,等待時間是無限長前标。需要注意的是坠韩,對于 timeout 參數(shù)是負數(shù)和 None 的情況, 其行為與 threading.Lock.acquire() 是不一樣的。當 block 參數(shù) 為 False 時炼列, timeout 并沒有實際用處只搁,會直接忽略。否則俭尖,函數(shù)會在拿到鎖后返回 True 或者 超時沒拿到鎖后返回 False 须蜗。

release()
釋放鎖,可以在任何進程目溉、線程使用明肮,并不限于鎖的擁有者。

當嘗試釋放一個沒有被持有的鎖時缭付,會拋出 ValueError 異常柿估,除此之外其行為與 threading.Lock.release() 一樣。

class multiprocessing.RLock
遞歸鎖對象: 類似于 threading.RLock 陷猫。遞歸鎖必須由持有線程秫舌、進程親自釋放的妖。如果某個進程或者線程拿到了遞歸鎖,這個進程或者線程可以再次拿到這個鎖而不需要等待足陨。但是這個進程或者線程的拿鎖操作和釋放鎖操作的次數(shù)必須相同嫂粟。

注意 RLock 是一個工廠函數(shù),調用后返回一個使用默認 context 初始化的 multiprocessing.synchronize.RLock 實例墨缘。

RLock 支持 context manager星虹,所以可在 with 語句內使用。

acquire(block=True, timeout=None)
獲得鎖镊讼,阻塞或非阻塞的宽涌。

當 block 參數(shù)設置為 True 時,會一直阻塞直到鎖處于空閑狀態(tài)(沒有被任何進程蝶棋、線程擁有)卸亮,除非當前進程或線程已經(jīng)擁有了這把鎖。然后當前進程/線程會持有這把鎖(在鎖沒有其他持有者的情況下)玩裙,鎖內的遞歸等級加一吃溅,并返回 True . 注意蹂楣, 這個函數(shù)第一個參數(shù)的行為和 threading.RLock.acquire() 的實現(xiàn)有幾個不同點赁酝,包括參數(shù)名本身。

當 block 參數(shù)是 False , 將不會阻塞,如果此時鎖被其他進程或者線程持有风瘦,當前進程、線程獲取鎖操作失敗涌哲,鎖的遞歸等級也不會改變哆料,函數(shù)返回 False , 如果當前鎖已經(jīng)處于釋放狀態(tài),則當前進程染突、線程則會拿到鎖拓型,并且鎖內的遞歸等級加一,函數(shù)返回 True 瘸恼。

timeout 參數(shù)的使用方法及行為與 Lock.acquire() 一樣劣挫。但是要注意 timeout 的其中一些行為和 threading.RLock.acquire() 中實現(xiàn)的行為是不同的。

release()
釋放鎖钞脂,使鎖內的遞歸等級減一揣云。如果釋放后鎖內的遞歸等級降低為0捕儒,則會重置鎖的狀態(tài)為釋放狀態(tài)(即沒有被任何進程冰啃、線程持有),重置后如果有有其他進程和線程在等待這把鎖刘莹,他們中的一個會獲得這個鎖而繼續(xù)運行阎毅。如果釋放后鎖內的遞歸等級還沒到達0,則這個鎖仍將保持未釋放狀態(tài)且當前進程和線程仍然是持有者点弯。

只有當前進程或線程是鎖的持有者時扇调,才允許調用這個方法。如果當前進程或線程不是這個鎖的擁有者抢肛,或者這個鎖處于已釋放的狀態(tài)(即沒有任何擁有者)狼钮,調用這個方法會拋出 AssertionError 異常碳柱。注意這里拋出的異常類型和 threading.RLock.release() 中實現(xiàn)的行為不一樣。

class multiprocessing.Semaphore([value])
一種信號量對象: 類似于 threading.Semaphore.

一個小小的不同在于熬芜,它的 acquire 方法的第一個參數(shù)名是和 Lock.acquire() 一樣的 block 莲镣。

注解 在 Mac OS X 上,不支持 sem_timedwait 涎拉,所以瑞侮,調用 acquire() 時如果使用 timeout 參數(shù),會通過循環(huán)sleep來模擬這個函數(shù)的行為鼓拧。
注解 假如信號 SIGINT 是來自于 Ctrl-C 半火,并且主線程被 BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() 或 Condition.wait() 阻塞,則調用會立即中斷同時拋出 KeyboardInterrupt 異常季俩。
這和 threading 的行為不同钮糖,此模塊中當執(zhí)行對應的阻塞調用時,SIGINT 會被忽略种玛。

注解 這個包的某些功能依賴于宿主機系統(tǒng)的共享信號量的實現(xiàn)藐鹤,如果系統(tǒng)沒有這個特性, multiprocessing.synchronize 會被禁用赂韵,嘗試導入這個模塊會引發(fā) ImportError 異常娱节,詳細信息請查看 bpo-3770 。
共享 ctypes 對象
在共享內存上創(chuàng)建可被子進程繼承的共享對象時是可行的祭示。

multiprocessing.Value(typecode_or_type, *args, lock=True)
返回一個從共享內存上創(chuàng)建的 ctypes 對象肄满。默認情況下返回的對象實際上是經(jīng)過了同步器包裝過的≈侍危可以通過 Value 的 value 屬性訪問這個對象本身稠歉。

typecode_or_type 指明了返回的對象類型: 它可能是一個 ctypes 類型或者 array 模塊中每個類型對應的單字符長度的字符串。 *args 會透傳給這個類的構造函數(shù)汇陆。

如果 lock 參數(shù)是 True (默認值), 將會新建一個遞歸鎖用于同步對于此值的訪問操作怒炸。 如果 lock 是 Lock 或者 RLock 對象,那么這個傳入的鎖將會用于同步對這個值的訪問操作毡代,如果 lock 是 False , 那么對這個對象的訪問將沒有鎖保護阅羹,也就是說這個變量不是進程安全的。

諸如 += 這類的操作會引發(fā)獨立的讀操作和寫操作教寂,也就是說這類操作符并不具有原子性捏鱼。所以,如果你想讓遞增共享變量的操作具有原子性酪耕,僅僅以這樣的方式并不能達到要求:

counter.value += 1
共享對象內部關聯(lián)的鎖是遞歸鎖(默認情況下就是)的情況下导梆, 你可以采用這種方式

with counter.get_lock():
counter.value += 1
注意 lock 只能是命名參數(shù)。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
從共享內存中申請并返回一個具有ctypes類型的數(shù)組對象。默認情況下返回值實際上是被同步器包裝過的數(shù)組對象看尼。

typecode_or_type 指明了返回的數(shù)組中的元素類型: 它可能是一個 ctypes 類型或者 array 模塊中每個類型對應的單字符長度的字符串递鹉。 如果 size_or_initializer 是一個整數(shù),那就會當做數(shù)組的長度藏斩,并且整個數(shù)組的內存會初始化為0梳虽。否則,如果 size_or_initializer 會被當成一個序列用于初始化數(shù)組中的每一個元素灾茁,并且會根據(jù)元素個數(shù)自動判斷數(shù)組的長度窜觉。

如果 lock 為 True (默認值) 則將創(chuàng)建一個新的鎖對象用于同步對值的訪問。 如果 lock 為一個 Lock 或 RLock 對象則該對象將被用于同步對值的訪問北专。 如果 lock 為 False 則對返回對象的訪問將不會自動得到鎖的保護禀挫,也就是說它不是“進程安全的”。

請注意 lock 是一個僅限關鍵字參數(shù)拓颓。

請注意 ctypes.c_char 的數(shù)組具有 value 和 raw 屬性语婴,允許被用來保存和提取字符串。

multiprocessing.sharedctypes 模塊
multiprocessing.sharedctypes 模塊提供了一些函數(shù)驶睦,用于分配來自共享內存的砰左、可被子進程繼承的 ctypes 對象。

注解 雖然可以將指針存儲在共享內存中场航,但請記住它所引用的是特定進程地址空間中的位置缠导。 而且,指針很可能在第二個進程的上下文中無效溉痢,嘗試從第二個進程對指針進行解引用可能會導致崩潰僻造。
multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)
從共享內存中申請并返回一個 ctypes 數(shù)組。

typecode_or_type 指明了返回的數(shù)組中的元素類型: 它可能是一個 ctypes 類型或者 array 模塊中使用的類型字符孩饼。 如果 size_or_initializer 是一個整數(shù)髓削,那就會當做數(shù)組的長度,并且整個數(shù)組的內存會初始化為0镀娶。否則立膛,如果 size_or_initializer 會被當成一個序列用于初始化數(shù)組中的每一個元素,并且會根據(jù)元素個數(shù)自動判斷數(shù)組的長度梯码。

注意對元素的訪問宝泵、賦值操作可能是非原子操作 - 使用 Array() , 從而借助其中的鎖保證操作的原子性。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)
從共享內存中申請并返回一個 ctypes 對象忍些。

typecode_or_type 指明了返回的對象類型: 它可能是一個 ctypes 類型或者 array 模塊中每個類型對應的單字符長度的字符串鲁猩。 *args 會透傳給這個類的構造函數(shù)坎怪。

注意對 value 的訪問罢坝、賦值操作可能是非原子操作 - 使用 Value() ,從而借助其中的鎖保證操作的原子性。

請注意 ctypes.c_char 的數(shù)組具有 value 和 raw 屬性嘁酿,允許被用來保存和提取字符串 - 請查看 ctypes 文檔隙券。

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)
返回一個純 ctypes 數(shù)組, 或者在此之上經(jīng)過同步器包裝過的進程安全的對象,這取決于 lock 參數(shù)的值闹司,除此之外娱仔,和 RawArray() 一樣。

如果 lock 為 True (默認值) 則將創(chuàng)建一個新的鎖對象用于同步對值的訪問游桩。 如果 lock 為一個 Lock 或 RLock 對象則該對象將被用于同步對值的訪問牲迫。 如果 lock 為 False 則對返回對象的訪問將不會自動得到鎖的保護,也就是說它不是“進程安全的”借卧。

注意 lock 只能是命名參數(shù)盹憎。

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)
返回一個純 ctypes 數(shù)組, 或者在此之上經(jīng)過同步器包裝過的進程安全的對象,這取決于 lock 參數(shù)的值铐刘,除此之外陪每,和 RawArray() 一樣。

如果 lock 為 True (默認值) 則將創(chuàng)建一個新的鎖對象用于同步對值的訪問镰吵。 如果 lock 為一個 Lock 或 RLock 對象則該對象將被用于同步對值的訪問檩禾。 如果 lock 為 False 則對返回對象的訪問將不會自動得到鎖的保護,也就是說它不是“進程安全的”疤祭。

注意 lock 只能是命名參數(shù)盼产。

multiprocessing.sharedctypes.copy(obj)
從共享內存中申請一片空間將 ctypes 對象 obj 過來,然后返回一個新的 ctypes 對象勺馆。

multiprocessing.sharedctypes.synchronized(obj[, lock])
將一個 ctypes 對象包裝為進程安全的對象并返回辆飘,使用 lock 同步對于它的操作。如果 lock 是 None (默認值) 谓传,則會自動創(chuàng)建一個 multiprocessing.RLock 對象蜈项。

同步器包裝后的對象會在原有對象基礎上額外增加兩個方法: get_obj() 返回被包裝的對象, get_lock() 返回內部用于同步的鎖续挟。

需要注意的是紧卒,訪問包裝后的ctypes對象會比直接訪問原來的純 ctypes 對象慢得多。

在 3.5 版更改: 同步器包裝后的對象支持 context manager 協(xié)議诗祸。

下面的表格對比了創(chuàng)建普通ctypes對象和基于共享內存上創(chuàng)建共享ctypes對象的語法跑芳。(表格中的 MyStruct 是 ctypes.Structure 的子類)

ctypes

使用類型的共享ctypes

使用 typecode 的共享 ctypes

c_double(2.4)

RawValue(c_double, 2.4)

RawValue('d', 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray('h', 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray('i', (9, 2, 8))

下面是一個在子進程中修改多個ctypes對象的例子。

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
fields = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2

if name == 'main':
lock = Lock()

n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', b'hello world', lock=lock)
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()

print(n.value)
print(x.value)
print(s.value)
print([(a.x, a.y) for a in A])

輸出如下

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
管理器
管理器提供了一種創(chuàng)建共享數(shù)據(jù)的方法直颅,從而可以在不同進程中共享博个,甚至可以通過網(wǎng)絡跨機器共享數(shù)據(jù)。管理器維護一個用于管理 共享對象 的服務功偿。其他進程可以通過代理訪問這些共享對象盆佣。

multiprocessing.Manager()
返回一個已啟動的 SyncManager 管理器對象,這個對象可以用于在不同進程中共享數(shù)據(jù)。返回的管理器對象對應了一個已經(jīng)啟動的子進程共耍,并且擁有一系列方法可以用于創(chuàng)建共享對象虑灰、返回對應的代理。

當管理器被垃圾回收或者父進程退出時,管理器進程會立即退出。管理器類定義在 multiprocessing.managers 模塊:

class multiprocessing.managers.BaseManager([address[, authkey]])
創(chuàng)建一個 BaseManager 對象遇伞。

一旦創(chuàng)建,應該及時調用 start() 或者 get_server().serve_forever() 以確保管理器對象對應的管理進程已經(jīng)啟動对湃。

address 是管理器服務進程監(jiān)聽的地址。如果 address 是 None ,則允許和任意主機的請求建立連接遗淳。

authkey 是認證標識熟尉,用于檢查連接服務進程的請求合法性。如果 authkey 是 None, 則會使用 current_process().authkey , 否則洲脂,就使用 authkey , 需要保證它必須是 byte 類型的字符串斤儿。

start([initializer[, initargs]])
為管理器開啟一個子進程,如果 initializer 不是 None , 子進程在啟動時將會調用 initializer(*initargs) 恐锦。

get_server()
返回一個 Server 對象往果,它是管理器在后臺控制的真實的服務。 Server 對象擁有 serve_forever() 方法一铅。

from multiprocessing.managers import BaseManager
manager = BaseManager(address=('', 50000), authkey=b'abc')
server = manager.get_server()
server.serve_forever()
Server 額外擁有一個 address 屬性陕贮。

connect()
將本地管理器對象連接到一個遠程管理器進程:

from multiprocessing.managers import BaseManager
m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
m.connect()
shutdown()
停止管理器的進程。這個方法只能用于已經(jīng)使用 start() 啟動的服務進程潘飘。

它可以被多次調用肮之。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
一個 classmethod,可以將一個類型或者可調用對象注冊到管理器類卜录。

typeid 是一種 "類型標識符"戈擒,用于唯一表示某種共享對象類型,必須是一個字符串艰毒。

callable 是一個用來為此類型標識符創(chuàng)建對象的可調用對象筐高。如果一個管理器實例將使用 connect() 方法連接到服務器,或者 create_method 參數(shù)為 False丑瞧,那么這里可留下 None柑土。

proxytype 是 BaseProxy 的子類,可以根據(jù) typeid 為共享對象創(chuàng)建一個代理绊汹,如果是 None , 則會自動創(chuàng)建一個代理類稽屏。

exposed 是一個函數(shù)名組成的序列,用來指明只有這些方法可以使用 BaseProxy.callmethod() 代理西乖。(如果 exposed 是 None, 則會在 proxytype.exposed 存在的情況下轉而使用它) 當暴露的方法列表沒有指定的時候狐榔,共享對象的所有 “公共方法” 都會被代理坛增。(這里的“公共方法”是指所有擁有 call() 方法并且不是以 '' 開頭的屬性)

method_to_typeid 是一個映射,用來指定那些應該返回代理對象的暴露方法所返回的類型荒叼。(如果 method_to_typeid 是 None, 則 proxytype.method_to_typeid 會在存在的情況下被使用)如果方法名稱不在這個映射中或者映射是 None ,則方法返回的對象會是一個值拷貝。

create_method 指明典鸡,是否要創(chuàng)建一個以 typeid 命名并返回一個代理對象的方法被廓,這個函數(shù)會被服務進程用于創(chuàng)建共享對象,默認為 True 萝玷。

BaseManager 實例也有一個只讀屬性嫁乘。

address
管理器所用的地址。

在 3.3 版更改: 管理器對象支持上下文管理協(xié)議 - 查看 上下文管理器類型 球碉。enter() 啟動服務進程(如果它還沒有啟動)并且返回管理器對象蜓斧, exit() 會調用 shutdown() 。

在之前的版本中睁冬,如果管理器服務進程沒有啟動挎春, enter() 不會負責啟動它。

class multiprocessing.managers.SyncManager
BaseManager 的子類豆拨,可用于進程的同步直奋。這個類型的對象使用 multiprocessing.Manager() 創(chuàng)建。

它擁有一系列方法施禾,可以為大部分常用數(shù)據(jù)類型創(chuàng)建并返回 代理對象 代理脚线,用于進程間同步。甚至包括共享列表和字典弥搞。

Barrier(parties[, action[, timeout]])
創(chuàng)建一個共享的 threading.Barrier 對象并返回它的代理邮绿。

3.3 新版功能.

BoundedSemaphore([value])
創(chuàng)建一個共享的 threading.BoundedSemaphore 對象并返回它的代理。

Condition([lock])
創(chuàng)建一個共享的 threading.Condition 對象并返回它的代理攀例。

如果提供了 lock 參數(shù)船逮,那它必須是 threading.Lock 或 threading.RLock 的代理對象。

在 3.3 版更改: 新增了 wait_for() 方法粤铭。

Event()
創(chuàng)建一個共享的 threading.Event 對象并返回它的代理傻唾。

Lock()
創(chuàng)建一個共享的 threading.Lock 對象并返回它的代理。

Namespace()
創(chuàng)建一個共享的 Namespace 對象并返回它的代理承耿。

Queue([maxsize])
創(chuàng)建一個共享的 queue.Queue 對象并返回它的代理冠骄。

RLock()
創(chuàng)建一個共享的 threading.RLock 對象并返回它的代理。

Semaphore([value])
創(chuàng)建一個共享的 threading.Semaphore 對象并返回它的代理加袋。

Array(typecode, sequence)
創(chuàng)建一個數(shù)組并返回它的代理凛辣。

Value(typecode, value)
創(chuàng)建一個具有可寫 value 屬性的對象并返回它的代理。

dict()
dict(mapping)
dict(sequence)
創(chuàng)建一個共享的 dict 對象并返回它的代理职烧。

list()
list(sequence)
創(chuàng)建一個共享的 list 對象并返回它的代理扁誓。

在 3.6 版更改: 共享對象能夠嵌套防泵。例如, 共享的容器對象如共享列表,可以包含另一個共享對象蝗敢,他們全都會在 SyncManager 中進行管理和同步捷泞。

class multiprocessing.managers.Namespace
一個可以注冊到 SyncManager 的類型。

命名空間對象沒有公共方法寿谴,但是擁有可寫的屬性锁右。直接print會顯示所有屬性的值。

值得一提的是讶泰,當對命名空間對象使用代理的時候咏瑟,訪問所有名稱以 '_' 開頭的屬性都只是代理器上的屬性,而不是命名空間對象的屬性痪署。

manager = multiprocessing.Manager()
Global = manager.Namespace()
Global.x = 10
Global.y = 'hello'
Global._z = 12.3 # this is an attribute of the proxy
print(Global)
Namespace(x=10, y='hello')
自定義管理器
要創(chuàng)建一個自定義的管理器码泞,需要新建一個 BaseManager 的子類,然后使用這個管理器類上的 register() 類方法將新類型或者可調用方法注冊上去狼犯。例如:

from multiprocessing.managers import BaseManager

class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y

class MyManager(BaseManager):
pass

MyManager.register('Maths', MathsClass)

if name == 'main':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
使用遠程管理器
可以將管理器服務運行在一臺機器上余寥,然后使用客戶端從其他機器上訪問。(假設它們的防火墻允許)

運行下面的代碼可以啟動一個服務悯森,此付包含了一個共享隊列劈狐,允許遠程客戶端訪問:

from multiprocessing.managers import BaseManager
from queue import Queue
queue = Queue()
class QueueManager(BaseManager): pass
QueueManager.register('get_queue', callable=lambda:queue)
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()
遠程客戶端可以通過下面的方式訪問服務:

from multiprocessing.managers import BaseManager
class QueueManager(BaseManager): pass
QueueManager.register('get_queue')
m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
m.connect()
queue = m.get_queue()
queue.put('hello')
也可以通過下面的方式:

from multiprocessing.managers import BaseManager
class QueueManager(BaseManager): pass
QueueManager.register('get_queue')
m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
m.connect()
queue = m.get_queue()
queue.get()
'hello'
本地進程也可以訪問這個隊列,利用上面的客戶端代碼通過遠程方式訪問:

from multiprocessing import Process, Queue
from multiprocessing.managers import BaseManager
class Worker(Process):
... def init(self, q):
... self.q = q
... super(Worker, self).init()
... def run(self):
... self.q.put('local hello')
...
queue = Queue()
w = Worker(queue)
w.start()
class QueueManager(BaseManager): pass

from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
遠程客戶端可以通過下面的方式訪問服務:

>>>
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
也可以通過下面的方式:

>>>
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地進程也可以訪問這個隊列呐馆,利用上面的客戶端代碼通過遠程方式訪問:

>>>
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def init(self, q):
... self.q = q
... super(Worker, self).init()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
QueueManager.register('get_queue', callable=lambda: queue)
m = QueueManager(address=('', 50000), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()
代理對象
代理是一個 指向 其他共享對象的對象肥缔,這個對象(很可能)在另外一個進程中。共享對象也可以說是代理 指涉 的對象汹来。多個代理對象可能指向同一個指涉對象续膳。

代理對象代理了指涉對象的一系列方法調用(雖然并不是指涉對象的每個方法都有必要被代理)。通過這種方式收班,代理的使用方法可以和它的指涉對象一樣:

from multiprocessing import Manager
manager = Manager()
l = manager.list([i*i for i in range(10)])
print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
l[4]
16
l[2:5]
[4, 9, 16]
注意坟岔,對代理使用 str() 函數(shù)會返回指涉對象的字符串表示,但是 repr() 卻會返回代理本身的內部字符串表示摔桦。

被代理的對象很重要的一點是必須可以被序列化社付,這樣才能允許他們在進程間傳遞。因此邻耕,指涉對象可以包含 代理對象 鸥咖。這允許管理器中列表、字典或者其他 代理對象 對象之間的嵌套兄世。

a = manager.list()
b = manager.list()
a.append(b) # referent of a now contains referent of b
print(a, b)
queue = m.get_queue()
>>> queue.get()
'hello'
本地進程也可以訪問這個隊列啼辣,利用上面的客戶端代碼通過遠程方式訪問:

>>>
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def init(self, q):
... self.q = q
... super(Worker, self).init()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
代理對象
代理是一個 指向 其他共享對象的對象,這個對象(很可能)在另外一個進程中御滩。共享對象也可以說是代理 指涉 的對象鸥拧。多個代理對象可能指向同一個指涉對象党远。

代理對象代理了指涉對象的一系列方法調用(雖然并不是指涉對象的每個方法都有必要被代理)。通過這種方式富弦,代理的使用方法可以和它的指涉對象一樣:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
注意沟娱,對代理使用 str() 函數(shù)會返回指涉對象的字符串表示,但是 repr() 卻會返回代理本身的內部字符串表示腕柜。

被代理的對象很重要的一點是必須可以被序列化济似,這樣才能允許他們在進程間傳遞。因此媳握,指涉對象可以包含 代理對象 碱屁。這允許管理器中列表磷脯、字典或者其他 代理對象 對象之間的嵌套蛾找。

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
b.append('hello')
print(a[0], b)

from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def init(self, q):
... self.q = q
... super(Worker, self).init()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
代理對象
代理是一個 指向 其他共享對象的對象,這個對象(很可能)在另外一個進程中赵誓。共享對象也可以說是代理 指涉 的對象打毛。多個代理對象可能指向同一個指涉對象。

代理對象代理了指涉對象的一系列方法調用(雖然并不是指涉對象的每個方法都有必要被代理)俩功。通過這種方式幻枉,代理的使用方法可以和它的指涉對象一樣:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
注意,對代理使用 str() 函數(shù)會返回指涉對象的字符串表示诡蜓,但是 repr() 卻會返回代理本身的內部字符串表示熬甫。

被代理的對象很重要的一點是必須可以被序列化,這樣才能允許他們在進程間傳遞蔓罚。因此椿肩,指涉對象可以包含 代理對象 。這允許管理器中列表豺谈、字典或者其他 代理對象 對象之間的嵌套郑象。

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
類似地,字典和列表代理也可以相互嵌套:

l_outer = manager.list([ manager.dict() for i in range(2) ])
d_first_inner = l_outer[0]
d_first_inner['a'] = 1
d_first_inner['b'] = 2
l_outer[1]['c'] = 3
l_outer[1]['z'] = 26
print(l_outer[0])
{'a': 1, 'b': 2}
print(l_outer[1])
{'c': 3, 'z': 26}
如果指涉對象包含了普通 list 或 dict 對象茬末,對這些內部可變對象的修改不會通過管理器傳播厂榛,因為代理無法得知被包含的值什么時候被修改了。但是把存放在容器代理中的值本身是會通過管理器傳播的(會觸發(fā)代理對象中的 setitem )從而有效修改這些對象丽惭,所以可以把修改過的值重新賦值給容器代理:

create a list proxy and append a mutable object (a dictionary)

lproxy = manager.list()
lproxy.append({})

now mutate the dictionary

d = lproxy[0]
d['a'] = 1
d['b'] = 2

at this point, the changes to d are not yet synced, but by

updating the dictionary, the proxy is notified of the change

lproxy[0] = d
在大多是使用情形下击奶,這種實現(xiàn)方式并不比嵌套 代理對象 方便,但是依然演示了對于同步的一種控制級別责掏。

注解 multiprocessing 中的代理類并沒有提供任何對于代理值比較的支持正歼。所以,我們會得到如下結果:

manager.list([1,2,3]) == [1,2,3]
False
當需要比較值的時候拷橘,應該替換為使用指涉對象的拷貝局义。

class multiprocessing.managers.BaseProxy
代理對象是 BaseProxy 派生類的實例喜爷。

_callmethod(methodname[, args[, kwds]])
調用指涉對象的方法并返回結果。

如果 proxy 是一個代理且其指涉的是 obj , 那么下面的表達式:

proxy._callmethod(methodname, args, kwds)
相當于求取以下表達式的值:

getattr(obj, methodname)(*args, **kwds)
于管理器進程萄唇。

返回結果會是一個值拷貝或者一個新的共享對象的代理 - 見函數(shù) BaseManager.register() 中關于參數(shù) method_to_typeid 的文檔檩帐。

如果這個調用熬出了異常,則這個異常會被 _callmethod() 透傳出來另萤。如果是管理器進程本身拋出的一些其他異常湃密,則會被 _callmethod() 轉換為 RemoteError 異常重新拋出。

特別注意四敞,如果 methodname 沒有 暴露 出來泛源,將會引發(fā)一個異常。

_callmethod() 的一個使用示例:

l = manager.list(range(10))
l._callmethod('len')
10
l._callmethod('getitem', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
l._callmethod('getitem', (20,)) # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()
返回指涉對象的一份拷貝忿危。

如果指涉對象無法序列化达箍,則會拋出一個異常。

repr()
返回代理對象的內部字符串表示铺厨。

str()
返回指涉對象的內部字符串表示缎玫。

清理
代理對象使用了一個弱引用回調函數(shù),當它被垃圾回收時解滓,會將自己從擁有此指涉對象的管理器上反注冊赃磨,

當共享對象沒有被任何代理器引用時,會被管理器進程刪除洼裤。

進程池
可以創(chuàng)建一個進程池邻辉,它將使用 Pool 類執(zhí)行提交給它的任務。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
一個進程池對象腮鞍,它控制可以提交作業(yè)的工作進程池值骇。它支持帶有超時和回調的異步結果,以及一個并行的 map 實現(xiàn)缕减。

processes 是要使用的工作進程數(shù)目雷客。如果 processes 為 None,則使用 os.cpu_count() 返回的值桥狡。

如果 initializer 不為 None搅裙,則每個工作進程將會在啟動時調用 initializer(*initargs)。

maxtasksperchild 是一個工作進程在它退出或被一個新的工作進程代替之前能完成的任務數(shù)量裹芝,為了釋放未使用的資源部逮。默認的 maxtasksperchild 是 None,意味著工作進程壽與池齊嫂易。

context 可被用于指定啟動的工作進程的上下文兄朋。通常一個進程池是使用函數(shù) multiprocessing.Pool() 或者一個上下文對象的 Pool() 方法創(chuàng)建的。在這兩種情況下怜械, context 都是適當設置的颅和。

注意傅事,進程池對象的方法只有創(chuàng)建它的進程能夠調用。

3.2 新版功能: maxtasksperchild

3.4 新版功能: context

注解 通常來說峡扩,Pool 中的 Worker 進程的生命周期和進程池的工作隊列一樣長蹭越。一些其他系統(tǒng)中(如 Apache, mod_wsgi 等)也可以發(fā)現(xiàn)另一種模式,他們會讓工作進程在完成一些任務后退出教届,清理响鹃、釋放資源,然后啟動一個新的進程代替舊的工作進程案训。 Pool 的 maxtasksperchild 參數(shù)給用戶提供了這種能力买置。
apply(func[, args[, kwds]])
使用 args 參數(shù)以及 kwds 命名參數(shù)調用 func , 它會返回結果前阻塞。這種情況下强霎,apply_async() 更適合并行化工作忿项。另外 func 只會在一個進程池中的一個工作進程中執(zhí)行。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])
apply() 方法的一個變種脆栋,返回一個結果對象倦卖。

如果指定了 callback , 它必須是一個接受單個參數(shù)的可調用對象洒擦。當執(zhí)行成功時椿争, callback 會被用于處理執(zhí)行后的返回結果,否則熟嫩,調用 error_callback 秦踪。

如果指定了 error_callback , 它必須是一個接受單個參數(shù)的可調用對象。當目標函數(shù)執(zhí)行失敗時掸茅, 會將拋出的異常對象作為參數(shù)傳遞給 error_callback 執(zhí)行椅邓。

回調函數(shù)應該立即執(zhí)行完成,否則會阻塞負責處理結果的線程昧狮。

map(func, iterable[, chunksize])
內置 map() 函數(shù)(它只支持一個 可迭代 參數(shù))的并行版本景馁,它會阻塞直到返回結果。

這個方法會將可迭代對象分割為許多塊逗鸣,然后提交給進程池合住。可以將 chunksize 設置為一個正整數(shù)從而(近似)指定每個塊的大小可以撒璧。

注意對于很長的迭代對象透葛,可能消耗很多內存∏溆#可以考慮使用 imap() 或 imap_unordered() 并且顯示指定 chunksize 以提升效率僚害。

map_async(func, iterable[, chunksize[, callback[, error_callback]]])
和 map() 方法類似,但是返回一個可用于獲取結果的對象繁调。

如果指定了 callback , 它必須是一個接受單個參數(shù)的可調用對象萨蚕。當執(zhí)行成功時靶草, callback 會被用于處理執(zhí)行后的返回結果,否則爱致,調用 error_callback 。

如果指定了 error_callback , 它必須是一個接受單個參數(shù)的可調用對象。當目標函數(shù)執(zhí)行失敗時好渠, 會將拋出的異常對象作為參數(shù)傳遞給 error_callback 執(zhí)行昨稼。

回調函數(shù)應該立即執(zhí)行完成拳锚,否則會阻塞負責處理結果的線程假栓。

imap(func, iterable[, chunksize])
map() 的延遲執(zhí)行版本。

chunksize 參數(shù)的作用和 map() 方法的一樣霍掺。對于很長的迭代器匾荆,給 chunksize 設置一個很大的值會比默認值 1 極大 地加快執(zhí)行速度。

同樣杆烁,如果 chunksize 是 1 , 那么 imap() 方法所返回的迭代器的 next() 方法擁有一個可選的 timeout 參數(shù): 如果無法在 timeout 秒內執(zhí)行得到結果牙丽,則next(timeout) 會拋出 multiprocessing.TimeoutError 異常。

imap_unordered(func, iterable[, chunksize])
和 imap() 相同兔魂,只不過通過迭代器返回的結果是任意的烤芦。(當進程池中只有一個工作進程的時候,返回結果的順序才能認為是"有序"的)

starmap(func, iterable[, chunksize])
和 map() 類似析校,不過 iterable 中的每一項會被解包再作為函數(shù)參數(shù)构罗。

比如可迭代對象 [(1,2), (3, 4)] 會轉化為等價于 [func(1,2), func(3,4)] 的調用。

3.3 新版功能.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
相當于 starmap() 與 map_async() 的結合勺良,迭代 iterable 的每一項绰播,解包作為 func 的參數(shù)并執(zhí)行,返回用于獲取結果的對象尚困。

3.3 新版功能.

close()
阻止后續(xù)任務提交到進程池蠢箩,當所有任務執(zhí)行完成后,工作進程會退出。

terminate()
不必等待未完成的任務谬泌,立即停止工作進程滔韵。當進程池對象唄垃圾回收時, 會立即調用 terminate() 掌实。

join()
等待工作進程結束陪蜻。調用 join() 前必須先調用 close() 或者 terminate() 。

3.3 新版功能: 進程池對象現(xiàn)在支持上下文管理器協(xié)議 - 參見 上下文管理器類型 贱鼻。enter() 返回進程池對象, exit() 會調用 terminate() 宴卖。

class multiprocessing.pool.AsyncResult
Pool.apply_async() 和 Pool.map_async() 返回對象所屬的類。

get([timeout])
用于獲取執(zhí)行結果邻悬。如果 timeout 不是 None 并且在 timeout 秒內仍然沒有執(zhí)行完得到結果症昏,則拋出 multiprocessing.TimeoutError 異常。如果遠程調用發(fā)生異常父丰,這個異常會通過 get() 重新拋出肝谭。

wait([timeout])
阻塞,直到返回結果蛾扇,或者 timeout 秒后超時攘烛。

ready()
返回執(zhí)行狀態(tài),是否已經(jīng)完成镀首。

successful()
判斷調用是否已經(jīng)完成并且無異常坟漱,如果沒有執(zhí)行完成會拋出 AssertionError 異常。

在 3.7 版更改: 如果沒有執(zhí)行完,會拋出 ValueError 異常而不是 AssertionError 盯串。

下面的例子演示了進程池的用法:

from multiprocessing import Pool
import time

def f(x):
return x*x

if name == 'main':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is very slow

    print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

    it = pool.imap(f, range(10))
    print(next(it))                     # prints "0"
    print(next(it))                     # prints "1"
    print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

    result = pool.apply_async(time.sleep, (10,))
    print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

監(jiān)聽者及客戶端
通常情況下,進程間通過隊列或者 Pipe() 返回的 Connection 傳遞消息。

不過膛锭,multiprocessing.connection 模塊其實提供了一些更靈活的特性。最基礎的用法是通過它抽象出來的高級API來操作socket或者Windows命名管道卖哎。也提供一些高級用法筐骇,如通過 hmac 模塊來支持 摘要認證,以及同時監(jiān)聽多個管道連接扩劝。

multiprocessing.connection.deliver_challenge(connection, authkey)
發(fā)送一個隨機生成的消息到另一端庸论,并等待回復。

如果收到的回復與使用 authkey 作為鍵生成的信息摘要匹配成功棒呛,就會發(fā)送一個歡迎信息給管道另一端聂示。否則拋出 AuthenticationError 異常。

multiprocessing.connection.answer_challenge(connection, authkey)
接收一條信息簇秒,使用 authkey 作為鍵計算信息摘要鱼喉,然后將摘要發(fā)送回去。

如果沒有收到歡迎消息,就拋出 AuthenticationError 異常扛禽。

multiprocessing.connection.Client(address[, family[, authkey]])
嘗試使用 address 地址上的監(jiān)聽者建立一個連接锋边,返回 Connection 。

連接的類型取決于 family 參數(shù)编曼,但是通扯咕蓿可以省略,因為可以通過 address 的格式推導出來掐场。(查看 地址格式 )

如果提供了 authkey 參數(shù)并且不是 None往扔,那它必須是一個字符串并且會被當做基于 HMAC 認證的密鑰。如果 authkey 是None 則不會有認證行為熊户。認證失敗拋出 AuthenticationError 異常瓤球,請查看 See 認證密碼 。

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])
可以監(jiān)聽連接請求敏弃,是對于綁定套接字或者 Windows 命名管道的封裝卦羡。

address 是監(jiān)聽器對象中的綁定套接字或命名管道使用的地址。

注解 如果使用 '0.0.0.0' 作為監(jiān)聽地址麦到,那么在Windows上這個地址無法建立連接绿饵。想要建立一個可連接的端點,應該使用 '127.0.0.1' 瓶颠。
family 是套接字(或者命名管道)使用的類型拟赊。它可以是以下一種: 'AF_INET' ( TCP 套接字類型), 'AF_UNIX' ( Unix 域套接字) 或者 'AF_PIPE' ( Windows 命名管道)。其中只有第一個保證各平臺可用粹淋。如果 family 是 None ,那么 family 會根據(jù) address 的格式自動推導出來吸祟。如果 address 也是 None , 則取默認值。默認值為可用類型中速度最快的桃移。見 地址格式 屋匕。注意,如果 family 是 'AF_UNIX' 而address是None ,套接字會在一個 tempfile.mkstemp() 創(chuàng)建的私有臨時目錄中創(chuàng)建借杰。

如果監(jiān)聽器對象使用了套接字过吻,backlog (默認值為1) 會在套接字綁定后傳遞給它的 listen() 方法。

如果提供了 authkey 參數(shù)并且不是 None蔗衡,那它必須是一個字符串并且會被當做基于 HMAC 認證的密鑰纤虽。如果 authkey 是None 則不會有認證行為。認證失敗拋出 AuthenticationError 異常绞惦,請查看 See 認證密碼 逼纸。

accept()
接受一個連接并返回一個 Connection 對象,其連接到的監(jiān)聽器對象已綁定套接字或者命名管道济蝉。如果已經(jīng)嘗試過認證并且失敗了杰刽,則會拋出 AuthenticationError 異常呻纹。

close()
關閉監(jiān)聽器對象上的綁定套接字或者命名管道。此函數(shù)會在監(jiān)聽器被垃圾回收后自動調用专缠。不過仍然建議顯式調用函數(shù)關閉雷酪。

監(jiān)聽器對象擁有下列只讀屬性:

address
監(jiān)聽器對象使用的地址。

last_accepted
最后一個連接所使用的地址涝婉。如果沒有的話就是 None 哥力。

3.3 新版功能: 監(jiān)聽器對象現(xiàn)在支持了上下文管理協(xié)議 - 見 上下文管理器類型 。 enter() 返回一個監(jiān)聽器對象, exit() 會調用 close() 墩弯。

multiprocessing.connection.wait(object_list, timeout=None)
一直等待直到 object_list 中某個對象處于就緒狀態(tài)吩跋。返回 object_list 中處于就緒狀態(tài)的對象。如果 timeout 是一個浮點型渔工,該方法會最多阻塞這么多秒锌钮。如果 timeout 是 None ,則會允許阻塞的事件沒有限制引矩。timeout為負數(shù)的情況下和為0的情況相同梁丘。

對于 Unix 和 Windows ,滿足下列條件的對象可以出現(xiàn)在 object_list 中

可讀的 Connection 對象旺韭;

一個已連接并且可讀的 socket.socket 對象氛谜;或者

Process 對象中的 sentinel 屬性。

當一個連接或者套接字對象擁有有效的數(shù)據(jù)可被讀取的時候区端,或者另一端關閉后值漫,這個對象就處于就緒狀態(tài)。

Unix: wait(object_list, timeout) 和 select.select(object_list, [], [], timeout) 幾乎相同织盼。差別在于杨何,如果 select.select() 被信號中斷,它會拋出一個附帶錯誤號為 EINTR 的 OSError 異常沥邻,而 wait() 不會危虱。

Windows: object_list 中的元素必須是一個表示為整數(shù)的可等待的句柄(按照 Win32 函數(shù) WaitForMultipleObjects() 的文檔中所定義) 或者一個擁有 fileno() 方法的對象,這個對象返回一個套接字句柄或者管道句柄谋国。(注意管道和套接字兩種句柄 不是 可等待的句柄)

3.3 新版功能.

示例

下面的服務代碼創(chuàng)建了一個使用 'secret password' 作為認證密碼的監(jiān)聽器槽地。它會等待連接然后發(fā)送一些數(shù)據(jù)給客戶端:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000) # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)

    conn.send([2.25, None, 'junk', float])

    conn.send_bytes(b'hello')

    conn.send_bytes(array('i', [42, 1729]))

下面的代碼連接到服務然后從服務器上j接收一些數(shù)據(jù):

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]

print(conn.recv_bytes())            # => 'hello'

arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr))    # => 8
print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

下面的代碼使用了 wait() ,以便在同時等待多個進程發(fā)來消息芦瘾。

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()

if name == 'main':
readers = []

for i in range(4):
    r, w = Pipe(duplex=False)
    readers.append(r)
    p = Process(target=foo, args=(w,))
    p.start()
    # We close the writable end of the pipe now to be sure that
    # p is the only process which owns a handle for it.  This
    # ensures that when p closes its handle for the writable end,
    # wait() will promptly report the readable end as being ready.
    w.close()

while readers:
    for r in wait(readers):
        try:
            msg = r.recv()
        except EOFError:
            readers.remove(r)
        else:
            print(msg)

地址格式
'AF_INET' 地址是 (hostname, port) 形式的元組類型,其中 hostname 是一個字符串集畅,port 是整數(shù)近弟。

'AF_UNIX' 地址是文件系統(tǒng)上文件名的字符串。

'AF_PIPE' 是這種格式的字符串
r'.\pipe{PipeName}' 挺智。如果要用 Client() 連接到一個名為 ServerName 的遠程命名管道祷愉,應該替換為使用 r'\ServerName\pipe{PipeName}' 這種格式。

注意,使用兩個反斜線開頭的字符串默認被當做 'AF_PIPE' 地址而不是 'AF_UNIX' 二鳄。

認證密碼
當使用 Connection.recv 接收數(shù)據(jù)時赴涵,數(shù)據(jù)會自動被反序列化。不幸的是订讼,對于一個不可信的數(shù)據(jù)源發(fā)來的數(shù)據(jù)髓窜,反序列化是存在安全風險的。所以 Listener 和 Client() 之間使用 hmac 模塊進行摘要認證欺殿。

認證密鑰是一個 byte 類型的字符串寄纵,可以認為是和密碼一樣的東西,連接建立好后脖苏,雙方都會要求另一方證明知道認證密鑰程拭。(這個證明過程不會通過連接發(fā)送密鑰)

如果要求認證但是沒有指定認證密鑰,則會使用 current_process().authkey 的返回值 (參見 Process )棍潘。 這個值會被當前進程所創(chuàng)建的任何 Process 對象自動繼承恃鞋。 這意味著(默認情況下)當一個多進程程序的所有進程在彼此之間建立連接的時候,會共享同一個認證密鑰亦歉。

os.urandom() 也可以用來生成合適的認證密鑰山宾。

日志
當前模塊也提供了一些對 logging 的支持。注意鳍徽, logging 模塊本身并沒有使用進程間共享的鎖资锰,所以來自于多個進程的日志可能(具體取決于使用的日志 handler 類型)相互覆蓋或者混雜。

multiprocessing.get_logger()
返回 multiprocessing 使用的 logger阶祭,必要的話會創(chuàng)建一個新的绷杜。

如果創(chuàng)建的首個 logger 日志級別為 logging.NOTSET 并且沒有默認 handler。通過這個 logger 打印的消息不會傳遞到根 logger濒募。

注意在 Windows 上鞭盟,子進程只會繼承父進程 logger 的日志級別 - 對于logger的其他自定義項不會繼承。

multiprocessing.log_to_stderr()
此函數(shù)會調用 get_logger() 但是會在返回的 logger 上增加一個 handler瑰剃,將所有輸出都使用 '[%(levelname)s/%(processName)s] %(message)s' 的格式發(fā)送到 sys.stderr 齿诉。

下面是一個在交互式解釋器中打開日志功能的例子:

import multiprocessing, logging
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)
logger.warning('doomed')
[WARNING/MainProcess] doomed
m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
要查看日志等級的完整列表,見 logging 模塊晌姚。

multiprocessing.dummy 模塊
multiprocessing.dummy 復制了 multiprocessing 的 API粤剧,不過是在 threading 模塊之上包裝了一層。

編程指導
使用 multiprocessing 時挥唠,應遵循一些指導原則和習慣用法抵恋。

所有start方法
下面這些適用于所有start方法。

避免共享狀態(tài)

應該盡可能避免在進程間傳遞大量數(shù)據(jù)宝磨,越少越好弧关。

最好堅持使用隊列或者管道進行進程間通信盅安,而不是底層的同步原語。

可序列化

保證所代理的方法的參數(shù)是可以序列化的世囊。

代理的線程安全性

不要在多線程中同時使用一個代理對象别瞭,除非你用鎖保護它。

(而在不同進程中使用 相同 的代理對象卻沒有問題株憾。)

使用 Join 避免僵尸進程

在 Unix 上蝙寨,如果一個進程執(zhí)行完成但是沒有被 join,就會變成僵尸進程号胚。一般來說籽慢,僵尸進程不會很多,因為每次新啟動進程(或者 active_children() 被調用)時猫胁,所有已執(zhí)行完成且沒有被 join 的進程都會自動被 join箱亿,而且對一個執(zhí)行完的進程調用 Process.is_alive 也會 join 這個進程。盡管如此弃秆,對自己啟動的進程顯式調用 join 依然是最佳實踐届惋。

繼承優(yōu)于序列化、反序列化

當使用 spawn 或者 forkserver 的啟動方式時菠赚,multiprocessing 中的許多類型都必須是可序列化的脑豹,這樣子進程才能使用它們。但是通常我們都應該避免使用管道和隊列發(fā)送共享對象到另外一個進程衡查,而是重新組織代碼瘩欺,對于其他進程創(chuàng)建出來的共享對象,讓那些需要訪問這些對象的子進程可以直接將這些對象從父進程繼承過來拌牲。

避免殺死進程

聽過 Process.terminate 停止一個進程很容易導致這個進程正在使用的共享資源(如鎖俱饿、信號量、管道和隊列)損壞或者變得不可用塌忽,無法在其他進程中繼續(xù)使用拍埠。

所以,最好只對那些從來不使用共享資源的進程調用 Process.terminate 土居。

Join 使用隊列的進程

記住枣购,往隊列放入數(shù)據(jù)的進程會一直等待直到隊列中所有項被"feeder" 線程傳給底層管道。(子進程可以調用隊列的 Queue.cancel_join_thread 方法禁止這種行為)

這意味著擦耀,任何使用隊列的時候棉圈,你都要確保在進程join之前,所有存放到隊列中的項將會被其他進程埂奈、線程完全消費迄损。否則不能保證這個寫過隊列的進程可以正常終止。記住非精靈進程會自動 join 账磺。

下面是一個會導致死鎖的例子:

from multiprocessing import Process, Queue

def f(q):
q.put('X' * 1000000)

if name == 'main':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
obj = queue.get()
交換最后兩行可以修復這個問題(或者直接刪掉 p.join())芹敌。

顯示傳遞資源給子進程

在Unix上,使用 fork 方式啟動的子進程可以使用父進程中全局創(chuàng)建的共享資源垮抗。不過氏捞,最好是顯式將資源對象通過參數(shù)的形式傳遞給子進程。

除了(部分原因)讓代碼兼容 Windows 以及其他的進程啟動方式外冒版,這種形式還保證了在子進程生命期這個對象是不會被父進程垃圾回收的液茎。如果父進程中的某些對象被垃圾回收會導致資源釋放,這就變得很重要辞嗡。

所以對于實例:

from multiprocessing import Process, Lock

def f():
... do something using "lock" ...

if name == 'main':
lock = Lock()
for i in range(10):
Process(target=f).start()
應當重寫成這樣:

from multiprocessing import Process, Lock

def f(l):
... do something using "l" ...

if name == 'main':
lock = Lock()
for i in range(10):
Process(target=f, args=(lock,)).start()
謹防將 sys.stdin 數(shù)據(jù)替換為 “類似文件的對象”

multiprocessing 原本會無條件地這樣調用:

os.close(sys.stdin.fileno())
在 multiprocessing.Process._bootstrap() 方法中 —— 這會導致與"進程中的進程"相關的一些問題捆等。這已經(jīng)被修改成了:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)
它解決了進程相互沖突導致文件描述符錯誤的根本問題,但是對使用帶緩沖的“文件類對象”替換 sys.stdin() 作為輸出的應用程序造成了潛在的危險续室。如果多個進程調用了此文件類對象的 close() 方法栋烤,會導致相同的數(shù)據(jù)多次刷寫到此對象,損壞數(shù)據(jù)挺狰。

如果你寫入文件類對象并實現(xiàn)了自己的緩存明郭,可以在每次追加緩存數(shù)據(jù)時記錄當前進程id,從而將其變成 fork 安全的丰泊,當發(fā)現(xiàn)進程id變化后舍棄之前的緩存薯定,例如:

@property
def cache(self):
pid = os.getpid()
if pid != self._pid:
self._pid = pid
self._cache = []
return self._cache
需要更多信息,請查看 bpo-5155, bpo-5313 以及 bpo-5331

spawn 和 forkserver 啟動方式
相對于 fork 啟動方式瞳购,有一些額外的限制话侄。

更依賴序列化

Process.init() 的所有參數(shù)都必須可序列化。同樣的学赛,當你繼承 Process 時年堆,需要保證當調用 Process.start 方法時,實例可以被序列化罢屈。

全局變量

記住嘀韧,如果子進程中的代碼嘗試訪問一個全局變量,它所看到的值(如果有)可能和父進程中執(zhí)行 Process.start 那一刻的值不一樣缠捌。

當全局變量知識模塊級別的常量時锄贷,是不會有問題的。

安全導入主模塊

確保主模塊可以被新啟動的Python解釋器安全導入而不會引發(fā)什么副作用(比如又啟動了一個子進程)

例如曼月,使用 spawn 或 forkserver 啟動方式執(zhí)行下面的模塊谊却,會引發(fā) RuntimeError 異常而失敗。

from multiprocessing import Process

def foo():
print('hello')

p = Process(target=foo)
p.start()
應該通過下面的方法使用 if name == 'main': 哑芹,從而保護程序"入口點":

from multiprocessing import Process, freeze_support, set_start_method

def foo():
print('hello')

if name == 'main':
freeze_support()
set_start_method('spawn')
p = Process(target=foo)
p.start()
(如果程序將正常運行而不是凍結炎辨,則可以省略 freeze_support() 行)

這允許新啟動的 Python 解釋器安全導入模塊然后運行模塊中的 foo() 函數(shù)。

如果主模塊中創(chuàng)建了進程池或者管理器聪姿,這個規(guī)則也適用碴萧。

示例
創(chuàng)建和使用自定義管理器乙嘀、代理的示例:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')

A simple generator function

def baz():
for i in range(10):
yield i*i

Proxy type for generator objects

class GeneratorProxy(BaseProxy):
exposed = ['next']
def iter(self):
return self
def next(self):
return self._callmethod('next')

Function to return the operator module

def get_operator_module():
return operator

class MyManager(BaseManager):
pass

register the Foo class; make f() and g() accessible via proxy

MyManager.register('Foo1', Foo)

register the Foo class; make g() and _h() accessible via proxy

MyManager.register('Foo2', Foo, exposed=('g', '_h'))

register the generator function baz; use GeneratorProxy to make proxies

MyManager.register('baz', baz, proxytype=GeneratorProxy)

register get_operator_module(); make public functions accessible via proxy

MyManager.register('operator', get_operator_module)

def test():
manager = MyManager()
manager.start()

print('-' * 20)

f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])

print('-' * 20)

f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])

print('-' * 20)

it = manager.baz()
for i in it:
    print('<%d>' % i, end=' ')
print()

print('-' * 20)

op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)

if name == 'main':
freeze_support()
test()
使用 Pool:

import multiprocessing
import time
import random
import sys

Functions used by test code

def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.name, args, result
)

def calculatestar(args):
return calculate(*args)

def mul(a, b):
time.sleep(0.5 * random.random())
return a * b

def plus(a, b):
time.sleep(0.5 * random.random())
return a + b

def f(x):
return 1.0 / (x - 5.0)

def pow3(x):
return x ** 3

def noop(x):
pass

Test code

def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)

with multiprocessing.Pool(PROCESSES) as pool:
    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print('Ordered results using pool.apply_async():')
    for r in results:
        print('\t', r.get())
    print()

    print('Ordered results using pool.imap():')
    for x in imap_it:
        print('\t', x)
    print()

    print('Unordered results using pool.imap_unordered():')
    for x in imap_unordered_it:
        print('\t', x)
    print()

    print('Ordered results using pool.map() --- will block till complete:')
    for x in pool.map(calculatestar, TASKS):
        print('\t', x)
    print()

    #
    # Test error handling
    #

    print('Testing error handling:')

    try:
        print(pool.apply(f, (5,)))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.apply()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(pool.map(f, list(range(10))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.map()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(list(pool.imap(f, list(range(10)))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from list(pool.imap())')
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, list(range(10)))
    for i in range(10):
        try:
            x = next(it)
        except ZeroDivisionError:
            if i == 5:
                pass
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    assert i == 9
    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
    print()

    #
    # Testing timeouts
    #

    print('Testing ApplyResult.get() with timeout:', end=' ')
    res = pool.apply_async(calculate, TASKS[0])
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print()
    print()

    print('Testing IMapIterator.next() with timeout:', end=' ')
    it = pool.imap(calculatestar, TASKS)
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print()
    print()

if name == 'main':
multiprocessing.freeze_support()
test()
一個演示如何使用隊列來向一組工作進程提供任務并收集結果的例子:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

Function run by worker processes

def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)

Function used to calculate result

def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' %
(current_process().name, func.name, args, result)

Functions referenced by tasks

def mul(a, b):
time.sleep(0.5*random.random())
return a * b

def plus(a, b):
time.sleep(0.5*random.random())
return a + b

def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]

# Create queues
task_queue = Queue()
done_queue = Queue()

# Submit tasks
for task in TASKS1:
    task_queue.put(task)

# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
    Process(target=worker, args=(task_queue, done_queue)).start()

# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
    print('\t', done_queue.get())

# Add more tasks using `put()`
for task in TASKS2:
    task_queue.put(task)

# Get and print some more results
for i in range(len(TASKS2)):
    print('\t', done_queue.get())

# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
    task_queue.put('STOP')

if name == 'main':
freeze_support()
test()

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市破喻,隨后出現(xiàn)的幾起案子虎谢,更是在濱河造成了極大的恐慌,老刑警劉巖曹质,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件婴噩,死亡現(xiàn)場離奇詭異,居然都是意外死亡羽德,警方通過查閱死者的電腦和手機几莽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來宅静,“玉大人章蚣,你說我怎么就攤上這事』滴” “怎么了究驴?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長匀伏。 經(jīng)常有香客問我洒忧,道長,這世上最難降的妖魔是什么够颠? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任熙侍,我火速辦了婚禮,結果婚禮上履磨,老公的妹妹穿的比我還像新娘蛉抓。我一直安慰自己,他們只是感情好剃诅,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布巷送。 她就那樣靜靜地躺著,像睡著了一般矛辕。 火紅的嫁衣襯著肌膚如雪笑跛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天聊品,我揣著相機與錄音飞蹂,去河邊找鬼。 笑死翻屈,一個胖子當著我的面吹牛陈哑,可吹牛的內容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼惊窖,長吁一口氣:“原來是場噩夢啊……” “哼刽宪!你這毒婦竟也來了?” 一聲冷哼從身側響起爬坑,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤纠屋,失蹤者是張志新(化名)和其女友劉穎涂臣,沒想到半個月后盾计,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡赁遗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年署辉,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片岩四。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡哭尝,死狀恐怖,靈堂內的尸體忽然破棺而出剖煌,到底是詐尸還是另有隱情材鹦,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布耕姊,位于F島的核電站桶唐,受9級特大地震影響,放射性物質發(fā)生泄漏茉兰。R本人自食惡果不足惜尤泽,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望规脸。 院中可真熱鬧坯约,春花似錦、人聲如沸莫鸭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽被因。三九已至卿拴,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間氏身,已是汗流浹背巍棱。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蛋欣,地道東北人航徙。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像陷虎,于是被迫代替她去往敵國和親到踏。 傳聞我的和親對象是個殘疾皇子杠袱,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內容

  • 進程間通信——隊列和管道(multiprocess.Queue楣富、multiprocess.Pipe) 進程間通信 ...
    go以恒閱讀 1,771評論 0 3
  • 寫在前面的話 代碼中的# > 表示的是輸出結果 輸入 使用input()函數(shù) 用法 注意input函數(shù)輸出的均是字...
    FlyingLittlePG閱讀 2,732評論 0 8
  • 進程、進程的使用伴榔、進程注意點纹蝴、進程間通信-Queue、進程池Pool踪少、進程與線程對比塘安、文件夾拷貝器-多任務 1.進...
    Cestine閱讀 770評論 0 0
  • 必備的理論基礎 1.操作系統(tǒng)作用: 隱藏丑陋復雜的硬件接口,提供良好的抽象接口援奢。 管理調度進程兼犯,并將多個進程對硬件...
    drfung閱讀 3,525評論 0 5
  • Python 3的多進程 多進程庫名叫multiprocessing。有幾點記錄一下: multiprocessi...
    小溫侯閱讀 3,306評論 0 2