最近尔崔,有位朋友問我,distinct去重原理是怎么實現(xiàn)的褥民?
“在面試時,面試官問他了解distinct算子嗎洗搂?”
“了解啊消返,Spark的rdd,一種transFormation去重的算子耘拇,主要用來去重的”撵颊。
“喲,看來你經(jīng)常使用distinct算子惫叛,對distinct算子很熟悉啊”倡勇。
“好說,好說”嘉涌。
“那你能說說distinct是如何實現(xiàn)去重的嗎妻熊?”
我朋友支支吾吾半天:“就是這樣夸浅、那樣去重的啊”。
“這樣扔役、那樣是怎么去重的呢”
“具體有點忘記了(其實是根本就不知道)”帆喇。
那么distinct,底層到底是如何實現(xiàn)去重功能的呢亿胸?這個是面試spark部分時坯钦,經(jīng)常被問到的問題。
先來看一段代碼侈玄,我們測試一下distinct去重的作用:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkDistinct {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkDistinct")
val sc: SparkContext = new SparkContext(conf) //定義一個數(shù)組
val array: Array[Int] = Array(1,1,1,2,2,3,3,4) //把數(shù)組轉為RDD算子,后面的數(shù)字2代表分區(qū)婉刀,也可以指定3,4....個分區(qū)序仙,也可以不指定路星。
val line: RDD[Int] = sc.parallelize(array,2)
line.distinct().foreach(x => println(x)) //輸出的結果已經(jīng)去重:1,2诱桂,3洋丐,4
}
}
通過上面的代碼可以看出,使用distinct以后挥等,會對重復的元素進行去重友绝。我們來看下源碼
/** * Return a new RDD containing the distinct elements in this RDD. */
def distinct(numPartitions: Int(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
/** * Return a new RDD containing the distinct elements in this RDD. */
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
上面是distinct的源碼,有帶參和無參兩種肝劲。當我們調(diào)用無參的distinct時迁客,底層調(diào)用的是如下源碼:
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
而無參distinct()中又調(diào)用了帶參數(shù)的distinct(partitions.length)。
其中辞槐,partitions.length代表是分區(qū)數(shù)掷漱,而這個分區(qū)則是我們在使用 sc.parallelize(array,2) 時指定的2個分區(qū)。
帶參數(shù)的distinct其內(nèi)部就很容易理解了榄檬,這就是一個wordcount統(tǒng)計單詞的方法卜范,區(qū)別是:后者通過元組獲取了第一個單詞元素。
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
其中鹿榜,numPartitions就是分區(qū)數(shù)海雪。
我們也可以寫成這樣:
map(x => (x, null)).reduceByKey((x, y) => x).map(_._1)
也可以這樣寫:
line.map(x =>(x,1)).reduceByKey(_+_).map(_._1)
通過上面的流程圖很清晰的看出來,distinct的原理流程舱殿。
使用map算子把元素轉為一個帶有null的元組奥裸;使用reducebykey對具有相同key的元素進行統(tǒng)計;之后再使用map算子沪袭,取得元組中的單詞元素湾宙,實現(xiàn)去重的效果。