4泌辫、Spark概要

一随夸、基本特性

1、與MapReduce不同

????????不同于MapReduce的是Job中間輸出和結(jié)果可以“保存在內(nèi)存”中震放,從而不再需要讀寫HDFS宾毒,

(1)、基于內(nèi)存

????????mapreduce任務后期再計算的時候殿遂,每一個job的輸出結(jié)果會落地到磁盤诈铛,后續(xù)有其他的job需要依賴于前面job的輸出結(jié)果乙各,這個時候就需要進行大量的磁盤io操作。性能就比較低幢竹。

????????spark任務后期再計算的時候耳峦,job的輸出結(jié)果可以保存在內(nèi)存中,后續(xù)有其他的job需要依賴于前面job的輸出結(jié)果焕毫,這個時候就直接從內(nèi)存中獲取得到蹲坷,避免了磁盤io操作,性能比較高邑飒、


(2)循签、進程與線程

????????mapreduce任務以進程的方式運行在yarn集群中,比如程序中有100個MapTask幸乒,一個task就需要一個進程懦底,這些task要運行就需要開啟100個進程。

????????spark任務以線程的方式運行在進程中罕扎,比如程序中有100個MapTask聚唐,后期一個task就對應一個線程,這里就不在是進程腔召,

????????這些task需要運行杆查,這里可以極端一點:只需要開啟1個進程,在這個進程中啟動100個線程就可以了臀蛛。進程中可以啟動很多個線程亲桦,而開啟一個進程與開啟一個線程需要的時間和調(diào)度代價是不一樣。開啟一個進程需要的時間遠遠大于開啟一個線程浊仆。

2客峭、spark集群安裝部署

????????vim spark-env.sh#配置java的環(huán)境變量 ?#配置zk相關(guān)信息

????????vim slaves指定spark集群的worker節(jié)點

? ? ? ? ?vim /etc/profile??修改spark環(huán)境變量

????????環(huán)境變量生效??source /etc/profile


1、先啟動zk???${ZK_HOME}/bin/zkServer.sh start

2抡柿、啟動spark集群?$SPARK_HOME/sbin/start-all.sh

3舔琅、zk作用:高可用

????????在高可用模式下,整個spark集群就有很多個master洲劣,其中只有一個master被zk選舉成活著的master备蚓,其他的多個master都處于standby,同時把整個spark集群的元數(shù)據(jù)信息通過zk中節(jié)點進行保存囱稽。

????????如果活著的master掛掉郊尝。首先zk會感知到活著的master掛掉,開始在多個處于standby中的master進行選舉战惊,再次產(chǎn)生一個活著的master流昏;這個活著的master會讀取保存在zk節(jié)點中的spark集群元數(shù)據(jù)信息,恢復到上一次master的狀態(tài)。


在master的恢復階段對任務的影響?

????????對已經(jīng)運行的任務是沒有任何影響横缔,由于該任務正在運行铺遂,說明它已經(jīng)拿到了計算資源,這個時候就不需要master茎刚。

????????對即將要提交的任務是有影響腥寇,由于該任務需要有計算資源间影,這個時候會找活著的master去申請計算資源争拐,由于沒有一個活著的master,該任務是獲取不到計算資源艘绍,也就是任務無法運行。


4初狰、web管理界面

http://master主機名:8080:集群的詳細信息莫杈、總資源信息、已用資源信息奢入、還剩資源信息筝闹;正在運行的任務信息、已經(jīng)完成的任務信息


bin/spark-submit

--class org.apache.spark.examples.SparkPi \

--master spark://node01:7077,node02:7077,node03:7077 \

--executor-memory?1G \

--total-executor-cores 2 \

examples/jars/spark-examples_2.11-2.3.3.jar \

10


????????spark集群中有很多個master腥光,并不知道哪一個master是活著的master关顷,即使你知道哪一個master是活著的master,它也有可能下一秒就掛掉武福,這里就可以把所有master都羅列出來

--master spark://node01:7077,node02:7077,node03:7077

????????后期程序會輪訓整個master列表议双,最終找到活著的master,然后向它申請計算資源捉片,最后運行程序平痰。


5、spark-shell使用

spark-shell --master local[2]伍纫,默認會產(chǎn)生一個SparkSubmit進程宗雇,sc

--master local[N]:表示本地采用N個線程計算任務

sc.textFile("file:///home/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect


讀取HDFS上文件: vim spark-env.sh?export HADOOP_CONF_DIR=hdoop安裝位置

//實現(xiàn)讀取hdfs上文件之后,需要把計算的結(jié)果保存到hdfs上

sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/out")

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

//todo:利用scala語言開發(fā)spark程序?qū)崿F(xiàn)單詞統(tǒng)計

object WordCount {

??def main(args: Array[String]): Unit = {

????//1莹规、構(gòu)建sparkConf對象?設置application名稱和master地址

val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")

//2逾礁、構(gòu)建sparkContext對象,該對象非常重要,它是所有spark程序的執(zhí)行入口访惜,它內(nèi)部會構(gòu)建?DAGScheduler和 TaskScheduler 對象

????val sc = new SparkContext(sparkConf)

????sc.setLogLevel("warn")???//設置日志輸出級別

????val data: RDD[String] = sc.textFile("E:\\words.txt")????//3、讀取數(shù)據(jù)文件

? val words: RDD[String] = data.flatMap(x=>x.split(" "))? ?//4 切分每一行腻扇,獲取所有單詞

? ? val wordAndOne: RDD[(String, Int)] = words.map(x => (x,1))//5债热、每個單詞計為1

????val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x,y)=>x+y)?//6、相同單詞出現(xiàn)的1累加

????//按照單詞出現(xiàn)的次數(shù)降序排列?第二個參數(shù)默認是true表示升序幼苛,設置為false表示降序

????val sortedRDD: RDD[(String, Int)] = result.sortBy( x=> x._2,false)

????val finalResult: Array[(String, Int)] = sortedRDD.collect()

????finalResult.foreach(println)?????//7窒篱、收集數(shù)據(jù)打印

????sc.stop()?? //8、關(guān)閉sc

??}

}


打成jar包提交到集群中運行

spark-submit \

--master spark://node01:7077,node02:7077 \

--class com.kaikeba.WordCountOnSpark \

--executor-memory?1g?\

--total-executor-cores?4 \

original-spark_class01-1.0-SNAPSHOT.jar?/words.txt?/out???jar包與輸入輸出


spark-submit --class org.apache.spark.examples.SparkPi \

--master yarn \

--deploy-mode cluster \

--driver-memory 1g \

--executor-memory 1g \

--executor-cores 1 \

/kkb/install/spark/examples/jars/spark-examples_2.11-2.3.3.jar 10?10是main方法里面的參數(shù)


????????????executor-memory小了,會把rdd一部分數(shù)據(jù)保存在內(nèi)存中墙杯,一部分數(shù)據(jù)保存在磁盤配并;用該rdd時從內(nèi)存和磁盤中獲取,一定的磁盤io高镐。需要設置的大一點溉旋,如10G/20G/30G等;

????????????total-executor-cores:表示任務運行需要總的cpu核數(shù)嫉髓,它決定了任務并行運行的粒度观腊,也會設置的大一點,如30個/50個/100個算行;????

????????加大計算資源它是最直接梧油、最有效果的優(yōu)化手段。? 在計算資源有限的情況下州邢,可以考慮其他方面儡陨,比如說代碼層面,JVM層面等

6量淌、兩種Yarn模式骗村,最大的區(qū)別就是Driver端的位置不一樣。

????????????yarn-cluster: Driver端運行在yarn集群中类少,與ApplicationMaster進程在一起叙身。

????????????yarn-client: ?Driver端運行在提交任務的客戶端,與ApplicationMaster進程沒關(guān)系,經(jīng)常 用于進行測試


二、集群架構(gòu)

2硫狞、spark集群架構(gòu)

(1)Master:集群的主節(jié)點信轿,負責任務資源的分配。

(2)Worker:集群的從節(jié)點残吩,負責任務計算的節(jié)點财忽。

????????????①Executor:是一個進程,它會在worker節(jié)點啟動該進程(計算資源)

????????????②Task:任務是以task線程的方式運行在worker節(jié)點對應的executor進程中泣侮;

(3)ClusterManager:給程序提供計算資源的外部服務即彪,standAlone模式整個任務的資源分配由spark集群的老大Master負責;把spark程序提交到y(tǒng)arn中運行活尊,整個任務的資源分配由yarn中的老大ResourceManager負責

(4)Driver:是所有spark程序的執(zhí)行入口隶校,會執(zhí)行客戶端寫好的main方法,它會構(gòu)建一個名叫SparkContext對象

(5)Application:是一個spark的應用程序蛹锰,它是包含了客戶端的代碼和任務運行的資源信息

(6)一個application就是一個應用程序深胳,包含了很多個job;

????????????一個action操作對應一個DAG有向無環(huán)圖铜犬,即一個action操作就是一個job舞终;

? ? ? ? ? ? 一個job中包含了大量的寬依賴轻庆,按照寬依賴進行stage劃分,一個job產(chǎn)生了很多個stage敛劝;

????????????一個stage中有很多分區(qū)余爆,一個分區(qū)就是一個task,即一個stage中有很多個task夸盟;


Spark中的調(diào)度模式:FIFO(先進先出)蛾方、FAIR(公平調(diào)度)

任務的分配資源worker策略:盡量打散、盡量集中

????????????盡量打散:一個Application盡可能多的分配到不同的節(jié)點满俗,發(fā)揮數(shù)據(jù)的本地性转捕,提升執(zhí)行效率

????????????盡量集中:盡量分配到盡可能少的節(jié)點

3、運行流程

????????(1) Driver端向資源管理器Master發(fā)送注冊和申請計算資源的請求

????????(2) Master通知對應的worker節(jié)點啟動executor進程(計算資源)

????????(3) executor進程向Driver端發(fā)送注冊并且申請task請求

? ? ? ? (4) Driver端運行客戶端的main方法唆垃,構(gòu)建SparkContext對象五芝,在SparkContext對象內(nèi)部依次構(gòu)建DAGScheduler和TaskScheduler

????????(5)按照客戶端代碼rdd的一系列操作順序,生成DAG有向無環(huán)圖

? ? ? ? ?(6) DAGScheduler拿到DAG有向無環(huán)圖之后辕万,按照寬依賴進行stage的劃分枢步。每一個stage內(nèi)部有很多可以并行運行的task,最后封裝在一個一個的taskSet集合中渐尿,然后把taskSet發(fā)送給TaskScheduler

????????(7) TaskScheduler得到taskSet集合之后醉途,依次遍歷取出每一個task提交到worker節(jié)點上的executor進程中運行

????????(8)所有task運行完成,Driver端向Master發(fā)送注銷請求砖茸,Master通知Worker關(guān)閉executor進程隘擎,Worker上的計算資源得到釋放,最后整個任務也就結(jié)束了凉夯。


三货葬、計算資源

1RDD概念

????????RDD(Resilient Distributed Dataset)叫做==彈性 分布式 數(shù)據(jù)集==劲够,是Spark中最基本的數(shù)據(jù)抽象震桶,它代表一個不可變、可分區(qū)征绎、里面的元素可并行計算的集合.

????????Resilient: 表示彈性蹲姐,rdd的數(shù)據(jù)是可以保存在內(nèi)存或者是磁盤中.

????????Distributed:它內(nèi)部的元素進行了分布式存儲,方便于后期進行分布式計算.

????????Dataset:就是一個集合人柿,存儲很多數(shù)據(jù).


五大屬性:

?(1) A list of partitions:一個rdd有很多分區(qū)柴墩,每一個分區(qū)內(nèi)部是包含了該rdd的部分數(shù)據(jù)

?(2) A function for computing each split:每個分區(qū)都會實現(xiàn)?計算函數(shù)

?(3) A list of dependencies on other RDDs:一個rdd會依賴于其他多個rdd

?(4) Optionally, a Partitioner for key-value RDDs :kv數(shù)據(jù)的分區(qū)函數(shù)基于哈希,非kv是None

?(5) Optionally, a list of preferred locations to compute each split on:有分區(qū)數(shù)據(jù)的節(jié)點會優(yōu)先開啟計算任務凫岖,數(shù)據(jù)的本地性拐邪。


RDD自定義分區(qū)

RDD數(shù)據(jù)進行分區(qū)時,默認使用的是HashPartitioner:對key進行哈希隘截,然后對分區(qū)總數(shù)取模扎阶,

實現(xiàn)自定義partitioner大致分為3個步驟:

????????????繼承==org.apache.spark.Partitioner==

????????????重寫==numPartitions==方法

????????????重寫==getPartition==方法


//對應上面的rdd數(shù)據(jù)進行自定義分區(qū)

val result: RDD[(String, Int)] = wordLengthRDD.partitionBy(new MyPartitioner(3))


2RDD的創(chuàng)建

1婶芭、scala集合sc.parallelize

????????val?rdd1=sc.parallelize(List(1,2,3,4,5))

????????val?rdd2=sc.parallelize(Array("hadoop","hive","spark"))

????????val?rdd3=sc.makeRDD(List(1,2,3,4))

2东臀、加載外部的數(shù)據(jù)源sc.textFile

????????val?rdd1=sc.textFile("/words.txt")

3、已存在rdd轉(zhuǎn)換成一個新的rdd

????????val?rdd2=rdd1.flatMap(_.split(" "))

????????val?rdd3=rdd2.map((_,1))


3犀农、RDD算子分類

transformation算子:根據(jù)已經(jīng)存在的rdd轉(zhuǎn)換生成一個新的rdd, 延遲加載惰赋,不會立即執(zhí)行

????????????????????????????????map 、mapPartitions呵哨、flatMap赁濒、

????????????????????????????????filter、Union(求并)孟害、intersection(求交)拒炎、distinct(去重)、

????????????????????????????????Join挨务、reduceByKey 击你、groupByKey、sortByKey谎柄、sortBy

????????????????????????????????repartition有shuffle丁侄、coalesce不shuffle


action算子:真正觸發(fā)任務的運行:

????????????????reduce、

????????????????collect :把RDD的數(shù)據(jù)進行收集之后朝巫,以數(shù)組的形式返回給Driver端

????????????????count鸿摇、first、take(n)

????????????????foreach劈猿、foreachPartition

????????????????saveAsTextFile拙吉、saveAsSequenceFile


????????默認Driver端的內(nèi)存大小為1G,由參數(shù) spark.driver.memory 設置糙臼,果某個rdd的數(shù)據(jù)量超過了Driver端默認的1G內(nèi)存庐镐,對rdd調(diào)用collect操作,這里會出現(xiàn)Driver端的內(nèi)存溢出变逃,所有這個collect操作存在一定的風險必逆,實際開發(fā)代碼一般不會使用。new SparkConf().set("spark.driver.memory","5G")

4揽乱、RDD依賴

RDD和它依賴的父RDD的關(guān)系有兩種不同的類型

????????窄依賴:每一個父RDD的Partition最多被子RDD的一個Partition使用名眉,不會產(chǎn)生shuffle;Map凰棉、flatMap损拢、filter、union等等撒犀,

????????寬依賴:多個子RDD的Partition會依賴同一個父RDD的Partition福压,會產(chǎn)生掏秩;?shufflereduceByKey/sortByKey/groupBy/groupByKey/join等等

????????join分為寬依賴和窄依賴,如果RDD有相同的partitioner荆姆,那么將不會引起shuffle蒙幻,這種join是窄依賴,反之就是寬依賴


5胆筒、RDD的Lineage血統(tǒng)

????????????lineage保存了RDD的依賴關(guān)系邮破,會記錄RDD的元數(shù)據(jù)信息和轉(zhuǎn)換行為,當該RDD的部分分區(qū)數(shù)據(jù)丟失時仆救,它可以根據(jù)這些信息來重新運算和恢復丟失的數(shù)據(jù)分區(qū)

????????RDD只支持粗粒度轉(zhuǎn)換抒和,即只記錄單個塊上執(zhí)行的單個操作。創(chuàng)建RDD的一系列Lineage(即血統(tǒng))記錄下來彤蔽,以便恢復丟失的分區(qū)


6摧莽、RDD緩存機制

????????把一個rdd的數(shù)據(jù)緩存起來,后續(xù)有其他的job需要用到該rdd的結(jié)果數(shù)據(jù)铆惑,可以直接從緩存中獲取得到范嘱,避免了重復計算。緩存是加快后續(xù)對該數(shù)據(jù)的訪問操作员魏。

(1)persist/cache

?????RDD通過persist方法cache方法可以將前面的計算結(jié)果緩存丑蛤,不會立即緩存,觸發(fā)后面的action撕阎,才會被緩存在計算節(jié)點的內(nèi)存中受裹。調(diào)用rdd的unpersist方法,一個application應用程序結(jié)束之后虏束,對應的緩存數(shù)據(jù)也就自動清除棉饶;

????????cache:最終也是調(diào)用了persist方法,默認的存儲級別都是僅在內(nèi)存存儲一份

????????persist:可以把數(shù)據(jù)緩存在內(nèi)存或者是磁盤镇匀,有豐富的緩存級別照藻,這些緩存級別都被定義在StorageLevel這個object中。

????????為了獲取得到一個rdd的結(jié)果數(shù)據(jù)汗侵,經(jīng)過了大量的算子操作或者是計算邏輯比較復雜幸缕,可以把多次使用到的rdd,是公共rdd進行持久化晰韵;

(2)checkpoint

????????把數(shù)據(jù)保存在內(nèi)存中不安全发乔,服務器掛掉或進程終止,會導致數(shù)據(jù)的丟失雪猪;存在本地磁盤中栏尚,操作刪除了,或者是磁盤損壞只恨,也有可能導致數(shù)據(jù)的丟失译仗;

????????????checkpoint把數(shù)據(jù)保存在分布式文件系統(tǒng)HDFS上抬虽。高可用性,高容錯性(多副本)來最大程度保證數(shù)據(jù)的安全性古劲。

????1斥赋、在hdfs上設置一個checkpoint目錄 sc.setCheckpointDir("hdfs://node01:8020/checkpoint")

????2、對需要做checkpoint操作的rdd調(diào)用checkpoint方法

????????????????val?rdd1=sc.textFile("/words.txt")

????????????????rdd1.checkpoint

????????????????val?rdd2=rdd1.flatMap(_.split(" "))

3产艾、最后需要有一個action操作去觸發(fā)任務的運行

????????????????rdd2.collect

7、廣播變量

????????spark中分布式執(zhí)行的代碼滑绒,需要==傳遞到各個Executor的Task上運行==闷堡。對于一些只讀、固定的數(shù)據(jù)(比如從DB中讀出的數(shù)據(jù)),每次都需要Driver廣播到各個Task上疑故,這樣效率低下

????????廣播變量允許將變量廣播給各個Executor杠览。該Executor上的各個Task從所在節(jié)點的BlockManager獲取變量,而不是從Driver獲取變量纵势,以減少通信的成本踱阿,減少內(nèi)存的占用,從而提升了效率钦铁。

????????(1)通過對一個類型T的對象調(diào)用 SparkContext.broadcast創(chuàng)建出一個Broadcast[T]對象软舌。(任何可序列化的類型都可以這么實現(xiàn))

????????(2)通過 value 屬性訪問該對象的值

????????(3)變量只會被發(fā)到各個節(jié)點一次,應作為只讀值處理(修改這個值不會影響到別的節(jié)點)

val?conf?=?new?SparkConf().setMaster("local[2]").setAppName("brocast")

val?sc=new?SparkContext(conf)

val?rdd1=sc.textFile("/words.txt")

val?word="spark"

//通過調(diào)用sparkContext對象的broadcast方法把數(shù)據(jù)廣播出去

val?broadCast?=?sc.broadcast(word)

//在executor中通過調(diào)用廣播變量的value屬性獲取廣播變量的值

val?rdd2=rdd1.flatMap(_.split(" ")).filter(x=>x.equals(broadCast.value))

rdd2.foreach(x=>println(x))


注意:

????????1牛曹、不能將一個RDD使用廣播變量廣播出去

????????2佛点、廣播變量只能在Driver端定義,不能在Executor端定義

????????3黎比、在Driver端可以修改廣播變量的值,在Executor端無法修改廣播變量的值

????????4、如果executor端用到了Driver的變量缘琅,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本

????????5褒颈、如果Executor端用到了Driver的變量,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本


累加器

????????累加器(accumulator)是Spark中提供的一種分布式的變量機制颓帝,其原理類似于mapreduce米碰,即分布式的改變,然后聚合這些改變躲履。

????????累加器的一個常見用途是见间,在調(diào)試時對作業(yè)執(zhí)行過程中的事件進行計數(shù)」げ拢可以使用累加器來進行全局的計數(shù)米诉。


8DAG劃分stage

????????原始的RDD通過一系列的轉(zhuǎn)換就形成了DAG篷帅。(Directed Acyclic Graph史侣,有向無環(huán)圖)

????????根據(jù)RDD之間依賴關(guān)系的不同將DAG劃分成不同的Stage(調(diào)度階段)拴泌;對于窄依賴,轉(zhuǎn)換處理在一個Stage中完成計算惊橱;對于寬依賴蚪腐,由于有Shuffle的存在,只能在parent RDD處理完成后税朴,才能開始接下來的計算回季,

????????劃分完stage之后,在同一個stage中只有窄依賴正林,沒有寬依賴泡一,可以實現(xiàn)流水線計算;stage中的每一個分區(qū)對應一個task觅廓,在同一個stage中就有很多可以并行運行的task鼻忠。


????????一個Job會被拆分為多組Task,每組任務被稱為一個stage杈绸。stage表示不同的調(diào)度階段帖蔓,一個spark job會對應產(chǎn)生很多個stage


劃分stage的依據(jù)就:寬依賴

????????(1)首先根據(jù)rdd的算子操作順序生成DAG有向無環(huán)圖,接下里從最后一個rdd往前推瞳脓,創(chuàng)建一個新的stage塑娇,把該rdd加入到該stage中,它是最后一個stage篡殷。

? ? ? ? ?(2)在往前推的過程中運行遇到了窄依賴就把該rdd加入到本stage中钝吮,如果遇到了寬依賴,就從寬依賴切開板辽,那么最后一個stage也就結(jié)束了奇瘦。

????????(3) 重新創(chuàng)建一個新的stage,按照第二個步驟繼續(xù)往前推劲弦,一直到最開始的rdd耳标,整個劃分stage也就結(jié)束了。

????????劃分完stage之后邑跪,每一個stage中有很多可以并行運行的task次坡,后期把每一個stage中的task封裝在一個taskSet集合中,最后把一個一個的taskSet集合提交到worker節(jié)點上的executor進程中運行画畅。

????????rdd與rdd之間存在依賴關(guān)系砸琅,stage與stage之前也存在依賴關(guān)系,前面stage中的task先運行轴踱,運行完成了再運行后面stage中的task症脂,也就是說后面stage中的task輸入數(shù)據(jù)是前面stage中task的輸出結(jié)果數(shù)據(jù)。

9序列化

????????spark是分布式執(zhí)行引擎诱篷,其核心抽象是彈性分布式數(shù)據(jù)集RDD壶唤,其代表了分布在不同節(jié)點的數(shù)據(jù)。Spark的計算是在executor上分布式執(zhí)行的棕所,故用戶開發(fā)的關(guān)于RDD的map闸盔,flatMap,reduceByKey等transformation 操作(閉包)有如下執(zhí)行過程:

?????????1)代碼中對象在driver本地序列化琳省,對象序列化后傳輸?shù)竭h程executor節(jié)點迎吵;

????????2)遠程executor節(jié)點反序列化對象,最終遠程節(jié)點執(zhí)行针贬。

對象在執(zhí)行中钓觉,需要序列化通過網(wǎng)絡傳輸,則必須經(jīng)過序列化過程坚踩。

解決序列化的辦法:

? ? ????1)如果函數(shù)中使用了該類對象,該類要實現(xiàn)序列化瓤狐,類extends Serializable

? ? ????2)如果函數(shù)中使用了該類對象的成員變量瞬铸,該類除了要實現(xiàn)序列化之外,所有的成員變量必須要實現(xiàn)序列化

????????3)對于不能序列化的成員變量使用==“@transient”==標注础锐,告訴編譯器不需要序列化

????????4)也可將依賴的變量嗓节,獨立放到一個小的class中,讓這個class支持序列化皆警,這樣做可以減少網(wǎng)絡傳輸量拦宣,提高效率。

????????5)可以把對象的創(chuàng)建直接在該函數(shù)中構(gòu)建這樣避免需要序列化?

10信姓、spark的shuffle

????????Shuffle就是對數(shù)據(jù)進行重組鸵隧,由于分布式計算的特性和要求,在實現(xiàn)細節(jié)上更加繁瑣和復雜意推。Stage階段的劃分:是根據(jù)是否有寬依賴shuffle過程豆瘫,job會劃分成多個Stage,每一個stage內(nèi)部有很多task菊值。stage與stage之間的過程就是shuffle階段外驱。

????????在Spark的中,負責shuffle過程的執(zhí)行腻窒、計算和處理的組件昵宇,主要就是ShuffleManager。ShuffleManager分為HashShuffleManagerSortShuffleManager儿子,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種瓦哎。

????????在Spark 1.2以前默認的shuffle是HashShuffleManager。該ShuffleManager-HashShuffleManager有著一個非常嚴重的弊端杭煎,就是會產(chǎn)生大量的中間磁盤文件恩够,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以后的版本中羡铲,默認的ShuffleManager改成了SortShuffleManager蜂桶。

?? ????SortShuffleManager相較于HashShuffleManager來說,有了一定的改進也切。主要就在于每個Task在進行shuffle操作時扑媚,雖然也會產(chǎn)生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件雷恃,因此每個Task就只有一個磁盤文件疆股。在下一個stage的shuffle read task拉取自己的數(shù)據(jù)時,只要根據(jù)索引讀取每個磁盤文件中的部分數(shù)據(jù)即可倒槐。

????????HashShuffleManager的運行機制主要分成兩種:普通運行機制旬痹、合并運行機制;合并機制主要是通過復用buffer來優(yōu)化Shuffle過程中產(chǎn)生的小文件的數(shù)量

? ??????普通HashShuffle :

????????????shuffle write讨越,每個task處理的數(shù)據(jù)按key進行“hash分區(qū)”两残,每個task都要創(chuàng)建分區(qū)個份磁盤文件,每個Executor上總共就要創(chuàng)建task*分區(qū)數(shù)個磁盤文件把跨, 產(chǎn)生的磁盤文件的數(shù)量是極其驚人的人弓;

????????????shuffle read的過程中,每個task要從上游stage的所有task所在節(jié)點上着逐,拉取屬于自己的那一個磁盤文件崔赌,每個ask都有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數(shù)據(jù)32k耸别,? ?

? ? ????缺點:在磁盤上會產(chǎn)生海量的小文件健芭,建立通信和拉取數(shù)據(jù)的次數(shù)變多,此時會產(chǎn)生大量耗時低效的 IO 操作;大量耗時低效的 IO 操作 太雨,導致寫磁盤時的對象過多吟榴,讀磁盤時候的對象也過多,這些對象存儲在堆內(nèi)存中囊扳,會導致堆內(nèi)存不足吩翻,相應會導致頻繁的GC,GC會導致OOM锥咸。

???合并HashShuffle :

????????一個Executor只有一種類型的Key的數(shù)據(jù)狭瞎,每個Executor上總共就要創(chuàng)建分區(qū)數(shù)個磁盤文件;

????????缺點:如果Reducer 端的并行任務或者是數(shù)據(jù)分片過多的話則 Core * Reducer Task 依舊過大搏予,也會產(chǎn)生很多小文件熊锭。


普通SortShuffle

????????普通SortShuffle?相當于預聚合,數(shù)據(jù)會先寫入一個數(shù)據(jù)結(jié)構(gòu),聚合算子寫入Map碗殷,一邊通過Map局部聚合精绎,一邊寫入內(nèi)存。锌妻。在該模式下代乃,數(shù)據(jù)會先寫入一個數(shù)據(jù)結(jié)構(gòu),聚合算子寫入Map仿粹,一邊通過Map局部聚合搁吓,一邊寫入內(nèi)存。Join算子寫入ArrayList直接寫入內(nèi)存中吭历。然后需要判斷是否達到閾值(5M)堕仔,如果達到就會將內(nèi)存數(shù)據(jù)結(jié)構(gòu)的數(shù)據(jù)寫入到磁盤,清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)晌区。

????????在溢寫磁盤前摩骨,先根據(jù)key進行排序,排序過后的數(shù)據(jù)朗若,會分批寫入到磁盤文件中仿吞。每批一萬條寫入到一個臨時磁盤文件,每次溢寫都會產(chǎn)生一個臨時磁盤文件捡偏,一個task過程會產(chǎn)生多個臨時文件。最后在每個task中峡迷,將所有的臨時文件合并merge银伟,一次寫入到最終文件。

????????一個task的所有數(shù)據(jù)都在這一個文件中绘搞。同時單獨寫一份索引文件彤避,標識下游各個task的數(shù)據(jù)在文件中的索引start offset和end offset。這樣算來如果第一個stage 50個task夯辖,每個Executor執(zhí)行一個task琉预,那么無論下游有幾個task,就需要50*2=100個磁盤文件蒿褂。

????????????1. 小文件明顯變少了圆米,一個task只生成一個file文件

????????????2. file文件整體有序,加上索引文件的輔助啄栓,查找變快娄帖,雖然排序浪費一些性能,但是查找變快很多


bypass模式SortShuffle?

????????????優(yōu)化后sortshuffle的普通機制相比昙楚,在shuffleMapTask不多的情況下近速,首先寫的機制是不同,其次不會進行排序。這樣就可以節(jié)約一部分性能開銷削葱。

????????????在shuffleMapTask數(shù)量小于默認值200時奖亚,啟用bypass模式的sortShuffle(原因是數(shù)據(jù)量本身比較少,沒必要進行sort全排序析砸,因為數(shù)據(jù)量少本身查詢速度就快昔字,正好省了sort的那部分性能開銷。)


該機制與普通SortShuffleManager運行機制的不同在于:

????????第一: 磁盤寫機制不同干厚;

????????第二: 不會進行sort排序李滴;

bypass機制運行條件

- shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值

- 不是聚合類的shuffle算子(比如reduceByKey)


四、SparkSQL

是apache Spark用來處理結(jié)構(gòu)化數(shù)據(jù)的一個模塊

1蛮瞄、sparksql的四大特性

(1)易整合所坯,將SQL查詢與Spark程序無縫混合,可以使用不同的語言進行代碼開發(fā)

(2)統(tǒng)一的數(shù)據(jù)源訪問挂捅,可以采用一種統(tǒng)一的方式去對接任意的外部數(shù)據(jù)源

val ?dataFrame = sparkSession.read.文件格式的方法名("該文件格式的路徑")

(3)兼容hive芹助,sparksql可以支持hivesql語法 ?sparksql兼容hivesql

(4)支持標準的數(shù)據(jù)庫連接,支持標準的數(shù)據(jù)庫連接JDBC或者ODBC


2闲先、DataFrame

????????DataFrame是一種以RDD為基礎的分布式數(shù)據(jù)集状土,類似于傳統(tǒng)數(shù)據(jù)庫的二維表格;DataFrame帶有Schema元信息伺糠,即所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型蒙谓。

????????RDD,具有面向?qū)ο缶幊痰娘L格训桶;而且開發(fā)時會進行類型的檢查累驮,保證編譯時類型安全。但是RDD數(shù)據(jù)量比較大時舵揭,由于需要存儲在堆內(nèi)存中谤专,堆內(nèi)存有限,容易出現(xiàn)頻繁的垃圾回收(GC)午绳;同時置侍,RDD發(fā)送到其他服務器,序列化和反序列性能開銷很大拦焚。

????????DataFrame引入了schema元信息和off-heap(堆外內(nèi)存)蜡坊,大量的對象構(gòu)建直接使用操作系統(tǒng)層面上的內(nèi)存,堆內(nèi)存就比較充足赎败,不容易GC算色;同時,schema元信息代表數(shù)據(jù)結(jié)構(gòu)的描述信息螟够,可以省略掉對schema的序列化網(wǎng)絡傳輸灾梦,只需對數(shù)據(jù)內(nèi)容本身進行序列化峡钓,減小序列化和反序列性能開銷。但是若河,編譯時不會進行類型的檢查能岩,編譯時類型不安全;不在具有面向?qū)ο缶幊痰娘L格萧福;

????????DataSet是在Spark1.6中添加的新的接口拉鹃,提供了強類型支持,在RDD的每行數(shù)據(jù)加了類型約束鲫忍,可以用強大lambda函數(shù)膏燕,使用了Spark SQL優(yōu)化的執(zhí)行引擎。DataSet包含了DataFrame的功能悟民,Spark2.0中兩者統(tǒng)一坝辫,DataFrame表示為DataSet[Row],即DataSet的子集射亏。修改了DataSet的缺陷近忙,DataSet可以在編譯時檢查類型,并且是面向?qū)ο蟮木幊探涌凇?/p>

RDD智润,存儲在堆內(nèi)存及舍、序列化開銷很大、編譯時類型安全窟绷、面向?qū)ο缶幊獭?/b>

DataFrame锯玛,引入了schema元信息和堆外內(nèi)存,只需對數(shù)據(jù)內(nèi)容本身進行序列化兼蜈,編譯時類型不安全精拟、不具有面向?qū)ο缶幊田L格应狱。

DataSet儡循,Spark1.6中提供了強類型支持厂抽,包含了DataFrame的功能蔬浙,DataFrame表示為DataSet[Row]忽孽、編譯時類型安全浩螺、面向?qū)ο缶幊獭?br>


3疫剃、 RDD 姊途、DataFrame涉瘾、DataSet?

1RDD

優(yōu)點:

????????編譯時類型安全、編譯時就能檢查出類型錯誤捷兰、面向?qū)ο蟮木幊田L格立叛、直接通過類名點的方式來操作數(shù)據(jù)

缺點:

????????序列化和反序列化的性能開銷、無論是集群間的通信,還是IO操作都需要對對象的結(jié)構(gòu)和數(shù)據(jù)進行序列化和反序列化贡茅。GC的性能開銷秘蛇,頻繁的創(chuàng)建和銷毀對象, 勢必會增加GC

2DataFrame

????????DataFrame引入了schema和off-heap

????????schema : RDD每一行的數(shù)據(jù), 結(jié)構(gòu)都是一樣的其做,這個結(jié)構(gòu)就存儲在schema中。?Spark通過schema就能夠讀懂數(shù)據(jù), 因此在通信和IO時就只需要序列化和反序列化數(shù)據(jù), 而結(jié)構(gòu)的部分就可以省略了赁还。

3DataSet

????????DataSet結(jié)合了RDD和DataFrame的優(yōu)點妖泄,并帶來的一個新的概念Encoder。

????????當序列化數(shù)據(jù)時艘策,Encoder產(chǎn)生字節(jié)碼與off-heap進行交互蹈胡,能夠達到按需訪問數(shù)據(jù)的效果,而不用反序列化整個對象朋蔫。Spark還沒有提供自定義Encoder的API罚渐,但是未來會加入。


三者之間的轉(zhuǎn)換:

(1)DataFrame與DataSet互轉(zhuǎn)

????????·DataFrame轉(zhuǎn)換成DataSet:val dataSet=dataFrame.as[強類型]

????????·DataSet轉(zhuǎn)換成DataFrame:val dataFrame=dataSet.toDF

(1)DataFrame驯妄、DataSet與RDD互轉(zhuǎn):

????????·從dataFrame和dataSet獲取得到rdd:

????????????val rdd1=dataFrame.rdd荷并;val rdd2=dataSet.rdd

????????·RDD轉(zhuǎn)換為DataFrame:

????????方法一:反射機制,定義一個樣例類富玷,后期直接映射成DataFrame的schema信息璧坟;

????????方法二:通過StructType直接指定Schema


3、DataFrame常用操作

DSL風格語法:sparksql中的DataFrame自身提供了一套自己的Api赎懦,可以去使用這套api來做相應的處理

(1)?SQL風格語法

可以把DataFrame注冊成一張表雀鹃,然后通過sparkSession.sql(sql語句)操作

import?org.apache.spark.SparkContext

import?org.apache.spark.rdd.RDD

import?org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}


//todo:利用反射機制實現(xiàn)把rdd轉(zhuǎn)成dataFrame

case?class?Person(id:String,name:String,age:Int)

object?CaseClassSchema?{

??def?main(args: Array[String]): Unit?=?{

?? ?//1、構(gòu)建SparkSession對象

?? ?val?spark: SparkSession?=?SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()

?? ?//2励两、獲取sparkContext對象

?? ?val?sc: SparkContext?=?spark.sparkContext

?? ?sc.setLogLevel("warn")

?? ?//3黎茎、讀取文件數(shù)據(jù)

?? ?val?data: RDD[Array[String]] =?sc.textFile("E:\\person.txt").map(x=>x.split(" "))


方法一:反射機制,定義一個樣例類当悔,后期直接映射成DataFrame的schema信息

?//4傅瞻、定義一個樣例類

case class Person(id:String,name:String,age:Int)

?? ?//5、將rdd與樣例類進行關(guān)聯(lián)

?? ?val?personRDD: RDD[Person] =?data.map(x=>Person(x(0),x(1),x(2).toInt))

?? ?//6盲憎、將rdd轉(zhuǎn)換成dataFrame

?? ?//需要手動導入隱式轉(zhuǎn)換

?? ?import?spark.implicits._

?? ?val?personDF: DataFrame?=?personRDD.toDF


方法二:通過StructType直接指定Schema

//4嗅骄、將rdd與Row對象進行關(guān)聯(lián)

?val rowRDD: RDD[Row] = data.map(x=>Row(x(0),x(1),x(2).toInt))

//5、指定dataFrame的schema信息饼疙,這里指定的字段個數(shù)和類型必須要跟Row對象保持一致

??val schema=StructType(

????????StructField("id",StringType)::

????????StructField("name",StringType)::

????????StructField("age",IntegerType)::Nil

????)

????val dataFrame: DataFrame = spark.createDataFrame(rowRDD,schema)



?? ?//7溺森、對dataFrame進行相應的語法操作

?? ?//todo:----------------- DSL風格語法-----------------start

?? ?//打印schema

?? ?personDF.printSchema()

?? ?//展示數(shù)據(jù)

?? ?personDF.show()

?? ?//獲取第一行數(shù)據(jù)

?? ?val?first: Row?=?personDF.first()

?? ?println("first:"+first)

?? ?//取出前3位數(shù)據(jù)

?? ?val?top3: Array[Row] =?personDF.head(3)

?? ?top3.foreach(println)

?? ?//獲取name字段

?? ?personDF.select("name").show()

?? ?personDF.select($"name").show()

?? ?personDF.select(new?Column("name")).show()

?? ?personDF.select("name","age").show()

?? ?//實現(xiàn)age +1

?? ?personDF.select($"name",$"age",$"age"+1).show()

?? ?//按照age過濾

?? ?personDF.filter($"age"?>30).show()

?? ?val?count: Long?=?personDF.filter($"age"?>30).count()

?? ?println("count:"+count)

?? ?//分組

?? ?personDF.groupBy("age").count().show()

?? ?personDF.show()

?? ?personDF.foreach(row?=>?println(row))

?? ?//使用foreach獲取每一個row對象中的name字段

?? ?personDF.foreach(row?=>println(row.getAs[String]("name")))

?? ?personDF.foreach(row?=>println(row.get(1)))

?? ?personDF.foreach(row?=>println(row.getString(1)))

?? ?personDF.foreach(row?=>println(row.getAs[String](1)))

?? ?//todo:----------------- DSL風格語法--------------------end

?? ?//todo:----------------- SQL風格語法-----------------start

?? ?personDF.createTempView("person")

?? ?//使用SparkSession調(diào)用sql方法統(tǒng)計查詢

?? ?spark.sql("select * from person").show

?? ?spark.sql("select name from person").show

?? ?spark.sql("select name,age from person").show

?? ?spark.sql("select * from person where age >30").show

?? ?spark.sql("select count(*) from person where age >30").show

?? ?spark.sql("select age,count(*) from person group by age").show

?? ?spark.sql("select age,count(*) as count from person group by age").show

?? ?spark.sql("select * from person order by age desc").show

?? ?//todo:----------------- SQL風格語法----------------------end

?? ?//關(guān)閉sparkSession對象

?? ?spark.stop()

??}

}

16、sparksql操作hivesql

添加依賴

?? ? ? ?<dependency>

?? ? ? ? ? ?<groupId>org.apache.spark</groupId>

?? ? ? ? ? ?<artifactId>spark-hive_2.11</artifactId>

?? ? ? ? ? ?<version>2.3.3</version>

?? ? ? ?</dependency>

代碼開發(fā)

import?org.apache.spark.sql.SparkSession

//todo:利用sparksql操作hivesql

object?HiveSupport?{

??def?main(args: Array[String]): Unit?=?{

?? ?//1窑眯、構(gòu)建SparkSession對象

?? ?val?spark: SparkSession?=?SparkSession.builder().appName("HiveSupport") .master("local[2]")

?? ? ?.enableHiveSupport()//開啟對hive的支持

?? ? ?.getOrCreate()


?? ?//2屏积、直接使用sparkSession去操作hivesql語句

?? ? ?//2.1創(chuàng)建一張hive表

?? ? ? spark.sql("create table people(id string,name string,age int) row format delimited fields terminated by ','")

?? ? ?//2.2加載數(shù)據(jù)到hive表中

?? ? ? spark.sql("load data local inpath './data/kaikeba.txt' into table people ")

?? ? ?//2.3查詢

?? ? ?spark.sql("select * from people").show()

?? ?spark.stop()

??}

}


添加mysql連接驅(qū)動jar包

<dependency>

????<groupId>mysql</groupId>

????<artifactId>mysql-connector-java</artifactId>

????<version>5.1.38</version>

</dependency>

代碼開發(fā)

import?java.util.Properties

import?org.apache.spark.sql.{DataFrame, SparkSession}

//todo:通過sparksql把結(jié)果數(shù)據(jù)寫入到mysql表中

object?Data2Mysql?{

??def?main(args: Array[String]): Unit?=?{

?? ?//1、創(chuàng)建SparkSession

?? ?val?spark: SparkSession?=?SparkSession?.builder().appName("Data2Mysql") ?.getOrCreate()


?? ?//2磅甩、讀取mysql表中數(shù)據(jù)

?? ? ? ?//2.1定義url連接

?? ? ? ?val?url="jdbc:mysql://node03:3306/spark"

?? ? ? ?//2.2定義表名

?? ? ? ?val?table="user"

?? ? ? ?//2.3定義屬性

?? ? ? ?val?properties=new?Properties()

?? ? ? ?properties.setProperty("user","root")

?? ? ? ?properties.setProperty("password","123456")

?? ?val?mysqlDF: DataFrame?=?spark.read.jdbc(url,table,properties)

?? ?//把dataFrame注冊成一張表

?? ? ?mysqlDF.createTempView("user")

?? ?//通過sparkSession調(diào)用sql方法

?? ? ? //需要統(tǒng)計經(jīng)度和維度出現(xiàn)的人口總數(shù)大于1000的記錄 保存到mysql表中

?? ?val?result: DataFrame?=?spark.sql("select * from user where age >30")

????//保存結(jié)果數(shù)據(jù)到mysql表中

?? ?//mode:指定數(shù)據(jù)的插入模式

?? ? ? ?//overwrite:表示覆蓋炊林,如果表不存在,事先幫我們創(chuàng)建

?? ? ? ?//append ? :表示追加卷要, 如果表不存在渣聚,事先幫我們創(chuàng)建

?? ? ? ?//ignore ? :表示忽略独榴,如果表事先存在,就不進行任何操作

?? ? ? ?//error ? ?:如果表事先存在就報錯(默認選項)


?? ? result.write.mode(args(0)).jdbc(url,args(1),properties)

?? ?//關(guān)閉

?? ? spark.stop()

??}

}

提交任務腳本

spark-submit \

--master?spark://node01:7077 \

--class?com.kaikeba.sql.Data2Mysql \

--executor-memory?1g \

--total-executor-cores?4?\

--driver-class-path?/home/hadoop/mysql-connector-java-5.1.38.jar \

--jars?/home/hadoop/mysql-connector-java-5.1.38.jar \

spark_class02-1.0-SNAPSHOT.jar \

append ?kaikeba


4饵逐、sparksql中自定義函數(shù)(★★★★★)


import?org.apache.spark.sql.api.java.UDF1

import?org.apache.spark.sql.types.StringType

import?org.apache.spark.sql.{DataFrame, SparkSession}

//TODO:自定義sparksql的UDF函數(shù) ? ?一對一的關(guān)系

object?SparkSQLFunction?{

??def?main(args: Array[String]): Unit?=?{

?? ?//1括眠、創(chuàng)建SparkSession

?? ?val?sparkSession: SparkSession?=?SparkSession.builder().appName("SparkSQLFunction").master("local[2]").getOrCreate()

?//2、構(gòu)建數(shù)據(jù)源生成DataFrame

?? ?val?dataFrame: DataFrame?=?sparkSession.read.text("E:\\data\\test_udf_data.txt")

?//3倍权、注冊成表

?? ?dataFrame.createTempView("t_udf")

?? ?//4掷豺、實現(xiàn)自定義的UDF函數(shù)

?? ?//小寫轉(zhuǎn)大寫

?? ?sparkSession.udf.register("low2Up",new?UDF1[String,String]() {

?? ? ?override?def?call(t1: String): String?=?{

?? ? ? ?t1.toUpperCase

?? ? ?}

?? ?},StringType)

?? ?//大寫轉(zhuǎn)小寫

?? ?sparkSession.udf.register("up2low",(x:String)=>x.toLowerCase)

?? ?//4、把數(shù)據(jù)文件中的單詞統(tǒng)一轉(zhuǎn)換成大小寫

?? ?sparkSession.sql("select ?value from t_udf").show()

?? ?sparkSession.sql("select ?low2Up(value) from t_udf").show()

?? ?sparkSession.sql("select ?up2low(value) from t_udf").show()

?? ?sparkSession.stop()

??}

}

5薄声、sparksql整合hive

1当船、把hive目錄下的hive-site.xml,拷貝到每一個spark的conf文件夾中

2把連接mysql驅(qū)動的jar包默辨,拷貝到spark的jars文件夾中

可以使用spark-sql腳本 后期執(zhí)行sql相關(guān)的任務

啟動腳本

spark-sql \

--master?spark://node01:7077 \

--executor-memory?1g \

--total-executor-cores?4?\

--conf?spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse

應用場景

#!/bin/sh

#定義sparksql提交腳本的頭信息

SUBMITINFO="spark-sql --master spark://node01:7077 --executor-memory 1g --total-executor-cores 4 --conf spark.sql.warehouse.dir=hdfs://node01:8020/user/hive/warehouse"?

#定義一個sql語句

SQL="select * from default.hive_source;"?

#執(zhí)行sql語句 ? 類似于 hive -e sql語句

echo?"$SUBMITINFO"?

echo?"$SQL"

$SUBMITINFO?-e?"$SQL"

6德频、sparkSql輸出jdbc

val properties=new Properties()

properties.setProperty("user","root")

properties.setProperty("password","123456")

val mysqlDF:?DataFrame =?spark.read.jdbc(url,table,properties)?

//dataFrame注冊成一張表

mysqlDF.createTempView("user")


//保存結(jié)果數(shù)據(jù)到mysql表中

result.write.mode("append").jdbc(url,"kaikeba",properties)

//mode:指定數(shù)據(jù)的插入模式

//overwrite: 表示覆蓋,如果表不存在缩幸,事先幫我們創(chuàng)建

//append :表示追加壹置, 如果表不存在,事先幫我們創(chuàng)建

//ignore :表示忽略表谊,如果表事先存在钞护,就不進行任何操作

//error :如果表事先存在就報錯(默認選項) //關(guān)閉



?五、?saprk Sreaming


1爆办、DStream

離散數(shù)據(jù)流

????????一個DStream以一系列連續(xù)的RDDs所展現(xiàn)难咕,其中的每個RDD都包含來自一定間隔的數(shù)據(jù),在DStream上使用的任何操作都會轉(zhuǎn)換為針對底層RDD的操作距辆。

scala版本

object?WordCount?{

??def?main(args: Array[String]): Unit?=?{

?? ?//步驟一:初始化程序入口

?? ?val?conf?=?new?SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

?? ?val?ssc?=?new?StreamingContext(conf,?Seconds(1))//間隔1s的數(shù)據(jù),形成rdd

?? ?//步驟二:獲取數(shù)據(jù)流

?? ?val?lines?=?ssc.socketTextStream("localhost", 9999)

?? ?//步驟三:數(shù)據(jù)處理

?? ?val?words?=?lines.flatMap(_.split(" "))

?? ?val?pairs?=?words.map(word?=>?(word, 1))

?? ?val?wordCounts?=?pairs.reduceByKey(_?+?_)

?? //步驟四: 數(shù)據(jù)輸出

?? ?wordCounts.print()

?? ?//步驟五:啟動任務

?? ?ssc.start()

?? ?ssc.awaitTermination()

?? ?ssc.stop()

??}

}


2余佃、Transformation 高級算子

updateStateByKeymapWithState跨算、Transform爆土、Window


?(1)updateStateByKey

/??? ? ?*數(shù)據(jù)的處理

?? ? ?* Option:

?? ? ?* ? Some:有值;None:沒有值

?? ? ?* ? values:Seq[Int] ? List{1,1}

?? ? ?* ? state:Option[Int]上一次這個單詞出現(xiàn)了多少次 ?None ?Some 2

?? ? ?*/


?? ?val?wordCountDStream?=?dstream.flatMap(_.split(","))

?? ? ?.map((_, 1))

?? ? ?.updateStateByKey((values: Seq[Int], state: Option[Int]) =>?{

?? ? ? ?val?currentCount?=?values.sum

?? ? ? ?val?lastCount?=?state.getOrElse(0)

?? ? ? ?Some(currentCount?+?lastCount)

?? ? ?})

(2)mapWithState

/**

??*性能更好

??*/

?? ?// currentBatchTime :表示當前的Batch的時間

?? ?// key:表示需要更新狀態(tài)的key

?? ?// value:表示當前batch的對應的key的對應的值

?? ?// currentState:對應key的當前的狀態(tài)

?? ?val?stateSpec?=?StateSpec.function((currentBatchTime: Time, key: String, value: Option[Int], currentState: State[Long]) =>?{

?? ? ?val?sum?=?value.getOrElse(0).toLong?+?currentState.getOption.getOrElse(0L)

?? ? ?val?output?=?(key, sum)

?? ? ?if?(!currentState.isTimingOut()) {

?? ? ? ?currentState.update(sum)

?? ? ?}

?? ? ?Some(output)

?? ?}).initialState(initialRDD).numPartitions(2).timeout(Seconds(30))

?//timeout:當一個key超過這個時間沒有接收到數(shù)據(jù)的時候诸蚕,這個key以及對應的狀態(tài)會被移除掉

?? ?val?result?=?wordsDStream.mapWithState(stateSpec)


(3)Transform


?? ?//transform需要有返回值步势,必須類型是RDD

?? ?val?wordCountDStream?=?wordOneDStream.transform(rdd?=>?{

?? ? ?val?filterRDD: RDD[(String, Boolean)] =?rdd.sparkContext.parallelize(blackListBroadcast.value)

?? ? ?val?resultRDD: RDD[(String, (Int, Option[Boolean]))] =?rdd.leftOuterJoin(filterRDD)

?? ? ?? ? ?resultRDD.filter(tuple?=>?{

?? ? ? ?tuple._2._2.isEmpty

?? ? ?}).map(_._1)

?? ?}).map((_, 1)).reduceByKey(_?+?_)


(4) Window操作

??*實現(xiàn)一個 每隔4秒,統(tǒng)計最近6秒的單詞計數(shù)的情況挫望。

?? ? ?*數(shù)據(jù)的處理

?? ? ?*我們一直講的是數(shù)據(jù)處理的算子

?? ? ?*這個地方算子 就是生產(chǎn)時候用的算子。

?? ? ?*

?? ? ?* ?reduceFunc: (V, V) => V,

?? ? ? ? windowDuration: Duration,6窗口的大小

?? ? ? ? slideDuration: Duration,4滑動的大小

?? ? ? ? numPartitions: Int指定分區(qū)數(shù)

?? ? ?*/

?? ?val?resultWordCountDStream?=?dstream.flatMap(_.split(","))

?? ? ?.map((_, 1))

?? ? ?.reduceByKeyAndWindow((x: Int, y: Int) =>?x?+?y, Seconds(6), Seconds(4))

??}

}


(5)foreachRDD

核心算子講解


?? ?//將結(jié)果保存到Mysql(二)

?? ?wordCounts.foreachRDD?{ (rdd, time) =>

?? ? ?rdd.foreach?{ record?=>

?? ? ? ?Class.forName("com.mysql.jdbc.Driver")

?? ? ? ?val?conn?=?DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "root")

?? ? ? ?val?statement?=?conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")

?? ? ? ?statement.setLong(1, time.milliseconds)

?? ? ? ?statement.setString(2, record._1)

?? ? ? ?statement.setInt(3, record._2)

?? ? ? ?statement.execute()

?? ? ? ?statement.close()

?? ? ? ?conn.close()

?? ? ?}

?? ?}


?? ?//將結(jié)果保存到Mysql(七)

?? ?wordCounts.foreachRDD?{ (rdd, time) =>

?? ? ?rdd.foreachPartition?{ partitionRecords?=>

?? ? ? ?val?conn?=?ConnectionPool.getConnection

?? ? ? ?conn.setAutoCommit(false)

?? ? ? ?val?statement?=?conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")

?? ? ? ?partitionRecords.zipWithIndex.foreach?{ case?((word, count), index) =>

?? ? ? ? ?statement.setLong(1, time.milliseconds)

?? ? ? ? ?statement.setString(2, word)

?? ? ? ? ?statement.setInt(3, count)

?? ? ? ? ?statement.addBatch()

?? ? ? ? ?if?(index?!=?0?&&?index?%?500?==?0) {

?? ? ? ? ? ?statement.executeBatch()

?? ? ? ? ? ?conn.commit()

?? ? ? ? ?}

?? ? ? ?}

?? ? ? ?statement.executeBatch()

?? ? ? ?statement.close()

?? ? ? ?conn.commit()

?? ? ? ?conn.setAutoCommit(true)

?? ? ? ?ConnectionPool.returnConnection(conn)

?? ? ?}

?? ?}


(6) Checkpoint

/**

??* Dirver HA

??*/

object?DriverHAWordCount?{

??def?main(args: Array[String]): Unit?=?{

?? ?val?checkpointDirectory:String="hdfs://hadoop1:9000/streamingcheckpoint2";

?? ?def?functionToCreateContext(): StreamingContext?=?{

?? ? ?val?conf?=?new?SparkConf().setMaster("local[2]").setAppName("NetWordCount")

?? ? ?val?sc?=?new?SparkContext(conf)

?? ? ?val?ssc?=?new?StreamingContext(sc,Seconds(2))

?? ? ?ssc.checkpoint(checkpointDirectory)

?? ? ?val?dstream: ReceiverInputDStream[String] =?ssc.socketTextStream("hadoop1",9999)

?? ? ?val?wordCountDStream?=?dstream.flatMap(_.split(","))

?? ? ? ?.map((_, 1))

?? ? ? ?.updateStateByKey((values: Seq[Int], state: Option[Int]) =>?{

?? ? ? ? ?val?currentCount?=?values.sum

?? ? ? ? ?val?lastCount?=?state.getOrElse(0)

?? ? ? ? ?Some(currentCount?+?lastCount)

?? ? ? ?})

?? ? ?wordCountDStream.print()

?? ? ?ssc.start()

?? ? ?ssc.awaitTermination()

?? ? ?ssc.stop()

?? ? ?ssc

?? ?}

?? ?val?ssc?=?StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext?_)

?? ?ssc.start()

?? ?ssc.awaitTermination()

?? ?ssc.stop()

??}

}

3狂窑、SparkStreaming和SparkSQL整合

pom.xml里面添加

?? ? ?<dependency>

?? ? ? ? ? ?<groupId>org.apache.spark</groupId>

?? ? ? ? ? ?<artifactId>spark-sql_2.11</artifactId>

?? ? ? ? ? ?<version>2.2.1</version>

?? ? ?</dependency>


? WordCount程序媳板,Spark Streaming消費TCP Server發(fā)過來的實時數(shù)據(jù)的例子:

? 1、在master服務器上啟動一個Netcat server

`$ nc -lk 9998` (如果nc命令無效的話泉哈,我們可以用yum install -y nc來安裝nc)

object?NetworkWordCountForeachRDDDataFrame?{

??def?main(args: Array[String]) {

?? ?val?sparkConf?=?new?SparkConf().setAppName("NetworkWordCountForeachRDD")

?? ?val?sc?=?new?SparkContext(sparkConf)

?? ?// Create the context with a 1 second batch size

?? ?val?ssc?=?new?StreamingContext(sc, Seconds(1))

?? ?//創(chuàng)建一個接收器(ReceiverInputDStream)蛉幸,這個接收器接收一臺機器上的某個端口通過socket發(fā)送過來的數(shù)據(jù)并處理

?? ?val?lines?=?ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)

?? ?//處理的邏輯破讨,就是簡單的進行word count

?? ?val?words?=?lines.flatMap(_.split(" "))

?? ?//將RDD轉(zhuǎn)化為Dataset

?? ?words.foreachRDD?{ rdd?=>

?? ? ?// Get the singleton instance of SparkSession

?? ? ?val?spark?=?SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

?? ? ?import?spark.implicits._

?? ? ?// Convert RDD[String] to DataFrame

?? ? ?val?wordsDataFrame?=?rdd.toDF("word")

?? ? ?// Create a temporary view

?? ? ?wordsDataFrame.createOrReplaceTempView("words")

?? ? ?// Do word count on DataFrame using SQL and print it

?? ? ?val?wordCountsDataFrame?=

?? ? ? ?spark.sql("select word, count(*) as total from words group by word")

?? ? ?wordCountsDataFrame.show()

?? ?}

?? ?//啟動Streaming處理流

?? ?ssc.start()

?? ?ssc.stop(false)

?? ?//將RDD轉(zhuǎn)化為Dataset

?? ?words.foreachRDD?{ (rdd, time) =>

?? ? ?// Get the singleton instance of SparkSession

?? ? ?val?spark?=?SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

?? ? ?import?spark.implicits._

?? ? ?// Convert RDD[String] to DataFrame

?? ? ?val?wordsDataFrame?=?rdd.toDF("word")

?? ? ?// Do word count on DataFrame using SQL and print it

?? ? ?val?wordCountsDataFrame?=?wordsDataFrame.groupBy("word").count()

?? ? ?val?resultDFWithTs?=?wordCountsDataFrame.rdd.map(row?=>?(row(0), row(1), time.milliseconds)).toDF("word", "count", "ts")

?? ? ?resultDFWithTs.write.mode(SaveMode.Append).parquet("hdfs://master:9999/user/spark-course/streaming/parquet")

?? ?}

?? ?//等待Streaming程序終止

?? ?ssc.awaitTermination()

??}

}

6、SparkStreaming消費kafka

一奕纫、基于Receiver的方式

????????這種方式使用Receiver來獲取數(shù)據(jù)提陶。Receiver是使用Kafka的高層次Consumer API來實現(xiàn)的。receiver從Kafka中獲取的數(shù)據(jù)都是存儲在Spark Executor的內(nèi)存中的(如果突然數(shù)據(jù)暴增匹层,大量batch堆積隙笆,很容易出現(xiàn)內(nèi)存溢出的問題),然后Spark Streaming啟動的job會去處理那些數(shù)據(jù)升筏。

????????然而撑柔,在默認的配置下,這種方式可能會因為底層的失敗而丟失數(shù)據(jù)您访。如果要啟用高可靠機制铅忿,讓數(shù)據(jù)零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log灵汪,WAL)檀训。該機制會同步地將接收到的Kafka數(shù)據(jù)寫入分布式文件系統(tǒng)(比如HDFS)上的預寫日志中。所以享言,即使底層節(jié)點出現(xiàn)了失敗峻凫,也可以使用預寫日志中的數(shù)據(jù)進行恢復。

二担锤、基于Direct的方式

????????這種新的不基于Receiver的直接方式蔚晨,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制肛循。替代掉使用Receiver來接收數(shù)據(jù)后铭腕,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset多糠,從而定義每個batch的offset的范圍累舷。當處理數(shù)據(jù)的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數(shù)據(jù)夹孔。

優(yōu)點如下

? ??簡化并行讀缺挥:如果要讀取多個partition,不需要創(chuàng)建多個輸入DStream然后對它們進行union操作搭伤。Spark會創(chuàng)建跟Kafka partition一樣多的RDD partition只怎,并且會并行從Kafka中讀取數(shù)據(jù)。所以在Kafka partition和RDD partition之間怜俐,有一個一對一的映射關(guān)系身堡。

? ??高性能:如果要保證零數(shù)據(jù)丟失,在基于receiver的方式中拍鲤,需要開啟WAL機制贴谎。這種方式其實效率低下汞扎,因為數(shù)據(jù)實際上被復制了兩份,Kafka自己本身就有高可靠的機制擅这,會對數(shù)據(jù)復制一份澈魄,而這里又會復制一份到WAL中。而基于direct的方式仲翎,不依賴Receiver痹扇,不需要開啟WAL機制,只要Kafka中作了數(shù)據(jù)的復制谭确,那么就可以通過Kafka的副本進行恢復帘营。

一次且僅一次的事務機制

逐哈、對比:

????????基于receiver的方式芬迄,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數(shù)據(jù)的傳統(tǒng)方式昂秃。這種方式配合著WAL機制可以保證數(shù)據(jù)零丟失的高可靠性禀梳,但是卻無法保證數(shù)據(jù)被處理一次且僅一次,可能會處理兩次肠骆。因為Spark和ZooKeeper之間可能是不同步的算途。

????????基于direct的方式,使用kafka的簡單api蚀腿,Spark Streaming自己就負責追蹤消費的offset嘴瓤,并保存在checkpoint中。Spark自己一定是同步的莉钙,因此可以保證數(shù)據(jù)是消費一次且僅消費一次廓脆。

????????在實際生產(chǎn)環(huán)境中大都用Direct方式

7、簡述SparkStreaming窗口函數(shù)的原理(重點)

????窗口函數(shù)就是在原來定義的SparkStreaming計算批次大小的基礎上磁玉,再次進行封裝停忿,每次計算多個批次的數(shù)據(jù),同時還需要傳遞一個滑動步長的參數(shù)蚊伞,用來設置當次計算任務完成之后下一次從什么地方開始計算席赂。

????????圖中time1就是SparkStreaming計算批次大小,虛線框以及實線大框就是窗口的大小时迫,必須為批次的整數(shù)倍颅停。虛線框到大實線框的距離(相隔多少批次),就是滑動步長掠拳。

8癞揉、手寫出wordcount代碼實現(xiàn)(Scala)

?val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

?val sc = new SparkContext(conf)

?sc.textFile("/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/output")

?sc.stop()

9、Spark實現(xiàn)topN的獲取(描述思路或使用偽代碼)(重點)

方法1:

(1)按照key對數(shù)據(jù)進行聚合(groupByKey)

(2)將value轉(zhuǎn)換為數(shù)組烧董,利用scala的sortBy或者sortWith進行排序(mapValues)數(shù)據(jù)量太大,會OOM胧奔。

方法2:

(1)取出所有的key

(2)對key進行迭代逊移,每次取出一個key利用spark的排序算子進行排序

方法3:

(1)自定義分區(qū)器,按照key進行分區(qū)龙填,使不同的key進到不同的分區(qū)

(2)對每個分區(qū)運用spark的排序算子進行排序

10胳泉、京東:調(diào)優(yōu)之前與調(diào)優(yōu)之后性能的詳細對比(例如調(diào)整map個數(shù),map個數(shù)之前多少岩遗、之后多少扇商,有什么提升)

這里舉個例子。比如我們有幾百個文件宿礁,會有幾百個map出現(xiàn)案铺,讀取之后進行join操作,會非常的慢梆靖。這個時候我們可以進行coalesce操作控汉,比如240個map,我們合成60個map返吻,也就是窄依賴姑子。這樣再shuffle,過程產(chǎn)生的文件數(shù)會大大減少测僵。提高join的時間性能街佑。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市捍靠,隨后出現(xiàn)的幾起案子沐旨,更是在濱河造成了極大的恐慌,老刑警劉巖剂公,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件希俩,死亡現(xiàn)場離奇詭異,居然都是意外死亡纲辽,警方通過查閱死者的電腦和手機颜武,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拖吼,“玉大人鳞上,你說我怎么就攤上這事〉醯担” “怎么了篙议?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我鬼贱,道長移怯,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任这难,我火速辦了婚禮舟误,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘姻乓。我一直安慰自己嵌溢,他們只是感情好,可當我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布蹋岩。 她就那樣靜靜地躺著赖草,像睡著了一般。 火紅的嫁衣襯著肌膚如雪剪个。 梳的紋絲不亂的頭發(fā)上秧骑,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天,我揣著相機與錄音扣囊,去河邊找鬼腿堤。 笑死,一個胖子當著我的面吹牛如暖,可吹牛的內(nèi)容都是我干的笆檀。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼盒至,長吁一口氣:“原來是場噩夢啊……” “哼酗洒!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起枷遂,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤樱衷,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后酒唉,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體矩桂,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年痪伦,在試婚紗的時候發(fā)現(xiàn)自己被綠了侄榴。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡网沾,死狀恐怖癞蚕,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情辉哥,我是刑警寧澤桦山,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布攒射,位于F島的核電站,受9級特大地震影響恒水,放射性物質(zhì)發(fā)生泄漏会放。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一钉凌、第九天 我趴在偏房一處隱蔽的房頂上張望鸦概。 院中可真熱鬧,春花似錦甩骏、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至论熙,卻和暖如春福青,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背脓诡。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工无午, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人祝谚。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓宪迟,卻偏偏與公主長得像查牌,于是被迫代替她去往敵國和親檩小。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容