@[TOC]
一逮诲、概述
Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計(jì)的快速通用的計(jì)算引擎。Spark是UC Berkeley AMP lab (加州大學(xué)伯克利分校的AMP實(shí)驗(yàn)室)所開(kāi)源的類Hadoop MapReduce的通用并行框架幽告,Spark梅鹦,擁有Hadoop MapReduce所具有的優(yōu)點(diǎn);但不同于MapReduce的是——Job中間輸出結(jié)果可以保存在內(nèi)存中评腺,從而不再需要讀寫HDFS帘瞭,因此Spark能更好地適用于數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)等需要迭代的MapReduce的算法。官方地址
1)Spark特點(diǎn)
- 高效性:不同于MapReduce將中間計(jì)算結(jié)果放入磁盤中蒿讥,Spark采用內(nèi)存存儲(chǔ)中間計(jì)算結(jié)果,減少了迭代運(yùn)算的磁盤IO,并通過(guò)并行計(jì)算DAG圖的優(yōu)化芋绸,減少了不同任務(wù)之間的依賴媒殉,降低了延遲等待時(shí)間。內(nèi)存計(jì)算下摔敛,Spark 比 MapReduce 快100倍廷蓉。
- 通用性:Spark提供了統(tǒng)一的解決方案。Spark可以用于批處理马昙、交互式查詢(Spark SQL)桃犬、實(shí)時(shí)流處理(Spark Streaming)、機(jī)器學(xué)習(xí)(Spark MLlib)和圖計(jì)算(GraphX)行楞。
- 易用性:不同于MapReduce僅支持Map和Reduce兩種編程算子攒暇,Spark提供了超過(guò)80種不同的Transformation和Action算子,如map,reduce,filter,groupByKey,sortByKey,foreach等子房,并且采用函數(shù)式編程風(fēng)格形用,實(shí)現(xiàn)相同的功能需要的代碼量極大縮小。
- 兼容性:Spark能夠跟很多開(kāi)源工程兼容使用证杭。如Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調(diào)度器田度,并且Spark可以讀取多種數(shù)據(jù)源,如HDFS解愤、HBase镇饺、MySQL等。
- 容錯(cuò)性高:Spark引進(jìn)了彈性分布式數(shù)據(jù)集RDD (Resilient Distributed Dataset) 的抽象送讲,它是分布在一組節(jié)點(diǎn)中的只讀對(duì)象集合奸笤,這些集合是彈性的,如果數(shù)據(jù)集一部分丟失李茫,則可以根據(jù)“血統(tǒng)”(即充許基于數(shù)據(jù)衍生過(guò)程)對(duì)它們進(jìn)行重建揭保。另外在RDD計(jì)算時(shí)可以通過(guò)CheckPoint來(lái)實(shí)現(xiàn)容錯(cuò),而CheckPoint有兩種方式:CheckPoint Data魄宏,和Logging The Updates秸侣,用戶可以控制采用哪種方式來(lái)實(shí)現(xiàn)容錯(cuò)。
- 適用場(chǎng)景廣泛:大數(shù)據(jù)分析統(tǒng)計(jì)宠互,實(shí)時(shí)數(shù)據(jù)處理味榛,圖計(jì)算及機(jī)器學(xué)習(xí)。
2)Spark適用場(chǎng)景
- 復(fù)雜的批量處理(Batch Data Processing)予跌,偏重點(diǎn)在于處理海量數(shù)據(jù)的能力搏色,至于處理速度可忍受,通常的時(shí)間可能是在數(shù)十分鐘到數(shù)小時(shí)券册。
- 基于歷史數(shù)據(jù)的交互式查詢(Interactive Query)频轿,通常的時(shí)間在數(shù)十秒到數(shù)十分鐘之間垂涯。
- 基于實(shí)時(shí)數(shù)據(jù)流的數(shù)據(jù)處理(Streaming Data Processing),通常在數(shù)百毫秒到數(shù)秒之間航邢。
二叼丑、Spark核心組件
- Spark Core:包含Spark的基本功能误墓;尤其是定義RDD的API、操作以及這兩者上的動(dòng)作。其他Spark的庫(kù)都是構(gòu)建在RDD和Spark Core之上的室奏。
- Spark SQL:提供通過(guò)Apache Hive的SQL變體Hive查詢語(yǔ)言(HiveQL)與Spark進(jìn)行交互的API惑朦。每個(gè)數(shù)據(jù)庫(kù)表被當(dāng)做一個(gè)RDD佃却,Spark SQL查詢被轉(zhuǎn)換為Spark操作抖单。Spark提供的sql形式的對(duì)接Hive、JDBC勒极、HBase等各種數(shù)據(jù)渠道的API是掰,用Java開(kāi)發(fā)人員的思想來(lái)講就是面向接口、解耦合河质,ORMapping冀惭、Spring Cloud Stream等都是類似的思想。
- Spark Streaming:基于SparkCore實(shí)現(xiàn)的可擴(kuò)展掀鹅、高吞吐散休、高可靠性的實(shí)時(shí)數(shù)據(jù)流處理。支持從Kafka乐尊、Flume等數(shù)據(jù)源處理后存儲(chǔ)到HDFS戚丸、DataBase、Dashboard中扔嵌。對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行處理和控制限府。Spark Streaming允許程序能夠像普通RDD一樣處理實(shí)時(shí)數(shù)據(jù)。
- MLlib:一個(gè)常用機(jī)器學(xué)習(xí)算法庫(kù)痢缎,算法被實(shí)現(xiàn)為對(duì)RDD的Spark操作胁勺。這個(gè)庫(kù)包含可擴(kuò)展的學(xué)習(xí)算法,比如分類独旷、回歸等需要對(duì)大量數(shù)據(jù)集進(jìn)行迭代的操作署穗。
三、Spark專業(yè)術(shù)語(yǔ)詳解
1)Application:Spark應(yīng)用程序
指的是用戶編寫的Spark應(yīng)用程序嵌洼,包含了Driver功能代碼和分布在集群中多個(gè)節(jié)點(diǎn)上運(yùn)行的Executor代碼案疲。Spark應(yīng)用程序,由一個(gè)或多個(gè)作業(yè)JOB組成麻养,如下圖所示:
2)Driver:驅(qū)動(dòng)程序
Spark中的Driver即運(yùn)行上述Application的Main()函數(shù)并且創(chuàng)建SparkContext褐啡,其中創(chuàng)建SparkContext的目的是為了準(zhǔn)備Spark應(yīng)用程序的運(yùn)行環(huán)境。在Spark中由SparkContext負(fù)責(zé)和ClusterManager通信鳖昌,進(jìn)行資源的申請(qǐng)备畦、任務(wù)的分配和監(jiān)控等低飒;當(dāng)Executor部分運(yùn)行完畢后,Driver負(fù)責(zé)將SparkContext關(guān)閉萍恕。通常SparkContext代表Driver逸嘀,如下圖所示:
3)Cluster Manager:資源管理器
指的是在集群上獲取資源的外部服務(wù)车要,常用的有:Standalone允粤,Spark原生的資源管理器,由Master負(fù)責(zé)資源的分配翼岁;Haddop Yarn类垫,由Yarn中的ResearchManager負(fù)責(zé)資源的分配;Messos琅坡,由Messos中的Messos Master負(fù)責(zé)資源管理悉患。
4)Executor:執(zhí)行器
Application運(yùn)行在Worker節(jié)點(diǎn)上的一個(gè)進(jìn)程,該進(jìn)程負(fù)責(zé)運(yùn)行Task榆俺,并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上售躁,每個(gè)Application都有各自獨(dú)立的一批Executor,如下圖所示:
5)Worker:計(jì)算節(jié)點(diǎn)
集群中任何可以運(yùn)行Application代碼的節(jié)點(diǎn)茴晋,類似于Yarn中的NodeManager節(jié)點(diǎn)陪捷。在Standalone模式中指的就是通過(guò)Slave文件配置的Worker節(jié)點(diǎn),在Spark on Yarn模式中指的就是NodeManager節(jié)點(diǎn)诺擅,在Spark on Messos模式中指的就是Messos Slave節(jié)點(diǎn)市袖,如下圖所示:
6)RDD:彈性分布式數(shù)據(jù)集
Resillient Distributed Dataset,Spark的基本計(jì)算單元烁涌,可以通過(guò)一系列算子進(jìn)行操作(主要有Transformation和Action操作)苍碟,如下圖所示:
7)窄依賴
父RDD每一個(gè)分區(qū)最多被一個(gè)子RDD的分區(qū)所用;表現(xiàn)為一個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū)撮执,或兩個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD 的分區(qū)微峰。如圖所示:
8)寬依賴
父RDD的每個(gè)分區(qū)都可能被多個(gè)子RDD分區(qū)所使用,子RDD分區(qū)通常對(duì)應(yīng)所有的父RDD分區(qū)抒钱。如圖所示:
- 常見(jiàn)的窄依賴有:map蜓肆、filter、union继效、mapPartitions症杏、mapValues、join(父RDD是hash-partitioned :如果JoinAPI之前被調(diào)用的RDD API是寬依賴(存在shuffle), 而且兩個(gè)join的RDD的分區(qū)數(shù)量一致瑞信,join結(jié)果的rdd分區(qū)數(shù)量也一樣厉颤,這個(gè)時(shí)候join api是窄依賴)。
- 常見(jiàn)的寬依賴有g(shù)roupByKey凡简、partitionBy逼友、reduceByKey精肃、join(父RDD不是hash-partitioned :除此之外的,rdd 的join api是寬依賴)帜乞。
9)DAG:有向無(wú)環(huán)圖
Directed Acycle graph司抱,反應(yīng)RDD之間的依賴關(guān)系,如圖所示:
10)DAGScheduler:有向無(wú)環(huán)圖調(diào)度器
基于DAG劃分Stage 并以TaskSet的形勢(shì)提交Stage給TaskScheduler黎烈;負(fù)責(zé)將作業(yè)拆分成不同階段的具有依賴關(guān)系的多批任務(wù)习柠;最重要的任務(wù)之一就是:計(jì)算作業(yè)和任務(wù)的依賴關(guān)系,制定調(diào)度邏輯照棋。在SparkContext初始化的過(guò)程中被實(shí)例化资溃,一個(gè)SparkContext對(duì)應(yīng)創(chuàng)建一個(gè)DAGScheduler。如圖所示:
11)TaskScheduler:任務(wù)調(diào)度器
將Taskset提交給worker(集群)運(yùn)行并回報(bào)結(jié)果烈炭;負(fù)責(zé)每個(gè)具體任務(wù)的實(shí)際物理調(diào)度溶锭。如圖所示:
12)Job:作業(yè)
由一個(gè)或多個(gè)調(diào)度階段所組成的一次計(jì)算作業(yè);包含多個(gè)Task組成的并行計(jì)算符隙,往往由Spark Action催生趴捅,一個(gè)JOB包含多個(gè)RDD及作用于相應(yīng)RDD上的各種Operation。如圖所示:
13)Stage:調(diào)度階段
一個(gè)任務(wù)集對(duì)應(yīng)的調(diào)度階段霹疫;每個(gè)Job會(huì)被拆分很多組Task拱绑,每組任務(wù)被稱為Stage,也可稱TaskSet更米,一個(gè)作業(yè)分為多個(gè)階段欺栗;Stage分成兩種類型ShuffleMapStage、ResultStage征峦。如圖所示:
14)TaskSet:任務(wù)集
由一組關(guān)聯(lián)的迟几,但相互之間沒(méi)有Shuffle依賴關(guān)系的任務(wù)所組成的任務(wù)集。如圖所示:
15)Task:任務(wù)
被送到某個(gè)Executor上的工作任務(wù)栏笆;單個(gè)分區(qū)數(shù)據(jù)集上的最小處理流程單元类腮。如圖所示:
總體如圖所示:
四、Spark運(yùn)行基本流程
計(jì)算流程:
七蛉加,Spark支持的資源管理器
Spark與資源管理器無(wú)關(guān)蚜枢,只要能夠獲取executor進(jìn)程,并能保持相互通信就可以了针饥,Spark支持資源管理器包含: Standalone(Spark)厂抽、On Mesos、On YARN丁眼、Or On K8S筷凤,當(dāng)然還有l(wèi)ocal模式。
模式 | 含義 |
---|---|
local | 在本地運(yùn)行,只有一個(gè)工作進(jìn)程藐守,無(wú)并行計(jì)算能力 |
local[K] | 在本地運(yùn)行挪丢,有 K 個(gè)工作進(jìn)程,通常設(shè)置 K 為機(jī)器的CPU 核心數(shù)量 |
local[*] | 在本地運(yùn)行卢厂,工作進(jìn)程數(shù)量等于機(jī)器的 CPU 核心數(shù)量乾蓬。 |
spark://HOST:PORT | 以 Standalone 模式運(yùn)行,這是 Spark 自身提供的集群運(yùn)行模式慎恒,默認(rèn)端口號(hào): 7077 |
mesos://HOST:PORT | 在 Mesos 集群上運(yùn)行任内,Driver 進(jìn)程和 Worker 進(jìn)程運(yùn)行在 Mesos 集群上,部署模式必須使用固定值:--deploy-mode cluster |
yarn | 在yarn集群上運(yùn)行巧号,依賴于hadoop集群族奢,yarn資源調(diào)度框架,將應(yīng)用提交給yarn丹鸿,在ApplactionMaster(相當(dāng)于Stand alone模式中的Master)中運(yùn)行driver,在集群上調(diào)度資源棚品,開(kāi)啟excutor執(zhí)行任務(wù)靠欢。 |
k8s | 在k8s集群上運(yùn)行 |
七、Spark環(huán)境搭建(Spark on Yarn)
1)下載
Spark下載地址:http://spark.apache.org/downloads.html
這里需要注意版本铜跑,我的hadoop版本是3.3.1门怪,這里spark就下載最新版本的3.2.0,而Spark3.2.0依賴的Scala的2.13锅纺,所以后面用到Scala編程時(shí)注意Scala的版本掷空。
$ cd /opt/bigdata/hadoop/software
# 下載
$ wget https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
# 解壓
$ tar -zxvf spark-3.2.0-bin-hadoop3.2.tgz -C /opt/bigdata/hadoop/server/
2)修改配置文件
# 進(jìn)入spark配置目錄
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf
# copy 一個(gè)模板配置
$ cp spark-env.sh.template spark-env.sh
在spark-env.sh下加入如下配置
# Hadoop 的配置文件目錄
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
# YARN 的配置文件目錄
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
# SPARK 的目錄
export SPARK_HOME=/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
# SPARK 執(zhí)行文件目錄
export PATH=$SPARK_HOME/bin:$PATH
復(fù)制/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 到其它節(jié)點(diǎn)
$ scp -r /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 hadoop-node3:/opt/bigdata/hadoop/server/
3)配置環(huán)境變量
在/etc/profile文件中追加如下內(nèi)容:
export SPARK_HOME=/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
export PATH=$SPARK_HOME/bin:$PATH
source 加載生效
$ source /etc/profile
4)運(yùn)行SparkPi(圓周率) 測(cè)試驗(yàn)證
spark-submit 詳細(xì)參數(shù)說(shuō)明
參數(shù)名 | 參數(shù)說(shuō)明 |
---|---|
--master | master 的地址,提交任務(wù)到哪里執(zhí)行囤锉,例如 spark://host:port, yarn, local |
--deploy-mode | 在本地 (client) 啟動(dòng) driver 或在 cluster 上啟動(dòng)坦弟,默認(rèn)是 client |
--class | 應(yīng)用程序的主類,僅針對(duì) java 或 scala 應(yīng)用 |
--name | 應(yīng)用程序的名稱 |
--jars | 用逗號(hào)分隔的本地 jar 包官地,設(shè)置后酿傍,這些 jar 將包含在 driver 和 executor 的 classpath 下 |
--packages | 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐標(biāo) |
--exclude-packages | 為了避免沖突 而指定不包含的 package |
--repositories | 遠(yuǎn)程 repository |
--conf PROP=VALUE | 指定 spark 配置屬性的值, 例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m" |
--properties-file | 加載的配置文件驱入,默認(rèn)為 conf/spark-defaults.conf |
--driver-memory | Driver內(nèi)存赤炒,默認(rèn) 1G |
--driver-java-options | 傳給 driver 的額外的 Java 選項(xiàng) |
--driver-library-path | 傳給 driver 的額外的庫(kù)路徑 |
--driver-class-path | 傳給 driver 的額外的類路徑 |
--driver-cores | Driver 的核數(shù),默認(rèn)是1亏较。在 yarn 或者 standalone 下使用 |
--executor-memory | 每個(gè) executor 的內(nèi)存莺褒,默認(rèn)是1G |
--total-executor-cores | 所有 executor 總共的核數(shù)。僅僅在 mesos 或者 standalone 下使用 |
--num-executors | 啟動(dòng)的 executor 數(shù)量雪情。默認(rèn)為2遵岩。在 yarn 下使用 |
--executor-core | 每個(gè) executor 的核數(shù)。在yarn或者standalone下使用 |
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--num-executors 3 \
--executor-memory 1G \
--executor-cores 1 \
/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.0.jar 100
如果看到控制臺(tái)出現(xiàn)這個(gè)旺罢,說(shuō)明運(yùn)行成功旷余。
查看yarn任務(wù)
查看任務(wù)日志
【注意】默認(rèn)情況下绢记,Hadoop歷史服務(wù)historyserver是沒(méi)有啟動(dòng)的,我們可以通過(guò)下面的命令來(lái)啟動(dòng)Hadoop歷史服務(wù)器正卧。查看日志依賴于historyserver服務(wù)
#啟動(dòng)JobHistoryServer服務(wù)
$ mapred --daemon start historyserver
#查看進(jìn)程
$ jps
#停止JobHistoryServer服務(wù)
$ mapred --daemon stop historyserver
至此已經(jīng)完成的Spark on Yarn 的環(huán)境搭建蠢熄,并通過(guò)測(cè)試SparkPi的運(yùn)行成功了。