前言
內容:
Spark Core -- 離線處理
Spark SQL -- 離線處理刺桃、交互
Spark Streaming -- 實時
Spark Graphx -- 圖處理
Spark原理
MapReduce粹淋、Spark、Flink(實時)=> 3代計算引擎瑟慈,昨天桃移、今天、未來
MapReduce葛碧、Spark:類MR引擎借杰,底層原理非常相似,數據分區(qū)进泼、MapTask蔗衡、ReduceTask、Shuffle
Spark Core
Spark概述
什么是Spark
Spark是一個快速乳绕、通用的計算引擎绞惦,特點:
-
速度快
與MR相比,Spark基于內存的運算要快100倍洋措,基于硬盤的運算也要快10倍以上济蝉,Spark實現了高效的DAG執(zhí)行引擎,可以通過基于內存來高效處理數據
-
使用簡單
支持多種語言,支持超過80中高級算法王滤,還支持交互式的Python和Scala的shell贺嫂,可以方便的在這些shell中使用Spark集群來驗證解決問題
-
通用
SparK提供了統一的解決方案,Spark可以用于批處理淑仆、交互式查詢(Spark SQL)實時流處理(Spark Streaming)涝婉、機器學習(Spark MLib)和圖計算(Graphx)。
-
兼容好
可以非常方便的與其他開源產品進行融合蔗怠,可以使用YARN墩弯、Mesos作為他的資源管理和調度,可以處理所以Hadoop支持的數據等寞射。Spark也可以不依賴第三方資源管理和調度器渔工,它實現了Standalone作為內置的資源管理和調度框架
Spark與Hadoop
從狹義的角度上看:Hadoop是分布式框架。有存儲桥温、計算引矩、資源調度三個部分組成
Spark是一個分布式計算引擎,由Scala語言編寫的計算框架侵浸,基于內存的快速旺韭、通用、可擴展的大數據分析引擎
從廣義上來看掏觉,Spark是Hadoop生態(tài)不可或缺的一部分
MapReduce的不足:
- 表達能力有限
- 磁盤IO開銷大
- 延遲高
- 任務之間銜接由IO開銷
-
前一個任務未完成区端,后一個任務無法開始,難以勝任復雜的澳腹、多階段的計算任務
Spark在借鑒的MR的優(yōu)點的同時织盼,很好的解決了MR面臨的問題
MR | Spark |
---|---|
數據存儲結構:磁盤HDFS文件系統的Split | 使用內存構建彈性分布式數據集RDD對數據進行運算和cache |
編程范式:Map+Reduce僅提供兩個操作,表達力欠缺 | 提供了豐富的操作酱塔,是數據處理邏輯的代碼非常簡短 |
計算中間結果需要落到磁盤沥邻,IO及序列化、反序列化代價大 | 計算中間結果在內存中羊娃,維護存取速度比磁盤高幾個數量級 |
Task以進程的方式維護唐全,需要數秒時間才能啟動 | Task以線程的方式維護,對于小數據集讀取能夠達到亞秒級的延遲 |
備注:Spark計算模式也屬于MR蕊玷,Spark是對MR框架的優(yōu)化
在實際應用中邮利,大數據應用有以下三種類型
- 批量處理(離線處理):通常時間跨度在數十分鐘到數小時之間
- 交互式處理:通常時間跨度在數十秒到十分鐘之間
- 流處理(實時處理):通常時間跨度在數百毫秒到數秒之間
當同時存在以上三種情況的時候,Hadoop框架需要同時部署多種不同的軟件:MR/Hive或Impala/Storm集畅。這樣做難免會帶來一些問題:
- 不同場景之間輸入輸出數據無法做到無縫共享近弟,通常需要進行數據轉換
- 不同軟件需要不同的開發(fā)和維護團隊,帶來較高的使用成本
- 比較難以對同一個集群中的各個系統進行統一的資源協調和分配
Spark所提供的生態(tài)系統足以解決上述三種場景挺智,即同時支持批處理祷愉、交互式查詢和流數據處理
- Spark的設計遵循“一個軟件棧滿足不同的應用場景”的理念(all in one),逐漸形成了一套完整的生態(tài)系統二鳄。
- 即能夠在提供內存計算框架订讼,也可以支持SQL即席查詢髓窜、實時流式計算欺殿、機器學習和圖計算
- Spark可以部署在資源管理器YARN之上,提供一站式的大數據解決方案脖苏。
Spark為什么比MR快
-
Spark積極使用內存程拭。MR框架中的一個Job只能擁有一個MapTask任務和一個ReduceTask任務棍潘,如果業(yè)務復雜亦歉,一個Job就表達不出來,需要多個Job組合起來水由,然后前一個Job需要寫到HDFS中绷杜,才能交給后面的Job,在MR中圾结,這樣的運算涉及到多次寫入和讀取操作筝野,Spark框架則可以把多個MapTask和ReduceTask組合在一起連續(xù)執(zhí)行歇竟,中間的計算結果不需要落地
復雜的MR任務:mr + mr + mr+mr ....
復雜的Spark任務: mr -> mr -> mr ....
多進程模型(MR)vs多線程模型(Spark)焕议,MR框架中的MapTask和ReduceTask是進程級別的宝磨,都是JVM進程唤锉,每次啟動都需要重新申請資源消耗了不必要的時間,而SparkTask是基于線程模型株憾,通過復用線程池的線程來減少啟動嗤瞎、關閉Task所需要的系統開銷
系統架構
Spark 運行架構包括:
- Cluster Manager:是集群資源的管理者猫胁。Spark支持3種集群模式(Standalone弃秆、Yarn菠赚、Mesos)
- Worker Node:工作節(jié)點衡查,管理本地資源
- Driver Program:運行應用的main()方法并且創(chuàng)建了SparkContext拌牲。由ClusterManager分配資源歌粥,SparkContext發(fā)送Task到Executor上執(zhí)行
-
Executor:在工作節(jié)點上運行失驶,執(zhí)行Driver發(fā)送的Task嬉探,并向Dirver匯報計算結果
Spark集群部署模式
Spark支持3種不是模式:Standalone眷蜓、Yarn胎围、Mesos
Standalone模式
- 獨立模式,自帶完整的服務氏捞,可單獨部署到一個集群中液茎,無需依賴任何其他資源管理系統捆等,從一定程度上說栋烤,該模式是其他兩種的基礎
- Cluster Manager:Master
- Worker Node:Worker
- 僅支持粗粒度的資源分配方式
Spark on Yarn 模式
- Yarn擁有強大的社區(qū)支持明郭,且逐步已經成為大數據集群資源管理系統的標準
- 在國內生產環(huán)境運用最廣泛的部署模式
- Spark on Yarn的支持兩種模式
- Yarn-Cluster:適用于生產環(huán)境
- Yarn-Client:適用于交互薯定、調試话侄,希望立即看到App的輸出
- Cluster Manager:ResourceManager
- Worker Node:NodeManager
- 僅支持粗粒度的資源分配方式
Spark in Mesos模式
- 官方推薦的模式年堆。
- 運行在Mesos比運行在Yarn上更靈活盏浇、更自然
- Cluster Manager:Mesos Master
- Worker Node:Mesos Slave
- 支持粗粒度缠捌、細粒度的資源分配方式
總結
粗粒度模式(Coarse-grained Mode):每個應用程序的運行環(huán)境由一個Driver和多個Executor組成曼月,每個Executor占用若干資源哑芹,內部可運行多個Task聪姿。程序正式運行之前末购,需要將所有的資源申請好盟榴,且運行的工程中一直占有擎场,即使不使用迅办,也是等到任務結束在回收資源
細粒度模式(Fine-grained Mode):鑒于粗粒度模式會造成大量資源浪費站欺,Spark On Mesos還提供了另外一種調度模式:細粒度模式矾策,這種模式類似于現在的云計算蝴韭,核心思想就是按需分配榄鉴。
三種集群部署模式如何選擇:
- 生產環(huán)境選擇Yarn庆尘,國內使用最廣的模式
- Spark的初學者驶忌,Standalone付魔,簡單
- 開發(fā)測試環(huán)境几苍,可選擇Standalone
- 數據量不會太大妻坝、應用不是太復雜,建議可以從Standalone模式開始
- mesos不會設計
相關術語
-
Application
用戶提交的Spark應用程序界酒,由集群中的一個Driver和許多executor組成
-
Application Jar
一個包含Spark應用程序的Jar盾计,Jar不應該包含Spark或Hadoop的jar署辉,這些jar應該在運行中添加
-
Driver Program
運行應用程序的main()哭尝,并創(chuàng)建SparkContext(Spark應用程序)
-
Cluster Manager
管理集群資源的服務材鹦,如Standalone桶唐,Mesos尤泽,Yarn
-
Deploy Mode
區(qū)分Driver進程在何處運行坯约,在Cluster模式下闹丐,在集群內部運行Driver卿拴。在Client模式下堕花,Driver在集群外部運行航徙。
-
Worker node
運行應用程序的工作節(jié)點
-
Executor
運行應用程序Task和保存數據到踏,每個應用程序都有自己的Executors窝稿,并且各個executor相互獨立
-
Task
Executors應用程序的最小運行單元
-
Job
在用戶程序中伴榔,每次調用Action函數都會產生一個新的Job踪少,也就是說援奢,每個Action會生成一個Job
-
Stage
一個Job會被分解成多個Stage集漾,每個Stage是一系列的Task集合
Spark安裝配置
Spark安裝
安裝步驟
1、下載后解壓縮
cd /opt/lagou/software/
tar zxvf spark-2.4.5-bin-without-hadoop-scala-2.12.tgz
mv spark-2.4.5-bin-without-hadoop-scala-2.12/ ../servers/spark- 2.4.5/
2驱显、設置環(huán)境變量秒紧,并使其生效
vi /etc/profile
export SPARK_HOME=/opt/lagou/servers/spark-2.4.5
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
source /etc/profile
3熔恢、修改配置
文件位置:$SPARK_HOME/conf叙淌,賦值需要的文件
cp slaves.template slaves
cp spark-defaults.conf.template spark-defaults.conf
cp spark-env.sh.template spark-env.sh
cp log4j.properties.template log4j.properties
修改slaves
linux121
linux122
linux123
vim spark-defaults.conf
#Spark的master節(jié)點的地址和端口號
spark.master spark://linux121:7077
#是否做日志的聚合
spark.eventLog.enabled true
#聚合日志在HDFS的目錄,如果放在HDFS需要啟動HDFS茂洒,并提前創(chuàng)建好目錄spark-eventlog
spark.eventLog.dir hdfs://linux121:9000/spark-eventlog
# 序列化方式
spark.serializer org.apache.spark.serializer.KryoSerializer
# Driver內存督勺,默認是1G
spark.driver.memory 512m
修改spark-env.sh
export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_231
export HADOOP_HOME=/opt/lagou/servers/hadoop-2.9.2
export HADOOP_CONF_DIR=/opt/lagou/servers/hadoop-2.9.2/etc/hadoop
#這里使用的是 spark-2.4.5-bin-without-hadoop智哀,所以要將 Hadoop 相關 jars 的位置告訴Spark
export SPARK_DIST_CLASSPATH=$(/opt/lagou/servers/hadoop-2.9.2/bin/hadoop classpath)
export SPARK_MASTER_HOST=linux121
export SPARK_MASTER_PORT=7077
4瓷叫、將Spark分發(fā)到其他兩個機器摹菠,并修改環(huán)境變量蔽介,讓環(huán)境變量生效
rsync-script spark-2.4.5
5屉佳、啟動集群
cd $SPARK_HOME/sbin
./start-all.sh
分別在linux121武花、linux122体箕、linux123上執(zhí)行 jps累铅,可以發(fā)現:
linux121:Master娃兽、Worker
linux122:Worker
linux123:Worker
此時 Spark 運行在 Standalone 模式下。
在瀏覽器中輸入:http://linux121:8080/
備注:在$HADOOP_HOME/sbin
及 $SPARK_HOME/sbin
下都有 start-all.sh 和 stop-all.sh 文件
此時直接執(zhí)行start-all或stop-all的時候,回直接先執(zhí)行hadoop的start-all,有沖突
解決方案:
- 刪除一組start-all/stop-all命令勋眯,讓另一個生效
- 對其中一組重命名
- 將其中一個的sbin目錄不放到path中
6客蹋、集群測試
# 計算派:3.14……
run-example SparkPi 10
# 啟動shell腳本
spark-shell
#讀取HDFS 文件嚼酝,并進行單詞統計
val lines = sc.textFile("/azkaban-wc/wc.txt")
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)
Spark不是一定要依賴HDFS的闽巩,只有用到了HDFS涎跨,才需要HDFS
Apache Spark支持多種部署模式。最簡單的就是單機本地模式(Spark所有進程都運 行在一臺機器的JVM中)叔营、偽分布式模式(在一臺機器中模擬集群運行绒尊,相關的進程 在同一臺機器上)婴谱。分布式模式包括:Standalone谭羔、Yarn、Mesos话告。
Apache Spark支持多種部署模式:
- 本地模式超棺。最簡單的運行模式棠绘,Spark所有進程都運行在一臺機器的 JVM 中
- 偽分布式模式氧苍。在一臺機器中模擬集群運行紊撕,相關的進程在同一臺機器上(用的 非常少)
- 分布式模式对扶。包括:Standalone浪南、Yarn络凿、Mesos
- Standalone。使用Spark自帶的資源調度框架
- Yarn怨愤。使用 Yarn 資源調度框架
- Mesos憔四。使用 Mesos 資源調度框架
本地模式
部署在單機了赵,主要用于測試或實驗,最簡單的運行模式络断,所有的進程都運行在一個JVM中貌笨。本地模式用單機的多個線程來模擬Spark分布式計算昌腰,通常用來驗證開發(fā)出來的應用程序邏輯上又餓密友問題遭商,這種模式非常簡單劫流,只需要包Spark安裝包解壓祠汇,修改一些常用配置即可徒扶,不用啟動Master姜骡、Worker守護進程圈澈,也不用啟動HDFS康栈,除非用到递递。
- Spark-shell --master local:在本地啟動一個線程來運行作業(yè)
- spark-shell --master local[N]:啟動N個線程
- spark-shell --master local[*]:使用系統所有的核
- spark-shell --master local[N,M]:N:代表啟動的核數友多,M代表容許作業(yè)失敗的次數
前面幾種模式沒有指定M抖锥,默認是為1
測試:
1、啟動 Spark 本地運行模式,此時JPS后只有一個SparkSubmit進程
spark-shell --master local
2凄杯、 測試
#讀取本地文件菠秒,懶加載,如果文件不存在氯迂,此時不會報錯践叠,等到真正使用的時候才會報錯
val lines = sc.textFile("file:///root/mess.txt")
# 計算行數
lines.count
備注:如果不需要使用HDFS,可以將 spark-defaults.conf相關的日志聚合部分注釋
偽分布式模式
偽分布式模式:在一臺機器中模擬集群運行嚼蚀,相關的進程在同一臺機器上;
備注:不用啟動集群資源管理服務;
- local-cluster[N,cores,memory]
- N模擬集群的 Slave(或worker)節(jié)點個數
- cores模擬集群中各個Slave節(jié)點上的內核數
- memory模擬集群的各個Slave節(jié)點上的內存大小
備注:參數之間沒有空格禁灼,memory不能加單位
1、啟動 Spark 偽分布式模式
spark-shell --master local-cluster[4,2,1024]
2轿曙、使用 jps 檢查弄捕,發(fā)現1個 SparkSubmit 進程和4個 CoarseGrainedExecutorBackend 進程SparkSubmit依然充當全能角色哮独,又是Client進程,又是Driver程序察藐,還有資源管理 的作用皮璧。4個CoarseGrainedExecutorBackend,用來并發(fā)執(zhí)行程序的進程分飞。
3悴务、執(zhí)行簡單的測試程序
park-submit --master local-cluster[4,2,1024] --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.11-2.4.5.jar 10
備注:
- local-cluster[4,2,1024],參數不要給太大譬猫,資源不夠
- 這種模式少用讯檐,有Bug。SPARK-32236
集群模式-Standalone模式
- 分布式部署才能提現分布式計算的價值
- 與單機運行模式不同染服,這里必須先啟動Spark的Master和Worker守護進程别洪,關閉Yarn
- 不用啟動Hadoop服務,除非需要
Standalone配置
- sbin/start-master.sh / sbin/stop-master.sh 啟動關閉master節(jié)點
- sbin/start-slaves.sh / sbin/stop-slave.sh 啟動關閉當前服務器的worker節(jié)點
- sbin/start-slave.sh / sbin/stop-slaves.sh 啟動關閉所有服務器的worker節(jié)點
- sbin/start-all.sh / sbin/stop-all.sh 啟動關閉所有的master和worker節(jié)點
備注:./sbin/start-slave.sh [options];啟動節(jié)點上的worker進程柳刮,調試中較為常用
在 spark-env.sh 中定義:
SPARK_WORKER_CORES:Total number of cores to allow Spark applications to use on the machine (default: all available cores).
SPARK_WORKER_MEMORY:Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m , 2g (default: total memory minus 1 GiB); note that each application's individual memory is configured using its spark.executor.memory property.
可以在spark-eh.sh中修改改參數
export SPARK_WORKER_CORES=10
export SPARK_WORKER_MEMORY=20g
運行模式
最大的區(qū)別:Driver運行在哪里挖垛。
- client模式:(缺省,默認的)秉颗,Driver提交任務給Client痢毒,此時可以在Client模式下,可以看見返回結果蚕甥,適合交互和調試
-
Cluster模式:Driver運行在Spark集群中哪替,看不見程序的返回結果(可以在Driver運行所在的節(jié)點的work目錄下看到),適合生產環(huán)境
測試1(Client模式)
spark-submit --class org.apache.spark.examples.SparkPi \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 5000
再次使用jps檢查集群中的進程:
- Master進程作為cluster manager菇怀,管理集群資源
- Worker管理節(jié)點資源
- SparkSubmit最為Client段凭舶,不再Spark集群中,運行Driver程序爱沟,SparkApplication執(zhí)行完成帅霜,進程終止
- CoarseGrainedExecutorBackend運行在Worker上,用來并發(fā)執(zhí)行應用程序
測試2(Cluster模式)
spark-submit --class org.apache.spark.examples.SparkPi \
--deploy-mode cluster \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 5000
- SparkSubmit進程會在應用程序提交給集群之后就退出
- Master會在集群中選擇一個Worker進程生成一個子進程DriverWrapper來啟動Driver程序
- Worker節(jié)點上會啟動CoarseGrainedExecutorBackend
- DriverWrapper進程會被占用Worker進程的一個core(缺省分配1C1G)
- 應用程序結果钥顽,會在執(zhí)行Driver程序的節(jié)點的stdout輸出义屏,而不是打印在屏幕上
在啟動 DriverWrapper 的節(jié)點上,進入 $SPARK_HOME/work/蜂大,可以看見類似 driver-20200810233021-0000 的目錄闽铐,這個就是 driver 運行時的日志文件,進入 該目錄奶浦,會發(fā)現:
jar 文件兄墅,這就是移動的計算
stderr 運行日志
stdout 輸出結果
History Server
修改spark-defaults.conf
spark.eventLog.compress true
修改 spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=50 -Dspark.history.fs.logDirectory=hdfs://linux121:9000/spark-eventlog"
spark.history.retainedApplications。設置緩存Cache中保存的應用程序歷史記錄的 個數(默認50)澳叉,如果超過這個值隙咸,舊的將被刪除;緩存文件數不表示實際顯示的文件總數沐悦,只是表示不再緩存中的文件可能需要從硬盤中讀取,速度有差別
前提條件:啟動hdfs(日志需要寫入到HDFS)
啟動historyserver五督,使用 jps 檢查藏否,可以看見 HistoryServer 進程。如果看見該進程充包,請檢查對應的日志副签。
$SPARK_HOME/sbin/start-history-server.sh
web端地址:http://hhb:18080/
高可用配置
Spark Standalone集群是 Master-Slaves架構的集群模式,和大部分的Master- Slaves結構集群一樣基矮,存著Master單點故障的問題。如何解決這個問題家浇,Spark提供 了兩種方案:
1本砰、基于ZK的Standby Master咖楣,適用于生產模式料扰。將 Spark 集群連接到 Zookeeper,利用 Zookeeper 提供的選舉和狀態(tài)保存的功能粪般,一個 Master 處于 Active 狀態(tài)匙监,其他 Master 處于Standby狀態(tài);保證在ZK中的元數據主要是集群的信息稼钩,包括:Worker氮块,Driver和Application以及 Executors的信息;如果Active的Master掛掉了,通過選舉產生新的 Active 的 Master蝠引,然后執(zhí)行狀態(tài)恢 復吊洼,整個恢復過程可能需要1~2分鐘;
2款慨、基于文件系統的單點恢復(Single-Node Rcovery with Local File System), 主要用于開發(fā)或者測試環(huán)境慢显。將 Spark Application 和 Worker 的注冊信息保存在文 件中洁段,一旦Master發(fā)生故障,就可以重新啟動Master進程共郭,將系統恢復到之前的狀態(tài)
配置步驟
1祠丝、啟動ZK
2、修改 spark-env.sh 文件除嘹,并分發(fā)到集群中
# 注釋以下兩行!!!
# export SPARK_MASTER_HOST=linux120
# export SPARK_MASTER_PORT=7077
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=linux121,linux122,linux123 -Dspark.deploy.zookeeper.dir=/spark"
備注:
- spark.deploy.recoveryMode:可選值 Zookeeper写半、FileSystem、None
- deploy.zookeeper.url:Zookeeper的URL尉咕,主機名:端口號(缺省2181)
- deploy.zookeeper.dir:保存集群元數據信息的地址叠蝇,在ZooKeeper中保存該信 息
3、啟動Spark集群(Linux120)
$SPARK_HOME/sbin/start-all.sh
瀏覽器輸入:http://hhb:8080/年缎,剛開始 Master 的狀態(tài)是STANDBY悔捶,稍等一會 變?yōu)?RECOVERING,最終是:ALIVE
4单芜、在 linux121上啟動master
$SPARK_HOME/sbin/start-master.sh
進入瀏覽器輸入:http://linux122:8080/蜕该,此時 Master 的狀態(tài)為:STANDBY
5、殺到linux120上 Master 進程洲鸠,再觀察 linux121 上 Master 狀態(tài)堂淡,由 STANDBY => RECOVERING => ALIVE
小結:
- 配置每個worker的core、memory
- 運行模式:cluster坛怪、client淤齐,client為缺省模式,有返回結果袜匿,適合調試;cluster與此相反
- History server
- 高可用(ZK稚疹,Local File居灯,在ZK中記錄集群狀態(tài))
集群模式-Yarn模式
需要啟動的服務:hdfs服務、yarn服務,需要關閉的Standalone對應的服務(即集群中Master内狗、Worker進程)一山不容二虎
在Yarn模式中怪嫌,Spark應用程序有兩種運行模式
yarn-client。Driver程序運行在客戶端柳沙,適用于交互岩灭、調試,希望立即看到app 的輸出
yarn-cluster赂鲤。Driver程序運行在由RM啟動的 AppMaster中噪径,適用于生產環(huán)境
二者的主要區(qū)別:Driver在哪里
1柱恤、關閉 Standalon 模式下對應的服務;開啟 hdfs、yarn找爱、historyserver 服務
2梗顺、修改 yarn-site.xml 配置。在 $HADOOP_HOME/etc/hadoop/yarn-site.xml 中增加车摄,分發(fā)到集群寺谤,重啟 yarn服務
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
備注:
- yarn.nodemanager.pmem-check-enabled。是否啟動一個線程檢查每個任務 正使用的物理內存量吮播,如果任務超出分配值变屁,則直接將其殺掉,默認是true
- yarn.nodemanager.vmem-check-enabled意狠。是否啟動一個線程檢查每個任務正 使用的虛擬內存量粟关,如果任務超出分配值,則直接將其殺掉摄职,默認是true
3誊役、修改配置,分發(fā)到集群
# spark-default.conf(以下是優(yōu)化)
# 與 hadoop historyserver集成
spark.yarn.historyServer.address linux120:18080
# 添加(以下是優(yōu)化)
spark.yarn.jars hdfs:///spark-yarn/jars/*.jar
上傳jar包
# 將 $SPARK_HOME/jars 下的jar包上傳到hdfs
hdfs dfs -mkdir -p /spark-yarn/jars/
cd $SPARK_HOME/jars
hdfs dfs -put * /spark-yarn/jars/
4谷市、測試
# client
spark-submit --master yarn \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 2000
在提取App節(jié)點上可以看見:SparkSubmit蛔垢、CoarseGrainedExecutorBackend
在集群的其他節(jié)點上可以看見:CoarseGrainedExecutorBackend
在提取App節(jié)點上可以看見:程序計算的結果(即可以看見計算返回的結果)
# cluster
spark-submit --master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 2000
在提取App節(jié)點上可以看見:SparkSubmit
在集群的其他節(jié)點上可以見:CoarseGrainedExecutorBackend、ApplicationMaster(Driver運行在此)
在提取App節(jié)點上看不見最終的結果
整合HistoryServer服務
前提:Hadoop的 HDFS迫悠、Yarn鹏漆、HistoryServer 正常;Spark historyserver服務正常;
Hadoop:JobHistoryServer
Spark:HistoryServer
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/opt/lagou/servers/spark-2.4.5/examples/jars/spark-examples_2.12-2.4.5.jar 20
1、修改 spark-defaults.conf创泄,并分發(fā)到集群
spark.yarn.historyServer.address linux121:18080
spark.history.ui.port 18080
2艺玲、重啟/啟動 spark 歷史服務
stop-history-server.sh
start-history-server.sh
3、查看日志
打開日志http://linux123:8088/cluster
備注:
1鞠抑、在課程學習的過程中饭聚,大多數情況使用Standalone模式 或者 local模式
2、建議不使用HA;更多關注的Spark開發(fā)
開發(fā)環(huán)境搭建IDEA
前提:安裝scala插件;能讀寫HDFS文件
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>com.hhb.bigdata</artifactId>
<groupId>com.hhb.bigdata</groupId>
<version>1.0</version>
<relativePath>../bigdata/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark</artifactId>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12.10</scala.version>
<spark.version>2.4.5</spark.version>
<hadoop.version>2.9.2</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.2</version>
</dependency>
<!-- hive 文件-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.7</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!-- 編譯scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- 編譯java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
代碼:
package com.hhb.spark.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @description:
* @author: huanghongbo
* @date: 2020-10-13 22:21
**/
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
// val lines: RDD[String] = sc.textFile("hdfs://linux121:9000/azkaban-wc/wc.txt")
// val lines: RDD[String] = sc.textFile("/wcinput/wc.txt")
val lines: RDD[String] = sc.textFile("data/wc.dat")
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
sc.stop()
}
}
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
</property>
</configuration>