spark算子1:repartitionAndSortWithinPartitions

repartitionAndSortWithinPartitions算是一個高效的算子挚歧,是因為它要比使用repartition And sortByKey 效率高印机,這是由于它的排序是在shuffle過程中進行,一邊shuffle,一邊排序;具體見spark shuffle的讀操作腹备;
關于為什么比repartition And sortByKey 效率高,首先簡要分析repartition 和sortbykey'的流程:

(1)rePartition
(2)sortByKey

repartitionAndSortWithinPartitions的使用

(1)使用repartitionAndSortWithinPartitions時斤蔓,需要自己傳入一個分區(qū)器參數植酥,這個分區(qū)器 可以是系統(tǒng)提供的,也可以是自定義的:例如以下Demo中使用的KeyBasePartitioner弦牡,同時需要自定義一個排序的隱式變量友驮,當我們使用repartitionAndSortWithinPartitions時,我們自定義的my_self_Ordering 排序規(guī)則就會傳入到def implicitly[T](implicit e: T) = e
(2)二次排序
排序規(guī)則都需要在自定義的隱式變量my_self_Ordering中實現(xiàn)

private val ordering = implicitly[Ordering[K]]
//這里是使用了上下文界定驾锰,這個T就是Ordering[K]
def implicitly[T](implicit e: T) = e 
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
  }
Demo案例
val sparkConf = new SparkConf().setAppName("test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val wordsRDD: RDD[String] = sc.textFile("D:\\Spark_數據\\numbers_data.txt")
    val resultRDD = wordsRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(each => (each._2, each._1))
/**
      * key怎么排序卸留,在這里定義
      * 為什么在這里聲明一個隱式變量呢,是因為在源碼中椭豫,方法中有一個隱式參數耻瑟;不設置是按照默認的排序規(guī)則進行排序;
      */
    implicit val my_self_Ordering = new Ordering[String] {
      override def compare(a: String, b: String): Int = {
        val a_b: Array[String] = a.split("_")
        val a_1 = a_b(0).toInt
        val a_2 = a_b(1).toInt
        val b_b = b.split("_")
        val b_1 = b_b(0).toInt
        val b_2 = b_b(1).toInt
        if (a_1 == b_1) {
          a_2 - b_2
        } else {
          a_1 - b_1
        }
      }
    }

val rdd = resultRDD.map(x => (x._1 + "_" + x._2, x._2)).repartitionAndSortWithinPartitions(new KeyBasePartitioner(2))
/**
    * 自定義分區(qū)器
    *
    * @param partitions
    */
  class KeyBasePartitioner(partitions: Int) extends Partitioner {
     //分區(qū)數
    override def numPartitions: Int = partitions
    //該方法決定了你的數據被分到那個分區(qū)里面
    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[String]
      Math.abs(k.hashCode() % numPartitions)
    }
  }

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末赏酥,一起剝皮案震驚了整個濱河市喳整,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌裸扶,老刑警劉巖框都,帶你破解...
    沈念sama閱讀 221,273評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異呵晨,居然都是意外死亡魏保,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評論 3 398
  • 文/潘曉璐 我一進店門摸屠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來获询,“玉大人您访,你說我怎么就攤上這事些阅【軉” “怎么了?”我有些...
    開封第一講書人閱讀 167,709評論 0 360
  • 文/不壞的土叔 我叫張陵戒傻,是天一觀的道長税手。 經常有香客問我蜂筹,道長需纳,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,520評論 1 296
  • 正文 為了忘掉前任艺挪,我火速辦了婚禮不翩,結果婚禮上兵扬,老公的妹妹穿的比我還像新娘。我一直安慰自己口蝠,他們只是感情好器钟,可當我...
    茶點故事閱讀 68,515評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著妙蔗,像睡著了一般傲霸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上眉反,一...
    開封第一講書人閱讀 52,158評論 1 308
  • 那天昙啄,我揣著相機與錄音,去河邊找鬼寸五。 笑死梳凛,一個胖子當著我的面吹牛,可吹牛的內容都是我干的梳杏。 我是一名探鬼主播韧拒,決...
    沈念sama閱讀 40,755評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼十性!你這毒婦竟也來了叛溢?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,660評論 0 276
  • 序言:老撾萬榮一對情侶失蹤劲适,失蹤者是張志新(化名)和其女友劉穎雇初,沒想到半個月后,有當地人在樹林里發(fā)現(xiàn)了一具尸體减响,經...
    沈念sama閱讀 46,203評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡靖诗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,287評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了支示。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片刊橘。...
    茶點故事閱讀 40,427評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖颂鸿,靈堂內的尸體忽然破棺而出促绵,到底是詐尸還是另有隱情,我是刑警寧澤嘴纺,帶...
    沈念sama閱讀 36,122評論 5 349
  • 正文 年R本政府宣布败晴,位于F島的核電站,受9級特大地震影響栽渴,放射性物質發(fā)生泄漏尖坤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,801評論 3 333
  • 文/蒙蒙 一闲擦、第九天 我趴在偏房一處隱蔽的房頂上張望慢味。 院中可真熱鬧场梆,春花似錦、人聲如沸纯路。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽驰唬。三九已至顶岸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間叫编,已是汗流浹背蜕琴。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留宵溅,地道東北人凌简。 一個月前我還...
    沈念sama閱讀 48,808評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像恃逻,于是被迫代替她去往敵國和親雏搂。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,440評論 2 359

推薦閱讀更多精彩內容