spark基礎(chǔ)知識總結(jié)

spark基礎(chǔ)知識總結(jié)

1. 概述

1.1. 簡介

Apache Spark是一個圍繞速度温兼、易用性和復(fù)雜分析構(gòu)建的大數(shù)據(jù)處理框架。最初在2009年由加州大學(xué)伯克利分校的AMPLab開發(fā)泥栖,并于2010年成為Apache的開源項(xiàng)目之一幸斥。
Spark為我們提供了一個全面峭拘、統(tǒng)一的框架用于管理各種有著不同性質(zhì)(文本數(shù)據(jù)俊庇、圖表數(shù)據(jù)等)的數(shù)據(jù)集和數(shù)據(jù)源(批量數(shù)據(jù)或?qū)崟r的流數(shù)據(jù))的大數(shù)據(jù)處理的需求。
利用內(nèi)存數(shù)據(jù)存儲和接近實(shí)時的處理能力鸡挠,Spark比其他的大數(shù)據(jù)處理技術(shù)的性能要快很多倍暇赤。

1.2. Mapreduce和Spark

MapReduce是一路計(jì)算的優(yōu)秀解決方案,不過對于需要多路計(jì)算和算法的用例來說宵凌,并非十分高效鞋囊。

如果想要完成比較復(fù)雜的工作,就必須將一系列的MapReduce作業(yè)串聯(lián)起來然后順序執(zhí)行這些作業(yè)瞎惫。每一個作業(yè)都是高時延的溜腐,而且只有在前一個作業(yè)完成之后下一個作業(yè)才能開始啟動。

在下一步開始之前瓜喇,上一步的作業(yè)輸出數(shù)據(jù)必須要存儲到分布式文件系統(tǒng)中挺益。因此,復(fù)制和磁盤存儲會導(dǎo)致這種方式速度變慢乘寒。

而Spark則允許程序開發(fā)者使用有向無環(huán)圖(DAG)開發(fā)復(fù)雜的多步數(shù)據(jù)管道望众。而且還支持跨作業(yè)的內(nèi)存數(shù)據(jù)共享,以便不同的作業(yè)可以共同處理同一個數(shù)據(jù)伞辛。
Spark將中間結(jié)果保存在內(nèi)存中而不是將其寫入磁盤烂翰,當(dāng)需要多次處理同一數(shù)據(jù)集時,這一點(diǎn)特別實(shí)用蚤氏。
Spark會嘗試在內(nèi)存中存儲盡可能多的數(shù)據(jù)然后將其寫入磁盤甘耿。它可以將某個數(shù)據(jù)集的一部分存入內(nèi)存而剩余部分存入磁盤。從而Spark可以用于處理大于集群內(nèi)存容量總和的數(shù)據(jù)集竿滨。

1.3. Hadoop為什么慢

spark_base_1_3.png

Spark因?yàn)槠涮幚頂?shù)據(jù)的方式不一樣佳恬,會比MapReduce快上很多捏境。MapReduce是分步對數(shù)據(jù)進(jìn)行處理的: ”從集群中讀取數(shù)據(jù),進(jìn)行一次處理毁葱,將結(jié)果寫到集群垫言,從集群中讀取更新后的數(shù)據(jù),進(jìn)行下一次的處理倾剿,將結(jié)果寫到集群骏掀,等等…“ Booz Allen Hamilton的數(shù)據(jù)科學(xué)家Kirk Borne如此解析。

反觀Spark柱告,它會在內(nèi)存中以接近“實(shí)時”的時間完成所有的數(shù)據(jù)分析:“從集群中讀取數(shù)據(jù),完成所有必須的分析處理笑陈,將結(jié)果寫回集群际度,完成,” Born說道涵妥。Spark的批處理速度比MapReduce快近10倍乖菱,內(nèi)存中的數(shù)據(jù)分析速度則快近100倍。

如果需要處理的數(shù)據(jù)和結(jié)果需求大部分情況下是靜態(tài)的蓬网,且你也有耐心等待批處理的完成的話窒所,MapReduce的處理方式也是完全可以接受的。

但如果你需要對流數(shù)據(jù)進(jìn)行分析帆锋,比如那些來自于工廠的傳感器收集回來的數(shù)據(jù)吵取,又或者說你的應(yīng)用是需要多重?cái)?shù)據(jù)處理的,那么你也許更應(yīng)該使用Spark進(jìn)行處理锯厢。

大部分機(jī)器學(xué)習(xí)算法都是需要多重?cái)?shù)據(jù)處理的皮官。此外,通常會用到Spark的應(yīng)用場景有以下方面:實(shí)時的市場活動实辑,在線產(chǎn)品推薦捺氢,網(wǎng)絡(luò)安全分析,機(jī)器日記監(jiān)控等剪撬。

1.4. mapreduce和spark對比

spark_base_1_4_1.png
spark_base_1_4_2.png

1.5. spark的其他特性

1摄乒、支持比Map和Reduce更多的函數(shù)。
2残黑、可以通過延遲計(jì)算幫助優(yōu)化整體數(shù)據(jù)處理流程馍佑。
3、提供簡明梨水、一致的Scala挤茄,Java和Python API。
4冰木、提供交互式Scala和Python Shell穷劈。幫助進(jìn)行原型驗(yàn)證和邏輯測試
(目前暫不支持Java)

2. Spark生態(tài)系統(tǒng)

除了Spark核心API之外笼恰,Spark生態(tài)系統(tǒng)中還包括其他附加庫,可以在大數(shù)據(jù)分析和機(jī)器學(xué)習(xí)領(lǐng)域提供更多的能力歇终。

spark_base_2.png

2.1. Spark Streaming:

Spark Streaming基于微批量方式的計(jì)算和處理社证,可以用于處理實(shí)時的流數(shù)據(jù)。它使用DStream评凝,簡單來說就是一個彈性分布式數(shù)據(jù)集(RDD)系列追葡,處理實(shí)時數(shù)據(jù)。

2.2. Spark SQL:

Spark SQL可以通過JDBC API將Spark數(shù)據(jù)集暴露出去奕短,而且還可以用傳統(tǒng)的BI和可視化工具在Spark數(shù)據(jù)上執(zhí)行類似SQL的查詢宜肉。用戶還可以用Spark SQL對不同格式的數(shù)據(jù)(如JSON,Parquet以及數(shù)據(jù)庫等)執(zhí)行ETL翎碑,將其轉(zhuǎn)化谬返,然后暴露給特定的查詢。

2.3. Spark MLlib:

MLlib是一個可擴(kuò)展的Spark機(jī)器學(xué)習(xí)庫日杈,由通用的學(xué)習(xí)算法和工具組成遣铝,包括二元分類、線性回歸莉擒、聚類酿炸、協(xié)同過濾、梯度下降以及底層優(yōu)化原語涨冀。

2.4. Spark GraphX:

GraphX是用于圖計(jì)算和并行圖計(jì)算的新的(alpha)Spark API填硕。通過引入彈性分布式屬性圖(Resilient Distributed Property Graph),一種頂點(diǎn)和邊都帶有屬性的有向多重圖鹿鳖,擴(kuò)展了Spark RDD廷支。
Tachyon是一個以內(nèi)存為中心的分布式文件系統(tǒng),能夠提供內(nèi)存級別速度的跨集群框架(如Spark和MapReduce)的可信文件共享栓辜。它將工作集文件緩存在內(nèi)存中恋拍,從而避免到磁盤中加載需要經(jīng)常讀取的數(shù)據(jù)集。通過這一機(jī)制藕甩,不同的作業(yè)/查詢和框架可以以內(nèi)存級的速度訪問緩存的文件施敢。
BlinkDB是一個近似查詢引擎,用于在海量數(shù)據(jù)上執(zhí)行交互式SQL查詢狭莱。BlinkDB可以通過犧牲數(shù)據(jù)精度來提升查詢響應(yīng)時間僵娃。通過在數(shù)據(jù)樣本上執(zhí)行查詢并展示包含有意義的錯誤線注解的結(jié)果,操作大數(shù)據(jù)集合腋妙。

2.5. BDAS

spark_base_2_5.png

3. Spark體系架構(gòu)

Spark體系架構(gòu)包括如下三個主要組件:

  • 數(shù)據(jù)存儲
  • API
  • 資源管理框架

3.1. 資源管理:

Spark既可以部署在一個單獨(dú)的服務(wù)器集群上(Standalone)
也可以部署在像Mesos或YARN這樣的分布式計(jì)算框架之上默怨。

3.2. Spark API:

應(yīng)用開發(fā)者可以用標(biāo)準(zhǔn)的API接口創(chuàng)建基于Spark的應(yīng)用
Spark提供三種程序設(shè)計(jì)語言的API:

  • Scala
  • Java
  • Python

3.3. 數(shù)據(jù)存儲:

Spark用HDFS文件系統(tǒng)存儲數(shù)據(jù)。它可用于存儲任何兼容于Hadoop的數(shù)據(jù)源骤素,包括HDFS匙睹,HBase愚屁,Cassandra等。
Spark在對數(shù)據(jù)的處理過程中痕檬,會將數(shù)據(jù)封裝成RDD數(shù)據(jù)結(jié)構(gòu)

4.RDD

RDD(Resilient Distributed Datasets)霎槐,彈性分布式數(shù)據(jù)集, 是分布式內(nèi)存的一個抽象概念
RDD作為數(shù)據(jù)結(jié)構(gòu)梦谜,本質(zhì)上是一個只讀的分區(qū)記錄集合丘跌。一個RDD可以包含多個分區(qū),每個分區(qū)就是一個dataset片段
RDD并不保存真正的數(shù)據(jù)唁桩,僅保存元數(shù)據(jù)信息
RDD之間可以存在依賴關(guān)系

4.1. RDD----彈性分布式數(shù)據(jù)集:核心

RDD是Spark框架中的核心概念闭树。
可以將RDD視作數(shù)據(jù)庫中的一張表。其中可以保存任何類型的數(shù)據(jù)荒澡,可以通過API來處理RDD及RDD中的數(shù)據(jù)
類似于Mapreduce报辱,RDD也有分區(qū)的概念
RDD是不可變的,可以用變換(Transformation)操作RDD仰猖,但是這個變換所返回的是一個全新的RDD,而原有的RDD仍然保持不變

4.2. RDD創(chuàng)建的三種方式

  • 集合并行化
val arr = Array(1,2,3,4,5,6,7,8)
val rdd1 = sc.parallelize(arr, 2) //2代表分區(qū)數(shù)量
  • 從外部文件系統(tǒng)
分布式文件系統(tǒng):如hdfs文件系統(tǒng)奈籽,S3:
val rdd2 = sc.textFile("hdfs://node1:9000/words.txt")

  • 從父RDD轉(zhuǎn)換成新的子RDD
通過Transformation操作

4.3. RDD----彈性分布式數(shù)據(jù)集

RDD支持兩種類型的操作:

  • 變換(Transformation)
    變換:變換的返回值是一個新的RDD集合饥侵,而不是單個值。調(diào)用一個變換方法衣屏,不會有任何求值計(jì)算躏升,它只獲取一個RDD作為參數(shù),然后返回一個新的RDD狼忱。Transformation是lazy模式膨疏,延遲執(zhí)行
變換函數(shù)包括:map,filter钻弄,flatMap佃却,groupByKey,reduceByKey窘俺,aggregateByKey饲帅,pipe和coalesce。
  • 行動(Action)
    行動:行動操作計(jì)算并返回一個新的值瘤泪。當(dāng)在一個RDD對象上調(diào)用行動函數(shù)時灶泵,會在這一時刻計(jì)算全部的數(shù)據(jù)處理查詢并返回結(jié)果值。
行動操作包括:reduce对途,collect赦邻,count,first实檀,take惶洲,countByKey以及foreach按声。

4.4. RDD操作流程示意

spark_base_4_4.png

4.5. RDD的轉(zhuǎn)換與操作

spark_base_4_5.png

4.6. RDD ---- 源碼中的注釋

Internally, each RDD is characterized by five main properties:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

4.7. 安裝spark集群(Standalone)

見文檔

4.8. spark交互式shell

spark提供一個scala-shell提供交互式操作
啟動spark-shell
bin/spark-shell --master spark://masterip:port(7077) 集群運(yùn)行模式
bin/spark-shell --master local local運(yùn)行模式
wordcount示例
scala>sc.textFile("hdfs://namenode:port/data").flatMap(.split("\t")).map((,1)).reduceByKey().collect

5. spark命令

5.1. 查看spark的文檔常見操作

官網(wǎng)鏈接

  • action操作


    spark_base_action.png
  • transformation操作


    spark_base_transformation.png
  • 通過并行化scala集合創(chuàng)建RDD

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
  • 查看該rdd的分區(qū)數(shù)量
rdd1.partitions.length
  • 更改分區(qū),因?yàn)閞dd是只讀的湃鹊,所以重新分區(qū)后會生成新的rdd來使用新的分區(qū)
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
val rdd3 = rdd2.repartition(3)
rdd3.partitions.length
  • union求并集儒喊,注意類型要一致
val rdd6 = sc.parallelize(List(5,6,4,7))
val rdd7 = sc.parallelize(List(1,2,3,4))
val rdd8 = rdd6.union(rdd7)
rdd8.distinct.sortBy(x=>x).collect

  • intersection求交集
val rdd9 = rdd6.intersection(rdd7)
  • join:keyvalue形式的值,key相同join出來
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.join(rdd2)
  • groupByKey
val rdd3 = rdd1 union rdd2
rdd3.groupByKey
rdd3.groupByKey.map(x=>(x._1,x._2.sum))
  • WordCount
sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((_,1)).groupByKey.map(t=>(t._1, t._2.sum)).collect
  • cogroup:在自己的集合中分組币呵,將分組的結(jié)果和其他集合中的結(jié)果取并集
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.cogroup(rdd2)
val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))
  • cartesian笛卡爾積
val rdd1 = sc.parallelize(List("tom", "jerry"))
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
val rdd3 = rdd1.cartesian(rdd2)

spark action操作

  • 并行化創(chuàng)建rdd
val rdd1 = sc.parallelize(List(1,2,3,4,5))
  • collect:將rdd的數(shù)據(jù)計(jì)算怀愧,轉(zhuǎn)換成scala的集合打印控制臺,數(shù)據(jù)量小時用余赢。
rdd1.collect
  • reduce:將元素進(jìn)行reduce計(jì)算芯义,直接顯示結(jié)果
val rdd2 = rdd1.reduce(_+_)
  • count:求個數(shù)
rdd1.count
  • top:取rdd中的最大的前兩個
rdd1.top(2)
  • take:取前幾個
rdd1.take(2)
  • first:取集合的第一個元素,相當(dāng)于take(1)
rdd1.first
  • takeOrdered:取排序的前幾個
rdd1.takeOrdered(3)
  • 將結(jié)果保存成文本文件
saveAsTextFile(“路徑 or hdfs”)

5.2. spark shell

  • 啟動local模式的spark shell
./bin/spark-shell
  • 啟動集群的spark shell
./bin/spark-shell --master spark://master1:7077
  • 啟動集群的spark shell妻柒,配置參數(shù)
./bin/spark-shell --master spark://master1:7077 --executor-memory 512m --total-executor-cores 3
參數(shù)解釋:
spark://master1:7077 : 指定主機(jī)
--executor-memory 512m:每個work使用多大內(nèi)存
--total-executor-cores 3:指定work總共使用的核數(shù)

5.3. spark的演示

  • sc: spark context,啟動spark會自動創(chuàng)建的對象扛拨,客戶端和spark交互的橋梁
  • 創(chuàng)建RDD:
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
  • 查看RDD的分區(qū):
rdd1.partitions.length
  • 轉(zhuǎn)換:
rdd1.filter(_%2==0)
  • 執(zhí)行:
res1.collect
  • map:
rdd1. filter(_%2==0).map(_*10).collect
  • sortby:
rdd1. filter(_%2==0).map(_*10).sortBy(x=>x,false).collect

6. spark api wordcount

6.1. 創(chuàng)建項(xiàng)目

6.2. 導(dǎo)包

6.3. 寫wordcount

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext 
class WordCount {
 }
object WordCount {  
    def main(args: Array[String]) {    
    //創(chuàng)建配置,設(shè)置app的name     
    val conf = new SparkConf().setAppName("WordCount")     
    //創(chuàng)建sparkcontext举塔,將conf傳進(jìn)來     
    val sc = new SparkContext(conf)     
    //從文件中讀取數(shù)據(jù)绑警,做wordcount,寫到文件系統(tǒng)
    sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile(args(1))     
    //停止     
    sc.stop()  
    }
}

6.4. 打jar包

使用maven打包

6.5. 提交

注意提交的時候央渣,涉及到ip的地方盡量用域名计盒,否則報(bào)錯

spark-submit --class WordCount --master spark://master1:7077 --executor-memory 512m --total-executor-cores 3 /home/hadoop/wordcount.jar hdfs://master1ha:9000/core-site.xml hdfs://master1ha:9000/out1

7. spark源碼分析

待更新

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市芽丹,隨后出現(xiàn)的幾起案子北启,更是在濱河造成了極大的恐慌,老刑警劉巖拔第,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件咕村,死亡現(xiàn)場離奇詭異,居然都是意外死亡蚊俺,警方通過查閱死者的電腦和手機(jī)懈涛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來泳猬,“玉大人肩钠,你說我怎么就攤上這事≡葜常” “怎么了价匠?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長呛每。 經(jīng)常有香客問我踩窖,道長,這世上最難降的妖魔是什么晨横? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任洋腮,我火速辦了婚禮箫柳,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘啥供。我一直安慰自己悯恍,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布伙狐。 她就那樣靜靜地躺著涮毫,像睡著了一般。 火紅的嫁衣襯著肌膚如雪贷屎。 梳的紋絲不亂的頭發(fā)上罢防,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天,我揣著相機(jī)與錄音唉侄,去河邊找鬼咒吐。 笑死,一個胖子當(dāng)著我的面吹牛属划,可吹牛的內(nèi)容都是我干的恬叹。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼同眯,長吁一口氣:“原來是場噩夢啊……” “哼绽昼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起嗽测,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤绪励,失蹤者是張志新(化名)和其女友劉穎肿孵,沒想到半個月后唠粥,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡停做,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年晤愧,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蛉腌。...
    茶點(diǎn)故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡官份,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出烙丛,到底是詐尸還是另有隱情舅巷,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布河咽,位于F島的核電站钠右,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏忘蟹。R本人自食惡果不足惜飒房,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一搁凸、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧狠毯,春花似錦护糖、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至惜颇,卻和暖如春皆刺,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背凌摄。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工羡蛾, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人锨亏。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓痴怨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親器予。 傳聞我的和親對象是個殘疾皇子浪藻,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容