本文將通過描述 Spark RDD 的五大核心要素來描述 RDD魂奥,若希望更全面了解 RDD 的知識焕刮,請移步 RDD 論文:RDD:基于內(nèi)存的集群計算容錯抽象
Spark 的五大核心要素包括:
- partition
- partitioner
- compute func
- dependency
- preferredLocation
下面一一來介紹
(一): partition
partition 個數(shù)怎么定
RDD 由若干個 partition 組成关炼,共有三種生成方式:
- 從 Scala 集合中創(chuàng)建围来,通過調(diào)用
SparkContext#makeRDD
或SparkContext#parallelize
- 加載外部數(shù)據(jù)來創(chuàng)建 RDD痛垛,例如從 HDFS 文件、mysql 數(shù)據(jù)庫讀取數(shù)據(jù)等
- 由其他 RDD 執(zhí)行 transform 操作轉(zhuǎn)換而來
那么粹淋,在使用上述方法生成 RDD 的時候吸祟,會為 RDD 生成多少個 partition 呢?一般來說桃移,加載 Scala 集合或外部數(shù)據(jù)來創(chuàng)建 RDD 時屋匕,是可以指定 partition 個數(shù)的,若指定了具體值借杰,那么 partition 的個數(shù)就等于該值过吻,比如:
val rdd1 = sc.makeRDD( scalaSeqData, 3 ) //< 指定 partition 數(shù)為3
val rdd2 = sc.textFile( hdfsFilePath, 10 ) //< 指定 partition 數(shù)為10
若沒有指定具體的 partition 數(shù)時的 partition 數(shù)為多少呢?
- 對于從 Scala 集合中轉(zhuǎn)換而來的 RDD:默認的 partition 數(shù)為 defaultParallelism蔗衡,該值在不同的部署模式下不同:
- Local 模式:本機 cpu cores 的數(shù)量
- Mesos 模式:8
- Yarn:max(2, 所有 executors 的 cpu cores 個數(shù)總和)
- 對于從外部數(shù)據(jù)加載而來的 RDD:默認的 partition 數(shù)為
min(defaultParallelism, 2)
- 對于執(zhí)行轉(zhuǎn)換操作而得到的 RDD:視具體操作而定纤虽,如 map 得到的 RDD 的 partition 數(shù)與 父 RDD 相同乳绕;union 得到的 RDD 的 partition 數(shù)為父 RDDs 的 partition 數(shù)之和...
partition 的定義
我們常說,partition 是 RDD 的數(shù)據(jù)單位廓推,代表了一個分區(qū)的數(shù)據(jù)刷袍。但這里千萬不要搞錯了,partition 是邏輯概念樊展,是代表了一個分片的數(shù)據(jù),而不是包含或持有一個分片的數(shù)據(jù)堆生。
真正直接持有數(shù)據(jù)的是各個 partition 對應的迭代器专缠,要再次注意的是,partition 對應的迭代器訪問數(shù)據(jù)時也不是把整個分區(qū)的數(shù)據(jù)一股腦加載持有淑仆,而是像常見的迭代器一樣一條條處理涝婉。舉個例子,我們把 HDFS 上10G 的文件加載到 RDD 做處理時蔗怠,并不會消耗10G 的空間墩弯,如果沒有 shuffle 操作(shuffle 操作會持有較多數(shù)據(jù)在內(nèi)存),那么這個操作的內(nèi)存消耗是非常小的寞射,因為在每個 task 中都是一條條處理處理的渔工,在某一時刻只會持有一條數(shù)據(jù)。這也是初學者常有的理解誤區(qū)桥温,一定要注意 Spark 是基于內(nèi)存的計算引矩,但不會傻到什么時候都把所有數(shù)據(jù)全放到內(nèi)存。
讓我們來看看 Partition 的定義幫助理解:
trait Partition extends Serializable {
def index: Int
override def hashCode(): Int = index
}
在 trait Partition 中僅包含返回其索引的 index 方法侵浸。很多具體的 RDD 也會有自己實現(xiàn)的 partition旺韭,比如:
KafkaRDDPartition 提供了獲取 partition 所包含的 kafka msg 條數(shù)的方法
class KafkaRDDPartition(
val index: Int,
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long,
val host: String,
val port: Int
) extends Partition {
/** Number of messages this partition refers to */
def count(): Long = untilOffset - fromOffset
}
UnionRDD 的 partition 類 UnionPartition 提供了獲取依賴的父 partition 及獲取優(yōu)先位置的方法
private[spark] class UnionPartition[T: ClassTag](
idx: Int,
@transient private val rdd: RDD[T],
val parentRddIndex: Int,
@transient private val parentRddPartitionIndex: Int)
extends Partition {
var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)
override val index: Int = idx
}
partition 與 iterator 方法
RDD 的 def iterator(split: Partition, context: TaskContext): Iterator[T]
方法用來獲取 split 指定的 Partition 對應的數(shù)據(jù)的迭代器,有了這個迭代器就能一條一條取出數(shù)據(jù)來按 compute chain 來執(zhí)行一個個transform 操作掏觉。iterator 的實現(xiàn)如下:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}
def 前加了 final 說明該函數(shù)是不能被子類重寫的区端,其先判斷 RDD 的 storageLevel 是否為 NONE,若不是澳腹,則嘗試從緩存中讀取织盼,讀取不到則通過計算來獲取該 Partition 對應的數(shù)據(jù)的迭代器;若是遵湖,嘗試從 checkpoint 中獲取 Partition 對應數(shù)據(jù)的迭代器悔政,若 checkpoint 不存在則通過計算來獲取。
剛剛介紹了如果從 cache 或者 checkpoint 無法獲得 Partition 對應的數(shù)據(jù)的迭代器延旧,則需要通過計算來獲取拉队,這將會調(diào)用到 def compute(split: Partition, context: TaskContext): Iterator[T]
方法蔓姚,各個 RDD 最大的不同也體現(xiàn)在該方法中。后文會詳細介紹該方法
(二): partitioner
partitioner 即分區(qū)器快集,說白了就是決定 RDD 的每一條消息應該分到哪個分區(qū)。但只有 k, v 類型的 RDD 才能有 partitioner(當然泰鸡,非 key, value 類型的 RDD 的 partitioner 為 None。
partitioner 為 None 的 RDD 的 partition 的數(shù)據(jù)要么對應數(shù)據(jù)源的某一段數(shù)據(jù)缅糟,要么來自對父 RDDs 的 partitions 的處理結果。
我們先來看看 Partitioner 的定義及注釋說明:
abstract class Partitioner extends Serializable {
//< 返回 partition 數(shù)量
def numPartitions: Int
//< 返回 key 應該屬于哪個 partition
def getPartition(key: Any): Int
}
Partitioner 共有兩種實現(xiàn)祷愉,分別是 HashPartitioner 和 RangePartitioner
HashPartitioner
先來看 HashPartitioner 的實現(xiàn)(省去部分代碼):
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
...
}
// x 對 mod 求于窗宦,若結果為正,則返回該結果二鳄;若結果為負赴涵,返回結果加上 mod
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
numPartitions
直接返回主構造函數(shù)中傳入的 partitions 參數(shù),之前在有本書里看到說 Partitioner 不僅決定了一條 record 應該屬于哪個 partition订讼,還決定了 partition 的數(shù)量髓窜,其實這句話的后半段的有誤的,Partitioner 并不能決定一個 RDD 的 partition 數(shù)欺殿,Partitioner 方法返回的 partition 數(shù)是直接返回外部傳入的值寄纵。
getPartition
方法也不復雜,主要做了:
- 為參數(shù) key 計算一個 hash 值
- 若該哈希值對 partition 個數(shù)取余結果為正脖苏,則該結果即該 key 歸屬的 partition index程拭;否則,以該結果加上 partition 個數(shù)為 partition index
從上面的分析來看帆阳,當 key哺壶, value 類型的 RDD 的 key 的 hash 值分布不均勻時,會導致各個 partition 的數(shù)據(jù)量不均勻蜒谤,極端情況下一個 partition 會持有整個 RDD 的數(shù)據(jù)而其他 partition 則不包含任何數(shù)據(jù)山宾,這顯然不是我們希望看到的,這時就需要 RangePartitioner 出馬了鳍徽。
RangePartitioner
上文也提到了资锰,HashPartitioner 可能會導致各個 partition 數(shù)據(jù)量相差很大的情況。這時阶祭,初衷為使各個 partition 數(shù)據(jù)分布盡量均勻的 RangePartitioner 便有了用武之地绷杜。
RangePartitioner 將一個范圍內(nèi)的數(shù)據(jù)映射到 partition,這樣兩個 partition 之間要么是一個 partition 的數(shù)據(jù)都比另外一個大濒募,或者小鞭盟。RangePartitioner采用水塘抽樣算法,比 HashPartitioner 耗時瑰剃,具體可見:Spark分區(qū)器HashPartitioner和RangePartitioner代碼詳解
歡迎關注我的微信公眾號:FunnyBigData