spark大數(shù)據(jù)架構(gòu)初學(xué)入門(mén)基礎(chǔ)詳解

Spark是什么


a)?是一種通用的大數(shù)據(jù)計(jì)算框架

b)?Spark Core?離線(xiàn)計(jì)算

Spark SQL?交互式查詢(xún)

Spark Streaming?實(shí)時(shí)流式計(jì)算

Spark MLlib?機(jī)器學(xué)習(xí)

Spark GraphX?圖計(jì)算

c)?特點(diǎn):

i.?一站式:一個(gè)技術(shù)堆棧解決大數(shù)據(jù)領(lǐng)域的計(jì)算問(wèn)題

ii.?基于內(nèi)存

d)?Spark2009年誕生于伯克利大學(xué)的AMPLab實(shí)驗(yàn)室

2010年正式開(kāi)源了Spark項(xiàng)目

2013年Spark成為Apache下的項(xiàng)目

2014年飛速發(fā)展露久,成為Apache的頂級(jí)項(xiàng)目

2015年在國(guó)內(nèi)興起吭产,代替mr,hive,storm等

作者:辛湜(shi)

e)?Spark和Hive:

Spark優(yōu)點(diǎn):

i.?速度快

ii.?Spark SQL支持大量不同的數(shù)據(jù)源

f)?Spark?和Storm

i.?計(jì)算模型不一樣

ii.?Spark吞吐量大

g)?特點(diǎn):快侦锯,易用,通用,兼容性

h)?spark運(yùn)行模式

i.?local(本地)

ii.?standalone(集群)

iii.?on yarn(由?yarn作為資源調(diào)度Spark負(fù)責(zé)任務(wù)調(diào)度和計(jì)算)

iv.?on mesos(由mesos作為資源調(diào)度S)

v.?on cloud()

i)?配置步驟

=======================on yarn====================

【說(shuō)明】

1.?spark任務(wù)運(yùn)行在yarn上拆檬,由yarn來(lái)進(jìn)行資源調(diào)度和管理艘希,spark只負(fù)責(zé)任務(wù)的調(diào)度?和計(jì)算

2.不需要配置和啟動(dòng)spark集群

3.只需要在提交任務(wù)的節(jié)點(diǎn)上安裝并配置spark on yarn?模式

4.必須找一臺(tái)節(jié)點(diǎn)安裝spark

5.?步驟:

i.安裝配置JDK

ii.?vi spark-env.sh

1.?export ?JAVA_HOME=/opt/modules/jdk1.7_6.0

2.?export ?HADOOP_CONF_DIR = /opt/modules/hadoop/etc/hadoop

iii.測(cè)試spark on yarn?模式是否安裝成功

iv.網(wǎng)絡(luò)測(cè)試:http://hadoop-yarn1.beicai.com:8088

=====================sdandalone模式==============

【說(shuō)明】

1.??spark運(yùn)行在spark?集群上书释,由spark進(jìn)行資源調(diào)度管理,同時(shí)還負(fù)責(zé)任務(wù)的調(diào)度和?計(jì)算

2.需要配置和啟動(dòng)spark集群

3.?步驟:

i.安裝配置JDK

ii.上傳并解壓Spark

iii.建立軟連接?ln -s spark spark?或者修改名稱(chēng)

iv.?配置環(huán)境變量

v.安裝配置Spark,修改spark配置文件(spark-env.sh, slaves)

1.?vi spark-env.sh

a)?export ?JAVA_HOME=/opt/modules/jdk(jdk位置)

b)?export SPARK_MASTER_IP=hadoop-yarn1.beicai.com

c)?export SPARK_MASTER_PORT=7077

2.??vi slaves(用于指定在哪些節(jié)點(diǎn)上啟動(dòng)worker)

a)?hadoop-yarn2.beicai.com

hadoop-yarn3.beicai.com

vi.將spark發(fā)送給其他主機(jī)

vii.?啟動(dòng)

/opt/modules/spark/bin/start-all.sh

vii.?查看SparkUI界面:http://hadoop-yarn1.beicai.com:8080

4.?

j)?

一幻赚、Spark原理

1禀忆、Spark的運(yùn)行原理

i臊旭、分布式

Ii、主要基于內(nèi)存(少數(shù)情況基于磁盤(pán))

Iii箩退、迭代式計(jì)算

2离熏、Spark?計(jì)算模式?VS ?MapReduce ?計(jì)算模式對(duì)比


Mr這種計(jì)算模型比較固定,只有兩種階段戴涝,map階段和reduce階段滋戳,兩個(gè)階段結(jié)束?后,任務(wù)就結(jié)束了啥刻,這意味著我們的操作很有限奸鸯,只能在map階段和reduce階段,?也同時(shí)意味著可能需要多個(gè)mr任務(wù)才能處理完這個(gè)job


Spark?是迭代式計(jì)算可帽,一個(gè)階段結(jié)束后娄涩,后面可以有多個(gè)階段,直至任務(wù)計(jì)算完?成映跟,也就意味著我們可以做很多的操作蓄拣,這就是Spark計(jì)算模型比mr?強(qiáng)大的地方


三、什么是Spark RDD努隙?


1弯蚜、什么是RDD?

彈性的剃法,分布式的碎捺,數(shù)據(jù)集


(RDD在邏輯上可以看出來(lái)是代表一個(gè)HDFS上的文件,他分為多個(gè)分區(qū)贷洲,散落?在Spark的多個(gè)節(jié)點(diǎn)上)

3收厨、RDD----彈性

當(dāng)RDD的某個(gè)分區(qū)的數(shù)據(jù)保存到某個(gè)節(jié)點(diǎn)上,當(dāng)這個(gè)節(jié)點(diǎn)的內(nèi)存有限优构,保存不了這個(gè)?分區(qū)的全部數(shù)據(jù)時(shí)诵叁,Spark就會(huì)有選擇性的將部分?jǐn)?shù)據(jù)保存到硬盤(pán)上,例如:當(dāng)worker?的內(nèi)存只能保存20w條數(shù)據(jù)時(shí)钦椭,但是RDD的這個(gè)分區(qū)有30w條數(shù)據(jù)拧额,這時(shí)候Spark就?會(huì)將多余的10w條數(shù)據(jù),保存到硬盤(pán)上去彪腔。Spark的這種有選擇性的在內(nèi)存和硬盤(pán)之間的權(quán)衡機(jī)制就是RDD的彈性特點(diǎn)所在


4侥锦、Spark的容錯(cuò)性

RDD最重要的特性就是,提供了容錯(cuò)性德挣,可以自動(dòng)的從失敗的節(jié)點(diǎn)上恢復(fù)過(guò)來(lái)恭垦,即如?果某個(gè)節(jié)點(diǎn)上的RDD partition(數(shù)據(jù)),因?yàn)楣?jié)點(diǎn)的故障丟了,那么RDD會(huì)自動(dòng)的通過(guò)?自己的數(shù)據(jù)來(lái)源重新計(jì)算該partition番挺,這一切對(duì)使用者來(lái)說(shuō)是透明的

2唠帝、Spark的開(kāi)發(fā)類(lèi)型


(1)、核心開(kāi)發(fā):離線(xiàn)批處理?/?演示性的交互式數(shù)據(jù)處理


(2)玄柏、SQL查詢(xún):底層都是RDD和計(jì)算操作


(3)襟衰、底層都是RDD和計(jì)算操作


(4)、機(jī)器學(xué)習(xí)


(5)粪摘、圖計(jì)算


3右蒲、Spark?核心開(kāi)發(fā)(Spark-core == Spark-RDD)步驟


(1)、創(chuàng)建初始的RDD


(2)赶熟、對(duì)初始的RDD進(jìn)行轉(zhuǎn)換操作形成新的RDD,然后對(duì)新的RDD再進(jìn)行操作陷嘴,直?至操作計(jì)算完成


(3)映砖、將最后的RDD的數(shù)據(jù)保存到某種介質(zhì)中(hive、hdfs灾挨,MySQL邑退、hbase...)


五、Spark原理

Driver劳澄,Master地技,Worker,Executor秒拔,Task各個(gè)節(jié)點(diǎn)之間的聯(lián)系



Spark中的各節(jié)點(diǎn)的作用:

1莫矗、driver的作用:

(1)、 向master進(jìn)行任務(wù)的注冊(cè)

(2)砂缩、構(gòu)建運(yùn)行任務(wù)的基本環(huán)境

(3)作谚、接受該任務(wù)的executor的反向注冊(cè)

(4)、向?qū)儆谠撊蝿?wù)的executor分配任務(wù)


2庵芭、什么是driver妹懒?

我們編寫(xiě)的程序打成jar包后,然后找一臺(tái)能夠連接spark集群的節(jié)點(diǎn)做任務(wù)的driver双吆,具體的表現(xiàn)為SparkSubmit


3眨唬、Master的作用:

(1)、監(jiān)控集群好乐;

(2)匾竿、動(dòng)態(tài)感知worker的上下線(xiàn);

(3)蔚万、接受driver端注冊(cè)請(qǐng)求搂橙;

(4)、任務(wù)資源的調(diào)度


4、Worker的作用:

(1)区转、定時(shí)向master匯報(bào)狀態(tài)苔巨;

(2)、接受master資源調(diào)度命令废离,進(jìn)行資源的調(diào)度

(3)侄泽、啟動(dòng)任務(wù)的容器Executor


5、Executor的作用:

(1)蜻韭、保存計(jì)算的RDD分區(qū)數(shù)據(jù)悼尾;

(2)、向Driver反向注冊(cè)肖方;

(3)闺魏、接受Driver端發(fā)送來(lái)的任務(wù)Task,作用在RDD上進(jìn)行執(zhí)行



Spark 編程的流程:


1俯画、我們編寫(xiě)的程序打包成jar包析桥,然后調(diào)用Spark-Submit?腳本做任務(wù)的提交


2、啟動(dòng)driver做任務(wù)的初始化


3艰垂、Driver會(huì)將任務(wù)極其參數(shù)(core泡仗,memory,driver相關(guān)的參數(shù))進(jìn)行封裝成ApplicationDescript通過(guò)taskSchedulerImpl?提交給Master


4猜憎、Master接受到driver端注冊(cè)任務(wù)請(qǐng)求時(shí)娩怎,會(huì)將請(qǐng)求參數(shù)進(jìn)行解析,并封裝成APP胰柑,然后進(jìn)行持久化截亦,并且加入到其任務(wù)隊(duì)列中的waitingAPPs


5、當(dāng)輪到咱們提交的任務(wù)運(yùn)行時(shí)柬讨,master會(huì)調(diào)用schedule()這個(gè)方法魁巩,做任務(wù)資源調(diào)度


6、Master將調(diào)度好的資源封裝成launchExecutor姐浮,發(fā)送給指定的worker


7谷遂、Worker接收到發(fā)送來(lái)的launchExecutor時(shí),會(huì)將其解析并封裝成ExecutorRunner卖鲤,然后調(diào)用start方法肾扰,啟動(dòng)Executor


8、Executor啟動(dòng)后蛋逾,會(huì)向任務(wù)的Driver進(jìn)行反向注冊(cè)


9集晚、當(dāng)屬于這個(gè)任務(wù)的所有executor啟動(dòng)成功并反向注冊(cè)完之后,driver會(huì)結(jié)束SparkContext對(duì)象的初始化


10区匣、當(dāng)sc?初始化成功后偷拔,意味著運(yùn)行任務(wù)的基本環(huán)境已經(jīng)準(zhǔn)備好了,driver會(huì)繼續(xù)運(yùn)行我們編寫(xiě)好的代碼


11、開(kāi)始注冊(cè)初始的RDD莲绰,并且不斷的進(jìn)行轉(zhuǎn)換操作欺旧,當(dāng)觸發(fā)了一個(gè)action算子時(shí),意味著觸發(fā)了一個(gè)job蛤签,此時(shí)driver就會(huì)將RDD之間的依賴(lài)關(guān)系劃分成一個(gè)一個(gè)的stage辞友,并將stage封裝成taskset,然后將taskset中的每個(gè)task進(jìn)行序列化震肮,封裝成launchtask称龙,發(fā)送給指定的executor執(zhí)行


12、Executor接受到driver發(fā)送過(guò)來(lái)的任務(wù)task戳晌,會(huì)對(duì)task進(jìn)行反序列化鲫尊,然后將對(duì)應(yīng)的算子(flatmap,map沦偎,reduceByKey疫向。。扛施。。)作用在RDD分區(qū)上



六屹篓、RDD詳解


1疙渣、什么是RDD?

RDD(Resilient Disttibuted Dataset)叫做彈性的分布式的數(shù)據(jù)集堆巧,是spark中最基本的數(shù)據(jù)抽象妄荔,它代表一個(gè)不可變,可分區(qū)谍肤,里面的元素可并行計(jì)算的集合


2啦租、RDD的特點(diǎn):

自動(dòng)容錯(cuò)

位置感知性調(diào)度

伸縮性


3、RDD的屬性:

(1)荒揣、一組分片(partition)篷角,即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō)系任,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理恳蹲,并決定并行計(jì)算的粒度,用戶(hù)可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù)俩滥,如果沒(méi)有指定嘉蕾,那么就會(huì)采用默認(rèn)值,默認(rèn)值就是程序所分配到的CPU Core的數(shù)目

(2)霜旧、一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)错忱。Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)computer函數(shù)以達(dá)到這個(gè)目的。Computer函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合以清,不需要保存每次計(jì)算的結(jié)果儿普。

(3)、RDD之間的依賴(lài)關(guān)系玖媚。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD箕肃,所以RDD之間就會(huì)形成類(lèi)似于流水一樣的前后依賴(lài)關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí)今魔,Spark可以通過(guò)這個(gè)依賴(lài)關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù)勺像,而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。

(4)错森、一個(gè)partition吟宦,即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類(lèi)型的分片函數(shù)涩维,一個(gè)是基于hashPartitioner殃姓,另外一個(gè)是基于范圍的RangePartitioner。只有對(duì)于key-value的RDD瓦阐,才會(huì)有Partitioner蜗侈,非key-value的RDD的Partitioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量睡蟋,也決定了partition RDD Shuffle輸出時(shí)的分片數(shù)量踏幻。

(5)、一個(gè)列表戳杀,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)该面。對(duì)于一個(gè)HDFD文件來(lái)說(shuō)。這個(gè)列表保存的就是每個(gè)Partition所在的快的位置信卡。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念隔缀。Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能的將計(jì)算任務(wù)分配到所要處理數(shù)據(jù)塊的存儲(chǔ)位置傍菇。



4猾瘸、RDD的創(chuàng)建:

進(jìn)行Spark核心編程時(shí),首先要做的事就是創(chuàng)建一個(gè)初始的RDD丢习。Spark Core提供了三種創(chuàng)建RDD的方式:

(1)须妻、使用程序中的集合創(chuàng)建RDD?(調(diào)用parallelize()方法)

(2)、使用本地文件創(chuàng)建RDD(調(diào)用textFile()方法)

(3)泛领、使用HDFD文件創(chuàng)建RDD ?(調(diào)用textFile()方法)


七荒吏、算子


1、什么是算子渊鞋?

是RDD中定義的作用在每一個(gè)RDD分片上的函數(shù)绰更,可以對(duì)RDD中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換?和操作


2瞧挤、RDD算子的分類(lèi)

(1)、Transformation算子儡湾,這類(lèi)算子變換不觸發(fā)提交作業(yè)(特點(diǎn)就是lazy特性)

返回的是一個(gè)RDD

(2)特恬、Action算子,這類(lèi)算子會(huì)觸發(fā)SparkContext提交作業(yè)(觸發(fā)一個(gè)spark job的運(yùn)行徐钠,從而觸發(fā)這個(gè)action之前所有的transformation的執(zhí)行)

返回的是一個(gè)spark對(duì)象


3癌刽、常用的Transformation算子

八、RDD分區(qū)排序


I尝丐、分區(qū)

兩種實(shí)現(xiàn)方式:coalesce和?repartition(底層調(diào)用coalesce)


coalesce(numPartitons,isShuffle)

第一個(gè)參數(shù)是重分區(qū)后的數(shù)量显拜,第二個(gè)參數(shù)是是否進(jìn)行shuffle

如果原來(lái)有N個(gè)分區(qū),重分區(qū)后有M個(gè)分區(qū)

如果?M > N ,必須將第二參數(shù)設(shè)置為true(也就是進(jìn)行shuffle),等價(jià)于?repartition(numPartitons) ???如果是false將不起作用 ?

如果M < N

100-->10?重分區(qū)后的分區(qū)數(shù)比原來(lái)的小的多爹袁,那么久需要使用shuffle远荠,也即是設(shè)置為true

100-->90?重分區(qū)后的分區(qū)數(shù)和原來(lái)的差不多的,那么就不需要使用shuffle失息,也就是設(shè)置為false


II譬淳、排序

sortBy(x => x)這個(gè)算子中帶有隱式轉(zhuǎn)換參數(shù)


x?能夠排序(比較大小)盹兢,那么這個(gè)類(lèi)就必須有比較大小的功能邻梆,也就是實(shí)現(xiàn)了compareTo?或者compare


實(shí)現(xiàn)二次排序有兩種方法:

1、繼承Comparable?接口 或者?Ordered

2绎秒、隱式轉(zhuǎn)換:可以定義隱式轉(zhuǎn)換函數(shù)(Ordered)或者隱式轉(zhuǎn)換值(Ordering)


九浦妄、自定義分區(qū)


自定義分區(qū)

要求:按照key將對(duì)應(yīng)的value輸出到指定的分區(qū)中

解釋?zhuān)鹤远x一個(gè)自定義分區(qū)類(lèi),繼承partitioner替裆,實(shí)現(xiàn)他的兩個(gè)方法

1校辩、numPartitions

2窘问、getPartition

具體的功能根據(jù)項(xiàng)目的要求自定義實(shí)現(xiàn)辆童,然后調(diào)用partitionBy方法,new出自定義的類(lèi)惠赫,傳入?yún)?shù)即可

九把鉴、RDD持久化原理


1、持久化場(chǎng)景:對(duì)于一個(gè)rdd會(huì)被多次引用到儿咱,并且這個(gè)rdd計(jì)算過(guò)程復(fù)雜庭砍,計(jì)算時(shí)間特變耗時(shí)


2系谐、如何進(jìn)行持久化阳液,調(diào)用rdd.persist方法或cache方法,cache方法底層就是調(diào)用persist方法


******************persist(StorageLevel.MEMORY_ONLY)*******************

如果對(duì)RDD做持久化嗅义,默認(rèn)持久化級(jí)別是storageLevel.MEMORY_ONLY ,也就是持久化到內(nèi)存中去钳宪,這種持久化級(jí)別是效率最快的揭北,但是由于是純Java?對(duì)象扳炬,保存到內(nèi)存中,那么內(nèi)存可能保存的數(shù)量就會(huì)較少

***************persist(StorageLevel.MEMORY_ONLY_SER)****************

如果當(dāng)我們集群資源有限時(shí)搔体,那么我們可以采用MEMORY_ONLY_SER恨樟,也就是將Java對(duì)象進(jìn)行序列化之后持久到內(nèi)存中去,這種持久化的好處是能夠持久化更多的數(shù)據(jù)到內(nèi)存中疚俱,但是由于在持久化時(shí)需要序列化劝术,取出來(lái)之后又需要反序列化這一過(guò)程,這個(gè)過(guò)程會(huì)消耗CPU計(jì)算資源呆奕,性能相對(duì)于MEMORY_ONLY?這種持久化級(jí)別來(lái)說(shuō)稍微弱點(diǎn)养晋,但是還是比較高效的


3、如何選擇RDD持久化策略登馒?

Spark提供的多種持久化級(jí)別匙握,主要是為了在CPU和內(nèi)存消耗之間進(jìn)行取舍,下面是一些通用的持久化級(jí)別的選擇建議:

1)陈轿、優(yōu)先使用MEMORY_ONLY圈纺,如果可以緩存所有數(shù)據(jù)的話(huà),那么就使用這種策略麦射,因?yàn)榧儍?nèi)存速度最快蛾娶,而且沒(méi)有序列化,不需要消耗CPU進(jìn)行反序列化操作

2)潜秋、如果MEMORY_ONLY策略蛔琅,無(wú)法存儲(chǔ)所有數(shù)據(jù)的話(huà),那么使用MEMORY_ONLY_SER峻呛,將數(shù)據(jù)進(jìn)行序列化存儲(chǔ)罗售,純內(nèi)存操作還是非常快的钩述,只是要消耗CPU進(jìn)行反序列化

3)寨躁、如果需要進(jìn)行快速的失敗恢復(fù),那么就選擇帶后綴為_(kāi)2的策略牙勘,進(jìn)行數(shù)據(jù)的備份职恳,這樣在失敗時(shí),就不需要重新計(jì)算了

4方面、能不使用DISK相關(guān)的策略放钦,就不要使用,有的時(shí)候恭金,從磁盤(pán)讀取數(shù)據(jù)操禀,還不如重新計(jì)算一次



十一、共享變量


1横腿、共享變量分為兩種:廣播變量?和?累加器


廣播變量(broadcast)


2颓屑、日常所遇問(wèn)題

因?yàn)槊總€(gè)task都需要拷貝這樣的一個(gè)副本到executor去執(zhí)行辙培,那么我們可以想象一下,如果有1000?個(gè)task在某個(gè)worker上執(zhí)行邢锯,而這個(gè)副本有100M扬蕊,那么意味著我們需要拷貝100G的數(shù)據(jù)都到某個(gè)worker上執(zhí)行,這樣的話(huà)會(huì)大大消耗我們的網(wǎng)絡(luò)流量丹擎,同時(shí)會(huì)加大executor的內(nèi)存消耗尾抑,從而增加了我們spark作業(yè)的運(yùn)行時(shí)間,大大降低了spark作業(yè)的運(yùn)行效率蒂培,增加了作業(yè)失敗的概率


3再愈、如何解決以上問(wèn)題,也就是說(shuō)什么時(shí)候使用廣播變量?

當(dāng)RDD引用到了一個(gè)外部變量并且這個(gè)外部變量數(shù)據(jù)量不小护戳,同時(shí)這個(gè)RDD對(duì)應(yīng)的task數(shù)量特別多翎冲,那么此時(shí)使用廣播共享變量再合適不過(guò)了

我們可以將這種大的外部變量做成廣播變量,外部變量做成廣播變量的時(shí)候媳荒,那么每個(gè)executor的內(nèi)存中只會(huì)有一個(gè)外部變量抗悍,而這個(gè)副本針對(duì)所有的task都是共享的,這樣的話(huà)就減少了網(wǎng)絡(luò)流量消耗钳枕,降低了executor的內(nèi)存消耗缴渊,提高了spark作業(yè)運(yùn)行效率和縮短了運(yùn)行時(shí)間,同時(shí)降低了作業(yè)失敗的概率



4鱼炒、廣播變量的使用流程:

1)衔沼、某個(gè)executor的第一個(gè)task先執(zhí)行,首先會(huì)從自己的blockManager中查找外部變量昔瞧,如果沒(méi)有就從鄰居的executor的blockManager的內(nèi)存中獲取這個(gè)外部變量指蚁,如果還是獲取不到,就從driver端獲取自晰,拷貝這個(gè)外部變量到本地的executor的blockManager

2)凝化、當(dāng)這個(gè)executor的其他task執(zhí)行時(shí),就不需要從外面獲取這個(gè)外部變量的副本缀磕,直接從本地的blockManager中獲取即可



5缘圈、如何獲取廣播變量的值劣光?

可以直接調(diào)用廣播變量的value()?這個(gè)方法即可


【注意】廣播變量是只讀的袜蚕,不可寫(xiě)



累加器(Accumulator)


Spark提供的Accumulator?,主要用于多個(gè)節(jié)點(diǎn)對(duì)一個(gè)變量進(jìn)行共享性的操作绢涡,Accumulator只是提供了累加的功能牲剃。但是卻給我們提供了多個(gè)task對(duì)一個(gè)變量并行操作的功能,但是task只能對(duì)Accumulator進(jìn)行累加操作

【注意】task只能對(duì)Accumulator進(jìn)行類(lèi)加操作雄可,只有Driver程序可以讀取Accumulator的值

RDD分區(qū)和容錯(cuò)機(jī)制講解

1凿傅、RDD?的Lineage血統(tǒng)

RDD只支持粗粒度轉(zhuǎn)換缠犀,即在大量記錄上執(zhí)行的單個(gè)操作,將創(chuàng)建RDD的一系列Lineage(血統(tǒng))記錄下來(lái)聪舒。以便恢復(fù)丟失的分區(qū)


2辨液、RDD的依賴(lài)關(guān)系

RDD和它的父RDD的關(guān)系有兩種不同的類(lèi)型:

1)、窄依賴(lài)(一對(duì)一箱残,多對(duì)一)

形象的比喻:獨(dú)生子女

2)滔迈、寬依賴(lài)(多對(duì)多)

形象的比喻:超生


注釋?zhuān)簞澐謘tage的依據(jù)就是寬依賴(lài),也就是RDD之間是否有shuffle被辑,shuffle過(guò)程就是一個(gè)寬依賴(lài)過(guò)程燎悍,shuffle之前的tasks就屬于一個(gè)stage,shuffle之后的也屬于一個(gè)stage盼理,shuffle之前和之后的操作都是窄依賴(lài)

【注意】shuffle過(guò)程分為:shuffle Write過(guò)程 和?shuffle read過(guò)程


4谈山、DAG的生成(有向無(wú)環(huán)圖)和任務(wù)的劃分

DAG(Directed Acyclic Graph)叫做有向無(wú)環(huán)圖(有方向無(wú)循環(huán)的圖)


5、一個(gè)wordCount過(guò)程會(huì)產(chǎn)生多少個(gè)RDD宏怔?

至少會(huì)產(chǎn)生五個(gè)RDD奏路,

第一個(gè),從HDFS中加載后得到一個(gè)RDD(即使用sc.textFile()算子)臊诊,即HadoopRDD

在sc.textFile()過(guò)程中還會(huì)產(chǎn)生一個(gè)RDD(調(diào)用map算子)思劳,產(chǎn)生一個(gè)MapPartitionRDD

第二個(gè),使用flatMap算子妨猩,得到一個(gè)MapPartitionRDD

第三個(gè)潜叛,使用map算子,得到一個(gè)MapPartitionRDD

第四個(gè)壶硅,使用reduceByKey算子威兜,也就是在經(jīng)過(guò)了shuffle過(guò)程后又會(huì)得到一個(gè)shuffledRDD

第五個(gè),使用saveAsTextFile算子庐椒,再產(chǎn)生一個(gè)MapPartitionRDD?

spark程序提交流程講解

Spark任務(wù)簡(jiǎn)介:

Spark-submit--->SparkSubmit-->main-->submit-->doRunMain-->RunMain-->通過(guò)反射創(chuàng)建我們編寫(xiě)的主類(lèi)的實(shí)例對(duì)象椒舵,調(diào)用main方法-->開(kāi)始執(zhí)行我們編寫(xiě)的代碼-->初始化SparkContext對(duì)象-->創(chuàng)建初始的RDD-->觸發(fā)action算子-->提交job-->worker執(zhí)行任務(wù)-->任務(wù)結(jié)束


Spark任務(wù)詳解:

1)、將我們編寫(xiě)的程序打成jar包


2)约谈、調(diào)用spark-submit腳本提交任務(wù)到集群上運(yùn)行


3)笔宿、運(yùn)行sparkSubmit的main方法,在這個(gè)方法中通過(guò)反射的方式創(chuàng)建我們編寫(xiě)的主類(lèi)的實(shí)例對(duì)象棱诱,然后調(diào)用main方法泼橘,開(kāi)始執(zhí)行我們的代碼(注意,我們的spark程序中的driver就運(yùn)行在sparkSubmit進(jìn)程中)


4)迈勋、當(dāng)代碼運(yùn)行到創(chuàng)建SparkContext對(duì)象時(shí)炬灭,那就開(kāi)始初始化SparkContext對(duì)象了


5)、在初始化SparkContext對(duì)象的時(shí)候靡菇,會(huì)創(chuàng)建兩個(gè)特別重要的對(duì)象重归,分別是:DAGScheduler

和TaskScheduler


【DAGScheduler的作用】將RDD的依賴(lài)切分成一個(gè)一個(gè)的stage米愿,然后將stage作為taskSet提交給DriverActor


6)、在構(gòu)建taskScheduler的同時(shí)鼻吮,會(huì)創(chuàng)建兩個(gè)非常重要的對(duì)象育苟,分別是DriverActor和ClientActor


【clientActor的作用】向master注冊(cè)用戶(hù)提交的任務(wù)

【DriverActor的作用】接受executor的反向注冊(cè),將任務(wù)提交給executor


7)椎木、當(dāng)clientActor啟動(dòng)后宙搬,會(huì)將用戶(hù)提交的任務(wù)和相關(guān)的參數(shù)封裝到ApplicationDescription對(duì)象中,然后提交給master進(jìn)行任務(wù)的注冊(cè)


8)拓哺、當(dāng)master接受到clientActor提交的任務(wù)請(qǐng)求時(shí)勇垛,會(huì)將請(qǐng)求參數(shù)進(jìn)行解析,并封裝成Application士鸥,然后將其持久化闲孤,然后將其加入到任務(wù)隊(duì)列waitingApps中


9)、當(dāng)輪到我們提交的任務(wù)運(yùn)行時(shí)烤礁,就開(kāi)始調(diào)用schedule()讼积,進(jìn)行任務(wù)資源的調(diào)度


10)、master將調(diào)度好的資源封裝到launchExecutor中發(fā)送給指定的worker


11)脚仔、worker接受到Maseter發(fā)送來(lái)的launchExecutor時(shí)勤众,會(huì)將其解壓并封裝到ExecutorRunner中,然后調(diào)用這個(gè)對(duì)象的start(),?啟動(dòng)Executor


12)鲤脏、Executor啟動(dòng)后會(huì)向DriverActor進(jìn)行反向注冊(cè)


13)们颜、driverActor會(huì)發(fā)送注冊(cè)成功的消息給Executor


14)、Executor接受到DriverActor注冊(cè)成功的消息后會(huì)創(chuàng)建一個(gè)線(xiàn)程池猎醇,用于執(zhí)行DriverActor發(fā)送過(guò)來(lái)的task任務(wù)


15)窥突、當(dāng)屬于這個(gè)任務(wù)的所有的Executor啟動(dòng)并反向注冊(cè)成功后,就意味著運(yùn)行這個(gè)任務(wù)的環(huán)境已經(jīng)準(zhǔn)備好了硫嘶,driver會(huì)結(jié)束SparkContext對(duì)象的初始化阻问,也就意味著new SparkContext這句代碼運(yùn)行完成


16)、當(dāng)初始化sc成功后沦疾,driver端就會(huì)繼續(xù)運(yùn)行我們編寫(xiě)的代碼称近,然后開(kāi)始創(chuàng)建初始的RDD,然后進(jìn)行一系列轉(zhuǎn)換操作哮塞,當(dāng)遇到一個(gè)action算子時(shí)刨秆,也就意味著觸發(fā)了一個(gè)job


17)、driver會(huì)將這個(gè)job提交給DAGScheduler


18)彻桃、DAGScheduler將接受到的job坛善,從最后一個(gè)算子向前推導(dǎo)晾蜘,將DAG依據(jù)寬依賴(lài)劃分成一個(gè)一個(gè)的stage邻眷,然后將stage封裝成taskSet眠屎,并將taskSet中的task提交給DriverActor


19)、DriverActor接受到DAGScheduler發(fā)送過(guò)來(lái)的task肆饶,會(huì)拿到一個(gè)序列化器改衩,對(duì)task進(jìn)行序列化,然后將序列化好的task封裝到launchTask中驯镊,然后將launchTask發(fā)送給指定的Executor


20)葫督、Executor接受到了DriverActor發(fā)送過(guò)來(lái)的launchTask時(shí),會(huì)拿到一個(gè)反序列化器板惑,對(duì)launchTask進(jìn)行反序列化橄镜,封裝到TaskRunner中,然后從Executor這個(gè)線(xiàn)程池中獲取一個(gè)線(xiàn)程冯乘,將反序列化好的任務(wù)中的算子作用在RDD對(duì)應(yīng)的分區(qū)上


【注意】

Spark的任務(wù)分為為兩種:

a洽胶、shuffleMapTask:shuffle之前的任務(wù)

b、resultTask:shuffle之后的任務(wù)


Spark任務(wù)的本質(zhì):

將RDD的依賴(lài)關(guān)系切分成一個(gè)一個(gè)的stage裆馒,然后將stage作為T(mén)askSet分批次的發(fā)送到Executor上執(zhí)行



十三姊氓、Checkpoint


1、使用checkpoint的場(chǎng)景:

某個(gè)RDD會(huì)被多次引用喷好,計(jì)算特別復(fù)雜翔横,計(jì)算特別耗時(shí)

擔(dān)心中間某些關(guān)鍵的,在后面會(huì)反復(fù)幾次使用的RDD梗搅,可能會(huì)因?yàn)楣?jié)點(diǎn)的故障禾唁,導(dǎo)致持久化數(shù)據(jù)的丟失


2、如何對(duì)RDD進(jìn)行checkpoint无切?

1)蟀俊、設(shè)置還原點(diǎn)目錄,設(shè)置checkpoint目錄

2)订雾、調(diào)用RDD的checkpoint的方法對(duì)該RDD進(jìn)行checkpoint


3肢预、checkpoint的原理

1)、RDD調(diào)用了checkpoint方法之后洼哎,就接受RDDCheckpointData對(duì)象的管理

2)烫映、RDDCheckpointData對(duì)象會(huì)負(fù)責(zé)將調(diào)用了checkpoint的RDD?的狀態(tài)設(shè)置為MarkedForCheckpoint

3)、當(dāng)這個(gè)RDD所在的job運(yùn)行結(jié)束后噩峦,會(huì)調(diào)用最后一個(gè)RDD的doCheckpoint锭沟,根據(jù)其血統(tǒng)向上查找,查找到被標(biāo)注為MarkedForCheckpoint狀態(tài)的RDD识补,將其狀態(tài)改變?yōu)閏heckpointingInProgress

4)族淮、啟動(dòng)一個(gè)單獨(dú)的job,將血統(tǒng)中標(biāo)記為checkpointingInProgress的RDD進(jìn)行checkpoint,也就是將RDD的數(shù)據(jù)寫(xiě)入到checkpoint的目錄中去

5)祝辣、當(dāng)某個(gè)節(jié)點(diǎn)發(fā)生故障贴妻,導(dǎo)致包括持久化的數(shù)據(jù)全部丟失,此時(shí)會(huì)從還原點(diǎn)目錄還原RDD的每個(gè)分區(qū)的數(shù)據(jù)蝙斜,這樣就不需要從頭開(kāi)始計(jì)算一次


4名惩、checkpoint需要注意的地方

因?yàn)镽DD在做checkpoint的時(shí)候,會(huì)單獨(dú)啟動(dòng)一個(gè)job對(duì)需要進(jìn)行checkpoint的RDD進(jìn)行重新計(jì)算孕荠,這樣就會(huì)增加spark作業(yè)運(yùn)行時(shí)間娩鹉,所以spark強(qiáng)烈建議在做checkpoint之前,應(yīng)該對(duì)需要進(jìn)行checkpoint的RDD進(jìn)行持久化(即調(diào)用?.cache)


5稚伍、checkpoint?和持久化的區(qū)別

1)弯予、是否改變血統(tǒng):

持久化(.cache):不會(huì)改變RDD的依賴(lài)關(guān)系,也就是不會(huì)改變其血統(tǒng)

Checkpoint:會(huì)改變RDD的血統(tǒng)个曙,做了checkpoint的RDD會(huì)清除其所有的依賴(lài)關(guān)系熙涤,并將其父RDD強(qiáng)制設(shè)置為checkpointRDD,并且將RDD的狀態(tài)更改為checkpointed


2)困檩、RDD的數(shù)據(jù)的可靠性:

持久化:只是將RDD的數(shù)據(jù)持久化到內(nèi)存或磁盤(pán)中祠挫,但是如果節(jié)點(diǎn)發(fā)生故障,那么持久化的數(shù)據(jù)還是會(huì)丟失

Checkpoint:checkpoint的數(shù)據(jù)保存在第三方高可靠的分布式的文件系統(tǒng)中悼沿,機(jī)試節(jié)點(diǎn)發(fā)生故障等舔,數(shù)據(jù)也不會(huì)丟失,所以checkpoint比持久化可靠性更高



6糟趾、后續(xù)

我們實(shí)現(xiàn)了checkpoint?之后慌植,在某個(gè)task?又調(diào)用了該RDD的iterator()?方法時(shí),就實(shí)現(xiàn)了高容錯(cuò)機(jī)制义郑,即使RDD的持久化數(shù)據(jù)丟失蝶柿,或者壓根兒就沒(méi)有持久化,但是還是可以通過(guò)readCheckpointOrComputer()?方法非驮,優(yōu)先從父RDD-----checkpointRDD中讀取交汤,HDFS(外部文件系統(tǒng))的數(shù)據(jù)









第二部分?spark-sql


一、Spark-SQL前世今生


1劫笙、Spark SQL的特點(diǎn)

1)芙扎、支持多種數(shù)據(jù)源:Hive、RDD填大、Parquet戒洼、JSON、JDBC等允华。

2)圈浇、多種性能優(yōu)化技術(shù):in-memory columnar storage寥掐、byte-code generation、cost model動(dòng)態(tài)評(píng)估等磷蜀。

3)召耘、組件擴(kuò)展性:對(duì)于SQL的語(yǔ)法解析器、分析器以及優(yōu)化器蠕搜,用戶(hù)都可以自己重新開(kāi)發(fā)怎茫,并且動(dòng)態(tài)擴(kuò)展


2收壕、Spark SQL的性能優(yōu)化技術(shù)簡(jiǎn)介

1)妓灌、內(nèi)存列存儲(chǔ)(in-memory columnar storage)

2)、字節(jié)碼生成技術(shù)(byte-code generation)

3)蜜宪、Scala代碼編寫(xiě)的優(yōu)化



3虫埂、Spark SQL and DataFrame

Spark SQL是Spark中的一個(gè)模塊,主要用于進(jìn)行結(jié)構(gòu)化數(shù)據(jù)的處理圃验。它提供的最核心的編程抽象掉伏,就是DataFrame。同時(shí)Spark SQL還可以作為分布式的SQL查詢(xún)引擎澳窑。Spark SQL最重要的功能之一斧散,就是從Hive中查詢(xún)數(shù)據(jù)。


DataFrame摊聋,可以理解為是鸡捐,以列的形式組織的,分布式的數(shù)據(jù)集合麻裁。它其實(shí)和關(guān)系型數(shù)據(jù)庫(kù)中的表非常類(lèi)似箍镜,但是底層做了很多的優(yōu)化。DataFrame可以通過(guò)很多來(lái)源進(jìn)行構(gòu)建煎源,包括:結(jié)構(gòu)化的數(shù)據(jù)文件色迂,Hive中的表,外部的關(guān)系型數(shù)據(jù)庫(kù)手销,以及RDD歇僧。



二、Spark-sql的使用


1锋拖、RDD轉(zhuǎn)換為DataFrame(兩種)

1)馏慨、使用反射的方式來(lái)推斷包含了特定數(shù)據(jù)類(lèi)型的RDD的元數(shù)據(jù)

2)、通過(guò)編程接口來(lái)創(chuàng)建DataFrame


2姑隅、UDF自定義函數(shù)和UDAF自定義聚合函數(shù)

UDF写隶,其實(shí)更多的是針對(duì)單行輸入,返回一個(gè)輸出

UDAF讲仰,則可以針對(duì)多行輸入慕趴,進(jìn)行聚合計(jì)算,返回一個(gè)輸出,功能更加強(qiáng)大


3冕房、Spark-SQL工作原理

SqlParse ?--------->解析器


Analyser ?--------->分析器


Optimizer ?--------->優(yōu)化器


SparkPlan ?--------->物理計(jì)劃

流程:


1)躏啰、自己編寫(xiě)的SQL語(yǔ)句

大家要知道,只要在數(shù)據(jù)庫(kù)類(lèi)型的技術(shù)里面耙册,比如:最傳統(tǒng)的MySQL给僵,Oracle等,包括現(xiàn)在大數(shù)據(jù)領(lǐng)域的數(shù)據(jù)倉(cāng)庫(kù)详拙,比如hive帝际,他的基本的SQL執(zhí)行的模型,都是類(lèi)似的饶辙,首先都要生成一條SQL語(yǔ)句的執(zhí)行計(jì)劃


2)蹲诀、通過(guò)SqlParser(解析器)生成未解析的邏輯計(jì)劃(unresolved LogicalPlan)

3)、通過(guò)Analyzer(分析器)生成解析后的邏輯計(jì)劃(resolved LogicalPlan)

4)弃揽、通過(guò)Optimizer(優(yōu)化器)生成優(yōu)化后的邏輯計(jì)劃(optimized LogicalPlan)

實(shí)際上脯爪,比如傳統(tǒng)的Oracle等數(shù)據(jù)庫(kù),通常都會(huì)生成多個(gè)執(zhí)行計(jì)劃矿微,然后呢痕慢,最后有一個(gè)優(yōu)化器,針對(duì)多個(gè)計(jì)劃涌矢,選擇一個(gè)最好的計(jì)劃掖举,而SparkSql這兒的優(yōu)化指的是,比如說(shuō)蒿辙,剛生成的執(zhí)行計(jì)劃中拇泛,有些地方的性能是顯而易見(jiàn)的,不太好思灌,舉例說(shuō)明:

比如說(shuō)俺叭,我們有一個(gè)SQL語(yǔ)句,select name from (select ... from ...) where ..=..;

此時(shí)泰偿,在執(zhí)行計(jì)劃解析出來(lái)的時(shí)候熄守,其實(shí)就是按照他原封不動(dòng)的樣子,來(lái)解析成可以執(zhí)行的計(jì)劃耗跛,但是呢裕照,Optimizer?在這里其實(shí)就會(huì)對(duì)執(zhí)行計(jì)劃進(jìn)行優(yōu)化,比如說(shuō)调塌,發(fā)現(xiàn)where?條件晋南,其實(shí)可以放在子查詢(xún)中,這樣羔砾,子查詢(xún)的數(shù)量大大變小负间,可以?xún)?yōu)化執(zhí)行速度偶妖,此時(shí),可能就會(huì)變成如下這樣:select name from (select name from ...where ..=..)


5)政溃、通過(guò)SparkPlan趾访,生成最后的物理計(jì)劃(PhysicalPlan)

到物理計(jì)劃這里,那么其實(shí)就是非扯“接地氣”的計(jì)劃了扼鞋。就是說(shuō),已經(jīng)很明朗了愤诱,從那幾個(gè)文件讀取什么數(shù)據(jù)云头,從那幾個(gè)文件中讀取,如何進(jìn)行關(guān)聯(lián)等等


6)转锈、在executor中執(zhí)行物理計(jì)劃

邏輯的執(zhí)行計(jì)劃盘寡,更多的是偏向于邏輯楚殿,比如說(shuō)吧撮慨,大致就是這種樣子的,

From table students=>filter ... => select name ...

這里基本上脆粥,邏輯計(jì)劃都是采用Tree?砌溺,樹(shù)形結(jié)構(gòu)


7)、生成RDD

Select ?name ?from ?students =>?解析变隔,從哪里去查詢(xún)规伐,students表,在哪個(gè)文件里匣缘,從哪個(gè)文件中查詢(xún)哪些數(shù)據(jù)猖闪,比如說(shuō)是name這個(gè)列,此外肌厨,復(fù)雜的SQL培慌,還有,比如說(shuō)查詢(xún)時(shí)柑爸,是否對(duì)表中的數(shù)據(jù)進(jìn)行過(guò)濾和篩選吵护,更不用說(shuō),復(fù)雜時(shí)表鳍,需要有多表的JOIN(咋傳統(tǒng)數(shù)據(jù)庫(kù)中馅而,比如MySQL,執(zhí)行計(jì)劃還涉及到如何掃描和利用索引)




4譬圣、spark-SQL性能優(yōu)化


1)瓮恭、設(shè)置shuffle過(guò)程的并行度:spark.sql.shuffle.partitions(SQLContext.setConf())


2)、在hive數(shù)據(jù)倉(cāng)庫(kù)建設(shè)過(guò)程中厘熟,合理設(shè)置數(shù)據(jù)類(lèi)型屯蹦,比如能設(shè)置為int的诸衔,就不要設(shè)置為bigInt,減少數(shù)據(jù)類(lèi)型導(dǎo)致不必要的內(nèi)存開(kāi)銷(xiāo)


3)颇玷、編寫(xiě)SQL時(shí)笨农,盡量給出明確的列名,比如select name from students帖渠。不要寫(xiě)select *?的方式谒亦。


4)、并行處理查詢(xún)結(jié)果:對(duì)于spark-SQL查詢(xún)的結(jié)果空郊,如果數(shù)據(jù)量比較大份招,比如超過(guò)1000條,那么就不要一次性的collect()到driver再處理狞甚,使用foreach()算子锁摔,并行處理查詢(xún)結(jié)果

5)、緩存表:對(duì)于一條SQL語(yǔ)句可能對(duì)此使用到的表哼审,可以對(duì)其進(jìn)行緩存谐腰,使用?sqlContext.cacheTable(tableName),或者DataFrame.cache()即可涩盾,spark-SQL會(huì)用內(nèi)存列存儲(chǔ)的格式進(jìn)行表的緩存十气,然后spark-sql就可以?xún)H僅掃描需要使用的列,并且自動(dòng)優(yōu)化壓縮春霍,來(lái)最小化內(nèi)存使用和GC開(kāi)銷(xiāo)砸西,SQLContext.uncacheTable(tableName)可以將表從緩存中移除,用SQLContext址儒。setConf()芹枷,設(shè)置spark.sql.inMemoryColumnarStorage.batchSize參數(shù)(默認(rèn)10000),可以設(shè)置列存儲(chǔ)的單位

6)莲趣、廣播join表:spark.sql.autoBroadcastJoinThreshold鸳慈,默認(rèn)10485760 (10 MB)。在內(nèi)存夠用的情況下妖爷,可以增加其大小蝶涩,參數(shù)設(shè)置了一個(gè)表在join的時(shí)候,最大在多大以?xún)?nèi)絮识,可以被廣播出去優(yōu)化性能


5绿聘、Hive on Spark配置

1)、安轉(zhuǎn)配置好Hive和Spark

2)次舌、Set hive.execution.engine=spark;

3)熄攘、set spark.master=spark://mini1:7077




第三部分spark-streaming


1, ?Dstream

?

Dstream是sparkStreaming的數(shù)據(jù)模型,本質(zhì)就是一連串不間斷的RDD彼念,但是它是一個(gè)時(shí)間段的RDD.這些時(shí)間段的RDD源源不斷的連接在一起挪圾。

這個(gè)時(shí)間可以自己設(shè)置浅萧,時(shí)間設(shè)置的越短,實(shí)時(shí)性越高哲思,但是性能消耗也越大洼畅。



2, ?spark streaming從kafka獲取數(shù)據(jù),有哪幾種方式棚赔?

?

有兩種方式:

1.通過(guò)receiver的方式帝簇,

2,通過(guò)direct的方式靠益,dirrect的方式需要自己來(lái)管理偏移量丧肴。


?

3, ?sparkStreaming和storm的區(qū)別


sparkStreaming是spark里面的一個(gè)做流式準(zhǔn)實(shí)時(shí)計(jì)算的組件,它使用的數(shù)據(jù)結(jié)構(gòu)是Dstream胧后,Dstream里面是一連串時(shí)間片的rdd芋浮。

相比于storm,sparkStreaming在實(shí)時(shí)性壳快,保證數(shù)據(jù)不丟失方面都不占用優(yōu)勢(shì)纸巷,spark streaming在spark支持者眼中的優(yōu)勢(shì)是spark Streaming具有高吞吐性,最本質(zhì)來(lái)說(shuō)濒憋,sparkStreaming相比于storm的優(yōu)勢(shì)是sparkStreaming可以和spark core何暇,spark SQL無(wú)縫整合陶夜。



4.對(duì)于需要多次引用的凛驮,并且這個(gè)dstream計(jì)算時(shí)間特別耗時(shí),數(shù)據(jù)特別重要条辟,那么我們就需要對(duì)dstream進(jìn)行checkpoint黔夭,(只有多次引用的,進(jìn)行持久化就可以了)羽嫡,因?yàn)榧词箤?duì)這個(gè)dstream進(jìn)行持久化本姥,數(shù)據(jù)也可能會(huì)丟失,而checkpoint數(shù)據(jù)丟失的可能性小杭棵,但是這樣會(huì)影響spark-streaming的數(shù)據(jù)吞吐量婚惫,因?yàn)樵谧鲇?jì)算的同時(shí),還需要將數(shù)據(jù)寫(xiě)入到外部存儲(chǔ)系統(tǒng)中魂爪,會(huì)降低spark性能先舷,影響吞吐量,非必要情況下不建議使用


5.如何對(duì)dstream做checkpoint

?

首先設(shè)置還原點(diǎn)目錄滓侍,其次調(diào)用dstream的checkpoint方法

【注意】:dstream的checkpoint的周期一定要是產(chǎn)生batch時(shí)間的整數(shù)倍蒋川,同時(shí)spark官方建議將checkpoint的時(shí)間設(shè)置為至少10秒。通常來(lái)說(shuō)撩笆,將checkpoint間隔設(shè)置為窗口操作的滑動(dòng)間隔的5-10倍



6.spark程序在啟動(dòng)時(shí)捺球,會(huì)去這個(gè)checkpointPath目錄下查看是否有保存的driver的元數(shù)據(jù)(1.dstream的操作轉(zhuǎn)換關(guān)系缸浦,2.未處理完的batch)信息,當(dāng)spark-streaming程序在二次啟動(dòng)后就會(huì)去checkpointPath目錄下還原這個(gè)程序氮兵,加載未處理的batch元數(shù)據(jù)信息在內(nèi)存中恢復(fù)裂逐,繼續(xù)進(jìn)行任務(wù)處理




7.為了保證spark-streaming程序7*24小時(shí)運(yùn)行,那么我們程序應(yīng)該具備高可靠性泣栈,怎樣具備高可靠性絮姆?

?

a.程序出現(xiàn)故障,driver死掉了秩霍,流式程序應(yīng)該具備自動(dòng)重啟的功能

b.沒(méi)有計(jì)算完成的rdd在程序異常停止后篙悯,下次啟動(dòng)后還會(huì)將未處理的rdd進(jìn)行處理

【注意】:要在spark_submit中,添加--deploy-mode參數(shù)铃绒,默認(rèn)其值為client鸽照,即在提交應(yīng)用的機(jī)器上啟動(dòng)driver,但是要能夠自動(dòng)重啟driver颠悬,就必須將其值設(shè)置為cluster矮燎;此外,需要添加--supervise參數(shù)赔癌,失敗后自動(dòng)重啟

//spark_submit --executor-memory 1g --total-execute-cores 5 --deploy-model cluster --supervise




8.啟用預(yù)寫(xiě)機(jī)制

a.預(yù)寫(xiě)日志機(jī)制诞外,簡(jiǎn)寫(xiě)為WAL,全稱(chēng)為Write Ahead Log灾票,從spark1.2版本開(kāi)始峡谊,就引入了基于容錯(cuò)的文件系統(tǒng)的WAL機(jī)制。如果啟用該機(jī)制刊苍,Receiver接收到的所有數(shù)據(jù)都會(huì)寫(xiě)入配置的checkpoint目錄中的預(yù)寫(xiě)日志既们。這中機(jī)制可以讓driver在恢復(fù)的時(shí)候,避免數(shù)據(jù)丟失正什,并且可以確保整個(gè)實(shí)時(shí)計(jì)算過(guò)程中零數(shù)據(jù)丟失


b.要配置該機(jī)制啥纸,首先調(diào)用StreamingContext的checkpoint()方法設(shè)置一個(gè)checkpoint目錄,然后需要將spark.streaming.receiver.writeAheadLog.enable參數(shù)設(shè)置為true

然而婴氮,這種極強(qiáng)的可靠性機(jī)制斯棒,會(huì)導(dǎo)致Receiver的吞吐量大幅度下降,因?yàn)閱挝粫r(shí)間內(nèi)主经,有相當(dāng)一部分時(shí)間需要將數(shù)據(jù)寫(xiě)入預(yù)寫(xiě)日志荣暮。如果又希望開(kāi)啟預(yù)寫(xiě)日志機(jī)制,確保數(shù)據(jù)零損失旨怠,又不希望影響系統(tǒng)的吞吐量渠驼,那么可以創(chuàng)建多個(gè)輸入DStream,啟動(dòng)多個(gè)Receiver

此外鉴腻,在啟用了預(yù)寫(xiě)日志機(jī)制之后迷扇,推薦將復(fù)制持久化機(jī)制禁用掉百揭,因?yàn)樗袛?shù)據(jù)已經(jīng)保存在容錯(cuò)的文件系統(tǒng)中,不需要在用復(fù)制機(jī)制進(jìn)行持久化蜓席,保存一份副本器一,只要將輸入的DStream的持久化機(jī)制設(shè)置一下即可,persist(StorageLevel.MEMORY_AND_DISK_SER)厨内。


9.spark-Streaming checkpoint概述

每一個(gè)spark-streaming應(yīng)用祈秕,正常來(lái)說(shuō),都是7*24小時(shí)運(yùn)轉(zhuǎn)的雏胃,這就是實(shí)時(shí)計(jì)算程序的特點(diǎn)请毛,因?yàn)橐掷m(xù)不斷的對(duì)數(shù)據(jù)進(jìn)行計(jì)算,因此瞭亮,對(duì)實(shí)時(shí)計(jì)算應(yīng)用的要求方仿,應(yīng)該是必須要能夠?qū)εc應(yīng)用程序邏輯無(wú)關(guān)的失敗,進(jìn)行容錯(cuò)

如果要實(shí)現(xiàn)這個(gè)目標(biāo)统翩,Spark Streaming程序就必須將足夠的信息checkpoint到容錯(cuò)的存儲(chǔ)系統(tǒng)上仙蚜,從而讓它能夠從失敗中進(jìn)行恢復(fù)

有兩種數(shù)據(jù)需要被進(jìn)行checkpoint

1.元數(shù)據(jù)checkpoint-將定義了流式計(jì)算邏輯的信息,保存到容錯(cuò)的存儲(chǔ)系統(tǒng)上厂汗,比如HDFS委粉,當(dāng)運(yùn)行spark Streaming應(yīng)用程序的Driver進(jìn)程所在的節(jié)點(diǎn)失敗時(shí),該信息可以用于進(jìn)行恢復(fù)娶桦,元數(shù)據(jù)信息包括:

a.配置信息---創(chuàng)建spark Streaming應(yīng)用程序的配置信息贾节,比如sparkConf中的信息

b.DStream的操作信息---定義了spark Stream應(yīng)用程序的計(jì)算邏輯的DStream操作信息

c.未處理的batch信息---那些job正在排隊(duì),還沒(méi)處理的batch信息


2.數(shù)據(jù)checkpoint---將實(shí)時(shí)計(jì)算過(guò)程中產(chǎn)生的RDD的數(shù)據(jù)保存到可靠的存儲(chǔ)系統(tǒng)中趟紊。對(duì)于一些將多個(gè)batch的數(shù)據(jù)進(jìn)行聚合氮双,有狀態(tài)的transformation操作,這是非常有用的霎匈,在這種transformation操作中,生成的RDD是依賴(lài)于之前batch的RDD的融痛,這會(huì)導(dǎo)致隨著時(shí)間的推移蛾方,RDD的依賴(lài)鏈條變得越來(lái)越長(zhǎng)霎俩,要避免由于依賴(lài)鏈條越來(lái)越長(zhǎng),導(dǎo)致的一起變得越來(lái)越長(zhǎng)的失敗恢復(fù)時(shí)間墨吓,有狀態(tài)的transformation操作執(zhí)行過(guò)程中中間產(chǎn)生的RDD,會(huì)定期被checkpoint到可靠的存儲(chǔ)系統(tǒng)上,比如HDFS纹磺,從而削減RDD的依賴(lài)鏈條進(jìn)而縮短失敗恢復(fù)時(shí)帖烘,RDD的恢復(fù)時(shí)間。一句話(huà)概括橄杨,元數(shù)據(jù)checkpoint主要是為了從driver失敗中進(jìn)行恢復(fù)秘症;而RDD checkpoint主要是為了使用到有狀態(tài)的transformation操作時(shí)照卦,能夠在其生產(chǎn)出的數(shù)據(jù)丟失時(shí),進(jìn)行快速的失敗恢復(fù)


10.何時(shí)啟用checkpoint機(jī)制乡摹?


a.使用了有狀態(tài)的transformation操作----比如updateStateByKey役耕,或者reduceByKeyAndWindow操作被使用了,那么checkpoint目錄要求必須提供的聪廉,也就是必須開(kāi)啟checkpoint機(jī)制瞬痘,從而進(jìn)行周期性的RDD checkpoint

b.要保證可以從Driver失敗中進(jìn)行恢復(fù)-----元數(shù)據(jù)checkpoint需要啟用,來(lái)進(jìn)行這種情況的恢復(fù)

【注意】并不是說(shuō)板熊,所有的spark streaming應(yīng)用程序框全,都要啟用checkpoint機(jī)制,如果既不強(qiáng)制要求從Driver失敗中自動(dòng)進(jìn)行恢復(fù)干签,又沒(méi)使用有狀態(tài)的transformation操作竣况,那么就不需要啟用checkpoint,事實(shí)上筒严,這么做反而是有助于提升性能的


11.如何自動(dòng)從Driver失敗中恢復(fù)過(guò)來(lái)

?

要能夠自動(dòng)從Driver失敗中恢復(fù)過(guò)來(lái)運(yùn)行spark Streaming應(yīng)用程序的集群丹泉,就必須監(jiān)控Driver運(yùn)行的過(guò)程,并且在他失敗時(shí)將他重啟鸭蛙,對(duì)于spark自身的standalone模式摹恨,需要進(jìn)行一些配置去supervise driver,在他失敗時(shí)將其重啟


首先娶视,要在spark-submit中晒哄,添加--deploy-mode參數(shù),默認(rèn)其值為client肪获,即在提交應(yīng)用的機(jī)器上啟動(dòng)Driver寝凌,但是,要能夠自動(dòng)重啟Driver孝赫,就必須將其值設(shè)置為cluster较木,此外,需要添加--supervise參數(shù)

?

部分原理圖稍后上傳青柄。?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末伐债,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子致开,更是在濱河造成了極大的恐慌峰锁,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件双戳,死亡現(xiàn)場(chǎng)離奇詭異虹蒋,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)魄衅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)峭竣,“玉大人,你說(shuō)我怎么就攤上這事徐绑⌒巴裕” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵傲茄,是天一觀(guān)的道長(zhǎng)毅访。 經(jīng)常有香客問(wèn)我,道長(zhǎng)盘榨,這世上最難降的妖魔是什么喻粹? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮草巡,結(jié)果婚禮上守呜,老公的妹妹穿的比我還像新娘。我一直安慰自己山憨,他們只是感情好查乒,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著郁竟,像睡著了一般玛迄。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上棚亩,一...
    開(kāi)封第一講書(shū)人閱讀 51,631評(píng)論 1 305
  • 那天蓖议,我揣著相機(jī)與錄音,去河邊找鬼讥蟆。 笑死勒虾,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的瘸彤。 我是一名探鬼主播修然,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼钧栖!你這毒婦竟也來(lái)了低零?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤拯杠,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后啃奴,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體潭陪,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了依溯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片老厌。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖黎炉,靈堂內(nèi)的尸體忽然破棺而出枝秤,到底是詐尸還是另有隱情,我是刑警寧澤慷嗜,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布淀弹,位于F島的核電站,受9級(jí)特大地震影響庆械,放射性物質(zhì)發(fā)生泄漏薇溃。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一缭乘、第九天 我趴在偏房一處隱蔽的房頂上張望沐序。 院中可真熱鬧,春花似錦堕绩、人聲如沸策幼。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)特姐。三九已至,卻和暖如春绰寞,著一層夾襖步出監(jiān)牢的瞬間到逊,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工滤钱, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留觉壶,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓件缸,卻偏偏與公主長(zhǎng)得像铜靶,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子他炊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容