雖然Python的多處理庫已經(jīng)成功地用于廣泛的應用程序中漂洋,但是在本文中,我們發(fā)現(xiàn)它不適合一些重要的應用程序類力喷,包括數(shù)值數(shù)據(jù)處理刽漂、狀態(tài)計算和具有昂貴初始化的計算。主要有兩個原因:
- 處理數(shù)字數(shù)據(jù)效率低下
- 無法在單獨的“任務”之間共享變量
本文將比較python原生多任務包multiprocessing弟孟,joblib包贝咙,以及ray包,在不同環(huán)境測試他們的并行性能
Ray是一個快速拂募、簡單的框架庭猩,用于構建和運行解決這些問題的分布式應用程序。有關一些基本概念的介紹陈症,請參閱本文蔼水。Ray利用Apache Arrow進行高效的數(shù)據(jù)處理,并為分布式計算提供任務和參與者抽象录肯。
Joblib是一組在Python中提供輕量級管道的工具趴腋,Joblib特別針對大數(shù)據(jù)進行了快速和健壯的優(yōu)化,并對numpy數(shù)組進行了特定的優(yōu)化论咏。
1.字符串并行處理
這里引入ray于样,joblib和multiprocessing Pool, 默認設定為8核運行
# 測試并行性能
import ray
ray.init(num_cpus=8)
import joblib
from multiprocessing import Pool
import pandas as pd
import numpy as np
接下來,我們生成一對長度為80w的字符串數(shù)據(jù), 統(tǒng)計相同位置字符的一致率潘靖。并統(tǒng)計三個包的并行穿剖,首先我們進行16次對比,都使用8核處理卦溢。
def compare_string(args):
string_1, string_2 = args
same = 0
for i in range(len(string_1)):
if string_1[i] == string_2[i]:
same += 1
return same
# ray版本的字符串對比, 只是加了一個修飾器
@ray.remote
def compare_string2(args):
string_1, string_2 = args
same = 0
for i in range(len(string_1)):
if string_1[i] == string_2[i]:
same += 1
return same
string_1 = ['0']*800000
string_2 = ['0']*800000
args = [[string_1, string_2] for i in range(16)] # 重復16次
# 把multiprocessing 測試包在一個函數(shù)中
def test_pool(func, args):
pool = Pool(8)
ret = pool.map(func, args)
pool.close()
pool.join()
return ret
# 測試multiprocessing pool
%timeit ret = test_pool(compare_string, args)
# 測試joblib Parallel 的loky模式(默認模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
# 測試joblib Parallel 的multiprocessing模式(多進程模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
# 測試ray框架并行
%timeit ret = [compare_string2.remote(arg) for arg in args]
結果如下:
# multiprocessing pool平均時間
1.21 s ± 82 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的loky模式 平均時間
42.9 s ± 418 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均時間
1.55 s ± 117 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# ray 平均時間
835 ms ± 27.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
joblib loky模式直接pass掉糊余,用時太長了秀又,這里字符串對比一共有16次,我們增加10倍看一下其它三組的排名是否還是一致贬芥。
string_1 = ['0']*800000
string_2 = ['0']*800000
args = [[string_1, string_2] for i in range(160)] # 重復160次
# 測試multiprocessing pool
%timeit ret = test_pool(compare_string, args)
# 測試joblib Parallel 的multiprocessing模式(多進程模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(compare_string)(arg) for arg in args)
# 測試ray框架并行
%timeit ret = [compare_string2.remote(arg) for arg in args]
# multiprocessing pool平均時間
2.92 s ± 84.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均時間
10.6 s ± 258 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# ray 平均時間
7.92 s ± 241 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
當重復次數(shù)變多吐辙,并行數(shù)(8核)不變時,python原生multiprocessing pool反而更快蘸劈,因此如果是非數(shù)值計算昏苏,字符串統(tǒng)計還是建議使用python原生multiprocessing
2.數(shù)字并行處理
第二組對比我們進行純數(shù)字計算對比,這里我們測試計算斐波那契數(shù)列并行用時
def fib_loop(n):
a, b = 0, 1
for i in range(n + 1):
a, b = b, a + b
return a
# ray 版本
@ray.remote
def fib_loop2(n):
a, b = 0, 1
for i in range(n + 1):
a, b = b, a + b
return a
args = [10000]*1600 # 重復計算1600次威沫,每次計算n=10000的斐波那契數(shù)列
# 測試multiprocessing pool
%timeit ret = test_pool(fib_loop, args)
# 測試joblib Parallel 的loky模式(默認模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(fib_loop)(arg) for arg in args)
# 測試joblib Parallel 的multiprocessing模式(多進程模式)
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(fib_loop)(arg) for arg in args)
# 測試ray框架并行
%timeit ret = [fib_loop2.remote(arg) for arg in args]
# multiprocessing pool平均時間
742 ms ± 46.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的loky模式 平均時間
782 ms ± 33.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均時間
608 ms ± 21.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# ray 平均時間
873 ms ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
可以看出 joblib 和 ray框架都對數(shù)值計算進行了優(yōu)化贤惯,joblib的multiprocessing最快,起始這里時間的差異更多的應該是進程通信的誤差
3.矩陣運算
上面只是使用了簡單的加法操作棒掠,這里使用scipy的矩陣運算孵构,看看三種框架對矩陣運算的優(yōu)化情況
import scipy.signal as s
def scipy_convolve2d(args):
image, random_filter = args
return s.convolve2d(image, random_filter)[::5, ::5]
# ray版本
@ray.remote
def scipy_convolve2d2(args):
image, random_filter = args
return s.convolve2d(image, random_filter)[::5, ::5]
filters = [np.random.normal(size=(4, 4)) for _ in range(8)]
# 并行參數(shù)直接打包為args列表
args = [(np.zeros((4000, 4000)), filters[i]) for i in range(8)]
# multiprocessing pool平均時間
%timeit ret = test_pool(scipy_convolve2d, args)
# joblib Parallel 的loky模式 平均時間
%timeit ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(scipy_convolve2d)(arg) for arg in args)
# joblib Parallel 的multiprocessing模式 平均時間
%timeit ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(scipy_convolve2d)(arg) for arg in args)
#ray平均時間
%timeit ret = ray.get([scipy_convolve2d2.remote(arg) for arg in args])
# multiprocessing pool平均時間
3.36 s ± 143 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的loky模式 平均時間
1.9 s ± 64.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# joblib Parallel 的multiprocessing模式 平均時間
1.38 s ± 45.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#ray平均時間
1.31 s ± 53.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
可以看出ray相對來說是最快的
4.共享內存
joblib和ray相較于原始python多進程的優(yōu)勢的另一個方面就是對內存的優(yōu)化摆尝,對于一個較大的數(shù)據(jù)累榜,我們只想要其中的一部分,joblib和ray都可以使用共享內存完成相應部分的計算茫舶,而不是每一個進程都放入一份獨立完整的數(shù)據(jù)雾袱。
我們使用ray和joblib恤筛,共享一份pd.DataFrame,并統(tǒng)計每列所有類型的出現(xiàn)次數(shù)
# 生成一份大內存的pd.DataFrame
zeros = np.zeros((10000,1000))
ones = np.ones((10000,1000))
df = pd.DataFrame(np.concatenate([zeros, ones], axis=0))
def value_counts(df, i):
return df.iloc[:,i].value_counts().to_dict()
# ray版本函數(shù)
@ray.remote
def value_counts2(df, i):
return df.iloc[:,i].value_counts().to_dict()
# joblib共享內存函數(shù)
import os
import tempfile
def memmap(data):
tmp_folder = tempfile.mkdtemp()
tmp_path = tmp_folder + '/joblib.mmap'
if os.path.exists(tmp_path): # 若存在則刪除
os.remove(tmp_path)
_ = joblib.dump(data, tmp_path)
memmap_data = joblib.load(tmp_path, mmap_mode='r+')
return memmap_data
shared_df = memmap(df) # joblib 的共享內存方法 shared_df就是共享內存的df
df_id = ray.put(df) # ray共享內存的方法(封裝好了更簡單一些)
# joblib Parallel 的loky模式
%time ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# joblib Parallel 的multiprocessing模式
%time ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# ray
%time ret = [value_counts2.remote(df_id, i) for i in range(df.shape[1])]
# joblib Parallel 的loky模式 跑一次時間
CPU times: user 596 ms, sys: 68 ms, total: 664 ms
Wall time: 1.23 s
# joblib Parallel 的multiprocessing 跑一次時間
CPU times: user 152 ms, sys: 64 ms, total: 216 ms
Wall time: 611 ms
# ray 跑一次時間
CPU times: user 352 ms, sys: 64 ms, total: 416 ms
Wall time: 784 ms
結果還是joblib的multiprocessing模式最快,不過時間差距應該不大
這里df實際上是個數(shù)值矩陣芹橡,如果將其變?yōu)樽址袷教厩危俣葧粫陆的?
我們將df維度降低變?yōu)?00維,數(shù)值類型變?yōu)閟tr
# pandas處理文件僻族,統(tǒng)計
zeros = np.zeros((10000,100))
ones = np.ones((10000,100))
df = pd.DataFrame(np.concatenate([zeros, ones], axis=0))
df = df.astype(str) # 轉為字符串
# joblib Parallel 的loky模式
%time ret = joblib.Parallel(n_jobs=8, backend='loky', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# joblib Parallel 的multiprocessing模式
%time ret = joblib.Parallel(n_jobs=8, backend='multiprocessing', verbose=0)(joblib.delayed(value_counts)(shared_data, i) for i in range(df.shape[1]))
# ray
%time ret = [value_counts2.remote(df_id, i) for i in range(df.shape[1])]
joblib Parallel 的loky模式 跑一次時間
CPU times: user 57.7 s, sys: 3.33 s, total: 1min 1s
Wall time: 1min 1s
# joblib Parallel 的multiprocessing模式 跑一次時間
CPU times: user 52.9 s, sys: 3.21 s, total: 56.1 s
Wall time: 56.7 s
# ray 跑一次時間
CPU times: user 256 ms, sys: 76 ms, total: 332 ms
Wall time: 4.7 s
令人吃驚的是將DataFrame int類型轉為str類型后,ray框架并行計算時間驚人的減少屡谐,很有可能在上次對比中數(shù)值類型并不能時間反映出兩種框架對共享內存的使用效率述么。
4.總結
總的來說這三個包在一些小人物中并行時間上面差異并不大
ray和joblib都對數(shù)值計算進行了優(yōu)化
在處理pandas共享數(shù)據(jù)時ray的優(yōu)勢更明顯