repartitionAndSortWithinPartitions算是一個高效的算子挚歧,是因為它要比使用repartition And sortByKey 效率高印机,這是由于它的排序是在shuffle過程中進行,一邊shuffle,一邊排序;具體見spark shuffle的讀操作腹备;
關于為什么比repartition And sortByKey 效率高,首先簡要分析repartition 和sortbykey'的流程:
(1)rePartition
(2)sortByKey
repartitionAndSortWithinPartitions的使用
(1)使用repartitionAndSortWithinPartitions時斤蔓,需要自己傳入一個分區(qū)器參數植酥,這個分區(qū)器 可以是系統(tǒng)提供的,也可以是自定義的:例如以下Demo中使用的KeyBasePartitioner弦牡,同時需要自定義一個排序的隱式變量友驮,當我們使用repartitionAndSortWithinPartitions時,我們自定義的my_self_Ordering 排序規(guī)則就會傳入到def implicitly[T](implicit e: T) = e
(2)二次排序
排序規(guī)則都需要在自定義的隱式變量my_self_Ordering中實現(xiàn)
private val ordering = implicitly[Ordering[K]]
//這里是使用了上下文界定驾锰,這個T就是Ordering[K]
def implicitly[T](implicit e: T) = e
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
Demo案例
val sparkConf = new SparkConf().setAppName("test").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val wordsRDD: RDD[String] = sc.textFile("D:\\Spark_數據\\numbers_data.txt")
val resultRDD = wordsRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(each => (each._2, each._1))
/**
* key怎么排序卸留,在這里定義
* 為什么在這里聲明一個隱式變量呢,是因為在源碼中椭豫,方法中有一個隱式參數耻瑟;不設置是按照默認的排序規(guī)則進行排序;
*/
implicit val my_self_Ordering = new Ordering[String] {
override def compare(a: String, b: String): Int = {
val a_b: Array[String] = a.split("_")
val a_1 = a_b(0).toInt
val a_2 = a_b(1).toInt
val b_b = b.split("_")
val b_1 = b_b(0).toInt
val b_2 = b_b(1).toInt
if (a_1 == b_1) {
a_2 - b_2
} else {
a_1 - b_1
}
}
}
val rdd = resultRDD.map(x => (x._1 + "_" + x._2, x._2)).repartitionAndSortWithinPartitions(new KeyBasePartitioner(2))
/**
* 自定義分區(qū)器
*
* @param partitions
*/
class KeyBasePartitioner(partitions: Int) extends Partitioner {
//分區(qū)數
override def numPartitions: Int = partitions
//該方法決定了你的數據被分到那個分區(qū)里面
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[String]
Math.abs(k.hashCode() % numPartitions)
}
}