Spark RDD的處理過程

閑來無事肃拜,研究一下Spark Rdd的處理過程。
以一個(gè)簡(jiǎn)單的例子看看:

val textRDD = sc.textFile("/home/ubuntu/people.txt")
val filterRDD = textRDD.filter(_.startsWith("123"))
val mapRDD = filterRDD.map(line => (line, 1))
val reduceRDD = mapRDD.reduceByKey(_+_)
reduceRDD.foreach(println)

首先看第一行代碼。
val textRDD = sc.textFile("/home/ubuntu/people.txt")
看一下textFile的代碼:

def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}

可以看到,經(jīng)過filter尸疆,重新生成了一個(gè)RDD。

再看第二行:
val mapRDD = filterRDD.map(line => (line, 1))
看一下map的代碼:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}

同樣是生成一個(gè)RDD。
再看第三行:
val reduceRDD = mapRDD.reduceByKey(_+_)
這次reduceByKey的代碼定位到了PairRDDFunctions寿弱,也是重新生成了一個(gè)RDD

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}

在看最后一行
reduceRDD.foreach(println)
代碼定位到RDD.scala

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

可以看到犯眠,最終調(diào)用了SparkContext的runJob,看一下runJob的代碼:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(fund)
 logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()}

大致可以看出症革,最終由DAGScheduler來執(zhí)行了任務(wù)阔逼。
再看看DAGScheduler的runJob

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
  // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
  // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
  // safe to pass in null here. For more detail, see SPARK-13747.
  val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
  waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

可以看到,DAGScheduler.runJob將任務(wù)提交后地沮,就等待任務(wù)執(zhí)行完成。
再看看是如何提交任務(wù)的:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

大致邏輯羡亩,找到空閑的partitions摩疑,然后將任務(wù)提交上去。
暫時(shí)就看到這里畏铆,下一篇再做更深入的研究雷袋。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市辞居,隨后出現(xiàn)的幾起案子楷怒,更是在濱河造成了極大的恐慌,老刑警劉巖瓦灶,帶你破解...
    沈念sama閱讀 222,252評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鸠删,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡贼陶,警方通過查閱死者的電腦和手機(jī)刃泡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來碉怔,“玉大人烘贴,你說我怎么就攤上這事〈殡剩” “怎么了桨踪?”我有些...
    開封第一講書人閱讀 168,814評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)芹啥。 經(jīng)常有香客問我锻离,道長(zhǎng),這世上最難降的妖魔是什么叁征? 我笑而不...
    開封第一講書人閱讀 59,869評(píng)論 1 299
  • 正文 為了忘掉前任纳账,我火速辦了婚禮,結(jié)果婚禮上捺疼,老公的妹妹穿的比我還像新娘疏虫。我一直安慰自己,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評(píng)論 6 398
  • 文/花漫 我一把揭開白布卧秘。 她就那樣靜靜地躺著呢袱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪翅敌。 梳的紋絲不亂的頭發(fā)上羞福,一...
    開封第一講書人閱讀 52,475評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音蚯涮,去河邊找鬼治专。 笑死,一個(gè)胖子當(dāng)著我的面吹牛遭顶,可吹牛的內(nèi)容都是我干的张峰。 我是一名探鬼主播,決...
    沈念sama閱讀 41,010評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了昭灵?” 一聲冷哼從身側(cè)響起荞估,我...
    開封第一講書人閱讀 39,924評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,469評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡敌厘,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評(píng)論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了朽合。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片额湘。...
    茶點(diǎn)故事閱讀 40,680評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖旁舰,靈堂內(nèi)的尸體忽然破棺而出锋华,到底是詐尸還是另有隱情,我是刑警寧澤箭窜,帶...
    沈念sama閱讀 36,362評(píng)論 5 351
  • 正文 年R本政府宣布毯焕,位于F島的核電站,受9級(jí)特大地震影響磺樱,放射性物質(zhì)發(fā)生泄漏纳猫。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評(píng)論 3 335
  • 文/蒙蒙 一竹捉、第九天 我趴在偏房一處隱蔽的房頂上張望芜辕。 院中可真熱鬧,春花似錦块差、人聲如沸侵续。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)状蜗。三九已至需五,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間轧坎,已是汗流浹背宏邮。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留缸血,地道東北人蜜氨。 一個(gè)月前我還...
    沈念sama閱讀 49,099評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像捎泻,于是被迫代替她去往敵國(guó)和親记劝。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容