Spark學(xué)習(xí)總結(jié)

Spark相關(guān)

Spark是用于大數(shù)據(jù)處理的集群計算框架泡挺,沒有使用MapReduce作為執(zhí)行引擎坝辫,而是使用了自研的分布式運行環(huán)境(DAG引擎)在集群上執(zhí)行工作。Spark可以在YARN上運行,并支持Hadoop文件及HDFS辨萍。

Spark最突出的表現(xiàn)在于它能將作業(yè)與作業(yè)之間產(chǎn)生的大規(guī)模的工作數(shù)據(jù)集存儲在內(nèi)存中,在性能上要優(yōu)于等效的MapReduce工作流返弹,通承庥瘢可以高出一個數(shù)量級。因為MapReduce的數(shù)據(jù)集始終需要從磁盤上加載义起。

Spark常用于迭代算法(對一個數(shù)據(jù)集重復(fù)應(yīng)用某個函數(shù)拉背,直到滿足條件退出)和交互式分析(用戶向數(shù)據(jù)集發(fā)出一系列專用的探索性查詢)

Spark與MapReduce的區(qū)別

  • Spark與MapReduce一樣,也有作業(yè)(job)的概念默终,Spark的作業(yè)比MapReduce的作業(yè)更為通用椅棺,Spark作業(yè)是由任意的多階段(stages)有向無環(huán)圖(DAG)構(gòu)成,其中每個階段相當(dāng)于MapReduce中的map階段或者reduce階段齐蔽。

  • 這些階段在Spark運行環(huán)境中被分解成多個任務(wù)(task)两疚,任務(wù)并行運行在分布于集群中的RDD(彈性分布式數(shù)據(jù)集Resilient Distributed Dataset)分區(qū)上。像MapReduce中的任務(wù)一樣肴熏。

  • Spark作業(yè)始終運行在應(yīng)用上下文中(applicationContext鬼雀,用實例SparkContext表示),它提供了RDD分組以及共享變量蛙吏。一個應(yīng)用(application)可以串行或者并行運行多個作業(yè)源哩,并為這些作業(yè)提供訪問由同一應(yīng)用的先前作業(yè)所緩存的RDD的機制鞋吉。

彈性分布式數(shù)據(jù)集RDD

RDD是Spark最核心的概念,它是在集群中跨多個機器分區(qū)存儲的一個只讀的對象集合励烦。在典型的Spark程序中谓着,首先要加載一個或多個RDD,作為輸入再通過一系列轉(zhuǎn)換得到一組目標(biāo)RDD坛掠,然后對這些目標(biāo)RDD執(zhí)行一個動作赊锚,如計算出結(jié)果或者寫入持久存儲器。

“彈性分布式數(shù)據(jù)集”中的“彈性”指的是Spark可以通過重新安排計算來自動重建丟失的分區(qū)屉栓。

加載RDD或者執(zhí)行轉(zhuǎn)換不會立即觸發(fā)任何數(shù)據(jù)處理操作舷蒲,只是重建了一個計算的計劃。只有當(dāng)對RDD執(zhí)行某個動作時友多,才會出發(fā)真正的計算牲平。

創(chuàng)建

RDD的創(chuàng)建有三種方法:

1、并行化一個集合(內(nèi)存中的對象集合):該方法適用于對少量的輸入數(shù)據(jù)進行并行的CPU密集型計算

2域滥、使用外部存儲器(如:HDFS)中的數(shù)據(jù)集:創(chuàng)建一個對外部數(shù)據(jù)集的引用纵柿,如:為文本文件創(chuàng)建一個String對象的RDD

 val text: RDD[String] = sc.textFile(inputPath)

Spark內(nèi)部使用MapReduce API的TextInputFormat來讀取文件,其文件分割行為與Hadoop一致启绰,因此在使用HDFS的情況下昂儒,每個HDFS塊對應(yīng)于一個Spark分區(qū)。

3委可、對現(xiàn)有的RDD進行轉(zhuǎn)換渊跋。

轉(zhuǎn)換和動作

Spark為RDD提供了兩大類操作:轉(zhuǎn)換(transformation)和動作(action)。轉(zhuǎn)換時從現(xiàn)有的RDD生成新的RDD着倾,而動作則觸發(fā)對RDD的計算并對計算結(jié)果執(zhí)行某種操作刹枉,返回給用戶或保存在外部存儲器(計算在內(nèi)存中進行)。動作時立即性的屈呕,而轉(zhuǎn)換則是惰性的,因為在對RDD執(zhí)行一個動作之前都不會為該RDD的任何轉(zhuǎn)換操作采取實際行動棺亭。

判斷一個操作是轉(zhuǎn)換還是動作:觀察其返回類型虎眨,如果返回的類型是RDD,則是一個轉(zhuǎn)換操作镶摘,否則是一個動作嗽桩。

Spark API文檔:https://spark.apache.org/docs/2.2.0/api/java/index.html

聚合轉(zhuǎn)換:按鍵為鍵值對RDD進行聚合操作的三個主要轉(zhuǎn)換函數(shù)是:reduceByKey()、foldByKey()凄敢、aggregateByKey()

持久化

調(diào)用cache()方法會在executor的內(nèi)存中持久化保存RDD的每個分區(qū)(緩存)碌冶,如果executor沒有足夠的內(nèi)存來存儲RDD分區(qū),計算不會失敗涝缝,但需要重新計算分區(qū)扑庞。因此Spark提供了不同級別的持久化行為譬重。

  • 默認(rèn)的持久化級別是MEMORY_ONLY,使用對象在內(nèi)存中的常規(guī)表示方法

  • MEMORY_ONLY_SER級別:通過把分區(qū)中的元素序列化為字節(jié)數(shù)組來實現(xiàn)罐氨,多了一份序列化CPU開銷臀规,但生成的序列化RDD分區(qū)大小適合被保存在內(nèi)存中。

  • MEMORY_AND_DISK級別:如果數(shù)據(jù)集的大小不適合保存在內(nèi)存中栅隐,就將其溢出到磁盤塔嬉。

  • MEMORY_AND_DISK_SER級別:如果序列化數(shù)據(jù)集的大小不適合保存在內(nèi)存中,就將其溢出到磁盤租悄。

序列化

在使用Spark時谨究,要從兩方面來考慮序列化:數(shù)據(jù)序列化和函數(shù)序列化(閉包函數(shù))

數(shù)據(jù)序列化

默認(rèn)情況下,Spark在通過網(wǎng)絡(luò)將數(shù)據(jù)從一個executor發(fā)送到另一個executor時泣棋,或者以序列化的形式緩存(持久化)數(shù)據(jù)時胶哲,使用的是Java序列化機制:類實現(xiàn)java.io.Serializable或者java.io.Externalizable接口,該機制性能外傅、效率不高纪吮。

使用Kryo序列化機制是更好的選擇,Kryo是一個高效的Java序列化庫萎胰,在驅(qū)動程序的SparkConf中設(shè)置spark.serializer屬性即可使用Kryo

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

在Kryo中注冊類:創(chuàng)建一個KryoRegistrator子類碾盟,然后重寫registerClasses()方法,可提升序列化性能技竟。

函數(shù)序列化:通常函數(shù)序列化會使用默認(rèn)的Java序列化機制冰肴。

共享變量

共享變量分為廣播變量和累加器變量兩種。

廣播變量

廣播變量(broadcast variable)在經(jīng)過序列化后被發(fā)送到各個executor榔组,然后緩存在那里熙尉,以便后期任務(wù)可以在需要時訪問。相當(dāng)于MapReduce中的分布式緩存搓扯,區(qū)別在于Spark將數(shù)據(jù)保存在內(nèi)存中检痰,只有在內(nèi)存耗盡時才會溢出到磁盤上。創(chuàng)建一個廣播變量:通過SparkContext的broadcast()方法傳遞锨推。廣播變量是單向傳播的铅歼。

累加器

累加器(accumulator)是在任務(wù)中只能對它做加法的共享變量,類似于MapReduce中的累加器换可,當(dāng)作業(yè)完成后椎椰,driver程序可以檢索累加器的最終值。通過SparkContext的accumulator()方法來創(chuàng)建一個累加器變量沾鳄。

Spark作業(yè)的運行機制

實體

  • driver:負(fù)責(zé)托管應(yīng)用(SparkContext)并未作業(yè)調(diào)度任務(wù)慨飘。通常作為一個不由集群管理器(cluster manager)管理的客戶端來運行。

  • executor:專屬于應(yīng)用译荞,在應(yīng)用運行期運行瓤的,并執(zhí)行該應(yīng)用的任務(wù)休弃,一般運行在集群的計算機上。

作業(yè)的提交

Spark作業(yè)運行機制.png

Spark作業(yè)運行過程:

  • 當(dāng)對RDD執(zhí)行一個動作(比如count())時堤瘤,會自動提交一個Soark作業(yè)玫芦,導(dǎo)致內(nèi)部的SprakContext調(diào)用runJob()。步驟1本辐。

  • 然后將調(diào)用傳遞給作為driver的一部分運行的調(diào)度程序桥帆。步驟2。調(diào)度程序由兩部分組成:DAG調(diào)度程序和任務(wù)調(diào)度程序慎皱。其中DAG調(diào)度程序把作業(yè)分解為若干階段(stage)老虫,并由這些階段構(gòu)成一個DAG。任務(wù)調(diào)度程序則負(fù)責(zé)把每個階段中的任務(wù)提交到集群中茫多。

  • 當(dāng)DAG調(diào)度程序已構(gòu)建一個完整的多階段DAG祈匙,它就將每個階段的任務(wù)集合提交給任務(wù)調(diào)度程序。步驟3

  • 當(dāng)任務(wù)集合被發(fā)送到任務(wù)調(diào)度程序后天揖,任務(wù)調(diào)度程序開始為executor分配任務(wù)夺欲,分配的任務(wù)通過調(diào)度程序后端啟動。步驟4

  • 調(diào)度程序后端向executor發(fā)送遠(yuǎn)程啟動任務(wù)的消息今膊。步驟5.

  • executor接收到消息通知后開始運行任務(wù)些阅。步驟6

DAG的構(gòu)建

任務(wù)可分為兩種類型:shuffle map任務(wù)和result任務(wù)。

  • shuffle map任務(wù):與MapReduce中的shuffle的map任務(wù)斑唬,每個shuffle map任務(wù)在一個RDD分區(qū)上運算市埋,然后把結(jié)果發(fā)送回driver,再由driver將每個分區(qū)的計算結(jié)果匯集成最終結(jié)果

DAG調(diào)度程序負(fù)責(zé)將一個階段分解成若干任務(wù)以提交給任務(wù)調(diào)度程序恕刘。DAG調(diào)度程序會為每個任務(wù)賦予一個位置偏好(placement preference)缤谎,以允許任務(wù)調(diào)度程序充分利用數(shù)據(jù)本地化(data locality),例如:對于存儲在HDFS上的輸入RDD分區(qū)而言褐着,它的任務(wù)的位置偏好就是托管這些分區(qū)的數(shù)據(jù)塊的datanode(稱為node local)坷澡,而對于在內(nèi)存中緩存的RDD分區(qū),其任務(wù)位置偏好則是那些保存RDD分區(qū)的executor(稱為process local)

任務(wù)調(diào)度

當(dāng)任務(wù)集合被發(fā)送到任務(wù)調(diào)度程序后含蓉,任務(wù)調(diào)度程序用為該應(yīng)用運行的executor的列表洋访,在斟酌位置偏好的同時構(gòu)建任務(wù)到executor的映射。接著任務(wù)調(diào)度程序?qū)⑷蝿?wù)分配給具有可用內(nèi)核的executor谴餐,并且在executor完成任務(wù)時繼續(xù)分配更多的任務(wù),直到任務(wù)集合全部完成呆抑。

任務(wù)調(diào)度程序在為某個executor分配任務(wù)時岂嗓,首先分配的是進程本地任務(wù)(process-local),再分配節(jié)點本地任務(wù)(node-local)鹊碍,然后分配機架本地任務(wù)(rack-local)厌殉,最后分配任意任務(wù)(非本地)或者推測任務(wù)(speculative task)食绿。推測任務(wù)時現(xiàn)有任務(wù)的復(fù)本,如果任務(wù)運行得比預(yù)期緩慢公罕,則調(diào)度器可以將其作為備份來運行器紧。

當(dāng)任務(wù)完成或者失敗時,executor會向driver發(fā)送狀態(tài)更新消息楼眷,如果失敗了铲汪,任務(wù)調(diào)度程序?qū)⒃诹硪粋€executor上重新提交任務(wù),若是啟用了推測任務(wù)罐柳,則還會為運行緩慢的任務(wù)啟動推測任務(wù)掌腰。

任務(wù)執(zhí)行

executor按如下方式運行任務(wù):

  • 首先,確保任務(wù)的JAR包和文件依賴關(guān)系都是最新的张吉,executor在本地高速緩存中保留了先前任務(wù)已使用的所有依賴齿梁,因此只有在它們更新的情況下才會重新下載。

  • 然后肮蛹,反序列化任務(wù)代碼勺择,因為任務(wù)代碼是以啟動任務(wù)消息的一部分而發(fā)生的序列化字節(jié)

  • 最后JVM執(zhí)行任務(wù)代碼,任務(wù)運行在與executor相同的JVM伦忠,因此任務(wù)的啟動沒有進程開銷省核。

任務(wù)執(zhí)行結(jié)果被序列化并發(fā)送到executor后端,然后以狀態(tài)更新消息的形式返回driver缓苛。

執(zhí)行器和集群管理器

Spark依靠executor(執(zhí)行器)來運行構(gòu)成Sprak作業(yè)的任務(wù)芳撒,負(fù)責(zé)管理executor生命周期的是集群管理器(cluster manager),Spark有多種不同特性的集群管理器未桥,可分為如下幾種:

  • 本地模式:在本地模式下笔刹,一個executor與driver運行在用一個JVM中,該模式對于測試或運行小規(guī)模作業(yè)時非常有用冬耿。其主URL為local舌菜、local[n]n 個線程或者local(*)機器的每個內(nèi)核一個線程。

  • 獨立模式:獨立模式的集群管理器是一個簡單的分布式實現(xiàn)亦镶,它運行了一個master以及多個worker日月。當(dāng)Spark應(yīng)用啟動時,master要求worker代表應(yīng)用生成多個executor進程缤骨。其主URL為:spark://host:port

  • Mesos模式:Apache Mesos是一個通用的集群資源管理器爱咬,允許根據(jù)組織策略在不同的應(yīng)用之間細(xì)化資源共享。這種模式的主URL為:mesos://host:port

  • YARN模式:每個運行的Spark應(yīng)用對應(yīng)于一個YARN應(yīng)用實例绊起,每個executor在自己的YARN容器中運行精拟。其主URL為:yarn-client或yarn-cluster

運行在YARN上的Spark

為了在YARN上運行,Spark提供了兩種部署方式:YARN客戶端模式和YARN集群模式。YRAN客戶端模式的driver在客戶端運行蜂绎,而YARN集群模式的driver在YARN的application master集群上運行栅表。

1.YARN客戶端模式:

YARN客戶端模式Spark.png
  • 在YARN客戶端模式下,當(dāng)driver構(gòu)建新的SparkContext實例時师枣,便啟動了與YARN之間的交互怪瓶。步驟1

  • SparkContext向YARN資源管理器提交一個YARN應(yīng)用。步驟2

  • YARN資源管理器則啟動集群節(jié)點管理器上的YARN容器践美,并為在其中運行一個SparkExecutorLauncher的application master洗贰。步驟3

  • ExecutorLauncher向資源管理器請求資源。步驟4

  • 啟動ExecutorBackend進程作為分配給它的容器拨脉。步驟5

每個executor在啟動時會連接回SparkContext哆姻,注冊自身。即向SparkContext提供了關(guān)于可用于運行任務(wù)的executor的數(shù)量及其位置信息玫膀。這些信息被用在任務(wù)的位置偏好策略中矛缨。啟動的executor的數(shù)量在saprk-shell、spark-submit或py-spark中設(shè)置帖旨。executor使用的內(nèi)核數(shù)默認(rèn)為1個箕昭,內(nèi)存默認(rèn)2014MB

2.YARN集群模式:

YARN集群模式Spark.png

啟動流程與客戶端模式基本一致,僅是在步驟1解阅,sprak-submit不會允許任何用戶代碼落竹,代碼在集群上運行。


參考資料:《Hadoop權(quán)威指南》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末货抄,一起剝皮案震驚了整個濱河市述召,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蟹地,老刑警劉巖积暖,帶你破解...
    沈念sama閱讀 218,451評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異怪与,居然都是意外死亡夺刑,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評論 3 394
  • 文/潘曉璐 我一進店門分别,熙熙樓的掌柜王于貴愁眉苦臉地迎上來遍愿,“玉大人,你說我怎么就攤上這事耘斩≌犹睿” “怎么了?”我有些...
    開封第一講書人閱讀 164,782評論 0 354
  • 文/不壞的土叔 我叫張陵括授,是天一觀的道長倾哺。 經(jīng)常有香客問我轧邪,道長,這世上最難降的妖魔是什么羞海? 我笑而不...
    開封第一講書人閱讀 58,709評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮曲管,結(jié)果婚禮上却邓,老公的妹妹穿的比我還像新娘。我一直安慰自己院水,他們只是感情好腊徙,可當(dāng)我...
    茶點故事閱讀 67,733評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著檬某,像睡著了一般撬腾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上恢恼,一...
    開封第一講書人閱讀 51,578評論 1 305
  • 那天民傻,我揣著相機與錄音,去河邊找鬼场斑。 笑死漓踢,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的漏隐。 我是一名探鬼主播喧半,決...
    沈念sama閱讀 40,320評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼青责!你這毒婦竟也來了挺据?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,241評論 0 276
  • 序言:老撾萬榮一對情侶失蹤脖隶,失蹤者是張志新(化名)和其女友劉穎扁耐,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體浩村,經(jīng)...
    沈念sama閱讀 45,686評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡做葵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,878評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了心墅。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片酿矢。...
    茶點故事閱讀 39,992評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖怎燥,靈堂內(nèi)的尸體忽然破棺而出瘫筐,到底是詐尸還是另有隱情,我是刑警寧澤铐姚,帶...
    沈念sama閱讀 35,715評論 5 346
  • 正文 年R本政府宣布策肝,位于F島的核電站肛捍,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏之众。R本人自食惡果不足惜拙毫,卻給世界環(huán)境...
    茶點故事閱讀 41,336評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望棺禾。 院中可真熱鬧缀蹄,春花似錦、人聲如沸膘婶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽悬襟。三九已至衅码,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間脊岳,已是汗流浹背逝段。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留逸绎,地道東北人惹恃。 一個月前我還...
    沈念sama閱讀 48,173評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像棺牧,于是被迫代替她去往敵國和親巫糙。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,947評論 2 355

推薦閱讀更多精彩內(nèi)容