寫在前面
本系列是綜合了自己在學(xué)習spark過程中的理解記錄 + 對參考文章中的一些理解 + 個人實踐spark過程中的一些心得而來蓬戚。寫這樣一個系列僅僅是為了梳理個人學(xué)習spark的筆記記錄浮毯,并非為了做什么教程泞辐,所以一切以個人理解梳理為主,沒有必要的細節(jié)就不會記錄了钢猛。若想深入了解,最好閱讀參考文章和官方文檔。
其次欢顷,本系列是基于目前最新的 spark 1.6.0 系列開始的铺峭,spark 目前的更新速度很快墓怀,記錄一下版本好還是必要的。
最后卫键,如果各位覺得內(nèi)容有誤傀履,歡迎留言備注,所有留言 24 小時內(nèi)必定回復(fù)莉炉,非常感謝钓账。
Tips: 如果插圖看起來不明顯,可以:1. 放大網(wǎng)頁絮宁;2. 新標簽中打開圖片梆暮,查看原圖哦。
1. Application
用戶在 spark 上構(gòu)建的程序绍昂,包含了 driver 程序以及在集群上運行的程序代碼惕蹄,物理機器上涉及了 driver,master,worker 三個節(jié)點.
2. Driver Program
創(chuàng)建 sc 卖陵,定義 udf 函數(shù)遭顶,定義一個 spark 應(yīng)用程序所需要的三大步驟的邏輯:加載數(shù)據(jù)集,處理數(shù)據(jù)泪蔫,結(jié)果展示棒旗。
3. Cluster Manager
集群的資源管理器,在集群上獲取資源的外部服務(wù)撩荣。
拿 Yarn 舉例铣揉,客戶端程序會向 Yarn 申請計算我這個任務(wù)需要多少的 memory,多少 CPU餐曹,etc逛拱。
然后 Cluster Manager 會通過調(diào)度告訴客戶端可以使用,然后客戶端就可以把程序送到每個 Worker Node 上面去執(zhí)行了台猴。
4. Worker Node
集群中任何一個可以運行spark應(yīng)用代碼的節(jié)點朽合。Worker Node就是物理節(jié)點,可以在上面啟動Executor進程饱狂。
5. Executor
在每個 Worker Node 上為某應(yīng)用啟動的一個進程曹步,該進程負責運行任務(wù),并且負責將數(shù)據(jù)存在內(nèi)存或者磁盤上休讳,每個任務(wù)都有各自獨立的 Executor讲婚。
Executor 是一個執(zhí)行 Task 的容器。它的主要職責是:
- 初始化程序要執(zhí)行的上下文 SparkEnv俊柔,解決應(yīng)用程序需要運行時的 jar 包的依賴筹麸,加載類。
- 同時還有一個 ExecutorBackend 向 cluster manager 匯報當前的任務(wù)狀態(tài)雏婶,這一方面有點類似 hadoop的 tasktracker 和 task物赶。
總結(jié):Executor 是一個應(yīng)用程序運行的監(jiān)控和執(zhí)行容器。
6. Jobs
包含很多 task 的并行計算尚骄,可以認為是 Spark RDD 里面的 action块差,每個 action 的觸發(fā)會生成一個job。
用戶提交的 Job 會提交給 DAGScheduler倔丈,Job 會被分解成 Stage憨闰,Stage 會被細化成 Task,Task 簡單的說就是在一個數(shù)據(jù) partition 上的單個數(shù)據(jù)處理流程需五。關(guān)于 job鹉动,stage,task宏邮,詳細可以參考這篇文章:『 Spark 』6. 深入研究 spark 運行原理之 job, stage, task
A job is triggered by an action
, like count()
or saveAsTextFile()
, click on a job to see info about the stages
of tasks
inside it.
7. Stage
一個 Job 會被拆分為多組 Task泽示,每組任務(wù)被稱為一個 Stage 就像 Map Stage缸血, Reduce Stage。
Stage 的劃分在 RDD 的論文中有詳細的介紹械筛,簡單的說是以 shuffle 和 result 這兩種類型來劃分捎泻。
在 Spark 中有兩類 task:
-
shuffleMapTask
輸出是shuffle所需數(shù)據(jù), stage的劃分也以此為依據(jù),shuffle之前的所有變換是一個stage埋哟,shuffle之后的操作是另一個stage笆豁。
-
resultTask,
輸出是result赤赊,比如 rdd.parallize(1 to 10).foreach(println) 這個操作沒有shuffle闯狱,直接就輸出了,那么只有它的task是resultTask抛计,stage也只有一個哄孤;如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 這個job因為有reduce,所以有一個shuffle過程吹截,那么reduceByKey之前的是一個stage瘦陈,執(zhí)行shuffleMapTask,輸出shuffle所需的數(shù)據(jù)饭弓,reduceByKey到最后是一個stage双饥,直接就輸出結(jié)果了媒抠。如果job中有多次shuffle弟断,那么每個shuffle之前都是一個stage。
8. Task
被送到 executor 上的工作單元趴生。
9. Partition
Partition 類似 hadoop 的 Split阀趴,計算是以 partition 為單位進行的,當然 partition 的劃分依據(jù)有很多苍匆,這是可以自己定義的刘急,像 HDFS 文件,劃分的方式就和 MapReduce 一樣浸踩,以文件的 block 來劃分不同的 partition叔汁。總而言之检碗,Spark 的 partition 在概念上與 hadoop 中的 split 是相似的据块,提供了一種劃分數(shù)據(jù)的方式。
10. RDD
先看看原文 Resilient Distributed Datasets: A Fault-Tolerant Abstraction for
In-Memory Cluster Computing 是怎么介紹 RDD 的折剃。
a distributed memory abstraction
that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.
RDDs are motivated by two types of applications that current computing frameworks handle inefficiently:
- iterative algorithms;
- interactive data mining tools;
In both cases, keeping data in memory can improve performance by an order of magnitude.
To achieve fault tolerance efficiently, RDDs provide a restricted form of shared memory, based on coarsegrained transformations rather than fine-grained updates to shared state. However, we show that RDDs are expressive enough to capture a wide class of computations, including recent specialized programming models for iterative jobs, such as Pregel, and new applications that these models do not capture. We have implemented RDDs in a system called Spark, which we evaluate through a variety of user applications and benchmarks.
每個RDD有5個主要的屬性:
- 一組分片(partition)另假,即數(shù)據(jù)集的基本組成單位
- 一個計算每個分片的函數(shù)
- 對parent RDD的依賴,這個依賴描述了RDD之間的lineage
- 對于key-value的RDD怕犁,一個Partitioner边篮,這是可選擇的
- 一個列表己莺,存儲存取每個partition的preferred位置。對于一個HDFS文件來說戈轿,存儲每個partition所在的塊的位置凌受。這也是可選擇的
把上面這5個主要的屬性總結(jié)一下,可以得出RDD的大致概念思杯。首先要知道胁艰,RDD大概是這樣一種表示數(shù)據(jù)集的東西,它具有以上列出的一些屬性智蝠。是spark項目組設(shè)計用來表示數(shù)據(jù)集的一種數(shù)據(jù)結(jié)構(gòu)腾么。而spark項目組為了讓RDD能handle更多的問題,又規(guī)定RDD應(yīng)該是只讀的杈湾,分區(qū)記錄的一種數(shù)據(jù)集合中解虱。可以通過兩種方式來創(chuàng)建RDD:一種是基于物理存儲中的數(shù)據(jù)漆撞,比如說磁盤上的文件殴泰;另一種,也是大多數(shù)創(chuàng)建RDD的方式浮驳,即通過其他RDD來創(chuàng)建【以后叫做轉(zhuǎn)換】而成悍汛。而正因為RDD滿足了這么多特性,所以spark把RDD叫做Resilient Distributed Datasets至会,中文叫做彈性分布式數(shù)據(jù)集离咐。很多文章都是先講RDD的定義,概念奉件,再來說RDD的特性宵蛀。我覺得其實也可以倒過來,通過RDD的特性反過來理解RDD的定義和概念县貌,通過這種由果溯因的方式來理解RDD也未嘗不可术陶。反正對我個人而言這種方式是挺好的。
RDD是Spark的核心煤痕,也是整個Spark的架構(gòu)基礎(chǔ)梧宫,可以總下出幾個它的特性來:
- 它是不變的數(shù)據(jù)結(jié)構(gòu)存儲
- 它是支持跨集群的分布式數(shù)據(jù)結(jié)構(gòu)
- 可以根據(jù)數(shù)據(jù)記錄的key對結(jié)構(gòu)進行分區(qū)
- 提供了粗粒度的操作,且這些操作都支持分區(qū)
- 它將數(shù)據(jù)存儲在內(nèi)存中摆碉,從而提供了低延遲性
關(guān)于 rdd 的更多詳情塘匣,可以參考這篇文章:『 Spark 』4. spark 之 RDD
11. sc.parallelize
先看看 api 文檔里是怎么說的:parallelize
- parallelize(c, numSlices=None)
Distribute a local Python collection to form an RDD. Using xrange is recommended if the input represents a range for performance.
簡單的說,parallelize 就是把 driver 端定義的一個數(shù)據(jù)集兆解,或者一個獲取數(shù)據(jù)集的生成器馆铁,分發(fā)到 worker 上的 executor 中,以供后續(xù)分析锅睛。這種方式在測試代碼邏輯時經(jīng)常用到埠巨,但在構(gòu)建真正的 spark 應(yīng)用程序時很少會用到历谍,一般都是從 hdfs 或者數(shù)據(jù)庫去讀取數(shù)據(jù)。
12. code distribute
提交 spark 應(yīng)用時辣垒,spark 會把應(yīng)用代碼分發(fā)到所有的 worker 上面望侈,應(yīng)用依賴的包需要在所有的worker上都存在,有兩種解決 worker 上相關(guān)包依賴的問題:
- 選用一些工具統(tǒng)一部署 spark cluster勋桶;
- 在提交 spark 應(yīng)用的時候脱衙,指定應(yīng)用依賴的相關(guān)包,把 應(yīng)用代碼例驹,應(yīng)用依賴包 一起分發(fā)到 worker捐韩;
13. cache priority
cache 是否支持 priority,目前不支持鹃锈,而且 spark 里面對 rdd 的 cache 和我們常見的緩存系統(tǒng)是不一樣的荤胁。細節(jié)可以找我討論。
14. cores
The number of cores to use on each executor. For YARN and standalone mode only. In standalone mode, setting this parameter allows an application to run multiple executors on the same worker, provided that there are enough cores on that worker. Otherwise, only one executor per application will run on each worker.
每一個 core屎债,相當于一個 worker 上的進程仅政,這些進程會同時執(zhí)行分配到這個 worker 上的任務(wù)。簡單的說盆驹,就是 spark manager 把一個 job 切分幾個 task 分發(fā)到 worker 上同步執(zhí)行圆丹,而每個 worker 把分配給自己的 task 再切分成幾個 subtask,分配給當前 worker 上的不同進程躯喇。
15. Memory
分配給 spark 應(yīng)用的內(nèi)存是僅僅給 cache 數(shù)據(jù)用嗎辫封?
不是,分配給 spark 應(yīng)用的內(nèi)存有三個方面的應(yīng)用:
- spark 本身
- spark 應(yīng)用
- spark 應(yīng)用過程中 runtime 使用玖瘸,比如 UDF 函數(shù)
- spark 應(yīng)用中的 cache
16. RDD narrow/wide dependences
RDD 之間的依賴類別[ 或者秸讹,創(chuàng)建一個 RDD 的不同方法 ]
17. 本地內(nèi)存與集群內(nèi)存
所謂本地內(nèi)存檀咙,是指在 driver 端的程序所需要的內(nèi)存雅倒,由 driver 機器提供,一般用來生成測試數(shù)據(jù)弧可,接受運算結(jié)果等蔑匣;
所謂集群內(nèi)存,是指提交到集群的作業(yè)能夠向集群申請的最多內(nèi)存使用量棕诵,一般用來存儲關(guān)鍵數(shù)據(jù)裁良;
18. 限制用戶使用的內(nèi)存
可以在啟動 spark 應(yīng)用的時候申請;完全可控校套。
19. 當用戶申請總資源超過當前集群總資源
FIFO 資源分配方式价脾。
20. SparkContext [經(jīng)常簡稱為 sc]
spark app 的起點和入口,一般用來加載數(shù)據(jù)集笛匙,生成第一個 rdd侨把。
21. 對一個 rdd 多次 cache 會有什么影響嗎犀变?
不會,只會cache一次秋柄。stackoverflow
4. Next
下一篇获枝,通過幾個簡單的例子來介紹 spark 的基本編程模式。
5. 打開微信骇笔,掃一掃省店,點一點,棒棒的
參考文章
- spark-rdd-paper : Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
- spark python API