Spark 是專門為大數(shù)據(jù)處理設(shè)計(jì)的通用計(jì)算引擎,是一個(gè)實(shí)現(xiàn)快速通用的集群計(jì)算平臺(tái)。它是由加州大學(xué)伯克利分校 AMP 實(shí)驗(yàn)室開發(fā)的通用內(nèi)存并行計(jì)算框架颁虐,用來(lái)構(gòu)建大型的、低延遲的數(shù)據(jù)分析應(yīng)用程序卧须。它擴(kuò)展了廣泛使用的 MapReduce 計(jì)算模型另绩。高效的支撐更多計(jì)算模式,包括交互式查詢和流處理故慈。Spark 的一個(gè)主要特點(diǎn)是能夠在內(nèi)存中進(jìn)行計(jì)算板熊,即使依賴磁盤進(jìn)行復(fù)雜的運(yùn)算,Spark 依然比 MapReduce 更加高效察绷。(2.3.3)
Spark 的四大特性:speed干签,easy to use,Generality拆撼,Runs Everywhere
Spark 生態(tài)包含了:Spark Core容劳、Spark Streaming、Structured Streaming闸度、Spark SQL竭贩、Graphx 和機(jī)器學(xué)習(xí)相關(guān)的庫(kù)等。
學(xué)習(xí) Spark 我們應(yīng)該掌握:
(1)Spark Core:
Spark的集群搭建和集群架構(gòu)(Spark 集群中的角色)
spark集群的web管理界面: http://master主機(jī)名:8080
spark-shell --master local[2]
Scala:
//1莺禁、構(gòu)建sparkConf對(duì)象 設(shè)置application名稱和master地址
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
//2留量、構(gòu)建sparkContext對(duì)象,該對(duì)象非常重要,它是所有spark程序的執(zhí)行入口
// 它內(nèi)部會(huì)構(gòu)建 DAGScheduler和 TaskScheduler 對(duì)象
val sc = new SparkContext(sparkConf)
//設(shè)置日志輸出級(jí)別
sc.setLogLevel("warn")
sc.textFile("file:///home/hadoop/words.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).collect
spark--->List(1,1,1)
sc.textFile("file:///home/hadoop/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
val rdd1 = sc.parallelize(List(1,1,2,3,3,4,5,6,7))
Java:
//1哟冬、創(chuàng)建SparkConf對(duì)象
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");
//2楼熄、構(gòu)建JavaSparkContext對(duì)象
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
Spark Cluster 和 Client 模式的區(qū)別
yarn-cluster模式下提交任務(wù)示例:
spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
/kkb/install/spark/examples/jars/spark-examples_2.11-2.3.3.jar \
10
如果運(yùn)行出現(xiàn)錯(cuò)誤,可能是虛擬內(nèi)存不足浩峡,可以添加參數(shù)
- vim yarn-site.xml
<!--容器是否會(huì)執(zhí)行物理內(nèi)存限制默認(rèn)為True-->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!--容器是否會(huì)執(zhí)行虛擬內(nèi)存限制 默認(rèn)為True-->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
yarn-client模式下提交任務(wù)示例:
spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
/kkb/install/spark/examples/jars/spark-examples_2.11-2.3.3.jar \
10
最大的區(qū)別就是Driver端的位置不一樣可岂。
yarn-cluster: Driver端運(yùn)行在yarn集群中,與ApplicationMaster進(jìn)程在一起翰灾。
yarn-client: Driver端運(yùn)行在提交任務(wù)的客戶端,與ApplicationMaster進(jìn)程沒(méi)關(guān)系,經(jīng)常用于進(jìn)行測(cè)試
Spark 的彈性分布式數(shù)據(jù)集 RDD(Resilient Distributed Dataset)
1缕粹、分區(qū)的列表
2稚茅、函數(shù)在每個(gè)分區(qū)上計(jì)算
3、一個(gè)RDD會(huì)依賴其他多個(gè)RDD
4平斩、RDD的分區(qū)函數(shù)(hashPartitioner,RangerPartitioner)Option(Some,Node)
5亚享、為了提高效率,存儲(chǔ)每個(gè)partition位置可選绘面,計(jì)算減少數(shù)據(jù)IO
- 掌握 Spark RDD 編程的算子 API(Transformation 和 Action 算子)
1虹蒋、transformation算子
根據(jù)已經(jīng)存在的rdd轉(zhuǎn)換生成一個(gè)新的rdd, 它是延遲加載,它不會(huì)立即執(zhí)行
轉(zhuǎn)換 | 含義 |
---|---|
map(func) | 返回一個(gè)新的RDD飒货,該RDD由每一個(gè)輸入元素經(jīng)過(guò)func函數(shù)轉(zhuǎn)換后組成 |
filter(func) | 返回一個(gè)新的RDD,該RDD由經(jīng)過(guò)func函數(shù)計(jì)算后返回值為true的輸入元素組成 |
flatMap(func) | 類似于map峭竣,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列塘辅,而不是單一元素) |
mapPartitions(func) | 類似于map,但獨(dú)立地在RDD的每一個(gè)分片上運(yùn)行皆撩,因此在類型為T的RDD上運(yùn)行時(shí)扣墩,func的函數(shù)類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 類似于mapPartitions,但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值扛吞,因此在類型為T的RDD上運(yùn)行時(shí)呻惕,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U] |
union(otherDataset) | 對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD |
intersection(otherDataset) | 對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD |
distinct([numTasks])) | 對(duì)源RDD進(jìn)行去重后返回一個(gè)新的RDD |
groupByKey([numTasks]) | 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一個(gè)(K,V)的RDD上調(diào)用滥比,返回一個(gè)(K,V)的RDD亚脆,使用指定的reduce函數(shù),將相同key的值聚合到一起盲泛,與groupByKey類似濒持,reduce任務(wù)的個(gè)數(shù)可以通過(guò)第二個(gè)可選的參數(shù)來(lái)設(shè)置 |
sortByKey([ascending], [numTasks]) | 在一個(gè)(K,V)的RDD上調(diào)用,K必須實(shí)現(xiàn)Ordered接口寺滚,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 與sortByKey類似柑营,但是更靈活 |
join(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的RDD上調(diào)用村视,返回一個(gè)(K,(Iterable<V>,Iterable<W>))類型的RDD |
coalesce(numPartitions) | 減少 RDD 的分區(qū)數(shù)到指定值官套。 |
repartition(numPartitions) | 重新給 RDD 分區(qū) |
repartitionAndSortWithinPartitions(partitioner) | 重新給 RDD 分區(qū),并且每個(gè)分區(qū)內(nèi)以記錄的 key 排序 |
2蚁孔、 action算子
它會(huì)真正觸發(fā)任務(wù)的運(yùn)行, 將rdd的計(jì)算的結(jié)果數(shù)據(jù)返回給Driver端奶赔,或者是保存結(jié)果數(shù)據(jù)到外部存儲(chǔ)介質(zhì)中
動(dòng)作 | 含義 |
---|---|
reduce(func) | reduce將RDD中元素前兩個(gè)傳給輸入函數(shù),產(chǎn)生一個(gè)新的return值勒虾,新產(chǎn)生的return值與RDD中下一個(gè)元素(第三個(gè)元素)組成兩個(gè)元素纺阔,再被傳給輸入函數(shù),直到最后只有一個(gè)值為止修然。 |
collect() | 在驅(qū)動(dòng)程序中笛钝,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素, 比如說(shuō)rdd的數(shù)據(jù)量達(dá)到了10G, rdd.collect這個(gè)操作非常危險(xiǎn)质况,很有可能出現(xiàn)driver端的內(nèi)存不足 |
count() | 返回RDD的元素個(gè)數(shù) |
first() | 返回RDD的第一個(gè)元素(類似于take(1)) |
take(n) | 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組 |
takeOrdered(n, [ordering]) | 返回自然順序或者自定義順序的前 n 個(gè)元素 |
saveAsTextFile(path) | 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對(duì)于每個(gè)元素玻靡,Spark將會(huì)調(diào)用toString方法结榄,將它裝換為文件中的文本 |
saveAsSequenceFile(path) | 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)囤捻。 |
saveAsObjectFile(path) | 將數(shù)據(jù)集的元素臼朗,以 Java 序列化的方式保存到指定的目錄下 |
countByKey() | 針對(duì)(K,V)類型的RDD,返回一個(gè)(K,Int)的map蝎土,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)视哑。 |
foreach(func) | 在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func |
foreachPartition(func) | 在數(shù)據(jù)集的每一個(gè)分區(qū)上誊涯,運(yùn)行函數(shù)func |
spark connection to mysql
personRDD.foreachPartition( iter =>{
//把數(shù)據(jù)插入到mysql表操作
//1挡毅、獲取連接
val connection: Connection = DriverManager.getConnection("jdbc:mysql://node03:3306/spark","root","123456")
//2、定義插入數(shù)據(jù)的sql語(yǔ)句
val sql="insert into person(id,name,age) values(?,?,?)"
//3暴构、獲取PreParedStatement
try {
val ps: PreparedStatement = connection.prepareStatement(sql)
//4跪呈、獲取數(shù)據(jù),給?號(hào) 賦值
iter.foreach(line =>{
ps.setString(1, line._1)
ps.setString(2, line._2)
ps.setInt(3, line._3)
//設(shè)置批量提交
ps.addBatch()
})
//執(zhí)行批量提交
ps.executeBatch()
} catch {
case e:Exception => e.printStackTrace()
} finally {
if(connection !=null){
connection.close()
}
spark connection to Hbase
usersRDD.foreachPartition(iter =>{
//4.1 獲取hbase的數(shù)據(jù)庫(kù)連接
val configuration: Configuration = HBaseConfiguration.create()
//指定zk集群的地址
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181")
val connection: Connection = ConnectionFactory.createConnection(configuration)
//4.2 對(duì)于hbase表進(jìn)行操作這里需要一個(gè)Table對(duì)象
val table: Table = connection.getTable(TableName.valueOf("person"))
//4.3 把數(shù)據(jù)保存在表中
try {
iter.foreach(x => {
val put = new Put(x(0).getBytes)
val puts = new util.ArrayList[Put]()
//構(gòu)建數(shù)據(jù)
val put1: Put = put.addColumn("f1".getBytes, "gender".getBytes, x(1).getBytes)
val put2: Put = put.addColumn("f1".getBytes, "age".getBytes, x(2).getBytes)
val put3: Put = put.addColumn("f2".getBytes, "position".getBytes, x(3).getBytes)
val put4: Put = put.addColumn("f2".getBytes, "code".getBytes, x(4).getBytes)
puts.add(put1)
puts.add(put2)
puts.add(put3)
puts.add(put4)
//提交數(shù)據(jù)
table.put(puts)
})
} catch {
case e:Exception =>e.printStackTrace()
} finally {
if(connection !=null){
connection.close()
}
Spark DAG(有向無(wú)環(huán)圖)
一個(gè)Job會(huì)被拆分為多組Task取逾,每組任務(wù)被稱為一個(gè)stage
stage表示不同的調(diào)度階段耗绿,一個(gè)spark job會(huì)對(duì)應(yīng)產(chǎn)生很多個(gè)stage(ShuffleMapStage,ResultStage)
由于劃分完stage之后,在同一個(gè)stage中只有窄依賴砾隅,沒(méi)有寬依賴误阻,可以實(shí)現(xiàn)流水線計(jì)算划提,
stage中的每一個(gè)分區(qū)對(duì)應(yīng)一個(gè)task旭愧,在同一個(gè)stage中就有很多可以并行運(yùn)行的task。
劃分完stage之后肺缕,每一個(gè)stage中有很多可以并行運(yùn)行的task邑时,后期把每一個(gè)stage中的task封裝在一個(gè)taskSet集合中奴紧,最后把一個(gè)一個(gè)的taskSet集合提交到worker節(jié)點(diǎn)上的executor進(jìn)程中運(yùn)行。
rdd與rdd之間存在依賴關(guān)系晶丘,stage與stage之前也存在依賴關(guān)系黍氮,前面stage中的task先運(yùn)行,運(yùn)行完成了再運(yùn)行后面stage中的task浅浮,也就是說(shuō)后面stage中的task輸入數(shù)據(jù)是前面stage中task的輸出結(jié)果數(shù)據(jù)沫浆。
RDD 的依賴關(guān)系,什么是寬依賴和窄依賴
窄依賴(narrow dependency)和寬依賴(wide dependency)
所有的窄依賴不會(huì)產(chǎn)生shuffle
所有的寬依賴會(huì)產(chǎn)生shuffle
RDD 的血緣機(jī)制
RDD只支持粗粒度轉(zhuǎn)換滚秩,即只記錄單個(gè)塊上執(zhí)行的單個(gè)操作专执。將創(chuàng)建RDD的一系列Lineage(即血統(tǒng))記錄下來(lái),以便恢復(fù)丟失的分區(qū)
RDD的Lineage會(huì)記錄RDD的元數(shù)據(jù)信息和轉(zhuǎn)換行為郁油,lineage保存了RDD的依賴關(guān)系本股,當(dāng)該RDD的部分分區(qū)數(shù)據(jù)丟失時(shí)攀痊,它可以根據(jù)這些信息來(lái)重新運(yùn)算和恢復(fù)丟失的數(shù)據(jù)分區(qū)。
RDD 緩存機(jī)制
RDD通過(guò)persist方法或cache方法可以將前面的計(jì)算結(jié)果緩存, 程序運(yùn)行完成后對(duì)應(yīng)的緩存數(shù)據(jù)就自動(dòng)消失
Spark 的任務(wù)調(diào)度和資源調(diào)度
任務(wù)調(diào)度: (1) Driver端運(yùn)行客戶端的main方法拄显,構(gòu)建SparkContext對(duì)象苟径,在SparkContext對(duì)象內(nèi)部依次構(gòu)建DAGScheduler和TaskScheduler
(2) 按照rdd的一系列操作順序,來(lái)生成DAG有向無(wú)環(huán)圖
(3) DAGScheduler拿到DAG有向無(wú)環(huán)圖之后躬审,按照寬依賴進(jìn)行stage的劃分棘街。每一個(gè)stage內(nèi)部有很多可以并行運(yùn)行的task,最后封裝在一個(gè)一個(gè)的taskSet集合中承边,然后把taskSet發(fā)送給TaskScheduler
(4)TaskScheduler得到taskSet集合之后遭殉,依次遍歷取出每一個(gè)task提交到worker節(jié)點(diǎn)上的executor進(jìn)程中運(yùn)行。
(5)所有task運(yùn)行完成博助,整個(gè)任務(wù)也就結(jié)束了
資源調(diào)度: (1) Driver端向資源管理器Master發(fā)送注冊(cè)和申請(qǐng)計(jì)算資源的請(qǐng)求
(2) Master通知對(duì)應(yīng)的worker節(jié)點(diǎn)啟動(dòng)executor進(jìn)程(計(jì)算資源)
(3) executor進(jìn)程向Driver端發(fā)送注冊(cè)并且申請(qǐng)task請(qǐng)求
(4) Driver端運(yùn)行客戶端的main方法恩沽,構(gòu)建SparkContext對(duì)象,在SparkContext對(duì)象內(nèi)部依次構(gòu)建DAGScheduler和TaskScheduler
(5) 按照客戶端代碼洪rdd的一系列操作順序翔始,生成DAG有向無(wú)環(huán)圖
(6) DAGScheduler拿到DAG有向無(wú)環(huán)圖之后,按照寬依賴進(jìn)行stage的劃分里伯。每一個(gè)stage內(nèi)部有很多可以并行運(yùn)行的task城瞎,最后封裝在一個(gè)一個(gè)的taskSet集合中,然后把taskSet發(fā)送給TaskScheduler
(7) TaskScheduler得到taskSet集合之后疾瓮,依次遍歷取出每一個(gè)task提交到worker節(jié)點(diǎn)上的executor進(jìn)程中運(yùn)行
(8) 所有task運(yùn)行完成脖镀,Driver端向Master發(fā)送注銷請(qǐng)求,Master通知Worker關(guān)閉executor進(jìn)程狼电,Worker上的計(jì)算資源得到釋放蜒灰,最后整個(gè)任務(wù)也就結(jié)束了。
Spark 任務(wù)分析
參數(shù):
==--executor-memory== : 表示每一個(gè)executor進(jìn)程需要的內(nèi)存大小肩碟,它決定了后期操作數(shù)據(jù)的速度
total-executor-cores: 表示任務(wù)運(yùn)行需要總的cpu核數(shù)强窖,它決定了任務(wù)并行運(yùn)行的粒度
調(diào)度模式: FIFO,F(xiàn)AIR -> 根據(jù)權(quán)重不同來(lái)決定誰(shuí)先執(zhí)行
Spark 的 CheckPoint 和容錯(cuò)
可以把數(shù)據(jù)持久化寫入到hdfs上, 程序運(yùn)行完成后對(duì)應(yīng)的checkpoint數(shù)據(jù)就不會(huì)消失
sc.setCheckpointDir("hdfs://node01:8020/checkpoint")
val rdd1=sc.textFile("/words.txt")
rdd1.checkpoint
val rdd2=rdd1.flatMap(_.split(" "))
rdd2.collect
- Spark 的通信機(jī)制
自定義分區(qū): 繼承==org.apache.spark.Partitioner==, 重寫==numPartitions==方法, 重寫==getPartition==方法
廣播變量: ==傳遞到各個(gè)Executor的Task上運(yùn)行==, 該Executor上的各個(gè)Task再?gòu)乃诠?jié)點(diǎn)的BlockManager獲取變量削祈,而不是從Driver獲取變量翅溺,以減少通信的成本,減少內(nèi)存的占用髓抑,從而提升了效率.
SparkContext.accumulator(initialValue)
==累加器的一個(gè)常見(jiàn)用途是在調(diào)試時(shí)對(duì)作業(yè)執(zhí)行過(guò)程中的事件進(jìn)行計(jì)數(shù)咙崎。可以使用累加器來(lái)進(jìn)行全局的計(jì)數(shù)==
“==org.apache.spark.SparkException: Task not serializable==”
spark的任務(wù)序列化異常 : 在編寫spark程序中吨拍,由于在map褪猛,foreachPartition等算子==內(nèi)部使用了外部定義的變量和函數(shù)==,從而引發(fā)Task未序列化問(wèn)題.
Spark shuffle 原理分析
hashshuffle
sortShuffle
bypass-sortShuffle
(2)Spark Streaming:
- 原理剖析(源碼級(jí)別)和運(yùn)行機(jī)制
- Spark Dstream 及其 API 操作
- Spark Streaming 消費(fèi) Kafka 的兩種方式
- Spark 消費(fèi) Kafka 消息的 Offset 處理
- 數(shù)據(jù)傾斜的處理方案
- Spark Streaming 的算子調(diào)優(yōu)
- 并行度和廣播變量
- Shuffle 調(diào)優(yōu)
(3)Spark SQL:
- Spark SQL 的原理和運(yùn)行機(jī)制
- Catalyst 的整體架構(gòu)
- Spark SQL 的 DataFrame
- Spark SQL 的優(yōu)化策略:內(nèi)存列式存儲(chǔ)和內(nèi)存緩存表羹饰、列存儲(chǔ)壓縮伊滋、邏輯查詢優(yōu)化碳却、Join 的優(yōu)化
(4)Structured Streaming
Spark 從 2.3.0 版本開始支持 Structured Streaming,它是一個(gè)建立在 Spark SQL 引擎之上可擴(kuò)展且容錯(cuò)的流處理引擎新啼,統(tǒng)一了批處理和流處理追城。正是 Structured Streaming 的加入使得 Spark 在統(tǒng)一流、批處理方面能和 Flink 分庭抗禮燥撞。
我們需要掌握:
- Structured Streaming 的模型
- Structured Streaming 的結(jié)果輸出模式
- 事件時(shí)間(Event-time)和延遲數(shù)據(jù)(Late Data)
- 窗口操作
- 水印
- 容錯(cuò)和數(shù)據(jù)恢復(fù)
- Spark Mlib:
本部分是 Spark 對(duì)機(jī)器學(xué)習(xí)支持的部分座柱,我們學(xué)有余力的同學(xué)可以了解一下 Spark 對(duì)常用的分類、回歸物舒、聚類色洞、協(xié)同過(guò)濾、降維以及底層的優(yōu)化原語(yǔ)等算法和工具冠胯』鹬睿可以嘗試自己使用 Spark Mlib 做一些簡(jiǎn)單的算法應(yīng)用。