Spark鍵值隊操作

Spark 為包含鍵值對類型的 RDD 提供了一些專有的操作负敏。這些 RDD 被稱為 pair RDD1渡讼。Pair RDD 是很多程序的構(gòu)成要素锐秦,因為它們提供了并行操作各個鍵或跨節(jié)點重新進行數(shù)據(jù)分組 的操作接口坛掠。例如赊锚,pair RDD 提供 reduceByKey() 方法,可以分別歸約每個鍵對應的數(shù)據(jù)屉栓, 還有 join() 方法舷蒲,可以把兩個 RDD 中鍵相同的元素組合到一起,合并為一個 RDD友多。我們 通常從一個 RDD 中提取某些字段(例如代表事件時間牲平、用戶 ID 或者其他標識符的字段), 并使用這些字段作為 pair RDD 操作中的鍵域滥。

創(chuàng)建Pair RDD

在 Spark 中有很多種創(chuàng)建 pair RDD 的方式欠拾。第 5 章會講到,很多存儲鍵值對的數(shù)據(jù)格式會 在讀取時直接返回由其鍵值對數(shù)據(jù)組成的 pair RDD骗绕。此外藐窄,當需要把一個普通的 RDD 轉(zhuǎn) 為 pair RDD 時,可以調(diào)用 map() 函數(shù)來實現(xiàn)酬土,傳遞的函數(shù)需要返回鍵值對荆忍。后面會展示如 何將由文本行組成的 RDD 轉(zhuǎn)換為以每行的第一個單詞為鍵的 pair RDD。

構(gòu)建鍵值對 RDD 的方法在不同的語言中會有所不同撤缴。在 Python 中刹枉,為了讓提取鍵之后的 數(shù)據(jù)能夠在函數(shù)中使用,需要返回一個由二元組組成的 RDD(見例 4-1)屈呕。

例 4-1:在 Python 中使用第一個單詞作為鍵創(chuàng)建出一個 pair RDD pairs = lines.map(lambda x: (x.split(" ")[0], x))

在 Scala 中微宝,為了讓提取鍵之后的數(shù)據(jù)能夠在函數(shù)中使用,同樣需要返回二元組(見例 4-2)虎眨。隱式轉(zhuǎn)換可以讓二元組 RDD 支持附加的鍵值對函數(shù)蟋软。
例 4-2:在 Scala 中使用第一個單詞作為鍵創(chuàng)建出一個 pair RDD val pairs = lines.map(x => (x.split(" ")(0), x))

Java 沒有自帶的二元組類型,因此 Spark 的 Java API 讓用戶使用 scala.Tuple2 類來創(chuàng)建二 元組嗽桩。這個類很簡單:Java 用戶可以通過 new Tuple2(elem1, elem2) 來創(chuàng)建一個新的二元 組岳守,并且可以通過 ._1() 和 ._2() 方法訪問其中的元素。

Java 用戶還需要調(diào)用專門的 Spark 函數(shù)來創(chuàng)建 pair RDD碌冶。例如湿痢,要使用 mapToPair() 函數(shù) 來代替基礎(chǔ)版的 map() 函數(shù),這在 3.5.2 節(jié)中的“Java”一節(jié)有過更詳細的討論扑庞。下面通過 例 4-3 中展示一個簡單的例子譬重。

例 4-3:在 Java 中使用第一個單詞作為鍵創(chuàng)建出一個 pair RDD
     PairFunction<String, String, String> keyData =
       new PairFunction<String, String, String>() {
       public Tuple2<String, String> call(String x) {
         return new Tuple2(x.split(" ")[0], x);
       }
     };
     JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);

當用 Scala 和 Python 從一個內(nèi)存中的數(shù)據(jù)集創(chuàng)建 pair RDD 時拒逮,只需要對這個由二元組組成 的集合調(diào)用 SparkContext.parallelize() 方法。而要使用 Java 從內(nèi)存數(shù)據(jù)集創(chuàng)建 pair RDD 的話臀规,則需要使用 SparkContext.parallelizePairs()消恍。

4.3 Pair RDD的轉(zhuǎn)化操作

Pair RDD 可以使用所有標準 RDD 上的可用的轉(zhuǎn)化操作。3.4 節(jié)中介紹的所有有關(guān)傳遞函數(shù)的規(guī)則也都同樣適用于 pair RDD以现。由于 pair RDD 中包含二元組狠怨,所以需要傳遞的函數(shù)應 當操作二元組而不是獨立的元素。表 4-1 和表 4-2 總結(jié)了對 pair RDD 的一些轉(zhuǎn)化操作

image.png

Pair RDD 也還是 RDD(元素為 Java 或 Scala 中的 Tuple2 對象或 Python 中的元組)邑遏,因此 同樣支持 RDD 所支持的函數(shù)佣赖。例如,我們可以拿前一節(jié)中的 pair RDD记盒,篩選掉長度超過 20 個字符的行憎蛤,如例 4-4 至例 4-6 以及圖 4-1 所示。

例 4-4:用 Python 對第二個元素進行篩選
result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20)
例 4-5:用 Scala 對第二個元素進行篩選 pairs.filter{case (key, value) => value.length < 20}
例 4-6:用 Java 對第二個元素進行篩選
     Function<Tuple2<String, String>, Boolean> longWordFilter =
       new Function<Tuple2<String, String>, Boolean>() {
         public Boolean call(Tuple2<String, String> keyValue) {
           return (keyValue._2().length() < 20);
} };
     JavaPairRDD<String, String> result = pairs.filter(longWordFilter);

image.png

有時纪吮,我們只想訪問 pair RDD 的值部分俩檬,這時操作二元組很麻煩。由于這是一種常見的 使用模式碾盟,因此 Spark 提供了 mapValues(func) 函數(shù)棚辽,功能類似于 map{case (x, y): (x, func(y))}”龋可以在很多例子中使用這個函數(shù)屈藐。

4.3.1 聚合操作

當數(shù)據(jù)集以鍵值對形式組織的時候,聚合具有相同鍵的元素進行一些統(tǒng)計是很常見的操 作熙尉。之前講解過基礎(chǔ) RDD 上的 fold()联逻、combine()、reduce() 等行動操作检痰,pair RDD 上則 有相應的針對鍵的轉(zhuǎn)化操作包归。Spark 有一組類似的操作,可以組合具有相同鍵的值铅歼。這些 操作返回 RDD公壤,因此它們是轉(zhuǎn)化操作而不是行動操作。

reduceByKey() 與 reduce() 相當類似;它們都接收一個函數(shù)谭贪,并使用該函數(shù)對值進行合并境钟。 reduceByKey() 會為數(shù)據(jù)集中的每個鍵進行并行的歸約操作锦担,每個歸約操作會將鍵相同的值合 并起來俭识。因為數(shù)據(jù)集中可能有大量的鍵,所以 reduceByKey() 沒有被實現(xiàn)為向用戶程序返回一 個值的行動操作洞渔。實際上套媚,它會返回一個由各鍵和對應鍵歸約出來的結(jié)果值組成的新的 RDD缚态。

例 4-8:在 Scala 中使用 reduceByKey() 和 mapValues() 計算每個鍵對應的平均值 rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

image.png
例 4-10:用 Scala 實現(xiàn)單詞計數(shù)
     val input = sc.textFile("s3://...")
     val words = input.flatMap(x => x.split(" "))
     val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
例 4-11:用 Java 實現(xiàn)單詞計數(shù)
     JavaRDD<String> input = sc.textFile("s3://...")
     JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
       public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }
     });
     JavaPairRDD<String, Integer> result = words.mapToPair(
       new PairFunction<String, String, Integer>() {
         public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); }
     }).reduceByKey(
       new Function2<Integer, Integer, Integer>() {
         public Integer call(Integer a, Integer b) { return a + b; }
});

事實上,我們可以對第一個 RDD 使用 countByValue() 函數(shù)堤瘤,以更快地實現(xiàn) 單詞計數(shù):
input.flatMap(x => x.split(" ")).countByValue()玫芦。

combineByKey() 是最為常用的基于鍵進行聚合的函數(shù)。大多數(shù)基于鍵聚合的函數(shù)都是用它 實現(xiàn)的本辐。和 aggregate() 一樣桥帆,combineByKey() 可以讓用戶返回與輸入數(shù)據(jù)的類型不同的 返回值。

要理解 combineByKey()慎皱,要先理解它在處理數(shù)據(jù)時是如何處理每個元素的老虫。由于 combineByKey() 會遍歷分區(qū)中的所有元素,因此每個元素的鍵要么還沒有遇到過茫多,要么就 和之前的某個元素的鍵相同祈匙。

如果這是一個新的元素,combineByKey() 會使用一個叫作 createCombiner() 的函數(shù)來創(chuàng)建 那個鍵對應的累加器的初始值天揖。需要注意的是夺欲,這一過程會在每個分區(qū)中第一次出現(xiàn)各個 鍵時發(fā)生,而不是在整個 RDD 中第一次出現(xiàn)一個鍵時發(fā)生今膊。

如果這是一個在處理當前分區(qū)之前已經(jīng)遇到的鍵些阅,它會使用 mergeValue() 方法將該鍵的累 加器對應的當前值與這個新的值進行合并。

由于每個分區(qū)都是獨立處理的斑唬,因此對于同一個鍵可以有多個累加器扑眉。如果有兩個或者更 多的分區(qū)都有對應同一個鍵的累加器,就需要使用用戶提供的 mergeCombiners() 方法將各 個分區(qū)的結(jié)果進行合并赖钞。

combineByKey() 有多個參數(shù)分別對應聚合操作的各個階段腰素,因而非常適合用來解釋聚合操 作各個階段的功能劃分。為了更好地演示 combineByKey() 是如何工作的雪营,下面來看看如何 計算各鍵對應的平均值弓千,

例 4-13:在 Scala 中使用 combineByKey() 求每個鍵對應的平均值
     val result = input.combineByKey(
       (v) => (v, 1),
       (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
       (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
       ).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
       result.collectAsMap().map(println(_))
例 4-14:在 Java 中使用 combineByKey() 求每個鍵對應的平均值
     public static class AvgCount implements Serializable {
       public AvgCount(int total, int num) {   total_ = total;  num_ = num; }
       public int total_;
       public int num_;
       public float avg() {   returntotal_/(float)num_; }
}
     Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
       public AvgCount call(Integer x) {
    return new AvgCount(x, 1);
       }
     };
     Function2<AvgCount, Integer, AvgCount> addAndCount =
       new Function2<AvgCount, Integer, AvgCount>() {
       public AvgCount call(AvgCount a, Integer x) {
         a.total_ += x;
         a.num_ += 1;
         return a;
} };
     Function2<AvgCount, AvgCount, AvgCount> combine =
       new Function2<AvgCount, AvgCount, AvgCount>() {
       public AvgCount call(AvgCount a, AvgCount b) {
         a.total_ += b.total_;
         a.num_ += b.num_;
         return a;
} };
     AvgCount initial = new AvgCount(0,0);
     JavaPairRDD<String, AvgCount> avgCounts =
       nums.combineByKey(createAcc, addAndCount, combine);
     Map<String, AvgCount> countMap = avgCounts.collectAsMap();
     for (Entry<String, AvgCount> entry : countMap.entrySet()) {
       System.out.println(entry.getKey() + ":" + entry.getValue().avg());
     }

image.png

有很多函數(shù)可以進行基于鍵的數(shù)據(jù)合并。它們中的大多數(shù)都是在 combineByKey() 的基礎(chǔ)上實現(xiàn)的献起,為用戶提供了更簡單的接口洋访。不管怎樣,在 Spark 中使用這些專用的聚合函數(shù)谴餐,始終要比手動將數(shù)據(jù)分組再歸約快很多姻政。

并行度調(diào)優(yōu)

到目前為止,我們已經(jīng)討論了所有的轉(zhuǎn)化操作的分發(fā)方式岂嗓,但是還沒有探討 Spark 是怎樣 確定如何分割工作的汁展。每個 RDD 都有固定數(shù)目的分區(qū),分區(qū)數(shù)決定了在 RDD 上執(zhí)行操作 時的并行度。

在執(zhí)行聚合或分組操作時食绿,可以要求 Spark 使用給定的分區(qū)數(shù)侈咕。Spark 始終嘗試根據(jù)集群的大小推斷出一個有意義的默認值,但是有時候你可能要對并行度進行調(diào)優(yōu)來獲取更好的性能表現(xiàn)器紧。

本章討論的大多數(shù)操作符都能接收第二個參數(shù)耀销,這個參數(shù)用來指定分組結(jié)果或聚合結(jié)果的 RDD 的分區(qū)數(shù),如例 4-15 和例 4-16 所示铲汪。

例 4-15:在 Python 中自定義 reduceByKey() 的并行度
data = [("a", 3), ("b", 4), ("a", 1)] sc.parallelize(data).reduceByKey(lambda x, y: x + y) # 默認并行度 sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # 自定義并行度
例 4-16:在 Scala 中自定義 reduceByKey() 的并行度
val data = Seq(("a", 3), ("b", 4), ("a", 1)) sc.parallelize(data).reduceByKey((x, y) => x + y) // 默認并行度 sc.parallelize(data).reduceByKey((x, y) => x + y, 10) // 自定義并行度

有時熊尉,我們希望在除分組操作和聚合操作之外的操作中也能改變 RDD 的分區(qū)。對于 這樣的情況掌腰,Spark 提供了 repartition() 函數(shù)帽揪。它會把數(shù)據(jù)通過網(wǎng)絡進行混洗,并創(chuàng) 建出新的分區(qū)集合辅斟。切記转晰,對數(shù)據(jù)進行重新分區(qū)是代價相對比較大的操作。Spark 中也 有一個優(yōu)化版的 repartition()士飒,叫作 coalesce()查邢。你可以使用 Java 或 Scala 中的 rdd. partitions.size() 以及 Python 中的 rdd.getNumPartitions 查看 RDD 的分區(qū)數(shù),并確保調(diào) 用 coalesce() 時將 RDD 合并到比現(xiàn)在的分區(qū)數(shù)更少的分區(qū)中酵幕。

4.3.2 數(shù)據(jù)分組

對于有鍵的數(shù)據(jù)扰藕,一個常見的用例是將數(shù)據(jù)根據(jù)鍵進行分組——比如查看一個顧客的所有訂單。

如果數(shù)據(jù)已經(jīng)以預期的方式提取了鍵芳撒,groupByKey() 就會使用 RDD 中的鍵來對數(shù)據(jù)進行 分組邓深。對于一個由類型 K 的鍵和類型 V 的值組成的 RDD,所得到的結(jié)果 RDD 類型會是 [K, Iterable[V]]笔刹。

groupBy() 可以用于未成對的數(shù)據(jù)上芥备,也可以根據(jù)除鍵相同以外的條件進行分組。它可以 接收一個函數(shù)舌菜,對源 RDD 中的每個元素使用該函數(shù)萌壳,將返回結(jié)果作為鍵再進行分組。

4.3.3 連接

將有鍵的數(shù)據(jù)與另一組有鍵的數(shù)據(jù)一起使用是對鍵值對數(shù)據(jù)執(zhí)行的最有用的操作之一日月。連 接數(shù)據(jù)可能是 pair RDD 最常用的操作之一袱瓮。連接方式多種多樣:右外連接、左外連接爱咬、交 叉連接以及內(nèi)連接尺借。

普通的 join 操作符表示內(nèi)連接 2。只有在兩個 pair RDD 中都存在的鍵才叫輸出精拟。當一個輸 入對應的某個鍵有多個值時燎斩,生成的 pair RDD 會包括來自兩個輸入 RDD 的每一組相對應 的記錄虱歪。例 4-17 可以幫你理解這個定義。

例 4-17:在 Scala shell 中進行內(nèi)連接
     storeAddress = {
       (Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"),
       (Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")}
     storeRating = {
       (Store("Ritual"), 4.9), (Store("Philz"), 4.8))}
     storeAddress.join(storeRating) == {
       (Store("Ritual"), ("1026 Valencia St", 4.9)),
       (Store("Philz"), ("748 Van Ness Ave", 4.8)),
       (Store("Philz"), ("3101 24th St", 4.8))}
4.3.4 數(shù)據(jù)排序

很多時候瘫里,讓數(shù)據(jù)排好序是很有用的实蔽,尤其是在生成下游輸出時荡碾。如果鍵有已定義的順 序谨读,就可以對這種鍵值對 RDD 進行排序。當把數(shù)據(jù)排好序后坛吁,后續(xù)對數(shù)據(jù)進行 collect() 或 save() 等操作都會得到有序的數(shù)據(jù)劳殖。

我們經(jīng)常要將 RDD 倒序排列,因此 sortByKey() 函數(shù)接收一個叫作 ascending 的參數(shù)拨脉,表 示我們是否想要讓結(jié)果按升序排序(默認值為 true)哆姻。有時我們也可能想按完全不同的排 序依據(jù)進行排序。要支持這種情況玫膀,我們可以提供自定義的比較函數(shù)矛缨。

例 4-19:在 Python 中以字符串順序?qū)φ麛?shù)進行自定義排序
     rdd.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x))
例 4-20:在 Scala 中以字符串順序?qū)φ麛?shù)進行自定義排序
     val input: RDD[(Int, Venue)] = ...
     implicit val sortIntegersByString = new Ordering[Int] {
       override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
     }
     rdd.sortByKey()
例 4-21:在 Java 中以字符串順序?qū)φ麛?shù)進行自定義排序
     class IntegerComparator implements Comparator<Integer> {
        public int compare(Integer a, Integer b) {
          return String.valueOf(a).compareTo(String.valueOf(b))
       }
     }
     rdd.sortByKey(comp)

4.4 Pair RDD的行動操作

和轉(zhuǎn)化操作一樣,所有基礎(chǔ) RDD 支持的傳統(tǒng)行動操作也都在 pair RDD 上可用帖旨。Pair RDD 提供了一些額外的行動操作箕昭,可以讓我們充分利用數(shù)據(jù)的鍵值對特性。這些操作列在了表 4-3 中解阅。

image.png

4.5 數(shù)據(jù)分區(qū)

本章要討論的最后一個 Spark 特性是對數(shù)據(jù)集在節(jié)點間的分區(qū)進行控制落竹。在分布式程序中, 通信的代價是很大的货抄,因此控制數(shù)據(jù)分布以獲得最少的網(wǎng)絡傳輸可以極大地提升整體性 能述召。和單節(jié)點的程序需要為記錄集合選擇合適的數(shù)據(jù)結(jié)構(gòu)一樣,Spark 程序可以通過控制 RDD 分區(qū)方式來減少通信開銷蟹地。分區(qū)并不是對所有應用都有好處的——比如积暖,如果給定 RDD 只需要被掃描一次,我們完全沒有必要對其預先進行分區(qū)處理怪与。只有當數(shù)據(jù)集多次在 諸如連接這種基于鍵的操作中使用時呀酸,分區(qū)才會有幫助。我們會給出一些小例子來說明這點琼梆。

Spark 中所有的鍵值對 RDD 都可以進行分區(qū)性誉。系統(tǒng)會根據(jù)一個針對鍵的函數(shù)對元素進行分 組。盡管 Spark 沒有給出顯示控制每個鍵具體落在哪一個工作節(jié)點上的方法(部分原因是 Spark 即使在某些節(jié)點失敗時依然可以工作)茎杂,但 Spark 可以確保同一組的鍵出現(xiàn)在同一個 節(jié)點上错览。比如,你可能使用哈希分區(qū)將一個 RDD 分成了 100 個分區(qū)煌往,此時鍵的哈希值對 100 取模的結(jié)果相同的記錄會被放在一個節(jié)點上倾哺。你也可以使用范圍分區(qū)法轧邪,將鍵在同一 個范圍區(qū)間內(nèi)的記錄都放在同一個節(jié)點上。

舉個簡單的例子羞海,我們分析這樣一個應用忌愚,它在內(nèi)存中保存著一張很大的用戶信息表—— 也就是一個由 (UserID, UserInfo) 對組成的 RDD,其中 UserInfo 包含一個該用戶所訂閱 的主題的列表却邓。該應用會周期性地將這張表與一個小文件進行組合硕糊,這個小文件中存著過 去五分鐘內(nèi)發(fā)生的事件——其實就是一個由 (UserID, LinkInfo) 對組成的表,存放著過去 五分鐘內(nèi)某網(wǎng)站各用戶的訪問情況腊徙。例如简十,我們可能需要對用戶訪問其未訂閱主題的頁面 的情況進行統(tǒng)計。我們可以使用 Spark 的 join() 操作來實現(xiàn)這個組合操作撬腾,其中需要把 UserInfo 和 LinkInfo 的有序?qū)Ω鶕?jù) UserID 進行分組螟蝙。我們的應用如例 4-22 所示。

例 4-22:簡單的 Scala 應用
// 初始化代碼;從HDFS商的一個Hadoop SequenceFile中讀取用戶信息
// userData中的元素會根據(jù)它們被讀取時的來源民傻,即HDFS塊所在的節(jié)點來分布
// Spark此時無法獲知某個特定的UserID對應的記錄位于哪個節(jié)點上
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// 周期性調(diào)用函數(shù)來處理過去五分鐘產(chǎn)生的事件日志
// 假設這是一個包含(UserID, LinkInfo)對的SequenceFile def processNewLogs(logFileName: String) {
       val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
       val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
       val offTopicVisits = joined.filter {
         case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
           !userInfo.topics.contains(linkInfo.topic)
}.count()
       println("Number of visits to non-subscribed topics: " + offTopicVisits)
     }

這段代碼可以正確運行胰默,但是不夠高效。這是因為在每次調(diào)用 processNewLogs() 時都會用 到 join() 操作漓踢,而我們對數(shù)據(jù)集是如何分區(qū)的卻一無所知牵署。默認情況下,連接操作會將兩 個數(shù)據(jù)集中的所有鍵的哈希值都求出來彭雾,將該哈希值相同的記錄通過網(wǎng)絡傳到同一臺機器 上碟刺,然后在那臺機器上對所有鍵相同的記錄進行連接操作(見圖 4-4)。因為 userData 表比 每五分鐘出現(xiàn)的訪問日志表 events 要大得多薯酝,所以要浪費時間做很多額外工作:在每次調(diào) 用時都對 userData 表進行哈希值計算和跨節(jié)點數(shù)據(jù)混洗半沽,雖然這些數(shù)據(jù)從來都不會變化。

image.png

要解決這一問題也很簡單:在程序開始時吴菠,對 userData 表使用 partitionBy() 轉(zhuǎn)化操作者填, 將這張表轉(zhuǎn)為哈希分區(qū)∽隹可以通過向 partitionBy 傳遞一個 spark.HashPartitioner 對象來 實現(xiàn)該操作占哟,如例 4-23 所示。

例 4-23:Scala 自定義分區(qū)方式
     val sc = new SparkContext(...)
     val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // 構(gòu)造100個分區(qū) .persist()

processNewLogs() 方 法 可 以 保 持 不 變: 在 processNewLogs() 中酿矢,eventsRDD 是 本 地 變 量榨乎,只在該方法中使用了一次,所以為 events 指定分區(qū)方式?jīng)]有什么用處瘫筐。由于在構(gòu) 建 userData 時調(diào)用了 partitionBy()蜜暑,Spark 就知道了該 RDD 是根據(jù)鍵的哈希值來分 區(qū)的,這樣在調(diào)用 join() 時策肝,Spark 就會利用到這一點肛捍。具體來說隐绵,當調(diào)用 userData. join(events) 時,Spark 只會對 events 進行數(shù)據(jù)混洗操作拙毫,將 events 中特定 UserID 的記 錄發(fā)送到 userData 的對應分區(qū)所在的那臺機器上(見圖 4-5)依许。這樣,需要通過網(wǎng)絡傳輸?shù)?數(shù)據(jù)就大大減少了缀蹄,程序運行速度也可以顯著提升了峭跳。

注意,partitionBy() 是一個轉(zhuǎn)化操作袍患,因此它的返回值總是一個新的 RDD坦康,但它不會改變 原來的 RDD竣付。RDD 一旦創(chuàng)建就無法修改诡延。因此應該對 partitionBy() 的結(jié)果進行持久化, 并保存為 userData古胆,而不是原來的 sequenceFile() 的輸出肆良。此外,傳給 partitionBy() 的 100 表示分區(qū)數(shù)目逸绎,它會控制之后對這個 RDD 進行進一步操作(比如連接操作)時有多少 任務會并行執(zhí)行惹恃。總的來說棺牧,這個值至少應該和集群中的總核心數(shù)一樣巫糙。

image.png

如果沒有將 partitionBy() 轉(zhuǎn)化操作的結(jié)果持久化,那么后面每次用到這個 RDD 時都會重復地對數(shù)據(jù)進行分區(qū)操作颊乘。不進行持久化會導致整個 RDD 譜 系圖重新求值参淹。那樣的話,partitionBy() 帶來的好處就會被抵消乏悄,導致重 復對數(shù)據(jù)進行分區(qū)以及跨節(jié)點的混洗浙值,和沒有指定分區(qū)方式時發(fā)生的情況十 分相似。

事實上檩小,許多其他 Spark 操作會自動為結(jié)果 RDD 設定已知的分區(qū)方式信息开呐,而且除 join() 外還有很多操作也會利用到已有的分區(qū)信息。比如规求,sortByKey() 和 groupByKey() 會分別生成范圍分區(qū)的 RDD 和哈希分區(qū)的 RDD筐付。而另一方面,諸如 map() 這樣的操作會 導致新的 RDD 失去父 RDD 的分區(qū)信息阻肿,因為這樣的操作理論上可能會修改每條記錄的 鍵瓦戚。接下來的幾節(jié)中,我們會討論如何獲取 RDD 的分區(qū)信息冕茅,以及數(shù)據(jù)分區(qū)是如何影響 各種 Spark 操作的伤极。

4.5.1 獲取RDD的分區(qū)方式

在 Scala 和 Java 中蛹找,你可以使用 RDD 的 partitioner 屬性(Java 中使用 partitioner() 方法)來獲取 RDD 的分區(qū)方式。 它會返回一個 scala.Option 對象哨坪,這是 Scala 中用來存放 可能存在的對象的容器類庸疾。你可以對這個 Option 對象調(diào)用 isDefined() 來檢查其中是否有 值,調(diào)用 get() 來獲取其中的值当编。如果存在值的話届慈,這個值會是一個 spark.Partitioner 對象。這本質(zhì)上是一個告訴我們 RDD 中各個鍵分別屬于哪個分區(qū)的函數(shù)忿偷。

在 Spark shell 中使用 partitioner 屬性不僅是檢驗各種 Spark 操作如何影響分區(qū)方式的一種 好辦法金顿,還可以用來在你的程序中檢查想要使用的操作是否會生成正確的結(jié)果(見例 4-24)。

例 4-24:獲取 RDD 的分區(qū)方式
     scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
     pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at
     <console>:12
     scala> pairs.partitioner
     res0: Option[spark.Partitioner] = None
     scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
     partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14
     scala> partitioned.partitioner
     res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)

在這段簡短的代碼中鲤桥,我們創(chuàng)建出了一個由 (Int, Int) 對組成的 RDD揍拆,初始時沒有分 區(qū)方式信息(一個值為 None 的 Option 對象)。然后通過對第一個 RDD 進行哈希分區(qū)茶凳, 創(chuàng)建出了第二個 RDD嫂拴。如果確實要在后續(xù)操作中使用 partitioned,那就應當在定義 partitioned 時贮喧,在第三行輸入的最后加上 persist()筒狠。這和之前的例子中需要對 userData 調(diào)用 persist() 的原因是一樣的:如果不調(diào)用 persist() 的話,后續(xù)的 RDD 操作會對 partitioned 的整個譜系重新求值箱沦,這會導致對 pairs 一遍又一遍地進行哈希分區(qū)操作辩恼。

4.5.2 從分區(qū)中獲益的操作

Spark 的許多操作都引入了將數(shù)據(jù)根據(jù)鍵跨節(jié)點進行混洗的過程。所有這些操作都會 從數(shù)據(jù)分區(qū)中獲益谓形。就 Spark 1.0 而言灶伊,能夠從數(shù)據(jù)分區(qū)中獲益的操作有 cogroup()、 groupWith()套耕、join()啊奄、leftOuterJoin()桩蓉、rightOuterJoin()元暴、groupByKey()苍鲜、reduceByKey()、 combineByKey() 以及 lookup()康愤。

對于像 reduceByKey() 這樣只作用于單個 RDD 的操作儡循,運行在未分區(qū)的 RDD 上的時候會 導致每個鍵的所有對應值都在每臺機器上進行本地計算,只需要把本地最終歸約出的結(jié) 果值從各工作節(jié)點傳回主節(jié)點征冷,所以原本的網(wǎng)絡開銷就不算大择膝。而對于諸如 cogroup() 和 join() 這樣的二元操作,預先進行數(shù)據(jù)分區(qū)會導致其中至少一個 RDD(使用已知分區(qū)器的那個 RDD)不發(fā)生數(shù)據(jù)混洗检激。如果兩個 RDD 使用同樣的分區(qū)方式肴捉,并且它們還緩存在 同樣的機器上(比如一個 RDD 是通過 mapValues() 從另一個 RDD 中創(chuàng)建出來的腹侣,這兩個 RDD 就會擁有相同的鍵和分區(qū)方式),或者其中一個 RDD 還沒有被計算出來齿穗,那么跨節(jié) 點的數(shù)據(jù)混洗就不會發(fā)生了傲隶。

4.5.4 自定義分區(qū)方式

雖然 Spark 提供的 HashPartitioner 與 RangePartitioner 已經(jīng)能夠滿足大多數(shù)用例,但 Spark 還是允許你通過提供一個自定義的 Partitioner 對象來控制 RDD 的分區(qū)方式窃页。這可 以讓你利用領(lǐng)域知識進一步減少通信開銷跺株。

要實現(xiàn)自定義的分區(qū)器,你需要繼承org.apache.spark.Partitioner 類并實現(xiàn)下面三個方法脖卖。

  • numPartitions: Int:返回創(chuàng)建出來的分區(qū)數(shù)乒省。
  • getPartition(key: Any): Int:返回給定鍵的分區(qū)編號(0 到 numPartitions-1)。
  • equals():Java 判斷相等性的標準方法畦木。這個方法的實現(xiàn)非常重要袖扛,Spark 需要用這個方法來檢查你的分區(qū)器對象是否和其他分區(qū)器實例相同,這樣 Spark 才可以判斷兩個 RDD 的分區(qū)方式是否相同馋劈。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末攻锰,一起剝皮案震驚了整個濱河市晾嘶,隨后出現(xiàn)的幾起案子妓雾,更是在濱河造成了極大的恐慌,老刑警劉巖垒迂,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件械姻,死亡現(xiàn)場離奇詭異,居然都是意外死亡机断,警方通過查閱死者的電腦和手機楷拳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吏奸,“玉大人欢揖,你說我怎么就攤上這事》芪担” “怎么了她混?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵泊碑,是天一觀的道長。 經(jīng)常有香客問我臭脓,道長,這世上最難降的妖魔是什么腹忽? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任来累,我火速辦了婚禮,結(jié)果婚禮上嘹锁,老公的妹妹穿的比我還像新娘。我一直安慰自己兼耀,他們只是感情好压昼,可當我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著瘤运,像睡著了一般。 火紅的嫁衣襯著肌膚如雪但金。 梳的紋絲不亂的頭發(fā)上郁季,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天,我揣著相機與錄音梦裂,去河邊找鬼。 笑死凿歼,一個胖子當著我的面吹牛冗恨,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播掀抹,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼傲武,長吁一口氣:“原來是場噩夢啊……” “哼蓉驹!你這毒婦竟也來了谱轨?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤诗茎,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后王污,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡楚午,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年矾柜,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片怪蔑。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡缆瓣,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出弓坞,到底是詐尸還是另有隱情,我是刑警寧澤戚扳,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布菩帝,位于F島的核電站,受9級特大地震影響呼奢,放射性物質(zhì)發(fā)生泄漏切平。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一禀综、第九天 我趴在偏房一處隱蔽的房頂上張望苔严。 院中可真熱鬧,春花似錦欠窒、人聲如沸退子。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至腕铸,卻和暖如春铛碑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背亚茬。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工刹缝, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留碗暗,地道東北人梢夯。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓颂砸,卻偏偏與公主長得像,于是被迫代替她去往敵國和親人乓。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,884評論 2 354