Scala語(yǔ)法
至于scala語(yǔ)法而言,大致上和Java的語(yǔ)法類似谐宙,增加了一些函數(shù)式編程,具體語(yǔ)法可以參考Scala語(yǔ)法
Scala是一種針對(duì) JVM 將函數(shù)和面向?qū)ο蠹夹g(shù)組合在一起的編程語(yǔ)言电爹。Scala編程語(yǔ)言近來抓住了很多開發(fā)者的眼球须鼎。它看起來像是一種純粹的面向?qū)ο缶幊陶Z(yǔ)言,而又無(wú)縫地結(jié)合了命令式和函數(shù)式的編程風(fēng)格澜沟。
Scala也是一種函數(shù)式語(yǔ)言灾票,其函數(shù)也能當(dāng)成值來使用。
Scala被特意設(shè)計(jì)成能夠與Java和.NET互操作茫虽。它用scalac這個(gè)編譯器把源文件編譯成Java的class文件(即在JVM上運(yùn)行的字節(jié)碼)刊苍。你可以從Scala中調(diào)用所有的Java類庫(kù),也同樣可以從Java應(yīng)用程序中調(diào)用Scala的代碼濒析。
-
數(shù)據(jù)類型
- 變量
基于變量的數(shù)據(jù)類型正什,操作系統(tǒng)會(huì)進(jìn)行內(nèi)存分配并且決定什么將被儲(chǔ)存在保留內(nèi)存中。因此号杏,通過給變量分配不同的數(shù)據(jù)類型婴氮,你可以在這些變量中存儲(chǔ)整數(shù),小數(shù)或者字母盾致。
在 Scala 中主经,使用關(guān)鍵詞 "var" 聲明變量,使用關(guān)鍵詞 "val" 聲明常量(不能修改)庭惜。
var myVar : String = "Foo"
val myVar : String = "Too"
在 Scala 中聲明變量和常量不一定要指明數(shù)據(jù)類型罩驻,在沒有指明數(shù)據(jù)類型的情況下,其數(shù)據(jù)類型是通過變量或常量的初始值推斷出來的护赊。
所以鉴腻,如果在沒有指明數(shù)據(jù)類型的情況下聲明變量或常量必須要給出其初始值迷扇,否則將會(huì)報(bào)錯(cuò)。
var myVar = 10;
val myVal = "Hello, Scala!";
- Scala 訪問修飾符
私有(Private)成員
用 private 關(guān)鍵字修飾爽哎,帶有此標(biāo)記的成員僅在包含了成員定義的類或?qū)ο髢?nèi)部可見蜓席,同樣的規(guī)則還適用內(nèi)部類。
保護(hù)(Protected)成員
在 scala 中课锌,對(duì)保護(hù)(Protected)成員的訪問比 java 更嚴(yán)格一些厨内。因?yàn)樗辉试S保護(hù)成員在定義了該成員的的類的子類中被訪問。而在java中渺贤,用protected關(guān)鍵字修飾的成員雏胃,除了定義了該成員的類的子類可以訪問,同一個(gè)包里的其他類也可以進(jìn)行訪問志鞍。
公共(Public)成員
Scala中瞭亮,如果沒有指定任何的修飾符,則默認(rèn)為 public固棚。這樣的成員在任何地方都可以被訪問统翩。 - 輸入輸出
輸出和C語(yǔ)言差不多,用print()即可此洲。輸入用readLine函數(shù)從控制臺(tái)讀取一行輸入厂汗。如果要讀取數(shù)字,Boolean或者是字符呜师,你可以用readInt,readDouble,readByte等
val name =readLine("Your name:")
print("Your age:")
val age=readInt()
printf("Hello,%s! you are %d years old \n",name,age)
- 循環(huán)
是Scala 中沒有i++,i--操作娶桦。
A for (i <- 1 to (10)) {
println("Number is :" + i)
}
B for (ch<-"Hello")
{
println(ch)
}
C for (i<-0 to 10 ;form=10-i)println(form)
D for(i<-0 to 10 if i%2=0) println(form)
E for(i<-0 until (b.length,2)){//跳步0,2汁汗,4衷畦,6.。知牌。霎匈。。
val t= b(i)
b(i)=b(i+1)
b(i+1)=t
}
- 數(shù)組
數(shù)組分為可變和不可變長(zhǎng)度
#不可變長(zhǎng)度
val nums=new Array[Int](10)
val s = Array("Hello", "World")
s(0) = "Goodbye"
#可變長(zhǎng)度
import scala.collection.mutable.ArrayBuffer
val b= ArrayBuffer[Int]()
b+=1 //添加元素+=在尾端添加
b+=(1,2,34,5,6)
b.trimEnd(5) //尾端刪除5個(gè)元素
b.insert(2, 6)
b.insert(2, 7, 8, 9)
b.remove(2)
b.remove(2, 3)
b.toArray
- 其它操作和Java類似送爸,使用時(shí)查API即可。
安裝Scala
- Java 設(shè)置
確保你本地以及安裝了 JDK 1.5 以上版本暖释,并且設(shè)置了 JAVA_HOME 環(huán)境變量及 JDK 的bin目錄袭厂。 - 安裝Scala
從官網(wǎng)下載Scala 安裝包,解壓安裝:
tar zxvf scala-2.11.7.tgz
設(shè)置Scala環(huán)境變量設(shè)置
SCALA_HOME=/opt/scala-2.11.7
PATH=$PATH:$SCALA_HOME/bin
export SCALA_HOME PATH
驗(yàn)證Scala安裝Scala 已正常安裝球匕。
安裝Spark
由于用的cloudera Hadoop發(fā)行版纹磺,所以直接添加即可。使用Apache Hadoop可以看這里亮曹。
輸入spark-shell啟動(dòng)Spark的時(shí)候報(bào)錯(cuò)
- 測(cè)試SparkPi
在spark的bin目錄下輸入./run-example SparkPi 10(迭代次數(shù))
計(jì)算PI的值。
第一次運(yùn)行的時(shí)候報(bào)錯(cuò)說找不到main class故慈,檢查發(fā)現(xiàn)沒有配置JAVA_HOME板熊,所以用命令echo $JAVA_HOME
檢查一下是否配置好了。
然后就可以正確運(yùn)行了察绷,并得到結(jié)果干签。
- 測(cè)試wordcount
當(dāng)程序一旦打包好,就可以使用bin/spark-submit腳本啟動(dòng)應(yīng)用了. 這個(gè)腳本負(fù)責(zé)設(shè)置spark使用的classpath和依賴,支持不同類型的集群管理器和發(fā)布模式:
./bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
--class: 你的應(yīng)用的啟動(dòng)類 (如 org.apache.spark.examples.SparkPi)
--master: 集群的master URL (如 spark://23.195.26.187:7077)
--deploy-mode: 是否發(fā)布你的驅(qū)動(dòng)到worker節(jié)點(diǎn)(cluster) 或者作為一個(gè)本地客戶端 (client) (default: client)*
--conf: 任意的Spark配置屬性, 格式key=value. 如果值包含空格拆撼,可以加引號(hào)“key=value”.
application-jar: 打包好的應(yīng)用jar,包含依賴. 這個(gè)URL在集群中全局可見容劳。 比如hdfs:// 共享存儲(chǔ)系統(tǒng)谓媒, 如果是 file:// path人柿, 那么所有的節(jié)點(diǎn)的path都包含同樣的jar.
application-arguments: 傳給main()方法的參數(shù)
使用命令./spark-submit --class testSpark.WordCount --master yarn-cluster /usr/tmp/WordCount.jar /tmp/input/words.txt /tmp/result/wordcount/
使用集群模式運(yùn)行jar包,并指明了主類和輸入輸出
第一次執(zhí)行的時(shí)候報(bào)錯(cuò)了隧魄,但是只是說
查看日志找到具體錯(cuò)誤筋岛,輸入的文件地址錯(cuò)了娶视。
Spark Shell
spark shell 是spark自帶的一個(gè)快速原型開發(fā)的工具睁宰,在spark目錄下面的bin目錄下面肪获。把這個(gè)理解為在python交互界面下寫python代碼就行了。
寫一個(gè)wordcount
-
load數(shù)據(jù)集
sc是在進(jìn)入spark shell 時(shí)候創(chuàng)建一個(gè)spark content 就是spark上下文的意思柒傻,val 是scala語(yǔ)法中聲明常量的方式(val 定義的值實(shí)際上是一個(gè)常量孝赫,如果你試圖改變他的值,解析器會(huì)報(bào)錯(cuò)红符,聲明值或變量但不做初始化也會(huì)報(bào)錯(cuò)青柄。),通過println可以看到讀入的文件被處理成一個(gè)MappedRDD的對(duì)象预侯。這個(gè)sc.textFile的操作就把的數(shù)據(jù)load到內(nèi)存中了致开。
- 調(diào)用flatMap方法計(jì)算wordCount
val wordCounts = textFile.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>a+b)
-
由于Scala語(yǔ)言的特性,此時(shí)wordCounts還不是執(zhí)行計(jì)算后的值萎馅,還只是定義著計(jì)算的方法双戳,需要用collect()來執(zhí)行這個(gè)計(jì)算。
res2表示得到的結(jié)果糜芳,是一個(gè)Array類型的飒货,如果要取出每一個(gè)值魄衅,可以用Scala的循環(huán)。
配置Spark應(yīng)該程序開發(fā)塘辅、運(yùn)行環(huán)境
在開發(fā)Spark程序的時(shí)候晃虫,總是先在本地進(jìn)行開發(fā),所以就需要spark的開發(fā)環(huán)境莫辨,等調(diào)試程序確保無(wú)誤的時(shí)候再把程序打包到本地Spark運(yùn)行傲茄,所以這就需要在本地配置Spark的運(yùn)行環(huán)境。最后再把jar包提交到分布式環(huán)境運(yùn)行沮榜。(當(dāng)然盘榨,如果你不想在本地測(cè)試jar包而直接拿到集群上去跑可以不配置本地spark,但是后面需要spark/lib文件里的一些jar包蟆融,需單獨(dú)下載)
- 配置JDK1.8
這個(gè)需要配置JAVA_HOME草巡,PATH,CLASSPATH型酥。這個(gè)相信大多數(shù)的都已經(jīng)配置了的山憨,至于每個(gè)環(huán)境變量的具體的作用可以看這里。配置完成后驗(yàn)證一下是否成功弥喉,當(dāng)看到Java信息就成功了郁竟。
- Scala的安裝
首先從DOWNLOAD PREVIOUS VERSIONS下載到對(duì)應(yīng)的版本,在這里需要注意的是由境,Spark的各個(gè)版本需要跟相應(yīng)的Scala版本對(duì)應(yīng)棚亩,比如我這里使用的Spark 1.6.0,Scala使用的 2.10.6版本虏杰。記得下載二進(jìn)制版本的Scala讥蟆。
安裝完后默認(rèn)是自動(dòng)將bin配置到了path環(huán)境變量,可以驗(yàn)證一下是否自動(dòng)配置了纺阔。
- Spark的安裝
park的安裝非常簡(jiǎn)單瘸彤,直接去Download Apache Spark。有兩個(gè)步驟:
選擇好對(duì)應(yīng)Hadoop版本的Spark版本笛钝,如下圖中所示质况;
然后點(diǎn)擊spark-1.6.2-bin-hadoop2.6.tgz
,等待下載結(jié)束即可玻靡。
下載完成后將文件進(jìn)行解壓(可能需要解壓兩次)结榄,最好解壓到一個(gè)盤的根目錄下,并重命名為Spark啃奴,簡(jiǎn)單不易出錯(cuò)。并且需要注意的是雄妥,在Spark的文件目錄路徑名中最蕾,不要出現(xiàn)空格依溯,類似于“Program Files”這樣的文件夾名是不被允許的。
然后就是配置環(huán)境變量瘟则,把\Spark\bin加入path黎炉。
- Hadoop安裝
系統(tǒng)變量設(shè)置后,就可以在任意當(dāng)前目錄下的cmd中運(yùn)行spark-shell醋拧,但這個(gè)時(shí)候很有可能會(huì)碰到各種錯(cuò)誤慷嗜,這里主要是因?yàn)镾park是基于Hadoop的,所以這里也有必要配置一個(gè)Hadoop的運(yùn)行環(huán)境丹壕。在Hadoop Releases里可以看到Hadoop的各個(gè)歷史版本庆械,這里由于下載的Spark是基于Hadoop 2.6的(在Spark安裝的第一個(gè)步驟中,我們選擇的是Pre-built for Hadoop 2.6
)菌赖,我這里選擇2.6.4版本缭乘,選擇好相應(yīng)版本并點(diǎn)擊后,進(jìn)入詳細(xì)的下載頁(yè)面琉用,并下載’hadoop-2.6.4.tar.gz’文件
下載并解壓到指定目錄堕绩,然后到環(huán)境變量部分設(shè)置HADOOP_HOME為Hadoop的解壓目錄,然后把它下面的bin目錄頁(yè)加入path
然后再cmd下啟動(dòng)spark邑时,報(bào)錯(cuò)如下
主要是因?yàn)镠adoop的bin目錄下沒有winutils.exe文件的原因造成的奴紧。這里的解決辦法是:
- 去 https://github.com/steveloughran/winutils 選擇你安裝的Hadoop版本號(hào),然后進(jìn)入到bin目錄下晶丘,找到
winutils.exe
文件黍氮,下載方法是點(diǎn)擊winutils.exe
文件,進(jìn)入之后在頁(yè)面的右上方部分有一個(gè)Download
按鈕铣口,點(diǎn)擊下載即可滤钱。 - 下載好
winutils.exe
后,將這個(gè)文件放入到Hadoop的bin目錄下脑题,我這里是E:\study\hadoop-2.6.4\bin
件缸。 - 在打開的cmd中輸入
E:\study\hadoop-2.6.4\bin\winutils.exe chmod 777 /tmp/hive
這個(gè)操作是用來修改權(quán)限的。注意前面的F:\Program Files\hadoop\bin
部分要對(duì)應(yīng)的替換成實(shí)際你所安裝的bin目錄所在位置叔遂。
經(jīng)過這幾個(gè)步驟之后他炊,然后再次開啟一個(gè)新的cmd窗口,如果正常的話已艰,應(yīng)該就可以通過直接輸入spark-shell
來運(yùn)行Spark了痊末。
看到這兩句話就說明成功了
Spark context available as sc.
SQL context available as sqlContext.
配置集成開發(fā)環(huán)境IDEA
配置前提
- JDK安裝。 請(qǐng)自行前往oracle官方網(wǎng)站下載安裝哩掺,并在command命令行窗口確認(rèn)java -version 可以返回版本號(hào)凿叠,否則的話要去系統(tǒng)環(huán)境變量設(shè)置位置確認(rèn)是否java已經(jīng)被添加到PATH中
- Scala下載安裝。移步官網(wǎng) http://www.scala-lang.org/ 下載并安裝即可。同第1步盒件,要在command命令行下確認(rèn)敲擊scala可以進(jìn)入交互式命令窗口蹬碧,否則請(qǐng)確認(rèn)環(huán)境變量的配置。
- spark源代碼下載炒刁。官方網(wǎng)站 http://spark.apache.org/downloads.html 上提供有各種hadoop版本的預(yù)編譯版spark代碼恩沽,理論上要根據(jù)你在用的hadoop版本來相應(yīng)選擇,本文僅作配置說明翔始,故任選其中一即可罗心。筆者下載的是spark1.5, 對(duì)應(yīng)hadoop2.6預(yù)編譯的版本,解壓即可城瞎。
- Intellij IDEA下載渤闷。https://www.jetbrains.com/idea/ 上可以下載免費(fèi)的community版本。
配置開始
- 安裝IDEA 的scala插件
第一次安裝時(shí)全谤,在plugins處輸入scala關(guān)鍵詞搜索肤晓,在聯(lián)網(wǎng)環(huán)境下點(diǎn)擊安裝即可。
然后就可以看到下載進(jìn)度
完了之后記得重啟认然,不然還是不能用 -
創(chuàng)建項(xiàng)目并導(dǎo)入相應(yīng)依賴包
直接在新建項(xiàng)目界面選擇scala項(xiàng)目即可
Project SDK是java的jdk补憾,如果沒有默認(rèn)加載出來,點(diǎn)擊New卷员,手工定位到j(luò)dk的目錄提交上來即可盈匾。Scala SDK那里如果默認(rèn)沒有加載出來,點(diǎn)擊Create毕骡,在彈出的窗口中安默認(rèn)勾選的System點(diǎn)擊OK即可削饵。便可以看到項(xiàng)目結(jié)構(gòu)了。
接下來未巫,我們導(dǎo)入上面下載好的spark源碼窿撬。按下圖指引操作, 在+號(hào)處選擇java, 然后定位到你上面步驟中將spark程序解壓到的目錄位置,選擇lib目錄下的spark-assembly-1.5.0-hadoop2.6.0.jar文件叙凡,確認(rèn)劈伴。
當(dāng)看到項(xiàng)目外部包里面出現(xiàn)了剛剛導(dǎo)入的包就行了。
程序開發(fā)
創(chuàng)建scala class握爷,類型選擇object(如果右鍵沒有Scala class選項(xiàng)是因?yàn)闆]有把src目錄作為你的Sources文件夾屬性跛璧,右鍵src有個(gè)選項(xiàng)把它設(shè)置為Sources文件夾即可,也可以去項(xiàng))新啼。
寫一個(gè)分布式的wordcount(如果想寫本地spark的話就不用RDD追城,像以前那樣寫代碼即可)
import org.apache.spark.SparkConf //使用spark的相關(guān)操作
import org.apache.spark.SparkContext //獲取SparkContext上下文對(duì)象
object MyTest {
def main(args: Array[String]){
if (args.length != 2 || args(0) == null || args(1) == null){ //查看傳如參數(shù)是否為2且不為空
System.exit(1);
}
val conf = new SparkConf() //獲取spark環(huán)境的配置,用于傳如上下文
val sc = new SparkContext(conf)
val line = sc.textFile(args(0))
//wordcount的算法燥撞,用flatMap實(shí)現(xiàn)
val result = line.flatMap(_.split("[^a-zA-Z]+")).map((_, 1)).reduceByKey(_+_)
result.saveAsTextFile(args(1)) //保存結(jié)果到指定文件夾
sc.stop() //關(guān)閉上下文對(duì)象
}
}
最后配置編譯生成的jar包名字和地址座柱。(這里項(xiàng)目名字spark寫成sprak了迷帜。。色洞。)
進(jìn)入到如圖的項(xiàng)目管理界面瞬矩,選擇圖中的目錄,打包時(shí)的main class選擇你的主類锋玲,我這里是MyTest
然后就編譯可以輸出你的包了。build -> build artifacts -> spark_dev:jar -> build, 然后就開始編譯了涵叮,在最下面可以看到進(jìn)行的狀態(tài)惭蹂。到輸出JAR包的目錄下去看看,發(fā)現(xiàn)確實(shí)成功生成了一個(gè)jar文件割粮。(此處打好的jar包如果要提交到數(shù)平的Spark集群上運(yùn)行盾碗,請(qǐng)打開此jar包文件,觀察其中是否有一個(gè)scala的文件夾舀瓢,刪除了廷雅!否則可能與線上的scala版本沖突,不沖突可不刪除)
這里可能會(huì)報(bào)錯(cuò)java.lang.outofmemoryerror
配置jar包output選項(xiàng)京髓,因?yàn)檫\(yùn)行環(huán)境中已經(jīng)有相關(guān)包航缀,所以其他包刪除,只保留’compile output’那一項(xiàng)堰怨,這時(shí)再build就不會(huì)內(nèi)存溢出芥玉。
本地測(cè)試代碼
在cmd中輸入命令
>spark-submit --class MyTest --master local E:\study\Projects\sprak_test\out\artifacts\sprak_test_jar\sprak_test.jar E:\words.txt E:\out
表示把剛剛的jar包提交到spark執(zhí)行,local本地模式备图,輸入文件和輸入路徑灿巧。最后得到結(jié)果:
編寫spark程序
以下內(nèi)容基于spark1.6.0
spark程序可以用Java、python和scala編寫揽涮,由于spark本身是由scala編寫抠藕,再加上scala的語(yǔ)法特定決定了scala是編寫spark程序的最合適的語(yǔ)言。
基本概念
- RDD:是彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset)的簡(jiǎn)稱蒋困,是分布式內(nèi)存的一個(gè)抽象概念盾似,提供了一種高度受限的共享內(nèi)存模型;
- DAG:是Directed Acyclic Graph(有向無(wú)環(huán)圖)的簡(jiǎn)稱家破,反映RDD之間的依賴關(guān)系颜说;
- Executor:是運(yùn)行在工作節(jié)點(diǎn)(Worker Node)上的一個(gè)進(jìn)程,負(fù)責(zé)運(yùn)行任務(wù)汰聋,并為應(yīng)用程序存儲(chǔ)數(shù)據(jù)门粪;
- 應(yīng)用:用戶編寫的Spark應(yīng)用程序;
- 任務(wù):運(yùn)行在Executor上的工作單元烹困;
- 作業(yè):一個(gè)作業(yè)包含多個(gè)RDD及作用于相應(yīng)RDD上的各種操作玄妈;
- 階段:是作業(yè)的基本調(diào)度單位,一個(gè)作業(yè)會(huì)分為多組任務(wù),每組任務(wù)被稱為“階段”拟蜻,或者也被稱為“任務(wù)集”绎签。
架構(gòu)設(shè)計(jì)
Spark運(yùn)行架構(gòu)包括集群資源管理器(Cluster Manager)、運(yùn)行作業(yè)任務(wù)的工作節(jié)點(diǎn)(Worker Node)酝锅、每個(gè)應(yīng)用的任務(wù)控制節(jié)點(diǎn)(Driver)和每個(gè)工作節(jié)點(diǎn)上負(fù)責(zé)具體任務(wù)的執(zhí)行進(jìn)程(Executor)诡必。其中,集群資源管理器可以是Spark自帶的資源管理器搔扁,也可以是YARN或Mesos等資源管理框架爸舒。
Spark運(yùn)行基本流程
(1)當(dāng)一個(gè)Spark應(yīng)用被提交時(shí)蛀序,首先需要為這個(gè)應(yīng)用構(gòu)建起基本的運(yùn)行環(huán)境,即由任務(wù)控制節(jié)點(diǎn)(Driver)創(chuàng)建一個(gè)SparkContext活烙,由SparkContext負(fù)責(zé)和資源管理器(Cluster Manager)的通信以及進(jìn)行資源的申請(qǐng)徐裸、任務(wù)的分配和監(jiān)控等。SparkContext會(huì)向資源管理器注冊(cè)并申請(qǐng)運(yùn)行Executor的資源啸盏;
(2)資源管理器為Executor分配資源重贺,并啟動(dòng)Executor進(jìn)程,Executor運(yùn)行情況將隨著“心跳”發(fā)送到資源管理器上回懦;
(3)SparkContext根據(jù)RDD的依賴關(guān)系構(gòu)建DAG圖气笙,DAG圖提交給DAG調(diào)度器(DAGScheduler)進(jìn)行解析,將DAG圖分解成多個(gè)“階段”(每個(gè)階段都是一個(gè)任務(wù)集)怯晕,并且計(jì)算出各個(gè)階段之間的依賴關(guān)系潜圃,然后把一個(gè)個(gè)“任務(wù)集”提交給底層的任務(wù)調(diào)度器(TaskScheduler)進(jìn)行處理;Executor向SparkContext申請(qǐng)任務(wù)舟茶,任務(wù)調(diào)度器將任務(wù)分發(fā)給Executor運(yùn)行谭期,同時(shí)堵第,SparkContext將應(yīng)用程序代碼發(fā)放給Executor;
(4)任務(wù)在Executor上運(yùn)行隧出,把執(zhí)行結(jié)果反饋給任務(wù)調(diào)度器踏志,然后反饋給DAG調(diào)度器,運(yùn)行完畢后寫入數(shù)據(jù)并釋放所有資源胀瞪。
RDD
- RDD(彈性數(shù)據(jù)集)是Spark提供的最重要的
抽象的概念针余,它是一種有容錯(cuò)機(jī)制的特殊集合,可
以分布在集群的節(jié)點(diǎn)上凄诞,以函數(shù)式編操作集合的方
式涵紊,進(jìn)行各種并行操作。 - 可以將RDD理解為一個(gè)具有容錯(cuò)機(jī)制的特殊集合幔摸,
它提供了一種只讀、只能有已存在的RDD變換而來
的共享內(nèi)存颤练,然后將所有數(shù)據(jù)都加載到內(nèi)存中既忆,方
便進(jìn)行多次重用。
獲取RDD
- 從共享的文件系統(tǒng)(HDFS)
- 通過已存在的RDD轉(zhuǎn)換
- 將已存在scala集合(只要是Seq對(duì)象)并行化 嗦玖,通過調(diào)用SparkContext的parallelize方法實(shí)現(xiàn)患雇。
- 改變現(xiàn)有RDD的持久性;RDD是懶散宇挫,短暫的苛吱。
RDD的固化:cache緩存至內(nèi)存;
save保存到分布式文件系統(tǒng)
操作RDD
- Transformation:根據(jù)數(shù)據(jù)集創(chuàng)建一個(gè)新的數(shù)據(jù)
集器瘪,計(jì)算后返回一個(gè)新RDD翠储;例如:Map將數(shù)據(jù)
的每個(gè)元素經(jīng)過某個(gè)函數(shù)計(jì)算后,返回一個(gè)新的分
布式數(shù)據(jù)集橡疼。
常見的Transformation操作有map援所、filter、flatMap欣除、sort等等住拭。 - Actions:對(duì)數(shù)據(jù)集計(jì)算后返回一個(gè)數(shù)值value給
驅(qū)動(dòng)程序;例如:Reduce將數(shù)據(jù)集的所有元素用
某個(gè)函數(shù)聚合后历帚,將最終結(jié)果返回給程序滔岳。
常見的Actions操作有count、collect等等挽牢。在此階段才發(fā)生真正的計(jì)算
舉個(gè)例子:
val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(_*2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
此段代碼的意思:
- 先調(diào)用SparkContext的parallelize方法將一個(gè)list并行化谱煤,獲得RDD。
- 然后調(diào)用RDD上的map操作禽拔,使此RDD上元素乘2趴俘,返回一個(gè)新的RDD b睹簇。
- 接著對(duì)a、b這個(gè)RDD調(diào)用collect操作寥闪,返回Array對(duì)象太惠。
- 其中SparkContext表示spark程序執(zhí)行的上下文,即又spark的環(huán)境得到的疲憋。
其它的transformation和actions操作在具體運(yùn)用的時(shí)候?qū)W習(xí)
Spark程序的三種運(yùn)行模式
Local模式
該模式被稱為L(zhǎng)ocal[N]模式凿渊,是用單機(jī)的多個(gè)線程來模擬Spark分布式計(jì)算,通常用來驗(yàn)證開發(fā)出來的應(yīng)用程序邏輯上有沒有問題缚柳。
其中N代表可以使用N個(gè)線程埃脏,每個(gè)線程擁有一個(gè)core。如果不指定N秋忙,則默認(rèn)是1個(gè)線程(該線程有1個(gè)core)彩掐。
- spark-submit 和 spark-submit --master local 效果是一樣的
- (同理:spark-shell 和 spark-shell --master local 效果是一樣的)
- spark-submit --master local[4] 代表會(huì)有4個(gè)線程(每個(gè)線程一個(gè)core)來并發(fā)執(zhí)行應(yīng)用程序。
Spark On Yarn模式
在Spark中灰追,有Yarn-Client和Yarn-Cluster兩種模式可以運(yùn)行在Yarn上堵幽,通常Yarn-Cluster適用于生產(chǎn)環(huán)境,而Yarn-Clientr更適用于交互弹澎,調(diào)試模式朴下。
優(yōu)勢(shì)
- Spark支持資源動(dòng)態(tài)共享,運(yùn)行于Yarn的框架都共享一個(gè)集中配置好的資源池
- 可以很方便的利用Yarn的資源調(diào)度特性來做分類·苦蒿,隔離以及優(yōu)先級(jí)控制負(fù)載殴胧,擁有更靈活的調(diào)度策略
- Yarn可以自由地選擇executor數(shù)量
- Yarn是唯一支持Spark安全的集群管理器,使用Yarn佩迟,Spark可以運(yùn)行于Kerberized Hadoop之上团滥,在它們進(jìn)程之間進(jìn)行安全認(rèn)證
此模式分為yarn-client和yarn-cluster
Yarn-cluster模式下作業(yè)執(zhí)行流程:
- 客戶端生成作業(yè)信息提交給ResourceManager(RM)
- RM在某一個(gè)NodeManager(由Yarn決定)啟動(dòng)container并將Application Master(AM)分配給該NodeManager(NM)
- NM接收到RM的分配,啟動(dòng)Application Master并初始化作業(yè)报强,此時(shí)這個(gè)NM就稱為Driver
- Application向RM申請(qǐng)資源惫撰,分配資源同時(shí)通知其他NodeManager啟動(dòng)相應(yīng)的Executor
- Executor向NM上的Application Master注冊(cè)匯報(bào)并完成相應(yīng)的任務(wù)
Yarn-client模式下作業(yè)執(zhí)行流程:
- 客戶端生成作業(yè)信息提交給ResourceManager(RM)
- RM在本地NodeManager啟動(dòng)container并將Application Master(AM)分配給該NodeManager(NM)
- NM接收到RM的分配,啟動(dòng)Application Master并初始化作業(yè)躺涝,此時(shí)這個(gè)NM就稱為Driver
- Application向RM申請(qǐng)資源厨钻,分配資源同時(shí)通知其他NodeManager啟動(dòng)相應(yīng)的Executor
- Executor向本地啟動(dòng)的Application Master注冊(cè)匯報(bào)并完成相應(yīng)的任務(wù)
分布式模式查看程序執(zhí)行情況
把剛剛寫好的wordcount的jar包sprak_test.jar提交到分布式的環(huán)境中去,并分別選擇yarn-cluster和yarn-client運(yùn)行模式坚嗜,用相應(yīng)的方法查看執(zhí)行情況夯膀。(用戶切換為hdfs)
-
yarn-client
首先執(zhí)行命令hdfs dfs -rm -r /tmp/result/wordcount/
刪除輸出路徑,不然會(huì)報(bào)錯(cuò)苍蔬。進(jìn)入到spark的bin目錄诱建,然后還是和以前一樣填寫輸出輸出運(yùn)行,只是運(yùn)行模式改為yarn-client碟绑。
spark-submit --class MyTest --master yarn-client /usr/tmp/sprak_test.jar /tmp/input/words.txt /tmp/result/wordcount/
對(duì)于yarn-client模式的運(yùn)行情況俺猿,會(huì)直接打印到終端茎匠。
也可以通過APP Name在端口18088查看情況
只是這個(gè)任務(wù)太快了,不能在webUi看到運(yùn)行的過程押袍。其結(jié)果如下:
-
yarn-cluster
首先執(zhí)行命令hdfs dfs -rm -r /tmp/result/wordcount/
刪除輸出路徑诵冒,不然會(huì)報(bào)錯(cuò)。進(jìn)入到spark的bin目錄谊惭,然后還是和以前一樣填寫輸出輸出運(yùn)行汽馋,只是運(yùn)行模式改為yarn-cluster。
spark-submit --class MyTest --master yarn-cluster/usr/tmp/sprak_test.jar /tmp/input/words.txt /tmp/result/wordcount/
也是去18088端口查看信息(運(yùn)行太快看不到信息)
如果要查看詳細(xì)情況圈盔,還可以點(diǎn)stdout查看具體運(yùn)行日志
結(jié)果如下: