1. spark streaming Job 架構(gòu)和容錯解析

一.Spark streaming Job 架構(gòu)

SparkStreaming框架會自動啟動Job并每隔BatchDuration時間會自動觸發(fā)Job的調(diào)用粗截。

Spark Streaming的Job 分為兩大類:

  1. 每隔BatchInterval時間片就會產(chǎn)生的一個個Job极祸,這里的Job并不是Spark Core中的Job遥金,它只是基于DStreamGraph而生成的RDD的DAG而已蒜田;從Java角度講相當于Runnable接口的實現(xiàn)類冲粤,要想運行Job需要將Job提交給JobScheduler,在JobScheduler內(nèi)部會通過線程池的方式創(chuàng)建運行Job的一個個線程厢呵,當找到一個空閑的線程后會將Job提交到集群運行(其實是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運行)襟铭。為什么使用線程池呢?

    a. Job根據(jù)BatchInterval不斷生成,為了減少線程創(chuàng)建而帶來的效率提升我們需要使用線程池(這和在Executor中通過啟動線程池的方式來執(zhí)行Task有異曲同工之妙)暖呕;

    b. 如果Job的運行設(shè)置為FAIR公平調(diào)度的方式典徊,這個時候也需要多線程的支持咐汞;

  2. 上面Job提交的Spark Job本身碉考。單從這個時刻來看挺身,此次的Job和Spark core中的Job沒有任何的區(qū)別。

理解Spark Streaming的Job的整個架構(gòu)和運行機制對于精通Spark Streaming是至關(guān)重要的墙贱。

我們運行以下的程序惨撇,通過這個程序的運行過程進一步加深理解Spark Streaming流處理的Job的執(zhí)行的過程府寒,代碼如下:

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

public class ConnectionPool {

private static LinkedList<Connection> connectionQueue;

static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
} 
}

public synchronized static Connection getConnection() {
try {
if(connectionQueue == null) {
connectionQueue = new LinkedList<Connection>();
for(int i = 0; i < 5; i++) {
Connection conn = DriverManager.getConnection(
"jdbc:mysql://Master:3306/sparkstreaming",
"root",
"778899..");
connectionQueue.push(conn); 
}
}
} catch (Exception e) {
e.printStackTrace();
}
return connectionQueue.poll();
}

public static void returnConnection(Connection conn) {
connectionQueue.push(conn); 
}
}

第二部分: 通過sparkstreaming 將網(wǎng)絡(luò)產(chǎn)生的數(shù)據(jù)進行統(tǒng)計統(tǒng)計,并將結(jié)果寫入mysql數(shù)據(jù)庫

object OnlineForeachRDD2DB {

  def main(args: Array[String]){

    /**

    * 第1步:創(chuàng)建Spark的配置對象SparkConf株搔,設(shè)置Spark程序的運行時的配置信息纤房,

     * 例如說通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置

     * 為local,則代表Spark程序在本地運行捌刮,特別適合于機器配置條件非常差(例如

    * 只有1G的內(nèi)存)的初學者

     */

    val conf = new SparkConf() //創(chuàng)建SparkConf對象

    conf.setAppName("OnlineForeachRDD") //設(shè)置應用程序的名稱绅作,在程序運行的監(jiān)控界面可以看到名稱

    conf.setMaster("spark://Master:7077") //此時蛾派,程序在Spark集群

    conf.setMaster("local[6]")

    //設(shè)置batchDuration時間間隔來控制Job生成的頻率并且創(chuàng)建Spark Streaming執(zhí)行的入口

    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("Master", 9999)

    val words = lines.flatMap(_.split(" "))

    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)



    wordCounts.foreachRDD{ rdd =>

      rdd.foreachPartition{ partitionOfRecords => {

        // ConnectionPool is a static, lazily initialized pool of connections

        val connection = ConnectionPool.getConnection()

        partitionOfRecords.foreach(record => {

          val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"

          val stmt = connection.createStatement();

          stmt.executeUpdate(sql);

        })

        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse

      }}

    }



    /**

      *  在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進行消息循環(huán)稍算,

      *  在JobScheduler的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker糊探,并且調(diào)用JobGenerator和

      *  ReceiverTacker的start方法:

      *  1河闰,JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job

      *  2姜性,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動

      *  ReceiverSupervisor),在Receiver收到數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把

      *  數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker弃酌,在ReceiverTracker內(nèi)部會通過

      *  ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息每個BatchInterval會產(chǎn)生一個具體的Job妓湘,

      *  其實這里的Job不是Spark Core中所指的Job乌询,它只是基于DStreamGraph而生成的RDD的DAG

      *  而已,從Java角度講唬党,相當于Runnable接口實例驶拱,此時要想運行Job需要提交給JobScheduler沮趣,

      *  在JobScheduler中通過線程池的方式找到一個單獨的線程來提交Job到集群運行(其實是在線程中

      *  基于RDD的Action觸發(fā)真正的作業(yè)的運行)房铭,

      *  為什么使用線程池呢温眉?

      *  1类溢,作業(yè)不斷生成露懒,所以為了提升效率懈词,我們需要線程池辩诞;這和在Executor中通過線程池執(zhí)行Task

      *  有異曲同工之妙译暂;

      *  2,有可能設(shè)置了Job的FAIR公平調(diào)度的方式崎脉,這個時候也需要多線程的支持伯顶。

      */

    ssc.start()

    ssc.awaitTermination()

  }

}

代碼中以注釋的方式描述了Spakr job 啟動的過程,下面結(jié)合源碼做進一步分析:
StreamingContext的start()方法:

/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            savedProperties.set(SerializationUtils.clone(
              sparkContext.localProperties.get()).asInstanceOf[Properties])
          scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

可以看到StreamingContext的start()的方法中調(diào)用了scheduler.start(),其scheduler 是JobScheduler的對象,該對象在StreamingContext創(chuàng)建是被實例化:

private[streaming] val scheduler = new JobScheduler(this)

接下來在JobScheduler.start()內(nèi)部實例化EventLoop,并執(zhí)行EventLoop.start()進行消息循環(huán)汪厨,在JobScheduler.start()內(nèi)部構(gòu)造ReceiverTacker劫乱,并且調(diào)用JobGenerator和ReceiverTacker的start方法:



JobGenerator的start()方法中會調(diào)用startFirstTime()方法和restart()方法



最終調(diào)用generateJobs()方法不斷生成job:

ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動 ReceiverSupervisor)狭吼,在Receiver收到數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息.過程如圖所示:



源碼如下:(注意紅色字體部分代碼)

每個BatchInterval會產(chǎn)生一個具體的Job,其實這里的Job不是Spark Core中所指的Job殖妇,它只是基于DStreamGraph而生成的RDD的DAG而已刁笙,從Java角度講,相當于Runnable接口實例谦趣,此時要想運行Job需要提交給JobScheduler疲吸, 在JobScheduler中通過線程池的方式找到一個單獨的線程來提交Job到集群運行(其實是在線程中 基于RDD的Action觸發(fā)真正的作業(yè)的運行)

二 Spark Streaming Job容錯架構(gòu)和運行機制

Spark容錯分為:Driver級別的容錯和Executor級別的容錯。

  • 在Driver級別的容錯具體為DAG生成的模板前鹅,即DStreamGraph摘悴,RecevierTracker中存儲的元數(shù)據(jù)信息和JobScheduler中存儲的Job進行的進度情況等信息,只要通過checkpoint就可以了蹂喻,每個Job生成之前進行checkpoint葱椭,在Job生成之后再進行checkpoint,如果出錯的話就從checkpoint中恢復口四。
  • 在Executor級別的容錯具體為接收數(shù)據(jù)的安全性和任務(wù)執(zhí)行的安全性孵运。在接收數(shù)據(jù)安全性方面,一種方式是Spark Streaming接收到數(shù)據(jù)默認為MEMORY_AND_DISK_2的方式蔓彩,在兩臺機器的內(nèi)存中掐松,如果一臺機器上的Executor掛了,立即切換到另一臺機器上的Executor粪小,這種方式一般情況下非炒蠡牵可靠且沒有切換時間。另外一種方式是WAL(Write Ahead Log)探膊,在數(shù)據(jù)到來時先通過WAL機制將數(shù)據(jù)進行日志記錄杠愧,如果有問題則從日志記錄中恢復,然后再把數(shù)據(jù)存到Executor中逞壁,再進行其他副本的復制流济。WAL這種方式對性能有影響,在生產(chǎn)環(huán)境中不常用腌闯,一般使用Kafka存儲绳瘟,Spark Streaming接收到數(shù)據(jù)丟失時可以從Kafka中回放。在任務(wù)執(zhí)行的安全性方面姿骏,靠RDD的容錯糖声。

Spark Streaming的容錯機制是基于RDD的容錯機制。

主要表現(xiàn)為:
  1. checkpoint
  2. 基于血統(tǒng)(lineage)的高度容錯機制
  3. 出錯了之后會從出錯的位置從新計算分瘦,而不會導致重復計算

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蘸泻,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子嘲玫,更是在濱河造成了極大的恐慌悦施,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件去团,死亡現(xiàn)場離奇詭異抡诞,居然都是意外死亡,警方通過查閱死者的電腦和手機土陪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進店門昼汗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人旺坠,你說我怎么就攤上這事乔遮“绯” “怎么了取刃?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵蹋肮,是天一觀的道長。 經(jīng)常有香客問我璧疗,道長坯辩,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任崩侠,我火速辦了婚禮,結(jié)果婚禮上却音,老公的妹妹穿的比我還像新娘。我一直安慰自己系瓢,他們只是感情好,可當我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布夷陋。 她就那樣靜靜地躺著欠拾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪藐窄。 梳的紋絲不亂的頭發(fā)上酬土,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天荆忍,我揣著相機與錄音,去河邊找鬼撤缴。 笑死东揣,一個胖子當著我的面吹牛腹泌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播凉袱,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼钟鸵!你這毒婦竟也來了涤躲?” 一聲冷哼從身側(cè)響起种樱,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎消恍,沒想到半個月后以现,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體狠怨,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年茵汰,在試婚紗的時候發(fā)現(xiàn)自己被綠了孽鸡。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡豆胸,死狀恐怖巷疼,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤骡尽,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布攀细,位于F島的核電站谭贪,受9級特大地震影響俭识,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜缚态,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望宙橱。 院中可真熱鬧师郑,春花似錦宝冕、人聲如沸地梨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至腰素,卻和暖如春雪营,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工征唬, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留总寒,地道東北人。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像乎完,于是被迫代替她去往敵國和親树姨。 傳聞我的和親對象是個殘疾皇子桥状,可洞房花燭夜當晚...
    茶點故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容