一随夸、基本特性
1、與MapReduce的不同
????????不同于MapReduce的是Job中間輸出和結(jié)果可以“保存在內(nèi)存”中震放,從而不再需要讀寫HDFS宾毒,
(1)、基于內(nèi)存
????????mapreduce任務后期再計算的時候殿遂,每一個job的輸出結(jié)果會落地到磁盤诈铛,后續(xù)有其他的job需要依賴于前面job的輸出結(jié)果乙各,這個時候就需要進行大量的磁盤io操作。性能就比較低幢竹。
????????spark任務后期再計算的時候耳峦,job的輸出結(jié)果可以保存在內(nèi)存中,后續(xù)有其他的job需要依賴于前面job的輸出結(jié)果焕毫,這個時候就直接從內(nèi)存中獲取得到蹲坷,避免了磁盤io操作,性能比較高邑飒、
(2)循签、進程與線程
????????mapreduce任務以進程的方式運行在yarn集群中,比如程序中有100個MapTask幸乒,一個task就需要一個進程懦底,這些task要運行就需要開啟100個進程。
????????spark任務以線程的方式運行在進程中罕扎,比如程序中有100個MapTask聚唐,后期一個task就對應一個線程,這里就不在是進程腔召,
????????這些task需要運行杆查,這里可以極端一點:只需要開啟1個進程,在這個進程中啟動100個線程就可以了臀蛛。進程中可以啟動很多個線程亲桦,而開啟一個進程與開啟一個線程需要的時間和調(diào)度代價是不一樣。開啟一個進程需要的時間遠遠大于開啟一個線程浊仆。
2客峭、spark集群安裝部署
????????vim spark-env.sh#配置java的環(huán)境變量 ?#配置zk相關(guān)信息
????????vim slaves指定spark集群的worker節(jié)點
? ? ? ? ?vim /etc/profile??修改spark環(huán)境變量
????????環(huán)境變量生效??source /etc/profile
1、先啟動zk???${ZK_HOME}/bin/zkServer.sh start
2抡柿、啟動spark集群?$SPARK_HOME/sbin/start-all.sh
3舔琅、zk作用:高可用
????????在高可用模式下,整個spark集群就有很多個master洲劣,其中只有一個master被zk選舉成活著的master备蚓,其他的多個master都處于standby,同時把整個spark集群的元數(shù)據(jù)信息通過zk中節(jié)點進行保存囱稽。
????????如果活著的master掛掉郊尝。首先zk會感知到活著的master掛掉,開始在多個處于standby中的master進行選舉战惊,再次產(chǎn)生一個活著的master流昏;這個活著的master會讀取保存在zk節(jié)點中的spark集群元數(shù)據(jù)信息,恢復到上一次master的狀態(tài)。
在master的恢復階段對任務的影響?
????????對已經(jīng)運行的任務是沒有任何影響横缔,由于該任務正在運行铺遂,說明它已經(jīng)拿到了計算資源,這個時候就不需要master茎刚。
????????對即將要提交的任務是有影響腥寇,由于該任務需要有計算資源间影,這個時候會找活著的master去申請計算資源争拐,由于沒有一個活著的master,該任務是獲取不到計算資源艘绍,也就是任務無法運行。
4初狰、web管理界面
http://master主機名:8080:集群的詳細信息莫杈、總資源信息、已用資源信息奢入、還剩資源信息筝闹;正在運行的任務信息、已經(jīng)完成的任務信息
bin/spark-submit
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077,node02:7077,node03:7077 \
--executor-memory?1G \
--total-executor-cores 2 \
examples/jars/spark-examples_2.11-2.3.3.jar \
10
????????spark集群中有很多個master腥光,并不知道哪一個master是活著的master关顷,即使你知道哪一個master是活著的master,它也有可能下一秒就掛掉武福,這里就可以把所有master都羅列出來
--master spark://node01:7077,node02:7077,node03:7077
????????后期程序會輪訓整個master列表议双,最終找到活著的master,然后向它申請計算資源捉片,最后運行程序平痰。
5、spark-shell使用
spark-shell --master local[2]伍纫,默認會產(chǎn)生一個SparkSubmit進程宗雇,sc
--master local[N]:表示本地采用N個線程計算任務
sc.textFile("file:///home/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
讀取HDFS上文件: vim spark-env.sh?export HADOOP_CONF_DIR=hdoop安裝位置
//實現(xiàn)讀取hdfs上文件之后,需要把計算的結(jié)果保存到hdfs上
sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/out")
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//todo:利用scala語言開發(fā)spark程序?qū)崿F(xiàn)單詞統(tǒng)計
object WordCount {
??def main(args: Array[String]): Unit = {
????//1莹规、構(gòu)建sparkConf對象?設置application名稱和master地址
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
//2逾礁、構(gòu)建sparkContext對象,該對象非常重要,它是所有spark程序的執(zhí)行入口访惜,它內(nèi)部會構(gòu)建?DAGScheduler和 TaskScheduler 對象
????val sc = new SparkContext(sparkConf)
????sc.setLogLevel("warn")???//設置日志輸出級別
????val data: RDD[String] = sc.textFile("E:\\words.txt")????//3、讀取數(shù)據(jù)文件
? val words: RDD[String] = data.flatMap(x=>x.split(" "))? ?//4 切分每一行腻扇,獲取所有單詞
? ? val wordAndOne: RDD[(String, Int)] = words.map(x => (x,1))//5债热、每個單詞計為1
????val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x,y)=>x+y)?//6、相同單詞出現(xiàn)的1累加
????//按照單詞出現(xiàn)的次數(shù)降序排列?第二個參數(shù)默認是true表示升序幼苛,設置為false表示降序
????val sortedRDD: RDD[(String, Int)] = result.sortBy( x=> x._2,false)
????val finalResult: Array[(String, Int)] = sortedRDD.collect()
????finalResult.foreach(println)?????//7窒篱、收集數(shù)據(jù)打印
????sc.stop()?? //8、關(guān)閉sc
??}
}
打成jar包提交到集群中運行
spark-submit \
--master spark://node01:7077,node02:7077 \
--class com.kaikeba.WordCountOnSpark \
--executor-memory?1g?\
--total-executor-cores?4 \
original-spark_class01-1.0-SNAPSHOT.jar?/words.txt?/out???jar包與輸入輸出
spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
/kkb/install/spark/examples/jars/spark-examples_2.11-2.3.3.jar 10?10是main方法里面的參數(shù)
????????????executor-memory小了,會把rdd一部分數(shù)據(jù)保存在內(nèi)存中墙杯,一部分數(shù)據(jù)保存在磁盤配并;用該rdd時從內(nèi)存和磁盤中獲取,一定的磁盤io高镐。需要設置的大一點溉旋,如10G/20G/30G等;
????????????total-executor-cores:表示任務運行需要總的cpu核數(shù)嫉髓,它決定了任務并行運行的粒度观腊,也會設置的大一點,如30個/50個/100個算行;????
????????加大計算資源它是最直接梧油、最有效果的優(yōu)化手段。? 在計算資源有限的情況下州邢,可以考慮其他方面儡陨,比如說代碼層面,JVM層面等
6量淌、兩種Yarn模式骗村,最大的區(qū)別就是Driver端的位置不一樣。
????????????yarn-cluster: Driver端運行在yarn集群中类少,與ApplicationMaster進程在一起叙身。
????????????yarn-client: ?Driver端運行在提交任務的客戶端,與ApplicationMaster進程沒關(guān)系,經(jīng)常 用于進行測試
二、集群架構(gòu)
2硫狞、spark集群架構(gòu)
(1)Master:集群的主節(jié)點信轿,負責任務資源的分配。
(2)Worker:集群的從節(jié)點残吩,負責任務計算的節(jié)點财忽。
????????????①Executor:是一個進程,它會在worker節(jié)點啟動該進程(計算資源)
????????????②Task:任務是以task線程的方式運行在worker節(jié)點對應的executor進程中泣侮;
(3)ClusterManager:給程序提供計算資源的外部服務即彪,standAlone模式整個任務的資源分配由spark集群的老大Master負責;把spark程序提交到y(tǒng)arn中運行活尊,整個任務的資源分配由yarn中的老大ResourceManager負責
(4)Driver:是所有spark程序的執(zhí)行入口隶校,會執(zhí)行客戶端寫好的main方法,它會構(gòu)建一個名叫SparkContext對象
(5)Application:是一個spark的應用程序蛹锰,它是包含了客戶端的代碼和任務運行的資源信息
(6)一個application就是一個應用程序深胳,包含了很多個job;
????????????一個action操作對應一個DAG有向無環(huán)圖铜犬,即一個action操作就是一個job舞终;
? ? ? ? ? ? 一個job中包含了大量的寬依賴轻庆,按照寬依賴進行stage劃分,一個job產(chǎn)生了很多個stage敛劝;
????????????一個stage中有很多分區(qū)余爆,一個分區(qū)就是一個task,即一個stage中有很多個task夸盟;
Spark中的調(diào)度模式:FIFO(先進先出)蛾方、FAIR(公平調(diào)度)
任務的分配資源worker策略:盡量打散、盡量集中
????????????盡量打散:一個Application盡可能多的分配到不同的節(jié)點满俗,發(fā)揮數(shù)據(jù)的本地性转捕,提升執(zhí)行效率
????????????盡量集中:盡量分配到盡可能少的節(jié)點
3、運行流程
????????(1) Driver端向資源管理器Master發(fā)送注冊和申請計算資源的請求
????????(2) Master通知對應的worker節(jié)點啟動executor進程(計算資源)
????????(3) executor進程向Driver端發(fā)送注冊并且申請task請求
? ? ? ? (4) Driver端運行客戶端的main方法唆垃,構(gòu)建SparkContext對象五芝,在SparkContext對象內(nèi)部依次構(gòu)建DAGScheduler和TaskScheduler
????????(5)按照客戶端代碼rdd的一系列操作順序,生成DAG有向無環(huán)圖
? ? ? ? ?(6) DAGScheduler拿到DAG有向無環(huán)圖之后辕万,按照寬依賴進行stage的劃分枢步。每一個stage內(nèi)部有很多可以并行運行的task,最后封裝在一個一個的taskSet集合中渐尿,然后把taskSet發(fā)送給TaskScheduler
????????(7) TaskScheduler得到taskSet集合之后醉途,依次遍歷取出每一個task提交到worker節(jié)點上的executor進程中運行
????????(8)所有task運行完成,Driver端向Master發(fā)送注銷請求砖茸,Master通知Worker關(guān)閉executor進程隘擎,Worker上的計算資源得到釋放,最后整個任務也就結(jié)束了凉夯。
三货葬、計算資源
1、RDD概念
????????RDD(Resilient Distributed Dataset)叫做==彈性 分布式 數(shù)據(jù)集==劲够,是Spark中最基本的數(shù)據(jù)抽象震桶,它代表一個不可變、可分區(qū)征绎、里面的元素可并行計算的集合.
????????Resilient: 表示彈性蹲姐,rdd的數(shù)據(jù)是可以保存在內(nèi)存或者是磁盤中.
????????Distributed:它內(nèi)部的元素進行了分布式存儲,方便于后期進行分布式計算.
????????Dataset:就是一個集合人柿,存儲很多數(shù)據(jù).
五大屬性:
?(1) A list of partitions:一個rdd有很多分區(qū)柴墩,每一個分區(qū)內(nèi)部是包含了該rdd的部分數(shù)據(jù)
?(2) A function for computing each split:每個分區(qū)都會實現(xiàn)?計算函數(shù)
?(3) A list of dependencies on other RDDs:一個rdd會依賴于其他多個rdd
?(4) Optionally, a Partitioner for key-value RDDs :kv數(shù)據(jù)的分區(qū)函數(shù)基于哈希,非kv是None
?(5) Optionally, a list of preferred locations to compute each split on:有分區(qū)數(shù)據(jù)的節(jié)點會優(yōu)先開啟計算任務凫岖,數(shù)據(jù)的本地性拐邪。
RDD自定義分區(qū)
RDD數(shù)據(jù)進行分區(qū)時,默認使用的是HashPartitioner:對key進行哈希隘截,然后對分區(qū)總數(shù)取模扎阶,
實現(xiàn)自定義partitioner大致分為3個步驟:
????????????繼承==org.apache.spark.Partitioner==
????????????重寫==numPartitions==方法
????????????重寫==getPartition==方法
//對應上面的rdd數(shù)據(jù)進行自定義分區(qū)
val result: RDD[(String, Int)] = wordLengthRDD.partitionBy(new MyPartitioner(3))
2、RDD的創(chuàng)建
1婶芭、scala集合sc.parallelize
????????val?rdd1=sc.parallelize(List(1,2,3,4,5))
????????val?rdd2=sc.parallelize(Array("hadoop","hive","spark"))
????????val?rdd3=sc.makeRDD(List(1,2,3,4))
2东臀、加載外部的數(shù)據(jù)源sc.textFile
????????val?rdd1=sc.textFile("/words.txt")
3、已存在rdd轉(zhuǎn)換成一個新的rdd
????????val?rdd2=rdd1.flatMap(_.split(" "))
????????val?rdd3=rdd2.map((_,1))
3犀农、RDD算子分類
transformation算子:根據(jù)已經(jīng)存在的rdd轉(zhuǎn)換生成一個新的rdd, 延遲加載惰赋,不會立即執(zhí)行
????????????????????????????????map 、mapPartitions呵哨、flatMap赁濒、
????????????????????????????????filter、Union(求并)孟害、intersection(求交)拒炎、distinct(去重)、
????????????????????????????????Join挨务、reduceByKey 击你、groupByKey、sortByKey谎柄、sortBy
????????????????????????????????repartition有shuffle丁侄、coalesce不shuffle
action算子:真正觸發(fā)任務的運行:
????????????????reduce、
????????????????collect :把RDD的數(shù)據(jù)進行收集之后朝巫,以數(shù)組的形式返回給Driver端
????????????????count鸿摇、first、take(n)
????????????????foreach劈猿、foreachPartition
????????????????saveAsTextFile拙吉、saveAsSequenceFile
????????默認Driver端的內(nèi)存大小為1G,由參數(shù) spark.driver.memory 設置糙臼,果某個rdd的數(shù)據(jù)量超過了Driver端默認的1G內(nèi)存庐镐,對rdd調(diào)用collect操作,這里會出現(xiàn)Driver端的內(nèi)存溢出变逃,所有這個collect操作存在一定的風險必逆,實際開發(fā)代碼一般不會使用。new SparkConf().set("spark.driver.memory","5G")
4揽乱、RDD依賴
RDD和它依賴的父RDD的關(guān)系有兩種不同的類型
????????窄依賴:每一個父RDD的Partition最多被子RDD的一個Partition使用名眉,不會產(chǎn)生shuffle;Map凰棉、flatMap损拢、filter、union等等撒犀,
????????寬依賴:多個子RDD的Partition會依賴同一個父RDD的Partition福压,會產(chǎn)生掏秩;?shufflereduceByKey/sortByKey/groupBy/groupByKey/join等等
????????join分為寬依賴和窄依賴,如果RDD有相同的partitioner荆姆,那么將不會引起shuffle蒙幻,這種join是窄依賴,反之就是寬依賴
5胆筒、RDD的Lineage血統(tǒng)
????????????lineage保存了RDD的依賴關(guān)系邮破,會記錄RDD的元數(shù)據(jù)信息和轉(zhuǎn)換行為,當該RDD的部分分區(qū)數(shù)據(jù)丟失時仆救,它可以根據(jù)這些信息來重新運算和恢復丟失的數(shù)據(jù)分區(qū)
????????RDD只支持粗粒度轉(zhuǎn)換抒和,即只記錄單個塊上執(zhí)行的單個操作。創(chuàng)建RDD的一系列Lineage(即血統(tǒng))記錄下來彤蔽,以便恢復丟失的分區(qū)
6摧莽、RDD緩存機制
????????把一個rdd的數(shù)據(jù)緩存起來,后續(xù)有其他的job需要用到該rdd的結(jié)果數(shù)據(jù)铆惑,可以直接從緩存中獲取得到范嘱,避免了重復計算。緩存是加快后續(xù)對該數(shù)據(jù)的訪問操作员魏。
(1)persist/cache
?????RDD通過persist方法或cache方法可以將前面的計算結(jié)果緩存丑蛤,不會立即緩存,觸發(fā)后面的action撕阎,才會被緩存在計算節(jié)點的內(nèi)存中受裹。調(diào)用rdd的unpersist方法,一個application應用程序結(jié)束之后虏束,對應的緩存數(shù)據(jù)也就自動清除棉饶;
????????cache:最終也是調(diào)用了persist方法,默認的存儲級別都是僅在內(nèi)存存儲一份
????????persist:可以把數(shù)據(jù)緩存在內(nèi)存或者是磁盤镇匀,有豐富的緩存級別照藻,這些緩存級別都被定義在StorageLevel這個object中。
????????為了獲取得到一個rdd的結(jié)果數(shù)據(jù)汗侵,經(jīng)過了大量的算子操作或者是計算邏輯比較復雜幸缕,可以把多次使用到的rdd,是公共rdd進行持久化晰韵;
(2)checkpoint
????????把數(shù)據(jù)保存在內(nèi)存中不安全发乔,服務器掛掉或進程終止,會導致數(shù)據(jù)的丟失雪猪;存在本地磁盤中栏尚,操作刪除了,或者是磁盤損壞只恨,也有可能導致數(shù)據(jù)的丟失译仗;
????????????checkpoint把數(shù)據(jù)保存在分布式文件系統(tǒng)HDFS上抬虽。高可用性,高容錯性(多副本)來最大程度保證數(shù)據(jù)的安全性古劲。
????1斥赋、在hdfs上設置一個checkpoint目錄 sc.setCheckpointDir("hdfs://node01:8020/checkpoint")
????2、對需要做checkpoint操作的rdd調(diào)用checkpoint方法
????????????????val?rdd1=sc.textFile("/words.txt")
????????????????rdd1.checkpoint
????????????????val?rdd2=rdd1.flatMap(_.split(" "))
3产艾、最后需要有一個action操作去觸發(fā)任務的運行
????????????????rdd2.collect
7、廣播變量
????????spark中分布式執(zhí)行的代碼滑绒,需要==傳遞到各個Executor的Task上運行==闷堡。對于一些只讀、固定的數(shù)據(jù)(比如從DB中讀出的數(shù)據(jù)),每次都需要Driver廣播到各個Task上疑故,這樣效率低下
????????廣播變量允許將變量廣播給各個Executor杠览。該Executor上的各個Task從所在節(jié)點的BlockManager獲取變量,而不是從Driver獲取變量纵势,以減少通信的成本踱阿,減少內(nèi)存的占用,從而提升了效率钦铁。
????????(1)通過對一個類型T的對象調(diào)用 SparkContext.broadcast創(chuàng)建出一個Broadcast[T]對象软舌。(任何可序列化的類型都可以這么實現(xiàn))
????????(2)通過 value 屬性訪問該對象的值
????????(3)變量只會被發(fā)到各個節(jié)點一次,應作為只讀值處理(修改這個值不會影響到別的節(jié)點)
val?conf?=?new?SparkConf().setMaster("local[2]").setAppName("brocast")
val?sc=new?SparkContext(conf)
val?rdd1=sc.textFile("/words.txt")
val?word="spark"
//通過調(diào)用sparkContext對象的broadcast方法把數(shù)據(jù)廣播出去
val?broadCast?=?sc.broadcast(word)
//在executor中通過調(diào)用廣播變量的value屬性獲取廣播變量的值
val?rdd2=rdd1.flatMap(_.split(" ")).filter(x=>x.equals(broadCast.value))
rdd2.foreach(x=>println(x))
注意:
????????1牛曹、不能將一個RDD使用廣播變量廣播出去
????????2佛点、廣播變量只能在Driver端定義,不能在Executor端定義
????????3黎比、在Driver端可以修改廣播變量的值,在Executor端無法修改廣播變量的值
????????4、如果executor端用到了Driver的變量缘琅,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本
????????5褒颈、如果Executor端用到了Driver的變量,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本
累加器
????????累加器(accumulator)是Spark中提供的一種分布式的變量機制颓帝,其原理類似于mapreduce米碰,即分布式的改變,然后聚合這些改變躲履。
????????累加器的一個常見用途是见间,在調(diào)試時對作業(yè)執(zhí)行過程中的事件進行計數(shù)」げ拢可以使用累加器來進行全局的計數(shù)米诉。
8、DAG劃分stage
????????原始的RDD通過一系列的轉(zhuǎn)換就形成了DAG篷帅。(Directed Acyclic Graph史侣,有向無環(huán)圖)
????????根據(jù)RDD之間依賴關(guān)系的不同將DAG劃分成不同的Stage(調(diào)度階段)拴泌;對于窄依賴,轉(zhuǎn)換處理在一個Stage中完成計算惊橱;對于寬依賴蚪腐,由于有Shuffle的存在,只能在parent RDD處理完成后税朴,才能開始接下來的計算回季,
????????劃分完stage之后,在同一個stage中只有窄依賴正林,沒有寬依賴泡一,可以實現(xiàn)流水線計算;stage中的每一個分區(qū)對應一個task觅廓,在同一個stage中就有很多可以并行運行的task鼻忠。
????????一個Job會被拆分為多組Task,每組任務被稱為一個stage杈绸。stage表示不同的調(diào)度階段帖蔓,一個spark job會對應產(chǎn)生很多個stage
劃分stage的依據(jù)就:寬依賴
????????(1)首先根據(jù)rdd的算子操作順序生成DAG有向無環(huán)圖,接下里從最后一個rdd往前推瞳脓,創(chuàng)建一個新的stage塑娇,把該rdd加入到該stage中,它是最后一個stage篡殷。
? ? ? ? ?(2)在往前推的過程中運行遇到了窄依賴就把該rdd加入到本stage中钝吮,如果遇到了寬依賴,就從寬依賴切開板辽,那么最后一個stage也就結(jié)束了奇瘦。
????????(3) 重新創(chuàng)建一個新的stage,按照第二個步驟繼續(xù)往前推劲弦,一直到最開始的rdd耳标,整個劃分stage也就結(jié)束了。
????????劃分完stage之后邑跪,每一個stage中有很多可以并行運行的task次坡,后期把每一個stage中的task封裝在一個taskSet集合中,最后把一個一個的taskSet集合提交到worker節(jié)點上的executor進程中運行画畅。
????????rdd與rdd之間存在依賴關(guān)系砸琅,stage與stage之前也存在依賴關(guān)系,前面stage中的task先運行轴踱,運行完成了再運行后面stage中的task症脂,也就是說后面stage中的task輸入數(shù)據(jù)是前面stage中task的輸出結(jié)果數(shù)據(jù)。
9、序列化
????????spark是分布式執(zhí)行引擎诱篷,其核心抽象是彈性分布式數(shù)據(jù)集RDD壶唤,其代表了分布在不同節(jié)點的數(shù)據(jù)。Spark的計算是在executor上分布式執(zhí)行的棕所,故用戶開發(fā)的關(guān)于RDD的map闸盔,flatMap,reduceByKey等transformation 操作(閉包)有如下執(zhí)行過程:
?????????1)代碼中對象在driver本地序列化琳省,對象序列化后傳輸?shù)竭h程executor節(jié)點迎吵;
????????2)遠程executor節(jié)點反序列化對象,最終遠程節(jié)點執(zhí)行针贬。
對象在執(zhí)行中钓觉,需要序列化通過網(wǎng)絡傳輸,則必須經(jīng)過序列化過程坚踩。
解決序列化的辦法:
? ? ????1)如果函數(shù)中使用了該類對象,該類要實現(xiàn)序列化瓤狐,類extends Serializable
? ? ????2)如果函數(shù)中使用了該類對象的成員變量瞬铸,該類除了要實現(xiàn)序列化之外,所有的成員變量必須要實現(xiàn)序列化
????????3)對于不能序列化的成員變量使用==“@transient”==標注础锐,告訴編譯器不需要序列化
????????4)也可將依賴的變量嗓节,獨立放到一個小的class中,讓這個class支持序列化皆警,這樣做可以減少網(wǎng)絡傳輸量拦宣,提高效率。
????????5)可以把對象的創(chuàng)建直接在該函數(shù)中構(gòu)建這樣避免需要序列化?
10信姓、spark的shuffle
????????Shuffle就是對數(shù)據(jù)進行重組鸵隧,由于分布式計算的特性和要求,在實現(xiàn)細節(jié)上更加繁瑣和復雜意推。Stage階段的劃分:是根據(jù)是否有寬依賴shuffle過程豆瘫,job會劃分成多個Stage,每一個stage內(nèi)部有很多task菊值。stage與stage之間的過程就是shuffle階段外驱。
????????在Spark的中,負責shuffle過程的執(zhí)行腻窒、計算和處理的組件昵宇,主要就是ShuffleManager。ShuffleManager分為HashShuffleManager和SortShuffleManager儿子,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種瓦哎。
????????在Spark 1.2以前,默認的shuffle是HashShuffleManager。該ShuffleManager-HashShuffleManager有著一個非常嚴重的弊端杭煎,就是會產(chǎn)生大量的中間磁盤文件恩够,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以后的版本中羡铲,默認的ShuffleManager改成了SortShuffleManager蜂桶。
?? ????SortShuffleManager相較于HashShuffleManager來說,有了一定的改進也切。主要就在于每個Task在進行shuffle操作時扑媚,雖然也會產(chǎn)生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件雷恃,因此每個Task就只有一個磁盤文件疆股。在下一個stage的shuffle read task拉取自己的數(shù)據(jù)時,只要根據(jù)索引讀取每個磁盤文件中的部分數(shù)據(jù)即可倒槐。
????????HashShuffleManager的運行機制主要分成兩種:普通運行機制旬痹、合并運行機制;合并機制主要是通過復用buffer來優(yōu)化Shuffle過程中產(chǎn)生的小文件的數(shù)量
? ??????普通HashShuffle :
????????????shuffle write讨越,每個task處理的數(shù)據(jù)按key進行“hash分區(qū)”两残,每個task都要創(chuàng)建分區(qū)個份磁盤文件,每個Executor上總共就要創(chuàng)建task*分區(qū)數(shù)個磁盤文件把跨, 產(chǎn)生的磁盤文件的數(shù)量是極其驚人的人弓;
????????????shuffle read的過程中,每個task要從上游stage的所有task所在節(jié)點上着逐,拉取屬于自己的那一個磁盤文件崔赌,每個ask都有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數(shù)據(jù)32k耸别,? ?
? ? ????缺點:在磁盤上會產(chǎn)生海量的小文件健芭,建立通信和拉取數(shù)據(jù)的次數(shù)變多,此時會產(chǎn)生大量耗時低效的 IO 操作;大量耗時低效的 IO 操作 太雨,導致寫磁盤時的對象過多吟榴,讀磁盤時候的對象也過多,這些對象存儲在堆內(nèi)存中囊扳,會導致堆內(nèi)存不足吩翻,相應會導致頻繁的GC,GC會導致OOM锥咸。
???合并HashShuffle :
????????一個Executor只有一種類型的Key的數(shù)據(jù)狭瞎,每個Executor上總共就要創(chuàng)建分區(qū)數(shù)個磁盤文件;
????????缺點:如果Reducer 端的并行任務或者是數(shù)據(jù)分片過多的話則 Core * Reducer Task 依舊過大搏予,也會產(chǎn)生很多小文件熊锭。
普通SortShuffle
????????普通SortShuffle?相當于預聚合,數(shù)據(jù)會先寫入一個數(shù)據(jù)結(jié)構(gòu),聚合算子寫入Map碗殷,一邊通過Map局部聚合精绎,一邊寫入內(nèi)存。锌妻。在該模式下代乃,數(shù)據(jù)會先寫入一個數(shù)據(jù)結(jié)構(gòu),聚合算子寫入Map仿粹,一邊通過Map局部聚合搁吓,一邊寫入內(nèi)存。Join算子寫入ArrayList直接寫入內(nèi)存中吭历。然后需要判斷是否達到閾值(5M)堕仔,如果達到就會將內(nèi)存數(shù)據(jù)結(jié)構(gòu)的數(shù)據(jù)寫入到磁盤,清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)晌区。
????????在溢寫磁盤前摩骨,先根據(jù)key進行排序,排序過后的數(shù)據(jù)朗若,會分批寫入到磁盤文件中仿吞。每批一萬條寫入到一個臨時磁盤文件,每次溢寫都會產(chǎn)生一個臨時磁盤文件捡偏,一個task過程會產(chǎn)生多個臨時文件。最后在每個task中峡迷,將所有的臨時文件合并merge银伟,一次寫入到最終文件。
????????一個task的所有數(shù)據(jù)都在這一個文件中绘搞。同時單獨寫一份索引文件彤避,標識下游各個task的數(shù)據(jù)在文件中的索引start offset和end offset。這樣算來如果第一個stage 50個task夯辖,每個Executor執(zhí)行一個task琉预,那么無論下游有幾個task,就需要50*2=100個磁盤文件蒿褂。
????????????1. 小文件明顯變少了圆米,一個task只生成一個file文件
????????????2. file文件整體有序,加上索引文件的輔助啄栓,查找變快娄帖,雖然排序浪費一些性能,但是查找變快很多
bypass模式SortShuffle?
????????????優(yōu)化后sortshuffle的普通機制相比昙楚,在shuffleMapTask不多的情況下近速,首先寫的機制是不同,其次不會進行排序。這樣就可以節(jié)約一部分性能開銷削葱。
????????????在shuffleMapTask數(shù)量小于默認值200時奖亚,啟用bypass模式的sortShuffle(原因是數(shù)據(jù)量本身比較少,沒必要進行sort全排序析砸,因為數(shù)據(jù)量少本身查詢速度就快昔字,正好省了sort的那部分性能開銷。)
該機制與普通SortShuffleManager運行機制的不同在于:
????????第一: 磁盤寫機制不同干厚;
????????第二: 不會進行sort排序李滴;
bypass機制運行條件
- shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值
- 不是聚合類的shuffle算子(比如reduceByKey)
四、SparkSQL
是apache Spark用來處理結(jié)構(gòu)化數(shù)據(jù)的一個模塊
1蛮瞄、sparksql的四大特性
(1)易整合所坯,將SQL查詢與Spark程序無縫混合,可以使用不同的語言進行代碼開發(fā)
(2)統(tǒng)一的數(shù)據(jù)源訪問挂捅,可以采用一種統(tǒng)一的方式去對接任意的外部數(shù)據(jù)源
val ?dataFrame = sparkSession.read.文件格式的方法名("該文件格式的路徑")
(3)兼容hive芹助,sparksql可以支持hivesql語法 ?sparksql兼容hivesql
(4)支持標準的數(shù)據(jù)庫連接,支持標準的數(shù)據(jù)庫連接JDBC或者ODBC
2闲先、DataFrame
????????DataFrame是一種以RDD為基礎的分布式數(shù)據(jù)集状土,類似于傳統(tǒng)數(shù)據(jù)庫的二維表格;DataFrame帶有Schema元信息伺糠,即所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型蒙谓。
????????RDD,具有面向?qū)ο缶幊痰娘L格训桶;而且開發(fā)時會進行類型的檢查累驮,保證編譯時類型安全。但是RDD數(shù)據(jù)量比較大時舵揭,由于需要存儲在堆內(nèi)存中谤专,堆內(nèi)存有限,容易出現(xiàn)頻繁的垃圾回收(GC)午绳;同時置侍,RDD發(fā)送到其他服務器,序列化和反序列性能開銷很大拦焚。
????????DataFrame引入了schema元信息和off-heap(堆外內(nèi)存)蜡坊,大量的對象構(gòu)建直接使用操作系統(tǒng)層面上的內(nèi)存,堆內(nèi)存就比較充足赎败,不容易GC算色;同時,schema元信息代表數(shù)據(jù)結(jié)構(gòu)的描述信息螟够,可以省略掉對schema的序列化網(wǎng)絡傳輸灾梦,只需對數(shù)據(jù)內(nèi)容本身進行序列化峡钓,減小序列化和反序列性能開銷。但是若河,編譯時不會進行類型的檢查能岩,編譯時類型不安全;不在具有面向?qū)ο缶幊痰娘L格萧福;
????????DataSet是在Spark1.6中添加的新的接口拉鹃,提供了強類型支持,在RDD的每行數(shù)據(jù)加了類型約束鲫忍,可以用強大lambda函數(shù)膏燕,使用了Spark SQL優(yōu)化的執(zhí)行引擎。DataSet包含了DataFrame的功能悟民,Spark2.0中兩者統(tǒng)一坝辫,DataFrame表示為DataSet[Row],即DataSet的子集射亏。修改了DataSet的缺陷近忙,DataSet可以在編譯時檢查類型,并且是面向?qū)ο蟮木幊探涌凇?/p>
RDD智润,存儲在堆內(nèi)存及舍、序列化開銷很大、編譯時類型安全窟绷、面向?qū)ο缶幊獭?/b>
DataFrame锯玛,引入了schema元信息和堆外內(nèi)存,只需對數(shù)據(jù)內(nèi)容本身進行序列化兼蜈,編譯時類型不安全精拟、不具有面向?qū)ο缶幊田L格应狱。
DataSet儡循,Spark1.6中提供了強類型支持厂抽,包含了DataFrame的功能蔬浙,DataFrame表示為DataSet[Row]忽孽、編譯時類型安全浩螺、面向?qū)ο缶幊獭?br>
3疫剃、 RDD 姊途、DataFrame涉瘾、DataSet?
1)RDD
優(yōu)點:
????????編譯時類型安全、編譯時就能檢查出類型錯誤捷兰、面向?qū)ο蟮木幊田L格立叛、直接通過類名點的方式來操作數(shù)據(jù)
缺點:
????????序列化和反序列化的性能開銷、無論是集群間的通信,還是IO操作都需要對對象的結(jié)構(gòu)和數(shù)據(jù)進行序列化和反序列化贡茅。GC的性能開銷秘蛇,頻繁的創(chuàng)建和銷毀對象, 勢必會增加GC
2)DataFrame
????????DataFrame引入了schema和off-heap
????????schema : RDD每一行的數(shù)據(jù), 結(jié)構(gòu)都是一樣的其做,這個結(jié)構(gòu)就存儲在schema中。?Spark通過schema就能夠讀懂數(shù)據(jù), 因此在通信和IO時就只需要序列化和反序列化數(shù)據(jù), 而結(jié)構(gòu)的部分就可以省略了赁还。
3)DataSet
????????DataSet結(jié)合了RDD和DataFrame的優(yōu)點妖泄,并帶來的一個新的概念Encoder。
????????當序列化數(shù)據(jù)時艘策,Encoder產(chǎn)生字節(jié)碼與off-heap進行交互蹈胡,能夠達到按需訪問數(shù)據(jù)的效果,而不用反序列化整個對象朋蔫。Spark還沒有提供自定義Encoder的API罚渐,但是未來會加入。
三者之間的轉(zhuǎn)換:
(1)DataFrame與DataSet互轉(zhuǎn)
????????·DataFrame轉(zhuǎn)換成DataSet:val dataSet=dataFrame.as[強類型]
????????·DataSet轉(zhuǎn)換成DataFrame:val dataFrame=dataSet.toDF
(1)DataFrame驯妄、DataSet與RDD互轉(zhuǎn):
????????·從dataFrame和dataSet獲取得到rdd:
????????????val rdd1=dataFrame.rdd荷并;val rdd2=dataSet.rdd
????????·RDD轉(zhuǎn)換為DataFrame:
????????方法一:反射機制,定義一個樣例類富玷,后期直接映射成DataFrame的schema信息璧坟;
????????方法二:通過StructType直接指定Schema
3、DataFrame常用操作
DSL風格語法:sparksql中的DataFrame自身提供了一套自己的Api赎懦,可以去使用這套api來做相應的處理
(1)?SQL風格語法
可以把DataFrame注冊成一張表雀鹃,然后通過sparkSession.sql(sql語句)操作
import?org.apache.spark.SparkContext
import?org.apache.spark.rdd.RDD
import?org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
//todo:利用反射機制實現(xiàn)把rdd轉(zhuǎn)成dataFrame
case?class?Person(id:String,name:String,age:Int)
object?CaseClassSchema?{
??def?main(args: Array[String]): Unit?=?{
?? ?//1、構(gòu)建SparkSession對象
?? ?val?spark: SparkSession?=?SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
?? ?//2励两、獲取sparkContext對象
?? ?val?sc: SparkContext?=?spark.sparkContext
?? ?sc.setLogLevel("warn")
?? ?//3黎茎、讀取文件數(shù)據(jù)
?? ?val?data: RDD[Array[String]] =?sc.textFile("E:\\person.txt").map(x=>x.split(" "))
方法一:反射機制,定義一個樣例類当悔,后期直接映射成DataFrame的schema信息
?//4傅瞻、定義一個樣例類
case class Person(id:String,name:String,age:Int)
?? ?//5、將rdd與樣例類進行關(guān)聯(lián)
?? ?val?personRDD: RDD[Person] =?data.map(x=>Person(x(0),x(1),x(2).toInt))
?? ?//6盲憎、將rdd轉(zhuǎn)換成dataFrame
?? ?//需要手動導入隱式轉(zhuǎn)換
?? ?import?spark.implicits._
?? ?val?personDF: DataFrame?=?personRDD.toDF
方法二:通過StructType直接指定Schema
//4嗅骄、將rdd與Row對象進行關(guān)聯(lián)
?val rowRDD: RDD[Row] = data.map(x=>Row(x(0),x(1),x(2).toInt))
//5、指定dataFrame的schema信息饼疙,這里指定的字段個數(shù)和類型必須要跟Row對象保持一致
??val schema=StructType(
????????StructField("id",StringType)::
????????StructField("name",StringType)::
????????StructField("age",IntegerType)::Nil
????)
????val dataFrame: DataFrame = spark.createDataFrame(rowRDD,schema)
?? ?//7溺森、對dataFrame進行相應的語法操作
?? ?//todo:----------------- DSL風格語法-----------------start
?? ?//打印schema
?? ?personDF.printSchema()
?? ?//展示數(shù)據(jù)
?? ?personDF.show()
?? ?//獲取第一行數(shù)據(jù)
?? ?val?first: Row?=?personDF.first()
?? ?println("first:"+first)
?? ?//取出前3位數(shù)據(jù)
?? ?val?top3: Array[Row] =?personDF.head(3)
?? ?top3.foreach(println)
?? ?//獲取name字段
?? ?personDF.select("name").show()
?? ?personDF.select($"name").show()
?? ?personDF.select(new?Column("name")).show()
?? ?personDF.select("name","age").show()
?? ?//實現(xiàn)age +1
?? ?personDF.select($"name",$"age",$"age"+1).show()
?? ?//按照age過濾
?? ?personDF.filter($"age"?>30).show()
?? ?val?count: Long?=?personDF.filter($"age"?>30).count()
?? ?println("count:"+count)
?? ?//分組
?? ?personDF.groupBy("age").count().show()
?? ?personDF.show()
?? ?personDF.foreach(row?=>?println(row))
?? ?//使用foreach獲取每一個row對象中的name字段
?? ?personDF.foreach(row?=>println(row.getAs[String]("name")))
?? ?personDF.foreach(row?=>println(row.get(1)))
?? ?personDF.foreach(row?=>println(row.getString(1)))
?? ?personDF.foreach(row?=>println(row.getAs[String](1)))
?? ?//todo:----------------- DSL風格語法--------------------end
?? ?//todo:----------------- SQL風格語法-----------------start
?? ?personDF.createTempView("person")
?? ?//使用SparkSession調(diào)用sql方法統(tǒng)計查詢
?? ?spark.sql("select * from person").show
?? ?spark.sql("select name from person").show
?? ?spark.sql("select name,age from person").show
?? ?spark.sql("select * from person where age >30").show
?? ?spark.sql("select count(*) from person where age >30").show
?? ?spark.sql("select age,count(*) from person group by age").show
?? ?spark.sql("select age,count(*) as count from person group by age").show
?? ?spark.sql("select * from person order by age desc").show
?? ?//todo:----------------- SQL風格語法----------------------end
?? ?//關(guān)閉sparkSession對象
?? ?spark.stop()
??}
}
16、sparksql操作hivesql
添加依賴
?? ? ? ?<dependency>
?? ? ? ? ? ?<groupId>org.apache.spark</groupId>
?? ? ? ? ? ?<artifactId>spark-hive_2.11</artifactId>
?? ? ? ? ? ?<version>2.3.3</version>
?? ? ? ?</dependency>
代碼開發(fā)
import?org.apache.spark.sql.SparkSession
//todo:利用sparksql操作hivesql
object?HiveSupport?{
??def?main(args: Array[String]): Unit?=?{
?? ?//1窑眯、構(gòu)建SparkSession對象
?? ?val?spark: SparkSession?=?SparkSession.builder().appName("HiveSupport") .master("local[2]")
?? ? ?.enableHiveSupport()//開啟對hive的支持
?? ? ?.getOrCreate()
?? ?//2屏积、直接使用sparkSession去操作hivesql語句
?? ? ?//2.1創(chuàng)建一張hive表
?? ? ? spark.sql("create table people(id string,name string,age int) row format delimited fields terminated by ','")
?? ? ?//2.2加載數(shù)據(jù)到hive表中
?? ? ? spark.sql("load data local inpath './data/kaikeba.txt' into table people ")
?? ? ?//2.3查詢
?? ? ?spark.sql("select * from people").show()
?? ?spark.stop()
??}
}
添加mysql連接驅(qū)動jar包
<dependency>
????<groupId>mysql</groupId>
????<artifactId>mysql-connector-java</artifactId>
????<version>5.1.38</version>
</dependency>
代碼開發(fā)
import?java.util.Properties
import?org.apache.spark.sql.{DataFrame, SparkSession}
//todo:通過sparksql把結(jié)果數(shù)據(jù)寫入到mysql表中
object?Data2Mysql?{
??def?main(args: Array[String]): Unit?=?{
?? ?//1、創(chuàng)建SparkSession
?? ?val?spark: SparkSession?=?SparkSession?.builder().appName("Data2Mysql") ?.getOrCreate()
?? ?//2磅甩、讀取mysql表中數(shù)據(jù)
?? ? ? ?//2.1定義url連接
?? ? ? ?val?url="jdbc:mysql://node03:3306/spark"
?? ? ? ?//2.2定義表名
?? ? ? ?val?table="user"
?? ? ? ?//2.3定義屬性
?? ? ? ?val?properties=new?Properties()
?? ? ? ?properties.setProperty("user","root")
?? ? ? ?properties.setProperty("password","123456")
?? ?val?mysqlDF: DataFrame?=?spark.read.jdbc(url,table,properties)
?? ?//把dataFrame注冊成一張表
?? ? ?mysqlDF.createTempView("user")
?? ?//通過sparkSession調(diào)用sql方法
?? ? ? //需要統(tǒng)計經(jīng)度和維度出現(xiàn)的人口總數(shù)大于1000的記錄 保存到mysql表中
?? ?val?result: DataFrame?=?spark.sql("select * from user where age >30")
????//保存結(jié)果數(shù)據(jù)到mysql表中
?? ?//mode:指定數(shù)據(jù)的插入模式
?? ? ? ?//overwrite:表示覆蓋炊林,如果表不存在,事先幫我們創(chuàng)建
?? ? ? ?//append ? :表示追加卷要, 如果表不存在渣聚,事先幫我們創(chuàng)建
?? ? ? ?//ignore ? :表示忽略独榴,如果表事先存在,就不進行任何操作
?? ? ? ?//error ? ?:如果表事先存在就報錯(默認選項)
?? ? result.write.mode(args(0)).jdbc(url,args(1),properties)
?? ?//關(guān)閉
?? ? spark.stop()
??}
}
提交任務腳本
spark-submit \
--master?spark://node01:7077 \
--class?com.kaikeba.sql.Data2Mysql \
--executor-memory?1g \
--total-executor-cores?4?\
--driver-class-path?/home/hadoop/mysql-connector-java-5.1.38.jar \
--jars?/home/hadoop/mysql-connector-java-5.1.38.jar \
spark_class02-1.0-SNAPSHOT.jar \
append ?kaikeba
4饵逐、sparksql中自定義函數(shù)(★★★★★)
import?org.apache.spark.sql.api.java.UDF1
import?org.apache.spark.sql.types.StringType
import?org.apache.spark.sql.{DataFrame, SparkSession}
//TODO:自定義sparksql的UDF函數(shù) ? ?一對一的關(guān)系
object?SparkSQLFunction?{
??def?main(args: Array[String]): Unit?=?{
?? ?//1括眠、創(chuàng)建SparkSession
?? ?val?sparkSession: SparkSession?=?SparkSession.builder().appName("SparkSQLFunction").master("local[2]").getOrCreate()
?//2、構(gòu)建數(shù)據(jù)源生成DataFrame
?? ?val?dataFrame: DataFrame?=?sparkSession.read.text("E:\\data\\test_udf_data.txt")
?//3倍权、注冊成表
?? ?dataFrame.createTempView("t_udf")
?? ?//4掷豺、實現(xiàn)自定義的UDF函數(shù)
?? ?//小寫轉(zhuǎn)大寫
?? ?sparkSession.udf.register("low2Up",new?UDF1[String,String]() {
?? ? ?override?def?call(t1: String): String?=?{
?? ? ? ?t1.toUpperCase
?? ? ?}
?? ?},StringType)
?? ?//大寫轉(zhuǎn)小寫
?? ?sparkSession.udf.register("up2low",(x:String)=>x.toLowerCase)
?? ?//4、把數(shù)據(jù)文件中的單詞統(tǒng)一轉(zhuǎn)換成大小寫
?? ?sparkSession.sql("select ?value from t_udf").show()
?? ?sparkSession.sql("select ?low2Up(value) from t_udf").show()
?? ?sparkSession.sql("select ?up2low(value) from t_udf").show()
?? ?sparkSession.stop()
??}
}
5薄声、sparksql整合hive
1当船、把hive目錄下的hive-site.xml,拷貝到每一個spark的conf文件夾中
2把連接mysql驅(qū)動的jar包默辨,拷貝到spark的jars文件夾中
可以使用spark-sql腳本 后期執(zhí)行sql相關(guān)的任務
啟動腳本
spark-sql \
--master?spark://node01:7077 \
--executor-memory?1g \
--total-executor-cores?4?\
--conf?spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse
應用場景
#!/bin/sh
#定義sparksql提交腳本的頭信息
SUBMITINFO="spark-sql --master spark://node01:7077 --executor-memory 1g --total-executor-cores 4 --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse"?
#定義一個sql語句
SQL="select * from default.hive_source;"?
#執(zhí)行sql語句 ? 類似于 hive -e sql語句
echo?"$SUBMITINFO"?
echo?"$SQL"
$SUBMITINFO?-e?"$SQL"
6德频、sparkSql輸出jdbc
val properties=new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123456")
val mysqlDF:?DataFrame =?spark.read.jdbc(url,table,properties)?
//把dataFrame注冊成一張表
mysqlDF.createTempView("user")
//保存結(jié)果數(shù)據(jù)到mysql表中
result.write.mode("append").jdbc(url,"kaikeba",properties)
//mode:指定數(shù)據(jù)的插入模式
//overwrite: 表示覆蓋,如果表不存在缩幸,事先幫我們創(chuàng)建
//append :表示追加壹置, 如果表不存在,事先幫我們創(chuàng)建
//ignore :表示忽略表谊,如果表事先存在钞护,就不進行任何操作
//error :如果表事先存在就報錯(默認選項) //關(guān)閉
?五、?saprk Sreaming
1爆办、DStream
離散數(shù)據(jù)流
????????一個DStream以一系列連續(xù)的RDDs所展現(xiàn)难咕,其中的每個RDD都包含來自一定間隔的數(shù)據(jù),在DStream上使用的任何操作都會轉(zhuǎn)換為針對底層RDD的操作距辆。
scala版本
object?WordCount?{
??def?main(args: Array[String]): Unit?=?{
?? ?//步驟一:初始化程序入口
?? ?val?conf?=?new?SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
?? ?val?ssc?=?new?StreamingContext(conf,?Seconds(1))//間隔1s的數(shù)據(jù),形成rdd
?? ?//步驟二:獲取數(shù)據(jù)流
?? ?val?lines?=?ssc.socketTextStream("localhost", 9999)
?? ?//步驟三:數(shù)據(jù)處理
?? ?val?words?=?lines.flatMap(_.split(" "))
?? ?val?pairs?=?words.map(word?=>?(word, 1))
?? ?val?wordCounts?=?pairs.reduceByKey(_?+?_)
?? //步驟四: 數(shù)據(jù)輸出
?? ?wordCounts.print()
?? ?//步驟五:啟動任務
?? ?ssc.start()
?? ?ssc.awaitTermination()
?? ?ssc.stop()
??}
}
2余佃、Transformation 高級算子
updateStateByKey、mapWithState跨算、Transform爆土、Window
?(1)updateStateByKey
/??? ? ?*數(shù)據(jù)的處理
?? ? ?* Option:
?? ? ?* ? Some:有值;None:沒有值
?? ? ?* ? values:Seq[Int] ? List{1,1}
?? ? ?* ? state:Option[Int]上一次這個單詞出現(xiàn)了多少次 ?None ?Some 2
?? ? ?*/
?? ?val?wordCountDStream?=?dstream.flatMap(_.split(","))
?? ? ?.map((_, 1))
?? ? ?.updateStateByKey((values: Seq[Int], state: Option[Int]) =>?{
?? ? ? ?val?currentCount?=?values.sum
?? ? ? ?val?lastCount?=?state.getOrElse(0)
?? ? ? ?Some(currentCount?+?lastCount)
?? ? ?})
(2)mapWithState
/**
??*性能更好
??*/
?? ?// currentBatchTime :表示當前的Batch的時間
?? ?// key:表示需要更新狀態(tài)的key
?? ?// value:表示當前batch的對應的key的對應的值
?? ?// currentState:對應key的當前的狀態(tài)
?? ?val?stateSpec?=?StateSpec.function((currentBatchTime: Time, key: String, value: Option[Int], currentState: State[Long]) =>?{
?? ? ?val?sum?=?value.getOrElse(0).toLong?+?currentState.getOption.getOrElse(0L)
?? ? ?val?output?=?(key, sum)
?? ? ?if?(!currentState.isTimingOut()) {
?? ? ? ?currentState.update(sum)
?? ? ?}
?? ? ?Some(output)
?? ?}).initialState(initialRDD).numPartitions(2).timeout(Seconds(30))
?//timeout:當一個key超過這個時間沒有接收到數(shù)據(jù)的時候诸蚕,這個key以及對應的狀態(tài)會被移除掉
?? ?val?result?=?wordsDStream.mapWithState(stateSpec)
(3)Transform
?? ?//transform需要有返回值步势,必須類型是RDD
?? ?val?wordCountDStream?=?wordOneDStream.transform(rdd?=>?{
?? ? ?val?filterRDD: RDD[(String, Boolean)] =?rdd.sparkContext.parallelize(blackListBroadcast.value)
?? ? ?val?resultRDD: RDD[(String, (Int, Option[Boolean]))] =?rdd.leftOuterJoin(filterRDD)
?? ? ?? ? ?resultRDD.filter(tuple?=>?{
?? ? ? ?tuple._2._2.isEmpty
?? ? ?}).map(_._1)
?? ?}).map((_, 1)).reduceByKey(_?+?_)
(4) Window操作
??*實現(xiàn)一個 每隔4秒,統(tǒng)計最近6秒的單詞計數(shù)的情況挫望。
?? ? ?*數(shù)據(jù)的處理
?? ? ?*我們一直講的是數(shù)據(jù)處理的算子
?? ? ?*這個地方算子 就是生產(chǎn)時候用的算子。
?? ? ?*
?? ? ?* ?reduceFunc: (V, V) => V,
?? ? ? ? windowDuration: Duration,6窗口的大小
?? ? ? ? slideDuration: Duration,4滑動的大小
?? ? ? ? numPartitions: Int指定分區(qū)數(shù)
?? ? ?*/
?? ?val?resultWordCountDStream?=?dstream.flatMap(_.split(","))
?? ? ?.map((_, 1))
?? ? ?.reduceByKeyAndWindow((x: Int, y: Int) =>?x?+?y, Seconds(6), Seconds(4))
??}
}
(5)foreachRDD
核心算子講解
?? ?//將結(jié)果保存到Mysql(二)
?? ?wordCounts.foreachRDD?{ (rdd, time) =>
?? ? ?rdd.foreach?{ record?=>
?? ? ? ?Class.forName("com.mysql.jdbc.Driver")
?? ? ? ?val?conn?=?DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "root")
?? ? ? ?val?statement?=?conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
?? ? ? ?statement.setLong(1, time.milliseconds)
?? ? ? ?statement.setString(2, record._1)
?? ? ? ?statement.setInt(3, record._2)
?? ? ? ?statement.execute()
?? ? ? ?statement.close()
?? ? ? ?conn.close()
?? ? ?}
?? ?}
?? ?//將結(jié)果保存到Mysql(七)
?? ?wordCounts.foreachRDD?{ (rdd, time) =>
?? ? ?rdd.foreachPartition?{ partitionRecords?=>
?? ? ? ?val?conn?=?ConnectionPool.getConnection
?? ? ? ?conn.setAutoCommit(false)
?? ? ? ?val?statement?=?conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
?? ? ? ?partitionRecords.zipWithIndex.foreach?{ case?((word, count), index) =>
?? ? ? ? ?statement.setLong(1, time.milliseconds)
?? ? ? ? ?statement.setString(2, word)
?? ? ? ? ?statement.setInt(3, count)
?? ? ? ? ?statement.addBatch()
?? ? ? ? ?if?(index?!=?0?&&?index?%?500?==?0) {
?? ? ? ? ? ?statement.executeBatch()
?? ? ? ? ? ?conn.commit()
?? ? ? ? ?}
?? ? ? ?}
?? ? ? ?statement.executeBatch()
?? ? ? ?statement.close()
?? ? ? ?conn.commit()
?? ? ? ?conn.setAutoCommit(true)
?? ? ? ?ConnectionPool.returnConnection(conn)
?? ? ?}
?? ?}
(6) Checkpoint
/**
??* Dirver HA
??*/
object?DriverHAWordCount?{
??def?main(args: Array[String]): Unit?=?{
?? ?val?checkpointDirectory:String="hdfs://hadoop1:9000/streamingcheckpoint2";
?? ?def?functionToCreateContext(): StreamingContext?=?{
?? ? ?val?conf?=?new?SparkConf().setMaster("local[2]").setAppName("NetWordCount")
?? ? ?val?sc?=?new?SparkContext(conf)
?? ? ?val?ssc?=?new?StreamingContext(sc,Seconds(2))
?? ? ?ssc.checkpoint(checkpointDirectory)
?? ? ?val?dstream: ReceiverInputDStream[String] =?ssc.socketTextStream("hadoop1",9999)
?? ? ?val?wordCountDStream?=?dstream.flatMap(_.split(","))
?? ? ? ?.map((_, 1))
?? ? ? ?.updateStateByKey((values: Seq[Int], state: Option[Int]) =>?{
?? ? ? ? ?val?currentCount?=?values.sum
?? ? ? ? ?val?lastCount?=?state.getOrElse(0)
?? ? ? ? ?Some(currentCount?+?lastCount)
?? ? ? ?})
?? ? ?wordCountDStream.print()
?? ? ?ssc.start()
?? ? ?ssc.awaitTermination()
?? ? ?ssc.stop()
?? ? ?ssc
?? ?}
?? ?val?ssc?=?StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext?_)
?? ?ssc.start()
?? ?ssc.awaitTermination()
?? ?ssc.stop()
??}
}
3狂窑、SparkStreaming和SparkSQL整合
pom.xml里面添加
?? ? ?<dependency>
?? ? ? ? ? ?<groupId>org.apache.spark</groupId>
?? ? ? ? ? ?<artifactId>spark-sql_2.11</artifactId>
?? ? ? ? ? ?<version>2.2.1</version>
?? ? ?</dependency>
? WordCount程序媳板,Spark Streaming消費TCP Server發(fā)過來的實時數(shù)據(jù)的例子:
? 1、在master服務器上啟動一個Netcat server
`$ nc -lk 9998` (如果nc命令無效的話泉哈,我們可以用yum install -y nc來安裝nc)
object?NetworkWordCountForeachRDDDataFrame?{
??def?main(args: Array[String]) {
?? ?val?sparkConf?=?new?SparkConf().setAppName("NetworkWordCountForeachRDD")
?? ?val?sc?=?new?SparkContext(sparkConf)
?? ?// Create the context with a 1 second batch size
?? ?val?ssc?=?new?StreamingContext(sc, Seconds(1))
?? ?//創(chuàng)建一個接收器(ReceiverInputDStream)蛉幸,這個接收器接收一臺機器上的某個端口通過socket發(fā)送過來的數(shù)據(jù)并處理
?? ?val?lines?=?ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
?? ?//處理的邏輯破讨,就是簡單的進行word count
?? ?val?words?=?lines.flatMap(_.split(" "))
?? ?//將RDD轉(zhuǎn)化為Dataset
?? ?words.foreachRDD?{ rdd?=>
?? ? ?// Get the singleton instance of SparkSession
?? ? ?val?spark?=?SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
?? ? ?import?spark.implicits._
?? ? ?// Convert RDD[String] to DataFrame
?? ? ?val?wordsDataFrame?=?rdd.toDF("word")
?? ? ?// Create a temporary view
?? ? ?wordsDataFrame.createOrReplaceTempView("words")
?? ? ?// Do word count on DataFrame using SQL and print it
?? ? ?val?wordCountsDataFrame?=
?? ? ? ?spark.sql("select word, count(*) as total from words group by word")
?? ? ?wordCountsDataFrame.show()
?? ?}
?? ?//啟動Streaming處理流
?? ?ssc.start()
?? ?ssc.stop(false)
?? ?//將RDD轉(zhuǎn)化為Dataset
?? ?words.foreachRDD?{ (rdd, time) =>
?? ? ?// Get the singleton instance of SparkSession
?? ? ?val?spark?=?SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
?? ? ?import?spark.implicits._
?? ? ?// Convert RDD[String] to DataFrame
?? ? ?val?wordsDataFrame?=?rdd.toDF("word")
?? ? ?// Do word count on DataFrame using SQL and print it
?? ? ?val?wordCountsDataFrame?=?wordsDataFrame.groupBy("word").count()
?? ? ?val?resultDFWithTs?=?wordCountsDataFrame.rdd.map(row?=>?(row(0), row(1), time.milliseconds)).toDF("word", "count", "ts")
?? ? ?resultDFWithTs.write.mode(SaveMode.Append).parquet("hdfs://master:9999/user/spark-course/streaming/parquet")
?? ?}
?? ?//等待Streaming程序終止
?? ?ssc.awaitTermination()
??}
}
6、SparkStreaming消費kafka
一奕纫、基于Receiver的方式
????????這種方式使用Receiver來獲取數(shù)據(jù)提陶。Receiver是使用Kafka的高層次Consumer API來實現(xiàn)的。receiver從Kafka中獲取的數(shù)據(jù)都是存儲在Spark Executor的內(nèi)存中的(如果突然數(shù)據(jù)暴增匹层,大量batch堆積隙笆,很容易出現(xiàn)內(nèi)存溢出的問題),然后Spark Streaming啟動的job會去處理那些數(shù)據(jù)升筏。
????????然而撑柔,在默認的配置下,這種方式可能會因為底層的失敗而丟失數(shù)據(jù)您访。如果要啟用高可靠機制铅忿,讓數(shù)據(jù)零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log灵汪,WAL)檀训。該機制會同步地將接收到的Kafka數(shù)據(jù)寫入分布式文件系統(tǒng)(比如HDFS)上的預寫日志中。所以享言,即使底層節(jié)點出現(xiàn)了失敗峻凫,也可以使用預寫日志中的數(shù)據(jù)進行恢復。
二担锤、基于Direct的方式
????????這種新的不基于Receiver的直接方式蔚晨,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制肛循。替代掉使用Receiver來接收數(shù)據(jù)后铭腕,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset多糠,從而定義每個batch的offset的范圍累舷。當處理數(shù)據(jù)的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數(shù)據(jù)夹孔。
優(yōu)點如下:
? ??簡化并行讀缺挥:如果要讀取多個partition,不需要創(chuàng)建多個輸入DStream然后對它們進行union操作搭伤。Spark會創(chuàng)建跟Kafka partition一樣多的RDD partition只怎,并且會并行從Kafka中讀取數(shù)據(jù)。所以在Kafka partition和RDD partition之間怜俐,有一個一對一的映射關(guān)系身堡。
? ??高性能:如果要保證零數(shù)據(jù)丟失,在基于receiver的方式中拍鲤,需要開啟WAL機制贴谎。這種方式其實效率低下汞扎,因為數(shù)據(jù)實際上被復制了兩份,Kafka自己本身就有高可靠的機制擅这,會對數(shù)據(jù)復制一份澈魄,而這里又會復制一份到WAL中。而基于direct的方式仲翎,不依賴Receiver痹扇,不需要開啟WAL機制,只要Kafka中作了數(shù)據(jù)的復制谭确,那么就可以通過Kafka的副本進行恢復帘营。
一次且僅一次的事務機制。
三逐哈、對比:
????????基于receiver的方式芬迄,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數(shù)據(jù)的傳統(tǒng)方式昂秃。這種方式配合著WAL機制可以保證數(shù)據(jù)零丟失的高可靠性禀梳,但是卻無法保證數(shù)據(jù)被處理一次且僅一次,可能會處理兩次肠骆。因為Spark和ZooKeeper之間可能是不同步的算途。
????????基于direct的方式,使用kafka的簡單api蚀腿,Spark Streaming自己就負責追蹤消費的offset嘴瓤,并保存在checkpoint中。Spark自己一定是同步的莉钙,因此可以保證數(shù)據(jù)是消費一次且僅消費一次廓脆。
????????在實際生產(chǎn)環(huán)境中大都用Direct方式
7、簡述SparkStreaming窗口函數(shù)的原理(重點)
????窗口函數(shù)就是在原來定義的SparkStreaming計算批次大小的基礎上磁玉,再次進行封裝停忿,每次計算多個批次的數(shù)據(jù),同時還需要傳遞一個滑動步長的參數(shù)蚊伞,用來設置當次計算任務完成之后下一次從什么地方開始計算席赂。
????????圖中time1就是SparkStreaming計算批次大小,虛線框以及實線大框就是窗口的大小时迫,必須為批次的整數(shù)倍颅停。虛線框到大實線框的距離(相隔多少批次),就是滑動步長掠拳。
8癞揉、手寫出wordcount代碼實現(xiàn)(Scala)
?val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
?val sc = new SparkContext(conf)
?sc.textFile("/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/output")
?sc.stop()
9、Spark實現(xiàn)topN的獲取(描述思路或使用偽代碼)(重點)
方法1:
(1)按照key對數(shù)據(jù)進行聚合(groupByKey)
(2)將value轉(zhuǎn)換為數(shù)組烧董,利用scala的sortBy或者sortWith進行排序(mapValues)數(shù)據(jù)量太大,會OOM胧奔。
方法2:
(1)取出所有的key
(2)對key進行迭代逊移,每次取出一個key利用spark的排序算子進行排序
方法3:
(1)自定義分區(qū)器,按照key進行分區(qū)龙填,使不同的key進到不同的分區(qū)
(2)對每個分區(qū)運用spark的排序算子進行排序
10胳泉、京東:調(diào)優(yōu)之前與調(diào)優(yōu)之后性能的詳細對比(例如調(diào)整map個數(shù),map個數(shù)之前多少岩遗、之后多少扇商,有什么提升)
這里舉個例子。比如我們有幾百個文件宿礁,會有幾百個map出現(xiàn)案铺,讀取之后進行join操作,會非常的慢梆靖。這個時候我們可以進行coalesce操作控汉,比如240個map,我們合成60個map返吻,也就是窄依賴姑子。這樣再shuffle,過程產(chǎn)生的文件數(shù)會大大減少测僵。提高join的時間性能街佑。