最近學習了spark 相關的內容,寫個筆記記錄一下自己目前對于spark的理解,方便以后查閱医咨。在本文的寫作過程中,主要參考了1.寬依賴與窄依賴的區(qū)別;2.spark中幾個概念的梳理架诞;[3.spark shuffle的理解](https://blog.csdn.net/zylove2010/article/details/79067149)這樣三篇博客拟淮,寫的非常好,建議大家都去看看谴忧。
1.簡介
??Spark可以說是圈內最流行的幾個大數據處理框架之一了很泊,類似地位的可能還有storm之類的角虫。其最大的優(yōu)點就是能夠幾乎底層透明的完成分布式的計算,非常方便開發(fā)委造。
??Spark是可以搭建在很多平臺上的戳鹅,本文面對的環(huán)境就是比較常見的Spark+Hadoop(作為文件系統)+Hive(作為分布式數據庫)的配置。
??本文重點想講解的就是spark整個程序的生命周期(我們能夠接觸到的幾個環(huán)節(jié)昏兆,還沒有到太底層的需要看源碼的程度)枫虏。
2.spark的核心數據結構-RDD
RDD是spark 的核心數據結構,spark 之所以能夠做到把分布式做成近乎底層透明爬虱,正是依靠了RDD.RDD全稱彈性分布式數據集(Resilient Distributed Datasets).
2.1 partition ,task 與 RDD
??拋開一切我們想想一下隶债,假如說現在有一張很大很大(千萬級)的數據表格要你處理,我們按傳統的思維方式來搞的話跑筝,就是:操作表格嘛死讹,那么我就用個dataframe(R語言和python的pandas包中都有此概念)來封裝這個數據表唄,然后我不就是各種騷操作嗖嗖嗖曲梗,像什么計數用的count
函數啦赞警,排序用的sort
函數啦,分組用的groupby
函數啦稀并。
??現在問題來啦仅颇!你好像忘記了我一個很重要的前提,這個表很大很大暗饩佟忘瓦!你用dataframe來封裝的前提是得把這些數據全部加載到內存啊。這顯然是不現實的引颈。那么我們就要想辦法耕皮,最直觀的辦法就是Divide & Conquer
。因為我們看看我們想做的這些操作蝙场,無論是計數凌停,還是排序,還是分組售滤,都是能夠先分成小數據集罚拟,并行處理,然后再合并出結果的完箩。因此我們的解決方案來啦赐俗,以計數為例,我們首先把這個大數據集分成多個小數據集(知識點弊知,敲黑板W璐!這就是partition)秩彤,每個小數據集我們啟動一個子任務去讓他做計數(知識點叔扼,敲黑板J驴蕖!這就是task)瓜富,每個子任務執(zhí)行完畢之后再匯總成最終的結果鳍咱。
??其實,spark 就是幫我們把上面的工作完成了与柑。Instead of 手動的分割文件流炕,手動的分配任務,手動的匯總結果仅胞,我們只需要把我們的數據封裝成RDD數據類型每辟,就能夠像是在操作普通小數據集一樣的完成常見的那幾種數據操作。
2.2 進擊的RDD-Dataframe簡介
??在RDD的基礎之上干旧,spark又提出了升級版的數據結構-Dataframe渠欺,不過dataframe注意:這里的dataframe是pyspark中的叫法,在scala等語言中使用的dataset的名稱椎眯。
??那么什么是dataframe呢挠将,簡單的說你可以把他想象成數據庫里的一張表句占,他有自己的column,row蚓耽,還包括一些針對表的操作。如下面盜來的這張圖所示:
??spark中通過引入dataframe的數據結構帶來了很多好處绍填,在這里我們只重點說一說其中的兩個:
- 效率高掌测。dataframe自帶的一些操作都是經過優(yōu)化的内贮,能夠以極為高效的方式完成任務。
- 操作簡便汞斧。經過又一層的封裝夜郁,spark中的數據操作變得更加友好,上手很快粘勒,相比之下原來的針對rdd的操作可以稱得上是非常原始了竞端。
3.spark的生命周期
??spark 作為一個大數據處理的框架,具有自己完整的生命周期庙睡。
3.1 全生命周期
??閑言少敘事富,我在這里一句話串聯一下整個第三節(jié)的脈絡:
??一段程序在spark里叫做一個application,一個application會劃分成很多個job(劃分條件是action),一個job會劃分成很多個stage(劃分條件是shuffle),每個stage里所有被處理的數據會劃分之后交給很多個子任務去處理(劃分條件是partition)
3.2 拆解講解 - job & action
??前文提到了乘陪,一個action的產生將會促使application切分Job统台。那么什么是action?簡單來說就是spark中對于rdd的操作可以分成兩類:tranformation和action暂刘。聽到這兩個名字饺谬,相信很多人已經明白了捂刺,transformation就是只是在做一些變形之類的操作谣拣,有點類似于hadoop里面的map募寨,比如整體加個1啊什么的。而action是實際需要求值出結果的操作森缠,比如說count什么的拔鹰。
??這個概念有點像lazy evaluation的操作,估計和spark的正宗語言是scala有關贵涵×兄總之,就是不到萬不得已不求值宾茂,求值就要切分job.
3.3 拆解講解 - stage & shuffle
??現在說一說job內部的劃分-stage瓷马。前面提到了,spark是不到萬不得已不求值跨晴,求值才劃分job欧聘,因此在一個job內部就完全是transforamtion的操作。
??但是端盆,即使都是變換操作也是有不一樣的怀骤,有的變換是一一對應的變換,比如說每個元素都加1焕妙;而有的變換則是涉及到整個RDD,比如groupby.這就是窄依賴和寬依賴的變換蒋伦。
??為什么突然整這么一個概念呢,記住一句話:寬依賴引發(fā)shuffle 操作焚鹊,shuffle操作導致stage切分痕届。 想一下,我們現在把每個rdd交給很多個小的task取執(zhí)行了末患,大家各自執(zhí)行各自的(并行)爷抓,執(zhí)行完了之后如果沒啥問題接著走后面的操作,直到最后匯總阻塑,這種就是完全并行的操作蓝撇,理想的情況。但是總有一些操作攪屎陈莽,它是全局的操作(寬依賴),它必須得等待前面分好的所有子任務全部執(zhí)行完他才能執(zhí)行渤昌,換句話說就是必須得先在他這里匯總一下。
??那么我們現在得出了一個結論:stage是spark種并行處理的最大單位走搁。一個stage以內的各種操作都可以各自搞各自的独柑,互不影響,從而最高的利用并行開發(fā)的效率私植。而出了stage就只能順序執(zhí)行所有操作了忌栅。
4.如何提高執(zhí)行效率?——spark并發(fā)度的計算
??到這里曲稼,主要的內容都差不多講完了索绪,但是我寫這篇文章最大的原因還沒有說湖员。其實就是我在使用pyspark的時候遇到的一個問題。簡單來說就是程序老是崩瑞驱,總是提醒我內存不足娘摔,我經過好幾天的折騰才發(fā)現是自己設定的問題。也總結出來了一條經驗唤反,就是想提高spark的效率就可以從兩個角度出發(fā):
- 修改配置內容凳寺,增加并發(fā)度。核心配置excutor.instances是spark處理器的個數(虛擬的可以多分配一些),excutor.cores是spark處理器的核心個數(虛擬的可以多分配一些)彤侍。
spark = SparkSession.builder.enableHiveSupport(). \
master("yarn"). \
config('spark.executor.memory', '15g'). \
config('spark.executor.cores', '10'). \
config('spark.executor.instances', '20'). \
config('spark.driver.memory', '20g'). \
getOrCreate()
- 修改RDD的partition肠缨,劃分更小的task。前面提到的我的那個問題的本質原因不是并發(fā)度的問題盏阶,而是劃分之后的任務還是太大了怜瞒,交給每一個核心去處理內存都會扛不住,所以需要手動的劃分(原本采用的是默認的劃分般哼,默認劃分取決于你數據的輸入吴汪,比如從hdf來的文件就是和你file split分片保持一致)。
df.repartition.groupyBy("city").count().sortBy('count')
5.贈送內容:spark的輸出進度條怎么解讀
??最后一個問題蒸眠,spark跑任務的時候的那個進度條里的數字都是啥玩意漾橙?相信很多人剛開始的時候都搞不太明白。簡單說明一下:
- stage:就是前面提到的job內部劃分的stage楞卡,不多說了
- 進度條尾端(X+Y)/Z:X是已經執(zhí)行完的任務總數霜运,基本上和數據的partition是保持一致的。Y是活躍的(準備好可以執(zhí)行的)但是還沒執(zhí)行的的任務數蒋腮。Z是總任務數淘捡。
- 特殊現象:有時候會出現活躍任務數是負數!池摧!這是什么情況焦除?——這個負數就是執(zhí)行失敗的任務數,需要重新執(zhí)行的作彤。