行動操作是真正觸發(fā)計算的地方揩尸。Spark程序執(zhí)行到行動操作時所禀,才會執(zhí)行真正的計算,從文件中加載數(shù)據(jù)放钦,完成一次又一次轉(zhuǎn)換操作色徘,最終,完成行動操作得到結(jié)果操禀。
操作 | 說明 |
---|---|
count() | 返回數(shù)據(jù)集中的元素個數(shù) |
collect() | 以數(shù)組的形式返回數(shù)據(jù)集中的所有元素 |
first() | 返回數(shù)據(jù)集中的第一個元素 |
take(n) | 以數(shù)組的形式返回數(shù)據(jù)集中的前n個元素 |
reduce(func) | 通過函數(shù)func(輸入兩個參數(shù)并返回一個值)聚合數(shù)據(jù)集中的元素 |
foreach(func) | 將數(shù)據(jù)集中的每個元素傳遞到函數(shù)func中運行 |
惰性機制
在當前的spark目錄下面創(chuàng)建input目錄
cd $SPARK_HOME
mkdir input
vim word.txt
hello world
hello spark
hello hadoop
hello scala
由于textFile()方法只是一個轉(zhuǎn)換操作褂策,因此,這行代碼執(zhí)行后颓屑,不會立即把data.txt文件加載到內(nèi)存中斤寂,這時的lines只是一個指向這個文件的指針。
scala> val lines = sc.textFile("word.txt")
lines: org.apache.spark.rdd.RDD[String] = word.txt MapPartitionsRDD[13] at textFile at <console>:24
下面代碼用來計算每行的長度(即每行包含多少個單詞)揪惦,同樣遍搞,由于map()方法只是一個轉(zhuǎn)換操作,這行代碼執(zhí)行后器腋,不會立即計算每行的長度溪猿。
scala> val lineLengths = lines.map(s=>s.length)
lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at map at <console>:25
reduce()方法是一個“動作”類型的操作,這時纫塌,就會觸發(fā)真正的計算诊县。這時,Spark會把計算分解成多個任務在不同的機器上執(zhí)行措左,每臺機器運行位于屬于它自己的map和reduce依痊,最后把結(jié)果返回給Driver Program。
scala> val totalLength = lineLengths.reduce((a,b)=> a+b)
totalLength: Int = 45
count
lines就是一個RDD怎披。lines.filter()會遍歷lines中的每行文本胸嘁,并對每行文本執(zhí)行括號中的匿名函數(shù),也就是執(zhí)行Lamda表達式:line => line.contains(“spark”)钳枕,在執(zhí)行Lamda表達式時,會把當前遍歷到的這行文本內(nèi)容賦值給參數(shù)line赏壹,然后鱼炒,執(zhí)行處理邏輯line.contains(“spark”),也就是只有當改行文本包含“spark”才滿足條件蝌借,才會被放入到結(jié)果集中昔瞧。最后,等到lines集合遍歷結(jié)束后菩佑,就會得到一個結(jié)果集自晰,這個結(jié)果集中包含了所有包含“Spark”的行。最后稍坯,對這個結(jié)果集調(diào)用count()酬荞,這是一個行動操作搓劫,會計算出結(jié)果集中的元素個數(shù)。
scala> val lines = sc.textFile("file:///root/app/spark/input/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///root/app/spark/input/word.txt MapPartitionsRDD[8] at textFile at <console>:24
scala> lines.filter(_.contains("spark")).count
res3: Long = 1
scala> lines.filter(_.contains("hello")).count
res4: Long = 4
持久化
在Spark中混巧,RDD采用惰性求值的機制枪向,每次遇到行動操作,都會從頭開始執(zhí)行計算咧党。如果整個Spark程序中只有一次行動操作秘蛔,這當然不會有什么問題。但是傍衡,在一些情形下深员,我們需要多次調(diào)用不同的行動操作,這就意味著蛙埂,每次調(diào)用行動操作倦畅,都會觸發(fā)一次從頭開始的計算。這對于迭代計算而言箱残,代價是很大的滔迈,迭代計算經(jīng)常需要多次重復使用同一組數(shù)據(jù)。
scala> val list = List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:26
scala> rdd.count()
res5: Long = 3
scala> rdd.collect().mkString(",")
res6: String = hadoop,spark,hive
前后共觸發(fā)了兩次從頭到尾的計算被辑。
實際上燎悍,可以通過持久化(緩存)機制避免這種重復計算的開銷∨卫恚可以使用persist()方法對一個RDD標記為持久化谈山,之所以說“標記為持久化”,是因為出現(xiàn)persist()語句的地方宏怔,并不會馬上計算生成RDD并把它持久化奏路,而是要等到遇到第一個行動操作觸發(fā)真正計算以后,才會把計算結(jié)果進行持久化臊诊,持久化后的RDD將會被保留在計算節(jié)點的內(nèi)存中被后面的行動操作重復使用鸽粉。
persist()的圓括號中包含的是持久化級別參數(shù),
persist(MEMORY_ONLY)表示將RDD作為反序列化的對象存儲于JVM中抓艳,如果內(nèi)存不足触机,就要按照LRU原則替換緩存中的內(nèi)容。
persist(MEMORY_AND_DISK)表示將RDD作為反序列化的對象存儲在JVM中玷或,如果內(nèi)存不足儡首,超出的分區(qū)將會被存放在硬盤上。
一般而言偏友,使用cache()方法時蔬胯,會調(diào)用persist(MEMORY_ONLY)。
scala> val list = List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:26
scala> rdd.cache
res7: rdd.type = ParallelCollectionRDD[12] at parallelize at <console>:26
//會調(diào)用persist(MEMORY_ONLY)位他,但是氛濒,語句執(zhí)行到這里产场,并不會緩存rdd,這是rdd還沒有被計算生成
scala> rdd.count //第一次行動操作泼橘,觸發(fā)一次真正從頭到尾的計算涝动,這時才會執(zhí)行上面的rdd.cache(),把這個rdd放到緩存中
3
scala> rdd.collect.mkString(",") //第二次行動操作炬灭,不需要觸發(fā)從頭到尾的計算醋粟,只需要重復使用上面緩存中的rdd
res9: String = hadoop,spark,hive
可以使用unpersist()方法手動地把持久化的RDD從緩存中移除。
分區(qū)
RDD是彈性分布式數(shù)據(jù)集重归,通常RDD很大米愿,會被分成很多個分區(qū),分別保存在不同的節(jié)點上鼻吮。RDD分區(qū)的一個分區(qū)原則是使得分區(qū)的個數(shù)盡量等于集群中的CPU核心(core)數(shù)目育苟。
對于不同的Spark部署模式而言(本地模式、Standalone模式椎木、YARN模式违柏、Mesos模式),都可以通過設置spark.default.parallelism這個參數(shù)的值香椎,來配置默認的分區(qū)數(shù)目漱竖,一般而言:
*本地模式:默認為本地機器的CPU數(shù)目,若設置了local[N],則默認為N畜伐;
*Apache Mesos:默認的分區(qū)數(shù)為8馍惹;
*Standalone或YARN:在“集群中所有CPU核心數(shù)目總和”和“2”二者中取較大值作為默認值;
因此玛界,對于parallelize而言万矾,如果沒有在方法中指定分區(qū)數(shù),則默認為spark.default.parallelism慎框,比如:
scala>val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)
scala>val rdd = sc.parallelize(array,2) #設置兩個分區(qū)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:29
對于textFile而言良狈,如果沒有在方法中指定分區(qū)數(shù),則默認為min(defaultParallelism,2)笨枯,其中薪丁,defaultParallelism對應的就是spark.default.parallelism。
如果是從HDFS中讀取文件猎醇,則分區(qū)數(shù)為文件分片數(shù)(比如窥突,128MB/片)努溃。