Spark學(xué)習(xí)筆記

Scala語(yǔ)法

至于scala語(yǔ)法而言,大致上和Java的語(yǔ)法類似谐宙,增加了一些函數(shù)式編程,具體語(yǔ)法可以參考Scala語(yǔ)法
Scala是一種針對(duì) JVM 將函數(shù)和面向?qū)ο蠹夹g(shù)組合在一起的編程語(yǔ)言电爹。Scala編程語(yǔ)言近來抓住了很多開發(fā)者的眼球须鼎。它看起來像是一種純粹的面向?qū)ο缶幊陶Z(yǔ)言,而又無(wú)縫地結(jié)合了命令式函數(shù)式的編程風(fēng)格澜沟。
Scala也是一種函數(shù)式語(yǔ)言灾票,其函數(shù)也能當(dāng)成值來使用。
Scala被特意設(shè)計(jì)成能夠與Java和.NET互操作茫虽。它用scalac這個(gè)編譯器把源文件編譯成Java的class文件(即在JVM上運(yùn)行的字節(jié)碼)刊苍。你可以從Scala中調(diào)用所有的Java類庫(kù),也同樣可以從Java應(yīng)用程序中調(diào)用Scala的代碼濒析。

  • 數(shù)據(jù)類型


  • 變量
    基于變量的數(shù)據(jù)類型正什,操作系統(tǒng)會(huì)進(jìn)行內(nèi)存分配并且決定什么將被儲(chǔ)存在保留內(nèi)存中。因此号杏,通過給變量分配不同的數(shù)據(jù)類型婴氮,你可以在這些變量中存儲(chǔ)整數(shù),小數(shù)或者字母盾致。
    在 Scala 中主经,使用關(guān)鍵詞 "var" 聲明變量,使用關(guān)鍵詞 "val" 聲明常量(不能修改)庭惜。
var myVar : String = "Foo"
val myVar : String = "Too"

在 Scala 中聲明變量和常量不一定要指明數(shù)據(jù)類型罩驻,在沒有指明數(shù)據(jù)類型的情況下,其數(shù)據(jù)類型是通過變量或常量的初始值推斷出來的护赊。
所以鉴腻,如果在沒有指明數(shù)據(jù)類型的情況下聲明變量或常量必須要給出其初始值迷扇,否則將會(huì)報(bào)錯(cuò)。

var myVar = 10;
val myVal = "Hello, Scala!";
  • Scala 訪問修飾符
    私有(Private)成員
    用 private 關(guān)鍵字修飾爽哎,帶有此標(biāo)記的成員僅在包含了成員定義的類或?qū)ο髢?nèi)部可見蜓席,同樣的規(guī)則還適用內(nèi)部類。
    保護(hù)(Protected)成員
    在 scala 中课锌,對(duì)保護(hù)(Protected)成員的訪問比 java 更嚴(yán)格一些厨内。因?yàn)樗辉试S保護(hù)成員在定義了該成員的的類的子類中被訪問。而在java中渺贤,用protected關(guān)鍵字修飾的成員雏胃,除了定義了該成員的類的子類可以訪問,同一個(gè)包里的其他類也可以進(jìn)行訪問志鞍。
    公共(Public)成員
    Scala中瞭亮,如果沒有指定任何的修飾符,則默認(rèn)為 public固棚。這樣的成員在任何地方都可以被訪問统翩。
  • 輸入輸出
    輸出和C語(yǔ)言差不多,用print()即可此洲。輸入用readLine函數(shù)從控制臺(tái)讀取一行輸入厂汗。如果要讀取數(shù)字,Boolean或者是字符呜师,你可以用readInt,readDouble,readByte等
val name =readLine("Your name:")
print("Your age:")
val age=readInt()
printf("Hello,%s! you are %d years old \n",name,age)
  • 循環(huán)
    是Scala 中沒有i++,i--操作娶桦。
A for (i <- 1 to (10)) {
println("Number is :" + i)
 }
 B for (ch<-"Hello")
 { 
 println(ch)
 }
 C for (i<-0 to 10 ;form=10-i)println(form)
 D for(i<-0 to 10 if i%2=0) println(form)
 E for(i<-0 until (b.length,2)){//跳步0,2汁汗,4衷畦,6.。知牌。霎匈。。
 val t= b(i)
 b(i)=b(i+1)
 b(i+1)=t
 }
  • 數(shù)組
    數(shù)組分為可變和不可變長(zhǎng)度
#不可變長(zhǎng)度
val nums=new Array[Int](10)
 val s = Array("Hello", "World") 
 s(0) = "Goodbye"
#可變長(zhǎng)度
import scala.collection.mutable.ArrayBuffer
 val b= ArrayBuffer[Int]()
 b+=1 //添加元素+=在尾端添加
 b+=(1,2,34,5,6)
 b.trimEnd(5) //尾端刪除5個(gè)元素
 b.insert(2, 6)
 b.insert(2, 7, 8, 9)
 b.remove(2) 
 b.remove(2, 3)
 b.toArray 
  • 其它操作和Java類似送爸,使用時(shí)查API即可。

安裝Scala

  1. Java 設(shè)置
    確保你本地以及安裝了 JDK 1.5 以上版本暖释,并且設(shè)置了 JAVA_HOME 環(huán)境變量及 JDK 的bin目錄袭厂。
  2. 安裝Scala
    官網(wǎng)下載Scala 安裝包,解壓安裝:
tar zxvf scala-2.11.7.tgz

設(shè)置Scala環(huán)境變量設(shè)置

SCALA_HOME=/opt/scala-2.11.7
PATH=$PATH:$SCALA_HOME/bin
export SCALA_HOME PATH

驗(yàn)證Scala安裝

Scala 已正常安裝球匕。

安裝Spark

由于用的cloudera Hadoop發(fā)行版纹磺,所以直接添加即可。使用Apache Hadoop可以看這里亮曹。
輸入spark-shell啟動(dòng)Spark的時(shí)候報(bào)錯(cuò)

閱讀錯(cuò)誤橄杨,它說yarn需要的內(nèi)存超過了給它設(shè)置的最大內(nèi)存秘症,可以修改yarn的最大內(nèi)存或者減少yarn所需要的內(nèi)存。我們選擇增大yarn的最大內(nèi)存式矫,所以將yarn.scheduler.maximum-allocation-mb和yarn.nodemanager.resource.memory-mb參數(shù)調(diào)高至16G并重啟yarn和spark服務(wù)乡摹。
這是因?yàn)橛胷oot用戶啟動(dòng)了spark,缺少訪問hdfs的一些權(quán)限采转,所以改為hdfs用戶解決聪廉。

  • 測(cè)試SparkPi
    在spark的bin目錄下輸入./run-example SparkPi 10(迭代次數(shù))計(jì)算PI的值。
    第一次運(yùn)行的時(shí)候報(bào)錯(cuò)說找不到main class故慈,檢查發(fā)現(xiàn)沒有配置JAVA_HOME板熊,所以用命令echo $JAVA_HOME檢查一下是否配置好了。
    然后就可以正確運(yùn)行了察绷,并得到結(jié)果干签。
  • 測(cè)試wordcount
    當(dāng)程序一旦打包好,就可以使用bin/spark-submit腳本啟動(dòng)應(yīng)用了. 這個(gè)腳本負(fù)責(zé)設(shè)置spark使用的classpath和依賴,支持不同類型的集群管理器和發(fā)布模式:
./bin/spark-submit \
  --class <main-class>
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

--class: 你的應(yīng)用的啟動(dòng)類 (如 org.apache.spark.examples.SparkPi)
--master: 集群的master URL (如 spark://23.195.26.187:7077)
--deploy-mode: 是否發(fā)布你的驅(qū)動(dòng)到worker節(jié)點(diǎn)(cluster) 或者作為一個(gè)本地客戶端 (client) (default: client)*
--conf: 任意的Spark配置屬性, 格式key=value. 如果值包含空格拆撼,可以加引號(hào)“key=value”.
application-jar: 打包好的應(yīng)用jar,包含依賴. 這個(gè)URL在集群中全局可見容劳。 比如hdfs:// 共享存儲(chǔ)系統(tǒng)谓媒, 如果是 file:// path人柿, 那么所有的節(jié)點(diǎn)的path都包含同樣的jar.
application-arguments: 傳給main()方法的參數(shù)

使用命令./spark-submit --class testSpark.WordCount --master yarn-cluster /usr/tmp/WordCount.jar /tmp/input/words.txt /tmp/result/wordcount/使用集群模式運(yùn)行jar包,并指明了主類和輸入輸出
第一次執(zhí)行的時(shí)候報(bào)錯(cuò)了隧魄,但是只是說


查看日志找到具體錯(cuò)誤筋岛,輸入的文件地址錯(cuò)了娶视。
修正后得到輸出結(jié)果。

Spark Shell

spark shell 是spark自帶的一個(gè)快速原型開發(fā)的工具睁宰,在spark目錄下面的bin目錄下面肪获。把這個(gè)理解為在python交互界面下寫python代碼就行了。

寫一個(gè)wordcount

  • load數(shù)據(jù)集


    sc是在進(jìn)入spark shell 時(shí)候創(chuàng)建一個(gè)spark content 就是spark上下文的意思柒傻,val 是scala語(yǔ)法中聲明常量的方式(val 定義的值實(shí)際上是一個(gè)常量孝赫,如果你試圖改變他的值,解析器會(huì)報(bào)錯(cuò)红符,聲明值或變量但不做初始化也會(huì)報(bào)錯(cuò)青柄。),通過println可以看到讀入的文件被處理成一個(gè)MappedRDD的對(duì)象预侯。這個(gè)sc.textFile的操作就把的數(shù)據(jù)load到內(nèi)存中了致开。

  • 調(diào)用flatMap方法計(jì)算wordCount
    val wordCounts = textFile.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>a+b)
  • 由于Scala語(yǔ)言的特性,此時(shí)wordCounts還不是執(zhí)行計(jì)算后的值萎馅,還只是定義著計(jì)算的方法双戳,需要用collect()來執(zhí)行這個(gè)計(jì)算。

    res2表示得到的結(jié)果糜芳,是一個(gè)Array類型的飒货,如果要取出每一個(gè)值魄衅,可以用Scala的循環(huán)。


配置Spark應(yīng)該程序開發(fā)塘辅、運(yùn)行環(huán)境

在開發(fā)Spark程序的時(shí)候晃虫,總是先在本地進(jìn)行開發(fā),所以就需要spark的開發(fā)環(huán)境莫辨,等調(diào)試程序確保無(wú)誤的時(shí)候再把程序打包到本地Spark運(yùn)行傲茄,所以這就需要在本地配置Spark的運(yùn)行環(huán)境。最后再把jar包提交到分布式環(huán)境運(yùn)行沮榜。(當(dāng)然盘榨,如果你不想在本地測(cè)試jar包而直接拿到集群上去跑可以不配置本地spark,但是后面需要spark/lib文件里的一些jar包蟆融,需單獨(dú)下載)

  1. 配置JDK1.8
    這個(gè)需要配置JAVA_HOME草巡,PATH,CLASSPATH型酥。這個(gè)相信大多數(shù)的都已經(jīng)配置了的山憨,至于每個(gè)環(huán)境變量的具體的作用可以看這里。配置完成后驗(yàn)證一下是否成功弥喉,當(dāng)看到Java信息就成功了郁竟。
  2. Scala的安裝
    首先從DOWNLOAD PREVIOUS VERSIONS下載到對(duì)應(yīng)的版本,在這里需要注意的是由境,Spark的各個(gè)版本需要跟相應(yīng)的Scala版本對(duì)應(yīng)棚亩,比如我這里使用的Spark 1.6.0,Scala使用的 2.10.6版本虏杰。記得下載二進(jìn)制版本的Scala讥蟆。

    安裝完后默認(rèn)是自動(dòng)將bin配置到了path環(huán)境變量,可以驗(yàn)證一下是否自動(dòng)配置了纺阔。
  3. Spark的安裝
    park的安裝非常簡(jiǎn)單瘸彤,直接去Download Apache Spark。有兩個(gè)步驟:
    選擇好對(duì)應(yīng)Hadoop版本的Spark版本笛钝,如下圖中所示质况;
    然后點(diǎn)擊spark-1.6.2-bin-hadoop2.6.tgz,等待下載結(jié)束即可玻靡。
    下載完成后將文件進(jìn)行解壓(可能需要解壓兩次)结榄,最好解壓到一個(gè)盤的根目錄下,并重命名為Spark啃奴,簡(jiǎn)單不易出錯(cuò)。并且需要注意的是雄妥,在Spark的文件目錄路徑名中最蕾,不要出現(xiàn)空格依溯,類似于“Program Files”這樣的文件夾名是不被允許的。
    然后就是配置環(huán)境變量瘟则,把\Spark\bin加入path黎炉。
  1. Hadoop安裝
    系統(tǒng)變量設(shè)置后,就可以在任意當(dāng)前目錄下的cmd中運(yùn)行spark-shell醋拧,但這個(gè)時(shí)候很有可能會(huì)碰到各種錯(cuò)誤慷嗜,這里主要是因?yàn)镾park是基于Hadoop的,所以這里也有必要配置一個(gè)Hadoop的運(yùn)行環(huán)境丹壕。在Hadoop Releases里可以看到Hadoop的各個(gè)歷史版本庆械,這里由于下載的Spark是基于Hadoop 2.6的(在Spark安裝的第一個(gè)步驟中,我們選擇的是Pre-built for Hadoop 2.6)菌赖,我這里選擇2.6.4版本缭乘,選擇好相應(yīng)版本并點(diǎn)擊后,進(jìn)入詳細(xì)的下載頁(yè)面琉用,并下載’hadoop-2.6.4.tar.gz’文件
    下載并解壓到指定目錄堕绩,然后到環(huán)境變量部分設(shè)置HADOOP_HOME為Hadoop的解壓目錄,然后把它下面的bin目錄頁(yè)加入path

    然后再cmd下啟動(dòng)spark邑时,報(bào)錯(cuò)如下

    主要是因?yàn)镠adoop的bin目錄下沒有winutils.exe文件的原因造成的奴紧。這里的解決辦法是:
  • https://github.com/steveloughran/winutils 選擇你安裝的Hadoop版本號(hào),然后進(jìn)入到bin目錄下晶丘,找到winutils.exe文件黍氮,下載方法是點(diǎn)擊winutils.exe文件,進(jìn)入之后在頁(yè)面的右上方部分有一個(gè)Download按鈕铣口,點(diǎn)擊下載即可滤钱。
  • 下載好winutils.exe后,將這個(gè)文件放入到Hadoop的bin目錄下脑题,我這里是E:\study\hadoop-2.6.4\bin件缸。
  • 在打開的cmd中輸入 E:\study\hadoop-2.6.4\bin\winutils.exe chmod 777 /tmp/hive 這個(gè)操作是用來修改權(quán)限的。注意前面的F:\Program Files\hadoop\bin部分要對(duì)應(yīng)的替換成實(shí)際你所安裝的bin目錄所在位置叔遂。
    經(jīng)過這幾個(gè)步驟之后他炊,然后再次開啟一個(gè)新的cmd窗口,如果正常的話已艰,應(yīng)該就可以通過直接輸入spark-shell來運(yùn)行Spark了痊末。

看到這兩句話就說明成功了

Spark context available as sc.
SQL context available as sqlContext.

配置集成開發(fā)環(huán)境IDEA

配置前提
  1. JDK安裝。 請(qǐng)自行前往oracle官方網(wǎng)站下載安裝哩掺,并在command命令行窗口確認(rèn)java -version 可以返回版本號(hào)凿叠,否則的話要去系統(tǒng)環(huán)境變量設(shè)置位置確認(rèn)是否java已經(jīng)被添加到PATH中
  2. Scala下載安裝。移步官網(wǎng) http://www.scala-lang.org/ 下載并安裝即可。同第1步盒件,要在command命令行下確認(rèn)敲擊scala可以進(jìn)入交互式命令窗口蹬碧,否則請(qǐng)確認(rèn)環(huán)境變量的配置。
  3. spark源代碼下載炒刁。官方網(wǎng)站 http://spark.apache.org/downloads.html 上提供有各種hadoop版本的預(yù)編譯版spark代碼恩沽,理論上要根據(jù)你在用的hadoop版本來相應(yīng)選擇,本文僅作配置說明翔始,故任選其中一即可罗心。筆者下載的是spark1.5, 對(duì)應(yīng)hadoop2.6預(yù)編譯的版本,解壓即可城瞎。
  4. Intellij IDEA下載渤闷。https://www.jetbrains.com/idea/ 上可以下載免費(fèi)的community版本。
配置開始
  1. 安裝IDEA 的scala插件
    第一次安裝時(shí)全谤,在plugins處輸入scala關(guān)鍵詞搜索肤晓,在聯(lián)網(wǎng)環(huán)境下點(diǎn)擊安裝即可。

    然后就可以看到下載進(jìn)度

    完了之后記得重啟认然,不然還是不能用
  2. 創(chuàng)建項(xiàng)目并導(dǎo)入相應(yīng)依賴包
    直接在新建項(xiàng)目界面選擇scala項(xiàng)目即可




    Project SDK是java的jdk补憾,如果沒有默認(rèn)加載出來,點(diǎn)擊New卷员,手工定位到j(luò)dk的目錄提交上來即可盈匾。Scala SDK那里如果默認(rèn)沒有加載出來,點(diǎn)擊Create毕骡,在彈出的窗口中安默認(rèn)勾選的System點(diǎn)擊OK即可削饵。便可以看到項(xiàng)目結(jié)構(gòu)了。



    接下來未巫,我們導(dǎo)入上面下載好的spark源碼窿撬。按下圖指引操作, 在+號(hào)處選擇java, 然后定位到你上面步驟中將spark程序解壓到的目錄位置,選擇lib目錄下的spark-assembly-1.5.0-hadoop2.6.0.jar文件叙凡,確認(rèn)劈伴。

    當(dāng)看到項(xiàng)目外部包里面出現(xiàn)了剛剛導(dǎo)入的包就行了。
程序開發(fā)

創(chuàng)建scala class握爷,類型選擇object(如果右鍵沒有Scala class選項(xiàng)是因?yàn)闆]有把src目錄作為你的Sources文件夾屬性跛璧,右鍵src有個(gè)選項(xiàng)把它設(shè)置為Sources文件夾即可,也可以去項(xiàng))新啼。
寫一個(gè)分布式的wordcount(如果想寫本地spark的話就不用RDD追城,像以前那樣寫代碼即可)

import org.apache.spark.SparkConf //使用spark的相關(guān)操作
import org.apache.spark.SparkContext //獲取SparkContext上下文對(duì)象

object MyTest {
  def main(args: Array[String]){
    if (args.length != 2 || args(0) == null || args(1) == null){ //查看傳如參數(shù)是否為2且不為空
      System.exit(1);
    }
    val conf = new SparkConf() //獲取spark環(huán)境的配置,用于傳如上下文
    val sc = new SparkContext(conf)
    val line = sc.textFile(args(0))

    //wordcount的算法燥撞,用flatMap實(shí)現(xiàn)
    val result = line.flatMap(_.split("[^a-zA-Z]+")).map((_, 1)).reduceByKey(_+_)
    result.saveAsTextFile(args(1)) //保存結(jié)果到指定文件夾
    sc.stop() //關(guān)閉上下文對(duì)象

  }
}

最后配置編譯生成的jar包名字和地址座柱。(這里項(xiàng)目名字spark寫成sprak了迷帜。。色洞。)


進(jìn)入到如圖的項(xiàng)目管理界面瞬矩,選擇圖中的目錄,打包時(shí)的main class選擇你的主類锋玲,我這里是MyTest

然后就編譯可以輸出你的包了。build -> build artifacts -> spark_dev:jar -> build, 然后就開始編譯了涵叮,在最下面可以看到進(jìn)行的狀態(tài)惭蹂。到輸出JAR包的目錄下去看看,發(fā)現(xiàn)確實(shí)成功生成了一個(gè)jar文件割粮。(此處打好的jar包如果要提交到數(shù)平的Spark集群上運(yùn)行盾碗,請(qǐng)打開此jar包文件,觀察其中是否有一個(gè)scala的文件夾舀瓢,刪除了廷雅!否則可能與線上的scala版本沖突,不沖突可不刪除)
這里可能會(huì)報(bào)錯(cuò)java.lang.outofmemoryerror

配置jar包output選項(xiàng)京髓,因?yàn)檫\(yùn)行環(huán)境中已經(jīng)有相關(guān)包航缀,所以其他包刪除,只保留’compile output’那一項(xiàng)堰怨,這時(shí)再build就不會(huì)內(nèi)存溢出芥玉。

本地測(cè)試代碼

在cmd中輸入命令
>spark-submit --class MyTest --master local E:\study\Projects\sprak_test\out\artifacts\sprak_test_jar\sprak_test.jar E:\words.txt E:\out
表示把剛剛的jar包提交到spark執(zhí)行,local本地模式备图,輸入文件和輸入路徑灿巧。最后得到結(jié)果:


編寫spark程序

以下內(nèi)容基于spark1.6.0

spark程序可以用Java、python和scala編寫揽涮,由于spark本身是由scala編寫抠藕,再加上scala的語(yǔ)法特定決定了scala是編寫spark程序的最合適的語(yǔ)言。

基本概念

  • RDD:是彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset)的簡(jiǎn)稱蒋困,是分布式內(nèi)存的一個(gè)抽象概念盾似,提供了一種高度受限的共享內(nèi)存模型;
  • DAG:是Directed Acyclic Graph(有向無(wú)環(huán)圖)的簡(jiǎn)稱家破,反映RDD之間的依賴關(guān)系颜说;
  • Executor:是運(yùn)行在工作節(jié)點(diǎn)(Worker Node)上的一個(gè)進(jìn)程,負(fù)責(zé)運(yùn)行任務(wù)汰聋,并為應(yīng)用程序存儲(chǔ)數(shù)據(jù)门粪;
  • 應(yīng)用:用戶編寫的Spark應(yīng)用程序;
  • 任務(wù):運(yùn)行在Executor上的工作單元烹困;
  • 作業(yè):一個(gè)作業(yè)包含多個(gè)RDD及作用于相應(yīng)RDD上的各種操作玄妈;
  • 階段:是作業(yè)的基本調(diào)度單位,一個(gè)作業(yè)會(huì)分為多組任務(wù),每組任務(wù)被稱為“階段”拟蜻,或者也被稱為“任務(wù)集”绎签。

架構(gòu)設(shè)計(jì)

Spark運(yùn)行架構(gòu)包括集群資源管理器(Cluster Manager)、運(yùn)行作業(yè)任務(wù)的工作節(jié)點(diǎn)(Worker Node)酝锅、每個(gè)應(yīng)用的任務(wù)控制節(jié)點(diǎn)(Driver)和每個(gè)工作節(jié)點(diǎn)上負(fù)責(zé)具體任務(wù)的執(zhí)行進(jìn)程(Executor)诡必。其中,集群資源管理器可以是Spark自帶的資源管理器搔扁,也可以是YARN或Mesos等資源管理框架爸舒。

與Hadoop MapReduce計(jì)算框架相比,Spark所采用的Executor有兩個(gè)優(yōu)點(diǎn):一是利用多線程來執(zhí)行具體的任務(wù)(Hadoop MapReduce采用的是進(jìn)程模型)稿蹲,減少任務(wù)的啟動(dòng)開銷扭勉;二是Executor中有一個(gè)BlockManager存儲(chǔ)模塊,會(huì)將內(nèi)存和磁盤共同作為存儲(chǔ)設(shè)備苛聘,當(dāng)需要多輪迭代計(jì)算時(shí)涂炎,可以將中間結(jié)果存儲(chǔ)到這個(gè)存儲(chǔ)模塊里,下次需要時(shí)设哗,就可以直接讀該存儲(chǔ)模塊里的數(shù)據(jù)唱捣,而不需要讀寫到HDFS等文件系統(tǒng)里,因而有效減少了IO開銷网梢;或者在交互式查詢場(chǎng)景下爷光,預(yù)先將表緩存到該存儲(chǔ)系統(tǒng)上,從而可以提高讀寫IO性能澎粟。
Spark運(yùn)行架構(gòu)

Spark運(yùn)行基本流程

(1)當(dāng)一個(gè)Spark應(yīng)用被提交時(shí)蛀序,首先需要為這個(gè)應(yīng)用構(gòu)建起基本的運(yùn)行環(huán)境,即由任務(wù)控制節(jié)點(diǎn)(Driver)創(chuàng)建一個(gè)SparkContext活烙,由SparkContext負(fù)責(zé)和資源管理器(Cluster Manager)的通信以及進(jìn)行資源的申請(qǐng)徐裸、任務(wù)的分配和監(jiān)控等。SparkContext會(huì)向資源管理器注冊(cè)并申請(qǐng)運(yùn)行Executor的資源啸盏;
(2)資源管理器為Executor分配資源重贺,并啟動(dòng)Executor進(jìn)程,Executor運(yùn)行情況將隨著“心跳”發(fā)送到資源管理器上回懦;
(3)SparkContext根據(jù)RDD的依賴關(guān)系構(gòu)建DAG圖气笙,DAG圖提交給DAG調(diào)度器(DAGScheduler)進(jìn)行解析,將DAG圖分解成多個(gè)“階段”(每個(gè)階段都是一個(gè)任務(wù)集)怯晕,并且計(jì)算出各個(gè)階段之間的依賴關(guān)系潜圃,然后把一個(gè)個(gè)“任務(wù)集”提交給底層的任務(wù)調(diào)度器(TaskScheduler)進(jìn)行處理;Executor向SparkContext申請(qǐng)任務(wù)舟茶,任務(wù)調(diào)度器將任務(wù)分發(fā)給Executor運(yùn)行谭期,同時(shí)堵第,SparkContext將應(yīng)用程序代碼發(fā)放給Executor;
(4)任務(wù)在Executor上運(yùn)行隧出,把執(zhí)行結(jié)果反饋給任務(wù)調(diào)度器踏志,然后反饋給DAG調(diào)度器,運(yùn)行完畢后寫入數(shù)據(jù)并釋放所有資源胀瞪。


Spark運(yùn)行基本流程

RDD

  • RDD(彈性數(shù)據(jù)集)是Spark提供的最重要的
    抽象的概念针余,它是一種有容錯(cuò)機(jī)制的特殊集合,可
    以分布在集群的節(jié)點(diǎn)上凄诞,以函數(shù)式編操作集合的方
    式涵紊,進(jìn)行各種并行操作。
  • 可以將RDD理解為一個(gè)具有容錯(cuò)機(jī)制的特殊集合幔摸,
    它提供了一種只讀、只能有已存在的RDD變換而來
    的共享內(nèi)存颤练,然后將所有數(shù)據(jù)都加載到內(nèi)存中既忆,方
    便進(jìn)行多次重用。
獲取RDD
  • 從共享的文件系統(tǒng)(HDFS)
  • 通過已存在的RDD轉(zhuǎn)換
  • 將已存在scala集合(只要是Seq對(duì)象)并行化 嗦玖,通過調(diào)用SparkContext的parallelize方法實(shí)現(xiàn)患雇。
  • 改變現(xiàn)有RDD的持久性;RDD是懶散宇挫,短暫的苛吱。
    RDD的固化:cache緩存至內(nèi)存;
    save保存到分布式文件系統(tǒng)

操作RDD

  • Transformation:根據(jù)數(shù)據(jù)集創(chuàng)建一個(gè)新的數(shù)據(jù)
    集器瘪,計(jì)算后返回一個(gè)新RDD翠储;例如:Map將數(shù)據(jù)
    的每個(gè)元素經(jīng)過某個(gè)函數(shù)計(jì)算后,返回一個(gè)新的分
    布式數(shù)據(jù)集橡疼。
    常見的Transformation操作有map援所、filter、flatMap欣除、sort等等住拭。
  • Actions:對(duì)數(shù)據(jù)集計(jì)算后返回一個(gè)數(shù)值value給
    驅(qū)動(dòng)程序;例如:Reduce將數(shù)據(jù)集的所有元素用
    某個(gè)函數(shù)聚合后历帚,將最終結(jié)果返回給程序滔岳。
    常見的Actions操作有count、collect等等挽牢。在此階段才發(fā)生真正的計(jì)算
    舉個(gè)例子:
val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(_*2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

此段代碼的意思:

  1. 先調(diào)用SparkContext的parallelize方法將一個(gè)list并行化谱煤,獲得RDD。
  2. 然后調(diào)用RDD上的map操作禽拔,使此RDD上元素乘2趴俘,返回一個(gè)新的RDD b睹簇。
  3. 接著對(duì)a、b這個(gè)RDD調(diào)用collect操作寥闪,返回Array對(duì)象太惠。
  4. 其中SparkContext表示spark程序執(zhí)行的上下文,即又spark的環(huán)境得到的疲憋。
    其它的transformation和actions操作在具體運(yùn)用的時(shí)候?qū)W習(xí)

Spark程序的三種運(yùn)行模式

Local模式
  • 該模式被稱為L(zhǎng)ocal[N]模式凿渊,是用單機(jī)的多個(gè)線程來模擬Spark分布式計(jì)算,通常用來驗(yàn)證開發(fā)出來的應(yīng)用程序邏輯上有沒有問題缚柳。

  • 其中N代表可以使用N個(gè)線程埃脏,每個(gè)線程擁有一個(gè)core。如果不指定N秋忙,則默認(rèn)是1個(gè)線程(該線程有1個(gè)core)彩掐。


  • spark-submit 和 spark-submit --master local 效果是一樣的
  • (同理:spark-shell 和 spark-shell --master local 效果是一樣的)
  • spark-submit --master local[4] 代表會(huì)有4個(gè)線程(每個(gè)線程一個(gè)core)來并發(fā)執(zhí)行應(yīng)用程序。
Spark On Yarn模式

在Spark中灰追,有Yarn-Client和Yarn-Cluster兩種模式可以運(yùn)行在Yarn上堵幽,通常Yarn-Cluster適用于生產(chǎn)環(huán)境,而Yarn-Clientr更適用于交互弹澎,調(diào)試模式朴下。
優(yōu)勢(shì)

  1. Spark支持資源動(dòng)態(tài)共享,運(yùn)行于Yarn的框架都共享一個(gè)集中配置好的資源池
  2. 可以很方便的利用Yarn的資源調(diào)度特性來做分類·苦蒿,隔離以及優(yōu)先級(jí)控制負(fù)載殴胧,擁有更靈活的調(diào)度策略
  3. Yarn可以自由地選擇executor數(shù)量
  4. Yarn是唯一支持Spark安全的集群管理器,使用Yarn佩迟,Spark可以運(yùn)行于Kerberized Hadoop之上团滥,在它們進(jìn)程之間進(jìn)行安全認(rèn)證

此模式分為yarn-client和yarn-cluster

Yarn-cluster模式下作業(yè)執(zhí)行流程:

  1. 客戶端生成作業(yè)信息提交給ResourceManager(RM)
  2. RM在某一個(gè)NodeManager(由Yarn決定)啟動(dòng)container并將Application Master(AM)分配給該NodeManager(NM)
  3. NM接收到RM的分配,啟動(dòng)Application Master并初始化作業(yè)报强,此時(shí)這個(gè)NM就稱為Driver
  4. Application向RM申請(qǐng)資源惫撰,分配資源同時(shí)通知其他NodeManager啟動(dòng)相應(yīng)的Executor
  5. Executor向NM上的Application Master注冊(cè)匯報(bào)并完成相應(yīng)的任務(wù)

Yarn-client模式下作業(yè)執(zhí)行流程:

  1. 客戶端生成作業(yè)信息提交給ResourceManager(RM)
  2. RM在本地NodeManager啟動(dòng)container并將Application Master(AM)分配給該NodeManager(NM)
  3. NM接收到RM的分配,啟動(dòng)Application Master并初始化作業(yè)躺涝,此時(shí)這個(gè)NM就稱為Driver
  4. Application向RM申請(qǐng)資源厨钻,分配資源同時(shí)通知其他NodeManager啟動(dòng)相應(yīng)的Executor
  5. Executor向本地啟動(dòng)的Application Master注冊(cè)匯報(bào)并完成相應(yīng)的任務(wù)
分布式模式查看程序執(zhí)行情況

把剛剛寫好的wordcount的jar包sprak_test.jar提交到分布式的環(huán)境中去,并分別選擇yarn-cluster和yarn-client運(yùn)行模式坚嗜,用相應(yīng)的方法查看執(zhí)行情況夯膀。(用戶切換為hdfs

  • yarn-client
    首先執(zhí)行命令hdfs dfs -rm -r /tmp/result/wordcount/刪除輸出路徑,不然會(huì)報(bào)錯(cuò)苍蔬。進(jìn)入到spark的bin目錄诱建,然后還是和以前一樣填寫輸出輸出運(yùn)行,只是運(yùn)行模式改為yarn-client碟绑。
    spark-submit --class MyTest --master yarn-client /usr/tmp/sprak_test.jar /tmp/input/words.txt /tmp/result/wordcount/
    對(duì)于yarn-client模式的運(yùn)行情況俺猿,會(huì)直接打印到終端茎匠。


    也可以通過APP Name在端口18088查看情況

    只是這個(gè)任務(wù)太快了,不能在webUi看到運(yùn)行的過程押袍。其結(jié)果如下:

  • yarn-cluster
    首先執(zhí)行命令hdfs dfs -rm -r /tmp/result/wordcount/刪除輸出路徑诵冒,不然會(huì)報(bào)錯(cuò)。進(jìn)入到spark的bin目錄谊惭,然后還是和以前一樣填寫輸出輸出運(yùn)行汽馋,只是運(yùn)行模式改為yarn-cluster。
    spark-submit --class MyTest --master yarn-cluster/usr/tmp/sprak_test.jar /tmp/input/words.txt /tmp/result/wordcount/
    也是去18088端口查看信息(運(yùn)行太快看不到信息)


如果要查看詳細(xì)情況圈盔,還可以點(diǎn)stdout查看具體運(yùn)行日志


結(jié)果如下:


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末豹芯,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子驱敲,更是在濱河造成了極大的恐慌铁蹈,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,248評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件众眨,死亡現(xiàn)場(chǎng)離奇詭異握牧,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)围辙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來放案,“玉大人姚建,你說我怎么就攤上這事≈ㄑ常” “怎么了掸冤?”我有些...
    開封第一講書人閱讀 153,443評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)友雳。 經(jīng)常有香客問我稿湿,道長(zhǎng),這世上最難降的妖魔是什么押赊? 我笑而不...
    開封第一講書人閱讀 55,475評(píng)論 1 279
  • 正文 為了忘掉前任饺藤,我火速辦了婚禮,結(jié)果婚禮上流礁,老公的妹妹穿的比我還像新娘涕俗。我一直安慰自己,他們只是感情好神帅,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評(píng)論 5 374
  • 文/花漫 我一把揭開白布再姑。 她就那樣靜靜地躺著,像睡著了一般找御。 火紅的嫁衣襯著肌膚如雪元镀。 梳的紋絲不亂的頭發(fā)上绍填,一...
    開封第一講書人閱讀 49,185評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音栖疑,去河邊找鬼讨永。 笑死,一個(gè)胖子當(dāng)著我的面吹牛蔽挠,可吹牛的內(nèi)容都是我干的住闯。 我是一名探鬼主播,決...
    沈念sama閱讀 38,451評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼澳淑,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼比原!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起杠巡,我...
    開封第一講書人閱讀 37,112評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤量窘,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后氢拥,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蚌铜,經(jīng)...
    沈念sama閱讀 43,609評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評(píng)論 2 325
  • 正文 我和宋清朗相戀三年嫩海,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了冬殃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,163評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡叁怪,死狀恐怖审葬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情奕谭,我是刑警寧澤涣觉,帶...
    沈念sama閱讀 33,803評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站血柳,受9級(jí)特大地震影響官册,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜难捌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評(píng)論 3 307
  • 文/蒙蒙 一膝宁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧根吁,春花似錦昆汹、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至愚争,卻和暖如春映皆,著一層夾襖步出監(jiān)牢的瞬間挤聘,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評(píng)論 1 261
  • 我被黑心中介騙來泰國(guó)打工捅彻, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留组去,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,636評(píng)論 2 355
  • 正文 我出身青樓步淹,卻偏偏與公主長(zhǎng)得像从隆,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子缭裆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容