Spark
Spark 背景
什么是 Spark
官網(wǎng):http://spark.apache.org
Spark是一種快速己沛、通用于宙、可擴(kuò)展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學(xué)伯克利分校AMPLab,2010年開(kāi)源,2013年6月成為Apache孵化項(xiàng)目魄咕,2014年2月成為Apache頂級(jí)項(xiàng)目。目前蚌父,Spark生態(tài)系統(tǒng)已經(jīng)發(fā)展成為一個(gè)包含多個(gè)子項(xiàng)目的集合哮兰,其中包含SparkSQL、Spark Streaming苟弛、GraphX喝滞、MLlib等子項(xiàng)目,Spark是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架膏秫。Spark基于內(nèi)存計(jì)算右遭,提高了在大數(shù)據(jù)環(huán)境下數(shù)據(jù)處理的實(shí)時(shí)性,同時(shí)保證了高容錯(cuò)性和高可伸縮性缤削,允許用戶(hù)將Spark部署在大量廉價(jià)硬件之上窘哈,形成集群。
Spark與Hadoop
Spark是一個(gè)計(jì)算框架,而Hadoop中包含計(jì)算框架MapReduce和分布式文件系統(tǒng)HDFS,Hadoop更廣泛地說(shuō)還包括在其生態(tài)系統(tǒng)上的其他系統(tǒng).
為什么使用Spark?
Hadoop的MapReduce計(jì)算模型存在問(wèn)題:
Hadoop的MapReduce的核心是Shuffle(洗牌).在整個(gè)Shuffle的過(guò)程中,至少產(chǎn)生6次I/O流.基于MapReduce計(jì)算引擎通常會(huì)將結(jié)果輸出到次盤(pán)上,進(jìn)行存儲(chǔ)和容錯(cuò).另外,當(dāng)一些查詢(xún)(如:hive)翻譯到MapReduce任務(wù)是,往往會(huì)產(chǎn)生多個(gè)Stage,而這些Stage有依賴(lài)底層文件系統(tǒng)來(lái)存儲(chǔ)每一個(gè)Stage的輸出結(jié)果,而I/O的效率往往較低,從而影響MapReduce的運(yùn)行速度.
Spark的特點(diǎn)
快
與Hadoop的MapReduce相比亭敢,Spark基于內(nèi)存的運(yùn)算要快100倍以上宵距,基于硬盤(pán)的運(yùn)算也要快10倍以上。Spark實(shí)現(xiàn)了高效的DAG執(zhí)行引擎吨拗,可以通過(guò)基于內(nèi)存來(lái)高效處理數(shù)據(jù)流满哪。
易用
Spark支持Java、Python和Scala的API劝篷,還支持超過(guò)80種高級(jí)算法哨鸭,使用戶(hù)可以快速構(gòu)建不同的應(yīng)用。而且Spark支持交互式的Python和Scala的shell娇妓,可以非常方便地在這些shell中使用Spark集群來(lái)驗(yàn)證解決問(wèn)題的方法像鸡。
通用
Spark提供了統(tǒng)一的解決方案。Spark可以用于批處理、交互式查詢(xún)(Spark SQL)只估、實(shí)時(shí)流處理(Spark Streaming)志群、機(jī)器學(xué)習(xí)(Spark MLlib)和圖計(jì)算(GraphX)。這些不同類(lèi)型的處理都可以在同一個(gè)應(yīng)用中無(wú)縫使用蛔钙。Spark統(tǒng)一的解決方案非常具有吸引力锌云,畢竟任何公司都想用統(tǒng)一的平臺(tái)去處理遇到的問(wèn)題,減少開(kāi)發(fā)和維護(hù)的人力成本和部署平臺(tái)的物力成本吁脱。
兼容性
Spark 可以非常方便地與其他的開(kāi)源產(chǎn)品進(jìn)行融合桑涎。比如,Spark 可以使用Hadoop 的 YARN 和 Apache Mesos 作為它的資源管理和調(diào)度器.并且可以處理所有 Hadoop 支持的數(shù)據(jù)兼贡,包括 HDFS攻冷、HBase 和 Cassandra 等。這對(duì)于已經(jīng)部署Hadoop 集群的用戶(hù)特別重要遍希,因?yàn)椴恍枰鋈魏螖?shù)據(jù)遷移就可以使用 Spark 的強(qiáng)大處理能力等曼。Spark 也可以不依賴(lài)于第三方的資源管理和調(diào)度器,它實(shí)現(xiàn)了Standalone 作為其內(nèi)置的資源管理和調(diào)度框架凿蒜,這樣進(jìn)一步降低了 Spark 的使用門(mén)檻涉兽,使得所有人都可以非常容易地部署和使用 Spark。此外篙程,Spark 還提供了在EC2 上部Standalone 的 Spark 集群的工具。
Spark的生態(tài)系統(tǒng)
- Spark Core:
實(shí)現(xiàn)了 Spark 的基本功能别厘,包含任務(wù)調(diào)度虱饿、內(nèi)存管理、錯(cuò)誤恢復(fù)触趴、與存儲(chǔ)系統(tǒng) 交互等模塊氮发。Spark Core 中還包含了對(duì)彈性分布式數(shù)據(jù)集(resilient distributed dataset,簡(jiǎn)稱(chēng)RDD)的 API 定義冗懦。
- Spark Streaming:
Spark Streaming基于微批量方式的計(jì)算和處理,可以用于處理實(shí)時(shí)的流數(shù)據(jù).它使用DStream,簡(jiǎn)單來(lái)說(shuō)是一個(gè)彈性分布式數(shù)據(jù)集(RDD)系列,處理實(shí)時(shí)數(shù)據(jù).數(shù)據(jù)可以從Kafka,Flume,Kinesis或TCP套接字等眾多來(lái)源獲取,并且可以使用由高級(jí)函數(shù)(如 map爽冕,reduce,join 和 window)開(kāi)發(fā)的復(fù)雜算法進(jìn)行流數(shù)據(jù)處理披蕉。最后颈畸,處理后的數(shù)據(jù)可以被推送到文件系統(tǒng),數(shù)據(jù)庫(kù)和實(shí)時(shí)儀表板没讲。
- Spark SQL
SPark SQL可以通過(guò)JDBC API將Spark數(shù)據(jù)集暴露出去,而且還可以用傳統(tǒng)的BI和可視化工具在Spark數(shù)據(jù)上執(zhí)行類(lèi)似SQL的查詢(xún),用戶(hù)哈可以用Spark SQL對(duì)不同格式的數(shù)據(jù)(如Json, Parque以及數(shù)據(jù)庫(kù)等)執(zhí)行ETl,將其轉(zhuǎn)化,然后暴露特定的查詢(xún).
- Spark MLlib
MLlib是一個(gè)可擴(kuò)展的Spark機(jī)器學(xué)習(xí)庫(kù)眯娱,由通用的學(xué)習(xí)算法和工具組成,包括二元分類(lèi)爬凑、線(xiàn)性回歸徙缴、聚類(lèi)、協(xié)同過(guò)濾嘁信、梯度下降以及底層優(yōu)化原語(yǔ)于样。
- Spark Graphx:
GraphX是用于圖計(jì)算和并行圖計(jì)算的新的(alpha)Spark API疏叨。通過(guò)引入彈性分布式屬性圖(Resilient Distributed Property Graph),一種頂點(diǎn)和邊都帶有屬性的有向多重圖穿剖,擴(kuò)展了Spark RDD蚤蔓。為了支持圖計(jì)算,GraphX暴露了一個(gè)基礎(chǔ)操作符集合(如subgraph携御,joinVertices和aggregateMessages)和一個(gè)經(jīng)過(guò)優(yōu)化的Pregel API變體昌粤。此外,GraphX還包括一個(gè)持續(xù)增長(zhǎng)的用于簡(jiǎn)化圖分析任務(wù)的圖算法和構(gòu)建器集合啄刹。
- 集群管理器:
Spark 設(shè)計(jì)為可以高效地在一個(gè)計(jì)算節(jié)點(diǎn)到數(shù)千個(gè)計(jì)算節(jié)點(diǎn)之間伸縮計(jì) 算涮坐。為了實(shí)現(xiàn)這樣的要求,同時(shí)獲得最大靈活性誓军,Spark 支持在各種集群管理器(cluster manager)上運(yùn)行袱讹,包括 Hadoop YARN、Apache Mesos昵时,以及 Spark 自帶的一個(gè)簡(jiǎn)易調(diào)度 器捷雕,叫作獨(dú)立調(diào)度器。
Spark得到了眾多大數(shù)據(jù)公司的支持壹甥,這些公司包括Hortonworks救巷、IBM、Intel句柠、Cloudera浦译、MapR、Pivotal溯职、百度精盅、阿里、騰訊谜酒、京東叹俏、攜程、優(yōu)酷土豆僻族。當(dāng)前百度的Spark已應(yīng)用于鳳巢粘驰、大搜索、直達(dá)號(hào)述么、百度大數(shù)據(jù)等業(yè)務(wù)晴氨;阿里利用GraphX構(gòu)建了大規(guī)模的圖計(jì)算和圖挖掘系統(tǒng),實(shí)現(xiàn)了很多生產(chǎn)系統(tǒng)的推薦算法碉输;騰訊Spark集群達(dá)到8000臺(tái)的規(guī)模籽前,是當(dāng)前已知的世界上最大的Spark集群。
Spark 的用戶(hù)和用途
我們大致把Spark的用例分為兩類(lèi):數(shù)據(jù)科學(xué)應(yīng)用和數(shù)據(jù)處理應(yīng)用。也就對(duì)應(yīng)的有兩種人群:數(shù)據(jù)科學(xué)家和工程師枝哄。
數(shù)據(jù)科學(xué)任務(wù)
主要是數(shù)據(jù)分析領(lǐng)域肄梨,數(shù)據(jù)科學(xué)家要負(fù)責(zé)分析數(shù)據(jù)并建模,具備 SQL挠锥、統(tǒng)計(jì)众羡、預(yù)測(cè)建模(機(jī)器學(xué)習(xí))等方面的經(jīng)驗(yàn),以及一定的使用 Python蓖租、 Matlab 或 R 語(yǔ)言進(jìn)行編程的能力粱侣。
數(shù)據(jù)處理應(yīng)用
工程師定義為使用 Spark 開(kāi)發(fā) 生產(chǎn)環(huán)境中的數(shù)據(jù)處理應(yīng)用的軟件開(kāi)發(fā)者,通過(guò)對(duì)接Spark的API實(shí)現(xiàn)對(duì)處理的處理和轉(zhuǎn)換等任務(wù)蓖宦。
Spark架構(gòu)中的基本組件:
Driver:運(yùn)行Application的main() 函數(shù)并創(chuàng)建SparkContext
Worker:從節(jié)點(diǎn),負(fù)責(zé)控制計(jì)算節(jié)點(diǎn),啟動(dòng)Ex而粗投入或Driver
SparkContext: 整個(gè)應(yīng)用的上下文,監(jiān)控應(yīng)用的生命周期
SparkConf:負(fù)責(zé)存儲(chǔ)配置信息齐婴。
Executor: 執(zhí)行器,在worker node上執(zhí)行任務(wù)組件,用于啟動(dòng)線(xiàn)程執(zhí)行任務(wù).每個(gè)Application擁有獨(dú)立的一組Executors
ClusterManager:在standlone模式中即為Master(主節(jié)點(diǎn)),控制整個(gè)集群.監(jiān)控Worker.在Yarn模式中為資源管理器.
RDD:彈性分布式集合,spark的基本計(jì)算單元,一組RDD可形成執(zhí)行的有向無(wú)環(huán)圖RDD Graph
DAG Scheduler: 根據(jù)作業(yè)(Job)構(gòu)建基于Stage的DAG,并交給Stage給TaskScheduler
TaskScheduler:將任務(wù)(Task)分發(fā)給Executor執(zhí)行
SparkEnv:線(xiàn)程級(jí)別的上下文稠茂,存儲(chǔ)運(yùn)行時(shí)的重要組件的引用柠偶。SparkEnv內(nèi)創(chuàng)建并包含如下一些重要組件的引用。
MapOutPutTracker:負(fù)責(zé)Shuffle元信息的存儲(chǔ)睬关。
BroadcastManager:負(fù)責(zé)廣播變量的控制與元信息的存儲(chǔ)诱担。
BlockManager:負(fù)責(zé)存儲(chǔ)管理、創(chuàng)建和查找塊电爹。
MetricsSystem:監(jiān)控運(yùn)行時(shí)性能指標(biāo)信息蔫仙。
Spark的整體流程:client提交應(yīng)用,Master找到一個(gè)Worker啟動(dòng)Driver,Driver向Master或者向資源管理器申請(qǐng)資源,之后將應(yīng)用轉(zhuǎn)化為RDD Graph,再由DAGScheduler將RDD Graph轉(zhuǎn)化為Stage的有向無(wú)環(huán)圖提交給TaskScheduler丐箩,由TaskScheduler提交任務(wù)給Executor執(zhí)行摇邦。在任務(wù)執(zhí)行的過(guò)程中,其他組件協(xié)同工作雏蛮,確保整個(gè)應(yīng)用順利執(zhí)行。
搭建 Spark 集群
Spark的部署模式有Local阱州、Local-Cluster挑秉、Standalone、Yarn苔货、Mesos犀概,我們選擇最具代表性的Standalone集群部署模式。安裝java環(huán)境,Spark自動(dòng)會(huì)把scala SDK打包到Spark中無(wú)需安裝scala環(huán)境
環(huán)境
linux: CentOS-7.5_x64
hadoop: hadoop-3.2.0
spark: spark-2.3.3
zookeeper: zookeeper-3.4.10
機(jī)器規(guī)劃
主機(jī)名 | IP | 安裝軟件 | 運(yùn)行進(jìn)程 |
---|---|---|---|
node-1 | 192.168.91.11 | spark | Master |
node-2 | 192.168.91.12 | spark,zookeeper | Worker,QuorumPeerMain |
node-3 | 192.168.91.13 | spark,zookeeper | Worker,QuorumPeerMain |
node-4 | 192.168.91.14 | spark,zookeeper | Worker,QuorumPeerMain |
配置 Spark 環(huán)境
# 下載對(duì)應(yīng)的Spark安裝包
$ wget http://mirrors.hust.edu.cn/apache/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
# 解壓縮
$ tar -zxvf spark-2.3.3-bin-hadoop2.7.tgz
# 進(jìn)入spark解壓目錄
$ cd $SPARK_HOME
# 修改Spark的環(huán)境配置文件
$ cp conf/spark-env.sh.template spark-env.sh
$ vim conf/spark-env.sh
# 添加如下配置
export JAVA_HOME=/usr/java/jdk1.8.0_191
# 修改slave的配置
$ cp $SPARK_HOME/conf/slaves.template slaves
$ vi slaves
# 在該文件中添加子節(jié)點(diǎn)所在的位置(Worker節(jié)點(diǎn))
node-2
node-3
node-4
# 將配置好的spark 復(fù)制到其他機(jī)器上(node-2,node-3,node-4)
$ scp -r spark-2.3.2-bin-hadoop2.7 root@node-2:/xxx/xxx
# 啟動(dòng)spark集群
$ sbin/start-master.sh
$ sbin/start-slaves.sh
# 也可以是用這個(gè)腳本啟動(dòng)所有機(jī)器
$ sbin/start-all.sh
啟動(dòng)后執(zhí)行jps命令夜惭,主節(jié)點(diǎn)上有Master進(jìn)程姻灶,其他子節(jié)點(diǎn)上有Work進(jìn)行,登錄Spark管理界面查看集群狀態(tài)(主節(jié)點(diǎn)):http://node-1:8080/
Spark 集群 HA
機(jī)器規(guī)劃
主機(jī)名 | IP | 安裝軟件 | 運(yùn)行進(jìn)程 |
---|---|---|---|
node-1 | 192.168.91.11 | spark | Master |
node-2 | 192.168.91.12 | spark,zookeeper | Master,QuorumPeerMain |
node-3 | 192.168.91.13 | spark,zookeeper | Worker,QuorumPeerMain |
node-4 | 192.168.91.14 | spark,zookeeper | Worker,QuorumPeerMain |
1.安裝配置zk集群诈茧,并啟動(dòng)zk集群zookeeper安裝
2.修改spark的配置文件添加如下配置
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node-2:2181,node-3:2181,node-4:2181 -Dspark.deploy.zookeeper.dir=/spark"
3.修改所有節(jié)點(diǎn)的slaves文件改為(node-3,node-4)節(jié)點(diǎn)
4.在node1上執(zhí)行 sbin/start-all.sh产喉,然后在 node-2 上啟動(dòng)第二個(gè) Master(sbin/start-master.sh )
執(zhí)行第一個(gè)spark程序
$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://master-ip:7077 --executor-memory 1G --total-executor-cores 2 $SPARK_HOME/examples/jars/spark-examples_2.11-2.3.3.jar 100
spark Shell
spark-shell是Spark自帶的交互式Shell程序,方便用戶(hù)進(jìn)行交互式編程,用戶(hù)可以在該命令行下用scala編寫(xiě)spark程序曾沈。
$SPARK_HOME/bin/spark-shell --master spark://node-1:7077 --executor-memory 2g --total-executor-cores 2
參數(shù)說(shuō)明:
# 指定Master的地址
--master spark://node-1:7077
# 指定每個(gè)worker可用內(nèi)存為2G
--executor-memory 2g
# 指定整個(gè)集群使用的cup核數(shù)為2個(gè)
--total-executor-cores 2
注意
如果啟動(dòng)spark shell時(shí)沒(méi)有指定master地址这嚣,但是也可以正常啟動(dòng)spark shell和執(zhí)行spark
shell中的程序,其實(shí)是啟動(dòng)了spark的local模式塞俱,該模式僅在本機(jī)啟動(dòng)一個(gè)進(jìn)程姐帚,沒(méi)有與集群建立聯(lián)系。
Spark Shell中已經(jīng)默認(rèn)將SparkContext類(lèi)初始化為對(duì)象sc障涯。用戶(hù)代碼如果需要用到罐旗,則直接應(yīng)用sc即可
spark shell中編寫(xiě)WordCount
在spark shell中用scala語(yǔ)言編寫(xiě)spark程序
# sc是SparkContext對(duì)象,該對(duì)象時(shí)提交spark程序的入口
sc.textFile("file:///root/data/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("file:///root/data/output1")
# 從本地文件系統(tǒng)中讀取數(shù)據(jù)
textFile("file:///root/data/words.txt")
# 讀取每一行數(shù)據(jù)并切分
flatMap(_.split(" "))
# 將數(shù)據(jù)切分映射將單詞和1構(gòu)成元組
map((_,1))
# 按照key進(jìn)行reduce唯蝶,并將value累加
reduceByKey(_+_)
# 將結(jié)果寫(xiě)入到指定位置
saveAsTextFile("file:///root/data/output1")