注:本文是跟著《Spark快速大數(shù)據(jù)分析》一書學(xué)習(xí)的筆記式總結(jié)再悼,不喜勿噴衬鱼。
RDD(Resilient Distributed Dataset)彈性分布式數(shù)據(jù)集是Spark對數(shù)據(jù)的核心抽象。RDD是一個不可變的分布式對象集合吁断。每一個RDD都會被分成多個分區(qū)孵户,不同的分區(qū)運行在集群的不同節(jié)點严沥,這就構(gòu)成了Spark的分布式計算模型鲁冯。
RDD創(chuàng)建
我們可以通過兩種方式創(chuàng)建RDD拷沸。一種方式是直接讀取外部數(shù)據(jù)色查,這在我們實際使用中較常用薯演,另一種是在驅(qū)動程序中分發(fā)驅(qū)動器程序中的對象集合(List或set),一般調(diào)試中會使用秧了。
直接讀取外部數(shù)據(jù)
JavaRDD<String> lines = sc.textFile("/path/to/README.md");
JavaRDD<String> lines1 = ctx.textFile("/path/to/README.md",1);//每次讀取一行(類似Hadoop)
對一個集合進行并行化
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
RDD操作
RDD支持兩種操作:轉(zhuǎn)化和行動跨扮。轉(zhuǎn)化會生成一個新的RDD,如上面所說验毡,RDD是不可變的衡创,原來的RDD依然存在,只是在它的基礎(chǔ)上晶通,生成了一個新的RDD璃氢。行動會返回其他的數(shù)據(jù)類型,很多時候我們將其作為最后結(jié)果輸出狮辽。實際上一也,轉(zhuǎn)化操作是惰性的巢寡,只有在行動操作中,之前的轉(zhuǎn)化操作才會真正執(zhí)行椰苟。這可以避免多余的計算抑月。
轉(zhuǎn)化操作
許多轉(zhuǎn)化操作都是針對各個元素的,也就是說舆蝴,這些轉(zhuǎn)化操作每次只會操作RDD 中的一個元素谦絮。不過并不是所有的轉(zhuǎn)化操作都是這樣的。
通過轉(zhuǎn)化操作洁仗,從老的RDD中生成新的RDD,Spark會使用==譜系圖==來記錄不同的RDD之間的依賴關(guān)系层皱。依靠譜系圖,Spark可以在某些RDD計算出問題后赠潦,恢復(fù)出丟失的信息奶甘。
Java 版計算RDD 中各值的平方
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer x) { return x*x; }
});
System.out.println(StringUtils.join(result.collect(), ","));
Java 中的flatMap() 將行數(shù)據(jù)切分為單詞
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
return Arrays.asList(line.split(" "));
}
});
words.first(); // 返回"hello"
我們的RDD 中最常缺失的集合屬性是元素的唯一性,因為常常有重復(fù)的元素祭椰。如果只
要唯一的元素臭家,我們可以使用RDD.distinct() 轉(zhuǎn)化操作來生成一個只包含不同元素的新
RDD。不過需要注意方淤,distinct() 操作的開銷很大钉赁,因為它需要將所有數(shù)據(jù)通過網(wǎng)絡(luò)進行
混洗(shuffle),以確保每個元素都只有一份携茂。
表:對一個數(shù)據(jù)為{1, 2, 3, 3}的RDD進行基本的RDD轉(zhuǎn)化操作
函數(shù)名 | 目的 | 示例 | 結(jié)果 |
---|---|---|---|
map() | 將函數(shù)應(yīng)用于RDD 中的每個元素你踩,將返回值構(gòu)成新的RDD | rdd.map(x => x + 1) | {2, 3, 4, 4} |
flatMap() | 將函數(shù)應(yīng)用于RDD 中的每個元素,將返回的迭代器的所有內(nèi)容構(gòu)成新的RDD讳苦。通常用來切分單詞 | rdd.flatMap(x => x.to(3)) | {1, 2, 3, 2, 3, 3, 3} |
filter() | 返回一個由通過傳給filter()的函數(shù)的元素組成的RDD | rdd.filter(x => x != 1) | {2, 3, 3} |
distinct() | 去重 | rdd.distinct() | {1, 2, 3} |
sample(withReplacement, fraction, [seed]) | 對RDD 采樣带膜,以及是否替換 | rdd.sample(false, 0.5) | 非確定的 |
表:對數(shù)據(jù)分別為{1, 2, 3}和{3, 4, 5}的RDD進行針對兩個RDD的轉(zhuǎn)化操作
函數(shù)名 | 目的 | 示例 | 結(jié)果 |
---|---|---|---|
union() | 生成一個包含兩個RDD 中所有元素的RDD | rdd.union(other) | {1, 2, 3, 3, 4, 5} |
intersection() | 求兩個RDD 共同的元素的RDD | rdd.intersection(other) | {3} |
subtract() | 移除一個RDD 中的內(nèi)容(例如移除訓(xùn)練數(shù)據(jù)) | rdd.subtract(other) | {1, 2} |
cartesian() | 與另一個RDD 的笛卡兒積 | rdd.cartesian(other) | {(1, 3), (1, 4), ...(3, 5)} |
行動操作
行動操作會把最終求得的結(jié)果返回到驅(qū)動器程序,或者寫入外部存儲系統(tǒng)中鸳谜。它往往會觸發(fā)轉(zhuǎn)化RDD的運行膝藕。
調(diào)試小數(shù)據(jù)集的情況下,我們可以使用RDD的collection()函數(shù)咐扭,獲取整個RDD中的數(shù)據(jù)芭挽。但這又一個前提,要單擊內(nèi)存能夠成功存下這所有的數(shù)據(jù)才行蝗肪。生產(chǎn)環(huán)境下這顯然是不現(xiàn)實的袜爪。我們一般將數(shù)據(jù)寫到HDFS或Amazon S3這種分布式的存儲系統(tǒng)中。
Java 中的reduce()
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) { return x + y; }
});
Java 中的aggregate()
class AvgCount implements Serializable {
public AvgCount(int total, int num) {
this.total = total;
this.num = num;
}
public int total;
public int num;
public double avg() {
return total / (double) num;
}
}
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total += x;
a.num += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total += b.total;
a.num += b.num;
return a;
}
};
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
表:對一個數(shù)據(jù)為{1, 2, 3, 3}的RDD進行基本的RDD行動操作
函數(shù)名 | 目的 | 示例 | 結(jié)果 |
---|---|---|---|
collect() | 返回RDD 中的所有元素 | rdd.collect() | {1, 2, 3, 3} |
count() | RDD 中的元素個數(shù) | rdd.count() | 4 |
countByValue() | 各元素在RDD 中出現(xiàn)的次數(shù) | rdd.countByValue() | {(1, 1),(2, 1),(3, 2)} |
take(num) | 從RDD 中返回num 個元素 | rdd.take(2) | {1, 2} |
top(num) | 從RDD 中返回最前面的num個元素 | rdd.top(2) | {3, 3} |
takeOrdered(num)(ordering) | 從RDD 中按照提供的順序返回最前面的num 個元素 | rdd.takeOrdered(2)(myOrdering) | {3, 3} |
takeSample(withReplacement, num, [seed]) | 從RDD中返回任意一些元素 | rdd.takeSample(false, 1) | 非確定的 |
reduce(func) | 并行整合RDD 中所有數(shù)據(jù)(例如sum) | rdd.reduce((x, y) => x + y) | 9 |
fold(zero)(func) | 和reduce() 一樣薛闪, 但是需要提供初始值 | rdd.fold(0)((x, y) => x + y) | 9 |
aggregate(zeroValue)(seqOp, combOp) | 和reduce() 相似辛馆, 但是通常返回不同類型的函數(shù) | rdd.aggregate((0, 0)) ((x, y) =>(x._1 + y, x._2 + 1),(x, y) =>(x._1 + y._1, x._2 + y._2)) | (9,4) |
foreach(func) | 對RDD 中的每個元素使用給定的函數(shù) | rdd.foreach(func) | 無 |
持久化
對于Spark,雖然RDD是惰性求值的豁延,但是昙篙,如果簡單地對其調(diào)用行動操作倔韭,多次使用某一個RDD時就會重復(fù)計算。為了避免多次重新計算一個RDD,我們可以將其持久化瓢对。
表:org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化級
別
級 別 | 使用的空間 | CPU時間 | 是否在內(nèi)存中 | 是否在磁盤上 | 備 注 |
---|---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 | |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 如果數(shù)據(jù)在內(nèi)存中放不下寿酌,則溢寫道磁盤上 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 如果數(shù)據(jù)在內(nèi)存中存不下,則溢寫道磁盤上硕蛹。在內(nèi)存中存放序列化后的數(shù)據(jù) |
DISK_ONLY | 低 | 高 | 否 | 是 |
我們在第一次對這個RDD 調(diào)用行動操作前就調(diào)用了persist() 方法醇疼。==persist() 調(diào)
用本身不會觸發(fā)強制求值==。
RDD 還有一個方法叫作unpersist()法焰,調(diào)用該方法可以手動把持久化的RDD 從緩
存中移除秧荆。
Java在不同RDD類型間轉(zhuǎn)換
在Java 中,各種RDD 的特殊類型間的轉(zhuǎn)換更為明確埃仪。Java 中有兩個專門的類JavaDoubleRDD
和JavaPairRDD乙濒,來處理特殊類型的RDD,這兩個類還針對這些類型提供了額外的函數(shù)卵蛉。
這讓你可以更加了解所發(fā)生的一切颁股,但是也顯得有些累贅。
要構(gòu)建出這些特殊類型的RDD傻丝,需要使用特殊版本的類來替代一般使用的Function 類甘有。如果
要從T 類型的RDD 創(chuàng)建出一個DoubleRDD,我們就應(yīng)當(dāng)在映射操作中使用DoubleFunction<T>
來替代Function<T, Double>葡缰。表3-5 展示了一些特殊版本的函數(shù)類及其用法亏掀。
此外,我們也需要調(diào)用RDD 上的一些別的函數(shù)(因此不能只是創(chuàng)建出一個DoubleFunction
然后把它傳給map())泛释。當(dāng)需要一個DoubleRDD 時滤愕,我們應(yīng)當(dāng)調(diào)用mapToDouble() 來替代
map(),跟其他所有函數(shù)所遵循的模式一樣怜校。
表3-5:Java中針對專門類型的函數(shù)接口
函數(shù)名 | 等價函數(shù) | 用途 |
---|---|---|
DoubleFlatMapFunction<T> | Function<T, Iterable<Double>> | 用于flatMapToDouble间影,以生成DoubleRDD |
DoubleFunction<T> | Function<T, Double> | 用于mapToDouble,以生成DoubleRDD |
PairFlatMapFunction<T, K, V> | Function<T, Iterable<Tuple2<K, V>>> | 用于flatMapToPair韭畸,以生成PairRDD<K, V> |
PairFunction<T, K, V> | Function<T, Tuple2<K, V>> | 用于mapToPair宇智,以生成PairRDD<K, V> |
例:用Java 創(chuàng)建DoubleRDD
JavaDoubleRDD result = rdd.mapToDouble(
new DoubleFunction<Integer>() {
public double call(Integer x) {
return (double) x * x;
}
});
System.out.println(result.mean());
鍵值對RDD
眾所周知,Hadoop的MapReduce一般處理鍵值對數(shù)據(jù)胰丁。Map輸出,Reduce輸入和輸出都是Key-Value形式喂分。在Spark中我們也支持鍵值對形式锦庸。并且,Spark的鍵值對Rdd相對MapReduce的鍵值對來說蒲祈,操作更為簡單甘萧,形式更加復(fù)雜多樣萝嘁。Spark 為包含鍵值對類型的RDD 提供了一些專有的操作。這些RDD 被稱為pair RDD扬卷。
創(chuàng)建鍵值對RDD
Java 沒有自帶的二元組類型牙言,因此Spark 的Java API 讓用戶使用scala.Tuple2 類來創(chuàng)建二
元組。這個類很簡單:Java 用戶可以通過new Tuple2(elem1, elem2) 來創(chuàng)建一個新的二元
組怪得,并且可以通過._1() 和._2() 方法訪問其中的元素咱枉。
在Java 中使用第一個單詞作為鍵創(chuàng)建出一個pair RDD
PairFunction<String, String, String> keyData =
new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String x) {
return new Tuple2(x.split(" ")[0], x);
}
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
表4-1:Pair RDD的轉(zhuǎn)化操作(以鍵值對集合{(1, 2), (3, 4), (3, 6)}為例)
函數(shù)名 | 目的 | 示例 | 結(jié)果 |
---|---|---|---|
reduceByKey(func) | 合并具有相同鍵的值 | rdd.reduceByKey((x, y) => x + y) | {(1,2),(3,10)} |
groupByKey() | 對具有相同鍵的值進行分組 | rdd.groupByKey() | {(1,[2]),(3, [4,6])} |
combineByKey( createCombiner,mergeValue,mergeCombiners,partitioner) | 使用不同的返回類型合并具有相同鍵的值 | ||
mapValues(func) | 對pair RDD 中的每個值應(yīng)用一個函數(shù)而不改變鍵 | rdd.mapValues(x =>x+1) | {(1,3), (3,5), (3,7)} |
flatMapValues(func) | 對pair RDD 中的每個值應(yīng)用一個返回迭代器的函數(shù),然后對返回的每個元素都生成一個對應(yīng)原鍵的鍵對記錄徒恋。通常用于符號化 | rdd.flatMapValues(x => (x to 5)) | {(1,2), (1,3), (1,4), (1,5), (3,4), (3,5)} |
keys() | 返回一個僅包含鍵的RDD | rdd.keys() | {1, 3,3} |
values() | 返回一個僅包含值的RDD | rdd.values() | {2, 4,6} |
sortByKey() | 返回一個根據(jù)鍵排序的RDD | rdd.sortByKey() | {(1,2), (3,4), (3,6)} |
表4-2:針對兩個pair RDD的轉(zhuǎn)化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)})
函數(shù)名 | 目的 | 示例 | 結(jié)果 |
---|---|---|---|
subtractByKey | 刪掉RDD 中鍵與other RDD 中的鍵相同的元素 | rdd.subtractByKey(other) | {(1, 2)} |
join | 對兩個RDD 進行內(nèi)連接 | rdd.join(other) | {(3, (4, 9)), (3,(6, 9))} |
rightOuterJoin | 對兩個RDD 進行連接操作蚕断,確保第一個RDD 的鍵必須存在(右外連接) | rdd.rightOuterJoin(other) | {(3,(Some(4),9)),(3,(Some(6),9))} |
leftOuterJoin | 對兩個RDD 進行連接操作,確保第二個RDD的鍵必須存在(左外連接) | rdd.leftOuterJoin(other) | {(1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))} |
cogroup | 將兩個RDD 中擁有相同鍵的數(shù)據(jù)分組到一起 | rdd.cogroup(other) | {(1,([2],[]), (3,([4, 6],[9]))} |
combineByKey()
combineByKey()是最為常用的基于鍵進行聚合的函數(shù)入挣。大多數(shù)基于鍵聚合的函數(shù)都是用它
實現(xiàn)的亿乳。和aggregate() 一樣,combineByKey() 可以讓用戶返回與輸入數(shù)據(jù)的類型不同的
返回值径筏。
要理解combineByKey()葛假, 要先理解它在處理數(shù)據(jù)時是如何處理每個元素的。由于
combineByKey() 會遍歷分區(qū)中的所有元素滋恬,因此每個元素的鍵要么還沒有遇到過桐款,要么就
和之前的某個元素的鍵相同。
如果這是一個新的元素夷恍,combineByKey() 會使用一個叫作createCombiner() 的函數(shù)來創(chuàng)建
那個鍵對應(yīng)的累加器的初始值魔眨。需要注意的是,這一過程會在每個分區(qū)中第一次出現(xiàn)各個
鍵時發(fā)生酿雪,而不是在整個RDD 中第一次出現(xiàn)一個鍵時發(fā)生遏暴。
如果這是一個在處理當(dāng)前分區(qū)之前已經(jīng)遇到的鍵,它會使用mergeValue() 方法將該鍵的累
加器對應(yīng)的當(dāng)前值與這個新的值進行合并指黎。
由于每個分區(qū)都是獨立處理的朋凉,因此對于同一個鍵可以有多個累加器。如果有兩個或者更
多的分區(qū)都有對應(yīng)同一個鍵的累加器醋安,就需要使用用戶提供的mergeCombiners() 方法將各
個分區(qū)的結(jié)果進行合并杂彭。
如果已知數(shù)據(jù)在進行combineByKey() 時無法從map 端聚合中獲益的話,可以
禁用它吓揪。例如亲怠,由于聚合函數(shù)(追加到一個隊列)無法在map 端聚合時節(jié)約
任何空間实蓬,groupByKey() 就把它禁用了洽洁。如果希望禁用map 端組合,就需要
指定分區(qū)方式晦譬。就目前而言,你可以通過傳遞rdd.partitioner 來直接使用
源RDD 的分區(qū)方式习勤。
例:在Java 中使用combineByKey() 求每個鍵對應(yīng)的平均值
public static class AvgCount implements Serializable {
public AvgCount(int total, int num) { total_ = total; num_ = num; }
public int total_;
public int num_;
public float avg() { returntotal_/(float)num_; }
}
Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
public AvgCount call(Integer x) {
return new AvgCount(x, 1);
}
};
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total_ += x;
a.num_ += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total_ += b.total_;
a.num_ += b.num_;
return a;
}
};
AvgCount initial = new AvgCount(0,0);
JavaPairRDD<String, AvgCount> avgCounts =
nums.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
Pair RDD的行動操作
表:Pair RDD的行動操作(以鍵值對集合{(1, 2), (3, 4), (3, 6)}為例)
函數(shù) | 描述 | 示例 | 結(jié)果 |
---|---|---|---|
countByKey() | 對每個鍵對應(yīng)的元素分別計數(shù) | rdd.countByKey() | {(1, 1), (3, 2)} |
collectAsMap() | 將結(jié)果以映射表的形式返回踪栋,以便查詢 | rdd.collectAsMap() | Map{(1,2), (3,4), (3, 6)} |
lookup(key) | 返回給定鍵對應(yīng)的所有值 | rdd.lookup(3) | [4, 6] |