Abstract
昨天丟人現(xiàn)眼的寫QuickSort用了40分鐘, 當(dāng)時(shí)感覺整個(gè)人都不好了.
(╯°□°)╯︵┻━┻ 看孩子一天睡4小時(shí)大腦不轉(zhuǎn)哇 d(?`ω′?d*)
External Sort的標(biāo)準(zhǔn)做法是一個(gè)QuickSort后邊跟一個(gè)n-way MergeSort, 理論上的復(fù)雜度也是nlogn.
但是由于存在文件IO, 所以實(shí)際速度要慢于帶內(nèi)排序很多.
在分布式環(huán)境里, 這個(gè)問題進(jìn)一步復(fù)雜化, 每臺(tái)機(jī)器持有的是數(shù)據(jù)的一部分, 如果需要執(zhí)行經(jīng)典的外排序, 則需要不斷的把所有節(jié)點(diǎn)的數(shù)據(jù)向一個(gè)中心節(jié)點(diǎn)進(jìn)行shuffle. 磁盤IO進(jìn)一步衰退為網(wǎng)絡(luò)IO.
更進(jìn)一步分析這個(gè)問題, 可以在一開始處理數(shù)據(jù)的時(shí)候, 把數(shù)據(jù)分為多份. RANGE(0, 1e3)的第一臺(tái)機(jī)器, RANGE(1e3, 2e3)第二臺(tái).....
通過(guò)HASH的方法, 讓每臺(tái)機(jī)器天然有序, 繼而每臺(tái)機(jī)器內(nèi)部跑外排.
這樣就需要對(duì)數(shù)據(jù)的分布有一定的了解, 通過(guò)抽樣來(lái)理解數(shù)據(jù)的整體排布方式, 然后決定每臺(tái)機(jī)器處理的數(shù)據(jù)范圍是一個(gè)大的思路.
在下面這個(gè)網(wǎng)站, 可以找到排序算法的效率排行榜. 你會(huì)發(fā)現(xiàn)BAT三家都在打這個(gè)榜...
TeraSort 原理
TeraSort的核心在于第一步的map()
, 這一步 任何一臺(tái)機(jī)器上的Partition i
里的所有對(duì)象一定小于 任何一臺(tái)機(jī)器上的Partition i+1
,也就是保證了Parition
之間的有序性. 繼而在reduce
階段, 可以保證shuffle
后每個(gè)任務(wù)收集到的數(shù)據(jù)的有序性.
這里可以非常直觀的看到兩個(gè)難點(diǎn)
- 如何確定每個(gè)
Partition
的范圍, 它負(fù)責(zé)的Range(X, Y)里的X和Y是多少 - 如何快速的把一個(gè)值映射到它對(duì)應(yīng)的
Partition
里, 這里需要考慮待排序的是任何實(shí)現(xiàn)了Comparable
接口的對(duì)象. 不一定是個(gè)數(shù).
抽象的解決思路是
- 對(duì)數(shù)據(jù)進(jìn)行抽樣, 根據(jù)抽樣結(jié)果來(lái)構(gòu)筑每個(gè)
Partition
應(yīng)該承載什么范圍內(nèi)的數(shù)據(jù) - 通過(guò)Trie Tree來(lái)構(gòu)筑索引, 當(dāng)一個(gè)String或者Long或者任何能夠被轉(zhuǎn)義成Char Sequence的對(duì)象進(jìn)來(lái)后, 利用Trie來(lái)找到它對(duì)應(yīng)的那個(gè)Partition.實(shí)現(xiàn)中, 對(duì)字典樹有微弱高的改造, 類似下圖中
daz
會(huì)被分到Parition3
, 在最后一層中z > b
image.png
Spark源碼
執(zhí)行結(jié)構(gòu)
2.RDD, 執(zhí)行入口
/spark/core/src/main/scala/org/apache/spark/rdd/RDD.scala
/**
* 按照輸入的key function, 對(duì)這個(gè)RDD進(jìn)行排序
*/
def sortBy[K](
// f 執(zhí)行在key上的 funtion, 返回K型對(duì)象, 這里K需要時(shí)可以compare的
f: (T) => K,
// 默認(rèn)是正序
ascending: Boolean = true,
// 維持當(dāng)前的partition數(shù)量, 這個(gè)對(duì)抽樣后到底怎么分區(qū)有影響
numPartitions: Int = this.partitions.length)
// 可以看到這里對(duì)K的類型進(jìn)行了隱式轉(zhuǎn)換
// 保證它是scala.math.Ordering接口兼容的, 以便能夠排序
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
// 對(duì)所有的Key進(jìn)行處理后, 就可以運(yùn)行排序了, 排序方法在下面
.sortByKey(ascending, numPartitions)
.values
}
3. OrderedRDDFunctions 調(diào)用方法
/spark/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
/**
* 輸入的KEY是能夠支持Scala Math排序的
* 對(duì)于沒有實(shí)現(xiàn)對(duì)應(yīng)接口的.用戶可以自己實(shí)現(xiàn), 或者對(duì)已有的對(duì)象覆蓋自己的邏輯
*/
class OrderedRDDFunctions[K : Ordering : ClassTag,
V: ClassTag,
P <: Product2[K, V] : ClassTag] @DeveloperApi() (
self: RDD[P])
extends Logging with Serializable {
private val ordering = implicitly[Ordering[K]]
/**
* 實(shí)現(xiàn)了對(duì)每個(gè)partition執(zhí)行sort, 由于partition相互之間是有序的
* 調(diào)用`collect`或者`save`可以獲得全局有序的對(duì)象.
*/
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
// 初始化一個(gè)RangePartitioner對(duì)象, 這個(gè)對(duì)象負(fù)責(zé)管理告訴RDD應(yīng)該如何分配數(shù)據(jù)
// 以及每個(gè)Range應(yīng)該是多少
val part = new RangePartitioner(numPartitions, self, ascending)
// 對(duì)數(shù)據(jù)進(jìn)行分片, 然后每片內(nèi)部再進(jìn)行排序
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
/**
* 使用傳入的Partition分區(qū)方法來(lái)切割數(shù)據(jù), 然后每個(gè)Partition內(nèi)部再排序
* 這個(gè)方法在特定條件下可以用customer的方法來(lái)提升TeraSort的性能
* 相關(guān)論文很多, 核心思想主要是提升locality, 或者針對(duì)已經(jīng)部分有效的數(shù)據(jù),直接增加分配的有效性.
*/
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = ...
}
/**
* 由于RangePartition中有每個(gè)partition的最大值和最小值, 也就是Range的信息
* 所以給定一個(gè)lower和一個(gè)uppder值, 可以快速的用getPartition方法定位到最小和最大的PartitionIndex是多少. 從而實(shí)現(xiàn)快速的過(guò)濾.
*/
def filterByRange(lower: K, upper: K): RDD[P] = ...
}
4. Partitioner 實(shí)現(xiàn)有序分片
Partitioner
在實(shí)現(xiàn)中有 HashPartitioner
RangePartitioner
, 后者直接對(duì)應(yīng)需要內(nèi)部有序的各種情況.
/spark/core/src/main/scala/org/apache/spark/Partitioner.scala
/**
* 通過(guò)抽樣, 把對(duì)象映射到RANGE范圍大致相同的分片里.
* 分片多少和輸入的分片數(shù), 以及采樣數(shù)有關(guān)
*/
class RangePartitioner[K : Ordering : ClassTag, V](
// 期望的分片
partitions: Int,
// 這里對(duì)RDD進(jìn)行了約束
rdd: RDD[_ <: Product2[K, V]],
// 默認(rèn)正序
private var ascending: Boolean = true,
// 默認(rèn)采樣20
val samplePointsPerPartitionHint: Int = 20)
extends Partitioner {
// 構(gòu)造函數(shù)
def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = ...
// 條件檢查
require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
require(samplePointsPerPartitionHint > 0,
s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")
private var ordering = implicitly[Ordering[K]]
// 計(jì)算每個(gè)Partition應(yīng)該存儲(chǔ)的Range
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// 確定最大取樣數(shù), 封頂1M
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
// 假設(shè)Dependecy RDD中各個(gè)partition里的items數(shù)量是大致相同的, 采用常規(guī)的采樣
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// 如果分片數(shù)據(jù)傾斜的太嚴(yán)重, 就需要對(duì)這個(gè)分片做重新采樣
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
}
}
}
def numPartitions: Int = rangeBounds.length + 1
// 利用二分查找用來(lái)快速的定位分片
private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
def getPartition(key: Any): Int =
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// 直接順序查找
} else {
// 利用二分查找尋找partition, 在實(shí)現(xiàn)中需要考慮幾個(gè)細(xì)節(jié): 小于第一個(gè)分片的range, 大于最后一個(gè)分片的range, 以及倒序排列.
}
}