一.分區(qū)器的區(qū)別
- HashPartitioner分區(qū)可能HashPartitioner導(dǎo)致每個(gè)分區(qū)中數(shù)據(jù)量的不均勻挠唆。
- RangePartitioner分區(qū)盡量保證每個(gè)分區(qū)中數(shù)據(jù)量的均勻审轮,將一定范圍內(nèi)的數(shù)映射到某一個(gè)分區(qū)內(nèi)滔韵。分區(qū)與分區(qū)之間數(shù)據(jù)是有序的,但分區(qū)內(nèi)的元素是不能保證順序的舰蟆。
二.RangePartitioner分區(qū)執(zhí)行原理概述
1.計(jì)算總體的數(shù)據(jù)抽樣大小sampleSize,計(jì)算規(guī)則是:至少每個(gè)分區(qū)抽取20個(gè)數(shù)據(jù)或者最多1e6的樣本的數(shù)據(jù)量。
2.根據(jù)sampleSize和分區(qū)數(shù)量計(jì)算每個(gè)分區(qū)的數(shù)據(jù)抽樣樣本數(shù)量最大值sampleSizePrePartition问欠。
3.根據(jù)以上兩個(gè)值進(jìn)行水塘抽樣,返回RDD的總數(shù)據(jù)量粒蜈,分區(qū)中總元素的個(gè)數(shù)和每個(gè)分區(qū)的采樣數(shù)據(jù)顺献。
4.計(jì)算出數(shù)據(jù)量較大的分區(qū)通過(guò)RDD.sample進(jìn)行重新抽樣。
5.通過(guò)抽樣數(shù)組 candidates: ArrayBuffer[(K, wiegth)]計(jì)算出分區(qū)邊界的數(shù)組BoundsArray
6.在取數(shù)據(jù)時(shí)枯怖,如果分區(qū)數(shù)小于128則直接獲取注整,如果大于128則通過(guò)二分法,獲取當(dāng)前Key屬于那個(gè)區(qū)間度硝,返回對(duì)應(yīng)的BoundsArray下標(biāo)即為partitionsID肿轨。
源碼分析可參考以下幾篇博客
下面只對(duì)RanagePartitioner的核心機(jī)制進(jìn)行分析總結(jié)。
三.RangePartitioner的實(shí)現(xiàn)機(jī)制
1.在總數(shù)不知道的情況下如何等概率地從中抽取N行蕊程?
類比水塘抽樣法椒袍,該方法可以解決在總數(shù)不知道的情況下如何等概率地從中抽取一行數(shù)據(jù)
定義取出的行號(hào)為choice,第一次直接以第一行作為取出行 choice 藻茂,而后第二次以二分之一概率決定是否用第二行替換 choice 驹暑,第三次以三分之一的概率決定是否以第三行替換 choice ……,以此類推捌治。
由上面的分析我們可以得出結(jié)論岗钩,在取第n個(gè)數(shù)據(jù)的時(shí)候,我們生成一個(gè)0到1的隨機(jī)數(shù)p肖油,如果p小于1/n兼吓,保留第n個(gè)數(shù)。大于1/n森枪,繼續(xù)保留前面的數(shù)视搏。直到數(shù)據(jù)流結(jié)束,返回此數(shù)县袱,算法結(jié)束浑娜。
解決方案:在RangePartition中如何實(shí)現(xiàn)在總數(shù)不知道的情況下如何等概率地從中抽取N行數(shù)據(jù):
采樣算法是RangePartitioner分區(qū)的核心,其內(nèi)部使用的就是水塘抽樣式散,而這個(gè)抽樣特別適合那種總數(shù)很大而且未知筋遭,并無(wú)法將所有的數(shù)據(jù)全部存放到主內(nèi)存中的情況。也就是我們不需要事先知道RDD中元素的個(gè)數(shù)。
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Long) = {
val reservoir = new Array[T](k)
// 把前K個(gè)元素放入到數(shù)組reservoir中漓滔,k為設(shè)置的每個(gè)分區(qū)的樣本數(shù)编饺,及sampleSizePerPartition
var i = 0
while (i < k && input.hasNext) {
val item = input.next()
reservoir(i) = item
i += 1
}
// 如果分區(qū)記錄數(shù)少于設(shè)置的分區(qū)樣本數(shù),則直接返回
// 否則使用迭代器响驴,每次迭代出的數(shù)據(jù)透且,為其生成一個(gè)0至 l 的隨機(jī)數(shù),如果隨機(jī)數(shù)小于K豁鲤,則把reservoir數(shù)組中的對(duì)應(yīng)記錄替換
if (i < k) {
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
} else {
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
// l 的值不斷迭代的
l += 1
val replacementIndex = (rand.nextDouble() * l).toLong
//如果隨機(jī)數(shù)小于K秽誊,則把reservoir數(shù)組中的對(duì)應(yīng)記錄替換
if (replacementIndex < k) {
reservoir(replacementIndex.toInt) = item
}
}
(reservoir, l)
}
}
分析
RangePartition中水塘抽樣的理解,首先從池塘中先從前往后取出足夠的樣本數(shù)據(jù)琳骡,暫且稱這批數(shù)據(jù)為舊數(shù)據(jù)锅论,之后對(duì)未遍歷的數(shù)據(jù)進(jìn)行遍歷,此次i值為該分區(qū)元素的第 i 條記錄日熬,i = k棍厌。
之后 i 值不斷的迭代,然后取出(0竖席,i)的隨機(jī)數(shù),如果該隨機(jī)數(shù)小于k則敬肚,進(jìn)行替換毕荐,保留第 i 個(gè)數(shù),隨著不斷的迭代艳馒,i 值是一定是逐漸變大的憎亚,k 值是不變的,所以取出的新數(shù)據(jù)替換舊數(shù)據(jù)的幾率就越來(lái)越小弄慰,例如第美,一開(kāi)始,i = k陆爽,隨機(jī)值小于 k 的概率為 (k-1)/k什往,迭代 n 次之后,該幾率為 (k-1)/k+n慌闭。
用偽代碼表示如下所示
從S中抽取首k項(xiàng)放入「水塘」中
對(duì)于每一個(gè)S[j]項(xiàng)(j ≥ k):
隨機(jī)產(chǎn)生一個(gè)范圍0到j(luò)的整數(shù)r
若 r < k 則把水塘中的第r項(xiàng)換成S[j]項(xiàng)
2.計(jì)算樣本權(quán)重别威,對(duì)數(shù)據(jù)多的分區(qū)再進(jìn)行抽樣
父RDD各分區(qū)中的數(shù)據(jù)量可能不均勻,在極端情況下驴剔,有些分區(qū)內(nèi)的數(shù)據(jù)量會(huì)占有整個(gè)RDD的絕大多數(shù)的數(shù)據(jù)省古,如果按照水塘抽樣進(jìn)行采樣,會(huì)導(dǎo)致該分區(qū)所采樣的數(shù)據(jù)量不足丧失,因此需要對(duì)取樣數(shù)不足的分區(qū)還需要重新進(jìn)行采樣豺妓。
通過(guò)(采樣因子*分區(qū)記錄數(shù))得到每個(gè)分區(qū)應(yīng)采樣本數(shù)。
如果fraction * 分區(qū)內(nèi)記錄數(shù) > sampleSizePerPartition,則該分區(qū)會(huì)再進(jìn)行一次抽樣琳拭,否則計(jì)算權(quán)重weight為 1 /(總樣本數(shù)/總記錄總數(shù))载佳,因?yàn)閟ample中的比例就是(總樣本數(shù)/總記錄總數(shù))。
如果fraction * 分區(qū)內(nèi)記錄數(shù) < sampleSizePerPartition臀栈,權(quán)重weight 為(分區(qū)總數(shù) / 采樣總數(shù))蔫慧,為該分區(qū)的取出的樣本的真實(shí)權(quán)重,可能會(huì)比平均權(quán)重大权薯,因?yàn)橛锌赡茉谏厦娴膔eservoirSampleAndCount水塘抽樣中采樣總數(shù)已經(jīng)達(dá)到了該分區(qū)記錄數(shù)的最大值姑躲。
// 計(jì)算總樣本數(shù)量和總記錄數(shù)的占比,占比最大為1.0
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
// 保存樣本數(shù)據(jù)的集合buffer
val candidates = ArrayBuffer.empty[(K, Float)]
// 保存數(shù)據(jù)分布不均衡的分區(qū)id(數(shù)據(jù)量超過(guò)fraction比率的分區(qū))
val imbalancedPartitions = mutable.Set.empty[Int]
// 計(jì)算抽取出來(lái)的樣本數(shù)據(jù)
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
// 如果fraction乘以當(dāng)前分區(qū)中的數(shù)據(jù)量大于之前計(jì)算的每個(gè)分區(qū)的抽象數(shù)據(jù)大小盟蚣,
// 那么表示當(dāng)前分區(qū)抽取的數(shù)據(jù)太少了黍析,該分區(qū)數(shù)據(jù)分布不均衡,需要重新抽取
imbalancedPartitions += idx
} else {
// 當(dāng)前分區(qū)不屬于數(shù)據(jù)分布不均衡的分區(qū)屎开,計(jì)算占比權(quán)重阐枣,并添加到candidates集合中
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.size).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
// 對(duì)于數(shù)據(jù)分布不均衡的RDD分區(qū),重新進(jìn)行數(shù)據(jù)抽樣
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
// 獲取數(shù)據(jù)分布不均衡的RDD分區(qū)奄抽,并構(gòu)成RDD
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
// 隨機(jī)種子
val seed = byteswap32(-rdd.id - 1)
// 利用rdd的sample抽樣函數(shù)API進(jìn)行數(shù)據(jù)抽樣
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
// 將最終的抽樣數(shù)據(jù)計(jì)算出rangeBounds出來(lái)
RangePartitioner.determineBounds(candidates, partitions)
3.根據(jù)樣本權(quán)重解決分區(qū)邊界問(wèn)題
先將candidate(Array[(key, weight)])按照key排序蔼两,計(jì)算總權(quán)重sumWeights,除以分區(qū)數(shù)逞度,得到每個(gè)分區(qū)的平均權(quán)重step额划,接下來(lái)while循環(huán)遍歷已排序的candidate,累加其權(quán)重cumWeight档泽,每當(dāng)累加權(quán)重達(dá)到一個(gè)分區(qū)的平均權(quán)重step俊戳,就獲取一個(gè)key作為分區(qū)間隔符,最后返回所有獲取到的分隔符馆匿,determineBounds執(zhí)行完畢抑胎,也就返回了變量rangeBounds作為每個(gè)分區(qū)邊界的key的集合。
def determineBounds[K: Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
// 按照數(shù)據(jù)進(jìn)行數(shù)據(jù)排序渐北,默認(rèn)升序排列
val ordered = candidates.sortBy(_._1)
// 獲取總的樣本數(shù)量大小
val numCandidates = ordered.size
// 計(jì)算總的權(quán)重大小
val sumWeights = ordered.map(_._2.toDouble).sum
// 計(jì)算步長(zhǎng)
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
// 獲取排序后的第i個(gè)數(shù)據(jù)及權(quán)重
val (key, weight) = ordered(i)
// 累計(jì)權(quán)重
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
// 權(quán)重已經(jīng)達(dá)到一個(gè)步長(zhǎng)的范圍阿逃,計(jì)算出一個(gè)分區(qū)id的值
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
// 上一個(gè)邊界值為空,或者當(dāng)前邊界key數(shù)據(jù)大于上一個(gè)邊界的值腔稀,那么當(dāng)前key有效盆昙,進(jìn)行計(jì)算
// 添加當(dāng)前key到邊界集合中
bounds += key
// 累計(jì)target步長(zhǎng)界限
target += step
// 分區(qū)數(shù)量加1
j += 1
// 上一個(gè)邊界的值重置為當(dāng)前邊界的值
previousBound = Some(key)
}
}
i += 1
}
// 返回結(jié)果
bounds.toArray
}
4.由rangeBounds計(jì)算分區(qū)數(shù)和key屬于哪個(gè)分區(qū)
rangeBounds少于128,直接遍歷比較key和分隔符焊虏,得到PartitionId淡喜,否則使用二分查找,并做了邊界條件的判斷诵闭,最后炼团,根據(jù)升序還是降序返回PartitionId澎嚣。
5.父RDD中每個(gè)分區(qū)采樣樣本數(shù)的確定
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
父RDD每個(gè)分區(qū)需要采樣的數(shù)據(jù)量是正常數(shù)的3倍。
因?yàn)楦窻DD各分區(qū)中的數(shù)據(jù)量可能會(huì)出現(xiàn)傾斜的情況瘟芝,乘于3的目的就是保證數(shù)據(jù)量小的分區(qū)能夠采樣到足夠的數(shù)據(jù)易桃,而對(duì)于數(shù)據(jù)量大的分區(qū)會(huì)進(jìn)行第二次采樣。