(十五)大數(shù)據(jù)學(xué)習(xí)之Spark

Spark

一.Spark生態(tài)圈:

(1)Spark Core : RDD(彈性分布式數(shù)據(jù)集)
(2)Spark SQL
(3)Spark Streaming
(4)Spark MLLib:協(xié)同過(guò)濾目胡,ALS毒返,邏輯回歸等等 --> 機(jī)器學(xué)習(xí)
(5)Spark Graphx : 圖計(jì)算

二.什么是Spark

1.Spark是什么:
Spark是一個(gè)針對(duì)大規(guī)模數(shù)據(jù)處理的快速通用引擎。

  • Spark是一種快速凉敲、通用、可擴(kuò)展的大數(shù)據(jù)分析引擎寺旺,2009年誕生于加州大學(xué)伯克利分校AMPLab爷抓,2010年開源,2013年6月成為Apache孵化項(xiàng)目迅涮,2014年2月成為Apache頂級(jí)項(xiàng)目废赞。目前,Spark生態(tài)系統(tǒng)已經(jīng)發(fā)展成為一個(gè)包含多個(gè)子項(xiàng)目的集合叮姑,其中包含SparkSQL唉地、Spark Streaming、GraphX传透、MLlib等子項(xiàng)目耘沼,Spark是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架。Spark基于內(nèi)存計(jì)算朱盐,提高了在大數(shù)據(jù)環(huán)境下數(shù)據(jù)處理的實(shí)時(shí)性群嗤,同時(shí)保證了高容錯(cuò)性和高可伸縮性,允許用戶將Spark部署在大量廉價(jià)硬件之上兵琳,形成集群狂秘。Spark得到了眾多大數(shù)據(jù)公司的支持,這些公司包括Hortonworks躯肌、IBM椰棘、Intel券膀、Cloudera、MapR、Pivotal糊啡、百度赞辩、阿里恩袱、騰訊快鱼、京東、攜程曙博、優(yōu)酷土豆拥刻。當(dāng)前百度的Spark已應(yīng)用于鳳巢、大搜索羊瘩、直達(dá)號(hào)泰佳、百度大數(shù)據(jù)等業(yè)務(wù)盼砍;阿里利用GraphX構(gòu)建了大規(guī)模的圖計(jì)算和圖挖掘系統(tǒng),實(shí)現(xiàn)了很多生產(chǎn)系統(tǒng)的推薦算法逝她;騰訊Spark集群達(dá)到8000臺(tái)的規(guī)模浇坐,是當(dāng)前已知的世界上最大的Spark集群。

2.特點(diǎn):

  • 快:快100倍(Hadoop 3 之前)
  • 易用:支持多種語(yǔ)言開發(fā)
  • 通用性:生態(tài)系統(tǒng)全黔宛。
  • 易用性:兼容Hadoop

3.最大特點(diǎn):基于內(nèi)存

  • Spark是MapReduce的替代方案近刘,而且兼容HDFS、Hive臀晃,可融入Hadoop的生態(tài)系統(tǒng)觉渴,以彌補(bǔ)MapReduce的不足。

三.Spark體系架構(gòu)

1.Spark集群的體系架構(gòu)圖解:

image.png

2.Spark的主從結(jié)構(gòu)

image.png

四.Spark的安裝部署

1.Spark的安裝部署方式有以下幾種模式:
(1)Standalone: 本機(jī)調(diào)試

(2)YARN
(3)Mesos
(4)Amazon EC2
2.執(zhí)行過(guò)程:
一個(gè)Worker有多個(gè) Executor徽惋。 Executor是任務(wù)的執(zhí)行者案淋,按階段(stage)劃分任務(wù)∠栈妫————> RDD

五.Spark的搭建:

1.準(zhǔn)備工作

  • jdk
  • 配置主機(jī)名
  • 免密碼登錄

2.偽分布式模式安裝:
(1)下載
(2)上傳到linux
(3)解壓
(4)修改配置文件

  • 配置文件:conf/spark-env.sh
cd /opt/module

mv spark-2.1.0-bin-hadoop2.7/ spark/       //重命名spark文件夾

cd /opt/module/spark/conf 

mv spark-env.sh.template spark-env.sh    //重命名配置文件

vi spark-env.sh

修改內(nèi)容如下:

export JAVA_HOME=/opt/module/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata121      //主節(jié)點(diǎn)的服務(wù)器名
export SPARK_MASTER_PORT=7077           //主節(jié)點(diǎn)端口號(hào)
//下面的可以不寫踢京,默認(rèn)
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
image.png
  • 配置文件:conf/slave
mv slaves.template slaves

vi slaves

新增內(nèi)容:

bigdata121
image.png

(5)啟動(dòng):

cd /opt/module/spark

sbin/start-all.sh
image.png

(6)驗(yàn)證:192.168.127.121:8080

image.png

3.全分布的安裝部署:
(1)下載
(2)上傳到linux
(3)解壓
(4)修改配置文件

  • 配置文件:conf/spark-env.sh
cd /opt/module

mv spark-2.1.0-bin-hadoop2.7/ spark/       //重命名spark文件夾

cd /opt/module/spark/conf 

mv spark-env.sh.template spark-env.sh    //重命名配置文件

vi spark-env.sh

修改內(nèi)容如下:

export JAVA_HOME=/opt/module/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata121      //主節(jié)點(diǎn)的服務(wù)器名
export SPARK_MASTER_PORT=7077           //主節(jié)點(diǎn)端口號(hào)
//下面的可以不寫,默認(rèn)
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
image.png
  • 配置文件:conf/slave
mv slaves.template slaves

vi slaves

新增內(nèi)容:

bigdata122
bigdata123
image.png

(5)拷貝到其他兩臺(tái)服務(wù)器

cd /opt/module


src -r spark/ bigdata122:/opt/module

src -r spark/ bigdata123:/opt/module
  

(6)啟動(dòng)Spark集群:

cd /opt/module/spark

sbin/start-all.sh

六.Spark的HA

1.回顧HA:
(1)HDFS,Yarn,Hbase,Spark:都是主從結(jié)構(gòu)
(2)單點(diǎn)故障
2.基于文件系統(tǒng)的單點(diǎn)恢復(fù)
(1)主要用于開發(fā)或測(cè)試環(huán)境宦棺。當(dāng)spark提供目錄保存spark Application和worker的注冊(cè)信息瓣距,并將他們的恢復(fù)狀態(tài)寫入該目錄中,這時(shí)代咸,一旦Master發(fā)生故障蹈丸,就可以通過(guò)重新啟動(dòng)Master進(jìn)程(sbin/start-master.sh),恢復(fù)已運(yùn)行的spark Application和worker的注冊(cè)信息呐芥。
(2)基于文件系統(tǒng)的單點(diǎn)恢復(fù)逻杖,主要是在spark-en.sh里對(duì)SPARK_DAEMON_JAVA_OPTS設(shè)置

配置參數(shù) 參考值
spark.deploy.recoveryMode 設(shè)置為FILESYSTEM開啟單點(diǎn)恢復(fù)功能,默認(rèn)值:NONE
spark.deploy.recoveryDirectory Spark 保存恢復(fù)狀態(tài)的目錄

3.基于Zookeeper的Standby Masters
(1)ZooKeeper提供了一個(gè)Leader Election機(jī)制思瘟,利用這個(gè)機(jī)制可以保證雖然集群存在多個(gè)Master弧腥,但是只有一個(gè)是Active的,其他的都是Standby潮太。當(dāng)Active的Master出現(xiàn)故障時(shí),另外的一個(gè)Standby Master會(huì)被選舉出來(lái)虾攻。由于集群的信息铡买,包括Worker, Driver和Application的信息都已經(jīng)持久化到ZooKeeper霎箍,因此在切換的過(guò)程中只會(huì)影響新Job的提交奇钞,對(duì)于正在進(jìn)行的Job沒有任何的影響。加入ZooKeeper的集群整體架構(gòu)如下圖所示漂坏。

image.png
配置參數(shù) 參考值
spark.deploy.recoveryMode 設(shè)置為ZOOKEEPER開啟單點(diǎn)恢復(fù)功能景埃,默認(rèn)值:NONE
spark.deploy.zookeeper.url ZooKeeper集群的地址
spark.deploy.zookeeper.dir Spark信息在ZK中的保存目錄媒至,默認(rèn):/spark

(2)修改spark-env.sh參考:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181 
-Dspark.deploy.zookeeper.dir=/spark"

(3)另外:每個(gè)節(jié)點(diǎn)上,需要將以下兩行注釋掉谷徙。

image.png

(4)同步到其他兩臺(tái)服務(wù)器
(5)ZooKeeper中保存的信息

image.png
image.png

七.執(zhí)行Spark Demo程序

1.使用Spark Shell
(1)spark-shell是Spark自帶的交互式Shell程序拒啰,方便用戶進(jìn)行交互式編程,用戶可以在該命令行下用scala編寫spark程序完慧。相當(dāng)于REPL ,作為一個(gè)獨(dú)立的Application運(yùn)行
(2)兩種模式:

  • 本地模式:spark-shell 不接任何參數(shù)谋旦,代表本地模式
  • 集群模式:spark-shell后面帶有參數(shù)

(3)啟動(dòng)Spark shell:

spark-shell

參數(shù)說(shuō)明:

--master spark://spark81:7077     //指定Master的地址
--executor-memory 2g      //指定每個(gè)worker可用內(nèi)存為2G
--total-executor-cores 2       //指定整個(gè)集群使用的cup核數(shù)為2個(gè)

例如:

spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2

(4)在Spark shell中編寫WordCount程序

sc.textFile("hdfs://192.168.88.111:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")

參數(shù)說(shuō)明:

(5)wordcount程序册着,處理本地文件,把結(jié)果打印到屏幕上

scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
                .flatMap(_.split(" "))
                .map((_,1))
                .reduceByKey(_+_)
                .collect
                
                res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))

(6)wordcount程序脾歧,處理HDFS文件甲捏,結(jié)果保存在hdfs上

sc.textFile("hdfs://node1:8020/tmp_files/test_WordCount.txt")
                .flatMap(_.split(" "))
                .map((_,1))
                .reduceByKey(_+_)
                .saveAsTextFile("hdfs://node1:8020/output/0331/test_WordCount")

(7)單步運(yùn)行wordcount --->RDD

scala> val rdd1 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
                rdd1: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> 1+1
res2: Int = 2

scala> rdd1.collect
res3: Array[String] = Array(I love Beijing, I love China, Beijing is the capital of China)

scala> val rdd2 = rdd1.flatMap(_.split(" "))
                rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:26

scala> rdd2.collect
res4: Array[String] = Array(I, love, Beijing, I, love, China, Beijing, is, the, capital, of, China)

scala> val rdd3 = rdd2.map((_,1))
                rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:28

scala> rdd3.collect
res5: Array[(String, Int)] = Array((I,1), (love,1), (Beijing,1), (I,1), (love,1), (China,1), (Beijing,1), (is,1), (the,1), (capital,1), (of,1), (China,1))

scala> val rdd4 = rdd3.reduceByKey(_+_)
                rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:30

scala> rdd4.collect
res6: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))

(8)RDD 彈性分布式數(shù)據(jù)集
(9)Scala復(fù)習(xí):

  • flatten:把嵌套的結(jié)果展開
scala>List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
  • flatmap:相當(dāng)于一個(gè) map + flatten
scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
                    myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))
scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)                 
myList.flatMap(x=>x.map(_*2))

flatmao執(zhí)行過(guò)程:

  • 將 List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9) 調(diào)用 map(_*2) 方法。x 代表一個(gè)List
  • flatten
    2.在IDEA中編寫WordCount程序
    (1)需要的jar包:$SPARK_HOME/jars/*.jar
    (2)創(chuàng)建Scala Project鞭执,并創(chuàng)建Scala Object司顿、或者Java Class
    (3)書寫源代碼,并打成jar包蚕冬,上傳到Linux

Scala版本

image.png

(4)運(yùn)行程序:

spark-submit --master spark://spark81:7077 
--class mydemo.WordCount jars/wc.jar 
hdfs://192.168.88.111:9000/data/data.txt 
hdfs://192.168.88.111:9000/output/spark/wc1

Java版本(直接輸出在屏幕)

image.png

(4)運(yùn)行程序:

spark-submit --master spark://spark81:7077 
--class mydemo.JavaWordCount jars/wc.jar 
hdfs://192.168.88.111:9000/data/data.txt

八.Spark運(yùn)行機(jī)制及原理分析

1.WordCount執(zhí)行的流程分析

image.png

2.Spark提交任務(wù)的流程析

image.png

九.RDD和RDD特性免猾,RDD的算子

1.RDD:彈性分布式數(shù)據(jù)集
(1)什么是RDD?

  • RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集囤热,是Spark中最基本的數(shù)據(jù)抽象猎提,它代表一個(gè)不可變、可分區(qū)旁蔼、里面的元素可并行計(jì)算的集合锨苏。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性棺聊。RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯式地將工作集緩存在內(nèi)存中伞租,后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度限佩。

(2)RDD的屬性:

  • 一組分片(Partition)葵诈,即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō)祟同,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理作喘,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù)晕城,如果沒有指定泞坦,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目砖顷。
  • 一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)贰锁。Spark中RDD的計(jì)算是以分片為單位的赃梧,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合豌熄,不需要保存每次計(jì)算的結(jié)果授嘀。
  • RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD房轿,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系粤攒。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù)囱持,而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算夯接。
  • 一個(gè)Partitioner,即RDD的分片函數(shù)纷妆。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù)盔几,一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner掩幢。只有對(duì)于于key-value的RDD逊拍,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None际邻。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量芯丧,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。
  • 一個(gè)列表世曾,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)缨恒。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置轮听。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念骗露,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置血巍。

2.如何創(chuàng)建RDD
(1)通過(guò)SparkContext.parallelize方法來(lái)創(chuàng)建(通過(guò)sc.parallelize進(jìn)行創(chuàng)建)

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29
            
scala> rdd1.partitions.length
res35: Int = 3
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
rdd1:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:29

scala> rdd1.partitions.length
res36: Int = 2

(2)通過(guò)外部數(shù)據(jù)源來(lái)創(chuàng)建

sc.textFile()           
scala> val rdd2 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd2:org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[35] at textFile at <console>:29

(3)RDD的類型:TransformationAction
3.RDD的基本原理:

image.png

4.Transformation
(1)RDD中的所有轉(zhuǎn)換都是延遲加載的萧锉,也就是說(shuō),它們并不會(huì)直接計(jì)算結(jié)果述寡。相反的柿隙,它們只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集(例如一個(gè)文件)上的轉(zhuǎn)換動(dòng)作。只有當(dāng)發(fā)生一個(gè)要求返回結(jié)果給Driver的動(dòng)作時(shí)鲫凶,這些轉(zhuǎn)換才會(huì)真正運(yùn)行优俘。這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行。

轉(zhuǎn)換 含義
map(func) 返回一個(gè)新的RDD掀序,該RDD由每一個(gè)輸入元素經(jīng)過(guò)func函數(shù)轉(zhuǎn)換后組成
filter(func) 返回一個(gè)新的RDD,該RDD由經(jīng)過(guò)func函數(shù)計(jì)算后返回值為true的輸入元素組成
flatMap(func) 類似于map惭婿,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列不恭,而不是單一元素)
mapPartitions(func) 類似于map叶雹,但獨(dú)立地在RDD的每一個(gè)分片上運(yùn)行,因此在類型為T的RDD上運(yùn)行時(shí)换吧,func的函數(shù)類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類似于mapPartitions折晦,但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值,因此在類型為T的RDD上運(yùn)行時(shí)沾瓦,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據(jù)fraction指定的比例對(duì)數(shù)據(jù)進(jìn)行采樣满着,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換,seed用于指定隨機(jī)數(shù)生成器種子
union(otherDataset) 對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD
intersection(otherDataset) 對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD
distinct([numTasks])) 對(duì)源RDD進(jìn)行去重后返回一個(gè)新的RDD
groupByKey([numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用贯莺,返回一個(gè)(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用风喇,返回一個(gè)(K,V)的RDD,使用指定的reduce函數(shù)缕探,將相同key的值聚合到一起魂莫,與groupByKey類似,reduce任務(wù)的個(gè)數(shù)可以通過(guò)第二個(gè)可選的參數(shù)來(lái)設(shè)置
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])
sortByKey([ascending], [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用爹耗,K必須實(shí)現(xiàn)Ordered接口耙考,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似,但是更靈活
join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用潭兽,返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用倦始,返回一個(gè)(K,(Iterable<V>,Iterable<W>))類型的RDD
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

5.Action

動(dòng)作 含義
reduce(func) 通過(guò)func函數(shù)聚集RDD中的所有元素,這個(gè)功能必須是課交換且可并聯(lián)的
collect() 在驅(qū)動(dòng)程序中山卦,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素
count() 返回RDD的元素個(gè)數(shù)
first() 返回RDD的第一個(gè)元素(類似于take(1))
take(n) 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組
takeSample(withReplacement,num, [seed]) 返回一個(gè)數(shù)組鞋邑,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分怒坯,seed用于指定隨機(jī)數(shù)生成器種子
takeOrdered(n, [ordering])
saveAsTextFile(path) 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng)炫狱,對(duì)于每個(gè)元素,Spark將會(huì)調(diào)用toString方法剔猿,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下视译,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。
saveAsObjectFile(path)
countByKey() 針對(duì)(K,V)類型的RDD归敬,返回一個(gè)(K,Int)的map酷含,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)。
foreach(func) 在數(shù)據(jù)集的每一個(gè)元素上汪茧,運(yùn)行函數(shù)func進(jìn)行更新椅亚。

十.RDD特性

1.RDD的緩存機(jī)制:
(1)作用:提高性能
(2)使用:標(biāo)識(shí)RDD可以被緩存 persist cache
(3)可以緩存的位置:

2.RDD的容錯(cuò)機(jī)制:通過(guò)檢查點(diǎn)來(lái)實(shí)現(xiàn)
(1)
(1)復(fù)習(xí)檢查點(diǎn):HDFS中的檢查點(diǎn):有SecondaryNamenode來(lái)實(shí)現(xiàn)日志的合并。
(2)RDD的檢查點(diǎn):容錯(cuò)

  • 概念:血統(tǒng) Lineage
  • 理解:表示任務(wù)執(zhí)行的生命周期舱污。
  • WordCount textFile ---> redceByKey
  • 如果血統(tǒng)越長(zhǎng)呀舔,越容易出錯(cuò)。
  • 假如有檢查點(diǎn)扩灯,可以從最近的一個(gè)檢查點(diǎn)開始媚赖,往后面計(jì)算霜瘪。不用重頭計(jì)算。

(3)RDD檢查點(diǎn)的類型:
a.基于本地目錄:需要將Spark shell 或者任務(wù)運(yùn)行在本地模式上(setMaster("local"))

image.png

b.HDFS目錄:用于生產(chǎn),集群模式

image.png
sc.setCheckPointDir(目錄)
                
//舉例:設(shè)置檢查點(diǎn)
scala> var rdd1 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24

//設(shè)置檢查點(diǎn)目錄:
scala> sc.setCheckpointDir("hdfs://192.168.109.131:8020/sparkckpt")

//標(biāo)識(shí)rdd1可以執(zhí)行檢查點(diǎn)操作
scala> rdd1.checkpoint

scala> rdd1.count
res2: Long = 923452 

3.依賴關(guān)系:寬依賴惧磺,窄依賴
(1)RDD的依賴關(guān)系:

  • RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型颖对,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
    (2)窄依賴指的是每一個(gè)父RDD的Partition最多被子RDD的一個(gè)Partition使用
    總結(jié):窄依賴我們形象的比喻為獨(dú)生子女

(3)寬依賴指的是多個(gè)子RDD的Partition會(huì)依賴同一個(gè)父RDD的Partition
總結(jié):寬依賴我們形象的比喻為超生

4.Spark任務(wù)中的Stage

  • DAG(Directed Acyclic Graph)叫做有向無(wú)環(huán)圖磨隘,原始的RDD通過(guò)一系列的轉(zhuǎn)換就就形成了DAG缤底,根據(jù)RDD之間的依賴關(guān)系的不同將DAG劃分成不同的Stage,對(duì)于窄依賴番捂,partition的轉(zhuǎn)換處理在Stage中完成計(jì)算个唧。對(duì)于寬依賴,由于有Shuffle的存在白嘁,只能在parent RDD處理完成后坑鱼,才能開始接下來(lái)的計(jì)算,因此寬依賴是劃分Stage的依據(jù)絮缅。
image.png

十一.RDD的高級(jí)算子

1.mapPartitionsWithIndex:對(duì)RDD中的每個(gè)分區(qū)(帶有下標(biāo))進(jìn)行操作鲁沥,下標(biāo)用index表示
通過(guò)這個(gè)算子,我們可以獲取分區(qū)號(hào)耕魄。

def mapPartitionsWithIndex[U](
        f: (Int, Iterator[T]) ? Iterator[U], 
        preservesPartitioning: Boolean = false)(
        implicit arg0: ClassTag[U]): RDD[U]
        
//參數(shù):f是個(gè)函數(shù)參數(shù) f 中第一個(gè)參數(shù)是Int画恰,代表分區(qū)號(hào),第二個(gè)Iterator[T]代表分區(qū)中的元素

例如:

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> def fun1(index:Int, iter:Iterator[Int]) : Iterator[String] = {
             | iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
             | }
fun1: (index: Int, iter: Iterator[Int])Iterator[String]

scala> rdd1.mapPartitions
        mapPartitions   mapPartitionsWithIndex

scala> rdd1.mapPartitionsWithIndex(fun1).collect
res3: Array[String] = Array(
        [partId : 0 , value = 1 ], [partId : 0 , value = 2 ], [partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ], [partId : 2 , value = 6 ], [partId : 2 , value = 7 ], [partId : 2 , value = 8 ])

2.aggregate:聚合操作吸奴。類似于分組允扇。
(1)先對(duì)局部進(jìn)行聚合操作,再對(duì)全局進(jìn)行聚合操作则奥。

//調(diào)用聚合操作
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd2.mapPartitionsWithIndex(fun1).collect
res4: Array[String] = Array(
        [partId : 0 , value = 1 ], [partId : 0 , value = 2 ], [partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])

scala> import scala.math._
import scala.math._

scala> rdd2.aggregate(0)(max(_,_),_+_)
res6: Int = 7

說(shuō)明:
(2)對(duì)字符串操作

scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27

scala> rdd2.aggregate("")(_+_,_+_)
res11: String = abcdef

scala> rdd2.aggregate("*")(_+_,_+_)
res12: String = **def*abc

(3)復(fù)雜的例子:
a.

scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27
        
scala> def fun1(index:Int, iter:Iterator[String]) : Iterator[String] = {
             | iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
             | }

scala> rdd3.mapPartitionsWithIndex(fun1).collect
res17: Array[String] = Array(
        [partId : 0 , value = 12 ], [partId : 0 , value = 23 ], [partId : 1 , value = 345 ], [partId : 1 , value = 4567 ])
        
scala> rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y)=>x+y)
res13: String = 42

執(zhí)行過(guò)程:

  • 第一個(gè)分區(qū):
    (a)第一次比較: "" "12" 長(zhǎng)度最大值 2 2-->"2"
    (b)第二次比較: “2” “23” 長(zhǎng)度最大值 2 2-->"2"

  • 第二個(gè)分區(qū):
    (a)第一次比較: "" "345" 長(zhǎng)度最大值 3 3-->"3"
    (b)第二次比較: “3” “4567” 長(zhǎng)度最大值 4 4-->"4"

b.

rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
        
scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res18: String = 11

執(zhí)行過(guò)程:

  • 第一個(gè)分區(qū):
    第一次比較: "" "12" 長(zhǎng)度最小值 0 0-->"0"
    第二次比較: “0” “23” 長(zhǎng)度最小值 1 1-->"1"

  • 第二個(gè)分區(qū):
    第一次比較: "" "345" 長(zhǎng)度最小值 0 0-->"0"
    第二次比較: “0” “4567” 長(zhǎng)度最小值 1 1-->"1"
    c.aggregateByKey:類似于aggregate考润,區(qū)別:操作的是 key value 的數(shù)據(jù)類型。

3.其他高級(jí)算子:

十二.編程案例

1.分析日志
(1)需求:找到訪問(wèn)量最高的兩個(gè)網(wǎng)頁(yè)

  • 第一步:對(duì)網(wǎng)頁(yè)的訪問(wèn)量求和
  • 第二步:排序读处,降序

(2)創(chuàng)建自定義分區(qū)

(3)使用JDBCRDD 操作數(shù)據(jù)庫(kù)

(4)操作數(shù)據(jù)庫(kù):把結(jié)果存放到數(shù)據(jù)庫(kù)中

Spark SQL

Spark sql基礎(chǔ)

1.什么是Spark SQL

  • Spark SQL 是spark的一個(gè)模塊糊治。來(lái)處理 結(jié)構(gòu)化 的數(shù)據(jù),不能處理非結(jié)構(gòu)化的數(shù)據(jù)
    2.特點(diǎn):
    (1)容易集成:
  • 不需要單獨(dú)安裝罚舱。

(2)統(tǒng)一的數(shù)據(jù)訪問(wèn)方式

  • 結(jié)構(gòu)化數(shù)據(jù)的類型:JDBC JSon Hive parquer文件 都可以作為Spark SQL 的數(shù)據(jù)源
  • 對(duì)接多種數(shù)據(jù)源井辜,且使用方式類似
    (3)完全兼容hive
  • 把Hive中的數(shù)據(jù),讀取到Spark SQL中運(yùn)行管闷。

(4) 支持標(biāo)準(zhǔn)的數(shù)據(jù)連接

  • JDBC

3.為什么學(xué)習(xí)Spark SQL

  • 執(zhí)行效率比Hive高
  • hive 2.x 執(zhí)行引擎可以使用 Spark
    4.核心概念:表(DataFrame DataSet)
  • mysql中的表:表結(jié)構(gòu)粥脚、數(shù)據(jù)
  • DataFrame:Schema、RDD(數(shù)據(jù))
  • DataSet 在spark1.6以后包个,對(duì)DataFrame做了一個(gè)封裝刷允。
    5.創(chuàng)建DataFrame
    測(cè)試數(shù)據(jù):?jiǎn)T工表、部門表
    (1)第一種方式:使用case class
    a.定義Schema
    b.讀取文件
    c.把每行數(shù)據(jù),映射到Emp上
    d.生成DataFrame

(2)第二種方式 使用Spark Session
(3)直接讀取一個(gè)帶格式的文件树灶。

6.操作DataFrame
(1)DSL語(yǔ)句
(2)SQL語(yǔ)句
注意:不能直接執(zhí)行SQL搀菩,需要生成一個(gè)視圖,再執(zhí)行sql破托。
(3)多表查詢

7.操作DataSet
(1)跟DataFrame類似,是一套新的接口歧蒋。高級(jí)的Dataframe
(2)創(chuàng)建DataSet

  • 使用序列來(lái)創(chuàng)建DataSet土砂。
  • 使用JSON數(shù)據(jù)來(lái)創(chuàng)建DataSet
  • 使用其他數(shù)據(jù)

(3)DataSet案例

(4)多表查詢

  • 創(chuàng)建部門表
  • 創(chuàng)建員工表
  • 執(zhí)行多表查詢:等值連接
  • 多表連接后再篩選

7.Spark SQL中的視圖
(1)視圖是一個(gè)虛表,不存儲(chǔ)數(shù)據(jù)谜洽。
(2)兩種類型:

  • 普通視圖(本地視圖):只在當(dāng)前Session中有效萝映。createOrReplaceTempView createTempView
  • 全局視圖: createGlobalTempView:在不同的Session中都有用 把全局視圖創(chuàng)建在命名空間中:global_temp中。類似于一個(gè)庫(kù)阐虚。

二.使用數(shù)據(jù)源

1.在Spark SQL中序臂,可以使用各種各樣的數(shù)據(jù)源來(lái)操作。 結(jié)構(gòu)化
2.使用load函數(shù)实束、save函數(shù)

  • load函數(shù)是加載數(shù)據(jù)奥秆,save是存儲(chǔ)數(shù)據(jù)
    注意:使用load 或 save時(shí),默認(rèn)是Parquet文件咸灿。列式存儲(chǔ)文件构订。
    3.Parquet文件:列式存儲(chǔ)文件,是Spark SQL 默認(rèn)的數(shù)據(jù)源
  • 就是一個(gè)普通的文件

(1)把其他文件避矢,轉(zhuǎn)換成Parquet文件
(2)支持Schema的合并

4.json文件

5.JDBC
(1)使用JDBC操作關(guān)系型數(shù)據(jù)庫(kù)悼瘾,加載到Spark中進(jìn)行分析和處理。
(2)方式一:
(3)方式二:

6.使用hive
(1)spark SQL 完全兼容hive
(2)需要進(jìn)行配置

  • 拷貝一下文件到spark/conf目錄下:
  • Hive 配置文件: hive-site.xml
  • Hadoop 配置文件:core-site.xml hdfs-site.xml

(3)配置好后审胸,重啟spark

(4)啟動(dòng)Hadoop 與 hive

三.在IDE中開發(fā)Spark SQL

四.性能優(yōu)化

1.用內(nèi)存中緩存表的數(shù)據(jù)
直接讀取內(nèi)存的值亥宿,來(lái)提高性能
2.了解性能優(yōu)化的相關(guān)參數(shù):參考講義

Spark Streaming

一.常用的實(shí)時(shí)計(jì)算引擎(流式計(jì)算)

1.Apache Storm:真正的流式計(jì)算

2.Spark Streaming :嚴(yán)格上來(lái)說(shuō),不是真正的流式計(jì)算(實(shí)時(shí)計(jì)算)
把連續(xù)的流式數(shù)據(jù)砂沛,當(dāng)成不連續(xù)的RDD
本質(zhì):是一個(gè)離散計(jì)算(不連續(xù))
3.Apache Flink:真正的流式計(jì)算烫扼。與Spark Streaming相反。
把離散的數(shù)據(jù)尺上,當(dāng)成流式數(shù)據(jù)來(lái)處理
4.JStorm

二.Spark Streaming基礎(chǔ)

1.什么是 Spark Streaming材蛛。

  • Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
    易于構(gòu)建靈活的、高容錯(cuò)的流式系統(tǒng)怎抛。

2.特點(diǎn):

  • 易用卑吭,已經(jīng)集成到Spark中
  • 容錯(cuò)性:底層RDD,RDD本身具有容錯(cuò)機(jī)制
  • 支持多種語(yǔ)言:Java Scala Python

3.演示官方的Demo
往Spark Streaming中發(fā)送字符串马绝,Spark 接收到以后豆赏,進(jìn)行計(jì)數(shù)
使用消息服務(wù)器 netcat Linux自帶
yum install nc.x86_64
nc -l 1234
注意:總核心數(shù) 大于等于2。一個(gè)核心用于接收數(shù)據(jù),另一個(gè)用于處理數(shù)據(jù)
在netcat中寫入數(shù)據(jù) Spark Streaming可以取到

4.開發(fā)自己的NetWorkWordCount程序掷邦,和Spark Core類似
問(wèn)題:Hello Hello
Hello World
現(xiàn)在現(xiàn)象:(Hello,2)
(Hello , 1) (World , 1)
能不能累加起來(lái)白胀?保存記錄下以前的狀態(tài)?
通過(guò)Spark Streaming提供的算子來(lái)實(shí)現(xiàn)

三.高級(jí)特性:

1.什么是DStream抚岗?離散流

  • 把連續(xù)的數(shù)據(jù)變成不連續(xù)的RDD
  • 因?yàn)镈Stream的特性或杠,導(dǎo)致,Spark Streaming不是真正的流式計(jì)算

2.重點(diǎn)算子講解
(1)updateStateByKey
默認(rèn)情況下宣蔚,Spark Streaming不記錄之前的狀態(tài)向抢,每次發(fā)數(shù)據(jù),都會(huì)從0開始
現(xiàn)在使用本算子胚委,實(shí)現(xiàn)累加操作挟鸠。
(2)transform

3.窗口操作

  • 窗口:對(duì)落在窗口內(nèi)的數(shù)據(jù)進(jìn)行處理,也是一個(gè)DStream亩冬,RDD
  • 舉例:每10秒鐘把過(guò)去30秒的數(shù)據(jù)采集過(guò)來(lái)
  • 注意:先啟動(dòng)nc 再啟動(dòng)程序 local[2]

4.集成Spark SQL : 使用SQL語(yǔ)句來(lái)處理流式數(shù)據(jù)
5.緩存和持久化:和RDD一樣
6.支持檢查點(diǎn):和RDD一樣

四.數(shù)據(jù)源

Spark Streaming是一個(gè)流式計(jì)算引擎艘希,就需要從外部數(shù)據(jù)源來(lái)接收數(shù)據(jù)
1.基本的數(shù)據(jù)源

  • 文件流:監(jiān)控文件系統(tǒng)的變化,如果文件有增加硅急,讀取文件中的內(nèi)容
    希望Spark Streaming監(jiān)控一個(gè)文件夾覆享,如果有變化,則把變化采集過(guò)來(lái)
  • RDD隊(duì)列流:可以從隊(duì)列中獲取數(shù)據(jù)
  • 套接字流:socketTextStream

2.高級(jí)數(shù)據(jù)源
(1)Flume
(2)Spark SQL 對(duì)接flume有多種方式:

  • push方式:flume將數(shù)據(jù)推送給Spark Streaming
  • custom sink 模式:比第一種有更好的健壯性和容錯(cuò)性铜秆。使用這種方式淹真,flume配置一個(gè)sink。
  • 使用官方提供的spark sink組件
    需要把 spark-streaming-flume-sink_2.10-2.1.0.jar 拷貝到flume lib下
    需要把 spark-streaming-flume-sink_2.10-2.1.0.jar 拷貝到IDE的lib下添加到build path中
(3)Kafka
    在講Kafka時(shí)连茧,舉例核蘸。

四.性能優(yōu)化的參數(shù)

(1)性能優(yōu)化:
spark submit的時(shí)候,程序報(bào)OOM錯(cuò)誤
程序跑的很慢
(2)方法:調(diào)整spark參數(shù)
conf.set...

性能調(diào)優(yōu)

一.Spark 性能優(yōu)化概覽:

  • Spark的計(jì)算本質(zhì)是啸驯,分布式計(jì)算客扎。
  • 所以,Spark程序的性能可能因?yàn)榧褐械娜魏我蛩爻霈F(xiàn)瓶頸:CPU罚斗、網(wǎng)絡(luò)帶寬徙鱼、或者內(nèi)存。
  • CPU针姿、網(wǎng)絡(luò)帶寬袱吆,是運(yùn)維來(lái)維護(hù)的。
  • 聚焦點(diǎn):內(nèi)存距淫。
  • 如果內(nèi)存能夠容納下所有的數(shù)據(jù)绞绒,那就不需要調(diào)優(yōu)了。
  • 如果內(nèi)存比較緊張榕暇,不足以放下所有數(shù)據(jù)(10億量級(jí)---500G),需要對(duì)內(nèi)存的使用進(jìn)行性能優(yōu)化蓬衡。
  • 比如:使用某些方法減少內(nèi)存的消耗喻杈。

二.Spark性能優(yōu)化,主要針對(duì)在內(nèi)存的使用調(diào)優(yōu)狰晚。

三.Spark性能優(yōu)化的技術(shù):

1.使用高性能序列化類庫(kù)
2.優(yōu)化數(shù)據(jù)結(jié)構(gòu)
3.對(duì)于多次使用的RDD進(jìn)行持久化筒饰、checkpoint
4.持久化級(jí)別:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化
5.Java虛擬機(jī)垃圾回收調(diào)優(yōu)
6.Shuffle調(diào)優(yōu),1.x版本中壁晒,90%的性能問(wèn)題瓷们,都是由于Shuffle導(dǎo)致的。

四.其他性能優(yōu)化:

1.提高并行度
2.廣播共享數(shù)據(jù)
等等秒咐。换棚。。

五.診斷Spark內(nèi)存使用:首先要看到內(nèi)存使用情況反镇,才能進(jìn)行針對(duì)性的優(yōu)化。

1.內(nèi)存花費(fèi):
(1)每個(gè)Java對(duì)象娘汞,都有一個(gè)對(duì)象頭歹茶,占用16字節(jié),包含一些對(duì)象的元信息你弦,比如指向他的類的指針惊豺。

  • 如果對(duì)象本身很小,比如int禽作,但是他的對(duì)象頭比對(duì)象自己還大尸昧。

(2)Java的String對(duì)象,會(huì)比他內(nèi)存的原始數(shù)據(jù)旷偿,多出40個(gè)字節(jié)烹俗。

  • String內(nèi)部使用的char數(shù)組來(lái)保存內(nèi)部的字符串序列,并且還要保存諸如輸出長(zhǎng)度之類的信息萍程。
  • char使用的是UTF-16編碼幢妄,每個(gè)字符會(huì)占2個(gè)字節(jié)。比如茫负,包含10個(gè)字符的String蕉鸳,2*10+40=60字節(jié)
    (3)Java中的集合類型,比如HashMap和LinkedList忍法,內(nèi)部使用鏈表數(shù)據(jù)結(jié)構(gòu)潮尝。
  • 鏈表中的每個(gè)數(shù)據(jù),使用Entry對(duì)象包裝饿序。
  • Entry對(duì)象勉失,不光有對(duì)象頭,還有指向下一個(gè)Entry的指針嗤堰,占用8字節(jié)戴质。

(4)元素類型為原始數(shù)據(jù)類型(int)度宦,內(nèi)部通常會(huì)使用原始數(shù)據(jù)類型的包裝類型(Integer)來(lái)存儲(chǔ)元素。
2.如何判斷Spark程序消耗內(nèi)存情況告匠?:答案是預(yù)估
(1)設(shè)置RDD的并行度戈抄。

  • 兩種方法創(chuàng)建RDD,parallelize() textFile() 在這兩個(gè)方法中后专,傳入第二個(gè)參數(shù)划鸽,設(shè)置RDD的partition數(shù)量。
  • 在SparkConfig中設(shè)置一個(gè)參數(shù):
  • spark.default.parallelism
  • 可以統(tǒng)一設(shè)置這個(gè)application中所有RDD的partition數(shù)量

(2)將RDD緩存 cache()

(3)觀察日志:driver日志

/usr/local/spark-2.1.0-bin-hadoop2.7/work
            19/04/13 22:01:05 INFO MemoryStore: Block rdd_3_1 stored as values in memory (estimated size 26.0 MB, free 339.9 MB)
            19/04/13 22:01:06 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 26.7 MB, free 313.2 MB)

(4)將這個(gè)內(nèi)存信息相加戚哎,就是RDD內(nèi)存占用量裸诽。

六.使用高性能序列化類庫(kù)

1.數(shù)據(jù)序列化概述

  • 數(shù)據(jù)序列化,就是將對(duì)象或者數(shù)據(jù)結(jié)構(gòu)型凳,轉(zhuǎn)換成特定的格式丈冬,使其可在網(wǎng)絡(luò)中傳輸,或存儲(chǔ)在內(nèi)存或文件中甘畅。

  • 反序列化埂蕊,是相反的操作,將對(duì)象從序列化數(shù)據(jù)中還原出來(lái)疏唾。

  • 序列化后的數(shù)據(jù)格式蓄氧,可以是二進(jìn)制,xml槐脏,Json等任何格式喉童。

  • 對(duì)象、數(shù)據(jù)序列化的重點(diǎn)在于數(shù)據(jù)的交換與傳輸顿天。

  • 在任何分布式系統(tǒng)中堂氯,序列化都是扮演著一個(gè)重要的角色。

  • 如果使用的序列化技術(shù)牌废,操作很慢祖灰,或者序列化后的數(shù)據(jù)量還是很大,會(huì)讓分布式系統(tǒng)應(yīng)用程序性能下降很多畔规。

  • 所以局扶,Spark性能優(yōu)化的第一步,就是進(jìn)行序列化的性能優(yōu)化叁扫。

  • Spark自身默認(rèn)會(huì)在一些地方對(duì)數(shù)據(jù)進(jìn)行序列化三妈,比如Shuffle。另外莫绣,我們使用了外部數(shù)據(jù)(自定義類型)畴蒲,也要讓其課序列化。

  • Spark本身對(duì)序列化的便捷性和性能進(jìn)行了取舍

  • 默認(rèn)情況下:Spark傾向于序列化的便捷性对室,使用了Java自身提供的序列化機(jī)制模燥,很方便使用咖祭。

  • 但是,Java序列化機(jī)制性能不高蔫骂,序列化速度慢么翰,序列化后數(shù)據(jù)較大,比較占用內(nèi)存空間辽旋。

2.kryo

  • Spark支持使用kryo類庫(kù)來(lái)進(jìn)行序列化浩嫌。
  • 速度快,占用空間更小补胚,比Java序列化數(shù)據(jù)占用空間小10倍码耐。
    3.如何使用kryo序列化機(jī)制
    (1)設(shè)置Spark conf
bin/spark-submit will also read configuration options from conf/spark-defaults.conf, 
in which each line consists of a key and a value separated by whitespace. For example:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer   org.apache.spark.serializer.KryoSerializer

(2)使用kryo是,要求需要序列化的類溶其,要提前注冊(cè)骚腥,以獲得高性能

4.kryo類庫(kù)的優(yōu)化
(1)優(yōu)化緩存大小

  • 如果注冊(cè)的自定義類型,本身特別大(100個(gè)字段)瓶逃,會(huì)導(dǎo)致要序列化的對(duì)象太大桦沉。此時(shí)需要對(duì)kyro本身進(jìn)行優(yōu)化。因?yàn)閗ryo內(nèi)部的緩存金闽,可能不能存放這么大的class對(duì)象。
spark.kryoserializer.buffer.max  //設(shè)置這個(gè)參數(shù)剿骨,將其調(diào)大代芜。

(2)預(yù)先注冊(cè)自定義類型

  • 雖然不注冊(cè)自定義類型,kryo也可以正常工作浓利,但會(huì)保存一份他的全限定類名挤庇,耗費(fèi)內(nèi)存。
  • 推薦預(yù)先注冊(cè)要序列化的自定義類型贷掖。

七.優(yōu)化數(shù)據(jù)結(jié)構(gòu)

1.概述

  • 要減少內(nèi)存的消耗嫡秕,除了使用高效的序列化類庫(kù)外,還要優(yōu)化數(shù)據(jù)結(jié)構(gòu)苹威。
  • 避免Java語(yǔ)法特性中所導(dǎo)致的額外內(nèi)存開銷昆咽。
  • 核心:優(yōu)化算子函數(shù)內(nèi)部使用到的局部數(shù)據(jù)或算子函數(shù)外部的數(shù)據(jù)。
  • 目的:減少對(duì)內(nèi)存的消耗和占用牙甫。

2.如何做掷酗?
(1)優(yōu)先使用數(shù)組以及字符串,而不是集合類窟哺。即:優(yōu)先使用Array泻轰,而不是ArrayList、LinkedList且轨、HashMap

  • 使用int[] 會(huì)比List<Integer> 節(jié)省內(nèi)存
    (2)將對(duì)象轉(zhuǎn)換成字符串浮声。
  • 企業(yè)中虚婿,將HashMap咧党、List這種數(shù)據(jù)震叮,統(tǒng)一用String拼接成特殊格式的字符串
Map<Integer,Person> persons = new HashMap<Integer,Person>()
可以優(yōu)化為:
"id:name,address"
String persons = "1:Andy,Beijing|2:Tom,Tianjin...."

(3)避免使用多層嵌套對(duì)象結(jié)構(gòu)
(4)對(duì)于能夠避免的場(chǎng)景,盡量使用int代替String

  • 雖然String比List效率高饮六,但int類型占用更少內(nèi)存,比如:數(shù)據(jù)庫(kù)主鍵羡洁,id玷过,推薦使用自增的id,而不是uuid

八.rdd.cache checkpoint

九.持久化級(jí)別:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化

十.Java虛擬機(jī)的調(diào)優(yōu)

1.概述

  • 如果在持久化RDD的時(shí)候筑煮,持久化了大量的數(shù)據(jù)辛蚊,那么Java虛擬機(jī)的垃圾回收就可能成為一個(gè)瓶頸
  • Java虛擬機(jī)會(huì)定期進(jìn)行垃圾回收,此時(shí)會(huì)追蹤所有Java對(duì)象真仲,并且在垃圾回收時(shí)袋马,找到那些已經(jīng)不再使用的對(duì)象。
  • 清理舊對(duì)象秸应,給新對(duì)象騰出空間虑凛。
  • 垃圾回收的性能開銷,是與內(nèi)存中的對(duì)象數(shù)量成正比软啼。
  • 在做Java虛擬機(jī)調(diào)優(yōu)之前桑谍,必須先做好上面的調(diào)優(yōu)工作,這樣才有意義祸挪。
  • 必須注意順序

2.Spark GC原理
見圖片

3.監(jiān)測(cè)垃圾回收

  • 我們可以進(jìn)行監(jiān)測(cè)锣披,比如多久進(jìn)行一次垃圾回收以及耗費(fèi)的時(shí)間等等。
spark-submit腳本中贿条,添加一個(gè)配置
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimesStamps"

注意:這個(gè)是輸出到worker日志中雹仿,而不是driver日志。
/usr/local/spark-2.1.0-bin-hadoop2.7/logs  worker日志
/usr/local/spark-2.1.0-bin-hadoop2.7/work  driver日志

4.優(yōu)化Executor內(nèi)存比例
(1)目的:減少GC次數(shù)整以。

  • 對(duì)于GC調(diào)優(yōu)來(lái)說(shuō)胧辽,最重要的就是調(diào)節(jié),RDD的緩存占用的內(nèi)存空間 與 算子執(zhí)行時(shí)創(chuàng)建對(duì)象所占用的內(nèi)存空間 的比例
  • 對(duì)于默認(rèn)情況公黑,Spark使用每個(gè)Executor 60% 的內(nèi)存空間來(lái)緩存RDD邑商,在task運(yùn)行期間所創(chuàng)建的對(duì)象,只有40%內(nèi)存空間來(lái)存放凡蚜。
//使用:
conf.set("spark.storage.memoryFraction",0.5)

5.Java GC 調(diào)優(yōu) (-)

十一.shuffle原理

1.優(yōu)化前
圖片
2.優(yōu)化后
圖片

十二.其他調(diào)優(yōu)

1.提高并行度
2.廣播共享數(shù)據(jù)

Spark Mllib:MLlib 是 Spark 可以擴(kuò)展的機(jī)器學(xué)習(xí)庫(kù)奠骄。

一.MLlib概述

MLlib 是 Spark 可以擴(kuò)展的機(jī)器學(xué)習(xí)庫(kù)。
Spark在機(jī)器學(xué)習(xí)方面具有得天獨(dú)厚的有事番刊,有以下幾個(gè)原因:
1.機(jī)器學(xué)習(xí)算法一般都有多個(gè)步驟迭代計(jì)算含鳞,需要在多次迭代后,獲得足夠小的誤差或者收斂才會(huì)停止芹务。

    double wucha = 1.0
    while(wucha>=0.00001){
        建模  wucha -= 某個(gè)值
    }
    
    模型計(jì)算完畢
    
    當(dāng)?shù)褂肏adoop的MapReduce計(jì)算框架時(shí)蝉绷,每次都要讀寫硬盤以及任務(wù)啟動(dòng)工作鸭廷,導(dǎo)致很大的IO開銷。
    而Spark基于內(nèi)存的計(jì)算模型天生擅長(zhǎng)迭代計(jì)算熔吗。只有在必要時(shí)辆床,才會(huì)讀寫硬盤。
    所以Spark是機(jī)器學(xué)習(xí)比較理想的平臺(tái)桅狠。

2.通信讼载,Hadoop的MapReduce計(jì)算框架,通過(guò)heartbeat方式來(lái)進(jìn)行通信和傳遞數(shù)據(jù)中跌,執(zhí)行速度慢咨堤。

  • spark 有高效的 Akka 和 Netty 的通信系統(tǒng),通行效率高漩符。
  • SPark MLlib 是Spark 對(duì)常用的機(jī)器學(xué)習(xí)算法的實(shí)現(xiàn)庫(kù)一喘,同時(shí)包括相關(guān)測(cè)試和數(shù)據(jù)生成器。

二.什么是機(jī)器學(xué)習(xí)嗜暴?

1.機(jī)器學(xué)習(xí)的定義凸克。
A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P,
if its performance at tasks in T, as measured by P, improves with experience E。

2.三個(gè)關(guān)鍵詞:算法闷沥、經(jīng)驗(yàn)萎战、模型評(píng)價(jià)
在數(shù)據(jù)的基礎(chǔ)上,通過(guò)算法構(gòu)建出模型舆逃,并進(jìn)行評(píng)價(jià)
如果達(dá)到要求蚂维,則用該模型測(cè)試其他數(shù)據(jù)
如果不達(dá)到要求,要調(diào)整算法來(lái)重新建立模型颖侄,再次進(jìn)行評(píng)估
循環(huán)往復(fù),知道獲得滿意的經(jīng)驗(yàn)

3.應(yīng)用:金融反欺詐享郊、語(yǔ)音識(shí)別览祖、自然語(yǔ)言處理、翻譯炊琉、模式識(shí)別展蒂、智能控制等等

4.基于大數(shù)據(jù)的機(jī)器學(xué)習(xí)
(1)傳統(tǒng)的機(jī)器學(xué)習(xí)算法,由于技術(shù)和單機(jī)存儲(chǔ)的現(xiàn)值苔咪,只能在少量數(shù)據(jù)上使用锰悼。即,依賴于數(shù)據(jù)抽樣团赏。
(2)傳統(tǒng)的機(jī)器學(xué)習(xí)存在的問(wèn)題:很難做好隨機(jī)箕般,導(dǎo)致學(xué)習(xí)的模型不準(zhǔn)確。
(3)在大數(shù)據(jù)上進(jìn)行機(jī)器學(xué)習(xí)舔清,直接處理全量數(shù)據(jù)并進(jìn)行大量迭代計(jì)算丝里。
(4)Spark本身計(jì)算優(yōu)勢(shì)曲初,適合機(jī)器學(xué)習(xí)。
(5)另外 spark-shell pyspark 都可以提供及時(shí)查詢工具

5.MLlib

  • MLlib是Spark機(jī)器學(xué)習(xí)庫(kù)杯聚,簡(jiǎn)化機(jī)器學(xué)習(xí)的工程實(shí)踐工作臼婆,方便擴(kuò)展到更大規(guī)模。

  • 集成了通用的學(xué)習(xí)算法:分類幌绍、回歸颁褂、聚類、協(xié)同過(guò)濾傀广、降維等等

  • 另外颁独,MLlib本身在Spark中,數(shù)據(jù)清洗主儡、SQL奖唯、建模放在一起。

三糜值、線性回歸

四丰捷、余弦相似性
https://blog.csdn.net/u012160689/article/details/15341303

Spark Graphx

一.Spark Graphx 是什么?

1.是Spark 的一個(gè)模塊寂汇,主要用于進(jìn)行以圖為核心的計(jì)算病往,還有分布式圖計(jì)算
2.Graphx 底層基于RDD計(jì)算,和RDD共用一種存儲(chǔ)形態(tài)骄瓣。在展示形態(tài)上停巷,可以用數(shù)據(jù)集來(lái)表示,也可以用圖來(lái)表示榕栏。

二.Spark GraphX 有哪些抽象畔勤?

1.頂點(diǎn)
RDD[(VertexId,VD)]表示
VertexId 代表了頂點(diǎn)的ID,是Long類型
VD 是頂點(diǎn)的屬性扒磁,可以是任何類型
2.邊
RDD[Edge[ED]]表示
Edge表示一個(gè)邊
包含一個(gè)ED類型參數(shù)來(lái)設(shè)定屬性
另外庆揪,邊還包含了源頂點(diǎn)ID和目標(biāo)頂點(diǎn)ID

3.三元組
三元組結(jié)構(gòu)用RDD[EdgeTriplet[VD,ED]]表示
三元組包含一個(gè)邊、邊的屬性妨托、源頂點(diǎn)ID缸榛、源頂點(diǎn)屬性、目標(biāo)頂點(diǎn)ID兰伤、目標(biāo)頂點(diǎn)屬性内颗。

4.圖
Graph表示,通過(guò)頂點(diǎn)和邊來(lái)構(gòu)建敦腔。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末均澳,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌负懦,老刑警劉巖筒捺,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異纸厉,居然都是意外死亡系吭,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門颗品,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)肯尺,“玉大人,你說(shuō)我怎么就攤上這事躯枢≡蛞鳎” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵锄蹂,是天一觀的道長(zhǎng)氓仲。 經(jīng)常有香客問(wèn)我,道長(zhǎng)得糜,這世上最難降的妖魔是什么敬扛? 我笑而不...
    開封第一講書人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮朝抖,結(jié)果婚禮上啥箭,老公的妹妹穿的比我還像新娘。我一直安慰自己治宣,他們只是感情好急侥,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著侮邀,像睡著了一般坏怪。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上绊茧,一...
    開封第一講書人閱讀 51,692評(píng)論 1 305
  • 那天铝宵,我揣著相機(jī)與錄音,去河邊找鬼按傅。 笑死捉超,一個(gè)胖子當(dāng)著我的面吹牛胧卤,可吹牛的內(nèi)容都是我干的唯绍。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼枝誊,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼况芒!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤绝骚,失蹤者是張志新(化名)和其女友劉穎耐版,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體压汪,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡粪牲,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了止剖。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片腺阳。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖穿香,靈堂內(nèi)的尸體忽然破棺而出亭引,到底是詐尸還是另有隱情,我是刑警寧澤皮获,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布焙蚓,位于F島的核電站,受9級(jí)特大地震影響洒宝,放射性物質(zhì)發(fā)生泄漏购公。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一待德、第九天 我趴在偏房一處隱蔽的房頂上張望君丁。 院中可真熱鬧,春花似錦将宪、人聲如沸绘闷。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)印蔗。三九已至,卻和暖如春丑勤,著一層夾襖步出監(jiān)牢的瞬間华嘹,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工法竞, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留耙厚,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓岔霸,卻偏偏與公主長(zhǎng)得像薛躬,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子呆细,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容