1. 多進程實現斐波那契
對于耗費 CPU 的操作，多進程優于多線程。我拿斐波那契進行比較，發現多進程速度遠遠高于多線程。
# 多線程
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def fib(n):
    """Return the n-th Fibonacci number using naive double recursion.

    The un-memoized recursion is kept on purpose: this function serves as
    the CPU-bound workload for the thread-pool benchmark in this section.

    Args:
        n: Index in the sequence. Every value less than or equal to 2
            (including 0 and negative numbers) yields 1.

    Returns:
        The Fibonacci number for ``n`` with fib(1) == fib(2) == 1.
    """
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)
if __name__ == "__main__":
    # Thread-pool benchmark: run fib(0) .. fib(37) on two worker threads.
    with ThreadPoolExecutor(2) as executor:
        # Start the clock BEFORE submitting: workers begin executing as soon
        # as a task is submitted, so starting it afterwards under-measures.
        start_time = time.time()
        # The original author measured roughly 39 s for this variant.
        all_task = [executor.submit(fib, num) for num in range(38)]
        # as_completed yields each future as soon as it finishes, not in
        # submission order.
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))
        print("last time is: {}".format(time.time() - start_time))
# 多進程
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def fib(n):
    """Return the n-th Fibonacci number using naive double recursion.

    The un-memoized recursion is kept on purpose: this function serves as
    the CPU-bound workload for the process-pool benchmark in this section.
    It is defined at module level so worker processes can look it up by
    name when tasks are dispatched to them.

    Args:
        n: Index in the sequence. Every value less than or equal to 2
            (including 0 and negative numbers) yields 1.

    Returns:
        The Fibonacci number for ``n`` with fib(1) == fib(2) == 1.
    """
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)
if __name__ == "__main__":
    # Process-pool benchmark: run fib(0) .. fib(37) on two worker processes.
    # Imported here because this section's import line only brings in
    # ThreadPoolExecutor; without it the next statement is a NameError.
    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor(2) as executor:
        # Start the clock BEFORE submitting: workers begin executing as soon
        # as a task is submitted, so starting it afterwards under-measures.
        start_time = time.time()
        # The original author measured roughly 19 s for this variant.
        all_task = [executor.submit(fib, num) for num in range(38)]
        # as_completed yields each future as soon as it finishes, not in
        # submission order.
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))
        print("last time is: {}".format(time.time() - start_time))
2. 使用進程池
import multiprocessing
#多進程編程
import time
def get_html(n):
    """Simulate a slow task by sleeping, then hand the delay back.

    Despite the name, nothing is downloaded: the function only sleeps,
    which makes the completion order of pool tasks easy to observe.

    Args:
        n: Number of seconds to sleep.

    Returns:
        The same ``n`` that was passed in.
    """
    time.sleep(n)
    print("sub_progress success")
    return n
if __name__ == "__main__":
    # A pool of worker PROCESSES (not threads), one per CPU.
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    try:
        # --- apply_async: submit a single task --------------------------
        result = pool.apply_async(get_html, args=(3,))
        # get() blocks until the task has finished and returns its value.
        print(result.get())
        # Output:
        #   sub_progress success
        #   3

        # --- imap: results are yielded in INPUT order -------------------
        for result in pool.imap(get_html, [1, 5, 3]):
            print("{} sleep success".format(result))
        # Output recorded by the original author (the three tasks ran
        # concurrently; the 3-second result is held back until the
        # 5-second one has been yielded):
        #   sub_progress success
        #   1 sleep success
        #   sub_progress success
        #   sub_progress success
        #   5 sleep success
        #   3 sleep success

        # --- imap_unordered: results are yielded in COMPLETION order ----
        for result in pool.imap_unordered(get_html, [1, 5, 3]):
            print("{} sleep success".format(result))
        # Output recorded by the original author:
        #   sub_progress success
        #   1 sleep success
        #   sub_progress success
        #   3 sleep success
        #   sub_progress success
        #   5 sleep success
    finally:
        # close() must come after the last submission: a closed pool
        # rejects new work, so calling it before imap()/imap_unordered()
        # (as the original did) makes those calls fail.
        pool.close()
        # Wait for every worker process to exit.
        pool.join()
3. 進程使用Queue
進程的Queue在multiprocessing下才好用
import time
from multiprocessing import Process, Queue, Manager, Pool, Pipe
def producer(queue, delay=2):
    """Put the string "a" on ``queue`` and then linger for a moment.

    Args:
        queue: Object with a ``put`` method; the demo passes a
            ``multiprocessing.Queue`` shared with the consumer process.
        delay: Seconds to sleep after putting the item. Defaults to 2,
            the value that was previously hard-coded.
    """
    queue.put("a")
    time.sleep(delay)
def consumer(queue, delay=2):
    """Wait, then take one item off ``queue`` and print it.

    Args:
        queue: Object with a blocking ``get`` method; the demo passes a
            ``multiprocessing.Queue`` shared with the producer process.
        delay: Seconds to sleep before reading. Defaults to 2, the value
            that was previously hard-coded.
    """
    time.sleep(delay)
    data = queue.get()
    print(data)
if __name__ == "__main__":
    # multiprocessing.Queue (capacity 10) is the channel both child
    # processes receive as their argument.
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    # Wait for both children to finish; the consumer prints: a
    my_producer.join()
    my_consumer.join()
4. 共享變量不適用于多進程
#共享全局變量不能適用于多進程編程，可以適用于多線程
import time
from multiprocessing import Process, Queue, Manager, Pool, Pipe
def producer(a, delay=2):
    """Add 100 to a local copy of ``a`` to show the change stays local.

    ``a += 100`` only rebinds the parameter name inside this call; the
    function returns nothing, so the caller's value is never affected.
    That is the point of this demo.

    Args:
        a: Number to increment locally.
        delay: Seconds to sleep afterwards. Defaults to 2, the value that
            was previously hard-coded.
    """
    a += 100
    time.sleep(delay)
def consumer(a, delay=2):
    """Wait, then print the value this call received.

    Args:
        a: Value to print.
        delay: Seconds to sleep before printing. Defaults to 2, the value
            that was previously hard-coded.
    """
    time.sleep(delay)
    print(a)
if __name__ == "__main__":
    # Each child process is handed the value of `a` as an argument, so the
    # producer's increment is invisible to the consumer.
    a = 1
    my_producer = Process(target=producer, args=(a,))
    my_consumer = Process(target=consumer, args=(a,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
    # The consumer prints: 1
5. 通過pipe實現進程間通信
# pipe性能高于queue
def producer(pipe):
    """Send the string "fz" through ``pipe``.

    Args:
        pipe: Object with a ``send`` method; the demo passes one end of a
            ``multiprocessing.Pipe()`` pair.
    """
    pipe.send("fz")
def consumer(pipe):
    """Receive one object from ``pipe`` and print it.

    Args:
        pipe: Object with a ``recv`` method; the demo passes one end of a
            ``multiprocessing.Pipe()`` pair.
    """
    print(pipe.recv())
if __name__ == "__main__":
    # Pipe() returns the two connection objects of a single channel.
    # A pipe has exactly two ends, so it only suits communication between
    # two processes.
    receive_pipe, send_pipe = Pipe()
    my_producer = Process(target=producer, args=(send_pipe,))
    my_consumer = Process(target=consumer, args=(receive_pipe,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
    # The consumer prints: fz
6. 兩個進程共同操作一個dict
除了 dict 之外，multiprocessing 的 Manager 還提供 list、Namespace、Lock、Queue 等多種可在進程間共享的類型可以選擇
import time
from multiprocessing import Process, Queue, Manager, Pool, Pipe
def add_data(p_dict, key, value):
    """Store ``value`` under ``key`` in ``p_dict``.

    Args:
        p_dict: Mapping to update in place; the demo passes a
            ``Manager().dict()`` proxy shared between processes.
        key: Key to set.
        value: Value to associate with ``key``.
    """
    p_dict[key] = value
if __name__ == "__main__":
    # Hold the manager in a `with` block so its server process is shut
    # down deterministically when the demo is done.
    with Manager() as manager:
        # Proxy to a dict that lives in the manager's server process;
        # both children write to it through the proxy.
        progress_dict = manager.dict()
        first_progress = Process(target=add_data, args=(progress_dict, "fz", 21))
        second_progress = Process(target=add_data, args=(progress_dict, "xk", 19))
        first_progress.start()
        second_progress.start()
        first_progress.join()
        second_progress.join()
        # Must be printed while the manager is still running; shows the
        # entries written by both child processes.
        print(progress_dict)