Spark 核心 RDD 剖析(上)

本文將通過描述 Spark RDD 的五大核心要素來描述 RDD魂奥,若希望更全面了解 RDD 的知識焕刮,請移步 RDD 論文:RDD:基于內(nèi)存的集群計算容錯抽象

Spark 的五大核心要素包括:

  • partition
  • partitioner
  • compute func
  • dependency
  • preferredLocation

下面一一來介紹

(一): partition

partition 個數(shù)怎么定

RDD 由若干個 partition 組成关炼,共有三種生成方式:

  • 從 Scala 集合中創(chuàng)建围来,通過調(diào)用 SparkContext#makeRDDSparkContext#parallelize
  • 加載外部數(shù)據(jù)來創(chuàng)建 RDD痛垛,例如從 HDFS 文件、mysql 數(shù)據(jù)庫讀取數(shù)據(jù)等
  • 由其他 RDD 執(zhí)行 transform 操作轉(zhuǎn)換而來

那么粹淋,在使用上述方法生成 RDD 的時候吸祟,會為 RDD 生成多少個 partition 呢?一般來說桃移,加載 Scala 集合或外部數(shù)據(jù)來創(chuàng)建 RDD 時屋匕,是可以指定 partition 個數(shù)的,若指定了具體值借杰,那么 partition 的個數(shù)就等于該值过吻,比如:

val rdd1 = sc.makeRDD( scalaSeqData, 3 )    //< 指定 partition 數(shù)為3
val rdd2 = sc.textFile( hdfsFilePath, 10 )  //< 指定 partition 數(shù)為10

若沒有指定具體的 partition 數(shù)時的 partition 數(shù)為多少呢?

  • 對于從 Scala 集合中轉(zhuǎn)換而來的 RDD:默認的 partition 數(shù)為 defaultParallelism蔗衡,該值在不同的部署模式下不同:
    • Local 模式:本機 cpu cores 的數(shù)量
    • Mesos 模式:8
    • Yarn:max(2, 所有 executors 的 cpu cores 個數(shù)總和)
  • 對于從外部數(shù)據(jù)加載而來的 RDD:默認的 partition 數(shù)為 min(defaultParallelism, 2)
  • 對于執(zhí)行轉(zhuǎn)換操作而得到的 RDD:視具體操作而定纤虽,如 map 得到的 RDD 的 partition 數(shù)與 父 RDD 相同乳绕;union 得到的 RDD 的 partition 數(shù)為父 RDDs 的 partition 數(shù)之和...

partition 的定義

我們常說,partition 是 RDD 的數(shù)據(jù)單位廓推,代表了一個分區(qū)的數(shù)據(jù)刷袍。但這里千萬不要搞錯了,partition 是邏輯概念樊展,是代表了一個分片的數(shù)據(jù),而不是包含或持有一個分片的數(shù)據(jù)堆生。

真正直接持有數(shù)據(jù)的是各個 partition 對應的迭代器专缠,要再次注意的是,partition 對應的迭代器訪問數(shù)據(jù)時也不是把整個分區(qū)的數(shù)據(jù)一股腦加載持有淑仆,而是像常見的迭代器一樣一條條處理涝婉。舉個例子,我們把 HDFS 上10G 的文件加載到 RDD 做處理時蔗怠,并不會消耗10G 的空間墩弯,如果沒有 shuffle 操作(shuffle 操作會持有較多數(shù)據(jù)在內(nèi)存),那么這個操作的內(nèi)存消耗是非常小的寞射,因為在每個 task 中都是一條條處理處理的渔工,在某一時刻只會持有一條數(shù)據(jù)。這也是初學者常有的理解誤區(qū)桥温,一定要注意 Spark 是基于內(nèi)存的計算引矩,但不會傻到什么時候都把所有數(shù)據(jù)全放到內(nèi)存。

讓我們來看看 Partition 的定義幫助理解:

trait Partition extends Serializable {
  def index: Int

  override def hashCode(): Int = index
}

在 trait Partition 中僅包含返回其索引的 index 方法侵浸。很多具體的 RDD 也會有自己實現(xiàn)的 partition旺韭,比如:

KafkaRDDPartition 提供了獲取 partition 所包含的 kafka msg 條數(shù)的方法

class KafkaRDDPartition(
  val index: Int,
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long,
  val host: String,
  val port: Int
) extends Partition {
  /** Number of messages this partition refers to */
  def count(): Long = untilOffset - fromOffset
}

UnionRDD 的 partition 類 UnionPartition 提供了獲取依賴的父 partition 及獲取優(yōu)先位置的方法

private[spark] class UnionPartition[T: ClassTag](
    idx: Int,
    @transient private val rdd: RDD[T],
    val parentRddIndex: Int,
    @transient private val parentRddPartitionIndex: Int)
  extends Partition {

  var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)

  def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)

  override val index: Int = idx
}

partition 與 iterator 方法

RDD 的 def iterator(split: Partition, context: TaskContext): Iterator[T] 方法用來獲取 split 指定的 Partition 對應的數(shù)據(jù)的迭代器,有了這個迭代器就能一條一條取出數(shù)據(jù)來按 compute chain 來執(zhí)行一個個transform 操作掏觉。iterator 的實現(xiàn)如下:

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

def 前加了 final 說明該函數(shù)是不能被子類重寫的区端,其先判斷 RDD 的 storageLevel 是否為 NONE,若不是澳腹,則嘗試從緩存中讀取织盼,讀取不到則通過計算來獲取該 Partition 對應的數(shù)據(jù)的迭代器;若是遵湖,嘗試從 checkpoint 中獲取 Partition 對應數(shù)據(jù)的迭代器悔政,若 checkpoint 不存在則通過計算來獲取。

剛剛介紹了如果從 cache 或者 checkpoint 無法獲得 Partition 對應的數(shù)據(jù)的迭代器延旧,則需要通過計算來獲取拉队,這將會調(diào)用到 def compute(split: Partition, context: TaskContext): Iterator[T] 方法蔓姚,各個 RDD 最大的不同也體現(xiàn)在該方法中。后文會詳細介紹該方法

(二): partitioner

partitioner 即分區(qū)器快集,說白了就是決定 RDD 的每一條消息應該分到哪個分區(qū)。但只有 k, v 類型的 RDD 才能有 partitioner(當然泰鸡,非 key, value 類型的 RDD 的 partitioner 為 None。

partitioner 為 None 的 RDD 的 partition 的數(shù)據(jù)要么對應數(shù)據(jù)源的某一段數(shù)據(jù)缅糟,要么來自對父 RDDs 的 partitions 的處理結果。

我們先來看看 Partitioner 的定義及注釋說明:

abstract class Partitioner extends Serializable {
  //< 返回 partition 數(shù)量
  def numPartitions: Int
  //< 返回 key 應該屬于哪個 partition
  def getPartition(key: Any): Int
}

Partitioner 共有兩種實現(xiàn)祷愉,分別是 HashPartitioner 和 RangePartitioner

HashPartitioner

先來看 HashPartitioner 的實現(xiàn)(省去部分代碼):

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  ...
}

// x 對 mod 求于窗宦,若結果為正,則返回該結果二鳄;若結果為負赴涵,返回結果加上 mod
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

numPartitions 直接返回主構造函數(shù)中傳入的 partitions 參數(shù),之前在有本書里看到說 Partitioner 不僅決定了一條 record 應該屬于哪個 partition订讼,還決定了 partition 的數(shù)量髓窜,其實這句話的后半段的有誤的,Partitioner 并不能決定一個 RDD 的 partition 數(shù)欺殿,Partitioner 方法返回的 partition 數(shù)是直接返回外部傳入的值寄纵。

getPartition 方法也不復雜,主要做了:

  1. 為參數(shù) key 計算一個 hash 值
  2. 若該哈希值對 partition 個數(shù)取余結果為正脖苏,則該結果即該 key 歸屬的 partition index程拭;否則,以該結果加上 partition 個數(shù)為 partition index

從上面的分析來看帆阳,當 key哺壶, value 類型的 RDD 的 key 的 hash 值分布不均勻時,會導致各個 partition 的數(shù)據(jù)量不均勻蜒谤,極端情況下一個 partition 會持有整個 RDD 的數(shù)據(jù)而其他 partition 則不包含任何數(shù)據(jù)山宾,這顯然不是我們希望看到的,這時就需要 RangePartitioner 出馬了鳍徽。

RangePartitioner

上文也提到了资锰,HashPartitioner 可能會導致各個 partition 數(shù)據(jù)量相差很大的情況。這時阶祭,初衷為使各個 partition 數(shù)據(jù)分布盡量均勻的 RangePartitioner 便有了用武之地绷杜。

RangePartitioner 將一個范圍內(nèi)的數(shù)據(jù)映射到 partition,這樣兩個 partition 之間要么是一個 partition 的數(shù)據(jù)都比另外一個大濒募,或者小鞭盟。RangePartitioner采用水塘抽樣算法,比 HashPartitioner 耗時瑰剃,具體可見:Spark分區(qū)器HashPartitioner和RangePartitioner代碼詳解


歡迎關注我的微信公眾號:FunnyBigData

FunnyBigData
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末齿诉,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌粤剧,老刑警劉巖歇竟,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異抵恋,居然都是意外死亡焕议,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門弧关,熙熙樓的掌柜王于貴愁眉苦臉地迎上來盅安,“玉大人,你說我怎么就攤上這事世囊】矶眩” “怎么了?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵茸习,是天一觀的道長。 經(jīng)常有香客問我壁肋,道長号胚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任浸遗,我火速辦了婚禮猫胁,結果婚禮上,老公的妹妹穿的比我還像新娘跛锌。我一直安慰自己弃秆,他們只是感情好,可當我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布髓帽。 她就那樣靜靜地躺著菠赚,像睡著了一般。 火紅的嫁衣襯著肌膚如雪郑藏。 梳的紋絲不亂的頭發(fā)上衡查,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天,我揣著相機與錄音必盖,去河邊找鬼拌牲。 笑死,一個胖子當著我的面吹牛歌粥,可吹牛的內(nèi)容都是我干的塌忽。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼失驶,長吁一口氣:“原來是場噩夢啊……” “哼土居!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤装盯,失蹤者是張志新(化名)和其女友劉穎坷虑,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體埂奈,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡迄损,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了账磺。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片芹敌。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖垮抗,靈堂內(nèi)的尸體忽然破棺而出氏捞,到底是詐尸還是另有隱情,我是刑警寧澤冒版,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布液茎,位于F島的核電站,受9級特大地震影響辞嗡,放射性物質(zhì)發(fā)生泄漏捆等。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一续室、第九天 我趴在偏房一處隱蔽的房頂上張望栋烤。 院中可真熱鬧,春花似錦挺狰、人聲如沸明郭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽薯定。三九已至,卻和暖如春趁耗,著一層夾襖步出監(jiān)牢的瞬間沉唠,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工苛败, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留满葛,地道東北人。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓罢屈,卻偏偏與公主長得像嘀韧,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子缠捌,可洞房花燭夜當晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容