1.14 transformation和action
Spark支持兩種RDD操作:transformation和action掌动。transformation操作會(huì)針對(duì)已有的RDD創(chuàng)建一個(gè)新的RDD抵碟;而action則主要是對(duì)RDD進(jìn)行最后的操作,比如遍歷坏匪、reduce拟逮、保存到文件等,并可以返回結(jié)果給Driver程序适滓。
例如敦迄,map就是一種transformation操作,它用于將已有RDD的每個(gè)元素傳入一個(gè)自定義的函數(shù)凭迹,并獲取一個(gè)新的元素罚屋,然后將所有的新元素組成一個(gè)新的RDD。而reduce就是一種action操作嗅绸,它用于對(duì)RDD中的所有元素進(jìn)行聚合操作脾猛,并獲取一個(gè)最終的結(jié)果,然后返回給Driver程序鱼鸠。
transformation的特點(diǎn)就是lazy特性猛拴。lazy特性指的是,如果一個(gè)spark應(yīng)用中只定義了transformation操作蚀狰,那么即使你執(zhí)行該應(yīng)用愉昆,這些操作也不會(huì)執(zhí)行。也就是說麻蹋,transformation是不會(huì)觸發(fā)spark程序的執(zhí)行的跛溉,它們只是記錄了對(duì)RDD所做的操作,但是不會(huì)自發(fā)的執(zhí)行。只有當(dāng)transformation之后芳室,接著執(zhí)行了一個(gè)action操作专肪,那么所有的transformation才會(huì)執(zhí)行。Spark通過這種lazy特性堪侯,來進(jìn)行底層的spark應(yīng)用執(zhí)行的優(yōu)化嚎尤,避免產(chǎn)生過多中間結(jié)果。
action操作執(zhí)行抖格,會(huì)觸發(fā)一個(gè)spark job的運(yùn)行诺苹,從而觸發(fā)這個(gè)action之前所有的transformation的執(zhí)行。這是action的特性雹拄。
transformation和action原理剖析
val lines = sc.textFile("spark.txt")
通過textFile()
方法收奔,針對(duì)外部文件創(chuàng)建了一個(gè)RDD——lines
,但是實(shí)際上滓玖,程序執(zhí)行到這里為止坪哄,spark.txt文件的數(shù)據(jù)是不會(huì)加載到內(nèi)存中的。lines
只是代表了一個(gè)指向spark.txt文件的引用势篡。
val lineLengths = lines.map(line => line.length)
這里對(duì)lines RDD
進(jìn)行了map
算子翩肌,獲取了一個(gè)轉(zhuǎn)換后的lineLengths RDD
。但是這里連數(shù)據(jù)都沒有禁悠,當(dāng)然也不會(huì)做任何操作念祭。lineLengths RDD
也只是一個(gè)概念上的東西而已。
val totalLength = lineLengths.reduce(_ + _)
之后碍侦,執(zhí)行了一個(gè)action
操作——reduce
粱坤。此時(shí)就會(huì)觸發(fā)之前所有transformation
操作的執(zhí)行,Spark會(huì)將操作拆分成多個(gè)task
到多個(gè)機(jī)器上并行執(zhí)行瓷产,每個(gè)task
會(huì)在本地執(zhí)行map
操作站玄,并且進(jìn)行本地的reduce
聚合。最后會(huì)進(jìn)行一個(gè)全局的reduce
聚合濒旦,然后將結(jié)果返回給Driver
程序株旷。
注意,Spark有些特殊的算子尔邓,也就是特殊的transformation
操作晾剖。比如groupByKey
、sortByKey
铃拇、reduceByKey
等钞瀑,其實(shí)只是針對(duì)特殊的RDD的。即包含key-value對(duì)的RDD慷荔。而這種RDD中的元素,實(shí)際上是scala中的一種類型,即Tuple2
显晶,也就是包含兩個(gè)值的Tuple贷岸。
在scala中,需要手動(dòng)導(dǎo)入Spark的相關(guān)隱式轉(zhuǎn)換磷雇,import org.apache.spark.SparkContext._
偿警。然后,對(duì)應(yīng)包含Tuple2
的RDD唯笙,會(huì)自動(dòng)隱式轉(zhuǎn)換為PairRDDFunction
螟蒸,并提供reduceByKey
等方法。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
* 統(tǒng)計(jì)每行出現(xiàn)的次數(shù)
* @author Administrator
*
*/
public class LineCount {
public static void main(String[] args) {
// 創(chuàng)建SparkConf
SparkConf conf = new SparkConf()
.setAppName("LineCount")
.setMaster("local");
// 創(chuàng)建JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 創(chuàng)建初始RDD崩掘,lines七嫌,每個(gè)元素是一行文本
JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//hello.txt");
// 對(duì)lines RDD執(zhí)行mapToPair算子,將每一行映射為(line, 1)的這種key-value對(duì)的格式
// 然后后面才能統(tǒng)計(jì)每一行出現(xiàn)的次數(shù)
JavaPairRDD<String, Integer> pairs = lines.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
return new Tuple2<String, Integer>(t, 1); // key-value對(duì)里面的元素是Tuple2
}
});
// 對(duì)pairs RDD執(zhí)行reduceByKey算子苞慢,統(tǒng)計(jì)出每一行出現(xiàn)的總次數(shù)
JavaPairRDD<String, Integer> lineCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 執(zhí)行一個(gè)action操作诵原,foreach,打印出每一行出現(xiàn)的次數(shù)
lineCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
@Override
// lineCounts里面的元素是Tuple2
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._1 + " appears " + t._2 + " times.");
}
});
// 關(guān)閉JavaSparkContext
sc.close();
}
}
// 統(tǒng)計(jì)文件每一行出現(xiàn)的次數(shù)
val lines = sc.textFile("hello.txt")
val linePairs = lines.map(line => (line, 1))
val lineCounts = linePairs.reduceByKey(_ + _)
lineCounts.foreach(lineCount => println(lineCount._1 + " appears " + llineCount._2 + " times."))
常用transformation和action