Spark 為包含鍵值對類型的 RDD 提供了一些專有的操作负敏。這些 RDD 被稱為 pair RDD1渡讼。Pair RDD 是很多程序的構(gòu)成要素锐秦,因為它們提供了并行操作各個鍵或跨節(jié)點重新進行數(shù)據(jù)分組 的操作接口坛掠。例如赊锚,pair RDD 提供 reduceByKey() 方法,可以分別歸約每個鍵對應的數(shù)據(jù)屉栓, 還有 join() 方法舷蒲,可以把兩個 RDD 中鍵相同的元素組合到一起,合并為一個 RDD友多。我們 通常從一個 RDD 中提取某些字段(例如代表事件時間牲平、用戶 ID 或者其他標識符的字段), 并使用這些字段作為 pair RDD 操作中的鍵域滥。
創(chuàng)建Pair RDD
在 Spark 中有很多種創(chuàng)建 pair RDD 的方式欠拾。第 5 章會講到,很多存儲鍵值對的數(shù)據(jù)格式會 在讀取時直接返回由其鍵值對數(shù)據(jù)組成的 pair RDD骗绕。此外藐窄,當需要把一個普通的 RDD 轉(zhuǎn) 為 pair RDD 時,可以調(diào)用 map() 函數(shù)來實現(xiàn)酬土,傳遞的函數(shù)需要返回鍵值對荆忍。后面會展示如 何將由文本行組成的 RDD 轉(zhuǎn)換為以每行的第一個單詞為鍵的 pair RDD。
構(gòu)建鍵值對 RDD 的方法在不同的語言中會有所不同撤缴。在 Python 中刹枉,為了讓提取鍵之后的 數(shù)據(jù)能夠在函數(shù)中使用,需要返回一個由二元組組成的 RDD(見例 4-1)屈呕。
例 4-1:在 Python 中使用第一個單詞作為鍵創(chuàng)建出一個 pair RDD pairs = lines.map(lambda x: (x.split(" ")[0], x))
在 Scala 中微宝,為了讓提取鍵之后的數(shù)據(jù)能夠在函數(shù)中使用,同樣需要返回二元組(見例 4-2)虎眨。隱式轉(zhuǎn)換可以讓二元組 RDD 支持附加的鍵值對函數(shù)蟋软。
例 4-2:在 Scala 中使用第一個單詞作為鍵創(chuàng)建出一個 pair RDD val pairs = lines.map(x => (x.split(" ")(0), x))
Java 沒有自帶的二元組類型,因此 Spark 的 Java API 讓用戶使用 scala.Tuple2 類來創(chuàng)建二 元組嗽桩。這個類很簡單:Java 用戶可以通過 new Tuple2(elem1, elem2) 來創(chuàng)建一個新的二元 組岳守,并且可以通過 ._1() 和 ._2() 方法訪問其中的元素。
Java 用戶還需要調(diào)用專門的 Spark 函數(shù)來創(chuàng)建 pair RDD碌冶。例如湿痢,要使用 mapToPair() 函數(shù) 來代替基礎(chǔ)版的 map() 函數(shù),這在 3.5.2 節(jié)中的“Java”一節(jié)有過更詳細的討論扑庞。下面通過 例 4-3 中展示一個簡單的例子譬重。
例 4-3:在 Java 中使用第一個單詞作為鍵創(chuàng)建出一個 pair RDD
PairFunction<String, String, String> keyData =
new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String x) {
return new Tuple2(x.split(" ")[0], x);
}
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
當用 Scala 和 Python 從一個內(nèi)存中的數(shù)據(jù)集創(chuàng)建 pair RDD 時拒逮,只需要對這個由二元組組成 的集合調(diào)用 SparkContext.parallelize() 方法。而要使用 Java 從內(nèi)存數(shù)據(jù)集創(chuàng)建 pair RDD 的話臀规,則需要使用 SparkContext.parallelizePairs()消恍。
4.3 Pair RDD的轉(zhuǎn)化操作
Pair RDD 可以使用所有標準 RDD 上的可用的轉(zhuǎn)化操作。3.4 節(jié)中介紹的所有有關(guān)傳遞函數(shù)的規(guī)則也都同樣適用于 pair RDD以现。由于 pair RDD 中包含二元組狠怨,所以需要傳遞的函數(shù)應 當操作二元組而不是獨立的元素。表 4-1 和表 4-2 總結(jié)了對 pair RDD 的一些轉(zhuǎn)化操作
Pair RDD 也還是 RDD(元素為 Java 或 Scala 中的 Tuple2 對象或 Python 中的元組)邑遏,因此 同樣支持 RDD 所支持的函數(shù)佣赖。例如,我們可以拿前一節(jié)中的 pair RDD记盒,篩選掉長度超過 20 個字符的行憎蛤,如例 4-4 至例 4-6 以及圖 4-1 所示。
例 4-4:用 Python 對第二個元素進行篩選
result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20)
例 4-5:用 Scala 對第二個元素進行篩選 pairs.filter{case (key, value) => value.length < 20}
例 4-6:用 Java 對第二個元素進行篩選
Function<Tuple2<String, String>, Boolean> longWordFilter =
new Function<Tuple2<String, String>, Boolean>() {
public Boolean call(Tuple2<String, String> keyValue) {
return (keyValue._2().length() < 20);
} };
JavaPairRDD<String, String> result = pairs.filter(longWordFilter);
有時纪吮,我們只想訪問 pair RDD 的值部分俩檬,這時操作二元組很麻煩。由于這是一種常見的 使用模式碾盟,因此 Spark 提供了 mapValues(func) 函數(shù)棚辽,功能類似于 map{case (x, y): (x, func(y))}”龋可以在很多例子中使用這個函數(shù)屈藐。
4.3.1 聚合操作
當數(shù)據(jù)集以鍵值對形式組織的時候,聚合具有相同鍵的元素進行一些統(tǒng)計是很常見的操 作熙尉。之前講解過基礎(chǔ) RDD 上的 fold()联逻、combine()、reduce() 等行動操作检痰,pair RDD 上則 有相應的針對鍵的轉(zhuǎn)化操作包归。Spark 有一組類似的操作,可以組合具有相同鍵的值铅歼。這些 操作返回 RDD公壤,因此它們是轉(zhuǎn)化操作而不是行動操作。
reduceByKey() 與 reduce() 相當類似;它們都接收一個函數(shù)谭贪,并使用該函數(shù)對值進行合并境钟。 reduceByKey() 會為數(shù)據(jù)集中的每個鍵進行并行的歸約操作锦担,每個歸約操作會將鍵相同的值合 并起來俭识。因為數(shù)據(jù)集中可能有大量的鍵,所以 reduceByKey() 沒有被實現(xiàn)為向用戶程序返回一 個值的行動操作洞渔。實際上套媚,它會返回一個由各鍵和對應鍵歸約出來的結(jié)果值組成的新的 RDD缚态。
例 4-8:在 Scala 中使用 reduceByKey() 和 mapValues() 計算每個鍵對應的平均值 rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
例 4-10:用 Scala 實現(xiàn)單詞計數(shù)
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
例 4-11:用 Java 實現(xiàn)單詞計數(shù)
JavaRDD<String> input = sc.textFile("s3://...")
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }
});
JavaPairRDD<String, Integer> result = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); }
}).reduceByKey(
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
事實上,我們可以對第一個 RDD 使用 countByValue() 函數(shù)堤瘤,以更快地實現(xiàn) 單詞計數(shù):
input.flatMap(x => x.split(" ")).countByValue()玫芦。
combineByKey() 是最為常用的基于鍵進行聚合的函數(shù)。大多數(shù)基于鍵聚合的函數(shù)都是用它 實現(xiàn)的本辐。和 aggregate() 一樣桥帆,combineByKey() 可以讓用戶返回與輸入數(shù)據(jù)的類型不同的 返回值。
要理解 combineByKey()慎皱,要先理解它在處理數(shù)據(jù)時是如何處理每個元素的老虫。由于 combineByKey() 會遍歷分區(qū)中的所有元素,因此每個元素的鍵要么還沒有遇到過茫多,要么就 和之前的某個元素的鍵相同祈匙。
如果這是一個新的元素,combineByKey() 會使用一個叫作 createCombiner() 的函數(shù)來創(chuàng)建 那個鍵對應的累加器的初始值天揖。需要注意的是夺欲,這一過程會在每個分區(qū)中第一次出現(xiàn)各個 鍵時發(fā)生,而不是在整個 RDD 中第一次出現(xiàn)一個鍵時發(fā)生今膊。
如果這是一個在處理當前分區(qū)之前已經(jīng)遇到的鍵些阅,它會使用 mergeValue() 方法將該鍵的累 加器對應的當前值與這個新的值進行合并。
由于每個分區(qū)都是獨立處理的斑唬,因此對于同一個鍵可以有多個累加器扑眉。如果有兩個或者更 多的分區(qū)都有對應同一個鍵的累加器,就需要使用用戶提供的 mergeCombiners() 方法將各 個分區(qū)的結(jié)果進行合并赖钞。
combineByKey() 有多個參數(shù)分別對應聚合操作的各個階段腰素,因而非常適合用來解釋聚合操 作各個階段的功能劃分。為了更好地演示 combineByKey() 是如何工作的雪营,下面來看看如何 計算各鍵對應的平均值弓千,
例 4-13:在 Scala 中使用 combineByKey() 求每個鍵對應的平均值
val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().map(println(_))
例 4-14:在 Java 中使用 combineByKey() 求每個鍵對應的平均值
public static class AvgCount implements Serializable {
public AvgCount(int total, int num) { total_ = total; num_ = num; }
public int total_;
public int num_;
public float avg() { returntotal_/(float)num_; }
}
Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
public AvgCount call(Integer x) {
return new AvgCount(x, 1);
}
};
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total_ += x;
a.num_ += 1;
return a;
} };
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total_ += b.total_;
a.num_ += b.num_;
return a;
} };
AvgCount initial = new AvgCount(0,0);
JavaPairRDD<String, AvgCount> avgCounts =
nums.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
有很多函數(shù)可以進行基于鍵的數(shù)據(jù)合并。它們中的大多數(shù)都是在 combineByKey() 的基礎(chǔ)上實現(xiàn)的献起,為用戶提供了更簡單的接口洋访。不管怎樣,在 Spark 中使用這些專用的聚合函數(shù)谴餐,始終要比手動將數(shù)據(jù)分組再歸約快很多姻政。
并行度調(diào)優(yōu)
到目前為止,我們已經(jīng)討論了所有的轉(zhuǎn)化操作的分發(fā)方式岂嗓,但是還沒有探討 Spark 是怎樣 確定如何分割工作的汁展。每個 RDD 都有固定數(shù)目的分區(qū),分區(qū)數(shù)決定了在 RDD 上執(zhí)行操作 時的并行度。
在執(zhí)行聚合或分組操作時食绿,可以要求 Spark 使用給定的分區(qū)數(shù)侈咕。Spark 始終嘗試根據(jù)集群的大小推斷出一個有意義的默認值,但是有時候你可能要對并行度進行調(diào)優(yōu)來獲取更好的性能表現(xiàn)器紧。
本章討論的大多數(shù)操作符都能接收第二個參數(shù)耀销,這個參數(shù)用來指定分組結(jié)果或聚合結(jié)果的 RDD 的分區(qū)數(shù),如例 4-15 和例 4-16 所示铲汪。
例 4-15:在 Python 中自定義 reduceByKey() 的并行度
data = [("a", 3), ("b", 4), ("a", 1)] sc.parallelize(data).reduceByKey(lambda x, y: x + y) # 默認并行度 sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # 自定義并行度
例 4-16:在 Scala 中自定義 reduceByKey() 的并行度
val data = Seq(("a", 3), ("b", 4), ("a", 1)) sc.parallelize(data).reduceByKey((x, y) => x + y) // 默認并行度 sc.parallelize(data).reduceByKey((x, y) => x + y, 10) // 自定義并行度
有時熊尉,我們希望在除分組操作和聚合操作之外的操作中也能改變 RDD 的分區(qū)。對于 這樣的情況掌腰,Spark 提供了 repartition() 函數(shù)帽揪。它會把數(shù)據(jù)通過網(wǎng)絡進行混洗,并創(chuàng) 建出新的分區(qū)集合辅斟。切記转晰,對數(shù)據(jù)進行重新分區(qū)是代價相對比較大的操作。Spark 中也 有一個優(yōu)化版的 repartition()士飒,叫作 coalesce()查邢。你可以使用 Java 或 Scala 中的 rdd. partitions.size() 以及 Python 中的 rdd.getNumPartitions 查看 RDD 的分區(qū)數(shù),并確保調(diào) 用 coalesce() 時將 RDD 合并到比現(xiàn)在的分區(qū)數(shù)更少的分區(qū)中酵幕。
4.3.2 數(shù)據(jù)分組
對于有鍵的數(shù)據(jù)扰藕,一個常見的用例是將數(shù)據(jù)根據(jù)鍵進行分組——比如查看一個顧客的所有訂單。
如果數(shù)據(jù)已經(jīng)以預期的方式提取了鍵芳撒,groupByKey() 就會使用 RDD 中的鍵來對數(shù)據(jù)進行 分組邓深。對于一個由類型 K 的鍵和類型 V 的值組成的 RDD,所得到的結(jié)果 RDD 類型會是 [K, Iterable[V]]笔刹。
groupBy() 可以用于未成對的數(shù)據(jù)上芥备,也可以根據(jù)除鍵相同以外的條件進行分組。它可以 接收一個函數(shù)舌菜,對源 RDD 中的每個元素使用該函數(shù)萌壳,將返回結(jié)果作為鍵再進行分組。
4.3.3 連接
將有鍵的數(shù)據(jù)與另一組有鍵的數(shù)據(jù)一起使用是對鍵值對數(shù)據(jù)執(zhí)行的最有用的操作之一日月。連 接數(shù)據(jù)可能是 pair RDD 最常用的操作之一袱瓮。連接方式多種多樣:右外連接、左外連接爱咬、交 叉連接以及內(nèi)連接尺借。
普通的 join 操作符表示內(nèi)連接 2。只有在兩個 pair RDD 中都存在的鍵才叫輸出精拟。當一個輸 入對應的某個鍵有多個值時燎斩,生成的 pair RDD 會包括來自兩個輸入 RDD 的每一組相對應 的記錄虱歪。例 4-17 可以幫你理解這個定義。
例 4-17:在 Scala shell 中進行內(nèi)連接
storeAddress = {
(Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"),
(Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")}
storeRating = {
(Store("Ritual"), 4.9), (Store("Philz"), 4.8))}
storeAddress.join(storeRating) == {
(Store("Ritual"), ("1026 Valencia St", 4.9)),
(Store("Philz"), ("748 Van Ness Ave", 4.8)),
(Store("Philz"), ("3101 24th St", 4.8))}
4.3.4 數(shù)據(jù)排序
很多時候瘫里,讓數(shù)據(jù)排好序是很有用的实蔽,尤其是在生成下游輸出時荡碾。如果鍵有已定義的順 序谨读,就可以對這種鍵值對 RDD 進行排序。當把數(shù)據(jù)排好序后坛吁,后續(xù)對數(shù)據(jù)進行 collect() 或 save() 等操作都會得到有序的數(shù)據(jù)劳殖。
我們經(jīng)常要將 RDD 倒序排列,因此 sortByKey() 函數(shù)接收一個叫作 ascending 的參數(shù)拨脉,表 示我們是否想要讓結(jié)果按升序排序(默認值為 true)哆姻。有時我們也可能想按完全不同的排 序依據(jù)進行排序。要支持這種情況玫膀,我們可以提供自定義的比較函數(shù)矛缨。
例 4-19:在 Python 中以字符串順序?qū)φ麛?shù)進行自定義排序
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x))
例 4-20:在 Scala 中以字符串順序?qū)φ麛?shù)進行自定義排序
val input: RDD[(Int, Venue)] = ...
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()
例 4-21:在 Java 中以字符串順序?qū)φ麛?shù)進行自定義排序
class IntegerComparator implements Comparator<Integer> {
public int compare(Integer a, Integer b) {
return String.valueOf(a).compareTo(String.valueOf(b))
}
}
rdd.sortByKey(comp)
4.4 Pair RDD的行動操作
和轉(zhuǎn)化操作一樣,所有基礎(chǔ) RDD 支持的傳統(tǒng)行動操作也都在 pair RDD 上可用帖旨。Pair RDD 提供了一些額外的行動操作箕昭,可以讓我們充分利用數(shù)據(jù)的鍵值對特性。這些操作列在了表 4-3 中解阅。
4.5 數(shù)據(jù)分區(qū)
本章要討論的最后一個 Spark 特性是對數(shù)據(jù)集在節(jié)點間的分區(qū)進行控制落竹。在分布式程序中, 通信的代價是很大的货抄,因此控制數(shù)據(jù)分布以獲得最少的網(wǎng)絡傳輸可以極大地提升整體性 能述召。和單節(jié)點的程序需要為記錄集合選擇合適的數(shù)據(jù)結(jié)構(gòu)一樣,Spark 程序可以通過控制 RDD 分區(qū)方式來減少通信開銷蟹地。分區(qū)并不是對所有應用都有好處的——比如积暖,如果給定 RDD 只需要被掃描一次,我們完全沒有必要對其預先進行分區(qū)處理怪与。只有當數(shù)據(jù)集多次在 諸如連接這種基于鍵的操作中使用時呀酸,分區(qū)才會有幫助。我們會給出一些小例子來說明這點琼梆。
Spark 中所有的鍵值對 RDD 都可以進行分區(qū)性誉。系統(tǒng)會根據(jù)一個針對鍵的函數(shù)對元素進行分 組。盡管 Spark 沒有給出顯示控制每個鍵具體落在哪一個工作節(jié)點上的方法(部分原因是 Spark 即使在某些節(jié)點失敗時依然可以工作)茎杂,但 Spark 可以確保同一組的鍵出現(xiàn)在同一個 節(jié)點上错览。比如,你可能使用哈希分區(qū)將一個 RDD 分成了 100 個分區(qū)煌往,此時鍵的哈希值對 100 取模的結(jié)果相同的記錄會被放在一個節(jié)點上倾哺。你也可以使用范圍分區(qū)法轧邪,將鍵在同一 個范圍區(qū)間內(nèi)的記錄都放在同一個節(jié)點上。
舉個簡單的例子羞海,我們分析這樣一個應用忌愚,它在內(nèi)存中保存著一張很大的用戶信息表—— 也就是一個由 (UserID, UserInfo) 對組成的 RDD,其中 UserInfo 包含一個該用戶所訂閱 的主題的列表却邓。該應用會周期性地將這張表與一個小文件進行組合硕糊,這個小文件中存著過 去五分鐘內(nèi)發(fā)生的事件——其實就是一個由 (UserID, LinkInfo) 對組成的表,存放著過去 五分鐘內(nèi)某網(wǎng)站各用戶的訪問情況腊徙。例如简十,我們可能需要對用戶訪問其未訂閱主題的頁面 的情況進行統(tǒng)計。我們可以使用 Spark 的 join() 操作來實現(xiàn)這個組合操作撬腾,其中需要把 UserInfo 和 LinkInfo 的有序?qū)Ω鶕?jù) UserID 進行分組螟蝙。我們的應用如例 4-22 所示。
例 4-22:簡單的 Scala 應用
// 初始化代碼;從HDFS商的一個Hadoop SequenceFile中讀取用戶信息
// userData中的元素會根據(jù)它們被讀取時的來源民傻,即HDFS塊所在的節(jié)點來分布
// Spark此時無法獲知某個特定的UserID對應的記錄位于哪個節(jié)點上
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// 周期性調(diào)用函數(shù)來處理過去五分鐘產(chǎn)生的事件日志
// 假設這是一個包含(UserID, LinkInfo)對的SequenceFile def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
!userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
這段代碼可以正確運行胰默,但是不夠高效。這是因為在每次調(diào)用 processNewLogs() 時都會用 到 join() 操作漓踢,而我們對數(shù)據(jù)集是如何分區(qū)的卻一無所知牵署。默認情況下,連接操作會將兩 個數(shù)據(jù)集中的所有鍵的哈希值都求出來彭雾,將該哈希值相同的記錄通過網(wǎng)絡傳到同一臺機器 上碟刺,然后在那臺機器上對所有鍵相同的記錄進行連接操作(見圖 4-4)。因為 userData 表比 每五分鐘出現(xiàn)的訪問日志表 events 要大得多薯酝,所以要浪費時間做很多額外工作:在每次調(diào) 用時都對 userData 表進行哈希值計算和跨節(jié)點數(shù)據(jù)混洗半沽,雖然這些數(shù)據(jù)從來都不會變化。
要解決這一問題也很簡單:在程序開始時吴菠,對 userData 表使用 partitionBy() 轉(zhuǎn)化操作者填, 將這張表轉(zhuǎn)為哈希分區(qū)∽隹可以通過向 partitionBy 傳遞一個 spark.HashPartitioner 對象來 實現(xiàn)該操作占哟,如例 4-23 所示。
例 4-23:Scala 自定義分區(qū)方式
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // 構(gòu)造100個分區(qū) .persist()
processNewLogs() 方 法 可 以 保 持 不 變: 在 processNewLogs() 中酿矢,eventsRDD 是 本 地 變 量榨乎,只在該方法中使用了一次,所以為 events 指定分區(qū)方式?jīng)]有什么用處瘫筐。由于在構(gòu) 建 userData 時調(diào)用了 partitionBy()蜜暑,Spark 就知道了該 RDD 是根據(jù)鍵的哈希值來分 區(qū)的,這樣在調(diào)用 join() 時策肝,Spark 就會利用到這一點肛捍。具體來說隐绵,當調(diào)用 userData. join(events) 時,Spark 只會對 events 進行數(shù)據(jù)混洗操作拙毫,將 events 中特定 UserID 的記 錄發(fā)送到 userData 的對應分區(qū)所在的那臺機器上(見圖 4-5)依许。這樣,需要通過網(wǎng)絡傳輸?shù)?數(shù)據(jù)就大大減少了缀蹄,程序運行速度也可以顯著提升了峭跳。
注意,partitionBy() 是一個轉(zhuǎn)化操作袍患,因此它的返回值總是一個新的 RDD坦康,但它不會改變 原來的 RDD竣付。RDD 一旦創(chuàng)建就無法修改诡延。因此應該對 partitionBy() 的結(jié)果進行持久化, 并保存為 userData古胆,而不是原來的 sequenceFile() 的輸出肆良。此外,傳給 partitionBy() 的 100 表示分區(qū)數(shù)目逸绎,它會控制之后對這個 RDD 進行進一步操作(比如連接操作)時有多少 任務會并行執(zhí)行惹恃。總的來說棺牧,這個值至少應該和集群中的總核心數(shù)一樣巫糙。
如果沒有將 partitionBy() 轉(zhuǎn)化操作的結(jié)果持久化,那么后面每次用到這個 RDD 時都會重復地對數(shù)據(jù)進行分區(qū)操作颊乘。不進行持久化會導致整個 RDD 譜 系圖重新求值参淹。那樣的話,partitionBy() 帶來的好處就會被抵消乏悄,導致重 復對數(shù)據(jù)進行分區(qū)以及跨節(jié)點的混洗浙值,和沒有指定分區(qū)方式時發(fā)生的情況十 分相似。
事實上檩小,許多其他 Spark 操作會自動為結(jié)果 RDD 設定已知的分區(qū)方式信息开呐,而且除 join() 外還有很多操作也會利用到已有的分區(qū)信息。比如规求,sortByKey() 和 groupByKey() 會分別生成范圍分區(qū)的 RDD 和哈希分區(qū)的 RDD筐付。而另一方面,諸如 map() 這樣的操作會 導致新的 RDD 失去父 RDD 的分區(qū)信息阻肿,因為這樣的操作理論上可能會修改每條記錄的 鍵瓦戚。接下來的幾節(jié)中,我們會討論如何獲取 RDD 的分區(qū)信息冕茅,以及數(shù)據(jù)分區(qū)是如何影響 各種 Spark 操作的伤极。
4.5.1 獲取RDD的分區(qū)方式
在 Scala 和 Java 中蛹找,你可以使用 RDD 的 partitioner 屬性(Java 中使用 partitioner() 方法)來獲取 RDD 的分區(qū)方式。 它會返回一個 scala.Option 對象哨坪,這是 Scala 中用來存放 可能存在的對象的容器類庸疾。你可以對這個 Option 對象調(diào)用 isDefined() 來檢查其中是否有 值,調(diào)用 get() 來獲取其中的值当编。如果存在值的話届慈,這個值會是一個 spark.Partitioner 對象。這本質(zhì)上是一個告訴我們 RDD 中各個鍵分別屬于哪個分區(qū)的函數(shù)忿偷。
在 Spark shell 中使用 partitioner 屬性不僅是檢驗各種 Spark 操作如何影響分區(qū)方式的一種 好辦法金顿,還可以用來在你的程序中檢查想要使用的操作是否會生成正確的結(jié)果(見例 4-24)。
例 4-24:獲取 RDD 的分區(qū)方式
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at
<console>:12
scala> pairs.partitioner
res0: Option[spark.Partitioner] = None
scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14
scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)
在這段簡短的代碼中鲤桥,我們創(chuàng)建出了一個由 (Int, Int) 對組成的 RDD揍拆,初始時沒有分 區(qū)方式信息(一個值為 None 的 Option 對象)。然后通過對第一個 RDD 進行哈希分區(qū)茶凳, 創(chuàng)建出了第二個 RDD嫂拴。如果確實要在后續(xù)操作中使用 partitioned,那就應當在定義 partitioned 時贮喧,在第三行輸入的最后加上 persist()筒狠。這和之前的例子中需要對 userData 調(diào)用 persist() 的原因是一樣的:如果不調(diào)用 persist() 的話,后續(xù)的 RDD 操作會對 partitioned 的整個譜系重新求值箱沦,這會導致對 pairs 一遍又一遍地進行哈希分區(qū)操作辩恼。
4.5.2 從分區(qū)中獲益的操作
Spark 的許多操作都引入了將數(shù)據(jù)根據(jù)鍵跨節(jié)點進行混洗的過程。所有這些操作都會 從數(shù)據(jù)分區(qū)中獲益谓形。就 Spark 1.0 而言灶伊,能夠從數(shù)據(jù)分區(qū)中獲益的操作有 cogroup()、 groupWith()套耕、join()啊奄、leftOuterJoin()桩蓉、rightOuterJoin()元暴、groupByKey()苍鲜、reduceByKey()、 combineByKey() 以及 lookup()康愤。
對于像 reduceByKey() 這樣只作用于單個 RDD 的操作儡循,運行在未分區(qū)的 RDD 上的時候會 導致每個鍵的所有對應值都在每臺機器上進行本地計算,只需要把本地最終歸約出的結(jié) 果值從各工作節(jié)點傳回主節(jié)點征冷,所以原本的網(wǎng)絡開銷就不算大择膝。而對于諸如 cogroup() 和 join() 這樣的二元操作,預先進行數(shù)據(jù)分區(qū)會導致其中至少一個 RDD(使用已知分區(qū)器的那個 RDD)不發(fā)生數(shù)據(jù)混洗检激。如果兩個 RDD 使用同樣的分區(qū)方式肴捉,并且它們還緩存在 同樣的機器上(比如一個 RDD 是通過 mapValues() 從另一個 RDD 中創(chuàng)建出來的腹侣,這兩個 RDD 就會擁有相同的鍵和分區(qū)方式),或者其中一個 RDD 還沒有被計算出來齿穗,那么跨節(jié) 點的數(shù)據(jù)混洗就不會發(fā)生了傲隶。
4.5.4 自定義分區(qū)方式
雖然 Spark 提供的 HashPartitioner 與 RangePartitioner 已經(jīng)能夠滿足大多數(shù)用例,但 Spark 還是允許你通過提供一個自定義的 Partitioner 對象來控制 RDD 的分區(qū)方式窃页。這可 以讓你利用領(lǐng)域知識進一步減少通信開銷跺株。
要實現(xiàn)自定義的分區(qū)器,你需要繼承org.apache.spark.Partitioner 類并實現(xiàn)下面三個方法脖卖。
- numPartitions: Int:返回創(chuàng)建出來的分區(qū)數(shù)乒省。
- getPartition(key: Any): Int:返回給定鍵的分區(qū)編號(0 到 numPartitions-1)。
- equals():Java 判斷相等性的標準方法畦木。這個方法的實現(xiàn)非常重要袖扛,Spark 需要用這個方法來檢查你的分區(qū)器對象是否和其他分區(qū)器實例相同,這樣 Spark 才可以判斷兩個 RDD 的分區(qū)方式是否相同馋劈。