一. 編程模型
二. 組件簡介
三. 作業(yè)執(zhí)行
四. 內存管理
五. 存儲原理
六. shuffle
七. 性能調優(yōu)
八. 知識腦圖
一. 編程模型
1. RDD的基本屬性
分區(qū)
在一些大規(guī)模計算中,一個數據集中的數據量會達到非常大的級別,而這些數據難以在一臺機器上進行存儲和計算,rdd的思路就是將這些數據進行分區(qū),一個大的數據集被分為很小的分區(qū)
rdd的計算是以分區(qū)為單位進行的,而且同一分區(qū)的所有數據進行相同的計算邏輯.對于同一個分區(qū)的數據而言,必須執(zhí)行相同的操作:要么都執(zhí)行,要么都不執(zhí)行.分區(qū)的數量決定了同時執(zhí)行的任務的數量,因為可以為每個分區(qū)啟動一個計算任務用于單獨計算這個分區(qū)的數據
計算函數
rdd的數據被分區(qū)了,但是每個分區(qū)的數據是如何得來的?一個是rdd的數據來源只有兩種:一個是從數據源或集合中進行加載得到rdd的數據;另一個是通過其他rdd進行一定的轉換得到的數據,無論是哪一種方式,rdd的數據其實都是通過rdd的計算函數得到的
依賴
每個父rdd的分區(qū)最多被子rdd的一個分區(qū)使用,這種依賴稱為窄依賴;多個子rdd的分區(qū)數據依賴父rdd的同一個分區(qū)的數據
什么時候發(fā)生shuffle???
shuffle算子是否一定觸發(fā)shuffle?
分區(qū)器
分區(qū)器的作用是:如何把map階段的結果進行分組,區(qū)分出結果是給reduce階段的rdd哪個分區(qū)
首選運行位置
每個RDD對于每個分區(qū)來說有一組首選運行位置,用于標識RDD的這個分區(qū)數據最好能夠在哪臺主機運行.如Hadooprdd能夠實現加載數據的任務在相應的數據節(jié)點上執(zhí)行
2. RDD的緩存
rdd是進行迭代式計算,默認并不會保存中間結果的數據,在計算完成后,中間的結果數據都將會丟失,如果一個rdd在計算完成后,不是通過流水線的方式被一個rdd調用,而是被多個rdd分別調用,則在計算過程中就需要對rdd進行保存,避免rdd的二次計算,當一個rdd被緩存后,后面調用的時候需要rdd的數據直接從緩存中讀取,而不是對rdd再次進行計算.尤其是一個rdd經過了特別復雜的計算過程,對其緩存可以極大的提高程序的執(zhí)行效率
因為rdd是分布式的,不同的分區(qū)散落在不同的節(jié)點上,所以rdd的緩存也是分布式的
3. Spark RDD操作
Spark定義了很多對rdd的操作,主要分為兩類:transformation和action,transformation并不會真正的觸發(fā)job的執(zhí)行,它只是定義了rdd之間的轉換關系,即rdd之間的lineage,只有action才會觸發(fā)job的真正執(zhí)行
transformation
操作 | 說明 |
---|---|
map | 迭代RDD中的每個元素生成新的RDD |
filter | |
flatmap | |
mappartitions | |
distinct | |
groupbykey | |
reducebykey | |
union | |
coalesce | |
repartition |
action
操作 | 說明 |
---|---|
collect | |
count | |
first | |
take | |
saveAsTextFile | |
foreach | |
reduce |
持久化
操作 | 說明 |
---|---|
MEMORY_ONLY | |
MEMORY_AND_DISK | |
MEMORY_ONLY_SER | |
MEMORY_AND_DISK_SER | |
DISK_ONLY | |
MEMORY_ONLY_2 |
4. 源碼分析
環(huán)境準備
idea環(huán)境,安裝scala插件,創(chuàng)建一個maven項目...
RDD源碼
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
def compute(split: Partition, context: TaskContext): Iterator[T]
protected def getPartitions: Array[Partition]
protected def getDependencies: Seq[Dependency[_]] = deps
val id: Int = sc.newRddId()
RDD分區(qū)源碼
trait Partition extends Serializable {
def index: Int
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
RDD的partition是一個特質,其實現很簡單,就是記錄了分區(qū)的索引并重寫了hashcode,RDD通過一個partition數組,即可表示出這個RDD由多少個分區(qū)組成,每個分區(qū)的索引用于表示出每個不同的分區(qū),子類通過實現partition的特質,從而具有更加豐富的分區(qū)功能
private[spark] class NewHadoopPartition(
rddId: Int,
val index: Int,
rawSplit: InputSplit with Writable)
extends Partition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
override def hashCode(): Int = 31 * (31 + rddId) + index
override def equals(other: Any): Boolean = super.equals(other)
}
RDD計算函數源碼
def compute(split: Partition, context: TaskContext): Iterator[T]
RDD依賴源碼
RDD分區(qū)器源碼
RDD首選運行位置源碼
補充說明
- (不發(fā)生shuffle的情況下)在多個rdd轉換的過程,因為他們之間的分區(qū)是一一對應的,也就是每個rdd只依賴父rdd的一個固定的分區(qū)的數據即可.每個分區(qū)中的數據可以通過一個流水線任務(task)轉換完成,各個任務之間相互獨立,互不影響
- 在計算rdd的過程中,如果出現shuffle,則其過程有如下特點:第一點,必須首先計算出依賴rdd的所有分區(qū)的數據,然后后續(xù)rdd才能繼續(xù)進行計算.第二點,shuffle的過程必然分為兩個階段,map和reduce階段.第三點,后一個階段必須在上一個階段的數據全部完成計算以后才能開始計算,所以必須拆為兩組不同的任務按照先后順序執(zhí)行
二. 組件介紹
1. 術語介紹
- Appplication
- Job
- Task
- Stage
- Driver
- Executor
- Worker Node
2. Spark RPC
3. Driver
Driver進程其實就是運行SparkContext進程.在SparkContext初始化的過程中,創(chuàng)建了一些組件.這些組件負責實現Job執(zhí)行,stage劃分,task提交等Driver的功能
SparkEnv
RpcEnv
SerializerManager
ShuffleManager
BroadcastManager
BlockManager
MemoryManager
MapOutputTracker
listenerBus
SparkUI
DAGScheduler
TaskScheduler
SchedulerBackend
4. Executor
5. 運行模式
6. 存儲簡介
在Spark中有很多需要存儲數據的地方,如對RDD進行緩存,shuffle時map階段的數據存儲,廣播變量時各節(jié)點對變量的存儲等.這些數據的存儲都離不開Spark的存儲模塊.Spark的存儲模塊將需要存儲的數據進行了抽象,無論是說明類型的數據,無論數據是什么內容,只要需要存儲的數據都稱為block,每個block都有唯一的id進行標識,并且存儲模塊提供多種不同級別的存儲,比如是內存存儲,磁盤存儲,堆外內容存儲等.
7. 源碼分析??
SparkContext主要組件
SparkEnv創(chuàng)建
消息總線創(chuàng)建
TaskScheduler和SchedulerBackend
DAGScheduler
Master
Worker
應用提交流程
三. 作業(yè)執(zhí)行原理
1. 整體執(zhí)行流程
2. Job提交
- 為什么需要action操作
- Job提交
- 分布式執(zhí)行
- Spark實現
3. Stage劃分
- 寬依賴和窄依賴
- 如何判斷RDD之間的依賴關系
- Stage劃分
- Spark實現
4. Task劃分
5. Task提交
6. Task執(zhí)行
7. Task結果處理
8. 源碼分析
四. 作業(yè)執(zhí)行原理
1. 內存使用概述
- 堆內存和堆外內存
- 存儲內存和執(zhí)行內存
- 邏輯劃分
2. 內存池的劃分
- 內存池概念
- 內存池劃分
- 內存模式
3. 內存管理
- 靜態(tài)內存管理器
- 統(tǒng)一內存管理器
4. 源碼分析
五. 存儲原理
六. shuffle
1. Spark執(zhí)行shuffle的流程
總體流程
shuffleRDD的生成
在shuffledependency中包含了在shuffle過程中使用的幾個重要的組件,這些組件如下
- Partitioner,用于將key進行分組,判斷哪個key應該分到哪一組中.partitioner確定了所有的key一種能夠分為多少組.這些分組的數量也決定了下游reduce任務分區(qū)的大小.在map端數據進行分組時,便將每個key使用partitioner進行分組,進而得到每個key所屬的分組
- Aggregator,用戶將同一個key的兩個value進行聚合,也可以將兩個聚合后的值進行聚合.在reduce端將使用Aggregator將同一個key的所有value進行聚合,如果定義了在map進行聚合,在執(zhí)行map過程的時候,也會調用Aggregator首先在map端進行聚合
- ShuffleHandle,用于在map端獲取寫入器(shuffleWriter)將分區(qū)的數據寫入文件中,在reduce端用于獲取分區(qū)讀取器(shuffleReader),讀取該分區(qū)中對應的不同的map端輸出的數據
- 此外,在shuffledependency中還記錄了依賴的父RDD,該shuffle的id,是否對key進行排序,是否在map端進行聚合
Stage的劃分
在執(zhí)行的rdd觸發(fā)action操作后,DAGScheduler會遞歸RDD的依賴關系,每遇到一個shuffledependency就會將依賴的RDD劃分到新的stage中,最終一個job被劃分到有先后依賴關系的多個stage中.最后的stage稱為ResultStage,之前所有的Stage都為ShuffleMapStage.一個ShuffleMapStage加載數據的過程可能直接從數據源中加載,也可以是某個shuffle過程的reduce階段,從上個stage的map端輸出進行加載.但所有的ShuffleMapStage運行完成后,都會將數據分組到當前節(jié)點的BlockMananger的文件中,等待下一個stage來拉取結果
Task的劃分
Stage劃分完成后,每個Stage會根據計算RDD的分區(qū)的大小劃分為多個Task,每個Task計算RDD的一個分區(qū)的數據,ShuffleMapStage中劃分的task為shufflemaptask,shufflemaptask會被序列化到executor節(jié)點中進行執(zhí)行,shufflemaptask的執(zhí)行會將該分區(qū)的數據進行分組,如果需要map端聚合在分組過程中則還會進行聚合操作.最終將分組的數據寫入到所在節(jié)點的文件.shufflemaptask在序列化時,發(fā)送到executor中的內容主要有該stage中執(zhí)行map操作的rdd,下游rdd依賴的shuffledependency,計算的分區(qū)等
Map端的寫入
Reduce端的讀取
2. Shuffle內存管理
任務內存管理
內存消費者
內存消費組件
在執(zhí)行shuffle的過程中,有幾個重要的地方需要申請執(zhí)行內存,在map端將key進行聚合的過程(如果需要),將key按照分區(qū)排序的過程(如果需要)和在reduce端將數據聚合的過程,對key排序的過程(如果需要).在這些過程中,都需要申請執(zhí)行內存完成需要的操作.對于不同的操作如排序,聚合等,spark使用不同的組件來完成其功能.其中,externalsorter和shuffleexternalsorter用于map端對迭代器的key按照分區(qū)排序,externalsorter還用于reduce端對key的排序,Exeternalappendonlymap用于對于迭代器中key的聚合
Tungsten內存管理
Tungsten內存消費組件
3. ShuffleWrite
- HashshuffleManager
- HashshuffleWriter:在map端每個shufflemaptask執(zhí)行時,都會獲取一個shuffleWriter,HashshuffleWriter在寫入map端數據的時候,會對迭代器中的數據使用partitioner進行分組,為每個分組生成一個文件,將分組中的數據寫入到文件中,如果map端需要聚合時,hashshufflewriter會使用externalappendonlymap首先對數據進行聚合,將聚合后的數據分組寫入到不同的文件中.假如在map中的task數量為10000,在reduce的端的task數量為1000,那么在集群中map端的過程會形成1000 * 1000各文件,由此可見,使用hashshufflewriter將會產生大量的文件,會對系統(tǒng)的IO造成巨大壓力,而且在對文件讀寫需要打開文件的輸出流,打開大量的文件將會消耗大量的內存,使executor端的內存也產生很大的壓力.為了解決大量文件的問題,spark引入consolidation機制,同一個executor中的同一個CPU核執(zhí)行的task,可以將相同的分組寫入到同一文件中,這在一定程度上減少了文件的生成
- SortshuffleManager:SortshuffleManager可以獲取三種不同的shufflewriter,這三種shufflewriter在map端最終都將數據寫入了一個文件中,避免了大量文件的生成,減緩了shuffle過程中io壓力,在獲取三種不同的shufflewriter中,其寫入數據過程時不同的,但最終寫入的文件格式和效果是一致的,都是按照key按照分區(qū)進行排序,依次將不同分區(qū)的數據序列化后寫入到同一個文件,再使用一個index小文件記錄每個分區(qū)的數據在文件中的索引即可.在Spark中使用filesegment對象表示文件的一部分,filesegment中保留有文件索引的引用和該文件的偏移量的開始與文件的長度
- Bypassmergesortshufflewriter
使用Bypassmergesortshufflewriter的前提是map端數據不需要聚合,并且生成的分區(qū)數小于200,該值可以通過spark.shuffle.sort.bypassMergeThreshold
配置,因為Bypassmergesortshufflewriter與hashshufflewriter非常類似,每個task會為下游的每個分區(qū)生成一個文件,在這種情況下如果分區(qū)數太多會造成大量的文件被打開,產生io瓶頸,因此使用該shufflewriter時,分區(qū)數不應該太多,不必執(zhí)行按照分區(qū)排序的過程,在小分區(qū)的情況下就能獲得不錯的性能,Bypassmergesortshufflewriter為每個分區(qū)生成一個臨時文件,最終將所有的文件合并,按照分區(qū)順序寫入一個文件中,同時生成對應index索引文件 - sortshufflewriter
- unsafeshufflewriter
4. ShuffleRead
七. 性能調優(yōu)
1. 任務監(jiān)控
- SparkUI
- Spark運行日志詳解
Driver端的日志
- 在提交Driver程序,sparkcontext初始化,通過driverwrapper將程序提交到集群中運行
- spark context在初始化時,會創(chuàng)建sparkenv;driver端的sparkenv中創(chuàng)建了mapoutputtracker用于保存shufflemap端的結果;創(chuàng)建了blockmanagermaster用于管理所有的blockmanager,維護block的元數據;創(chuàng)建了blockmanager,用于保存該節(jié)點block塊,在blockmanager中創(chuàng)建了diskblockmanager用于磁盤數據存儲,創(chuàng)建了memorystore用于內存存儲,后續(xù)過程啟動sparkUI
- 在blockmanager創(chuàng)建過程中還會創(chuàng)建blocktransferservice,用于該節(jié)點與其他節(jié)點通信.在shuffle過程中,reduce端根據blockid到map端拉取數據就是通過該組件實現的
- 在創(chuàng)建schedulerbackend后,會通過Standaloneappclient將application提交到master中,master分配資源啟動executor,executor啟動成功后會再次向driver中進行注冊,通知driver啟動成功
- 當executor初始化完成后,其節(jié)點中的blockmanager會到driver中的blockmanager中進行注冊,driver中的blockmanager也會進行注冊
- 在使用textfile方法的時候,spark會將Hadoop的配置文件進行廣播,其他executor到hdfs中拉取數據時,使用此配置.廣播變量最終會存儲在memorystore中,也會顯示存儲數據的大小和剩余空間的大小
- 在用戶編寫的代碼中action操作會觸發(fā)job操作的執(zhí)行,每個job會被劃分為stage,如果一個stage中所有的父stage都計算完成或沒有父stage,則會提交這個stage
- 每個提交的stage都會被劃分為多個task,task的數量與rdd分區(qū)一致,所有的task計算邏輯都是相同的,將task序列化后進行廣播,這樣可實現多個task在同一個executor執(zhí)行時,僅僅保存一份task的二級制數據,多個task被封裝為taskset交給taskscheduler
- schedulerbackend將等待的task提交到有空閑CPU的executor中,并輸出task的id,stage id,運行的executor,計算的分區(qū),本地化級別等信息
- 當計算的task運行完成時,會將結果返回到driver端,輸出task的計算時間,executor,計算的分區(qū)等信息.當有task計算完成會有executor的CPU,這時會將等待運行的task提交到空閑的executor中,如此循環(huán)往復,直到stage的所有task都完成計算
- 當該stage中所有task都計算完成時,會將taskset從隊列中移除,完成該stage的計算,此時會查找依賴此stage的子stage,將子stage進行提交,計算子stage
- 當某個job的所有stage完成后,該job計算完成,輸出job的計算時間,運行下一個job
Executor端的日志
- driver端負責stage的劃分,task的提交.executor端負責任務的執(zhí)行并將任務結果進行返回.executor在初始化時同樣會創(chuàng)建sparkenv,在sparkenv中創(chuàng)建blockmanager,memorymanager等組件
- 當executor端接收到task時,會運行task,并輸出該task屬于哪個stage,在首次運行task時,會從廣播變量中獲取task的二級制數據,該節(jié)點的blockmanager會從遠程的節(jié)點拉取
- 在task運行過程中,如果對RDD進行緩存,會將該RDD的計算的分區(qū)的數據緩存到節(jié)點的blockmanager,如果指定使用內存,則當內存不足時,會提示緩存失敗
- 當blockmanager中存儲內存不足時,則將內存中的數據溢寫到磁盤中
- 當task運行完成時,會將task的運行結果返回到driver,并輸出計算結果的大小
- 在執(zhí)行shuffle操作時,map端使用exeternalsorter對數據進行分組,按照分區(qū)進行排序,如果內存不足,則會將內存中的數據寫入到磁盤中,為后續(xù)數據迭代留出空間.在reduce端使用exeternalsorter對key進行排序時,如果內存不足則同樣會溢出到磁盤中
- 在reduce端對所有的map端的task中的數據進行聚合時,會使用exeternalappendonlymap組件,如果內存不足,該組件會將數據溢出到磁盤中
2. 程序優(yōu)化
- 并行度
- 避免創(chuàng)建重復的RDD
- RDD持久化
- 廣播變量
- 高性能序列化庫
- 優(yōu)化資源操作連接
3. 資源優(yōu)化
- CPU,spark應用程序分配的CPU的個數決定了集群中能夠同時并行運行的task的個數
- 內存
- 磁盤
- executor數量的權衡:如果僅僅對于CPU的使用而言,其實是相同的,因為每個task的執(zhí)行會占用一個CPU,task不會關心是使用哪個executor的CPU運行的.對于內存使用則不同,因為如果在一個executor中CPU的數量過多,在該executor中執(zhí)行的task數量會變多,如果task需要進行shuffle操作,則所有task會共享同一個executor中的執(zhí)行內存.假如此時一個executor分配的內存過少,則會造成每個task分配的執(zhí)行內存過少.同樣,如果對RDD進行緩存時,在一個executor中,此時也需要增加內存滿足多個分區(qū)的數據緩存使用.如果在任務中使用到了大的廣播變量,此時分配的executor越多,那么共享變量的副本數就越多.此外,executor進程本身也需要消耗內存
- spark管理內存比例
- 使用alluxio加速數據訪問
4. shuffle過程優(yōu)化
- map端聚合
- 文件讀寫緩沖區(qū)
- reduce端并行拉取數量
- 溢寫文件上限
- 數據傾斜