spark源碼解析之partitioner

spark中stage的劃分依據(jù)action算子進行左痢,每一次action(reduceByKey等)算子都會觸發(fā)一次shuffle過程仰楚,該過程涉及到數(shù)據(jù)的重新分區(qū)。spark中的分區(qū)器包括HashPartitioner及RangePartitioner兩種敦姻。HashPartitioner根據(jù)key進行分區(qū)宛徊,當某一個key對應的數(shù)據(jù)較多時會出現(xiàn)數(shù)據(jù)傾斜的情況,又因為每一個partition對應一個task夜只,數(shù)據(jù)較多的task會耗費較多的時間垒在,影響spark任務運行的時間。此時扔亥,可以使用RangePartitioner分區(qū)器场躯,RangePartitioner基于水塘抽樣算法,可以在不知道整體數(shù)據(jù)量的情況下旅挤,等概率地取到每條數(shù)據(jù)踢关。

一、HashPartitioner

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
 /* Calculates 'x' modulo 'mod', takes to consideration sign of x,
  * i.e. if 'x' is negative, than 'x' % 'mod' is negative too
  * so function return (x % mod) + mod in that case.
  */
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

HashPartitioner主要根據(jù)RDD的key進行分區(qū)粘茄,當key為null時签舞,對應的partitionId為0,當key不為null時柒瓣,partitionId計算過程為:先將key的hashcode值對分區(qū)個數(shù)numPartitions取余儒搭,當余數(shù)小于0時,將余數(shù)與numPartitions相加嘹朗,否則與0相加师妙。很明顯,相同key的數(shù)據(jù)一定會分到同一個分區(qū)中屹培,可能導致數(shù)據(jù)傾斜,進而影響spark運行速度怔檩。

二褪秀、RangePartitioner

HashPartitioner分區(qū)可能導致每個分區(qū)中數(shù)據(jù)量的不均勻。而RangePartitioner分區(qū)則盡量保證每個分區(qū)中數(shù)據(jù)量的均勻薛训,將一定范圍內的數(shù)映射到某一個分區(qū)內媒吗。分區(qū)與分區(qū)之間數(shù)據(jù)是有序的,但分區(qū)內的元素是不能保證順序的乙埃。

1闸英、水塘抽樣算法原理

對于一長度為n(大到無法加載到內存中)的數(shù)組N,如何等概率地從中取出k個元素介袜,組成數(shù)組R甫何?
水塘抽樣算法做法如下:首先,去數(shù)組N前k個元素放入數(shù)組R中遇伞;然后遍歷數(shù)組N中剩余元素辙喂,對于數(shù)組N中第i個元素N[i-1](i大于k),隨機生成一個數(shù)rand,若rand<k巍耗,則將N[i-1]替換掉數(shù)組R中第rand個元素秋麸,否則,保持原樣炬太【捏。可以得知,取得數(shù)組N中每一元素的概率均為k/n亲族。對于數(shù)組N中前k個元素而言炒考,由于第一次就將其取出,因此在后續(xù)迭代過程中只需保持原樣即可孽水,概率為\frac{ k}{k+1 }*\frac{ k+1}{k+2 }*...\frac{n-1}{n }=\frac{ k}{n}\票腰,對于數(shù)組N中剩余的n-k個元素,只需在遍歷到其所在的位置時替換掉現(xiàn)有元素中的一個并在后續(xù)步驟中保持原樣女气,概率為\frac{ k}{i }*\frac{i}{i+1}*...\frac{n-1}{n }=\frac{ k}{n}\杏慰。

2、RangePartitioner

// An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // Cast to double to avoid overflowing ints or longs
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }
  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }
private[spark] object SamplingUtils {

  /**
   * Reservoir sampling implementation that also returns the input size.
   *
   * @param input input size
   * @param k reservoir size
   * @param seed random seed
   * @return (samples, input size)
   */
  def reservoirSampleAndCount[T: ClassTag](
      input: Iterator[T],
      k: Int,
      seed: Long = Random.nextLong())
    : (Array[T], Long) = {
    val reservoir = new Array[T](k)
    // Put the first k elements in the reservoir.
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }

    // If we have consumed all the elements, return them. Otherwise do the replacement.
    if (i < k) {
      // If input size < k, trim the array to return only an array of input size.
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)
    } else {
      // If input size > k, continue the sampling process.
      var l = i.toLong
      val rand = new XORShiftRandom(seed)
      while (input.hasNext) {
        val item = input.next()
        l += 1
        // There are k elements in the reservoir, and the l-th element has been
        // consumed. It should be chosen with probability k/l. The expression
        // below is a random long chosen uniformly from [0,l)
        val replacementIndex = (rand.nextDouble() * l).toLong
        if (replacementIndex < k) {
          reservoir(replacementIndex.toInt) = item
        }
      }
      (reservoir, l)
    }
  }

RangePartitioner分區(qū)執(zhí)行原理:
1炼鞠、計算總體的數(shù)據(jù)抽樣大小sampleSize缘滥,計算規(guī)則是:至少每個分區(qū)抽取20個數(shù)據(jù)或者最多1M的數(shù)據(jù)量。
2谒主、根據(jù)sampleSize和分區(qū)數(shù)量計算每個分區(qū)的數(shù)據(jù)抽樣樣本數(shù)量最大值sampleSizePerPartition
3朝扼、根據(jù)以上兩個值進行水塘抽樣,返回RDD的總數(shù)據(jù)量霎肯,分區(qū)ID和每個分區(qū)的采樣數(shù)據(jù)擎颖。
4、計算出數(shù)據(jù)量較大的分區(qū)通過RDD.sample進行重新抽樣观游。
5查描、通過抽樣數(shù)組 candidates: ArrayBuffer[(K, wiegth)]計算出分區(qū)邊界的數(shù)組BoundsArray
6昼窗、在取數(shù)據(jù)時松靡,如果分區(qū)數(shù)小于128則直接獲取键俱,如果大于128則通過二分法,獲取當前Key屬于那個區(qū)間搪柑,返回對應的BoundsArray下標即為partitionsID

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末聋丝,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子工碾,更是在濱河造成了極大的恐慌弱睦,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件倚喂,死亡現(xiàn)場離奇詭異每篷,居然都是意外死亡瓣戚,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門焦读,熙熙樓的掌柜王于貴愁眉苦臉地迎上來子库,“玉大人,你說我怎么就攤上這事矗晃÷匦幔” “怎么了?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵张症,是天一觀的道長仓技。 經(jīng)常有香客問我,道長俗他,這世上最難降的妖魔是什么脖捻? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮兆衅,結果婚禮上地沮,老公的妹妹穿的比我還像新娘。我一直安慰自己羡亩,他們只是感情好摩疑,可當我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著畏铆,像睡著了一般雷袋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上辞居,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天楷怒,我揣著相機與錄音,去河邊找鬼瓦灶。 笑死率寡,一個胖子當著我的面吹牛,可吹牛的內容都是我干的倚搬。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼乾蛤,長吁一口氣:“原來是場噩夢啊……” “哼每界!你這毒婦竟也來了?” 一聲冷哼從身側響起家卖,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤眨层,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后上荡,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體趴樱,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡馒闷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了叁征。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纳账。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖捺疼,靈堂內的尸體忽然破棺而出疏虫,到底是詐尸還是另有隱情,我是刑警寧澤啤呼,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布卧秘,位于F島的核電站,受9級特大地震影響官扣,放射性物質發(fā)生泄漏翅敌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一惕蹄、第九天 我趴在偏房一處隱蔽的房頂上張望蚯涮。 院中可真熱鬧,春花似錦焊唬、人聲如沸恋昼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽液肌。三九已至,卻和暖如春鸥滨,著一層夾襖步出監(jiān)牢的瞬間嗦哆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工婿滓, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留老速,地道東北人。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓凸主,卻偏偏與公主長得像橘券,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子卿吐,可洞房花燭夜當晚...
    茶點故事閱讀 45,037評論 2 355