Differences between multiprocessing and multithreading
Because of the GIL, Python threads cannot run in parallel on multiple CPU cores, so multithreading only achieves real concurrency for I/O-bound workloads.
For CPU-bound logic (typically heavy computation), multiprocessing makes far better use of a multi-core CPU.
Keep in mind, though, that the operating system's cost of scheduling a process is much higher than that of scheduling a thread.
A comparison of multithreading and multiprocessing
1. Compute Fibonacci numbers with both multiprocessing and multithreading, using the thread/process pools provided by the concurrent.futures module.
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

def fib(n):
    return 1 if n <= 2 else fib(n-1) + fib(n-2)

if __name__ == '__main__':
    start_time = time.time()
    # with ProcessPoolExecutor(3) as executor:
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, n) for n in range(25, 35)]
        for future in as_completed(all_task):
            data = future.result()
            # todo: handle each result
    end_time = time.time()
    print("time consuming by threads: {0}s".format(end_time-start_time))
    # print("time consuming by processes: {0}s".format(end_time-start_time))
Comparing the results of the two approaches:
# result:
# time consuming by threads: 4.823292016983032s
# time consuming by processes: 3.3890748023986816s
As the numbers show, multiprocessing handles compute-heavy tasks more efficiently than multithreading. The example also shows that the thread-pool and process-pool interfaces in concurrent.futures are identical. Note that when using multiprocessing on Windows, the related logic must live under the if __name__ == '__main__' guard; Linux has no such restriction.
2. Simulate I/O-bound work with both approaches. The hallmark of I/O is that the CPU spends most of its time waiting for data, so a sleep() call can stand in for it.
The overall structure stays the same; the changed parts are:
def random_sleep(n):
    time.sleep(n)
    return n

...

# 8 workers, 30 tasks, each sleeping 2 seconds to simulate I/O
with ProcessPoolExecutor(8) as executor:
# with ThreadPoolExecutor(8) as executor:
    all_task = [executor.submit(random_sleep, 2) for i in range(30)]
# result:
# time consuming by threads: 8.002903699874878s
# time consuming by processes: 8.34946894645691s
Multiprocess programming
Using Process directly
import time
import multiprocessing

def read(times):
    time.sleep(times)
    print("process reading...")
    return "read for {0}s".format(times)

def write(times):
    time.sleep(times)
    print("process writing...")
    return "write for {0}s".format(times)

if __name__ == '__main__':
    read_process = multiprocessing.Process(target=read, args=(1,))
    write_process = multiprocessing.Process(target=write, args=(2,))
    read_process.start()
    write_process.start()
    print("read_process id {rid}".format(rid=read_process.pid))
    print("write_process id {wid}".format(wid=write_process.pid))
    read_process.join()
    write_process.join()
    print("done")
# result:
# read_process id 7064
# write_process id 836
# process reading...
# process writing...
# done
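Besides passing a target function, Process can also be subclassed, overriding run() to define the task. A minimal sketch (the ReadProcess class name is illustrative, not from the original):

```python
import multiprocessing
import time

class ReadProcess(multiprocessing.Process):
    """A Process subclass; start() will run run() in a new process."""
    def __init__(self, times):
        super().__init__()
        self.times = times

    def run(self):
        time.sleep(self.times)
        print("process reading for {0}s".format(self.times))

if __name__ == '__main__':
    p = ReadProcess(1)
    p.start()
    p.join()
```

This style is handy when a worker needs its own state or setup logic beyond a single function.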
可以看出羡玛,關(guān)于多線程的邏輯和多線程的使用方式以類似的别智,要注意在Windows操作系統(tǒng)上,和進(jìn)程有關(guān)的邏輯要寫在if __name__ == '__main__'中稼稿。其他的一些方法請參閱 官方文檔薄榛。
Using the native process pool
import time
import multiprocessing

def read(times):
    time.sleep(times)
    print("process reading...")
    return "read for {0}s".format(times)

def write(times):
    time.sleep(times)
    print("process writing...")
    return "write for {0}s".format(times)

if __name__ == '__main__':
    # multiprocessing.cpu_count() returns the number of CPU cores
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    read_result = pool.apply_async(read, args=(2,))
    write_result = pool.apply_async(write, args=(3,))
    # close the pool so no new tasks can be submitted; join() fails otherwise
    pool.close()
    # wait for all submitted tasks to finish
    pool.join()
    print(read_result.get())
    print(write_result.get())
# result:
# process reading...
# process writing...
# read for 2s
# write for 3s
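Besides apply_async(), the pool's map() method distributes an iterable of arguments across the workers and blocks until every result is ready, preserving input order. A minimal sketch with a read()-style function (no printing inside the worker, to keep the output deterministic):

```python
import multiprocessing
import time

def read(times):
    time.sleep(times)
    return "read for {0}s".format(times)

if __name__ == '__main__':
    with multiprocessing.Pool(3) as pool:
        # map() blocks until all results are ready, in input order
        results = pool.map(read, [2, 1, 3])
    print(results)  # ['read for 2s', 'read for 1s', 'read for 3s']
```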
With imap(), results are yielded in the order the tasks were submitted:
pool = multiprocessing.Pool(multiprocessing.cpu_count())
for result in pool.imap(read, [2, 1, 3]):
    print(result)
# result:
# process reading...
# process reading...
# read for 2s
# read for 1s
# process reading...
# read for 3s
With imap_unordered(), results are yielded as soon as each task finishes:
for result in pool.imap_unordered(read, [1, 5, 3]):
    print(result)
# process reading...
# read for 1s
# process reading...
# read for 3s
# process reading...
# read for 5s
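When the worker function takes more than one argument, map() and imap() are not enough; the pool's starmap() unpacks each tuple of arguments. A minimal sketch (the sleep_and_tag function is hypothetical, for illustration only):

```python
import multiprocessing
import time

def sleep_and_tag(times, tag):
    time.sleep(times)
    return "{0}: slept {1}s".format(tag, times)

if __name__ == '__main__':
    with multiprocessing.Pool(2) as pool:
        # starmap() unpacks each tuple as positional arguments
        for result in pool.starmap(sleep_and_tag, [(1, "a"), (2, "b")]):
            print(result)
```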
Using ProcessPoolExecutor from concurrent.futures
This was covered in the multithreading vs. multiprocessing comparison above. Since its interface matches the thread pool's, we won't repeat it here; see the example in the official documentation.
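For reference, a minimal sketch in the style of the official example, checking primality across processes with executor.map() (the is_prime function and the sample numbers are illustrative):

```python
import math
from concurrent.futures import ProcessPoolExecutor

def is_prime(n):
    if n < 2:
        return False
    # trial division up to the integer square root
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return False
    return True

if __name__ == '__main__':
    numbers = [112272535095293, 112582705942171, 115280095190773]
    with ProcessPoolExecutor() as executor:
        for n, prime in zip(numbers, executor.map(is_prime, numbers)):
            print("{0} is prime: {1}".format(n, prime))
```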
Inter-process communication
Communication between processes differs from communication between threads: the lock mechanisms and global variables used for thread communication no longer apply here, so we need new tools for the job.
Using multiprocessing.Queue
It works the same way as the Queue used with threads (see its detailed methods in the documentation). Note that this kind of queue cannot be used with processes created through a Pool.
import multiprocessing
import time

def plus(queue):
    for i in range(6):
        num = queue.get() + 1
        queue.put(num)
        print(num)
        time.sleep(1)

def subtract(queue):
    for i in range(6):
        num = queue.get() - 1
        queue.put(num)
        print(num)
        time.sleep(2)

if __name__ == '__main__':
    queue = multiprocessing.Queue(1)
    queue.put(0)
    plus_process = multiprocessing.Process(target=plus, args=(queue,))
    subtract_process = multiprocessing.Process(target=subtract, args=(queue,))
    plus_process.start()
    subtract_process.start()
# result:
# 1
# 1
# 2
# 2
# 3
# 3
# 0
# 1
# 2
# 2
# 1
# 0
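A common practical pattern with multiprocessing.Queue is a producer/consumer pair that shuts down via a sentinel value; a minimal sketch (using None as the sentinel is a convention, not a requirement):

```python
import multiprocessing

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:  # sentinel: no more data, exit cleanly
            break
        print("consumed {0}".format(item))

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=consumer, args=(queue,))
    proc.start()
    for i in range(3):
        queue.put(i)
    queue.put(None)  # tell the consumer to stop
    proc.join()
```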
Using the Queue from Manager()
Manager() returns an object that manages synchronization between processes and offers several forms of shared data.
import multiprocessing
import time

def plus(queue):
    for i in range(6):
        num = queue.get() + 1
        queue.put(num)
        print(num)
        time.sleep(1)

def subtract(queue):
    for i in range(6):
        num = queue.get() - 1
        queue.put(num)
        print(num)
        time.sleep(2)

if __name__ == '__main__':
    queue = multiprocessing.Manager().Queue(1)  # note the unusual way it is created
    # queue = multiprocessing.Queue()  # this no longer works with a Pool
    pool = multiprocessing.Pool(2)
    queue.put(0)
    pool.apply_async(plus, args=(queue,))
    pool.apply_async(subtract, args=(queue,))
    pool.close()
    pool.join()
# result:
# 0
# 1
# 1
# 2
# 2
# 3
# -1
# 0
# 1
# 2
# 1
# 0
Using the list() from Manager()
Multiple processes can share a global list. Because it is shared across processes, a lock is used to keep access safe. The Manager().Lock here is not the thread-level Lock seen earlier; it synchronizes between processes.
import multiprocessing as mp
import time

def add_person(waiting_list, name_list, lock):
    lock.acquire()
    for name in name_list:
        waiting_list.append(name)
        time.sleep(1)
        print(waiting_list)
    lock.release()

def get_person(waiting_list, lock):
    lock.acquire()
    if waiting_list:
        name = waiting_list.pop(0)
        print("get {0}".format(name))
    lock.release()

if __name__ == '__main__':
    waiting_list = mp.Manager().list()
    lock = mp.Manager().Lock()  # use the lock to serialize access to the shared list
    name_list = ["MetaTian", "Rity", "Anonymous"]
    add_process = mp.Process(target=add_person, args=(waiting_list, name_list, lock))
    get_process = mp.Process(target=get_person, args=(waiting_list, lock))
    add_process.start()
    get_process.start()
    add_process.join()
    get_process.join()
    print(waiting_list)
# result:
# ['MetaTian']
# ['MetaTian', 'Rity']
# ['MetaTian', 'Rity', 'Anonymous']
# get MetaTian
# ['Rity', 'Anonymous']
Manager() provides more inter-process communication tools; see the official documentation.
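For instance, Manager().dict() gives several processes a shared mapping; a minimal sketch (the record helper is illustrative):

```python
import multiprocessing

def record(shared_dict, key, value):
    # each worker writes one entry into the shared mapping
    shared_dict[key] = value

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        shared = manager.dict()
        procs = [multiprocessing.Process(target=record, args=(shared, i, i * i))
                 for i in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(dict(shared))  # {0: 0, 1: 1, 2: 4} (insertion order may vary)
```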
Using Pipe
A Pipe only connects two processes, but it outperforms Queue. Pipe() returns two Connection objects, which the processes use to send and receive data, much like the socket objects covered earlier. See the Connection documentation.
import multiprocessing

def plus(conn):
    for i in range(3):
        num = 0 if i == 0 else conn.recv()
        conn.send(num + 1)
        print("plus send: {0}".format(num+1))

def subtract(conn):
    for i in range(3):
        num = conn.recv()
        conn.send(num-1)
        print("subtract send: {0}".format(num-1))

if __name__ == '__main__':
    conn_plus, conn_subtract = multiprocessing.Pipe()
    plus_process = multiprocessing.Process(target=plus, args=(conn_plus,))
    subtract_process = multiprocessing.Process(target=subtract, args=(conn_subtract,))
    plus_process.start()
    subtract_process.start()
# result:
# plus send: 1
# subtract send: 0
# plus send: 1
# subtract send: 0
# plus send: 1
# subtract send: 0
send() can keep sending data; recv() takes out what the other end sent, in order, and blocks while waiting if no data is available.
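To avoid blocking forever in recv(), a Connection also offers poll(timeout), which reports whether data is ready to read. A minimal sketch:

```python
import multiprocessing

def worker(conn):
    conn.send("hello")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    proc = multiprocessing.Process(target=worker, args=(child_conn,))
    proc.start()
    # poll(timeout) waits up to 5 seconds for data instead of blocking forever
    if parent_conn.poll(5):
        print(parent_conn.recv())  # hello
    proc.join()
```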