Spark
一.Spark生態(tài)圈:
(1)Spark Core : RDD(彈性分布式數(shù)據(jù)集)
(2)Spark SQL
(3)Spark Streaming
(4)Spark MLLib:協(xié)同過(guò)濾目胡,ALS毒返,邏輯回歸等等 --> 機(jī)器學(xué)習(xí)
(5)Spark Graphx : 圖計(jì)算
二.什么是Spark
1.Spark是什么:
Spark是一個(gè)針對(duì)大規(guī)模數(shù)據(jù)處理的快速通用引擎。
- Spark是一種快速凉敲、通用、可擴(kuò)展的大數(shù)據(jù)分析引擎寺旺,2009年誕生于加州大學(xué)伯克利分校AMPLab爷抓,2010年開源,2013年6月成為Apache孵化項(xiàng)目迅涮,2014年2月成為Apache頂級(jí)項(xiàng)目废赞。目前,Spark生態(tài)系統(tǒng)已經(jīng)發(fā)展成為一個(gè)包含多個(gè)子項(xiàng)目的集合叮姑,其中包含SparkSQL唉地、Spark Streaming、GraphX传透、MLlib等子項(xiàng)目耘沼,Spark是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架。Spark基于內(nèi)存計(jì)算朱盐,提高了在大數(shù)據(jù)環(huán)境下數(shù)據(jù)處理的實(shí)時(shí)性群嗤,同時(shí)保證了高容錯(cuò)性和高可伸縮性,允許用戶將Spark部署在大量廉價(jià)硬件之上兵琳,形成集群狂秘。Spark得到了眾多大數(shù)據(jù)公司的支持,這些公司包括Hortonworks躯肌、IBM椰棘、Intel券膀、Cloudera、MapR、Pivotal糊啡、百度赞辩、阿里恩袱、騰訊快鱼、京東、攜程曙博、優(yōu)酷土豆拥刻。當(dāng)前百度的Spark已應(yīng)用于鳳巢、大搜索羊瘩、直達(dá)號(hào)泰佳、百度大數(shù)據(jù)等業(yè)務(wù)盼砍;阿里利用GraphX構(gòu)建了大規(guī)模的圖計(jì)算和圖挖掘系統(tǒng),實(shí)現(xiàn)了很多生產(chǎn)系統(tǒng)的推薦算法逝她;騰訊Spark集群達(dá)到8000臺(tái)的規(guī)模浇坐,是當(dāng)前已知的世界上最大的Spark集群。
2.特點(diǎn):
- 快:快100倍(Hadoop 3 之前)
- 易用:支持多種語(yǔ)言開發(fā)
- 通用性:生態(tài)系統(tǒng)全黔宛。
- 易用性:兼容Hadoop
3.最大特點(diǎn):基于內(nèi)存
- Spark是MapReduce的替代方案近刘,而且兼容HDFS、Hive臀晃,可融入Hadoop的生態(tài)系統(tǒng)觉渴,以彌補(bǔ)MapReduce的不足。
三.Spark體系架構(gòu)
1.Spark集群的體系架構(gòu)圖解:
2.Spark的主從結(jié)構(gòu)
四.Spark的安裝部署
1.Spark的安裝部署方式有以下幾種模式:
(1)Standalone: 本機(jī)調(diào)試
(2)YARN
(3)Mesos
(4)Amazon EC2
2.執(zhí)行過(guò)程:
一個(gè)Worker有多個(gè) Executor徽惋。 Executor是任務(wù)的執(zhí)行者案淋,按階段(stage)劃分任務(wù)∠栈妫————> RDD
五.Spark的搭建:
1.準(zhǔn)備工作:
- jdk
- 配置主機(jī)名
- 免密碼登錄
2.偽分布式模式安裝:
(1)下載
(2)上傳到linux
(3)解壓
(4)修改配置文件
- 配置文件:conf/spark-env.sh
cd /opt/module
mv spark-2.1.0-bin-hadoop2.7/ spark/ //重命名spark文件夾
cd /opt/module/spark/conf
mv spark-env.sh.template spark-env.sh //重命名配置文件
vi spark-env.sh
修改內(nèi)容如下:
export JAVA_HOME=/opt/module/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata121 //主節(jié)點(diǎn)的服務(wù)器名
export SPARK_MASTER_PORT=7077 //主節(jié)點(diǎn)端口號(hào)
//下面的可以不寫踢京,默認(rèn)
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
- 配置文件:conf/slave
mv slaves.template slaves
vi slaves
新增內(nèi)容:
bigdata121
(5)啟動(dòng):
cd /opt/module/spark
sbin/start-all.sh
(6)驗(yàn)證:192.168.127.121:8080
3.全分布的安裝部署:
(1)下載
(2)上傳到linux
(3)解壓
(4)修改配置文件
- 配置文件:conf/spark-env.sh
cd /opt/module
mv spark-2.1.0-bin-hadoop2.7/ spark/ //重命名spark文件夾
cd /opt/module/spark/conf
mv spark-env.sh.template spark-env.sh //重命名配置文件
vi spark-env.sh
修改內(nèi)容如下:
export JAVA_HOME=/opt/module/jdk1.8.0_144
export SPARK_MASTER_HOST=bigdata121 //主節(jié)點(diǎn)的服務(wù)器名
export SPARK_MASTER_PORT=7077 //主節(jié)點(diǎn)端口號(hào)
//下面的可以不寫,默認(rèn)
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1024m
- 配置文件:conf/slave
mv slaves.template slaves
vi slaves
新增內(nèi)容:
bigdata122
bigdata123
(5)拷貝到其他兩臺(tái)服務(wù)器
cd /opt/module
src -r spark/ bigdata122:/opt/module
src -r spark/ bigdata123:/opt/module
(6)啟動(dòng)Spark集群:
cd /opt/module/spark
sbin/start-all.sh
六.Spark的HA
1.回顧HA:
(1)HDFS,Yarn,Hbase,Spark:都是主從結(jié)構(gòu)
(2)單點(diǎn)故障
2.基于文件系統(tǒng)的單點(diǎn)恢復(fù)
(1)主要用于開發(fā)或測(cè)試環(huán)境宦棺。當(dāng)spark提供目錄保存spark Application和worker的注冊(cè)信息瓣距,并將他們的恢復(fù)狀態(tài)寫入該目錄中,這時(shí)代咸,一旦Master發(fā)生故障蹈丸,就可以通過(guò)重新啟動(dòng)Master進(jìn)程(sbin/start-master.sh),恢復(fù)已運(yùn)行的spark Application和worker的注冊(cè)信息呐芥。
(2)基于文件系統(tǒng)的單點(diǎn)恢復(fù)逻杖,主要是在spark-en.sh里對(duì)SPARK_DAEMON_JAVA_OPTS設(shè)置
配置參數(shù) | 參考值 |
---|---|
spark.deploy.recoveryMode | 設(shè)置為FILESYSTEM開啟單點(diǎn)恢復(fù)功能,默認(rèn)值:NONE |
spark.deploy.recoveryDirectory | Spark 保存恢復(fù)狀態(tài)的目錄 |
3.基于Zookeeper的Standby Masters
(1)ZooKeeper提供了一個(gè)Leader Election機(jī)制思瘟,利用這個(gè)機(jī)制可以保證雖然集群存在多個(gè)Master弧腥,但是只有一個(gè)是Active的,其他的都是Standby潮太。當(dāng)Active的Master出現(xiàn)故障時(shí),另外的一個(gè)Standby Master會(huì)被選舉出來(lái)虾攻。由于集群的信息铡买,包括Worker, Driver和Application的信息都已經(jīng)持久化到ZooKeeper霎箍,因此在切換的過(guò)程中只會(huì)影響新Job的提交奇钞,對(duì)于正在進(jìn)行的Job沒有任何的影響。加入ZooKeeper的集群整體架構(gòu)如下圖所示漂坏。
配置參數(shù) | 參考值 |
---|---|
spark.deploy.recoveryMode | 設(shè)置為ZOOKEEPER開啟單點(diǎn)恢復(fù)功能景埃,默認(rèn)值:NONE |
spark.deploy.zookeeper.url | ZooKeeper集群的地址 |
spark.deploy.zookeeper.dir | Spark信息在ZK中的保存目錄媒至,默認(rèn):/spark |
(2)修改spark-env.sh參考:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181
-Dspark.deploy.zookeeper.dir=/spark"
(3)另外:每個(gè)節(jié)點(diǎn)上,需要將以下兩行注釋掉谷徙。
(4)同步到其他兩臺(tái)服務(wù)器
(5)ZooKeeper中保存的信息
七.執(zhí)行Spark Demo程序
1.使用Spark Shell
(1)spark-shell是Spark自帶的交互式Shell程序拒啰,方便用戶進(jìn)行交互式編程,用戶可以在該命令行下用scala編寫spark程序完慧。相當(dāng)于REPL ,作為一個(gè)獨(dú)立的Application運(yùn)行
(2)兩種模式:
- 本地模式:spark-shell 不接任何參數(shù)谋旦,代表本地模式
- 集群模式:spark-shell后面帶有參數(shù)
(3)啟動(dòng)Spark shell:
spark-shell
參數(shù)說(shuō)明:
--master spark://spark81:7077 //指定Master的地址
--executor-memory 2g //指定每個(gè)worker可用內(nèi)存為2G
--total-executor-cores 2 //指定整個(gè)集群使用的cup核數(shù)為2個(gè)
例如:
spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2
(4)在Spark shell中編寫WordCount程序
sc.textFile("hdfs://192.168.88.111:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")
參數(shù)說(shuō)明:
- sc是SparkContext對(duì)象,該對(duì)象時(shí)提交spark程序的入口
- textFile("hdfs://192.168.88.111:9000/data/data.txt")是hdfs中讀取數(shù)據(jù)
- flatMap(_.split(" "))先map在壓平
- map((_,1))將單詞和1構(gòu)成元組
- reduceByKey(+)按照key進(jìn)行reduce屈尼,并將value累加
- saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")將結(jié)果寫入到hdfs中
(5)wordcount程序册着,處理本地文件,把結(jié)果打印到屏幕上
scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.collect
res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))
(6)wordcount程序脾歧,處理HDFS文件甲捏,結(jié)果保存在hdfs上
sc.textFile("hdfs://node1:8020/tmp_files/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://node1:8020/output/0331/test_WordCount")
(7)單步運(yùn)行wordcount --->RDD
scala> val rdd1 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> 1+1
res2: Int = 2
scala> rdd1.collect
res3: Array[String] = Array(I love Beijing, I love China, Beijing is the capital of China)
scala> val rdd2 = rdd1.flatMap(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:26
scala> rdd2.collect
res4: Array[String] = Array(I, love, Beijing, I, love, China, Beijing, is, the, capital, of, China)
scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:28
scala> rdd3.collect
res5: Array[(String, Int)] = Array((I,1), (love,1), (Beijing,1), (I,1), (love,1), (China,1), (Beijing,1), (is,1), (the,1), (capital,1), (of,1), (China,1))
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:30
scala> rdd4.collect
res6: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))
(8)RDD 彈性分布式數(shù)據(jù)集
(9)Scala復(fù)習(xí):
- flatten:把嵌套的結(jié)果展開
scala>List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
- flatmap:相當(dāng)于一個(gè) map + flatten
scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))
scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)
myList.flatMap(x=>x.map(_*2))
flatmao執(zhí)行過(guò)程:
- 將 List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9) 調(diào)用 map(_*2) 方法。x 代表一個(gè)List
- flatten
2.在IDEA中編寫WordCount程序
(1)需要的jar包:$SPARK_HOME/jars/*.jar
(2)創(chuàng)建Scala Project鞭执,并創(chuàng)建Scala Object司顿、或者Java Class
(3)書寫源代碼,并打成jar包蚕冬,上傳到Linux
Scala版本
(4)運(yùn)行程序:
spark-submit --master spark://spark81:7077
--class mydemo.WordCount jars/wc.jar
hdfs://192.168.88.111:9000/data/data.txt
hdfs://192.168.88.111:9000/output/spark/wc1
Java版本(直接輸出在屏幕)
(4)運(yùn)行程序:
spark-submit --master spark://spark81:7077
--class mydemo.JavaWordCount jars/wc.jar
hdfs://192.168.88.111:9000/data/data.txt
八.Spark運(yùn)行機(jī)制及原理分析
1.WordCount執(zhí)行的流程分析
2.Spark提交任務(wù)的流程析
九.RDD和RDD特性免猾,RDD的算子
1.RDD:彈性分布式數(shù)據(jù)集
(1)什么是RDD?
- RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集囤热,是Spark中最基本的數(shù)據(jù)抽象猎提,它代表一個(gè)不可變、可分區(qū)旁蔼、里面的元素可并行計(jì)算的集合锨苏。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性棺聊。RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯式地將工作集緩存在內(nèi)存中伞租,后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度限佩。
(2)RDD的屬性:
- 一組分片(Partition)葵诈,即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō)祟同,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理作喘,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù)晕城,如果沒有指定泞坦,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目砖顷。
- 一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)贰锁。Spark中RDD的計(jì)算是以分片為單位的赃梧,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合豌熄,不需要保存每次計(jì)算的結(jié)果授嘀。
- RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD房轿,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系粤攒。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù)囱持,而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算夯接。
- 一個(gè)Partitioner,即RDD的分片函數(shù)纷妆。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù)盔几,一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner掩幢。只有對(duì)于于key-value的RDD逊拍,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None际邻。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量芯丧,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。
- 一個(gè)列表世曾,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)缨恒。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置轮听。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念骗露,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置血巍。
2.如何創(chuàng)建RDD
(1)通過(guò)SparkContext.parallelize方法來(lái)創(chuàng)建(通過(guò)sc.parallelize進(jìn)行創(chuàng)建)
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29
scala> rdd1.partitions.length
res35: Int = 3
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
rdd1:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:29
scala> rdd1.partitions.length
res36: Int = 2
(2)通過(guò)外部數(shù)據(jù)源來(lái)創(chuàng)建
sc.textFile()
scala> val rdd2 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd2:org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[35] at textFile at <console>:29
(3)RDD的類型:Transformation和Action
3.RDD的基本原理:
4.Transformation
(1)RDD中的所有轉(zhuǎn)換都是延遲加載的萧锉,也就是說(shuō),它們并不會(huì)直接計(jì)算結(jié)果述寡。相反的柿隙,它們只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集(例如一個(gè)文件)上的轉(zhuǎn)換動(dòng)作。只有當(dāng)發(fā)生一個(gè)要求返回結(jié)果給Driver的動(dòng)作時(shí)鲫凶,這些轉(zhuǎn)換才會(huì)真正運(yùn)行优俘。這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行。
轉(zhuǎn)換 | 含義 |
---|---|
map(func) | 返回一個(gè)新的RDD掀序,該RDD由每一個(gè)輸入元素經(jīng)過(guò)func函數(shù)轉(zhuǎn)換后組成 |
filter(func) | 返回一個(gè)新的RDD,該RDD由經(jīng)過(guò)func函數(shù)計(jì)算后返回值為true的輸入元素組成 |
flatMap(func) | 類似于map惭婿,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列不恭,而不是單一元素) |
mapPartitions(func) | 類似于map叶雹,但獨(dú)立地在RDD的每一個(gè)分片上運(yùn)行,因此在類型為T的RDD上運(yùn)行時(shí)换吧,func的函數(shù)類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 類似于mapPartitions折晦,但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值,因此在類型為T的RDD上運(yùn)行時(shí)沾瓦,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根據(jù)fraction指定的比例對(duì)數(shù)據(jù)進(jìn)行采樣满着,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換,seed用于指定隨機(jī)數(shù)生成器種子 |
union(otherDataset) | 對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD |
intersection(otherDataset) | 對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD |
distinct([numTasks])) | 對(duì)源RDD進(jìn)行去重后返回一個(gè)新的RDD |
groupByKey([numTasks]) | 在一個(gè)(K,V)的RDD上調(diào)用贯莺,返回一個(gè)(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一個(gè)(K,V)的RDD上調(diào)用风喇,返回一個(gè)(K,V)的RDD,使用指定的reduce函數(shù)缕探,將相同key的值聚合到一起魂莫,與groupByKey類似,reduce任務(wù)的個(gè)數(shù)可以通過(guò)第二個(gè)可選的參數(shù)來(lái)設(shè)置 |
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) | |
sortByKey([ascending], [numTasks]) | 在一個(gè)(K,V)的RDD上調(diào)用爹耗,K必須實(shí)現(xiàn)Ordered接口耙考,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 與sortByKey類似,但是更靈活 |
join(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調(diào)用潭兽,返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調(diào)用倦始,返回一個(gè)(K,(Iterable<V>,Iterable<W>))類型的RDD |
cartesian(otherDataset) | 笛卡爾積 |
pipe(command, [envVars]) | |
coalesce(numPartitions) | |
repartition(numPartitions) | |
repartitionAndSortWithinPartitions(partitioner) |
5.Action
動(dòng)作 | 含義 |
---|---|
reduce(func) | 通過(guò)func函數(shù)聚集RDD中的所有元素,這個(gè)功能必須是課交換且可并聯(lián)的 |
collect() | 在驅(qū)動(dòng)程序中山卦,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素 |
count() | 返回RDD的元素個(gè)數(shù) |
first() | 返回RDD的第一個(gè)元素(類似于take(1)) |
take(n) | 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組 |
takeSample(withReplacement,num, [seed]) | 返回一個(gè)數(shù)組鞋邑,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分怒坯,seed用于指定隨機(jī)數(shù)生成器種子 |
takeOrdered(n, [ordering]) | |
saveAsTextFile(path) | 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng)炫狱,對(duì)于每個(gè)元素,Spark將會(huì)調(diào)用toString方法剔猿,將它裝換為文件中的文本 |
saveAsSequenceFile(path) | 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下视译,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。 |
saveAsObjectFile(path) | |
countByKey() | 針對(duì)(K,V)類型的RDD归敬,返回一個(gè)(K,Int)的map酷含,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)。 |
foreach(func) | 在數(shù)據(jù)集的每一個(gè)元素上汪茧,運(yùn)行函數(shù)func進(jìn)行更新椅亚。 |
十.RDD特性
1.RDD的緩存機(jī)制:
(1)作用:提高性能
(2)使用:標(biāo)識(shí)RDD可以被緩存 persist cache
(3)可以緩存的位置:
2.RDD的容錯(cuò)機(jī)制:通過(guò)檢查點(diǎn)來(lái)實(shí)現(xiàn)
(1)
(1)復(fù)習(xí)檢查點(diǎn):HDFS中的檢查點(diǎn):有SecondaryNamenode來(lái)實(shí)現(xiàn)日志的合并。
(2)RDD的檢查點(diǎn):容錯(cuò)
- 概念:血統(tǒng) Lineage
- 理解:表示任務(wù)執(zhí)行的生命周期舱污。
- WordCount textFile ---> redceByKey
- 如果血統(tǒng)越長(zhǎng)呀舔,越容易出錯(cuò)。
- 假如有檢查點(diǎn)扩灯,可以從最近的一個(gè)檢查點(diǎn)開始媚赖,往后面計(jì)算霜瘪。不用重頭計(jì)算。
(3)RDD檢查點(diǎn)的類型:
a.基于本地目錄:需要將Spark shell 或者任務(wù)運(yùn)行在本地模式上(setMaster("local"))
b.HDFS目錄:用于生產(chǎn),集群模式
sc.setCheckPointDir(目錄)
//舉例:設(shè)置檢查點(diǎn)
scala> var rdd1 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24
//設(shè)置檢查點(diǎn)目錄:
scala> sc.setCheckpointDir("hdfs://192.168.109.131:8020/sparkckpt")
//標(biāo)識(shí)rdd1可以執(zhí)行檢查點(diǎn)操作
scala> rdd1.checkpoint
scala> rdd1.count
res2: Long = 923452
3.依賴關(guān)系:寬依賴惧磺,窄依賴
(1)RDD的依賴關(guān)系:
- RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型颖对,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
(2)窄依賴指的是每一個(gè)父RDD的Partition最多被子RDD的一個(gè)Partition使用
總結(jié):窄依賴我們形象的比喻為獨(dú)生子女
(3)寬依賴指的是多個(gè)子RDD的Partition會(huì)依賴同一個(gè)父RDD的Partition
總結(jié):寬依賴我們形象的比喻為超生
4.Spark任務(wù)中的Stage
- DAG(Directed Acyclic Graph)叫做有向無(wú)環(huán)圖磨隘,原始的RDD通過(guò)一系列的轉(zhuǎn)換就就形成了DAG缤底,根據(jù)RDD之間的依賴關(guān)系的不同將DAG劃分成不同的Stage,對(duì)于窄依賴番捂,partition的轉(zhuǎn)換處理在Stage中完成計(jì)算个唧。對(duì)于寬依賴,由于有Shuffle的存在白嘁,只能在parent RDD處理完成后坑鱼,才能開始接下來(lái)的計(jì)算,因此寬依賴是劃分Stage的依據(jù)絮缅。
十一.RDD的高級(jí)算子
1.mapPartitionsWithIndex:對(duì)RDD中的每個(gè)分區(qū)(帶有下標(biāo))進(jìn)行操作鲁沥,下標(biāo)用index表示
通過(guò)這個(gè)算子,我們可以獲取分區(qū)號(hào)耕魄。
def mapPartitionsWithIndex[U](
f: (Int, Iterator[T]) ? Iterator[U],
preservesPartitioning: Boolean = false)(
implicit arg0: ClassTag[U]): RDD[U]
//參數(shù):f是個(gè)函數(shù)參數(shù) f 中第一個(gè)參數(shù)是Int画恰,代表分區(qū)號(hào),第二個(gè)Iterator[T]代表分區(qū)中的元素
例如:
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> def fun1(index:Int, iter:Iterator[Int]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }
fun1: (index: Int, iter: Iterator[Int])Iterator[String]
scala> rdd1.mapPartitions
mapPartitions mapPartitionsWithIndex
scala> rdd1.mapPartitionsWithIndex(fun1).collect
res3: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ], [partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ], [partId : 2 , value = 6 ], [partId : 2 , value = 7 ], [partId : 2 , value = 8 ])
2.aggregate:聚合操作吸奴。類似于分組允扇。
(1)先對(duì)局部進(jìn)行聚合操作,再對(duì)全局進(jìn)行聚合操作则奥。
//調(diào)用聚合操作
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> rdd2.mapPartitionsWithIndex(fun1).collect
res4: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ], [partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])
scala> import scala.math._
import scala.math._
scala> rdd2.aggregate(0)(max(_,_),_+_)
res6: Int = 7
說(shuō)明:
(2)對(duì)字符串操作
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27
scala> rdd2.aggregate("")(_+_,_+_)
res11: String = abcdef
scala> rdd2.aggregate("*")(_+_,_+_)
res12: String = **def*abc
(3)復(fù)雜的例子:
a.
scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27
scala> def fun1(index:Int, iter:Iterator[String]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }
scala> rdd3.mapPartitionsWithIndex(fun1).collect
res17: Array[String] = Array(
[partId : 0 , value = 12 ], [partId : 0 , value = 23 ], [partId : 1 , value = 345 ], [partId : 1 , value = 4567 ])
scala> rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y)=>x+y)
res13: String = 42
執(zhí)行過(guò)程:
第一個(gè)分區(qū):
(a)第一次比較: "" "12" 長(zhǎng)度最大值 2 2-->"2"
(b)第二次比較: “2” “23” 長(zhǎng)度最大值 2 2-->"2"第二個(gè)分區(qū):
(a)第一次比較: "" "345" 長(zhǎng)度最大值 3 3-->"3"
(b)第二次比較: “3” “4567” 長(zhǎng)度最大值 4 4-->"4"
b.
rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res18: String = 11
執(zhí)行過(guò)程:
第一個(gè)分區(qū):
第一次比較: "" "12" 長(zhǎng)度最小值 0 0-->"0"
第二次比較: “0” “23” 長(zhǎng)度最小值 1 1-->"1"第二個(gè)分區(qū):
第一次比較: "" "345" 長(zhǎng)度最小值 0 0-->"0"
第二次比較: “0” “4567” 長(zhǎng)度最小值 1 1-->"1"
c.aggregateByKey:類似于aggregate考润,區(qū)別:操作的是 key value 的數(shù)據(jù)類型。
3.其他高級(jí)算子:
十二.編程案例
1.分析日志
(1)需求:找到訪問(wèn)量最高的兩個(gè)網(wǎng)頁(yè)
- 第一步:對(duì)網(wǎng)頁(yè)的訪問(wèn)量求和
- 第二步:排序读处,降序
(2)創(chuàng)建自定義分區(qū)
(3)使用JDBCRDD 操作數(shù)據(jù)庫(kù)
(4)操作數(shù)據(jù)庫(kù):把結(jié)果存放到數(shù)據(jù)庫(kù)中
Spark SQL
Spark sql基礎(chǔ)
1.什么是Spark SQL
- Spark SQL 是spark的一個(gè)模塊糊治。來(lái)處理 結(jié)構(gòu)化 的數(shù)據(jù),不能處理非結(jié)構(gòu)化的數(shù)據(jù)
2.特點(diǎn):
(1)容易集成: - 不需要單獨(dú)安裝罚舱。
(2)統(tǒng)一的數(shù)據(jù)訪問(wèn)方式
- 結(jié)構(gòu)化數(shù)據(jù)的類型:JDBC JSon Hive parquer文件 都可以作為Spark SQL 的數(shù)據(jù)源
- 對(duì)接多種數(shù)據(jù)源井辜,且使用方式類似
(3)完全兼容hive - 把Hive中的數(shù)據(jù),讀取到Spark SQL中運(yùn)行管闷。
(4) 支持標(biāo)準(zhǔn)的數(shù)據(jù)連接
- JDBC
3.為什么學(xué)習(xí)Spark SQL
- 執(zhí)行效率比Hive高
- hive 2.x 執(zhí)行引擎可以使用 Spark
4.核心概念:表(DataFrame DataSet) - mysql中的表:表結(jié)構(gòu)粥脚、數(shù)據(jù)
- DataFrame:Schema、RDD(數(shù)據(jù))
- DataSet 在spark1.6以后包个,對(duì)DataFrame做了一個(gè)封裝刷允。
5.創(chuàng)建DataFrame
測(cè)試數(shù)據(jù):?jiǎn)T工表、部門表
(1)第一種方式:使用case class
a.定義Schema
b.讀取文件
c.把每行數(shù)據(jù),映射到Emp上
d.生成DataFrame
(2)第二種方式 使用Spark Session
(3)直接讀取一個(gè)帶格式的文件树灶。
6.操作DataFrame
(1)DSL語(yǔ)句
(2)SQL語(yǔ)句
注意:不能直接執(zhí)行SQL搀菩,需要生成一個(gè)視圖,再執(zhí)行sql破托。
(3)多表查詢
7.操作DataSet
(1)跟DataFrame類似,是一套新的接口歧蒋。高級(jí)的Dataframe
(2)創(chuàng)建DataSet
- 使用序列來(lái)創(chuàng)建DataSet土砂。
- 使用JSON數(shù)據(jù)來(lái)創(chuàng)建DataSet
- 使用其他數(shù)據(jù)
(3)DataSet案例
(4)多表查詢
- 創(chuàng)建部門表
- 創(chuàng)建員工表
- 執(zhí)行多表查詢:等值連接
- 多表連接后再篩選
7.Spark SQL中的視圖
(1)視圖是一個(gè)虛表,不存儲(chǔ)數(shù)據(jù)谜洽。
(2)兩種類型:
- 普通視圖(本地視圖):只在當(dāng)前Session中有效萝映。createOrReplaceTempView createTempView
- 全局視圖: createGlobalTempView:在不同的Session中都有用 把全局視圖創(chuàng)建在命名空間中:global_temp中。類似于一個(gè)庫(kù)阐虚。
二.使用數(shù)據(jù)源
1.在Spark SQL中序臂,可以使用各種各樣的數(shù)據(jù)源來(lái)操作。 結(jié)構(gòu)化
2.使用load函數(shù)实束、save函數(shù)
- load函數(shù)是加載數(shù)據(jù)奥秆,save是存儲(chǔ)數(shù)據(jù)
注意:使用load 或 save時(shí),默認(rèn)是Parquet文件咸灿。列式存儲(chǔ)文件构订。
3.Parquet文件:列式存儲(chǔ)文件,是Spark SQL 默認(rèn)的數(shù)據(jù)源 - 就是一個(gè)普通的文件
(1)把其他文件避矢,轉(zhuǎn)換成Parquet文件
(2)支持Schema的合并
4.json文件
5.JDBC
(1)使用JDBC操作關(guān)系型數(shù)據(jù)庫(kù)悼瘾,加載到Spark中進(jìn)行分析和處理。
(2)方式一:
(3)方式二:
6.使用hive
(1)spark SQL 完全兼容hive
(2)需要進(jìn)行配置
- 拷貝一下文件到spark/conf目錄下:
- Hive 配置文件: hive-site.xml
- Hadoop 配置文件:core-site.xml hdfs-site.xml
(3)配置好后审胸,重啟spark
(4)啟動(dòng)Hadoop 與 hive
三.在IDE中開發(fā)Spark SQL
四.性能優(yōu)化
1.用內(nèi)存中緩存表的數(shù)據(jù)
直接讀取內(nèi)存的值亥宿,來(lái)提高性能
2.了解性能優(yōu)化的相關(guān)參數(shù):參考講義
Spark Streaming
一.常用的實(shí)時(shí)計(jì)算引擎(流式計(jì)算)
1.Apache Storm:真正的流式計(jì)算
2.Spark Streaming :嚴(yán)格上來(lái)說(shuō),不是真正的流式計(jì)算(實(shí)時(shí)計(jì)算)
把連續(xù)的流式數(shù)據(jù)砂沛,當(dāng)成不連續(xù)的RDD
本質(zhì):是一個(gè)離散計(jì)算(不連續(xù))
3.Apache Flink:真正的流式計(jì)算烫扼。與Spark Streaming相反。
把離散的數(shù)據(jù)尺上,當(dāng)成流式數(shù)據(jù)來(lái)處理
4.JStorm
二.Spark Streaming基礎(chǔ)
1.什么是 Spark Streaming材蛛。
- Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
易于構(gòu)建靈活的、高容錯(cuò)的流式系統(tǒng)怎抛。
2.特點(diǎn):
- 易用卑吭,已經(jīng)集成到Spark中
- 容錯(cuò)性:底層RDD,RDD本身具有容錯(cuò)機(jī)制
- 支持多種語(yǔ)言:Java Scala Python
3.演示官方的Demo
往Spark Streaming中發(fā)送字符串马绝,Spark 接收到以后豆赏,進(jìn)行計(jì)數(shù)
使用消息服務(wù)器 netcat Linux自帶
yum install nc.x86_64
nc -l 1234
注意:總核心數(shù) 大于等于2。一個(gè)核心用于接收數(shù)據(jù),另一個(gè)用于處理數(shù)據(jù)
在netcat中寫入數(shù)據(jù) Spark Streaming可以取到
4.開發(fā)自己的NetWorkWordCount程序掷邦,和Spark Core類似
問(wèn)題:Hello Hello
Hello World
現(xiàn)在現(xiàn)象:(Hello,2)
(Hello , 1) (World , 1)
能不能累加起來(lái)白胀?保存記錄下以前的狀態(tài)?
通過(guò)Spark Streaming提供的算子來(lái)實(shí)現(xiàn)
三.高級(jí)特性:
1.什么是DStream抚岗?離散流
- 把連續(xù)的數(shù)據(jù)變成不連續(xù)的RDD
- 因?yàn)镈Stream的特性或杠,導(dǎo)致,Spark Streaming不是真正的流式計(jì)算
2.重點(diǎn)算子講解
(1)updateStateByKey
默認(rèn)情況下宣蔚,Spark Streaming不記錄之前的狀態(tài)向抢,每次發(fā)數(shù)據(jù),都會(huì)從0開始
現(xiàn)在使用本算子胚委,實(shí)現(xiàn)累加操作挟鸠。
(2)transform
3.窗口操作
- 窗口:對(duì)落在窗口內(nèi)的數(shù)據(jù)進(jìn)行處理,也是一個(gè)DStream亩冬,RDD
- 舉例:每10秒鐘把過(guò)去30秒的數(shù)據(jù)采集過(guò)來(lái)
- 注意:先啟動(dòng)nc 再啟動(dòng)程序 local[2]
4.集成Spark SQL : 使用SQL語(yǔ)句來(lái)處理流式數(shù)據(jù)
5.緩存和持久化:和RDD一樣
6.支持檢查點(diǎn):和RDD一樣
四.數(shù)據(jù)源
Spark Streaming是一個(gè)流式計(jì)算引擎艘希,就需要從外部數(shù)據(jù)源來(lái)接收數(shù)據(jù)
1.基本的數(shù)據(jù)源
- 文件流:監(jiān)控文件系統(tǒng)的變化,如果文件有增加硅急,讀取文件中的內(nèi)容
希望Spark Streaming監(jiān)控一個(gè)文件夾覆享,如果有變化,則把變化采集過(guò)來(lái) - RDD隊(duì)列流:可以從隊(duì)列中獲取數(shù)據(jù)
- 套接字流:socketTextStream
2.高級(jí)數(shù)據(jù)源
(1)Flume
(2)Spark SQL 對(duì)接flume有多種方式:
- push方式:flume將數(shù)據(jù)推送給Spark Streaming
- custom sink 模式:比第一種有更好的健壯性和容錯(cuò)性铜秆。使用這種方式淹真,flume配置一個(gè)sink。
- 使用官方提供的spark sink組件
需要把 spark-streaming-flume-sink_2.10-2.1.0.jar 拷貝到flume lib下
需要把 spark-streaming-flume-sink_2.10-2.1.0.jar 拷貝到IDE的lib下添加到build path中
(3)Kafka
在講Kafka時(shí)连茧,舉例核蘸。
四.性能優(yōu)化的參數(shù)
(1)性能優(yōu)化:
spark submit的時(shí)候,程序報(bào)OOM錯(cuò)誤
程序跑的很慢
(2)方法:調(diào)整spark參數(shù)
conf.set...
性能調(diào)優(yōu)
一.Spark 性能優(yōu)化概覽:
- Spark的計(jì)算本質(zhì)是啸驯,分布式計(jì)算客扎。
- 所以,Spark程序的性能可能因?yàn)榧褐械娜魏我蛩爻霈F(xiàn)瓶頸:CPU罚斗、網(wǎng)絡(luò)帶寬徙鱼、或者內(nèi)存。
- CPU针姿、網(wǎng)絡(luò)帶寬袱吆,是運(yùn)維來(lái)維護(hù)的。
- 聚焦點(diǎn):內(nèi)存距淫。
- 如果內(nèi)存能夠容納下所有的數(shù)據(jù)绞绒,那就不需要調(diào)優(yōu)了。
- 如果內(nèi)存比較緊張榕暇,不足以放下所有數(shù)據(jù)(10億量級(jí)---500G),需要對(duì)內(nèi)存的使用進(jìn)行性能優(yōu)化蓬衡。
- 比如:使用某些方法減少內(nèi)存的消耗喻杈。
二.Spark性能優(yōu)化,主要針對(duì)在內(nèi)存的使用調(diào)優(yōu)狰晚。
三.Spark性能優(yōu)化的技術(shù):
1.使用高性能序列化類庫(kù)
2.優(yōu)化數(shù)據(jù)結(jié)構(gòu)
3.對(duì)于多次使用的RDD進(jìn)行持久化筒饰、checkpoint
4.持久化級(jí)別:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化
5.Java虛擬機(jī)垃圾回收調(diào)優(yōu)
6.Shuffle調(diào)優(yōu),1.x版本中壁晒,90%的性能問(wèn)題瓷们,都是由于Shuffle導(dǎo)致的。
四.其他性能優(yōu)化:
1.提高并行度
2.廣播共享數(shù)據(jù)
等等秒咐。换棚。。
五.診斷Spark內(nèi)存使用:首先要看到內(nèi)存使用情況反镇,才能進(jìn)行針對(duì)性的優(yōu)化。
1.內(nèi)存花費(fèi):
(1)每個(gè)Java對(duì)象娘汞,都有一個(gè)對(duì)象頭歹茶,占用16字節(jié),包含一些對(duì)象的元信息你弦,比如指向他的類的指針惊豺。
- 如果對(duì)象本身很小,比如int禽作,但是他的對(duì)象頭比對(duì)象自己還大尸昧。
(2)Java的String對(duì)象,會(huì)比他內(nèi)存的原始數(shù)據(jù)旷偿,多出40個(gè)字節(jié)烹俗。
- String內(nèi)部使用的char數(shù)組來(lái)保存內(nèi)部的字符串序列,并且還要保存諸如輸出長(zhǎng)度之類的信息萍程。
- char使用的是UTF-16編碼幢妄,每個(gè)字符會(huì)占2個(gè)字節(jié)。比如茫负,包含10個(gè)字符的String蕉鸳,2*10+40=60字節(jié)
(3)Java中的集合類型,比如HashMap和LinkedList忍法,內(nèi)部使用鏈表數(shù)據(jù)結(jié)構(gòu)潮尝。 - 鏈表中的每個(gè)數(shù)據(jù),使用Entry對(duì)象包裝饿序。
- Entry對(duì)象勉失,不光有對(duì)象頭,還有指向下一個(gè)Entry的指針嗤堰,占用8字節(jié)戴质。
(4)元素類型為原始數(shù)據(jù)類型(int)度宦,內(nèi)部通常會(huì)使用原始數(shù)據(jù)類型的包裝類型(Integer)來(lái)存儲(chǔ)元素。
2.如何判斷Spark程序消耗內(nèi)存情況告匠?:答案是預(yù)估
(1)設(shè)置RDD的并行度戈抄。
- 兩種方法創(chuàng)建RDD,parallelize() textFile() 在這兩個(gè)方法中后专,傳入第二個(gè)參數(shù)划鸽,設(shè)置RDD的partition數(shù)量。
- 在SparkConfig中設(shè)置一個(gè)參數(shù):
- spark.default.parallelism
- 可以統(tǒng)一設(shè)置這個(gè)application中所有RDD的partition數(shù)量
(2)將RDD緩存 cache()
(3)觀察日志:driver日志
/usr/local/spark-2.1.0-bin-hadoop2.7/work
19/04/13 22:01:05 INFO MemoryStore: Block rdd_3_1 stored as values in memory (estimated size 26.0 MB, free 339.9 MB)
19/04/13 22:01:06 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 26.7 MB, free 313.2 MB)
(4)將這個(gè)內(nèi)存信息相加戚哎,就是RDD內(nèi)存占用量裸诽。
六.使用高性能序列化類庫(kù)
1.數(shù)據(jù)序列化概述
數(shù)據(jù)序列化,就是將對(duì)象或者數(shù)據(jù)結(jié)構(gòu)型凳,轉(zhuǎn)換成特定的格式丈冬,使其可在網(wǎng)絡(luò)中傳輸,或存儲(chǔ)在內(nèi)存或文件中甘畅。
反序列化埂蕊,是相反的操作,將對(duì)象從序列化數(shù)據(jù)中還原出來(lái)疏唾。
序列化后的數(shù)據(jù)格式蓄氧,可以是二進(jìn)制,xml槐脏,Json等任何格式喉童。
對(duì)象、數(shù)據(jù)序列化的重點(diǎn)在于數(shù)據(jù)的交換與傳輸顿天。
在任何分布式系統(tǒng)中堂氯,序列化都是扮演著一個(gè)重要的角色。
如果使用的序列化技術(shù)牌废,操作很慢祖灰,或者序列化后的數(shù)據(jù)量還是很大,會(huì)讓分布式系統(tǒng)應(yīng)用程序性能下降很多畔规。
所以局扶,Spark性能優(yōu)化的第一步,就是進(jìn)行序列化的性能優(yōu)化叁扫。
Spark自身默認(rèn)會(huì)在一些地方對(duì)數(shù)據(jù)進(jìn)行序列化三妈,比如Shuffle。另外莫绣,我們使用了外部數(shù)據(jù)(自定義類型)畴蒲,也要讓其課序列化。
Spark本身對(duì)序列化的便捷性和性能進(jìn)行了取舍
默認(rèn)情況下:Spark傾向于序列化的便捷性对室,使用了Java自身提供的序列化機(jī)制模燥,很方便使用咖祭。
但是,Java序列化機(jī)制性能不高蔫骂,序列化速度慢么翰,序列化后數(shù)據(jù)較大,比較占用內(nèi)存空間辽旋。
2.kryo
- Spark支持使用kryo類庫(kù)來(lái)進(jìn)行序列化浩嫌。
- 速度快,占用空間更小补胚,比Java序列化數(shù)據(jù)占用空間小10倍码耐。
3.如何使用kryo序列化機(jī)制
(1)設(shè)置Spark conf
bin/spark-submit will also read configuration options from conf/spark-defaults.conf,
in which each line consists of a key and a value separated by whitespace. For example:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
(2)使用kryo是,要求需要序列化的類溶其,要提前注冊(cè)骚腥,以獲得高性能
4.kryo類庫(kù)的優(yōu)化
(1)優(yōu)化緩存大小
- 如果注冊(cè)的自定義類型,本身特別大(100個(gè)字段)瓶逃,會(huì)導(dǎo)致要序列化的對(duì)象太大桦沉。此時(shí)需要對(duì)kyro本身進(jìn)行優(yōu)化。因?yàn)閗ryo內(nèi)部的緩存金闽,可能不能存放這么大的class對(duì)象。
spark.kryoserializer.buffer.max //設(shè)置這個(gè)參數(shù)剿骨,將其調(diào)大代芜。
(2)預(yù)先注冊(cè)自定義類型
- 雖然不注冊(cè)自定義類型,kryo也可以正常工作浓利,但會(huì)保存一份他的全限定類名挤庇,耗費(fèi)內(nèi)存。
- 推薦預(yù)先注冊(cè)要序列化的自定義類型贷掖。
七.優(yōu)化數(shù)據(jù)結(jié)構(gòu)
1.概述
- 要減少內(nèi)存的消耗嫡秕,除了使用高效的序列化類庫(kù)外,還要優(yōu)化數(shù)據(jù)結(jié)構(gòu)苹威。
- 避免Java語(yǔ)法特性中所導(dǎo)致的額外內(nèi)存開銷昆咽。
- 核心:優(yōu)化算子函數(shù)內(nèi)部使用到的局部數(shù)據(jù)或算子函數(shù)外部的數(shù)據(jù)。
- 目的:減少對(duì)內(nèi)存的消耗和占用牙甫。
2.如何做掷酗?
(1)優(yōu)先使用數(shù)組以及字符串,而不是集合類窟哺。即:優(yōu)先使用Array泻轰,而不是ArrayList、LinkedList且轨、HashMap
- 使用int[] 會(huì)比List<Integer> 節(jié)省內(nèi)存
(2)將對(duì)象轉(zhuǎn)換成字符串浮声。 - 企業(yè)中虚婿,將HashMap咧党、List這種數(shù)據(jù)震叮,統(tǒng)一用String拼接成特殊格式的字符串
Map<Integer,Person> persons = new HashMap<Integer,Person>()
可以優(yōu)化為:
"id:name,address"
String persons = "1:Andy,Beijing|2:Tom,Tianjin...."
(3)避免使用多層嵌套對(duì)象結(jié)構(gòu)
(4)對(duì)于能夠避免的場(chǎng)景,盡量使用int代替String
- 雖然String比List效率高饮六,但int類型占用更少內(nèi)存,比如:數(shù)據(jù)庫(kù)主鍵羡洁,id玷过,推薦使用自增的id,而不是uuid
八.rdd.cache checkpoint
九.持久化級(jí)別:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化
十.Java虛擬機(jī)的調(diào)優(yōu)
1.概述
- 如果在持久化RDD的時(shí)候筑煮,持久化了大量的數(shù)據(jù)辛蚊,那么Java虛擬機(jī)的垃圾回收就可能成為一個(gè)瓶頸
- Java虛擬機(jī)會(huì)定期進(jìn)行垃圾回收,此時(shí)會(huì)追蹤所有Java對(duì)象真仲,并且在垃圾回收時(shí)袋马,找到那些已經(jīng)不再使用的對(duì)象。
- 清理舊對(duì)象秸应,給新對(duì)象騰出空間虑凛。
- 垃圾回收的性能開銷,是與內(nèi)存中的對(duì)象數(shù)量成正比软啼。
- 在做Java虛擬機(jī)調(diào)優(yōu)之前桑谍,必須先做好上面的調(diào)優(yōu)工作,這樣才有意義祸挪。
- 必須注意順序
2.Spark GC原理
見圖片
3.監(jiān)測(cè)垃圾回收
- 我們可以進(jìn)行監(jiān)測(cè)锣披,比如多久進(jìn)行一次垃圾回收以及耗費(fèi)的時(shí)間等等。
spark-submit腳本中贿条,添加一個(gè)配置
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimesStamps"
注意:這個(gè)是輸出到worker日志中雹仿,而不是driver日志。
/usr/local/spark-2.1.0-bin-hadoop2.7/logs worker日志
/usr/local/spark-2.1.0-bin-hadoop2.7/work driver日志
4.優(yōu)化Executor內(nèi)存比例
(1)目的:減少GC次數(shù)整以。
- 對(duì)于GC調(diào)優(yōu)來(lái)說(shuō)胧辽,最重要的就是調(diào)節(jié),RDD的緩存占用的內(nèi)存空間 與 算子執(zhí)行時(shí)創(chuàng)建對(duì)象所占用的內(nèi)存空間 的比例
- 對(duì)于默認(rèn)情況公黑,Spark使用每個(gè)Executor 60% 的內(nèi)存空間來(lái)緩存RDD邑商,在task運(yùn)行期間所創(chuàng)建的對(duì)象,只有40%內(nèi)存空間來(lái)存放凡蚜。
//使用:
conf.set("spark.storage.memoryFraction",0.5)
5.Java GC 調(diào)優(yōu) (-)
十一.shuffle原理
1.優(yōu)化前
圖片
2.優(yōu)化后
圖片
十二.其他調(diào)優(yōu)
1.提高并行度
2.廣播共享數(shù)據(jù)
Spark Mllib:MLlib 是 Spark 可以擴(kuò)展的機(jī)器學(xué)習(xí)庫(kù)奠骄。
一.MLlib概述
MLlib 是 Spark 可以擴(kuò)展的機(jī)器學(xué)習(xí)庫(kù)。
Spark在機(jī)器學(xué)習(xí)方面具有得天獨(dú)厚的有事番刊,有以下幾個(gè)原因:
1.機(jī)器學(xué)習(xí)算法一般都有多個(gè)步驟迭代計(jì)算含鳞,需要在多次迭代后,獲得足夠小的誤差或者收斂才會(huì)停止芹务。
double wucha = 1.0
while(wucha>=0.00001){
建模 wucha -= 某個(gè)值
}
模型計(jì)算完畢
當(dāng)?shù)褂肏adoop的MapReduce計(jì)算框架時(shí)蝉绷,每次都要讀寫硬盤以及任務(wù)啟動(dòng)工作鸭廷,導(dǎo)致很大的IO開銷。
而Spark基于內(nèi)存的計(jì)算模型天生擅長(zhǎng)迭代計(jì)算熔吗。只有在必要時(shí)辆床,才會(huì)讀寫硬盤。
所以Spark是機(jī)器學(xué)習(xí)比較理想的平臺(tái)桅狠。
2.通信讼载,Hadoop的MapReduce計(jì)算框架,通過(guò)heartbeat方式來(lái)進(jìn)行通信和傳遞數(shù)據(jù)中跌,執(zhí)行速度慢咨堤。
- spark 有高效的 Akka 和 Netty 的通信系統(tǒng),通行效率高漩符。
- SPark MLlib 是Spark 對(duì)常用的機(jī)器學(xué)習(xí)算法的實(shí)現(xiàn)庫(kù)一喘,同時(shí)包括相關(guān)測(cè)試和數(shù)據(jù)生成器。
二.什么是機(jī)器學(xué)習(xí)嗜暴?
1.機(jī)器學(xué)習(xí)的定義凸克。
A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P,
if its performance at tasks in T, as measured by P, improves with experience E。
2.三個(gè)關(guān)鍵詞:算法闷沥、經(jīng)驗(yàn)萎战、模型評(píng)價(jià)
在數(shù)據(jù)的基礎(chǔ)上,通過(guò)算法構(gòu)建出模型舆逃,并進(jìn)行評(píng)價(jià)
如果達(dá)到要求蚂维,則用該模型測(cè)試其他數(shù)據(jù)
如果不達(dá)到要求,要調(diào)整算法來(lái)重新建立模型颖侄,再次進(jìn)行評(píng)估
循環(huán)往復(fù),知道獲得滿意的經(jīng)驗(yàn)
3.應(yīng)用:金融反欺詐享郊、語(yǔ)音識(shí)別览祖、自然語(yǔ)言處理、翻譯炊琉、模式識(shí)別展蒂、智能控制等等
4.基于大數(shù)據(jù)的機(jī)器學(xué)習(xí)
(1)傳統(tǒng)的機(jī)器學(xué)習(xí)算法,由于技術(shù)和單機(jī)存儲(chǔ)的現(xiàn)值苔咪,只能在少量數(shù)據(jù)上使用锰悼。即,依賴于數(shù)據(jù)抽樣团赏。
(2)傳統(tǒng)的機(jī)器學(xué)習(xí)存在的問(wèn)題:很難做好隨機(jī)箕般,導(dǎo)致學(xué)習(xí)的模型不準(zhǔn)確。
(3)在大數(shù)據(jù)上進(jìn)行機(jī)器學(xué)習(xí)舔清,直接處理全量數(shù)據(jù)并進(jìn)行大量迭代計(jì)算丝里。
(4)Spark本身計(jì)算優(yōu)勢(shì)曲初,適合機(jī)器學(xué)習(xí)。
(5)另外 spark-shell pyspark 都可以提供及時(shí)查詢工具
5.MLlib
MLlib是Spark機(jī)器學(xué)習(xí)庫(kù)杯聚,簡(jiǎn)化機(jī)器學(xué)習(xí)的工程實(shí)踐工作臼婆,方便擴(kuò)展到更大規(guī)模。
集成了通用的學(xué)習(xí)算法:分類幌绍、回歸颁褂、聚類、協(xié)同過(guò)濾傀广、降維等等
另外颁独,MLlib本身在Spark中,數(shù)據(jù)清洗主儡、SQL奖唯、建模放在一起。
三糜值、線性回歸
四丰捷、余弦相似性
https://blog.csdn.net/u012160689/article/details/15341303
Spark Graphx
一.Spark Graphx 是什么?
1.是Spark 的一個(gè)模塊寂汇,主要用于進(jìn)行以圖為核心的計(jì)算病往,還有分布式圖計(jì)算
2.Graphx 底層基于RDD計(jì)算,和RDD共用一種存儲(chǔ)形態(tài)骄瓣。在展示形態(tài)上停巷,可以用數(shù)據(jù)集來(lái)表示,也可以用圖來(lái)表示榕栏。
二.Spark GraphX 有哪些抽象畔勤?
1.頂點(diǎn)
RDD[(VertexId,VD)]表示
VertexId 代表了頂點(diǎn)的ID,是Long類型
VD 是頂點(diǎn)的屬性扒磁,可以是任何類型
2.邊
RDD[Edge[ED]]表示
Edge表示一個(gè)邊
包含一個(gè)ED類型參數(shù)來(lái)設(shè)定屬性
另外庆揪,邊還包含了源頂點(diǎn)ID和目標(biāo)頂點(diǎn)ID
3.三元組
三元組結(jié)構(gòu)用RDD[EdgeTriplet[VD,ED]]表示
三元組包含一個(gè)邊、邊的屬性妨托、源頂點(diǎn)ID缸榛、源頂點(diǎn)屬性、目標(biāo)頂點(diǎn)ID兰伤、目標(biāo)頂點(diǎn)屬性内颗。
4.圖
Graph表示,通過(guò)頂點(diǎn)和邊來(lái)構(gòu)建敦腔。