java大數(shù)據(jù)之spark

一狡耻、Spark簡(jiǎn)介

1.1 Spark是什么

Spark是一個(gè)通用的并行計(jì)算框架,由UCBerkeley的AMP實(shí)驗(yàn)室開發(fā)猴凹。Spark基于map reduce 算法模式實(shí)現(xiàn)的分布式計(jì)算夷狰,擁有Hadoop MapReduce所具有的優(yōu)點(diǎn);但不同于Hadoop MapReduce的是Job中間輸出和結(jié)果可以保存在內(nèi)存中精堕,從而不再需要讀寫HDFS孵淘,節(jié)省了磁盤IO耗時(shí),號(hào)稱性能比Hadoop快100倍歹篓。

1.2 Spark官網(wǎng)

http://spark.apache.org/

1.3 Spark架構(gòu)及生態(tài)

(1)Spark Core:包含Spark的基本功能瘫证;尤其是定義RDD的API、操作以及這兩者上的動(dòng)作庄撮。其他Spark的庫(kù)都是構(gòu)建在RDD和Spark Core之上的

(2)Spark SQL:提供通過(guò)Apache Hive的SQL變體Hive查詢語(yǔ)言(HiveQL)與Spark進(jìn)行交互的API背捌。每個(gè)數(shù)據(jù)庫(kù)表被當(dāng)做一個(gè)RDD,Spark SQL查詢被轉(zhuǎn)換為Spark操作洞斯。

(3)Spark Streaming:對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行處理和控制毡庆。Spark Streaming允許程序能夠像普通RDD一樣處理實(shí)時(shí)數(shù)據(jù)

(4)MLlib:一個(gè)常用機(jī)器學(xué)習(xí)算法庫(kù),算法被實(shí)現(xiàn)為對(duì)RDD的Spark操作烙如。這個(gè)庫(kù)包含可擴(kuò)展的學(xué)習(xí)算法,比如分類蝇刀、回歸等需要對(duì)大量數(shù)據(jù)集進(jìn)行迭代的操作然爆。

(5)GraphX:控制圖奴烙、并行圖操作和計(jì)算的一組算法和工具的集合。GraphX擴(kuò)展了RDD API,包含控制圖吨枉、創(chuàng)建子圖、訪問(wèn)路徑上所有頂點(diǎn)的操作


1.4 Spark架構(gòu)的組成圖

(1)Cluster Manager:在standalone模式中即為Master主節(jié)點(diǎn),控制整個(gè)集群,監(jiān)控worker书在。在YARN模式中為資源管理器

(2)Worker節(jié)點(diǎn):從節(jié)點(diǎn)帖族,負(fù)責(zé)控制計(jì)算節(jié)點(diǎn)凉翻,啟動(dòng)Executor或者Driver胞谭。

(3)Driver: 運(yùn)行Application 的main()函數(shù)

(4)Executor:執(zhí)行器,是為某個(gè)Application運(yùn)行在worker node上的一個(gè)進(jìn)程


1.5 Spark三種集群模式

1.Standalone獨(dú)立集群

2.Mesos, apache mesos

3.Yarn, hadoop yarn

1.6 Spark與hadoop比較

(1)Hadoop有兩個(gè)核心模塊彩库,分布式存儲(chǔ)模塊HDFS和分布式計(jì)算模塊Mapreduce

(2)spark本身并沒(méi)有提供分布式文件系統(tǒng),因此spark的分析大多依賴于Hadoop的分布式文件系統(tǒng)HDFS

(3)Hadoop的Mapreduce與spark都可以進(jìn)行數(shù)據(jù)計(jì)算,而相比于Mapreduce鳞仙,spark的速度更快并且提供的功能更加豐富

1.7 Spark運(yùn)行流程

spark運(yùn)行流程圖如下:


(1)構(gòu)建Spark Application的運(yùn)行環(huán)境允耿,啟動(dòng)SparkContext

(2)SparkContext向資源管理器(可以是Standalone提澎,Mesos掂墓,Yarn)申請(qǐng)運(yùn)行Executor資源跨嘉,并啟動(dòng)StandaloneExecutorbackend兑燥,

(3)Executor向SparkContext申請(qǐng)Task

(4)SparkContext將應(yīng)用程序分發(fā)給Executor

(5)SparkContext構(gòu)建成DAG圖嘱支,將DAG圖分解成Stage沛膳、將Taskset發(fā)送給Task Scheduler贞岭,最后由Task Scheduler將Task發(fā)送給Executor運(yùn)行

(6)Task在Executor上運(yùn)行话速,運(yùn)行完釋放所有資源

1.8 Spark運(yùn)行特點(diǎn)

(1)每個(gè)Application獲取專屬的executor進(jìn)程,該進(jìn)程在Application期間一直駐留,并以多線程方式運(yùn)行Task。這種Application隔離機(jī)制是有優(yōu)勢(shì)的雹熬,無(wú)論是從調(diào)度角度看(每個(gè)Driver調(diào)度他自己的任務(wù)),還是從運(yùn)行角度看(來(lái)自不同Application的Task運(yùn)行在不同JVM中)烈菌,當(dāng)然這樣意味著Spark Application不能跨應(yīng)用程序共享數(shù)據(jù)诡壁,除非將數(shù)據(jù)寫入外部存儲(chǔ)系統(tǒng)

(2)Spark與資源管理器無(wú)關(guān),只要能夠獲取executor進(jìn)程宠漩,并能保持相互通信就可以了

(3)提交SparkContext的Client應(yīng)該靠近Worker節(jié)點(diǎn)(運(yùn)行Executor的節(jié)點(diǎn))雕崩,最好是在同一個(gè)Rack里,因?yàn)镾park Application運(yùn)行過(guò)程中SparkContext和Executor之間有大量的信息交換

(4)Task采用了數(shù)據(jù)本地性和推測(cè)執(zhí)行的優(yōu)化機(jī)制

1.9 Spark運(yùn)行模式

(1)Spark的運(yùn)行模式多種多樣,靈活多變,部署在單機(jī)上時(shí)鲤看,既可以用本地模式運(yùn)行世吨,也可以用偽分布模式運(yùn)行罢浇,而當(dāng)以分布式集群的方式部署時(shí)胞锰,也有眾多的運(yùn)行模式可供選擇,這取決于集群的實(shí)際情況兼雄,底層的資源調(diào)度即可以依賴外部資源調(diào)度框架励稳,也可以使用Spark內(nèi)建的Standalone模式。

(2)對(duì)于外部資源調(diào)度框架的支持,目前的實(shí)現(xiàn)包括相對(duì)穩(wěn)定的Mesos模式,以及hadoop YARN模式

(3)本地模式:常用于本地開發(fā)測(cè)試,本地還分別 local 和 local cluster

1.10 Spark特點(diǎn)

(1)運(yùn)行速度快 => Spark擁有DAG執(zhí)行引擎,支持在內(nèi)存中對(duì)數(shù)據(jù)進(jìn)行迭代計(jì)算来涨。官方提供的數(shù)據(jù)表明,如果數(shù)據(jù)由磁盤讀取启盛,速度是Hadoop MapReduce的10倍以上蹦掐,如果數(shù)據(jù)從內(nèi)存中讀取,速度可以高達(dá)100多倍僵闯。

(2)適用場(chǎng)景廣泛 => 大數(shù)據(jù)分析統(tǒng)計(jì),實(shí)時(shí)數(shù)據(jù)處理泳秀,圖計(jì)算及機(jī)器學(xué)習(xí)

(3)易用性 => 編寫簡(jiǎn)單币他,支持80種以上的高級(jí)算子,支持多種語(yǔ)言断盛,數(shù)據(jù)源豐富公你,可部署在多種集群中

(4)容錯(cuò)性高溉躲。Spark引進(jìn)了彈性分布式數(shù)據(jù)集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一組節(jié)點(diǎn)中的只讀對(duì)象集合,這些集合是彈性的,如果數(shù)據(jù)集一部分丟失,則可以根據(jù)“血統(tǒng)”(即充許基于數(shù)據(jù)衍生過(guò)程)對(duì)它們進(jìn)行重建蚀狰。另外在RDD計(jì)算時(shí)可以通過(guò)CheckPoint來(lái)實(shí)現(xiàn)容錯(cuò),而CheckPoint有兩種方式:CheckPoint Data,和Logging The Updates禁悠,用戶可以控制采用哪種方式來(lái)實(shí)現(xiàn)容錯(cuò)。

1.11 Spark的適用場(chǎng)景

(1)目前大數(shù)據(jù)處理場(chǎng)景有以下幾個(gè)類型:

(2)復(fù)雜的批量處理(Batch Data Processing)偿警,偏重點(diǎn)在于處理海量數(shù)據(jù)的能力,至于處理速度可忍受分扎,通常的時(shí)間可能是在數(shù)十分鐘到數(shù)小時(shí)列赎;

(3)基于歷史數(shù)據(jù)的交互式查詢(Interactive Query)床未,通常的時(shí)間在數(shù)十秒到數(shù)十分鐘之間

(4)基于實(shí)時(shí)數(shù)據(jù)流的數(shù)據(jù)處理(Streaming Data Processing),通常在數(shù)百毫秒到數(shù)秒之間

1.12 Spark基本概念

Application =>Spark的應(yīng)用程序秉氧,包含一個(gè)Driver program和若干Executor

SparkContext => Spark應(yīng)用程序的入口眷昆,負(fù)責(zé)調(diào)度各個(gè)運(yùn)算資源,協(xié)調(diào)各個(gè)Worker Node上的Executor

Driver Program =>運(yùn)行Application的main()函數(shù)并且創(chuàng)建SparkContext

Executor =>是為Application運(yùn)行在Worker node上的一個(gè)進(jìn)程,該進(jìn)程負(fù)責(zé)運(yùn)行Task亚斋,并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上作媚。每個(gè)Application都會(huì)申請(qǐng)各自的Executor來(lái)處理任務(wù)

Cluster Manager =>在集群上獲取資源的外部服務(wù) (例如:Standalone、Mesos帅刊、Yarn)

Worker Node =>集群中任何可以運(yùn)行Application代碼的節(jié)點(diǎn)纸泡,運(yùn)行一個(gè)或多個(gè)Executor進(jìn)程

Task =>運(yùn)行在Executor上的工作單元

Job => SparkContext提交的具體Action操作,常和Action對(duì)應(yīng)

Stage =>每個(gè)Job會(huì)被拆分很多組task厚掷,每組任務(wù)被稱為Stage弟灼,也稱TaskSet

RDD =>是Resilient distributed datasets的簡(jiǎn)稱,中文為彈性分布式數(shù)據(jù)集;是Spark最核心的模塊和類

DAGScheduler =>根據(jù)Job構(gòu)建基于Stage的DAG冒黑,并提交Stage給TaskScheduler

TaskScheduler =>將Taskset提交給Worker node集群運(yùn)行并返回結(jié)果

Transformations =>是Spark API的一種類型田绑,Transformation返回值還是一個(gè)RDD,所有的Transformation采用的都是懶策略抡爹,如果只是將Transformation提交是不會(huì)執(zhí)行計(jì)算的

Action =>是Spark API的一種類型掩驱,Action返回值不是一個(gè)RDD,而是一個(gè)scala集合冬竟;計(jì)算只有在Action被提交的時(shí)候計(jì)算才被觸發(fā)欧穴。



二、Spark Core簡(jiǎn)介

2.1 Spark Core之RDD

2.1.1 RDD是什么

RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集泵殴,是Spark中最基本的數(shù)據(jù)抽象涮帘,它代表一個(gè)不可變、可分區(qū)笑诅、里面的元素可并行計(jì)算的集合调缨。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性吆你。RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯式地將工作集緩存在內(nèi)存中弦叶,后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度妇多。

2.1.2 RDD的屬性

(1)partitions(分區(qū)):一組分片(Partition)伤哺,即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō)者祖,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理立莉,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù)七问,如果沒(méi)有指定桃序,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目烂瘫。

(2)partitioner(分區(qū)方法):一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)媒熊。Spark中RDD的計(jì)算是以分片為單位的奇适,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合芦鳍,不需要保存每次計(jì)算的結(jié)果嚷往。

(3)dependencies(依賴關(guān)系):RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD柠衅,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系皮仁。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù)菲宴,而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算贷祈。

(4)compute(獲取分區(qū)迭代列表):一個(gè)Partitioner,即RDD的分片函數(shù)喝峦。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù)势誊,一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner谣蠢。只有對(duì)于于key-value的RDD粟耻,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None眉踱。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量挤忙,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。

(5)preferedLocations(優(yōu)先分配節(jié)點(diǎn)列表) :一個(gè)列表谈喳,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)册烈。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置婿禽。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念赏僧,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置谈宛。

2.1.3 RDD的創(chuàng)建方式

(1)通過(guò)讀取文件生成的

由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集創(chuàng)建次哈,包括本地的文件系統(tǒng)胎署,還有所有Hadoop支持的數(shù)據(jù)集吆录,比如HDFS、Cassandra琼牧、HBase等

scala> val file = sc.textFile("/spark/hello.txt")

(2)通過(guò)并行化的方式創(chuàng)建RDD

由一個(gè)已經(jīng)存在的Scala集合創(chuàng)建恢筝。

scala> val array = Array(1,2,3,4,5)

array: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(array)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:26

(3)其他方式

讀取數(shù)據(jù)庫(kù)等等其他的操作。也可以生成RDD巨坊。RDD可以通過(guò)其他的RDD轉(zhuǎn)換而來(lái)的

2.1.4 RDD算子操作之Transformation

主要做的是就是將一個(gè)已有的RDD生成另外一個(gè)RDD撬槽。Transformation具有l(wèi)azy特性(延遲加載)。Transformation算子的代碼不會(huì)真正被執(zhí)行趾撵。只有當(dāng)我們的程序里面遇到一個(gè)action算子的時(shí)候侄柔,代碼才會(huì)真正的被執(zhí)行共啃。這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行。

常用的Transformation:


2.1.4 RDD算子操作之Action

觸發(fā)代碼的運(yùn)行暂题,我們一段spark代碼里面至少需要有一個(gè)action操作移剪。

常用的Action:


2.1.5 RDD依賴關(guān)系的本質(zhì)內(nèi)幕

由于RDD是粗粒度的操作數(shù)據(jù)集,每個(gè)Transformation操作都會(huì)生成一個(gè)新的RDD薪者,所以RDD之間就會(huì)形成類似流水線的前后依賴關(guān)系纵苛;RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)言津。如圖所示顯示了RDD之間的依賴關(guān)系攻人。


從圖中可知:

窄依賴:是指每個(gè)父RDD的一個(gè)Partition最多被子RDD的一個(gè)Partition所使用,例如map悬槽、filter怀吻、union等操作都會(huì)產(chǎn)生窄依賴;(獨(dú)生子女)

寬依賴:是指一個(gè)父RDD的Partition會(huì)被多個(gè)子RDD的Partition所使用陷谱,例如groupByKey烙博、reduceByKey、sortByKey等操作都會(huì)產(chǎn)生寬依賴烟逊;(超生)

需要特別說(shuō)明的是對(duì)join操作有兩種情況:

(1)圖中左半部分join:如果兩個(gè)RDD在進(jìn)行join操作時(shí)渣窜,一個(gè)RDD的partition僅僅和另一個(gè)RDD中已知個(gè)數(shù)的Partition進(jìn)行join,那么這種類型的join操作就是窄依賴宪躯,例如圖1中左半部分的join操作(join with inputs co-partitioned)乔宿;

(2)圖中右半部分join:其它情況的join操作就是寬依賴,例如圖1中右半部分的join操作(join with inputs not co-partitioned),由于是需要父RDD的所有partition進(jìn)行join的轉(zhuǎn)換访雪,這就涉及到了shuffle详瑞,因此這種類型的join操作也是寬依賴。

總結(jié):

在這里我們是從父RDD的partition被使用的個(gè)數(shù)來(lái)定義窄依賴和寬依賴臣缀,因此可以用一句話概括下:如果父RDD的一個(gè)Partition被子RDD的一個(gè)Partition所使用就是窄依賴坝橡,否則的話就是寬依賴。因?yàn)槭谴_定的partition數(shù)量的依賴關(guān)系精置,所以RDD之間的依賴關(guān)系就是窄依賴计寇;由此我們可以得出一個(gè)推論:即窄依賴不僅包含一對(duì)一的窄依賴,還包含一對(duì)固定個(gè)數(shù)的窄依賴脂倦。

一對(duì)固定個(gè)數(shù)的窄依賴的理解:即子RDD的partition對(duì)父RDD依賴的Partition的數(shù)量不會(huì)隨著RDD數(shù)據(jù)規(guī)模的改變而改變番宁;換句話說(shuō),無(wú)論是有100T的數(shù)據(jù)量還是1P的數(shù)據(jù)量赖阻,在窄依賴中蝶押,子RDD所依賴的父RDD的partition的個(gè)數(shù)是確定的,而寬依賴是shuffle級(jí)別的火欧,數(shù)據(jù)量越大棋电,那么子RDD所依賴的父RDD的個(gè)數(shù)就越多茎截,從而子RDD所依賴的父RDD的partition的個(gè)數(shù)也會(huì)變得越來(lái)越多。

2.1.6 RDD依賴關(guān)系下的數(shù)據(jù)流視圖

在spark中赶盔,會(huì)根據(jù)RDD之間的依賴關(guān)系將DAG圖(有向無(wú)環(huán)圖)劃分為不同的階段稼虎,對(duì)于窄依賴,由于partition依賴關(guān)系的確定性招刨,partition的轉(zhuǎn)換處理就可以在同一個(gè)線程里完成霎俩,窄依賴就被spark劃分到同一個(gè)stage中,而對(duì)于寬依賴沉眶,只能等父RDD shuffle處理完成后打却,下一個(gè)stage才能開始接下來(lái)的計(jì)算。


因此spark劃分stage的整體思路是:從后往前推谎倔,遇到寬依賴就斷開柳击,劃分為一個(gè)stage;遇到窄依賴就將這個(gè)RDD加入該stage中片习。因此在圖2中RDD C,RDD D,RDD E,RDDF被構(gòu)建在一個(gè)stage中,RDD A被構(gòu)建在一個(gè)單獨(dú)的Stage中,而RDD B和RDD G又被構(gòu)建在同一個(gè)stage中捌肴。


在spark中,Task的類型分為2種:ShuffleMapTask和ResultTask藕咏;


簡(jiǎn)單來(lái)說(shuō)状知,DAG的最后一個(gè)階段會(huì)為每個(gè)結(jié)果的partition生成一個(gè)ResultTask,即每個(gè)Stage里面的Task的數(shù)量是由該Stage中最后一個(gè)RDD的Partition的數(shù)量所決定的孽查!而其余所有階段都會(huì)生成ShuffleMapTask饥悴;之所以稱之為ShuffleMapTask是因?yàn)樗枰獙⒆约旱挠?jì)算結(jié)果通過(guò)shuffle到下一個(gè)stage中;也就是說(shuō)上圖中的stage1和stage2相當(dāng)于mapreduce中的Mapper,而ResultTask所代表的stage3就相當(dāng)于mapreduce中的reducer盲再。


在之前動(dòng)手操作了一個(gè)wordcount程序西设,因此可知,Hadoop中MapReduce操作中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不過(guò)區(qū)別在于:Hadoop中的MapReduce天生就是排序的答朋;而reduceByKey只是根據(jù)Key進(jìn)行reduce贷揽,但spark除了這兩個(gè)算子還有其他的算子;因此從這個(gè)意義上來(lái)說(shuō)梦碗,Spark比Hadoop的計(jì)算算子更為豐富禽绪。

2.2 Spark Core之Job

spark中對(duì)RDD的操作類型分為transformation和action,其中transformation是一種延遲執(zhí)行的操作叉弦,并不會(huì)立即執(zhí)行而是返回一個(gè)含有依賴關(guān)系的RDD丐一,例如map藻糖、filter淹冰、sortBy、flatMap等操作巨柒,當(dāng)調(diào)用action操作時(shí)樱拴,** spark會(huì)通過(guò)DAGScheduler構(gòu)建一個(gè)job的執(zhí)行拓?fù)?包括多個(gè)stage和task,所有的stage和task構(gòu)成了這個(gè)action觸發(fā)的job,最后提交到集群中執(zhí)行柠衍。

2.3 Spark Core之Stage

Job是由stage構(gòu)成的,spark中的stage只有兩種,shuffleMapStage和resultStage。Job劃分Stage的依據(jù)是shuffle與否(即依賴的類型),當(dāng)DagScheduler進(jìn)行DAG執(zhí)行圖構(gòu)建時(shí),遇到一個(gè)shuffle dependency會(huì)生成一個(gè)shuffle map stage,調(diào)用鏈最后一個(gè)shuffle reduce端將生成為result stage 也叫 final stage.


handleJobSubmitted->submitStage(finalStage)->遞歸調(diào)用submitStage,如果當(dāng)前提交的stage沒(méi)有parent stage則直接提交taskSet,否則將當(dāng)前stage加入waiting ?stage列表,每當(dāng)觸發(fā)某些事件時(shí)(MapStageSubmitted晶乔、TaskCompletion..)都會(huì)進(jìn)行一次 waiting stages 的提交珍坊。

2.4 Spark Core之Task

task是邏輯的具體執(zhí)行單元,stage由task構(gòu)成,當(dāng)提交stage時(shí),DAGScheduler會(huì)根據(jù)當(dāng)前stage的類型序列化出不同類型的task并進(jìn)行broadcast,如果是shuffleMapStage則序列化出ShuffleMapTask,如果是resultStage則序列化出ResultTask,其中task的數(shù)量和當(dāng)前stage所依賴的RDD的partition的數(shù)量是一致的正罢,Task作用的數(shù)據(jù)單元是partition,即每個(gè)task只處理一個(gè)partition的數(shù)據(jù)阵漏。

2.5 Spark Core之shuffle

shuffle是影響spark性能的核心所在,和mapreduce中的shuffle概念類似,在spark2.0中shuffleManager只有sortShuffleManager,并且在滿足一定條件下可以使用Serialized sorting 即在tungsten中對(duì)其進(jìn)行的優(yōu)化

2.6 Spark Core之Tungsten

該項(xiàng)目主要是為了讓spark計(jì)算模型能更好的利用硬件性能翻具,主要包含三部分:

1履怯、內(nèi)存管理與二進(jìn)制處理

2、cache-aware

3裆泳、代碼生成


三叹洲、Spark Streaming簡(jiǎn)介

3.1 Spark Streaming概述

Spark Streaming是Spark核心API的一個(gè)擴(kuò)展,可以實(shí)現(xiàn)高吞吐量的工禾、具備容錯(cuò)機(jī)制的實(shí)時(shí)流數(shù)據(jù)的處理运提。支持從多種數(shù)據(jù)源獲取數(shù)據(jù),包括Kafk闻葵、Flume民泵、Twitter、ZeroMQ槽畔、Kinesis 以及TCP sockets洪灯,從數(shù)據(jù)源獲取數(shù)據(jù)之后,可以使用諸如map竟痰、reduce签钩、join和window等高級(jí)函數(shù)進(jìn)行復(fù)雜算法的處理。最后還可以將處理結(jié)果存儲(chǔ)到文件系統(tǒng)坏快,數(shù)據(jù)庫(kù)和現(xiàn)場(chǎng)儀表盤铅檩。在“One Stack rule them all”的基礎(chǔ)上,還可以使用Spark的其他子框架莽鸿,如集群學(xué)習(xí)昧旨、圖計(jì)算等,對(duì)流數(shù)據(jù)進(jìn)行處理祥得。

Spark Streaming處理的數(shù)據(jù)流圖:

3.2 Spark Streaming架構(gòu)

SparkStreaming是一個(gè)對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行高通量兔沃、容錯(cuò)處理的流式處理系統(tǒng),可以對(duì)多種數(shù)據(jù)源(如Kdfka级及、Flume乒疏、Twitter、Zero和TCP?套接字)進(jìn)行類似Map饮焦、Reduce和Join等復(fù)雜操作怕吴,并將結(jié)果保存到外部文件系統(tǒng)窍侧、數(shù)據(jù)庫(kù)或應(yīng)用到實(shí)時(shí)儀表盤。


l計(jì)算流程:Spark Streaming是將流式計(jì)算分解成一系列短小的批處理作業(yè)转绷。這里的批處理引擎是Spark Core伟件,也就是把Spark Streaming的輸入數(shù)據(jù)按照batch size(如1秒)分成一段一段的數(shù)據(jù)(Discretized Stream),每一段數(shù)據(jù)都轉(zhuǎn)換成Spark中的RDD(Resilient Distributed Dataset)议经,然后將Spark Streaming中對(duì)DStream的Transformation操作變?yōu)獒槍?duì)Spark中對(duì)RDD的Transformation操作斧账,將RDD經(jīng)過(guò)操作變成中間結(jié)果保存在內(nèi)存中。整個(gè)流式計(jì)算根據(jù)業(yè)務(wù)的需求可以對(duì)中間的結(jié)果進(jìn)行疊加或者存儲(chǔ)到外部設(shè)備煞肾。下圖顯示了Spark Streaming的整個(gè)流程其骄。


圖Spark Streaming構(gòu)架


l容錯(cuò)性:對(duì)于流式計(jì)算來(lái)說(shuō),容錯(cuò)性至關(guān)重要扯旷。首先我們要明確一下Spark中RDD的容錯(cuò)機(jī)制拯爽。每一個(gè)RDD都是一個(gè)不可變的分布式可重算的數(shù)據(jù)集,其記錄著確定性的操作繼承關(guān)系(lineage)钧忽,所以只要輸入數(shù)據(jù)是可容錯(cuò)的毯炮,那么任意一個(gè)RDD的分區(qū)(Partition)出錯(cuò)或不可用,都是可以利用原始輸入數(shù)據(jù)通過(guò)轉(zhuǎn)換操作而重新算出的耸黑。

對(duì)于Spark Streaming來(lái)說(shuō)桃煎,其RDD的傳承關(guān)系如下圖所示,圖中的每一個(gè)橢圓形表示一個(gè)RDD大刊,橢圓形中的每個(gè)圓形代表一個(gè)RDD中的一個(gè)Partition为迈,圖中的每一列的多個(gè)RDD表示一個(gè)DStream(圖中有三個(gè)DStream),而每一行最后一個(gè)RDD則表示每一個(gè)Batch Size所產(chǎn)生的中間結(jié)果RDD缺菌。我們可以看到圖中的每一個(gè)RDD都是通過(guò)lineage相連接的葫辐,由于Spark Streaming輸入數(shù)據(jù)可以來(lái)自于磁盤,例如HDFS(多份拷貝)或是來(lái)自于網(wǎng)絡(luò)的數(shù)據(jù)流(Spark Streaming會(huì)將網(wǎng)絡(luò)輸入數(shù)據(jù)的每一個(gè)數(shù)據(jù)流拷貝兩份到其他的機(jī)器)都能保證容錯(cuò)性伴郁,所以RDD中任意的Partition出錯(cuò)耿战,都可以并行地在其他機(jī)器上將缺失的Partition計(jì)算出來(lái)。這個(gè)容錯(cuò)恢復(fù)方式比連續(xù)計(jì)算模型(如Storm)的效率更高焊傅。


Spark Streaming中RDD的lineage關(guān)系圖


l實(shí)時(shí)性:對(duì)于實(shí)時(shí)性的討論剂陡,會(huì)牽涉到流式處理框架的應(yīng)用場(chǎng)景。Spark Streaming將流式計(jì)算分解成多個(gè)Spark Job狐胎,對(duì)于每一段數(shù)據(jù)的處理都會(huì)經(jīng)過(guò)Spark DAG圖分解以及Spark的任務(wù)集的調(diào)度過(guò)程鸭栖。對(duì)于目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右)握巢,所以Spark Streaming能夠滿足除對(duì)實(shí)時(shí)性要求非常高(如高頻實(shí)時(shí)交易)之外的所有流式準(zhǔn)實(shí)時(shí)計(jì)算場(chǎng)景晕鹊。

l擴(kuò)展性與吞吐量:Spark目前在EC2上已能夠線性擴(kuò)展到100個(gè)節(jié)點(diǎn)(每個(gè)節(jié)點(diǎn)4Core),可以以數(shù)秒的延遲處理6GB/s的數(shù)據(jù)量(60M records/s),其吞吐量也比流行的Storm高2~5倍捏题,圖4是Berkeley利用WordCount和Grep兩個(gè)用例所做的測(cè)試,在Grep這個(gè)測(cè)試中肉渴,Spark Streaming中的每個(gè)節(jié)點(diǎn)的吞吐量是670k records/s公荧,而Storm是115k records/s。


Spark Streaming與Storm吞吐量比較圖

3.3 Spark Streaming之StreamingContext

有兩種創(chuàng)建StreamingContext的方式:

appName同规,是用來(lái)在Spark UI上顯示的應(yīng)用名稱循狰。master,是一個(gè)Spark券勺、Mesos或者Yarn 集群的URL绪钥,或者是local[*]。

batch interval可以根據(jù)你的應(yīng)用程序的延遲要求以及可用的集群資源情況來(lái)設(shè)置关炼。


一個(gè)StreamingContext定義之后程腹,必須做以下幾件事情:

1、通過(guò)創(chuàng)建輸入DStream來(lái)創(chuàng)建輸入數(shù)據(jù)源儒拂。

2寸潦、通過(guò)對(duì)DStream定義transformation和output算子操作,來(lái)定義實(shí)時(shí)計(jì)算邏輯社痛。

3见转、調(diào)用StreamingContext的start()方法,來(lái)開始實(shí)時(shí)處理數(shù)據(jù)召夹。

4过吻、調(diào)用StreamingContext的awaitTermination()方法含思,來(lái)等待應(yīng)用程序的終止〕丝停可以使用CTRL+C手動(dòng)停止,或者就是讓它持續(xù)不斷的運(yùn)行進(jìn)行計(jì)算淀歇。

5寨典、也可以通過(guò)調(diào)用StreamingContext的stop()方法,來(lái)停止應(yīng)用程序房匆。


StreamingContext需要注意的要點(diǎn):

1耸成、只要一個(gè)StreamingContext啟動(dòng)之后,就不能再往其中添加任何計(jì)算邏輯了浴鸿。比如執(zhí)行start()方法之后井氢,還給某個(gè)DStream執(zhí)行一個(gè)算子。

2岳链、一個(gè)StreamingContext停止之后花竞,是肯定不能夠重啟的。調(diào)用stop()之后,不能再調(diào)用start()

3约急、一個(gè)JVM同時(shí)只能有一個(gè)StreamingContext啟動(dòng)零远。在你的應(yīng)用程序中,不能創(chuàng)建兩個(gè)StreamingContext厌蔽。

4牵辣、調(diào)用stop()方法時(shí),會(huì)同時(shí)停止內(nèi)部的SparkContext奴饮,如果不希望如此纬向,還希望后面繼續(xù)使用SparkContext創(chuàng)建其他類型的Context,比如SQLContext戴卜,那么就用stop(false)逾条。

5、一個(gè)SparkContext可以創(chuàng)建多個(gè)StreamingContext投剥,只要上一個(gè)先用stop(false)停止师脂,再創(chuàng)建下一個(gè)即可。

3.4 Spark Streaming之DStream

DStream(Discretized Stream)作為Spark Streaming的基礎(chǔ)抽象江锨,它代表持續(xù)性的數(shù)據(jù)流危彩。這些數(shù)據(jù)流既可以通過(guò)外部輸入源賴獲取,也可以通過(guò)現(xiàn)有的Dstream的transformation操作來(lái)獲得泳桦。在內(nèi)部實(shí)現(xiàn)上汤徽,DStream由一組時(shí)間序列上連續(xù)的RDD來(lái)表示。每個(gè)RDD都包含了自己特定時(shí)間間隔內(nèi)的數(shù)據(jù)流灸撰。如圖7-3所示谒府。


圖7-3???DStream中在時(shí)間軸下生成離散的RDD序列


對(duì)DStream中數(shù)據(jù)的各種操作也是映射到內(nèi)部的RDD上來(lái)進(jìn)行的,如圖7-4所示浮毯,對(duì)Dtream的操作可以通過(guò)RDD的transformation生成新的DStream完疫。這里的執(zhí)行引擎是Spark。

3.5 Spark Streaming之Input DStream and Receivers

如同Storm的Spout债蓝,Spark Streaming需要指明數(shù)據(jù)源壳鹤。如上例所示的socketTextStream,Spark Streaming以socket連接作為數(shù)據(jù)源讀取數(shù)據(jù)饰迹。當(dāng)然Spark Streaming支持多種不同的數(shù)據(jù)源芳誓,包括Kafka、 Flume啊鸭、HDFS/S3锹淌、Kinesis和Twitter等數(shù)據(jù)源;

3.6 Spark Streaming之Transformations

https://www.cnblogs.com/yhl-yh/p/7651558.html

3.7 Spark Streaming之Operations

https://www.cnblogs.com/yhl-yh/p/7651558.html

3.8 Spark Streaming之Join Operations

(1)DStream對(duì)象之間的Join

  這種join一般應(yīng)用于窗口函數(shù)形成的DStream對(duì)象之間赠制,具體可以參考第一部分中的join操作赂摆,除了簡(jiǎn)單的join之外,還有l(wèi)eftOuterJoin, rightOuterJoin和fullOuterJoin。

(2)DStream和dataset之間的join

  這一種join烟号,可以參考前面transform操作中的示例绊谭。

3.9 Spark Streaming之Output Operations

https://www.cnblogs.com/yhl-yh/p/7651558.html



四、Spark SQL簡(jiǎn)介

4.1 Spark SQL是什么

是Spark用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)模塊,它提供了一個(gè)編程抽象叫做DataFrame,并且作為分布式SQL查詢引擎的作用

4.2 Spark SQL基礎(chǔ)數(shù)據(jù)模型

DataFrame是由“命名列”(類似關(guān)系表的字段定義)所組織起來(lái)的一個(gè)分布式數(shù)據(jù)集合汪拥。你可以把它看成是一個(gè)關(guān)系型數(shù)據(jù)庫(kù)的表达传。

DataFrame可以通過(guò)多種來(lái)源創(chuàng)建:結(jié)構(gòu)化數(shù)據(jù)文件,hive的表喷楣,外部數(shù)據(jù)庫(kù)趟大,或者RDDs

4.3 Spark SQL運(yùn)行流程

1鹤树、將SQL語(yǔ)句通過(guò)詞法和語(yǔ)法解析生成未綁定的邏輯執(zhí)行計(jì)劃(Unresolved LogicalPlan)铣焊,包含Unresolved Relation、Unresolved Function和Unresolved Attribute罕伯,然后在后續(xù)步驟中使用不同的Rule應(yīng)用到該邏輯計(jì)劃上

2曲伊、Analyzer使用Analysis Rules,配合元數(shù)據(jù)(如SessionCatalog 或是 Hive Metastore等)完善未綁定的邏輯計(jì)劃的屬性而轉(zhuǎn)換成綁定的邏輯計(jì)劃追他。具體流程是縣實(shí)例化一個(gè)Simple Analyzer坟募,然后遍歷預(yù)定義好的Batch,通過(guò)父類Rule Executor的執(zhí)行方法運(yùn)行Batch里的Rules邑狸,每個(gè)Rule會(huì)對(duì)未綁定的邏輯計(jì)劃進(jìn)行處理懈糯,有些可以通過(guò)一次解析處理,有些需要多次迭代单雾,迭代直到達(dá)到FixedPoint次數(shù)或前后兩次的樹結(jié)構(gòu)沒(méi)變化才停止操作赚哗。

3、Optimizer使用Optimization Rules硅堆,將綁定的邏輯計(jì)劃進(jìn)行合并屿储、列裁剪和過(guò)濾器下推等優(yōu)化工作后生成優(yōu)化的邏輯計(jì)劃

4、Planner使用Planning Strategies渐逃,對(duì)優(yōu)化的邏輯計(jì)劃進(jìn)行轉(zhuǎn)換(Transform)生成可以執(zhí)行的物理計(jì)劃够掠。根據(jù)過(guò)去的性能統(tǒng)計(jì)數(shù)據(jù),選擇最佳的物理執(zhí)行計(jì)劃CostModel茄菊,最后生成可以執(zhí)行的物理執(zhí)行計(jì)劃樹疯潭,得到SparkPlan

5、在最終真正執(zhí)行物理執(zhí)行計(jì)劃之前面殖,還要進(jìn)行preparations規(guī)則處理袁勺,最后調(diào)用SparkPlan的execute執(zhí)行計(jì)算RDD。

4.4 Spark SQL的誕生

但是畜普,隨著Spark的發(fā)展期丰,對(duì)于野心勃勃的Spark團(tuán)隊(duì)來(lái)說(shuō),Shark對(duì)于Hive的太多依賴(如采用Hive的語(yǔ)法解析器、查詢優(yōu)化器等等)钝荡,制約了Spark的One Stack Rule Them All的既定方針街立,制約了Spark各個(gè)組件的相互集成,所以提出了SparkSQL項(xiàng)目埠通。SparkSQL拋棄原有Shark的代碼赎离,汲取了Shark的一些優(yōu)點(diǎn),如內(nèi)存列存儲(chǔ)(In-Memory Columnar Storage)端辱、Hive兼容性等梁剔,重新開發(fā)了SparkSQL代碼;由于擺脫了對(duì)Hive的依賴性舞蔽,SparkSQL無(wú)論在數(shù)據(jù)兼容荣病、性能優(yōu)化、組件擴(kuò)展方面都得到了極大的方便渗柿,真可謂“退一步个盆,海闊天空”。

4.5 Spark SQL和Hive On Spark的區(qū)別

很多人誤以為Spark SQL和 Hive On Spark是一個(gè)東西朵栖,其實(shí)不然颊亮,這里對(duì)二者做一個(gè)簡(jiǎn)單的介紹:Spark SQL的前身是Shark, Shark 剛開始也是使用了hive里面一些東西的,但隨著后來(lái)的發(fā)展,Shark始終不能很好的支持Hive里面的一些功能陨溅,這是因?yàn)樵诎姹镜母虷ive的多年發(fā)展终惑,有些東西并不能始終支持,Shark 終止后门扇,SparkSQL作為Spark生態(tài)的一員繼續(xù)發(fā)展雹有,而不再受限于Hive,只是兼容Hive悯嗓;而Hive On Spark 是Hive 想要將底層的執(zhí)行引擎在Spark上做很好的支持件舵,并且想要對(duì)Hive的底層執(zhí)行引擎做更多的支持.


4.6 Spark SQL架構(gòu)

SparkSQL架構(gòu)分為前,后脯厨,中铅祸,三個(gè)部分,其中中間部分的Catalyst optimizer是架構(gòu)的核心合武,也是SparkSQL 優(yōu)化的關(guān)鍵所在临梗。


4.7 Spark SQL出現(xiàn)原因

(1)使用SQL來(lái)進(jìn)行大數(shù)據(jù)處理,這本身就為傳統(tǒng)的RDBMS人員降低了要求稼跳,因?yàn)檫@不需要使用人員掌握像mapreduce的編程方法盟庞。

(2)SparkSQL 可以支持SQL API,DataFrame和Dataset API多種API汤善,這意味著開發(fā)人員可以采用DataFrame和Dataset API 進(jìn)行編程什猖,這和采用Spark core 的RDD API 進(jìn)行編程有很大的不同票彪,首先,并不像使用RDD進(jìn)行編程時(shí)不狮,開發(fā)人員在采用不同的編程語(yǔ)言和不同的方式開發(fā)應(yīng)用程序時(shí)降铸,其應(yīng)用程序的性能千差萬(wàn)別,但如果使用DataFrame和Dataset進(jìn)行開發(fā)時(shí)摇零,資深開發(fā)人員和初級(jí)開發(fā)人員開發(fā)的程序性能差異很小推掸,這是因?yàn)镾parkSQL 使用Catalyst optimizer 對(duì)執(zhí)行計(jì)劃做了很好的優(yōu)化。

(3)其次驻仅,SparkSQL既然對(duì)底層使用了很好的優(yōu)化方式谅畅,這就讓開發(fā)人員在不熟悉底層優(yōu)化方式時(shí)也可以進(jìn)行很好的程序開發(fā),降低了開發(fā)人員對(duì)程序開發(fā)的門檻噪服。

(4)最后毡泻,SparkSQL 對(duì)外部數(shù)據(jù)源(External Datasource)做了很好的支持,像對(duì)關(guān)系型數(shù)據(jù)庫(kù)芯咧,如mysql,可以使用外部數(shù)據(jù)源的方式使用SparkSQL對(duì)數(shù)據(jù)庫(kù)里面的數(shù)據(jù)直接進(jìn)行處理牙捉,而不需要像以前使用sqoop導(dǎo)入導(dǎo)出竹揍。

4.8 Spark SQL如何使用

首先敬飒,利用sqlContext從外部數(shù)據(jù)源加載數(shù)據(jù)為DataFrame

然后,利用DataFrame上豐富的api進(jìn)行查詢芬位、轉(zhuǎn)換

最后无拗,將結(jié)果進(jìn)行展現(xiàn)或存儲(chǔ)為各種外部數(shù)據(jù)形式


4.9 Spark SQL愿景

1.寫更少的代碼

2.讀更少的數(shù)據(jù)

3.把優(yōu)化的工作交由底層的優(yōu)化器運(yùn)行

4.14 SparkContext運(yùn)行原理分析

前面我們隊(duì)Spark SQL運(yùn)行架構(gòu)進(jìn)行分析,知道從輸入SQL語(yǔ)句到生成Dataset分為5個(gè)步驟昧碉,但實(shí)際運(yùn)行過(guò)程中在輸入SQL語(yǔ)句之前英染,Spark還有加載SessionCatalog步驟。

4.14.1 使用SessionCatalog保存元數(shù)據(jù)

在解析SQL語(yǔ)句之前需要初始化SQLContext被饿,它定義了Spark SQL執(zhí)行的上下文四康,并把元數(shù)據(jù)保存在SessionCatalog中,這些元數(shù)據(jù)包括表名稱狭握、表字段名稱和字段類型等闪金。

SessionCatalog中保存的是表名和邏輯執(zhí)行計(jì)劃對(duì)應(yīng)的哈希列表,這些數(shù)據(jù)將在解析未綁定的邏輯計(jì)劃上使用论颅。(SessionCatalog中的表名對(duì)應(yīng)的邏輯執(zhí)行計(jì)劃是什么哎垦?是這個(gè)Dataset對(duì)應(yīng)的邏輯執(zhí)行計(jì)劃)

4.14.2 使用Antlr生成未綁定的邏輯計(jì)劃

Spark 2.0版本起使用Antlr進(jìn)行詞法和語(yǔ)法解析。使用Antlr生成未綁定的邏輯計(jì)劃分為兩個(gè)階段:第一階段的過(guò)程為詞法分析(Lexical Analysis)恃疯,負(fù)責(zé)將符號(hào)(Token)分組成符號(hào)類(Token class or Token type)漏设,第二階段就是真正的Parser,默認(rèn)Antlr會(huì)構(gòu)建出一顆分析樹(Parser Tree)或者叫語(yǔ)法樹(Syntax

Tree)今妄。

SQLContext類中定義了SQL的解析方法parseSql郑口。具體的SQL解析在AbastrctSqlParser抽象類中的parse方法進(jìn)行鸳碧,解析完畢后生成語(yǔ)法樹,語(yǔ)法樹會(huì)根據(jù)系統(tǒng)初始化的AstBuilder解析生成表達(dá)式犬性、邏輯計(jì)劃或表標(biāo)識(shí)對(duì)象杆兵。

在AbstractSqlParse的parse方法中,先實(shí)例化詞法解析器SqlBaseLexer和語(yǔ)法解析器SqlBaseParser仔夺,然后嘗試用Antlr較快的解析模式SLL琐脏,如果解析失敗,則會(huì)再嘗試使用普通解析模型LL缸兔,解析完畢后返回解析結(jié)果日裙。

4.14.3 使用Analyzer綁定邏輯執(zhí)行計(jì)劃

該階段Analyzer使用Analysis Rules,結(jié)合SessionCatalog元數(shù)據(jù)惰蜜,對(duì)未綁定的邏輯計(jì)劃進(jìn)行解析昂拂,生成了已綁定的邏輯計(jì)劃。在該處理過(guò)程中抛猖,先實(shí)例化一個(gè)Analyzer格侯,在Analyzer中定義了FiexedPoint和Seq[Batch]兩個(gè)變量,其中FixedPoint為迭代次數(shù)的上線财著,而Seq[Batch]為所定義需要執(zhí)行批處理的序列联四,每個(gè)批處理由一系列Rule和策略所組成,策略一般分為Once和FixedPoint(可理解為迭代次數(shù))

4.14.4 使用Optimizer優(yōu)化邏輯計(jì)劃

Optimizer的實(shí)現(xiàn)和處理方式同Analyzer類似撑教,在該類中定義一系列Rule并同樣繼承于RuleExecutor朝墩。利用這些Rule對(duì)邏輯計(jì)劃和Expression進(jìn)行迭代處理,從而達(dá)到對(duì)樹的節(jié)點(diǎn)進(jìn)行合并和優(yōu)化伟姐。其中主要的優(yōu)化策略總結(jié)起來(lái)是合并收苏、列裁剪和過(guò)濾器下推等幾大類。

Optimizer的優(yōu)化策略不僅對(duì)已綁定的邏輯計(jì)劃進(jìn)行優(yōu)化愤兵,而且對(duì)邏輯計(jì)劃中的Expression也進(jìn)行了優(yōu)化鹿霸,其原理就是遍歷樹,然后應(yīng)用優(yōu)化Rule秆乳,但是注意一點(diǎn)懦鼠,對(duì)邏輯計(jì)劃處理時(shí)先序遍歷,而對(duì)Expression的處理是后續(xù)遍歷(根據(jù)樹節(jié)點(diǎn)類型來(lái)判斷是邏輯計(jì)劃還是Expression嗎矫夷?)

4.14.5 使用SparkPlanner生成可執(zhí)行物理計(jì)劃

SparkPlanner使用Planning Strategies對(duì)優(yōu)化的邏輯計(jì)劃進(jìn)行轉(zhuǎn)換(Transform)葛闷,生成可以執(zhí)行的物理計(jì)劃。在QueryExecution類代碼中双藕,調(diào)用SparkPlanner.plan方法對(duì)優(yōu)化的邏輯計(jì)劃進(jìn)行處理淑趾,而SparkPlanner并未定義plan方法,實(shí)際是調(diào)用SparkPlanner的祖父類QueyrPlanner的plan方法忧陪,然后會(huì)返回一個(gè)Iterator[Physicalplan]扣泊。

SparkPlanner繼承于SparkStrategies近范,而SparkStrategies繼承了QueryPlanner。其中SparkStrategies包含了一系列特定的Strategies延蟹,這些Strategies是繼承自QueryPlanner中定義的GenericStrategy评矩。在SparkPlanner通過(guò)改寫祖父類QueryPlanner中strategies策略變量,在該變量中定義了轉(zhuǎn)換成物理計(jì)劃所執(zhí)行的策略阱飘。

4.14.6 使用QueryExecution執(zhí)行物理計(jì)劃

在該步驟中先調(diào)用SparkPlanner.preparations對(duì)物理計(jì)劃進(jìn)行準(zhǔn)備工作斥杜,規(guī)范返回?cái)?shù)據(jù)行的格式等,然后調(diào)用SparkPlan.execute執(zhí)行物理計(jì)劃沥匈,從數(shù)據(jù)庫(kù)中查詢數(shù)據(jù)并生成RDD蔗喂。

SparkPlan的preparations其實(shí)是一個(gè)RuleExecutor[SparkPlan],它會(huì)調(diào)用RuleExecutor的execut方法對(duì)前面生成的物理計(jì)劃應(yīng)用Rule進(jìn)行匹配高帖,最終生成一個(gè)SparkPlan缰儿。

SparkPlan繼承于QueryPlan,SparkPlan中定義了SQL語(yǔ)句執(zhí)行的execute方法散址,執(zhí)行完execute方法返回的是一個(gè)RDD乖阵,之后可以運(yùn)行Spark作業(yè)對(duì)該RDD進(jìn)行操作。

4.15 Spark SQL小結(jié)

1)Spark SQL的應(yīng)用并不局限于SQL预麸;

2)訪問(wèn)hive瞪浸、json、parquet等文件的數(shù)據(jù)师崎;

3)SQL只是Spark SQL的一個(gè)功能而已默终;

4)Spark SQL提供了SQL的api椅棺、DataFrame和Dataset的API犁罩;


五、SparkMLlib簡(jiǎn)介

5.1 SparkMLlib是什么

MLlib是Spark的機(jī)器學(xué)習(xí)(Machine Learning)庫(kù)两疚,旨在簡(jiǎn)化機(jī)器學(xué)習(xí)的工程實(shí)踐工作床估,并方便擴(kuò)展到更大規(guī)模。MLlib由一些通用的學(xué)習(xí)算法和工具組成诱渤,包括分類丐巫、回歸、聚類勺美、協(xié)同過(guò)濾递胧、降維等,同時(shí)還包括底層的優(yōu)化原語(yǔ)和高層的管道API赡茸。具體來(lái)說(shuō)缎脾,其主要包括以下幾方面的內(nèi)容:

(1)算法工具:常用的學(xué)習(xí)算法,如分類占卧、回歸遗菠、聚類和協(xié)同過(guò)濾联喘;

(2)特征化公交:特征提取、轉(zhuǎn)化辙纬、降維豁遭,和選擇公交;

(3)管道(Pipeline):用于構(gòu)建贺拣、評(píng)估和調(diào)整機(jī)器學(xué)習(xí)管道的工具;

(4)持久性:保存和加載算法蓖谢,模型和管道;

(5)實(shí)用工具:線性代數(shù),統(tǒng)計(jì)譬涡,數(shù)據(jù)處理等工具蜈抓。

5.2 SparkMLlib調(diào)優(yōu)

MLlib是spark的可以擴(kuò)展的機(jī)器學(xué)習(xí)庫(kù),由以下部分組成:通用的學(xué)習(xí)算法和工具類昂儒,包括分類沟使,回歸,聚類渊跋,協(xié)同過(guò)濾腊嗡,降維,當(dāng)然也包括調(diào)優(yōu)的部分

Data

?typesBasic

statistics(基本統(tǒng)計(jì))

summary statistics概括統(tǒng)計(jì)correlations 相關(guān)性stratified sampling 分層取樣hypothesis testing 假設(shè)檢驗(yàn)random data generation 隨機(jī)數(shù)生成Classification

and regression(分類一般針對(duì)離散型數(shù)據(jù)而言的拾酝,回歸是針對(duì)連續(xù)型數(shù)據(jù)的燕少。本質(zhì)上是一樣的)

linear

models (SVMs, logistic regression, linear regression)線性模型(支持向量機(jī),邏輯回歸蒿囤,線性回歸)naive

Bayes貝葉斯算法decision

trees決策樹ensembles

of trees(Random Forests and Gradient-Boosted Trees)多種樹(隨機(jī)森林和梯度增強(qiáng)樹)Collaborative

filtering協(xié)同過(guò)濾

alternating least squares (ALS)(交替最小二乘法(ALS) )Clustering

聚類

k-means k均值算法Dimensionality

reduction (降維)

singular value decomposition (SVD)奇異值分解principal component analysis (PCA)主成分分析Feature

extraction and transformation特征提取和轉(zhuǎn)化Optimization (developer)優(yōu)化部分

stochastic gradient descent隨機(jī)梯度下降limited-memory BFGS (L-BFGS) 短時(shí)記憶的BFGS (擬牛頓法中的一種,解決非線性問(wèn)題)

六客们、SparkGraphX簡(jiǎn)介

6.1 Spark GraphX什么

Spark GraphX是一個(gè)分布式圖處理框架,它是基于Spark平臺(tái)提供對(duì)圖計(jì)算和圖挖掘簡(jiǎn)潔易用的而豐富的接口材诽,極大的方便了對(duì)分布式圖處理的需求底挫。

6.2 Spark GraphX的框架

設(shè)計(jì)GraphX時(shí),點(diǎn)分割和GAS都已成熟脸侥,在設(shè)計(jì)和編碼中針對(duì)它們進(jìn)行了優(yōu)化建邓,并在功能和性能之間尋找最佳的平衡點(diǎn)。如同Spark本身睁枕,每個(gè)子模塊都有一個(gè)核心抽象官边。GraphX的核心抽象是Resilient Distributed Property Graph,一種點(diǎn)和邊都帶屬性的有向多重圖外遇。它擴(kuò)展了Spark RDD的抽象注簿,有Table和Graph兩種視圖,而只需要一份物理存儲(chǔ)跳仿。兩種視圖都有自己獨(dú)有的操作符诡渴,從而獲得了靈活操作和執(zhí)行效率。


如同Spark塔嬉,GraphX的代碼非常簡(jiǎn)潔玩徊。GraphX的核心代碼只有3千多行租悄,而在此之上實(shí)現(xiàn)的Pregel模式,只要短短的20多行恩袱。GraphX的代碼結(jié)構(gòu)整體下圖所示泣棋,其中大部分的實(shí)現(xiàn),

都是圍繞Partition的優(yōu)化進(jìn)行的畔塔。這在某種程度上說(shuō)明了點(diǎn)分割的存儲(chǔ)和相應(yīng)的計(jì)算優(yōu)化潭辈,的確是圖計(jì)算框架的重點(diǎn)和難點(diǎn)。

6.3 Spark GraphX實(shí)現(xiàn)分析

如同Spark本身澈吨,每個(gè)子模塊都有一個(gè)核心抽象把敢。GraphX的核心抽象是Resilient Distributed Property Graph,一種點(diǎn)和邊都帶屬性的有向多重圖谅辣。它擴(kuò)展了Spark RDD的抽象修赞,有Table和Graph兩種視圖,而只需要一份物理存儲(chǔ)桑阶。兩種視圖都有自己獨(dú)有的操作符柏副,從而獲得了靈活操作和執(zhí)行效率。


GraphX的底層設(shè)計(jì)有以下幾個(gè)關(guān)鍵點(diǎn)蚣录。

對(duì)Graph視圖的所有操作割择,最終都會(huì)轉(zhuǎn)換成其關(guān)聯(lián)的Table視圖的RDD操作來(lái)完成。這樣對(duì)一個(gè)圖的計(jì)算萎河,最終在邏輯上荔泳,等價(jià)于一系列RDD的轉(zhuǎn)換過(guò)程。因此虐杯,Graph最終具備了RDD的3個(gè)關(guān)鍵特性:Immutable玛歌、Distributed和Fault-Tolerant,其中最關(guān)鍵的是Immutable(不變性)厦幅。邏輯上沾鳄,所有圖的轉(zhuǎn)換和操作都產(chǎn)生了一個(gè)新圖;物理上确憨,GraphX會(huì)有一定程度的不變頂點(diǎn)和邊的復(fù)用優(yōu)化,對(duì)用戶透明瓤的。

兩種視圖底層共用的物理數(shù)據(jù)休弃,由RDD[Vertex-Partition]和RDD[EdgePartition]這兩個(gè)RDD組成。點(diǎn)和邊實(shí)際都不是以表Collection[tuple]的形式存儲(chǔ)的圈膏,而是由VertexPartition/EdgePartition在內(nèi)部存儲(chǔ)一個(gè)帶索引結(jié)構(gòu)的分片數(shù)據(jù)塊塔猾,以加速不同視圖下的遍歷速度。不變的索引結(jié)構(gòu)在RDD轉(zhuǎn)換過(guò)程中是共用的稽坤,降低了計(jì)算和存儲(chǔ)開銷丈甸。


圖的分布式存儲(chǔ)采用點(diǎn)分割模式糯俗,而且使用partitionBy方法,由用戶指定不同的劃分策略(PartitionStrategy)睦擂。劃分策略會(huì)將邊分配到各個(gè)EdgePartition得湘,頂點(diǎn)Master分配到各個(gè)VertexPartition,EdgePartition也會(huì)緩存本地邊關(guān)聯(lián)點(diǎn)的Ghost副本顿仇。劃分策略的不同會(huì)影響到所需要緩存的Ghost副本數(shù)量淘正,以及每個(gè)EdgePartition分配的邊的均衡程度,需要根據(jù)圖的結(jié)構(gòu)特征選取最佳策略臼闻。目前有EdgePartition2d鸿吆、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut這四種策略述呐。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末惩淳,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子乓搬,更是在濱河造成了極大的恐慌黎泣,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缤谎,死亡現(xiàn)場(chǎng)離奇詭異抒倚,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)坷澡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門托呕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人频敛,你說(shuō)我怎么就攤上這事项郊。” “怎么了斟赚?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵着降,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我拗军,道長(zhǎng)任洞,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任发侵,我火速辦了婚禮交掏,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘刃鳄。我一直安慰自己盅弛,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著挪鹏,像睡著了一般见秽。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上讨盒,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天解取,我揣著相機(jī)與錄音,去河邊找鬼催植。 笑死肮蛹,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的创南。 我是一名探鬼主播伦忠,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼稿辙!你這毒婦竟也來(lái)了昆码?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤邻储,失蹤者是張志新(化名)和其女友劉穎赋咽,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吨娜,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡脓匿,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了宦赠。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片陪毡。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖勾扭,靈堂內(nèi)的尸體忽然破棺而出毡琉,到底是詐尸還是另有隱情,我是刑警寧澤妙色,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布桅滋,位于F島的核電站,受9級(jí)特大地震影響身辨,放射性物質(zhì)發(fā)生泄漏丐谋。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一栅表、第九天 我趴在偏房一處隱蔽的房頂上張望笋鄙。 院中可真熱鬧,春花似錦怪瓶、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)找岖。三九已至,卻和暖如春敛滋,著一層夾襖步出監(jiān)牢的瞬間许布,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工绎晃, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蜜唾,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓庶艾,卻偏偏與公主長(zhǎng)得像袁余,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子咱揍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容