Spark Streaming source code walkthrough: dynamic job generation, with some deeper thoughts

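Input DStreams can come from many sources, such as Kafka, Socket and Flume. Output operations on a DStream are logical-level actions, a concept introduced by the Spark Streaming framework. Underneath, they are translated into physical-level actions, that is, RDD actions. In between come the transformations: the state transitions that make up the business logic.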
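The logical/physical split can be made concrete with a toy model in plain Scala. This is not Spark's API. `ToyDStream` and `registerOutput` are hypothetical names. Transformations only compose functions, and an output operation only registers a closure. Nothing runs until a batch time arrives and the closure is applied to that batch's data, which plays the role of the RDD action.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model: a stream is a function from batch time to that batch's data.
final class ToyDStream[T](val compute: Long => Seq[T]) {
  // Transformation: builds a new stream lazily; no data is touched here.
  def map[U](f: T => U): ToyDStream[U] = new ToyDStream[U](t => compute(t).map(f))

  // "Logical" action: record what to do with each batch, but do not run it yet.
  def registerOutput(registry: ArrayBuffer[Long => Unit])(action: Seq[T] => Unit): Unit =
    registry += ((time: Long) => action(compute(time)))
}

val outputOps = ArrayBuffer.empty[Long => Unit]
val results = ArrayBuffer.empty[Int]
val source = new ToyDStream[Int](time => Seq(time.toInt, time.toInt + 1))
source.map(_ * 10).registerOutput(outputOps)(batch => results += batch.sum)

// Registering the output operation executed nothing.
val nothingRanYet = results.isEmpty
// At batch time 5 the registered closure runs the "physical" action on (50, 60).
outputOps.foreach(op => op(5L))
```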

Spark Streaming has two kinds of data sources:

1. DStreams built directly on a data source (input DStreams);

2. DStreams produced from other DStreams.
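The two kinds can be sketched with hypothetical toy classes. They loosely mirror the idea behind Spark's input and derived DStreams, but they are not Spark's classes. The only difference between the kinds is where `compute` gets its data: an input stream reads a source and has no parents, while a derived stream computes from its parent.

```scala
// Hypothetical sketch: the two kinds of streams differ in where compute() reads from.
abstract class ToyStream[T] {
  def dependencies: List[ToyStream[_]]
  def compute(time: Long): Seq[T]
}

// Kind 1: backed directly by a data source, so it has no parent streams.
final class ToyInputStream(source: Long => Seq[String]) extends ToyStream[String] {
  def dependencies: List[ToyStream[_]] = Nil
  def compute(time: Long): Seq[String] = source(time)
}

// Kind 2: derived from another stream by a transformation.
final class ToyMappedStream[T, U](parent: ToyStream[T], f: T => U) extends ToyStream[U] {
  def dependencies: List[ToyStream[_]] = List(parent)
  def compute(time: Long): Seq[U] = parent.compute(time).map(f)
}

val lines = new ToyInputStream(t => Seq(s"a-$t", s"bb-$t"))
val lengths = new ToyMappedStream[String, Int](lines, _.length)
```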

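A key point: even big data work that is not done as stream processing usually relies on scheduled jobs, triggered for example every ten minutes or once a day. Such scheduled jobs are essentially stream processing, just in a non-standard form. Data that has nothing to do with stream processing has no value. Even batch processing and data mining are really stream processing, only implicitly, and all data processing will eventually become stream processing.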

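Hence there is a unified abstraction: everything is processed the stream-processing way, and all processing will eventually be folded into stream processing. Large enterprise projects typically have a J2EE backend through which various users operate the big-data center.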

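The entry point of a Spark Streaming program specifies batchDuration, the batch interval. With a 5-second batchDuration, JobGenerator produces jobs every five seconds, one per output operation. These jobs are at the logical level: they describe what should be done, but they do not do it yet. The actual work is done by the underlying physical-level actions, and the physical level is based on RDD dependencies. DStream actions (output operations) are likewise logical. Spark Streaming generates logical-level jobs from these output operations but does not run them immediately, much like an object implementing the Runnable interface before a thread runs it. Because nothing has been turned into physical work yet, the jobs can still be scheduled and optimized. Suppose the DStream operations are translated into RDD operations and the last one is an RDD action: would the job be triggered as soon as the translation happens? To complete the translation without generating a job, one needs to …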

JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
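The Runnable analogy can be sketched with a hypothetical `ToyJob` class. Spark has its own `Job` class, which this does not reproduce. The job captures a batch time and a closure, and creating it runs nothing. Only an explicit `run()` triggers the work, which in Spark is done by a scheduler thread.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: a logical job only captures what to run for a batch time.
final case class ToyJob(time: Long, func: () => Unit) extends Runnable {
  def run(): Unit = func()
}

val executed = ArrayBuffer.empty[Long]
val batchDurationMs = 5000L
// Generate one job per batch, as a generator would every batchDuration.
val jobs = (1 to 3).map(i => ToyJob(i * batchDurationMs, () => executed += i * batchDurationMs))
val nothingRanYet = executed.isEmpty
// Only running the jobs performs the work.
jobs.foreach(_.run())
```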

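Below we analyze three classes:

1. JobGenerator: generates Jobs according to batchDuration and its internal default time interval;

2. JobScheduler: schedules Spark Streaming Jobs according to batchDuration;

3. ReceiverTracker: receives metadata on the Driver side and starts the data-receiving threads in the executors.

1. The JobGenerator class: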

/**

* This class generates jobs from DStreams as well as drives checkpointing and cleaning

* up DStream metadata.

*/

private[streaming]

class JobGenerator(jobScheduler: JobScheduler) extends Logging {

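The comment says that jobs are generated from DStreams (that is, from the DStreamGraph). As time passes, JobGenerator produces many jobs. Besides the jobs generated on a timer, Spark Streaming also produces jobs in other ways, for example through aggregations and stateful operations. Stateful operations are not tied to a single batchDuration; they process data across many batches. Window-style operations also go through JobGenerator, which is the class responsible for job generation, checkpointing and metadata cleanup.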
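JobGenerator fires on a recurring timer tied to batchDuration. The arithmetic below is an assumption about how batch times line up on exact multiples of the interval, in the spirit of that timer. It is not Spark's code, and `nextBatchTime` and `batchTimes` are hypothetical helpers.

```scala
// Hypothetical sketch: align batch times to multiples of batchDuration.
// The first batch time is the next multiple strictly after "now".
def nextBatchTime(nowMs: Long, batchDurationMs: Long): Long =
  (nowMs / batchDurationMs + 1) * batchDurationMs

// Subsequent batch times are spaced exactly batchDuration apart.
def batchTimes(firstMs: Long, batchDurationMs: Long, count: Int): Seq[Long] =
  (0 until count).map(i => firstMs + i * batchDurationMs)

val first = nextBatchTime(nowMs = 12345L, batchDurationMs = 5000L)
val schedule = batchTimes(first, 5000L, 3)
```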

// eventLoop is created when generator starts.

// This not being null means the scheduler has been started and not stopped

private var eventLoop: EventLoop[JobGeneratorEvent] = null // the event loop

// last batch whose completion,checkpointing and metadata cleanup has been completed

private var lastProcessedBatch: Time = null

/** Start generation of jobs */

def start(): Unit = synchronized {

if (eventLoop != null) return // generator has already been started

// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.

// See SPARK-10125

checkpointWriter // force initialization of checkpointWriter now; this does not write a checkpoint

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { // anonymous subclass of EventLoop

override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) // event-handling logic

override protected def onError(e: Throwable): Unit = {

jobScheduler.reportError("Error in job generator", e)

}

}

eventLoop.start() // start the event thread that processes queued events

if (ssc.isCheckpointPresent) {

restart()

} else {

startFirstTime()

}

}

/**

* An event loop to receive events from the caller and process all events in the event thread. It

* will start an exclusive event thread to process all events.

*

* Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can

* handle events in time to avoid the potential OOM.

*/

private[spark] abstract class EventLoop[E](name: String) extends Logging {

private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() // the (unbounded) event queue

private val stopped = new AtomicBoolean(false) // atomic stop flag

private val eventThread = new Thread(name) { // the thread that processes events

setDaemon(true) // run as a daemon thread

override def run(): Unit = { // the thread's processing loop

try {

while (!stopped.get) {

val event = eventQueue.take() // take events off the queue one at a time, blocking while it is empty

try {

onReceive(event) // handle the event

} catch {

case NonFatal(e) => { // if handling fails, invoke the error callback

try {

onError(e)

} catch {

case NonFatal(e) => logError("Unexpected error in " + name, e)

}

}

}

}

} catch {

case ie: InterruptedException => // exit even if eventQueue is not empty

case NonFatal(e) => logError("Unexpected error in " + name, e)

}

}

}

def start(): Unit = { // start the event loop

if (stopped.get) {

throw new IllegalStateException(name + " has already been stopped")

}

// Call onStart before starting the event thread to make sure it happens before onReceive

onStart()

eventThread.start() // start the thread, which executes run() above

}
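The EventLoop excerpt above stops before its posting and stopping methods. Below is a self-contained sketch of the same single-consumer pattern. The class `MiniEventLoop` is hypothetical, and its `post` and `stop` are assumptions rather than Spark's code. One daemon thread drains an unbounded queue, hands each event to `onReceive`, and routes failures to `onError`.

```scala
import java.util.concurrent.{BlockingQueue, CopyOnWriteArrayList, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Hypothetical sketch of the pattern, not Spark's EventLoop.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true) // a daemon thread does not keep the JVM alive
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until an event is available
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try { onError(e) } catch { case NonFatal(_) => () } // keep the loop alive
          }
        }
      } catch {
        case _: InterruptedException => () // stop() interrupts a blocked take()
      }
    }
  }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit

  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)
  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      eventThread.join()
    }
}

// Usage: events are handled one at a time, in the order they were posted.
val seen = new CopyOnWriteArrayList[String]()
val done = new CountDownLatch(3)
val loop = new MiniEventLoop[String]("demo-loop") {
  protected def onReceive(event: String): Unit = {
    if (event == "bad") throw new IllegalArgumentException(event)
    seen.add(event)
    done.countDown()
  }
  protected def onError(e: Throwable): Unit = {
    seen.add("error: " + e.getMessage)
    done.countDown()
  }
}
loop.start()
Seq("a", "bad", "b").foreach(loop.post)
val allHandled = done.await(5, TimeUnit.SECONDS)
loop.stop()
```

A single consumer thread means `onReceive` never runs concurrently with itself. As the Spark comment warns, a slow handler lets the unbounded queue grow.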

/** Processes all events */

private def processEvent(event: JobGeneratorEvent) { // dispatch each event to its handler

logDebug("Got event " + event)

event match {

case GenerateJobs(time) => generateJobs(time) // generate the jobs for this batch time

case ClearMetadata(time) => clearMetadata(time) // the batch's jobs have finished: clear the metadata on the Driver

case DoCheckpoint(time, clearCheckpointDataLater) => // checkpoint the state for this batch time

doCheckpoint(time, clearCheckpointDataLater)

case ClearCheckpointData(time) => clearCheckpointData(time)

}

}
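`processEvent` relies on pattern matching over event case classes. Here is a reduced model with hypothetical event classes that use `Long` instead of Spark's `Time`. Because the trait is sealed, the compiler can warn when a case is missing from the match.

```scala
// Hypothetical mirror of the events matched above, with Long standing in for Time.
sealed trait GenEvent
final case class GenerateJobs(time: Long) extends GenEvent
final case class ClearMetadata(time: Long) extends GenEvent
final case class DoCheckpoint(time: Long, clearCheckpointDataLater: Boolean) extends GenEvent
final case class ClearCheckpointData(time: Long) extends GenEvent

// One branch per event type; a sealed trait lets the compiler check exhaustiveness.
def describe(event: GenEvent): String = event match {
  case GenerateJobs(t)        => s"generate jobs for $t"
  case ClearMetadata(t)       => s"clear metadata for $t"
  case DoCheckpoint(t, later) => s"checkpoint $t (clearCheckpointDataLater = $later)"
  case ClearCheckpointData(t) => s"clear checkpoint data for $t"
}

val described = Seq(GenerateJobs(5000L), DoCheckpoint(5000L, clearCheckpointDataLater = false)).map(describe)
```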

2. The JobScheduler class:

/**

* This class schedules jobs to be run on Spark. It uses the JobGenerator to generate

* the jobs and runs them using a thread pool.

*/

private[streaming]

class JobScheduler(val ssc: StreamingContext) extends Logging {

// Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff

// https://gist.github.com/AlainODea/1375759b8720a3f9f094

private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] // batch time -> the set of jobs generated for that batch

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

private val jobExecutor =

ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") // fixed-size daemon thread pool with numConcurrentJobs threads

private val jobGenerator = new JobGenerator(this) // create the JobGenerator (it is started in start())

val clock = jobGenerator.clock // the JobGenerator's clock

val listenerBus = new StreamingListenerBus() // listener bus for streaming events

// These two are created only when scheduler starts.

// eventLoop not being null means the scheduler has been started and not stopped

var receiverTracker: ReceiverTracker = null // tracks the receivers and their metadata on the driver

// A tracker to track all the input stream information as well as processed record number

var inputInfoTracker: InputInfoTracker = null // tracks input-stream information

private var eventLoop: EventLoop[JobSchedulerEvent] = null // the scheduler's event loop

def start(): Unit = synchronized { // main method that starts the JobScheduler

if (eventLoop != null) return // scheduler has already been started

logDebug("Starting JobScheduler")

eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {

override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)

}

eventLoop.start()

// attach rate controllers of input streams to receive batch completion updates

for {

inputDStream <- ssc.graph.getInputStreams // every input DStream in the graph

rateController <- inputDStream.rateController // its rate controller, if any (controls the ingestion rate)

} ssc.addStreamingListener(rateController)

listenerBus.start(ssc.sparkContext) // start the listener bus

receiverTracker = new ReceiverTracker(ssc) // create the receiver tracker

inputInfoTracker = new InputInfoTracker(ssc) // create the input-info tracker

receiverTracker.start() // start the receiver tracker, which launches the receivers

jobGenerator.start() // start the job generator

logInfo("Started JobScheduler")

}
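The `for` comprehension in `start()` iterates over the input streams and uses the `Option` to skip those without a rate controller. Here is a toy reproduction with a hypothetical `FakeInputStream` class. Without `yield`, it desugars to nested `foreach` calls, so a `None` simply contributes nothing.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in: only some input streams carry a rate controller.
final case class FakeInputStream(id: Int, rateController: Option[String])

val inputs = Seq(
  FakeInputStream(0, Some("rc-0")),
  FakeInputStream(1, None),
  FakeInputStream(2, Some("rc-2")))

val registered = ArrayBuffer.empty[String]
// Equivalent to inputs.foreach(in => in.rateController.foreach(rc => registered += rc)).
for {
  input <- inputs
  rc <- input.rateController
} registered += rc
```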

def submitJobSet(jobSet: JobSet) {

if (jobSet.jobs.isEmpty) {

logInfo("No jobs added for time " + jobSet.time)

} else {

listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))

jobSets.put(jobSet.time, jobSet)

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

logInfo("Added jobs for time " + jobSet.time)

}

}
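`submitJobSet` records the JobSet under its batch time and hands each job to the fixed-size pool. The sketch below mimics that flow. `MiniJobSet` and `MiniJobScheduler` are hypothetical, and the pool size plays the role of `spark.streaming.concurrentJobs` (default 1 in the excerpt). With one thread, the jobs of a batch run one after another.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, ExecutorService, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of the submit path, not Spark's JobScheduler.
final case class MiniJobSet(time: Long, jobs: Seq[Runnable])

final class MiniJobScheduler(concurrentJobs: Int) {
  // batch time -> the JobSet generated for that batch
  val jobSets = new ConcurrentHashMap[Long, MiniJobSet]()

  // Daemon threads, like the daemon pool in the excerpt, so the JVM can exit.
  private val daemonFactory: ThreadFactory = (r: Runnable) => {
    val t = new Thread(r, "mini-job-executor")
    t.setDaemon(true)
    t
  }
  private val jobExecutor: ExecutorService =
    Executors.newFixedThreadPool(concurrentJobs, daemonFactory)

  def submitJobSet(jobSet: MiniJobSet): Unit =
    if (jobSet.jobs.isEmpty) {
      println(s"No jobs added for time ${jobSet.time}")
    } else {
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(job))
    }

  def stop(): Unit = {
    jobExecutor.shutdown()
    jobExecutor.awaitTermination(5, TimeUnit.SECONDS)
  }
}

val ran = new AtomicInteger(0)
val finished = new CountDownLatch(2)
val job: Runnable = () => { ran.incrementAndGet(); finished.countDown() }
val scheduler = new MiniJobScheduler(concurrentJobs = 1)
scheduler.submitJobSet(MiniJobSet(5000L, Seq(job, job)))
scheduler.submitJobSet(MiniJobSet(10000L, Seq.empty))
val allRan = finished.await(5, TimeUnit.SECONDS)
scheduler.stop()
```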

private def handleJobStart(job: Job, startTime: Long) {

val jobSet = jobSets.get(job.time)

val isFirstJobOfJobSet = !jobSet.hasStarted

jobSet.handleJobStart(job)

if (isFirstJobOfJobSet) {

// "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the

// correct "jobSet.processingStartTime".

listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))

}

job.setStartTime(startTime)

listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))

logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)

}
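`handleJobStart` posts the batch-level "started" event only for the first job of a JobSet. It posts that event after `jobSet.handleJobStart`, so the processing start time is already set. The model below uses a hypothetical `TrackedJobSet` and assumes, as with events handled by a single event loop, that it runs on one thread.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for JobSet: it records when its first job started.
final class TrackedJobSet(val time: Long) {
  private var startedJobs = Set.empty[Int]
  var processingStartTime: Option[Long] = None

  def hasStarted: Boolean = startedJobs.nonEmpty
  def handleJobStart(jobId: Int, now: Long): Unit = {
    if (processingStartTime.isEmpty) processingStartTime = Some(now)
    startedJobs += jobId
  }
}

val posted = ArrayBuffer.empty[String]

// Same ordering as the excerpt: check, update, then post the batch event once.
def onJobStart(jobSet: TrackedJobSet, jobId: Int, now: Long): Unit = {
  val isFirstJobOfJobSet = !jobSet.hasStarted
  jobSet.handleJobStart(jobId, now)
  if (isFirstJobOfJobSet) {
    posted += s"BatchStarted(${jobSet.time}, processingStart=${jobSet.processingStartTime.getOrElse(-1L)})"
  }
  posted += s"OutputOperationStarted($jobId)"
}

val batch = new TrackedJobSet(5000L)
onJobStart(batch, jobId = 0, now = 5010L)
onJobStart(batch, jobId = 1, now = 5020L)
```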

private class JobHandler(job: Job) extends Runnable with Logging {

import JobScheduler._

def run() {

try {

val formattedTime = UIUtils.formatBatchTime(

job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)

val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"

val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

ssc.sc.setJobDescription(

s"""Streaming job from $batchLinkText""")

ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)

ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

// We need to assign `eventLoop` to a temp variable. Otherwise, because

// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then

// it's possible that when `post` is called, `eventLoop` happens to null.

var _eventLoop = eventLoop

if (_eventLoop != null) {

_eventLoop.post(JobStarted(job, clock.getTimeMillis()))

// Disable checks for existing output directories in jobs launched by the streaming

// scheduler, since we may need to write output to an existing directory during checkpoint

// recovery; see SPARK-4835 for more details.

PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

job.run()

}

_eventLoop = eventLoop

if (_eventLoop != null) {

_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))

}

} else {

// JobScheduler has been stopped.

}

} finally {

ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)

ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)

}

}

}

}
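`JobHandler.run()` combines two idioms. First, it reads a field that another thread may set to null into a local variable, so the null check and the use see the same value, and it re-reads the field after the job. Second, it clears per-thread properties in `finally`, so pooled threads do not leak them into the next job. The sketch below is hypothetical (`MiniJobHost`), with a `ThreadLocal` standing in for SparkContext local properties.

```scala
import java.util.concurrent.CopyOnWriteArrayList

final class MiniJobHost {
  // Another thread (think JobScheduler.stop) may set this to null at any time.
  @volatile var eventSink: CopyOnWriteArrayList[String] = new CopyOnWriteArrayList[String]()
  // Per-thread property, standing in for SparkContext local properties.
  val batchTimeProperty = new ThreadLocal[String]

  def runJob(time: Long, job: () => Unit): Unit =
    try {
      batchTimeProperty.set(time.toString)
      var sink = eventSink // read the field once; check and use the same value
      if (sink != null) {
        sink.add(s"JobStarted($time)")
        job()
        sink = eventSink // re-read: the sink may have been cleared while the job ran
        if (sink != null) sink.add(s"JobCompleted($time)")
      }
    } finally {
      batchTimeProperty.remove() // do not carry the batch time into the next job
    }
}

val host = new MiniJobHost
val originalSink = host.eventSink
var timeSeenByJob: String = null
host.runJob(5000L, () => {
  timeSeenByJob = host.batchTimeProperty.get
  host.eventSink = null // simulate a concurrent stop while the job is running
})
```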

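Notes:

Source material: DT_大数据梦工厂 (DT Big Data Dream Factory), Spark release customization course.

For more exclusive content, follow the WeChat public account DT_Spark.

If you are interested in big data and Spark, you can attend Wang Jialin's permanently free Spark public lectures every evening at 20:00, YY room number 68917580.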
