PySpark 通過(guò)Arrow加速

前言

PySpark是Spark 實(shí)現(xiàn) Unify BigData && Machine Learning目標(biāo)的基石之一芍耘。通過(guò)PySpark,我們可以用Python在一個(gè)腳本里完成數(shù)據(jù)加載立莉,處理茉帅,訓(xùn)練,預(yù)測(cè)等完整Pipeline,加上DB良好的notebook的支持捍歪,數(shù)據(jù)科學(xué)家們會(huì)覺(jué)得非常開心负溪。當(dāng)然缺點(diǎn)也是有的,就是帶來(lái)了比較大的性能損耗妆棒。

性能損耗點(diǎn)分析

如果使用PySpark,大概處理流程是這樣的(注意,這些都是對(duì)用戶透明的)

  1. python通過(guò)socket調(diào)用Spark API(py4j完成)沸伏,一些計(jì)算邏輯糕珊,python會(huì)在調(diào)用時(shí)將其序列化,一并發(fā)送給Spark毅糟。
  2. Spark 觸發(fā)計(jì)算红选,比如加載數(shù)據(jù),然后把數(shù)據(jù)轉(zhuǎn)成內(nèi)部存儲(chǔ)格式InternalRow,接著啟動(dòng)Python Deamon, Python Deamon再啟動(dòng)多個(gè)Worker,
  3. 數(shù)據(jù)通過(guò)socket協(xié)議發(fā)送給Python Worker(不跨網(wǎng)絡(luò))姆另,期間需要將InternalRow轉(zhuǎn)化為Java對(duì)象喇肋,然后再用Java Pickle進(jìn)行序列化(一次),這個(gè)時(shí)候才能通過(guò)網(wǎng)絡(luò)發(fā)送給Worker
  4. Worker接收后迹辐,一條一條反序列化(python pickle蝶防,兩次),然后轉(zhuǎn)化為Python對(duì)象進(jìn)行處理。拿到前面序列化好的函數(shù)反序列化明吩,接著用這個(gè)函數(shù)對(duì)這些數(shù)據(jù)處理间学,處理完成后,再用pickle進(jìn)行序列化(三次)印荔,發(fā)送給Java Executor.
  5. Java Executor獲取數(shù)據(jù)后低葫,需要反序列化(四次),然后轉(zhuǎn)化為InternalRow繼續(xù)進(jìn)行處理躏鱼。

所以可以看到氮采,前后需要四次編碼/解碼動(dòng)作殷绍。序列化反序列化耗時(shí)應(yīng)該占用額外耗時(shí)的70%左右染苛。我們說(shuō),有的時(shí)候把序列化框架設(shè)置為Kyro之后,速度明顯快了很多茶行,可見序列化的額外耗時(shí)是非常明顯的躯概。

前面是一個(gè)點(diǎn),第二個(gè)點(diǎn)是畔师,數(shù)據(jù)是按行進(jìn)行處理的娶靡,一條一條,顯然性能不好看锉。

第三個(gè)點(diǎn)是姿锭,Socket協(xié)議通訊其實(shí)還是很快的,而且不跨網(wǎng)絡(luò)伯铣,只要能克服前面兩個(gè)問(wèn)題呻此,那么性能就會(huì)得到很大的提升。 另外可以跟大家說(shuō)的是腔寡,Python如果使用一些C庫(kù)的擴(kuò)展焚鲜,比如Numpy,本身也是非常快的放前。

如何開啟Arrow進(jìn)行加速忿磅,以及背后原理

開啟方式很簡(jiǎn)單,啟動(dòng)時(shí)加上一個(gè)配置即可:

if __name__ == '__main__':
    conf = SparkConf()
    conf.set("spark.sql.execution.arrow.enabled", "true")

你也可以在submit命令行里添加凭语。

那么Arrow是如何加快速度的呢葱她?主要是有兩點(diǎn):

  1. 序列化友好
  2. 向量化

序列化友好指的是,Arrow提供了一個(gè)內(nèi)存格式似扔,該格式本身是跨應(yīng)用的览效,無(wú)論你放到哪,都是這個(gè)格式虫几,中間如果需要網(wǎng)絡(luò)傳輸這個(gè)格式锤灿,那么也是序列化友好的,只要做下格式調(diào)整(不是序列化)就可以將數(shù)據(jù)發(fā)送到另外一個(gè)應(yīng)用里辆脸。這樣就大大的降低了序列化開銷但校。

向量化指的是,首先Arrow是將數(shù)據(jù)按block進(jìn)行傳輸?shù)姆惹猓浯问强梢詫?duì)立面的數(shù)據(jù)按列進(jìn)行處理的状囱。這樣就極大的加快了處理速度。

實(shí)測(cè)效果

為了方便測(cè)試倘是,我定義了一個(gè)基類:

from pyspark import SQLContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
import os

os.environ["PYSPARK_PYTHON"] = "/Users/allwefantasy/deepavlovpy3/bin/python3"

class _SparkBase(object):
    @classmethod
    def start(cls, conf=SparkConf()):
        cls.sc = SparkContext(master='local[*]', appName=cls.__name__, conf=conf)
        cls.sql = SQLContext(cls.sc)
        cls.session = SparkSession.builder.getOrCreate()
        cls.dataDir = "/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest"

    @classmethod
    def shutdown(cls):
        cls.session.stop()
        cls.session = None
        cls.sc.stop()
        cls.sc = None

接著提供了一個(gè)性能測(cè)試輔助類:

import time
from functools import wraps
import logging

logger = logging.getLogger(__name__)

PROF_DATA = {}


def profile(fn):
    @wraps(fn)
    def with_profiling(*args, **kwargs):
        start_time = time.time()

        ret = fn(*args, **kwargs)

        elapsed_time = time.time() - start_time

        if fn.__name__ not in PROF_DATA:
            PROF_DATA[fn.__name__] = [0, []]
        PROF_DATA[fn.__name__][0] += 1
        PROF_DATA[fn.__name__][1].append(elapsed_time)

        return ret

    return with_profiling


def print_prof_data(clear):
    for fname, data in PROF_DATA.items():
        max_time = max(data[1])
        avg_time = sum(data[1]) / len(data[1])
        logger.warn("Function %s called %d times. " % (fname, data[0]))
        logger.warn('Execution time max: %.3f, average: %.3f' % (max_time, avg_time))
    if clear:
        clear_prof_data()


def clear_prof_data():
    global PROF_DATA
    PROF_DATA = {}

很簡(jiǎn)單亭枷,就是wrap一下實(shí)際的函數(shù),然后進(jìn)行時(shí)間計(jì)算〔笳福現(xiàn)在叨粘,我們寫一個(gè)PySpark的類:

import logging
from random import Random

import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql.types import *

from example.allwefantasy.base.spark_base import _SparkBase
import example.allwefantasy.time_profile as TimeProfile
import pandas as pd

logger = logging.getLogger(__name__)
class PySparkOptimize(_SparkBase):
    def trick1(self):   
        pass 

if __name__ == '__main__':
    conf = SparkConf()
    conf.set("spark.sql.execution.arrow.enabled", "true")
    PySparkOptimize.start(conf=conf)
    PySparkOptimize().trick1()
    PySparkOptimize.shutdown()

這樣骨架就搭建好了旨别。

我們寫第一個(gè)方法撮奏,trick1,做一個(gè)簡(jiǎn)單的計(jì)數(shù):

    def trick1(self):
        df = self.session.range(0, 1000000).select("id", F.rand(seed=10).alias("uniform"),
                                                   F.randn(seed=27).alias("normal"))
        # 更少的內(nèi)存和更快的速度
        TimeProfile.profile(lambda: df.toPandas())()
        TimeProfile.print_prof_data(clear=True)

并且將前面的arrow設(shè)置為false.結(jié)果如下:

Function <lambda> called 1 times. 
Execution time max: 6.716, average: 6.716

然后同樣的代碼续扔,我們把a(bǔ)rrow設(shè)置為true,是不是會(huì)好一些呢?

Function <lambda> called 1 times. 
Execution time max: 2.067, average: 2.067

當(dāng)然我這個(gè)測(cè)試并不嚴(yán)謹(jǐn)疮丛,但是對(duì)于這種非常簡(jiǎn)單的示例,提升還是有效三倍的驴党,不是么瘪撇?而這,只是改個(gè)配置就可以達(dá)成了港庄。

分組聚合使用Pandas處理

另外值得一提的是倔既,PySpark是不支持自定義聚合函數(shù)的,現(xiàn)在如果是數(shù)據(jù)處理鹏氧,可以把group by的小集合發(fā)給pandas處理叉存,pandas再返回,比如

def trick7(self):
        df = self.session.createDataFrame(
            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

        @F.pandas_udf("id long", F.PandasUDFType.GROUPED_MAP)  
        def normalize(pdf):
            v = pdf.v
            return pdf.assign(v=(v - v.mean()) / v.std())[["id"]]

        df.groupby("id").apply(normalize).show() 

這里是id進(jìn)行g(shù)ourp by 度帮,這樣就得到一張id列都是1的小表歼捏,接著呢把這個(gè)小表轉(zhuǎn)化為pandas dataframe處理,處理完成后笨篷,還是返回一張小表瞳秽,表結(jié)構(gòu)則在注解里定義,比如只返回id字段率翅,id字段是long類型练俐。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市冕臭,隨后出現(xiàn)的幾起案子腺晾,更是在濱河造成了極大的恐慌,老刑警劉巖辜贵,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件悯蝉,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡托慨,警方通過(guò)查閱死者的電腦和手機(jī)鼻由,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)厚棵,“玉大人蕉世,你說(shuō)我怎么就攤上這事∑庞玻” “怎么了狠轻?”我有些...
    開封第一講書人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)彬犯。 經(jīng)常有香客問(wèn)我向楼,道長(zhǎng)查吊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任蜜自,我火速辦了婚禮菩貌,結(jié)果婚禮上卢佣,老公的妹妹穿的比我還像新娘重荠。我一直安慰自己,他們只是感情好虚茶,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開白布戈鲁。 她就那樣靜靜地躺著,像睡著了一般嘹叫。 火紅的嫁衣襯著肌膚如雪婆殿。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,158評(píng)論 1 308
  • 那天罩扇,我揣著相機(jī)與錄音婆芦,去河邊找鬼。 笑死喂饥,一個(gè)胖子當(dāng)著我的面吹牛消约,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播员帮,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼或粮,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了捞高?” 一聲冷哼從身側(cè)響起氯材,我...
    開封第一講書人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎硝岗,沒(méi)想到半個(gè)月后氢哮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡型檀,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年命浴,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贱除。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡生闲,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出月幌,到底是詐尸還是另有隱情碍讯,我是刑警寧澤,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布扯躺,位于F島的核電站捉兴,受9級(jí)特大地震影響蝎困,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜倍啥,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一禾乘、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧虽缕,春花似錦始藕、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至剩胁,卻和暖如春诉植,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背昵观。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工晾腔, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人啊犬。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓灼擂,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親椒惨。 傳聞我的和親對(duì)象是個(gè)殘疾皇子缤至,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359