眾所周知滥比,Python中不存在真正的多線程,Python中的多線程是一個并發(fā)過程祷肯。如果想要并行的執(zhí)行程序,充分的利用cpu資源(cpu核心)疗隶,還是需要使用多進程解決的佑笋。其中multiprocessing模塊應該是Python中最常用的多進程模塊了。
創(chuàng)建進程
基本上multiprocessing這個模塊和threading這個模塊用法是相同的斑鼻,也是可以通過函數(shù)和類創(chuàng)建進程蒋纬。
""" 案例1:函數(shù)式創(chuàng)建進程 """
import multiprocessing
import time
# 進程執(zhí)行函數(shù)
def run(num):
time.sleep(1)
print(f'i am process{num}')
if __name__ == '__main__':
# 獲取開始的時間戳
start = time.time()
# 存放進程的列表,用于阻塞進程
process_list = list()
# 創(chuàng)建4個進程坚弱,并將每個創(chuàng)建好的進程對象放到process_list中
for i in range(1, 5):
process = multiprocessing.Process(target=run, args=(i,))
# 啟動該進程
process.start()
process_list.append(process)
# 只有進程全部結(jié)束颠锉,再向下執(zhí)行
for j in process_list:
j.join()
# 結(jié)束的時間戳
end = time.time()
# 打印該程序運行了幾秒
print(end-start)
# i am process1
# i am process2
# i am process3
# i am process4
# 1.122110366821289
上述案例基本上就是筆者搬用了上篇文章多線程的案例,可見其使用的相似之處史汗。導入multiprocessing后實例化Process就可以創(chuàng)建一個進程,參數(shù)的話也是和多線程一樣拒垃,target放置進程執(zhí)行函數(shù)停撞,args存放該函數(shù)的參數(shù)。
""" 案例2:類繼承創(chuàng)建進程 """
import multiprocessing
import time
# 繼承Process悼瓮,轉(zhuǎn)變?yōu)檫M程類
class MyProcess(multiprocessing.Process):
def __init__(self, process_id):
# 必須實現(xiàn)Process類的init方法
super().__init__()
self.process_id = process_id
# 重寫執(zhí)行函數(shù)
def run(self):
time.sleep(1)
print(f'i am process{self.process_id}')
if __name__ == '__main__':
# 獲取開始的時間戳
start = time.time()
# 存放進程的列表戈毒,用于阻塞進程
process_list = list()
# 創(chuàng)建4個進程,并將每個創(chuàng)建好的進程對象放到process_list中
for i in range(1, 5):
process = MyProcess(i)
# 啟動該進程
process.start()
process_list.append(process)
# 只有進程全部結(jié)束横堡,再向下執(zhí)行
for j in process_list:
j.join()
# 結(jié)束的時間戳
end = time.time()
# 打印該程序運行了幾秒
print(end - start)
# i am process1
# i am process2
# i am process3
# i am process4
# 1.1184189319610596
使用類來創(chuàng)建進程也是需要先繼承multiprocessing.Process并且實現(xiàn)其init方法埋市。
進程池
Pool可以提供指定數(shù)量的進程,供用戶調(diào)用命贴,當有新的請求提交到pool中時道宅,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求胸蛛。
但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值污茵,那么該請求就會等待,直到池中有進程結(jié)束葬项,才會創(chuàng)建新的進程泞当。
""" 案例3:進程池Pool """
import multiprocessing
import time
# 進程執(zhí)行函數(shù)
def func(msg):
print('process start...', msg)
time.sleep(3)
print('process end...')
if __name__ == '__main__':
print('ready~~~~~~~~~~~~~~~~~go~~~~~')
# 創(chuàng)建進程池,最大進程數(shù)量為3
pool = multiprocessing.Pool(processes=3)
for i in range(5):
msg = f'hello {i}'
# 以非阻塞的方式民珍,維持執(zhí)行的進程總數(shù)為processes并在進程結(jié)束后自動添加新的進程
pool.apply_async(func, (msg,))
# 阻止后續(xù)任務(wù)提交到進程池
pool.close()
# 等待工作進程結(jié)束
pool.join()
print('game~~~~~~~~~~~~~~~~~over~~~~~')
# ready~~~~~~~~~~~~~~~~~go~~~~~
# process start... hello 0
# process start... hello 1
# process start... hello 2
# process end...
# process start... hello 3
# process end...
# process end...
# process end...
# game~~~~~~~~~~~~~~~~~over~~~~~
需要注意的是襟士,在調(diào)用join方法阻塞進程前盗飒,需要先調(diào)用close方法,陋桂,否則程序會出錯逆趣。
在上述案例中,提到了非阻塞章喉,當把創(chuàng)建進程的方法換為pool.apply(func, (msg,))時汗贫,就會阻塞進程,出現(xiàn)下面的狀況秸脱。
# ready~~~~~~~~~~~~~~~~~go~~~~~
# process start... hello 0
# process end...
# process start... hello 1
# process end...
# process start... hello 2
# process end...
# process start... hello 3
# process end...
# process start... hello 4
# process end...
# game~~~~~~~~~~~~~~~~~over~~~~~
進程隊列
在multiprocessing模塊中還存在Queue對象何缓,這是一個進程的安全隊列录平,近似queue.Queue。隊列一般也是需要配合多線程或者多進程使用。
下列案例是一個使用進程隊列實現(xiàn)的生產(chǎn)者消費者模式遥缕。
""" 案例4:基于進程隊列的生產(chǎn)者消費者模式 """
import random
import time
import multiprocessing
# 生產(chǎn)者
def producer(queue):
for i in range(10):
time.sleep(random.randint(1, 3))
res = f'物品{i}'
# 入隊
queue.put(res)
print(f'{multiprocessing.current_process().name}生產(chǎn){res}')
# 消費者
def consumer(queue):
while True:
# 出隊
res = queue.get()
time.sleep(random.randint(1, 3))
print(f'{multiprocessing.current_process().name}消費{res}')
if __name__ == '__main__':
# 生產(chǎn)消費隊列
queue = multiprocessing.Queue()
# 創(chuàng)建生產(chǎn)進程
process1 = multiprocessing.Process(target=producer, args=(queue,))
# 進程名
process1.name = 'process_1'
# 創(chuàng)建消費進程
process2 = multiprocessing.Process(target=consumer, args=(queue,))
# 進程名
process2.name = 'process_2'
print('程序開始!8∩摇握巢!')
# 啟動進程
process1.start()
process2.start()
# 程序開始!5呵搿旭寿!
# process_1生產(chǎn)物品0
# process_1生產(chǎn)物品1
# process_2消費物品0
# process_1生產(chǎn)物品2
# process_2消費物品1
# process_1生產(chǎn)物品3
# ......
管道
multiprocessing支持兩種進程間的通信,其中一種便是上述案例的隊列崇败,另一種則稱作管道盅称。在官方文檔的描述中,multiprocessing中的隊列是基于管道實現(xiàn)的后室,并且擁有更高的讀寫效率缩膝。
管道可以理解為進程間的通道,使用Pipe([duplex])創(chuàng)建岸霹,并返回一個元組(conn1,conn2)疾层。如果duplex被置為True(默認值),那么該管道是雙向的贡避,如果duplex被置為False痛黎,那么該管道是單向的,即conn1只能用于接收消息刮吧,而conn2僅能用于發(fā)送消息舅逸。
其中conn1、conn2表示管道兩端的連接對象皇筛,每個連接對象都有send()和recv()方法琉历。send和recv方法分別是發(fā)送和接受消息的方法。例如,可以調(diào)用conn1.send發(fā)送消息旗笔,conn1.recv接收消息彪置。如果沒有消息可接收,recv方法會一直阻塞蝇恶。如果管道已經(jīng)被關(guān)閉拳魁,那么recv方法會拋出EOFError。
""" 案例5:管道 """
import multiprocessing
import random
import time
# 發(fā)送數(shù)據(jù)
def proc_send(pipe, data):
for d in data:
pipe.send(d)
print(f'{multiprocessing.current_process().name}發(fā)送了數(shù)據(jù)bdjrfrh')
time.sleep(random.random())
# 接收數(shù)據(jù)
def proc_recv(pipe):
while True:
print(f'{multiprocessing.current_process().name}接收了數(shù)據(jù){pipe.recv()}')
time.sleep(random.random())
if __name__ == '__main__':
# 創(chuàng)建管道
pipe1, pipe2 = multiprocessing.Pipe()
# 數(shù)據(jù)
data = [i for i in range(10)]
# 創(chuàng)建進程process1
process1 = multiprocessing.Process(target=proc_send, args=(pipe1, data))
process1.name = 'process_1'
# 創(chuàng)建進程process1
process2 = multiprocessing.Process(target=proc_recv, args=(pipe2,))
process2.name = 'process_2'
# 啟動進程
process1.start()
process2.start()
# 阻塞進程
process1.join()
process2.join()
# process_1發(fā)送了數(shù)據(jù)0
# process_2接收了數(shù)據(jù)0
# process_1發(fā)送了數(shù)據(jù)1
# process_2接收了數(shù)據(jù)1
# process_1發(fā)送了數(shù)據(jù)2
# process_2接收了數(shù)據(jù)2
# process_1發(fā)送了數(shù)據(jù)3
# process_2接收了數(shù)據(jù)3
# ......
關(guān)于multiprocessing模塊其實還有很多實用的類和方法撮弧,由于篇幅有限(懶),筆者就先寫到這里潘懊。該模塊其實用起來很像threading模塊,像鎖對象和守護線程(進程)等multiprocessing模塊也是有的贿衍,使用方法也近乎相同授舟。
如果想要更加詳細的了解multiprocessing模塊,請參考官方文檔贸辈。
# multiprocessing --- 基于進程的并行
https://docs.python.org/zh-cn/3/library/multiprocessing.html