Spark是如何實(shí)現(xiàn)排序的?

Abstract

昨天丟人現(xiàn)眼的寫QuickSort用了40分鐘, 當(dāng)時(shí)感覺整個(gè)人都不好了.
(╯°□°)╯︵┻━┻ 看孩子一天睡4小時(shí)大腦不轉(zhuǎn)哇 d(?`ω′?d*)

External Sort的標(biāo)準(zhǔn)做法是一個(gè)QuickSort后邊跟一個(gè)n-way MergeSort, 理論上的復(fù)雜度也是nlogn.

但是由于存在文件IO, 所以實(shí)際速度要慢于帶內(nèi)排序很多.

在分布式環(huán)境里, 這個(gè)問題進(jìn)一步復(fù)雜化, 每臺(tái)機(jī)器持有的是數(shù)據(jù)的一部分, 如果需要執(zhí)行經(jīng)典的外排序, 則需要不斷的把所有節(jié)點(diǎn)的數(shù)據(jù)向一個(gè)中心節(jié)點(diǎn)進(jìn)行shuffle. 磁盤IO進(jìn)一步衰退為網(wǎng)絡(luò)IO.

更進(jìn)一步分析這個(gè)問題, 可以在一開始處理數(shù)據(jù)的時(shí)候, 把數(shù)據(jù)分為多份. RANGE(0, 1e3)的第一臺(tái)機(jī)器, RANGE(1e3, 2e3)第二臺(tái).....

通過(guò)HASH的方法, 讓每臺(tái)機(jī)器天然有序, 繼而每臺(tái)機(jī)器內(nèi)部跑外排.

這樣就需要對(duì)數(shù)據(jù)的分布有一定的了解, 通過(guò)抽樣來(lái)理解數(shù)據(jù)的整體排布方式, 然后決定每臺(tái)機(jī)器處理的數(shù)據(jù)范圍是一個(gè)大的思路.

在下面這個(gè)網(wǎng)站, 可以找到排序算法的效率排行榜. 你會(huì)發(fā)現(xiàn)BAT三家都在打這個(gè)榜...

http://sortbenchmark.org/

TeraSort 原理

TeraSort流程圖

TeraSort的核心在于第一步的map(), 這一步 任何一臺(tái)機(jī)器上的Partition i 里的所有對(duì)象一定小于 任何一臺(tái)機(jī)器上的Partition i+1,也就是保證了Parition之間的有序性. 繼而在reduce階段, 可以保證shuffle后每個(gè)任務(wù)收集到的數(shù)據(jù)的有序性.

這里可以非常直觀的看到兩個(gè)難點(diǎn)

  1. 如何確定每個(gè)Partition的范圍, 它負(fù)責(zé)的Range(X, Y)里的X和Y是多少
  2. 如何快速的把一個(gè)值映射到它對(duì)應(yīng)的Partition里, 這里需要考慮待排序的是任何實(shí)現(xiàn)了Comparable接口的對(duì)象. 不一定是個(gè)數(shù).

抽象的解決思路是

  1. 對(duì)數(shù)據(jù)進(jìn)行抽樣, 根據(jù)抽樣結(jié)果來(lái)構(gòu)筑每個(gè)Partition應(yīng)該承載什么范圍內(nèi)的數(shù)據(jù)
  2. 通過(guò)Trie Tree來(lái)構(gòu)筑索引, 當(dāng)一個(gè)String或者Long或者任何能夠被轉(zhuǎn)義成Char Sequence的對(duì)象進(jìn)來(lái)后, 利用Trie來(lái)找到它對(duì)應(yīng)的那個(gè)Partition.實(shí)現(xiàn)中, 對(duì)字典樹有微弱高的改造, 類似下圖中daz會(huì)被分到Parition3, 在最后一層中z > b
    image.png

Spark源碼

執(zhí)行結(jié)構(gòu)

2.RDD, 執(zhí)行入口

/spark/core/src/main/scala/org/apache/spark/rdd/RDD.scala

  /**
   * 按照輸入的key function, 對(duì)這個(gè)RDD進(jìn)行排序
   */
  def sortBy[K](
      // f 執(zhí)行在key上的 funtion, 返回K型對(duì)象, 這里K需要時(shí)可以compare的
      f: (T) => K,

      // 默認(rèn)是正序
      ascending: Boolean = true,

      // 維持當(dāng)前的partition數(shù)量, 這個(gè)對(duì)抽樣后到底怎么分區(qū)有影響
      numPartitions: Int = this.partitions.length)

      // 可以看到這里對(duì)K的類型進(jìn)行了隱式轉(zhuǎn)換
      // 保證它是scala.math.Ordering接口兼容的, 以便能夠排序
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)

         // 對(duì)所有的Key進(jìn)行處理后, 就可以運(yùn)行排序了, 排序方法在下面
        .sortByKey(ascending, numPartitions)
        .values
  }

3. OrderedRDDFunctions 調(diào)用方法

/spark/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala

/**
 * 輸入的KEY是能夠支持Scala Math排序的
 * 對(duì)于沒有實(shí)現(xiàn)對(duì)應(yīng)接口的.用戶可以自己實(shí)現(xiàn), 或者對(duì)已有的對(duì)象覆蓋自己的邏輯
 */
class OrderedRDDFunctions[K : Ordering : ClassTag,
                          V: ClassTag,
                          P <: Product2[K, V] : ClassTag] @DeveloperApi() (
    self: RDD[P])
  extends Logging with Serializable {
  private val ordering = implicitly[Ordering[K]]

  /**
   * 實(shí)現(xiàn)了對(duì)每個(gè)partition執(zhí)行sort, 由于partition相互之間是有序的
   * 調(diào)用`collect`或者`save`可以獲得全局有序的對(duì)象.
   */
  // TODO: this currently doesn't work on P other than Tuple2!
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope
  {
    // 初始化一個(gè)RangePartitioner對(duì)象, 這個(gè)對(duì)象負(fù)責(zé)管理告訴RDD應(yīng)該如何分配數(shù)據(jù)
    // 以及每個(gè)Range應(yīng)該是多少
    val part = new RangePartitioner(numPartitions, self, ascending)
    // 對(duì)數(shù)據(jù)進(jìn)行分片, 然后每片內(nèi)部再進(jìn)行排序
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }

  /**
   * 使用傳入的Partition分區(qū)方法來(lái)切割數(shù)據(jù), 然后每個(gè)Partition內(nèi)部再排序
   * 這個(gè)方法在特定條件下可以用customer的方法來(lái)提升TeraSort的性能
   * 相關(guān)論文很多, 核心思想主要是提升locality, 或者針對(duì)已經(jīng)部分有效的數(shù)據(jù),直接增加分配的有效性.
   */
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = ...
  }

  /**
   *  由于RangePartition中有每個(gè)partition的最大值和最小值, 也就是Range的信息
   *  所以給定一個(gè)lower和一個(gè)uppder值, 可以快速的用getPartition方法定位到最小和最大的PartitionIndex是多少. 從而實(shí)現(xiàn)快速的過(guò)濾.
   */
  def filterByRange(lower: K, upper: K): RDD[P]  = ...
}

4. Partitioner 實(shí)現(xiàn)有序分片

Partitioner在實(shí)現(xiàn)中有 HashPartitioner RangePartitioner, 后者直接對(duì)應(yīng)需要內(nèi)部有序的各種情況.

/spark/core/src/main/scala/org/apache/spark/Partitioner.scala

/**
 * 通過(guò)抽樣, 把對(duì)象映射到RANGE范圍大致相同的分片里. 
 * 分片多少和輸入的分片數(shù), 以及采樣數(shù)有關(guān)
 */
class RangePartitioner[K : Ordering : ClassTag, V](
    // 期望的分片
    partitions: Int,

    // 這里對(duì)RDD進(jìn)行了約束
    rdd: RDD[_ <: Product2[K, V]],

    // 默認(rèn)正序
    private var ascending: Boolean = true,
    
    // 默認(rèn)采樣20
    val samplePointsPerPartitionHint: Int = 20)
  extends Partitioner {

  // 構(gòu)造函數(shù) 
  def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = ...

  // 條件檢查
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
  require(samplePointsPerPartitionHint > 0,
    s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")

  private var ordering = implicitly[Ordering[K]]

  // 計(jì)算每個(gè)Partition應(yīng)該存儲(chǔ)的Range
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // 確定最大取樣數(shù), 封頂1M
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)

      // 假設(shè)Dependecy RDD中各個(gè)partition里的items數(shù)量是大致相同的, 采用常規(guī)的采樣
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // 如果分片數(shù)據(jù)傾斜的太嚴(yán)重, 就需要對(duì)這個(gè)分片做重新采樣
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }

  def numPartitions: Int = rangeBounds.length + 1

  // 利用二分查找用來(lái)快速的定位分片
  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = 
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // 直接順序查找
    } else {
      // 利用二分查找尋找partition, 在實(shí)現(xiàn)中需要考慮幾個(gè)細(xì)節(jié):  小于第一個(gè)分片的range, 大于最后一個(gè)分片的range, 以及倒序排列.
  }
  
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末砌创,一起剝皮案震驚了整個(gè)濱河市兔沃,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌捐迫,老刑警劉巖磺樱,帶你破解...
    沈念sama閱讀 219,188評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件但狭,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡传于,警方通過(guò)查閱死者的電腦和手機(jī)反浓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門墓塌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)牲迫,“玉大人豌汇,你說(shuō)我怎么就攤上這事验游〕涞海” “怎么了保檐?”我有些...
    開封第一講書人閱讀 165,562評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)崔梗。 經(jīng)常有香客問我夜只,道長(zhǎng),這世上最難降的妖魔是什么蒜魄? 我笑而不...
    開封第一講書人閱讀 58,893評(píng)論 1 295
  • 正文 為了忘掉前任扔亥,我火速辦了婚禮,結(jié)果婚禮上谈为,老公的妹妹穿的比我還像新娘旅挤。我一直安慰自己,他們只是感情好伞鲫,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評(píng)論 6 392
  • 文/花漫 我一把揭開白布粘茄。 她就那樣靜靜地躺著,像睡著了一般榔昔。 火紅的嫁衣襯著肌膚如雪驹闰。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,708評(píng)論 1 305
  • 那天撒会,我揣著相機(jī)與錄音嘹朗,去河邊找鬼。 笑死诵肛,一個(gè)胖子當(dāng)著我的面吹牛屹培,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播怔檩,決...
    沈念sama閱讀 40,430評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼褪秀,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了薛训?” 一聲冷哼從身側(cè)響起媒吗,我...
    開封第一講書人閱讀 39,342評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎乙埃,沒想到半個(gè)月后闸英,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,801評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡介袜,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評(píng)論 3 337
  • 正文 我和宋清朗相戀三年甫何,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片遇伞。...
    茶點(diǎn)故事閱讀 40,115評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡辙喂,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情巍耗,我是刑警寧澤秋麸,帶...
    沈念sama閱讀 35,804評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站芍锦,受9級(jí)特大地震影響竹勉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜娄琉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評(píng)論 3 331
  • 文/蒙蒙 一次乓、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧孽水,春花似錦票腰、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至炼鞠,卻和暖如春缘滥,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谒主。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工朝扼, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人霎肯。 一個(gè)月前我還...
    沈念sama閱讀 48,365評(píng)論 3 373
  • 正文 我出身青樓擎颖,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親观游。 傳聞我的和親對(duì)象是個(gè)殘疾皇子搂捧,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容