Spark中RangePartitioner的實(shí)現(xiàn)機(jī)制分析

一.分區(qū)器的區(qū)別

  • HashPartitioner分區(qū)可能HashPartitioner導(dǎo)致每個(gè)分區(qū)中數(shù)據(jù)量的不均勻挠唆。
  • RangePartitioner分區(qū)盡量保證每個(gè)分區(qū)中數(shù)據(jù)量的均勻审轮,將一定范圍內(nèi)的數(shù)映射到某一個(gè)分區(qū)內(nèi)滔韵。分區(qū)與分區(qū)之間數(shù)據(jù)是有序的,但分區(qū)內(nèi)的元素是不能保證順序的舰蟆。

二.RangePartitioner分區(qū)執(zhí)行原理概述

1.計(jì)算總體的數(shù)據(jù)抽樣大小sampleSize,計(jì)算規(guī)則是:至少每個(gè)分區(qū)抽取20個(gè)數(shù)據(jù)或者最多1e6的樣本的數(shù)據(jù)量。

2.根據(jù)sampleSize和分區(qū)數(shù)量計(jì)算每個(gè)分區(qū)的數(shù)據(jù)抽樣樣本數(shù)量最大值sampleSizePrePartition问欠。

3.根據(jù)以上兩個(gè)值進(jìn)行水塘抽樣,返回RDD的總數(shù)據(jù)量粒蜈,分區(qū)中總元素的個(gè)數(shù)和每個(gè)分區(qū)的采樣數(shù)據(jù)顺献。

4.計(jì)算出數(shù)據(jù)量較大的分區(qū)通過(guò)RDD.sample進(jìn)行重新抽樣。

5.通過(guò)抽樣數(shù)組 candidates: ArrayBuffer[(K, wiegth)]計(jì)算出分區(qū)邊界的數(shù)組BoundsArray

6.在取數(shù)據(jù)時(shí)枯怖,如果分區(qū)數(shù)小于128則直接獲取注整,如果大于128則通過(guò)二分法,獲取當(dāng)前Key屬于那個(gè)區(qū)間度硝,返回對(duì)應(yīng)的BoundsArray下標(biāo)即為partitionsID肿轨。

源碼分析可參考以下幾篇博客

下面只對(duì)RanagePartitioner的核心機(jī)制進(jìn)行分析總結(jié)。

三.RangePartitioner的實(shí)現(xiàn)機(jī)制

1.在總數(shù)不知道的情況下如何等概率地從中抽取N行蕊程?

類比水塘抽樣法椒袍,該方法可以解決在總數(shù)不知道的情況下如何等概率地從中抽取一行數(shù)據(jù)
定義取出的行號(hào)為choice,第一次直接以第一行作為取出行 choice 藻茂,而后第二次以二分之一概率決定是否用第二行替換 choice 驹暑,第三次以三分之一的概率決定是否以第三行替換 choice ……,以此類推捌治。

由上面的分析我們可以得出結(jié)論岗钩,在取第n個(gè)數(shù)據(jù)的時(shí)候,我們生成一個(gè)0到1的隨機(jī)數(shù)p肖油,如果p小于1/n兼吓,保留第n個(gè)數(shù)。大于1/n森枪,繼續(xù)保留前面的數(shù)视搏。直到數(shù)據(jù)流結(jié)束,返回此數(shù)县袱,算法結(jié)束浑娜。

解決方案:在RangePartition中如何實(shí)現(xiàn)在總數(shù)不知道的情況下如何等概率地從中抽取N行數(shù)據(jù):

采樣算法是RangePartitioner分區(qū)的核心,其內(nèi)部使用的就是水塘抽樣式散,而這個(gè)抽樣特別適合那種總數(shù)很大而且未知筋遭,并無(wú)法將所有的數(shù)據(jù)全部存放到主內(nèi)存中的情況。也就是我們不需要事先知道RDD中元素的個(gè)數(shù)。

def reservoirSampleAndCount[T: ClassTag](
      input: Iterator[T],
      k: Int,
      seed: Long = Random.nextLong())
    : (Array[T], Long) = {
    val reservoir = new Array[T](k)
    // 把前K個(gè)元素放入到數(shù)組reservoir中漓滔,k為設(shè)置的每個(gè)分區(qū)的樣本數(shù)编饺,及sampleSizePerPartition
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }

    // 如果分區(qū)記錄數(shù)少于設(shè)置的分區(qū)樣本數(shù),則直接返回
    // 否則使用迭代器响驴,每次迭代出的數(shù)據(jù)透且,為其生成一個(gè)0至 l 的隨機(jī)數(shù),如果隨機(jī)數(shù)小于K豁鲤,則把reservoir數(shù)組中的對(duì)應(yīng)記錄替換
    if (i < k) {
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)
    } else {
      var l = i.toLong
      val rand = new XORShiftRandom(seed)
      while (input.hasNext) {
        val item = input.next()
       // l 的值不斷迭代的
        l += 1
        val replacementIndex = (rand.nextDouble() * l).toLong
         //如果隨機(jī)數(shù)小于K秽誊,則把reservoir數(shù)組中的對(duì)應(yīng)記錄替換
        if (replacementIndex < k) {
          reservoir(replacementIndex.toInt) = item
        }
      }
      (reservoir, l)
    }
  }

分析
RangePartition中水塘抽樣的理解,首先從池塘中先從前往后取出足夠的樣本數(shù)據(jù)琳骡,暫且稱這批數(shù)據(jù)為舊數(shù)據(jù)锅论,之后對(duì)未遍歷的數(shù)據(jù)進(jìn)行遍歷,此次i值為該分區(qū)元素的第 i 條記錄日熬,i = k棍厌。

之后 i 值不斷的迭代,然后取出(0竖席,i)的隨機(jī)數(shù),如果該隨機(jī)數(shù)小于k則敬肚,進(jìn)行替換毕荐,保留第 i 個(gè)數(shù),隨著不斷的迭代艳馒,i 值是一定是逐漸變大的憎亚,k 值是不變的,所以取出的新數(shù)據(jù)替換舊數(shù)據(jù)的幾率就越來(lái)越小弄慰,例如第美,一開(kāi)始,i = k陆爽,隨機(jī)值小于 k 的概率為 (k-1)/k什往,迭代 n 次之后,該幾率為 (k-1)/k+n慌闭。

用偽代碼表示如下所示

從S中抽取首k項(xiàng)放入「水塘」中
對(duì)于每一個(gè)S[j]項(xiàng)(j ≥ k):
   隨機(jī)產(chǎn)生一個(gè)范圍0到j(luò)的整數(shù)r
   若 r < k 則把水塘中的第r項(xiàng)換成S[j]項(xiàng)
2.計(jì)算樣本權(quán)重别威,對(duì)數(shù)據(jù)多的分區(qū)再進(jìn)行抽樣

父RDD各分區(qū)中的數(shù)據(jù)量可能不均勻,在極端情況下驴剔,有些分區(qū)內(nèi)的數(shù)據(jù)量會(huì)占有整個(gè)RDD的絕大多數(shù)的數(shù)據(jù)省古,如果按照水塘抽樣進(jìn)行采樣,會(huì)導(dǎo)致該分區(qū)所采樣的數(shù)據(jù)量不足丧失,因此需要對(duì)取樣數(shù)不足的分區(qū)還需要重新進(jìn)行采樣豺妓。

通過(guò)(采樣因子*分區(qū)記錄數(shù))得到每個(gè)分區(qū)應(yīng)采樣本數(shù)。

如果fraction * 分區(qū)內(nèi)記錄數(shù) > sampleSizePerPartition,則該分區(qū)會(huì)再進(jìn)行一次抽樣琳拭,否則計(jì)算權(quán)重weight為 1 /(總樣本數(shù)/總記錄總數(shù))载佳,因?yàn)閟ample中的比例就是(總樣本數(shù)/總記錄總數(shù))。

如果fraction * 分區(qū)內(nèi)記錄數(shù) < sampleSizePerPartition臀栈,權(quán)重weight 為(分區(qū)總數(shù) / 采樣總數(shù))蔫慧,為該分區(qū)的取出的樣本的真實(shí)權(quán)重,可能會(huì)比平均權(quán)重大权薯,因?yàn)橛锌赡茉谏厦娴膔eservoirSampleAndCount水塘抽樣中采樣總數(shù)已經(jīng)達(dá)到了該分區(qū)記錄數(shù)的最大值姑躲。

// 計(jì)算總樣本數(shù)量和總記錄數(shù)的占比,占比最大為1.0
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        // 保存樣本數(shù)據(jù)的集合buffer
        val candidates = ArrayBuffer.empty[(K, Float)]
        // 保存數(shù)據(jù)分布不均衡的分區(qū)id(數(shù)據(jù)量超過(guò)fraction比率的分區(qū))
        val imbalancedPartitions = mutable.Set.empty[Int]
        // 計(jì)算抽取出來(lái)的樣本數(shù)據(jù)
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // 如果fraction乘以當(dāng)前分區(qū)中的數(shù)據(jù)量大于之前計(jì)算的每個(gè)分區(qū)的抽象數(shù)據(jù)大小盟蚣,
            // 那么表示當(dāng)前分區(qū)抽取的數(shù)據(jù)太少了黍析,該分區(qū)數(shù)據(jù)分布不均衡,需要重新抽取
            imbalancedPartitions += idx
          } else {
            // 當(dāng)前分區(qū)不屬于數(shù)據(jù)分布不均衡的分區(qū)屎开,計(jì)算占比權(quán)重阐枣,并添加到candidates集合中
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }

        // 對(duì)于數(shù)據(jù)分布不均衡的RDD分區(qū),重新進(jìn)行數(shù)據(jù)抽樣
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // 獲取數(shù)據(jù)分布不均衡的RDD分區(qū)奄抽,并構(gòu)成RDD
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          // 隨機(jī)種子
          val seed = byteswap32(-rdd.id - 1)
          // 利用rdd的sample抽樣函數(shù)API進(jìn)行數(shù)據(jù)抽樣
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }

        // 將最終的抽樣數(shù)據(jù)計(jì)算出rangeBounds出來(lái)
        RangePartitioner.determineBounds(candidates, partitions)
3.根據(jù)樣本權(quán)重解決分區(qū)邊界問(wèn)題

先將candidate(Array[(key, weight)])按照key排序蔼两,計(jì)算總權(quán)重sumWeights,除以分區(qū)數(shù)逞度,得到每個(gè)分區(qū)的平均權(quán)重step额划,接下來(lái)while循環(huán)遍歷已排序的candidate,累加其權(quán)重cumWeight档泽,每當(dāng)累加權(quán)重達(dá)到一個(gè)分區(qū)的平均權(quán)重step俊戳,就獲取一個(gè)key作為分區(qū)間隔符,最后返回所有獲取到的分隔符馆匿,determineBounds執(zhí)行完畢抑胎,也就返回了變量rangeBounds作為每個(gè)分區(qū)邊界的key的集合。

def determineBounds[K: Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    // 按照數(shù)據(jù)進(jìn)行數(shù)據(jù)排序渐北,默認(rèn)升序排列
    val ordered = candidates.sortBy(_._1)
    // 獲取總的樣本數(shù)量大小
    val numCandidates = ordered.size
    // 計(jì)算總的權(quán)重大小
    val sumWeights = ordered.map(_._2.toDouble).sum
    // 計(jì)算步長(zhǎng)
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      // 獲取排序后的第i個(gè)數(shù)據(jù)及權(quán)重
      val (key, weight) = ordered(i)
      // 累計(jì)權(quán)重
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        // 權(quán)重已經(jīng)達(dá)到一個(gè)步長(zhǎng)的范圍阿逃,計(jì)算出一個(gè)分區(qū)id的值
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          // 上一個(gè)邊界值為空,或者當(dāng)前邊界key數(shù)據(jù)大于上一個(gè)邊界的值腔稀,那么當(dāng)前key有效盆昙,進(jìn)行計(jì)算
          // 添加當(dāng)前key到邊界集合中
          bounds += key
          // 累計(jì)target步長(zhǎng)界限
          target += step
          // 分區(qū)數(shù)量加1
          j += 1
          // 上一個(gè)邊界的值重置為當(dāng)前邊界的值
          previousBound = Some(key)
        }
      }
      i += 1
    }
    // 返回結(jié)果
    bounds.toArray
  }
4.由rangeBounds計(jì)算分區(qū)數(shù)和key屬于哪個(gè)分區(qū)

rangeBounds少于128,直接遍歷比較key和分隔符焊虏,得到PartitionId淡喜,否則使用二分查找,并做了邊界條件的判斷诵闭,最后炼团,根據(jù)升序還是降序返回PartitionId澎嚣。

5.父RDD中每個(gè)分區(qū)采樣樣本數(shù)的確定
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt

父RDD每個(gè)分區(qū)需要采樣的數(shù)據(jù)量是正常數(shù)的3倍。

因?yàn)楦窻DD各分區(qū)中的數(shù)據(jù)量可能會(huì)出現(xiàn)傾斜的情況瘟芝,乘于3的目的就是保證數(shù)據(jù)量小的分區(qū)能夠采樣到足夠的數(shù)據(jù)易桃,而對(duì)于數(shù)據(jù)量大的分區(qū)會(huì)進(jìn)行第二次采樣。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末锌俱,一起剝皮案震驚了整個(gè)濱河市晤郑,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌贸宏,老刑警劉巖造寝,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異吭练,居然都是意外死亡诫龙,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)鲫咽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)签赃,“玉大人,你說(shuō)我怎么就攤上這事分尸〗趿模” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵寓落,是天一觀的道長(zhǎng)括丁。 經(jīng)常有香客問(wèn)我,道長(zhǎng)伶选,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任尖昏,我火速辦了婚禮仰税,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘抽诉。我一直安慰自己陨簇,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布迹淌。 她就那樣靜靜地躺著河绽,像睡著了一般。 火紅的嫁衣襯著肌膚如雪唉窃。 梳的紋絲不亂的頭發(fā)上耙饰,一...
    開(kāi)封第一講書(shū)人閱讀 51,208評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音纹份,去河邊找鬼苟跪。 笑死廷痘,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的件已。 我是一名探鬼主播笋额,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼篷扩!你這毒婦竟也來(lái)了兄猩?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤鉴未,失蹤者是張志新(化名)和其女友劉穎枢冤,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體歼狼,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡掏导,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了羽峰。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片趟咆。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖梅屉,靈堂內(nèi)的尸體忽然破棺而出值纱,到底是詐尸還是另有隱情,我是刑警寧澤坯汤,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布虐唠,位于F島的核電站,受9級(jí)特大地震影響惰聂,放射性物質(zhì)發(fā)生泄漏疆偿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一搓幌、第九天 我趴在偏房一處隱蔽的房頂上張望杆故。 院中可真熱鬧,春花似錦溉愁、人聲如沸处铛。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)撤蟆。三九已至,卻和暖如春堂污,著一層夾襖步出監(jiān)牢的瞬間家肯,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工敷鸦, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留息楔,地道東北人寝贡。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像值依,于是被迫代替她去往敵國(guó)和親圃泡。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容