多進程編程
multiprocessing
由于 GIL(全局解釋鎖) 的原因, 多線程并不能充分利用多核處理器, 如果是一個 CPU 計算型的任務, 應該使用多進程模塊 multiprocessing, 它的工作方式與線程庫不同, 但是兩種庫的接口相似。multiprocessing 給每個進程賦予了單獨的 Python 解釋器, 這樣就規(guī)避了 GIL 所帶來的問題施符。
import multiprocessing
def worker():
print('Worker')
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker)
jobs.append(p)
p.start()
# 輸出:
Worker
Worker
Worker
Worker
Worker
目標函數(shù)也支持傳入?yún)?shù):
import multiprocessing
def worker(num):
print(f'Worker: {num}')
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, ))
jobs.append(p)
p.start()
# 輸出:
Worker: 0
Worker: 1
Worker: 3
Worker: 2
Worker: 4
守護進程
和多線程模塊一樣, 多進程也是可以設置守護進程的, 守護進程可以一直運行而不影響主程序的結(jié)束。
import multiprocessing
import time
def daemon():
p = multiprocessing.current_process() # 獲取當前的進程
print(f'Starting: {p.name} {p.pid}')
time.sleep(2)
print(f'Exiting: {p.name} {p.pid}')
def non_daemon():
p = multiprocessing.current_process()
print(f'Starting: {p.name} {p.pid}')
print(f'Exiting: {p.name} {p.pid}')
if __name__ == '__main__':
d = multiprocessing.Process(
name = 'daemon',
target = daemon,
daemon=True
)
n = multiprocessing.Process(
name = 'non-daemon',
target = non_daemon,
)
d.start()
time.sleep(1)
n.start()
# 輸出:
Starting: daemon 7068
Starting: non-daemon 10116
Exiting: non-daemon 10116
通過上面代碼的輸出可以看到, 守護進程沒有完成程序就結(jié)束了埂奈。如果需要等待守護進程完成工作后再結(jié)束, 可以使用 join()
方法:
import multiprocessing
import time
def daemon():
p = multiprocessing.current_process() # 獲取當前的進程
print(f'Starting: {p.name} {p.pid}')
time.sleep(2)
print(f'Exiting: {p.name} {p.pid}')
def non_daemon():
p = multiprocessing.current_process()
print(f'Starting: {p.name} {p.pid}')
print(f'Exiting: {p.name} {p.pid}')
if __name__ == '__main__':
d = multiprocessing.Process(
name = 'daemon',
target = daemon,
daemon=True
)
n = multiprocessing.Process(
name = 'non-daemon',
target = non_daemon,
)
d.start()
time.sleep(1)
n.start()
d.join()
n.join()
# 輸出:
Starting: daemon 2928
Starting: non-daemon 13376
Exiting: non-daemon 13376
Exiting: daemon 2928
join() 方法接收一個超時的參數(shù), 默認是 none, 表示會一直阻塞, 可以設置一個超時時間, 在這個時間內(nèi)沒有完成, 就會阻塞:
import multiprocessing
import time
def daemon():
p = multiprocessing.current_process() # 獲取當前的進程
print(f'Starting: {p.name} {p.pid}')
time.sleep(2)
print(f'Exiting: {p.name} {p.pid}')
def non_daemon():
p = multiprocessing.current_process()
print(f'Starting: {p.name} {p.pid}')
print(f'Exiting: {p.name} {p.pid}')
if __name__ == '__main__':
d = multiprocessing.Process(
name = 'daemon',
target = daemon,
daemon=True
)
n = multiprocessing.Process(
name = 'non-daemon',
target = non_daemon,
)
d.start()
time.sleep(1)
n.start()
d.join(1) # 傳入的是 1 秒, 小于守護進程里面設置的兩秒
print('d.is_alive()', d.is_alive()) # 得到進程當前的狀態(tài)
n.join()
# 輸出:
Starting: daemon 7272
Starting: non-daemon 5872
Exiting: non-daemon 5872
d.is_alive() True
進程池
任務的執(zhí)行周期決定了 CPU 核數(shù)和任務的分配算法, 使用多進程編程 Pool 是一個很靈活的保證效率的方法:
from functools import lru_cache
from multiprocessing import Pool
# lur_cache 裝飾器使用最近最少使用算法
# 會把最近使用的對象存儲起來, 并把最近最少使用的對象在緩存值達到預設值之前從內(nèi)存中移除
# 這里 maxsize 設置為 None, 表示永遠都不會達到這個預設值
# lru_cache 這個裝飾器適合把耗時的函數(shù)的執(zhí)行結(jié)果保存起來, 避免傳入相同的參數(shù)時重復計算
@lru_cache(maxsize=None)
def fib(n):
if n < 2:
return n
return fib(n-1) + fib(n-2)
if __name__ == '__main__':
# 在 windows 下運行會造成 RuntimeError, 應該使用 __name__ == '__main__' 來保護程序的入口點
pool = Pool(2)
pool.map(fib, [35] * 2)
dummy
multiprocessing.dummy
這個子模塊雖然在多進程模塊的代碼中, 但是接口和多線程的接口基本是一樣的, 如果分不清一個任務是 CPU 密集型還是 I/O 密集型, 可以使用如下方法去試:
from multiprocessing import Pool
from multiprocessing.dummy import Pool
這種兼容的方式, 便于在多線程/多進程之間切換。
Queue(隊列)
多線程里面有 Queue 模塊實現(xiàn)隊列, 多進程的 multiprocessing 里面包含了 Queue 這個類, 它是線程和進程安全的定躏。下面是一個生產(chǎn)者/消費者的例子, 用到了兩個隊列, 一個隊列用于存儲完成的任務, 另外一個用于存儲任務完成后的結(jié)果:
import time
from multiprocessing import Process, JoinableQueue, Queue
from random import random
def double(n):
return n * 2
def producer(in_queue):
while 1:
wait_time = random()
time.sleep(wait_time)
in_queue.put((double, wait_time))
if wait_time > 0.9:
in_queue.put(None)
print('停止生產(chǎn)')
break
def consumer(in_queue, out_queue):
while 1:
task = in_queue.get()
if task is None:
break
func, arg = task
result = func(arg)
in_queue.task_done()
out_queue.put(result)
if __name__ == '__main__':
tasks_queue = JoinableQueue() # JoinableQueue 里面有 join() 方法和 task_done() 方法, 而 Queue 內(nèi)沒有, tasks_queue 可以用這兩個方法來標識任務是否完成以決定要不要阻塞下去
results_queue = Queue()
processes = []
p = Process(target=producer, args=(tasks_queue, ))
p.start()
processes.append(p)
p = Process(target=consumer, args=(tasks_queue, results_queue))
p.start()
processes.append(p)
tasks_queue.join()
for p in processes:
p.join()
while True:
if results_queue.empty():
break
result = results_queue.get()
print(f'Result: {result}')
# 輸出:
停止生產(chǎn)
Result: 1.3500119015795484
Result: 1.7651301976930043
Result: 1.6336519677702004
Result: 0.06429843269363
Result: 0.29352347406759494
Result: 1.0097954936153397
Result: 0.19863644698178606
Result: 0.9589181928209678
Result: 1.4618869426710388
Result: 1.6837862156424794
Result: 0.8653351112396082
Result: 1.5958573192798793
Result: 0.15849993035736087
Result: 1.3471427672620973
Result: 1.7492282062851205
Result: 0.27695109993667644
Result: 0.7201581558818728
Result: 1.9614106580291402
進程間共享狀態(tài)
multiprocessing 提供了在進程之間共享狀態(tài)的方案, 主要有兩種: 共享內(nèi)存
和 服務器進程
账磺。
共享內(nèi)存
共享內(nèi)存主要通過 Value 和 Array 來實現(xiàn), 在多個進程之間共享一份數(shù)據(jù), 常見的共享類型有下面這些:
>>> from multiprocessing.sharedctypes import typecode_to_type
>>> typecode_to_type
{'c': <class 'ctypes.c_char'>, # 左邊是縮寫, 右邊是全稱
'u': <class 'ctypes.c_wchar'>,
'b': <class 'ctypes.c_byte'>,
'B': <class 'ctypes.c_ubyte'>,
'h': <class 'ctypes.c_short'>,
'H': <class 'ctypes.c_ushort'>,
'i': <class 'ctypes.c_long'>,
'I': <class 'ctypes.c_ulong'>,
'l': <class 'ctypes.c_long'>,
'L': <class 'ctypes.c_ulong'>,
'f': <class 'ctypes.c_float'>,
'd': <class 'ctypes.c_double'>}
下面是一個共享內(nèi)存的實現(xiàn)示例:
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_bool, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, b, s, arr, A):
n.value **= 2
b.value = True
s.value = s.value.upper()
arr[0] = 10
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7) # 這里使用的共享類型的縮寫
b = Value(c_bool, False, lock=False)
s = Array('c', b'hello world', lock=lock)
arr = Array('i', range(5), lock=True)
A = Array(Point, [(1.525, -6.25), (-5.75, 2.5)], lock=lock)
p = Process(target=modify, args=(n, b, s, arr, A)) # 通過 modify() 把這些內(nèi)容的值更改
p.start()
p.join()
print(n.value)
print(b.value)
print(s.value)
print(arr[:])
print([(a.x, a.y) for a in A])
# 輸出
49
True
b'HELLO WORLD'
[10, 1, 2, 3, 4]
[(2.3256249999999996, 39.0625), (33.0625, 6.25)]
服務器進程
一個 multiprocessing 的 Manager
對象會控制一個服務器的進程, 其他進程可以通過代理的方式來訪問這個服務器進程, 常見的共享方式有以下幾種:
Namespace
: 創(chuàng)建一個可分享的命名空間。Value/Array
: 和共享內(nèi)存中共享 ctypes 對象的方式一樣痊远。dict/list
: 創(chuàng)建一個可分享的 dict/list, 支持對應數(shù)據(jù)結(jié)構(gòu)的方法垮抗。Condition/Event/Lock/Queue/Semaphore
: 創(chuàng)建一個可分享的對應同步原語的對象。
from multiprocessing import Manager, Process
def modify(ns, lproxy, dproxy):
ns.a **= 2
lproxy.extend(['b', 'c'])
dproxy['b'] = 1
if __name__ == '__main__':
manager = Manager()
ns = manager.Namespace() # 創(chuàng)建一個命名空間
ns.a = 1
lproxy = manager.list()
lproxy.append('a')
dproxy = manager.dict()
dproxy['b'] = 0
p = Process(target=modify, args=(ns, lproxy, dproxy))
p.start()
print(f'PID: {p.pid}')
p.join()
print(ns.a)
print(lproxy)
print(dproxy)
# 輸出:
PID: 6556
1
['a', 'b', 'c']
{'b': 1}
分布式的進程間通信
使用 manager 可以實現(xiàn)一個簡單的分布式的不同服務器之間不同進程的通信, 也就是一個簡單的 client/server 模型:
# remote_server.py
from multiprocessing.managers import BaseManager
class RemoteManager(BaseManager):
pass
if __name__ == '__main__':
host = '127.0.0.1' # 主機名
port = 5000 # 端口
authkey = b'secret' # 驗證 key
shared_list = [] # 分享列表
RemoteManager.register('get_list', callable=lambda: shared_list) # 遠程服務器支持 get_list 方法, 可以獲得 shared_list 的值
mgr = RemoteManager(address=(host, port), authkey=authkey)
server = mgr.get_server()
server.serve_forever()
# client.py
from multiprocessing.managers import BaseManager
class RemoteManager(BaseManager):
pass
if __name__ == '__main__':
host = '127.0.0.1'
port = 5000
authkey = b'secret'
RemoteManager.register('get_list')
mgr = RemoteManager(address=(host, port), authkey=authkey)
mgr.connect()
l = mgr.get_list() # 拿到分享的 list
print(l) # 一開始 l 為 空
l.append(1) # 添加一個值 1
print(mgr.get_list())
> python remote_server.py
> python client.py
[]
[1]