Spark相關(guān)
Spark是用于大數(shù)據(jù)處理的集群計算框架泡挺,沒有使用MapReduce作為執(zhí)行引擎坝辫,而是使用了自研的分布式運行環(huán)境(DAG引擎)在集群上執(zhí)行工作。Spark可以在YARN上運行,并支持Hadoop文件及HDFS辨萍。
Spark最突出的表現(xiàn)在于它能將作業(yè)與作業(yè)之間產(chǎn)生的大規(guī)模的工作數(shù)據(jù)集存儲在內(nèi)存中,在性能上要優(yōu)于等效的MapReduce工作流返弹,通承庥瘢可以高出一個數(shù)量級。因為MapReduce的數(shù)據(jù)集始終需要從磁盤上加載义起。
Spark常用于迭代算法(對一個數(shù)據(jù)集重復(fù)應(yīng)用某個函數(shù)拉背,直到滿足條件退出)和交互式分析(用戶向數(shù)據(jù)集發(fā)出一系列專用的探索性查詢)
Spark與MapReduce的區(qū)別
Spark與MapReduce一樣,也有作業(yè)(job)的概念默终,Spark的作業(yè)比MapReduce的作業(yè)更為通用椅棺,Spark作業(yè)是由任意的多階段(stages)有向無環(huán)圖(DAG)構(gòu)成,其中每個階段相當(dāng)于MapReduce中的map階段或者reduce階段齐蔽。
這些階段在Spark運行環(huán)境中被分解成多個任務(wù)(task)两疚,任務(wù)并行運行在分布于集群中的RDD(彈性分布式數(shù)據(jù)集Resilient Distributed Dataset)分區(qū)上。像MapReduce中的任務(wù)一樣肴熏。
Spark作業(yè)始終運行在應(yīng)用上下文中(applicationContext鬼雀,用實例SparkContext表示),它提供了RDD分組以及共享變量蛙吏。一個應(yīng)用(application)可以串行或者并行運行多個作業(yè)源哩,并為這些作業(yè)提供訪問由同一應(yīng)用的先前作業(yè)所緩存的RDD的機制鞋吉。
彈性分布式數(shù)據(jù)集RDD
RDD是Spark最核心的概念,它是在集群中跨多個機器分區(qū)存儲的一個只讀的對象集合励烦。在典型的Spark程序中谓着,首先要加載一個或多個RDD,作為輸入再通過一系列轉(zhuǎn)換得到一組目標(biāo)RDD坛掠,然后對這些目標(biāo)RDD執(zhí)行一個動作赊锚,如計算出結(jié)果或者寫入持久存儲器。
“彈性分布式數(shù)據(jù)集”中的“彈性”指的是Spark可以通過重新安排計算來自動重建丟失的分區(qū)屉栓。
加載RDD或者執(zhí)行轉(zhuǎn)換不會立即觸發(fā)任何數(shù)據(jù)處理操作舷蒲,只是重建了一個計算的計劃。只有當(dāng)對RDD執(zhí)行某個動作時友多,才會出發(fā)真正的計算牲平。
創(chuàng)建
RDD的創(chuàng)建有三種方法:
1、并行化一個集合(內(nèi)存中的對象集合):該方法適用于對少量的輸入數(shù)據(jù)進行并行的CPU密集型計算
2域滥、使用外部存儲器(如:HDFS)中的數(shù)據(jù)集:創(chuàng)建一個對外部數(shù)據(jù)集的引用纵柿,如:為文本文件創(chuàng)建一個String對象的RDD
val text: RDD[String] = sc.textFile(inputPath)
Spark內(nèi)部使用MapReduce API的TextInputFormat來讀取文件,其文件分割行為與Hadoop一致启绰,因此在使用HDFS的情況下昂儒,每個HDFS塊對應(yīng)于一個Spark分區(qū)。
3委可、對現(xiàn)有的RDD進行轉(zhuǎn)換渊跋。
轉(zhuǎn)換和動作
Spark為RDD提供了兩大類操作:轉(zhuǎn)換(transformation)和動作(action)。轉(zhuǎn)換時從現(xiàn)有的RDD生成新的RDD着倾,而動作則觸發(fā)對RDD的計算并對計算結(jié)果執(zhí)行某種操作刹枉,返回給用戶或保存在外部存儲器(計算在內(nèi)存中進行)。動作時立即性的屈呕,而轉(zhuǎn)換則是惰性的,因為在對RDD執(zhí)行一個動作之前都不會為該RDD的任何轉(zhuǎn)換操作采取實際行動棺亭。
判斷一個操作是轉(zhuǎn)換還是動作:觀察其返回類型虎眨,如果返回的類型是RDD,則是一個轉(zhuǎn)換操作镶摘,否則是一個動作嗽桩。
Spark API文檔:https://spark.apache.org/docs/2.2.0/api/java/index.html
聚合轉(zhuǎn)換:按鍵為鍵值對RDD進行聚合操作的三個主要轉(zhuǎn)換函數(shù)是:reduceByKey()、foldByKey()凄敢、aggregateByKey()
持久化
調(diào)用cache()方法會在executor的內(nèi)存中持久化保存RDD的每個分區(qū)(緩存)碌冶,如果executor沒有足夠的內(nèi)存來存儲RDD分區(qū),計算不會失敗涝缝,但需要重新計算分區(qū)扑庞。因此Spark提供了不同級別的持久化行為譬重。
默認(rèn)的持久化級別是MEMORY_ONLY,使用對象在內(nèi)存中的常規(guī)表示方法
MEMORY_ONLY_SER級別:通過把分區(qū)中的元素序列化為字節(jié)數(shù)組來實現(xiàn)罐氨,多了一份序列化CPU開銷臀规,但生成的序列化RDD分區(qū)大小適合被保存在內(nèi)存中。
MEMORY_AND_DISK級別:如果數(shù)據(jù)集的大小不適合保存在內(nèi)存中栅隐,就將其溢出到磁盤塔嬉。
MEMORY_AND_DISK_SER級別:如果序列化數(shù)據(jù)集的大小不適合保存在內(nèi)存中,就將其溢出到磁盤租悄。
序列化
在使用Spark時谨究,要從兩方面來考慮序列化:數(shù)據(jù)序列化和函數(shù)序列化(閉包函數(shù))
數(shù)據(jù)序列化
默認(rèn)情況下,Spark在通過網(wǎng)絡(luò)將數(shù)據(jù)從一個executor發(fā)送到另一個executor時泣棋,或者以序列化的形式緩存(持久化)數(shù)據(jù)時胶哲,使用的是Java序列化機制:類實現(xiàn)java.io.Serializable或者java.io.Externalizable接口,該機制性能外傅、效率不高纪吮。
使用Kryo序列化機制是更好的選擇,Kryo是一個高效的Java序列化庫萎胰,在驅(qū)動程序的SparkConf中設(shè)置spark.serializer屬性即可使用Kryo
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
在Kryo中注冊類:創(chuàng)建一個KryoRegistrator子類碾盟,然后重寫registerClasses()方法,可提升序列化性能技竟。
函數(shù)序列化:通常函數(shù)序列化會使用默認(rèn)的Java序列化機制冰肴。
共享變量
共享變量分為廣播變量和累加器變量兩種。
廣播變量
廣播變量(broadcast variable)在經(jīng)過序列化后被發(fā)送到各個executor榔组,然后緩存在那里熙尉,以便后期任務(wù)可以在需要時訪問。相當(dāng)于MapReduce中的分布式緩存搓扯,區(qū)別在于Spark將數(shù)據(jù)保存在內(nèi)存中检痰,只有在內(nèi)存耗盡時才會溢出到磁盤上。創(chuàng)建一個廣播變量:通過SparkContext的broadcast()方法傳遞锨推。廣播變量是單向傳播的铅歼。
累加器
累加器(accumulator)是在任務(wù)中只能對它做加法的共享變量,類似于MapReduce中的累加器换可,當(dāng)作業(yè)完成后椎椰,driver程序可以檢索累加器的最終值。通過SparkContext的accumulator()方法來創(chuàng)建一個累加器變量沾鳄。
Spark作業(yè)的運行機制
實體
driver:負(fù)責(zé)托管應(yīng)用(SparkContext)并未作業(yè)調(diào)度任務(wù)慨飘。通常作為一個不由集群管理器(cluster manager)管理的客戶端來運行。
executor:專屬于應(yīng)用译荞,在應(yīng)用運行期運行瓤的,并執(zhí)行該應(yīng)用的任務(wù)休弃,一般運行在集群的計算機上。
作業(yè)的提交
Spark作業(yè)運行過程:
當(dāng)對RDD執(zhí)行一個動作(比如count())時堤瘤,會自動提交一個Soark作業(yè)玫芦,導(dǎo)致內(nèi)部的SprakContext調(diào)用runJob()。步驟1本辐。
然后將調(diào)用傳遞給作為driver的一部分運行的調(diào)度程序桥帆。步驟2。調(diào)度程序由兩部分組成:DAG調(diào)度程序和任務(wù)調(diào)度程序慎皱。其中DAG調(diào)度程序把作業(yè)分解為若干階段(stage)老虫,并由這些階段構(gòu)成一個DAG。任務(wù)調(diào)度程序則負(fù)責(zé)把每個階段中的任務(wù)提交到集群中茫多。
當(dāng)DAG調(diào)度程序已構(gòu)建一個完整的多階段DAG祈匙,它就將每個階段的任務(wù)集合提交給任務(wù)調(diào)度程序。步驟3
當(dāng)任務(wù)集合被發(fā)送到任務(wù)調(diào)度程序后天揖,任務(wù)調(diào)度程序開始為executor分配任務(wù)夺欲,分配的任務(wù)通過調(diào)度程序后端啟動。步驟4
調(diào)度程序后端向executor發(fā)送遠(yuǎn)程啟動任務(wù)的消息今膊。步驟5.
executor接收到消息通知后開始運行任務(wù)些阅。步驟6
DAG的構(gòu)建
任務(wù)可分為兩種類型:shuffle map任務(wù)和result任務(wù)。
- shuffle map任務(wù):與MapReduce中的shuffle的map任務(wù)斑唬,每個shuffle map任務(wù)在一個RDD分區(qū)上運算市埋,然后把結(jié)果發(fā)送回driver,再由driver將每個分區(qū)的計算結(jié)果匯集成最終結(jié)果
DAG調(diào)度程序負(fù)責(zé)將一個階段分解成若干任務(wù)以提交給任務(wù)調(diào)度程序恕刘。DAG調(diào)度程序會為每個任務(wù)賦予一個位置偏好(placement preference)缤谎,以允許任務(wù)調(diào)度程序充分利用數(shù)據(jù)本地化(data locality),例如:對于存儲在HDFS上的輸入RDD分區(qū)而言褐着,它的任務(wù)的位置偏好就是托管這些分區(qū)的數(shù)據(jù)塊的datanode(稱為node local)坷澡,而對于在內(nèi)存中緩存的RDD分區(qū),其任務(wù)位置偏好則是那些保存RDD分區(qū)的executor(稱為process local)
任務(wù)調(diào)度
當(dāng)任務(wù)集合被發(fā)送到任務(wù)調(diào)度程序后含蓉,任務(wù)調(diào)度程序用為該應(yīng)用運行的executor的列表洋访,在斟酌位置偏好的同時構(gòu)建任務(wù)到executor的映射。接著任務(wù)調(diào)度程序?qū)⑷蝿?wù)分配給具有可用內(nèi)核的executor谴餐,并且在executor完成任務(wù)時繼續(xù)分配更多的任務(wù),直到任務(wù)集合全部完成呆抑。
任務(wù)調(diào)度程序在為某個executor分配任務(wù)時岂嗓,首先分配的是進程本地任務(wù)(process-local),再分配節(jié)點本地任務(wù)(node-local)鹊碍,然后分配機架本地任務(wù)(rack-local)厌殉,最后分配任意任務(wù)(非本地)或者推測任務(wù)(speculative task)食绿。推測任務(wù)時現(xiàn)有任務(wù)的復(fù)本,如果任務(wù)運行得比預(yù)期緩慢公罕,則調(diào)度器可以將其作為備份來運行器紧。
當(dāng)任務(wù)完成或者失敗時,executor會向driver發(fā)送狀態(tài)更新消息楼眷,如果失敗了铲汪,任務(wù)調(diào)度程序?qū)⒃诹硪粋€executor上重新提交任務(wù),若是啟用了推測任務(wù)罐柳,則還會為運行緩慢的任務(wù)啟動推測任務(wù)掌腰。
任務(wù)執(zhí)行
executor按如下方式運行任務(wù):
首先,確保任務(wù)的JAR包和文件依賴關(guān)系都是最新的张吉,executor在本地高速緩存中保留了先前任務(wù)已使用的所有依賴齿梁,因此只有在它們更新的情況下才會重新下載。
然后肮蛹,反序列化任務(wù)代碼勺择,因為任務(wù)代碼是以啟動任務(wù)消息的一部分而發(fā)生的序列化字節(jié)
最后JVM執(zhí)行任務(wù)代碼,任務(wù)運行在與executor相同的JVM伦忠,因此任務(wù)的啟動沒有進程開銷省核。
任務(wù)執(zhí)行結(jié)果被序列化并發(fā)送到executor后端,然后以狀態(tài)更新消息的形式返回driver缓苛。
執(zhí)行器和集群管理器
Spark依靠executor(執(zhí)行器)來運行構(gòu)成Sprak作業(yè)的任務(wù)芳撒,負(fù)責(zé)管理executor生命周期的是集群管理器(cluster manager),Spark有多種不同特性的集群管理器未桥,可分為如下幾種:
本地模式:在本地模式下笔刹,一個executor與driver運行在用一個JVM中,該模式對于測試或運行小規(guī)模作業(yè)時非常有用冬耿。其主URL為local舌菜、local[n]n 個線程或者local(*)機器的每個內(nèi)核一個線程。
獨立模式:獨立模式的集群管理器是一個簡單的分布式實現(xiàn)亦镶,它運行了一個master以及多個worker日月。當(dāng)Spark應(yīng)用啟動時,master要求worker代表應(yīng)用生成多個executor進程缤骨。其主URL為:spark://host:port
Mesos模式:Apache Mesos是一個通用的集群資源管理器爱咬,允許根據(jù)組織策略在不同的應(yīng)用之間細(xì)化資源共享。這種模式的主URL為:mesos://host:port
YARN模式:每個運行的Spark應(yīng)用對應(yīng)于一個YARN應(yīng)用實例绊起,每個executor在自己的YARN容器中運行精拟。其主URL為:yarn-client或yarn-cluster
運行在YARN上的Spark
為了在YARN上運行,Spark提供了兩種部署方式:YARN客戶端模式和YARN集群模式。YRAN客戶端模式的driver在客戶端運行蜂绎,而YARN集群模式的driver在YARN的application master集群上運行栅表。
1.YARN客戶端模式:
在YARN客戶端模式下,當(dāng)driver構(gòu)建新的SparkContext實例時师枣,便啟動了與YARN之間的交互怪瓶。步驟1
SparkContext向YARN資源管理器提交一個YARN應(yīng)用。步驟2
YARN資源管理器則啟動集群節(jié)點管理器上的YARN容器践美,并為在其中運行一個SparkExecutorLauncher的application master洗贰。步驟3
ExecutorLauncher向資源管理器請求資源。步驟4
啟動ExecutorBackend進程作為分配給它的容器拨脉。步驟5
每個executor在啟動時會連接回SparkContext哆姻,注冊自身。即向SparkContext提供了關(guān)于可用于運行任務(wù)的executor的數(shù)量及其位置信息玫膀。這些信息被用在任務(wù)的位置偏好策略中矛缨。啟動的executor的數(shù)量在saprk-shell、spark-submit或py-spark中設(shè)置帖旨。executor使用的內(nèi)核數(shù)默認(rèn)為1個箕昭,內(nèi)存默認(rèn)2014MB
2.YARN集群模式:
啟動流程與客戶端模式基本一致,僅是在步驟1解阅,sprak-submit不會允許任何用戶代碼落竹,代碼在集群上運行。
參考資料:《Hadoop權(quán)威指南》