Because of Python's GIL, the typical efficiency ranking for CPU-bound code is: multithreading < single-threaded < multiprocessing. Multiprocessing is the most efficient, while multithreading, because of context-switching overhead, actually performs worse than a plain single-threaded loop.
As a die-hard fan of reactive programming, I hadn't even finished reading the threading API before wondering: what about doing Python multithreading with RxPy? It turns out the official site has this to say:
Keep in mind Python's GIL has the potential to undermine your concurrency performance, as it prevents multiple threads from accessing the same line of code simultaneously. Libraries like NumPy can mitigate this for parallel intensive computations as they free the GIL. RxPy may also minimize thread overlap to some degree. Just be sure to test your application with concurrency and ensure there is a performance gain.
Roughly, this says that RxPy may ease the problems the GIL causes, but that you should always benchmark your code to confirm there is an actual performance gain.
As someone who mostly just calls libraries and doesn't plan to go deep into Python for now, I'm not going to study the GIL thoroughly enough to know from first principles which cases yield a speedup, but a simple benchmark is still worthwhile. OK, let's try!
First, the test code:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import multiprocessing as mp
import threading as td
import sys
from threading import current_thread
import time

from rx import Observable
from rx.concurrency import ThreadPoolScheduler


def intense_calculation(s):
    # CPU-bound busy work; returns its argument so callers can identify the task
    temp = 0.8
    for i in range(100000):
        temp += (i ** 2 + i ** 0.3 + i ** 0.5) / (i ** 1.2 + i * 0.02 + 0.05)
    return s


def do_20_calculation(name):
    # run intense_calculation 20 times on the current thread/process
    for i in range(20):
        print("PROCESS {0} {1} {2} @ {3}".format(name, current_thread().name, intense_calculation(i),
                                                 time.time() - start_time))


def proc_proces(name):
    # run the 20 calculations in a separate process
    mp.Process(target=do_20_calculation, args=(name,)).start()


def proc_thread(name):
    # run the 20 calculations in a separate thread
    td.Thread(target=do_20_calculation, args=(name,)).start()


def proc_rx2(name, pool_scheduler):
    # schedule each calculation onto the shared RxPy thread pool
    # Observable.from_(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]) \
    #     .do_action(lambda s: print("begin PROCESS {0} {1} {2}".format(name, current_thread().name, s)))
    for i in range(20):
        Observable.just(i) \
            .subscribe_on(pool_scheduler) \
            .map(lambda s: intense_calculation(s)) \
            .subscribe(on_next=lambda s: print("PROCESS {0} {1} {2}".format(name, current_thread().name, s)),
                       on_error=lambda e: print(e),
                       on_completed=lambda: print("PROCESS %s done! @ %.3f" % (name, time.time() - start_time)))


def proc_normal(name):
    # plain single-threaded loop, used as the baseline
    for i in range(20):
        s = intense_calculation(i)
        used_time = time.time() - start_time
        print("PROCESS? {0} thread {1} task {2} done @ {3}".format(name, current_thread().name, s, used_time))


def main(type, proc_num, add_to_core_num):
    global start_time
    # calculate number of CPU's, then create a ThreadPoolScheduler with that number of threads
    optimal_thread_count = mp.cpu_count()
    print("has %d cpu cores" % optimal_thread_count)
    pool_scheduler = ThreadPoolScheduler(optimal_thread_count + add_to_core_num)
    start_time = time.time()
    if type == "thread":
        for i in range(proc_num):
            proc_thread("%d" % i)
    elif type == "process":
        for i in range(proc_num):
            proc_proces("%d" % i)
    elif type == "rx":
        for i in range(proc_num):
            proc_rx2("%d" % i, pool_scheduler)
    else:
        for i in range(proc_num):
            proc_normal("%d" % i)
    print("end @ %.2f" % (time.time() - start_time))
    input("Press any key to exit\n")


start_time = 0

if __name__ == "__main__":
    main(sys.argv[1], int(sys.argv[2]), int(sys.argv[3]))
A quick explanation: the type argument selects which invocation mode to benchmark. When the iteration count passed in is 8, the single-threaded mode calls intense_calculation in a loop 8 * 20 = 160 times; process mode starts 8 processes, each making 20 calls; thread mode starts 8 threads, each making 20 calls; rx mode is a bit special: it also issues 8 * 20 tasks, but every task runs on a thread taken from a shared thread pool of size 8.
Why 8? Because the Ubuntu machine I tested on happens to have 8 (logical) cores.
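For reference, assuming the script above is saved as, say, gil_bench.py (the file name is my own, not from the original), a run looks like `python gil_bench.py rx 8 0`: the first argument selects the mode (thread, process, rx, or anything else for the single-threaded loop), the second is the iteration count, and the third is how many extra threads to add to the Rx thread pool on top of the core count.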
With 8 (and then 16) iterations passed in for each mode, the results are as follows:

Invocation mode | Iterations | Time (s) |
---|---|---|
Single-threaded loop | 8 | 21.92 |
rx | 8 | 45.35 |
Thread | 8 | 45.23 |
Process | 8 | 3.1 |
rx | 16 | 90.854 |
Thread | 16 | 92.62 |
Process | 16 | 5.91 |
The table shows that, for this test (CPU-bound work, with every thread running the same code), RxPy cannot avoid the inefficiency of Python threads; it basically reproduces the earlier conclusion. The thread pool RxPy uses probably only gives a slight edge when there are far more threads and scheduling is frequent; still, given that it does not noticeably hurt performance, it remains worth using.
Conclusion
Multiprocessing crushes multithreading here: it really does let you take full advantage of multiple cores.
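For comparison, here is a minimal sketch (not part of the benchmark above, just an illustration under my own assumptions) of spreading the same CPU-bound work across all cores with multiprocessing.Pool:

```python
import multiprocessing as mp


def intense_calculation(s):
    # same CPU-bound busy loop as in the benchmark script
    temp = 0.8
    for i in range(100000):
        temp += (i ** 2 + i ** 0.3 + i ** 0.5) / (i ** 1.2 + i * 0.02 + 0.05)
    return s


if __name__ == "__main__":
    # one worker per logical core; each worker is a separate process
    # with its own interpreter and therefore its own GIL
    with mp.Pool(mp.cpu_count()) as pool:
        results = pool.map(intense_calculation, range(8 * 20))
    print("finished %d tasks" % len(results))
```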
Next, testing how fast guetzli conversion runs with multiprocessing
PS: this was actually my original reason for looking into Python multithreading & multiprocessing.
The test code is attached below. In short, it starts as many worker processes as there are CPU cores, and each process takes jobs from a queue and compresses the corresponding image. On an Ubuntu 16.04 machine with 8 logical cores (Intel(R) Xeon(R) CPU E5-2609 v2 @ 2.50GHz, 2 physical CPUs), compressing 23 pictures of size 260*364 took 74.70 seconds with guetzli's parameters left at their defaults (23 pictures over 8 workers means three rounds of work, so 24 pictures would likely take about the same time; different pictures may also take different amounts of time to compress).
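Assuming the script is saved as, say, guetzli_mp.py (the name is my guess, not from the original), it would be run as `python guetzli_mp.py /path/to/pictures`. The guetzli command-line tool must be installed and on the PATH, since each worker simply shells out to it via os.system.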
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import multiprocessing as mp
import os
import sys
import threading as td
import time
from threading import current_thread

from rx import Observable
from rx.concurrency import ThreadPoolScheduler


def do_encode(path, out_path):
    # shell out to the guetzli CLI to compress one image
    print("will process %s %s" % (path, os.getpid()))
    os.system('guetzli ' + path + " " + out_path)
    print("process done: %s" % path)
    return path


def do_process(queue):
    # worker process: consume jobs from the queue until a "quit" message arrives
    while True:
        info = queue.get()
        if info["type"] == "quit":
            # print("{0} get quit command".format(os.getpid()))
            break
        elif info["type"] == "zipJpg":
            print("{0} get zip jpeg".format(os.getpid()))
            do_encode(info["path"], info["out_path"])
        else:
            print("{0} get wrong command".format(os.getpid()))
    print("{0} quits".format(os.getpid()))


def main(path):
    global start_time
    # calculate number of CPU's, then create a ThreadPoolScheduler with that number of threads
    optimal_thread_count = mp.cpu_count()
    print("has %d cpu cores" % optimal_thread_count)
    pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
    work_process = []
    queue = mp.Queue(optimal_thread_count)
    # start the worker processes, one per logical core
    for i in range(optimal_thread_count):
        p = mp.Process(target=do_process, args=(queue,))
        work_process.append(p)
        p.start()
    pictures = []
    out_pictures = []
    out_dir = os.path.join(path, "out")
    os.system("rm -rf %s" % out_dir)
    os.system("mkdir %s" % out_dir)
    for root, dirs, files in os.walk(path):
        for file in files:
            if file.endswith(".jpg") or file.endswith(".png"):
                pictures.append(os.path.join(root, file))
                out_pictures.append(os.path.join(out_dir, file))
    # for f in pictures:
    #     print(f)

    def on_complete():
        # tell every worker process to quit
        for ii in range(optimal_thread_count):
            print("send {0} to quit".format(ii))
            queue.put({"type": "quit"})
        print("onComplete")

    def send_message(paths):
        queue.put({"type": "zipJpg", "path": paths[0], "out_path": paths[1]})
        return paths[0]

    # push the compression jobs to the worker processes from an Rx thread-pool thread
    Observable.from_(zip(pictures, out_pictures)).map(
        lambda paths: send_message(paths)).subscribe_on(pool_scheduler).subscribe(
        on_next=lambda s: print("send {0} on thread {1} done.".format(s, current_thread().name)),
        on_error=lambda e: print("onError", e),
        on_completed=on_complete)
    start_time = time.time()
    for p in work_process:
        p.join()
    print("complete all zip @ {0} secs".format(time.time() - start_time))
    # exit(0) # 615secs 8times 76.875 per time. 74.70 run a time.
    # input("Press any key to exit\n")


start_time = 0

if __name__ == "__main__":
    main(sys.argv[1])
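A note on the design: the job queue is bounded at cpu_count entries, so the Rx thread that feeds it blocks whenever every worker is busy, which gives a simple form of backpressure; and because each worker only exits after pulling a quit message, on_complete pushes exactly one quit message per worker before the main process joins them all.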