multiprocessing is a standard Python module that can be used to write multi-process programs as well as multi-threaded ones. For multithreading, use multiprocessing.dummy, whose usage is essentially identical to multiprocessing.
Basics
A process is created with a multiprocessing.Process object. Process has the same interface as Thread, including start(), run(), join(), and other familiar methods. The Process class suits simple process creation; combine it with multiprocessing.Queue when resources need to be shared, and use the Pool class when you want to control the number of processes.
An overview of Process:
Constructor:
- Process([group [, target [, name [, args [, kwargs]]]]])
- group: not implemented yet; the library reference notes it must be None;
- target: the callable to run;
- name: the process name;
- args/kwargs: positional and keyword arguments passed to target.
Instance methods:
- is_alive(): returns whether the process is running.
- join([timeout]): blocks the calling process until the process whose join() was called terminates, or until the optional timeout expires.
- start(): makes the process ready to run, waiting for CPU scheduling.
- run(): invoked by start(); if no target was passed when the process was created, start() runs this default run() method.
- terminate(): stops the worker process immediately, whether or not its task has finished.
Attributes:
- authkey
- daemon: analogous to setDaemon on threads (a daemon child process is terminated automatically when its parent process ends).
- exitcode: None while the process is running; a value of -N means the process was terminated by signal N.
- name: the process name.
- pid: the process ID.
Here is a simple example:
import multiprocessing

def worker():
    """worker function"""
    print('Worker')

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()
# Output
# Worker
# Worker
# Worker
# Worker
# Worker
The output is 'Worker' printed five times. We cannot tell which process printed which line; that depends on the order of execution, because each process is competing for access to the output stream.
How can we tell which process did which job? By passing arguments to identify the processes. Unlike with threading, the arguments passed to a multiprocessing Process must be serializable with pickle. Here is the code:
import multiprocessing

def worker(num):
    """thread worker function"""
    print('Worker:', num)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
# Output
# Worker: 1
# Worker: 0
# Worker: 2
# Worker: 3
# Worker: 4
Importable Target Functions
One difference between threading and multiprocessing is the extra protection needed around __main__. Because of the way new processes are started, the child process must be able to import the script containing the target function. Wrapping the main part of the application in an __main__ check ensures that it is not run recursively in each child as the module is imported. Another approach is to import the target function from a separate script. For example, multiprocessing_import_main.py uses a worker function defined in a second module.
# multiprocessing_import_main.py
import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
            target=multiprocessing_import_worker.worker,
        )
        jobs.append(p)
        p.start()
# Output
# Worker
# Worker
# Worker
# Worker
# Worker
The worker function is defined in multiprocessing_import_worker.py.
# multiprocessing_import_worker.py
def worker():
    """worker function"""
    print('Worker')
    return
Determining the Current Process
Passing arguments to identify or name a process is cumbersome and unnecessary. Each Process instance has a name, whose default value can be changed when the process is created. Naming processes is useful for keeping track of them, especially in applications with multiple types of processes running simultaneously.
import multiprocessing
import time

def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')

def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')

if __name__ == '__main__':
    service = multiprocessing.Process(
        name='my_service',
        target=my_service,
    )
    worker_1 = multiprocessing.Process(
        name='worker 1',
        target=worker,
    )
    worker_2 = multiprocessing.Process(  # default name
        target=worker,
    )
    worker_1.start()
    worker_2.start()
    service.start()
# output
# worker 1 Starting
# worker 1 Exiting
# Process-3 Starting
# Process-3 Exiting
# my_service Starting
# my_service Exiting
Daemon Processes
By default, the main program does not exit until all of its children have exited. There are times when it is useful to start a background process that runs without blocking the main program from exiting, such as a task that generates "heartbeats" for a monitoring tool.
Marking a process as a daemon is simple: just set its daemon attribute to True.
import multiprocessing
import time
import sys

def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()

def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()

if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True
    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False
    d.start()
    time.sleep(1)
    n.start()
# output
# Starting: daemon 41838
# Starting: non-daemon 41841
# Exiting : non-daemon 41841
The output does not include the "Exiting" message from the daemon process, because all of the non-daemon processes (including the main program) exit before the daemon wakes up from its two-second sleep.
The daemon process is terminated automatically before the main program exits, which avoids leaving orphaned processes running. This can be verified by looking for the process ID printed when the program runs, and then checking for that process with the ps command.
Waiting for Processes
To wait until a process has completed its work and exited, use the join() method.
import multiprocessing
import time
import sys

def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)

def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)

if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True
    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False
    d.start()
    time.sleep(1)
    n.start()
    d.join()
    n.join()
# output
# Starting: non-daemon
# Exiting : non-daemon
# Starting: daemon
# Exiting : daemon
Since the main process waits for the daemon to exit using join(), the "Exiting" message is printed this time.
By default, join() blocks indefinitely. It also accepts a timeout argument (a float representing the number of seconds to wait for the process to become inactive). If the process does not finish within the timeout period, join() returns anyway.
import multiprocessing
import time
import sys

def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)

def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)

if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True
    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False
    d.start()
    n.start()
    d.join(1)
    print('d.is_alive()', d.is_alive())
    n.join()
# output
# Starting: non-daemon
# Exiting : non-daemon
# d.is_alive() True
Because the timeout passed is less than the amount of time the daemon sleeps, the process is still "alive" after join() returns.
Terminating Processes
Although it is better to signal a process that it should exit using the "poison pill" method, forcing termination is useful if a process appears hung or deadlocked. Call terminate() to kill a child process.
import multiprocessing
import time

def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')

if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())
    p.start()
    print('DURING:', p, p.is_alive())
    p.terminate()
    print('TERMINATED:', p, p.is_alive())
    p.join()
    print('JOINED:', p, p.is_alive())
# output
# BEFORE: <Process(Process-1, initial)> False
# DURING: <Process(Process-1, started)> True
# TERMINATED: <Process(Process-1, started)> True
# JOINED: <Process(Process-1, stopped[SIGTERM])> False
It is important to join() the process after terminating it, to give the process-management code time to update the status of the object to reflect the termination.
Process Exit Status
The status code produced when the process exits can be accessed via the exitcode attribute. The allowed ranges are listed in the table below.
Exit Code | Meaning
---|---
== 0 | no error was produced
> 0 | the process had an error, and exited with that code
< 0 | the process was killed with the signal -1 * exitcode
import multiprocessing
import sys
import time

def exit_error():
    sys.exit(1)

def exit_ok():
    return

def return_value():
    return 1

def raises():
    raise RuntimeError('There was an error!')

def terminated():
    time.sleep(3)

if __name__ == '__main__':
    jobs = []
    funcs = [
        exit_error,
        exit_ok,
        return_value,
        raises,
        terminated,
    ]
    for f in funcs:
        print('Starting process for', f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()
    jobs[-1].terminate()
    for j in jobs:
        j.join()
        print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))
# output
# Starting process for exit_error
# Starting process for exit_ok
# Starting process for return_value
# Starting process for raises
# Starting process for terminated
# Process raises:
# Traceback (most recent call last):
# File ".../lib/python3.6/multiprocessing/process.py", line 258,
# in _bootstrap
# self.run()
# File ".../lib/python3.6/multiprocessing/process.py", line 93,
# in run
# self._target(*self._args, **self._kwargs)
# File "multiprocessing_exitcode.py", line 28, in raises
# raise RuntimeError('There was an error!')
# RuntimeError: There was an error!
# exit_error.exitcode = 1
# exit_ok.exitcode = 0
# return_value.exitcode = 0
# raises.exitcode = 1
# terminated.exitcode = -15
Logging
When debugging concurrency issues, it can be useful to see the internals of the multiprocessing objects. There is a convenient module-level function for enabling this logging, called log_to_stderr(). It sets up a logger object using logging and adds a handler so that log messages are sent to the standard error channel.
import multiprocessing
import logging
import sys

def worker():
    print('Doing some work')
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
# output
# [INFO/Process-1] child process calling self.run()
# Doing some work
# [INFO/Process-1] process shutting down
# [DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
# [DEBUG/Process-1] running the remaining "atexit" finalizers
# [INFO/Process-1] process exiting with exitcode 0
# [INFO/MainProcess] process shutting down
# [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
# [DEBUG/MainProcess] running the remaining "atexit" finalizers
By default, the logging level is set to NOTSET, which produces no messages. Pass a different level to initialize the logger to the desired level of detail.
To manipulate the logger directly (change its level setting or add handlers), use get_logger().
import multiprocessing
import logging
import sys

def worker():
    print('Doing some work')
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
# output
# [INFO/Process-1] child process calling self.run()
# Doing some work
# [INFO/Process-1] process shutting down
# [INFO/Process-1] process exiting with exitcode 0
# [INFO/MainProcess] process shutting down
Subclassing Process
Although the simplest way to start a job in a separate process is to use Process with a target function, it is also possible to use a custom subclass.
import multiprocessing

class Worker(multiprocessing.Process):

    def run(self):
        print('In {}'.format(self.name))
        return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()
# output
# In Worker-1
# In Worker-3
# In Worker-2
# In Worker-4
# In Worker-5
The derived class should override run() to do its work.
Passing Messages to Processes
As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results aggregated. A simple way to communicate between processes is to pass messages with a Queue. Any object that can be serialized with pickle can pass through a Queue.
import multiprocessing

class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(proc_name, self.name))

def worker(q):
    obj = q.get()
    obj.do_something()

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    queue.put(MyFancyClass('Fancy Dan'))
    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
# output
# Doing something fancy in Process-1 for Fancy Dan!
This short example only passes a single message to a single worker, then the main process waits for the worker to finish.
A more complex example below shows how to manage several workers consuming data from a JoinableQueue and passing results back to the parent process. The "poison pill" technique is used to stop the workers: after setting up the real tasks, the main program adds one "stop" value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop. The main process uses the task queue's join() method to wait for all of the tasks to finish before processing the results.
import multiprocessing
import time

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)

class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)

if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1
# output
# Creating 8 consumers
# Consumer-1: 0 * 0
# Consumer-2: 1 * 1
# Consumer-3: 2 * 2
# Consumer-4: 3 * 3
# Consumer-5: 4 * 4
# Consumer-6: 5 * 5
# Consumer-7: 6 * 6
# Consumer-8: 7 * 7
# Consumer-3: 8 * 8
# Consumer-7: 9 * 9
# Consumer-4: Exiting
# Consumer-1: Exiting
# Consumer-2: Exiting
# Consumer-5: Exiting
# Consumer-6: Exiting
# Consumer-8: Exiting
# Consumer-7: Exiting
# Consumer-3: Exiting
# Result: 6 * 6 = 36
# Result: 2 * 2 = 4
# Result: 3 * 3 = 9
# Result: 0 * 0 = 0
# Result: 1 * 1 = 1
# Result: 7 * 7 = 49
# Result: 4 * 4 = 16
# Result: 5 * 5 = 25
# Result: 8 * 8 = 64
# Result: 9 * 9 = 81
Although the jobs enter the queue in order, their execution is parallelized, so there is no guarantee about the order in which they will be completed.
Signaling Between Processes
The Event class provides a simple way to communicate state information between processes. An event can be toggled between set and unset states. Users of the event object can wait for it to change from unset to set, using an optional timeout value.
import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())

if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()
    w2 = multiprocessing.Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()
    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')
# output
# main: waiting before calling Event.set()
# wait_for_event: starting
# wait_for_event_timeout: starting
# wait_for_event_timeout: e.is_set()-> False
# main: event is set
# wait_for_event: e.is_set()-> True
wait() does not return an error when it times out. The caller can check the state of the event with is_set().
Controlling Access to Resources
In situations where a single resource is shared among multiple processes, a Lock can be used to avoid conflicting accesses.
import multiprocessing
import sys

def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')

def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(
        target=worker_with,
        args=(lock, sys.stdout),
    )
    nw = multiprocessing.Process(
        target=worker_no_with,
        args=(lock, sys.stdout),
    )
    w.start()
    nw.start()
    w.join()
    nw.join()
# output
# Lock acquired via with
# Lock acquired directly
In this example, the messages printed to the console may be jumbled together if the two processes do not synchronize their access to the output stream with the lock.
Synchronizing Operations
Condition objects can be used to synchronize parts of a workflow so that some run in parallel but others run sequentially, even if they are in separate processes.
import multiprocessing
import time

def stage_1(cond):
    """perform first stage of work,
    then notify stage_2 to continue
    """
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('{} done and ready for stage 2'.format(name))
        cond.notify_all()

def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('{} running'.format(name))

if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]
    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()
    s1.join()
    for c in s2_clients:
        c.join()
# output
# Starting stage_2[1]
# Starting stage_2[2]
# Starting s1
# s1 done and ready for stage 2
# stage_2[1] running
# stage_2[2] running
In this example, two processes run stage_2 in parallel, but only after stage_1 is done.
Controlling Concurrent Access to Resources
Sometimes it is useful to allow several workers access to a resource at once, while still limiting the overall number. For example, a connection pool might support a fixed number of simultaneous connections, or a network application might support a fixed number of concurrent downloads. A Semaphore is one way to manage those connections.
import random
import multiprocessing
import time

class ActivePool:

    def __init__(self):
        super(ActivePool, self).__init__()
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()
        self.lock = multiprocessing.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)

    def __str__(self):
        with self.lock:
            return str(self.active)

def worker(s, pool):
    name = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)
        print('Activating {} now running {}'.format(name, pool))
        time.sleep(random.random())
        pool.makeInactive(name)

if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    jobs = [
        multiprocessing.Process(
            target=worker,
            name=str(i),
            args=(s, pool),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    while True:
        alive = 0
        for j in jobs:
            if j.is_alive():
                alive += 1
                j.join(timeout=0.1)
                print('Now running {}'.format(pool))
        if alive == 0:
            # all done
            break
# output
# Activating 0 now running ['0', '1', '2']
# Activating 1 now running ['0', '1', '2']
# Activating 2 now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Activating 3 now running ['0', '1', '3']
# Activating 4 now running ['1', '3', '4']
# Activating 6 now running ['1', '4', '6']
# Now running ['1', '4', '6']
# Now running ['1', '4', '6']
# Activating 5 now running ['1', '4', '5']
# Now running ['1', '4', '5']
# Now running ['1', '4', '5']
# Now running ['1', '4', '5']
# Activating 8 now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Activating 7 now running ['5', '8', '7']
# Now running ['5', '8', '7']
# Activating 9 now running ['8', '7', '9']
# Now running ['8', '7', '9']
# Now running ['8', '9']
# Now running ['8', '9']
# Now running ['9']
# Now running ['9']
# Now running ['9']
# Now running ['9']
# Now running []
In this example, the ActivePool class simply serves as a convenient way to track which processes are running at a given moment. A real resource pool would probably allocate a connection or some other value to the newly active process, and reclaim the value when the task is done. Here, the pool is just used to hold the names of the active processes, to show that only three are running concurrently.
Managing Shared State
In the previous example, the list of active processes is maintained centrally in the ActivePool instance via a special type of list object created by a Manager. The Manager is responsible for coordinating the state of shared information among all of its users.
import multiprocessing
import pprint

def worker(d, key, value):
    d[key] = value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', d)
# output
# Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}
Because the dictionary is created through the manager, it is shared, and updates made in any process are seen by all of them. Lists are supported as well.
Shared Namespaces
In addition to dictionaries and lists, a Manager can create a shared Namespace.
import multiprocessing

def producer(ns, event):
    ns.value = 'This is the value'
    event.set()

def consumer(ns, event):
    try:
        print('Before event: {}'.format(ns.value))
    except Exception as err:
        print('Before event, error:', str(err))
    event.wait()
    print('After event:', ns.value)

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )
    c.start()
    p.start()
    c.join()
    p.join()
# output
# Before event, error: 'Namespace' object has no attribute 'value'
# After event: This is the value
Any named value added to the Namespace is visible to all of the clients that receive the Namespace instance.
It is important to know that updates to the contents of mutable values in the namespace are not propagated automatically.
import multiprocessing

def producer(ns, event):
    # DOES NOT UPDATE GLOBAL VALUE!
    ns.my_list.append('This is the value')
    event.set()

def consumer(ns, event):
    print('Before event:', ns.my_list)
    event.wait()
    print('After event :', ns.my_list)

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )
    c.start()
    p.start()
    c.join()
    p.join()
# output
# Before event: []
# After event : []
To update the list, attach it to the namespace object again.
Process Pools
The Pool class can be used to manage a fixed number of workers for simple cases. Return values are collected and returned as a list. The Pool arguments include the number of processes and a function to run when starting the task process (invoked once per child).
import multiprocessing

def do_calculation(data):
    return data * 2

def start_process():
    print('Starting', multiprocessing.current_process().name)

if __name__ == '__main__':
    inputs = list(range(10))
    print('Input :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()   # wrap up current tasks

    print('Pool :', pool_outputs)
# output
# Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Built-in: <map object at 0x1007b2be0>
# Starting ForkPoolWorker-3
# Starting ForkPoolWorker-4
# Starting ForkPoolWorker-5
# Starting ForkPoolWorker-6
# Starting ForkPoolWorker-1
# Starting ForkPoolWorker-7
# Starting ForkPoolWorker-2
# Starting ForkPoolWorker-8
# Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Except that the individual tasks run in parallel, the result of the map() method is functionally equivalent to the built-in map(). Since the Pool processes its inputs in parallel, close() and join() can be used to synchronize the main process with the task processes, ensuring proper cleanup.
By default, Pool creates a fixed number of worker processes and passes jobs to them until there are no more jobs. Setting the maxtasksperchild parameter tells the Pool to restart a worker process after it has finished a few tasks, preventing long-running workers from consuming ever more system resources.
import multiprocessing

def do_calculation(data):
    return data * 2

def start_process():
    print('Starting', multiprocessing.current_process().name)

if __name__ == '__main__':
    inputs = list(range(10))
    print('Input :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()   # wrap up current tasks

    print('Pool :', pool_outputs)
# output
# Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Built-in: <map object at 0x1007b21d0>
# Starting ForkPoolWorker-1
# Starting ForkPoolWorker-2
# Starting ForkPoolWorker-4
# Starting ForkPoolWorker-5
# Starting ForkPoolWorker-6
# Starting ForkPoolWorker-3
# Starting ForkPoolWorker-7
# Starting ForkPoolWorker-8
# Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
The Pool restarts workers when they have completed their allotted tasks, even if there is no more work. In this output, eight workers are created even though there are only 10 tasks, and each worker can complete two of them at a time.
Implementing MapReduce
The Pool class can be used to create a simple single-server MapReduce implementation. Although it does not give the full benefits of distributed processing, it does illustrate how easy it is to break some problems down into distributable units of work.
In a MapReduce-based system, input data is broken down into chunks for processing by different worker instances. Each chunk of input data is mapped to an intermediate state using a simple transformation. The intermediate data is then collected together and partitioned based on a key value, so that all of the related values are together. Finally, the partitioned data is reduced to a result.
# multiprocessing_mapreduce.py
import collections
import itertools
import multiprocessing

class SimpleMapReduce:

    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        map_func
          Function to map inputs to intermediate data. Takes as
          argument one input value and returns a tuple with the
          key and a value to be reduced.

        reduce_func
          Function to reduce partitioned version of intermediate
          data to final output. Takes as argument a key as
          produced by map_func and a sequence of the values
          associated with that key.

        num_workers
          The number of workers to create in the pool. Defaults
          to the number of CPUs available on the current host.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, mapped_values):
        """Organize the mapped values by their key.
        Returns an unsorted sequence of tuples with a key
        and a sequence of values.
        """
        partitioned_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()

    def __call__(self, inputs, chunksize=1):
        """Process the inputs through the map and reduce functions
        given.

        inputs
          An iterable containing the input data to be processed.

        chunksize=1
          The portion of the input data to hand to each worker.
          This can be used to tune performance during the mapping
          phase.
        """
        map_responses = self.pool.map(
            self.map_func,
            inputs,
            chunksize=chunksize,
        )
        partitioned_data = self.partition(
            itertools.chain(*map_responses)
        )
        reduced_values = self.pool.map(
            self.reduce_func,
            partitioned_data,
        )
        return reduced_values
The following example script uses SimpleMapReduce to count the "words" in the reStructuredText source for this article, ignoring some of the markup.
# multiprocessing_wordcount.py
import multiprocessing
import string

from multiprocessing_mapreduce import SimpleMapReduce

def file_to_words(filename):
    """Read a file and return a sequence of
    (word, occurences) values.
    """
    STOP_WORDS = set([
        'a', 'an', 'and', 'are', 'as', 'be', 'by', 'for', 'if',
        'in', 'is', 'it', 'of', 'or', 'py', 'rst', 'that', 'the',
        'to', 'with',
    ])
    TR = str.maketrans({
        p: ' '
        for p in string.punctuation
    })
    print('{} reading {}'.format(
        multiprocessing.current_process().name, filename))
    output = []
    with open(filename, 'rt') as f:
        for line in f:
            # Skip comment lines.
            if line.lstrip().startswith('..'):
                continue
            line = line.translate(TR)  # Strip punctuation
            for word in line.split():
                word = word.lower()
                if word.isalpha() and word not in STOP_WORDS:
                    output.append((word, 1))
    return output

def count_words(item):
    """Convert the partitioned data for a word to a
    tuple containing the word and the number of occurences.
    """
    word, occurences = item
    return (word, sum(occurences))

if __name__ == '__main__':
    import operator
    import glob

    input_files = glob.glob('*.rst')

    mapper = SimpleMapReduce(file_to_words, count_words)
    word_counts = mapper(input_files)
    word_counts.sort(key=operator.itemgetter(1))
    word_counts.reverse()

    print('\nTOP 20 WORDS BY FREQUENCY\n')
    top20 = word_counts[:20]
    longest = max(len(word) for word, count in top20)
    for word, count in top20:
        print('{word:<{len}}: {count:5}'.format(
            len=longest + 1,
            word=word,
            count=count)
        )
The file_to_words() function converts each input file to a sequence of tuples containing a word and the number 1 (representing a single occurrence). The data is divided up by partition() using the word as the key, so the resulting structure consists of a key and a sequence of 1 values representing each occurrence of the word. During the reduction phase, count_words() converts the partitioned data into a set of tuples containing a word and the count for that word.
$ python3 -u multiprocessing_wordcount.py
ForkPoolWorker-1 reading basics.rst
ForkPoolWorker-2 reading communication.rst
ForkPoolWorker-3 reading index.rst
ForkPoolWorker-4 reading mapreduce.rst
TOP 20 WORDS BY FREQUENCY
process : 83
running : 45
multiprocessing : 44
worker : 40
starting : 37
now : 35
after : 34
processes : 31
start : 29
header : 27
pymotw : 27
caption : 27
end : 27
daemon : 22
can : 22
exiting : 21
forkpoolworker : 21
consumer : 20
main : 18
event : 16
Related documentation:
https://pymotw.com/3/multiprocessing/index.html