spark中stage的劃分依據(jù)action算子進行左痢,每一次action(reduceByKey等)算子都會觸發(fā)一次shuffle過程仰楚,該過程涉及到數(shù)據(jù)的重新分區(qū)。spark中的分區(qū)器包括HashPartitioner及RangePartitioner兩種敦姻。HashPartitioner根據(jù)key進行分區(qū)宛徊,當某一個key對應的數(shù)據(jù)較多時會出現(xiàn)數(shù)據(jù)傾斜的情況,又因為每一個partition對應一個task夜只,數(shù)據(jù)較多的task會耗費較多的時間垒在,影響spark任務運行的時間。此時扔亥,可以使用RangePartitioner分區(qū)器场躯,RangePartitioner基于水塘抽樣算法,可以在不知道整體數(shù)據(jù)量的情況下旅挤,等概率地取到每條數(shù)據(jù)踢关。
一、HashPartitioner
/**
* A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
* Java's `Object.hashCode`.
*
* Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
* so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
* produce an unexpected or incorrect result.
*/
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
/* Calculates 'x' modulo 'mod', takes to consideration sign of x,
* i.e. if 'x' is negative, than 'x' % 'mod' is negative too
* so function return (x % mod) + mod in that case.
*/
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
HashPartitioner主要根據(jù)RDD的key進行分區(qū)粘茄,當key為null時签舞,對應的partitionId為0,當key不為null時柒瓣,partitionId計算過程為:先將key的hashcode值對分區(qū)個數(shù)numPartitions取余儒搭,當余數(shù)小于0時,將余數(shù)與numPartitions相加嘹朗,否則與0相加师妙。很明顯,相同key的數(shù)據(jù)一定會分到同一個分區(qū)中屹培,可能導致數(shù)據(jù)傾斜,進而影響spark運行速度怔檩。
二褪秀、RangePartitioner
HashPartitioner分區(qū)可能導致每個分區(qū)中數(shù)據(jù)量的不均勻。而RangePartitioner分區(qū)則盡量保證每個分區(qū)中數(shù)據(jù)量的均勻薛训,將一定范圍內的數(shù)映射到某一個分區(qū)內媒吗。分區(qū)與分區(qū)之間數(shù)據(jù)是有序的,但分區(qū)內的元素是不能保證順序的乙埃。
1闸英、水塘抽樣算法原理
對于一長度為n(大到無法加載到內存中)的數(shù)組N,如何等概率地從中取出k個元素介袜,組成數(shù)組R甫何?
水塘抽樣算法做法如下:首先,去數(shù)組N前k個元素放入數(shù)組R中遇伞;然后遍歷數(shù)組N中剩余元素辙喂,對于數(shù)組N中第i個元素N[i-1](i大于k),隨機生成一個數(shù)rand,若rand<k巍耗,則將N[i-1]替換掉數(shù)組R中第rand個元素秋麸,否則,保持原樣炬太【捏。可以得知,取得數(shù)組N中每一元素的概率均為k/n亲族。對于數(shù)組N中前k個元素而言炒考,由于第一次就將其取出,因此在后續(xù)迭代過程中只需保持原樣即可孽水,概率為票腰,對于數(shù)組N中剩余的n-k個元素,只需在遍歷到其所在的位置時替換掉現(xiàn)有元素中的一個并在后續(xù)步驟中保持原樣女气,概率為
杏慰。
2、RangePartitioner
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
// Cast to double to avoid overflowing ints or longs
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
}
}
}
/**
* Sketches the input RDD via reservoir sampling on each partition.
*
* @param rdd the input RDD to sketch
* @param sampleSizePerPartition max sample size per partition
* @return (total number of items, an array of (partitionId, number of items, sample))
*/
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}
private[spark] object SamplingUtils {
/**
* Reservoir sampling implementation that also returns the input size.
*
* @param input input size
* @param k reservoir size
* @param seed random seed
* @return (samples, input size)
*/
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Long) = {
val reservoir = new Array[T](k)
// Put the first k elements in the reservoir.
var i = 0
while (i < k && input.hasNext) {
val item = input.next()
reservoir(i) = item
i += 1
}
// If we have consumed all the elements, return them. Otherwise do the replacement.
if (i < k) {
// If input size < k, trim the array to return only an array of input size.
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
} else {
// If input size > k, continue the sampling process.
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
l += 1
// There are k elements in the reservoir, and the l-th element has been
// consumed. It should be chosen with probability k/l. The expression
// below is a random long chosen uniformly from [0,l)
val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex < k) {
reservoir(replacementIndex.toInt) = item
}
}
(reservoir, l)
}
}
RangePartitioner分區(qū)執(zhí)行原理:
1炼鞠、計算總體的數(shù)據(jù)抽樣大小sampleSize缘滥,計算規(guī)則是:至少每個分區(qū)抽取20個數(shù)據(jù)或者最多1M的數(shù)據(jù)量。
2谒主、根據(jù)sampleSize和分區(qū)數(shù)量計算每個分區(qū)的數(shù)據(jù)抽樣樣本數(shù)量最大值sampleSizePerPartition
3朝扼、根據(jù)以上兩個值進行水塘抽樣,返回RDD的總數(shù)據(jù)量霎肯,分區(qū)ID和每個分區(qū)的采樣數(shù)據(jù)擎颖。
4、計算出數(shù)據(jù)量較大的分區(qū)通過RDD.sample進行重新抽樣观游。
5查描、通過抽樣數(shù)組 candidates: ArrayBuffer[(K, wiegth)]計算出分區(qū)邊界的數(shù)組BoundsArray
6昼窗、在取數(shù)據(jù)時松靡,如果分區(qū)數(shù)小于128則直接獲取键俱,如果大于128則通過二分法,獲取當前Key屬于那個區(qū)間搪柑,返回對應的BoundsArray下標即為partitionsID