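Input DStreams can come from many sources, such as Kafka, Socket, and Flume. An output operation on a DStream is really a logical-level action defined by the Spark Streaming framework. Underneath, it is translated into a physical-level action, namely an RDD action. Between input and output sit the transformations: the state transitions that make up the business logic.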
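To make the logical-versus-physical distinction concrete, here is a minimal sketch. It assumes a plain `Seq` stands in for an RDD and a `Long` millisecond value stands in for a batch time. `MiniDStream` and `deferredForeach` are hypothetical names, not Spark API. Transformations only compose per-batch functions. The output operation returns a closure for a given batch time, and the action runs only when that closure is invoked.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a DStream: a function from batch time to that batch's data.
// Seq plays the role of an RDD here; this is not Spark's API.
final case class MiniDStream[T](compute: Long => Seq[T]) {
  // Transformations only compose functions; no data is touched yet.
  def map[U](f: T => U): MiniDStream[U] = MiniDStream(t => compute(t).map(f))
  def filter(p: T => Boolean): MiniDStream[T] = MiniDStream(t => compute(t).filter(p))

  // "Logical" output operation: for a batch time, return a closure instead of running it.
  def deferredForeach(action: Seq[T] => Unit): Long => (() => Unit) =
    t => () => action(compute(t)) // the "physical" action happens only when invoked
}

object PipelineDemo {
  // Hypothetical input source producing three records per batch.
  val input = MiniDStream[String](t => Seq(s"a-$t", s"bb-$t", s"ccc-$t"))
  val longOnes = input.map(_.length).filter(_ > 6)

  val collected = ArrayBuffer.empty[Int]
  val outputOp = longOnes.deferredForeach { batch => collected ++= batch; () }

  val jobFor5000 = outputOp(5000L) // translated for batch 5000, nothing executed yet
  val sizeBeforeRun = collected.size
  jobFor5000()                     // now the action runs and fills `collected`
}
```

Nothing reaches `collected` until `jobFor5000()` is called, which is the same separation between describing a job and executing it that the paragraph above describes.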
Spark Streaming has two kinds of data sources:
1. DStream-based sources, i.e. input DStreams that read external data.
2. Sources produced from other DStreams, i.e. DStreams derived through transformations.
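For illustration only, the two kinds of sources can be modelled as a tree. Input streams are the leaves, and derived streams point at their parents, similar in spirit to the list of dependencies a DStream keeps. `StreamNode`, `InputNode`, `DerivedNode`, and `inputsOf` are hypothetical names. The sketch walks a derived stream back to the external sources it ultimately reads from.

```scala
// Hypothetical model: a stream is either an input stream (reads an external source)
// or a derived stream (built from one or more parent streams).
sealed trait StreamNode { def name: String }
final case class InputNode(name: String, source: String) extends StreamNode
final case class DerivedNode(name: String, parents: List[StreamNode]) extends StreamNode

object SourceDemo {
  // Walk back through parents to find the input streams a stream depends on.
  def inputsOf(node: StreamNode): List[InputNode] = node match {
    case input: InputNode      => List(input)
    case DerivedNode(_, ps)    => ps.flatMap(inputsOf).distinct
  }

  val kafka  = InputNode("kafkaStream", "Kafka")
  val socket = InputNode("socketStream", "Socket")
  val words  = DerivedNode("words", List(kafka))          // e.g. a flatMap over kafka
  val merged = DerivedNode("merged", List(words, socket)) // e.g. a union
  val counts = DerivedNode("counts", List(merged))        // e.g. a reduceByKey

  val roots: List[String] = inputsOf(counts).map(_.source)
}
```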
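Key point: big-data work is not always stream processing in the narrow sense. There are usually scheduled tasks that fire every ten minutes or once a day, and such scheduled big-data tasks already feel like stream processing, even if not in a formal sense. Any data that has nothing to do with stream processing is worthless. Even batch processing and data mining are really stream processing, only implicit; all data processing will eventually become stream processing.
Hence a unified abstraction: all processing is expressed as stream processing, and every kind of processing will be brought under it. Large enterprise projects also typically have a J2EE backend through which various users operate the big-data platform.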
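The entry point of a Spark Streaming program specifies batchDuration, the batch interval. In the example below it is five seconds, so JobGenerator produces a job every five seconds. This job is logical-level: it describes what should be done but does not do it itself. The work is carried out by physical-level actions, which are based on RDD dependencies. DStream output (action) operations are likewise logical-level. Spark Streaming turns each output operation into a logical job but does not run it, much like a Runnable that has been defined but not yet executed. Because nothing physical has been generated yet, the logical jobs can still be scheduled and optimized. Suppose a DStream operation is translated into RDD operations whose last step is an RDD action: does the translation trigger a job immediately, and how can the translation be completed without generating the job right away?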
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); // batchDuration = 5 seconds
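The Runnable analogy can be made concrete with a small sketch. `SketchJob` is hypothetical. As a simplification, it assumes that a streaming job is a batch time plus a function, and that the function runs, with its result wrapped in a `Try`, when `run()` is called rather than when the job is created.

```scala
import scala.util.Try

// Hypothetical job class: it *holds* a batch time and the work to do, like a Runnable.
final class SketchJob(val time: Long, func: () => Any) extends Runnable {
  @volatile private var result: Option[Try[Any]] = None
  def hasRun: Boolean = result.isDefined
  def outcome: Option[Try[Any]] = result
  // Only here is the deferred ("physical") work executed; failures are captured in a Try.
  override def run(): Unit = result = Some(Try(func()))
}

object DeferredJobDemo {
  var executions = 0
  // Building the job does not call the function.
  val job = new SketchJob(5000L, () => { executions += 1; executions * 10 })
  val ranBeforeRun: Boolean = job.hasRun
  val executionsBeforeRun: Int = executions
  job.run()
  val finalOutcome: Option[Try[Any]] = job.outcome
}
```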
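The analysis below focuses on three classes:
1. JobGenerator: generates jobs from batchDuration and its internal default time interval;
2. JobScheduler: schedules Spark Streaming jobs according to batchDuration;
3. ReceiverTracker: receives metadata on the driver side and starts the data-receiving threads on the executors.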
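Because JobGenerator fires once per batchDuration, batch times line up on multiples of the interval. The sketch below computes the first few batch times under an assumed alignment rule, `(floor(now / period) + 1) * period`, which is what Spark's internal recurring timer appears to use for its first tick. `BatchTimes` is a hypothetical helper, not Spark API, and it assumes non-negative timestamps.

```scala
// Hypothetical helper: batch times aligned to multiples of batchDuration
// (the alignment rule is an assumption, not a quote of Spark's code).
object BatchTimes {
  // First batch time strictly after `nowMs`, aligned to a multiple of `periodMs`.
  def firstBatchTime(nowMs: Long, periodMs: Long): Long = {
    require(periodMs > 0, "batch duration must be positive")
    (nowMs / periodMs + 1) * periodMs // integer division floors for non-negative values
  }

  // The next `n` batch times at which a generate-jobs event would be posted.
  def nextBatches(nowMs: Long, periodMs: Long, n: Int): List[Long] = {
    val first = firstBatchTime(nowMs, periodMs)
    List.tabulate(n)(i => first + i * periodMs)
  }
}
```

With a 5-second batchDuration and a clock reading of 12345 ms, the batches would fall at 15000, 20000, and 25000 ms.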
1. The JobGenerator class:
/**
* This class generates jobs from DStreams as well as drives checkpointing and cleaning
* up DStream metadata.
*/
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
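The class comment says that jobs are generated from DStreams (via the DStreamGraph). As time passes, JobGenerator produces many jobs. Besides the jobs generated on a timer, Spark Streaming also produces jobs in other ways, for example through operations based on aggregation and state. Stateful operations are not confined to a single batchDuration; they process data across many batches. JobGenerator is also triggered for window-like operations, and it is the class responsible for both metadata cleanup and job generation.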
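An operation that spans many batches can be illustrated with a window over per-batch counts. This is a hypothetical sketch. It assumes the window ending at batch time t covers the batch times in (t - windowMs, t], which is how Spark's windowed DStreams are generally described, and it requires the window length to be a multiple of the batch duration. `WindowSketch` and its sample counts are illustrative only.

```scala
// Hypothetical sketch: a window at batch time t reads the last windowMs / batchMs batches.
object WindowSketch {
  // Batch times inside the window (t - windowMs, t].
  def windowBatchTimes(t: Long, windowMs: Long, batchMs: Long): List[Long] = {
    require(windowMs % batchMs == 0, "window must be a multiple of the batch duration")
    val first = t - windowMs + batchMs
    (first to t by batchMs).toList
  }

  // Sum the per-batch counts that fall inside the window ending at t (missing batches count as 0).
  def windowedSum(perBatch: Map[Long, Int], t: Long, windowMs: Long, batchMs: Long): Int =
    windowBatchTimes(t, windowMs, batchMs).map(bt => perBatch.getOrElse(bt, 0)).sum

  // Illustrative per-batch record counts keyed by batch time (ms).
  val counts = Map(5000L -> 2, 10000L -> 4, 15000L -> 1, 20000L -> 3)
}
```

A 15-second window with 5-second batches therefore combines three consecutive batches each time it is evaluated. That is why such operations cannot be handled within one batch alone.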
// eventLoop is created when generator starts.
// This not being null means the scheduler has been started and not stopped
private var eventLoop: EventLoop[JobGeneratorEvent] = null // the event (message) loop
// last batch whose completion,checkpointing and metadata cleanup has been completed
private var lastProcessedBatch: Time = null
/** Start generation of jobs */
def start(): Unit = synchronized {
if (eventLoop != null) return // generator has already been started
// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
// See SPARK-10125
checkpointWriter // reference the lazy checkpointWriter so it is initialized before eventLoop uses it
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { // anonymous inner subclass
override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) // event-handling logic
override protected def onError(e: Throwable): Unit = {
jobScheduler.reportError("Error in job generator", e)
}
}
eventLoop.start() // start the event thread that processes queued events
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
/**
* An event loop to receive events from the caller and process all events in the event thread. It
* will start an exclusive event thread to process all events.
*
* Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
* handle events in time to avoid the potential OOM.
*/
private[spark] abstract classEventLoop[E](name: String) extends Logging {
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() // event (message) queue
private val stopped = new AtomicBoolean(false) // atomic stop flag
private val eventThread = new Thread(name) { // the thread that drives the loop
setDaemon(true) // run as a daemon thread
override def run(): Unit = { // thread body
try {
while (!stopped.get) {
val event = eventQueue.take() // take events off the queue one at a time (blocks while empty)
try {
onReceive(event) // handle the event
} catch {
case NonFatal(e) => { // on failure, invoke the error callback
try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
}
} catch {
case ie: InterruptedException => // exit even if eventQueue is not empty
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
def start(): Unit = { // start the event loop
if (stopped.get) {
throw new IllegalStateException(name + " has already been stopped")
}
// Call onStart before starting the event thread to make sure it happens before onReceive
onStart()
eventThread.start() // start the thread, which executes eventThread.run()
}
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) { // dispatch on the event type
logDebug("Got event " + event)
event match {
case GenerateJobs(time) => generateJobs(time) // generate the jobs for this batch time
case ClearMetadata(time) => clearMetadata(time) // the batch's jobs have finished; clean up driver-side metadata
case DoCheckpoint(time, clearCheckpointDataLater) => // write a checkpoint for this batch time
doCheckpoint(time, clearCheckpointDataLater)
case ClearCheckpointData(time) => clearCheckpointData(time) // clear checkpoint data that is no longer needed
}
}
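The EventLoop pattern and the processEvent dispatch above can be reproduced with only the JDK and the Scala standard library. This is a simplified sketch, not Spark's class. `MiniEventLoop`, the `*Ev` event types, and `EventLoopDemo` are hypothetical names. In this version, `stop()` interrupts and joins the thread, and there are no `onStart`/`onStop` hooks.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical events named after the ones JobGenerator.processEvent handles.
sealed trait GenEvent
final case class GenerateJobsEv(time: Long) extends GenEvent
final case class DoCheckpointEv(time: Long, clearLater: Boolean) extends GenEvent
final case class ClearMetadataEv(time: Long) extends GenEvent

// Simplified event loop: one daemon thread takes events off a blocking queue in FIFO order.
abstract class MiniEventLoop[E](name: String) {
  private val queue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)
  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = queue.take() // blocks until an event is available
          try onReceive(event)
          catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => () // stop() interrupts a blocked take()
      }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) { thread.interrupt(); thread.join() }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

object EventLoopDemo {
  // Runs inside a method (not an object initializer) so the event thread
  // never has to wait for this object's initialization to finish.
  def runDemo(): (Boolean, List[String]) = {
    val handled = ListBuffer.empty[String] // written only by the event thread
    val done = new CountDownLatch(3)
    val loop = new MiniEventLoop[GenEvent]("sketch-job-generator") {
      override protected def onReceive(event: GenEvent): Unit = {
        event match { // same dispatch shape as processEvent
          case e: GenerateJobsEv  => handled += s"generate@${e.time}"
          case e: DoCheckpointEv  => handled += s"checkpoint@${e.time}"
          case e: ClearMetadataEv => handled += s"clear@${e.time}"
        }
        done.countDown()
      }
      override protected def onError(e: Throwable): Unit = e.printStackTrace()
    }
    loop.start()
    List[GenEvent](GenerateJobsEv(5000L), DoCheckpointEv(5000L, clearLater = false), ClearMetadataEv(5000L))
      .foreach(event => loop.post(event))
    val finished = done.await(5, TimeUnit.SECONDS)
    loop.stop() // join() makes the event thread's writes visible here
    (finished, handled.toList)
  }
}
```

Because a single thread drains the queue, events are handled strictly in the order they were posted. This is also why `onReceive` must keep up with producers, as the class comment's OOM warning points out.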
2. The JobScheduler class:
/**
* This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
* the jobs and runs them using a thread pool.
*/
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
// Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
// https://gist.github.com/AlainODea/1375759b8720a3f9f094
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] // batch time -> JobSet generated for that batch
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") // fixed-size daemon thread pool
private val jobGenerator = new JobGenerator(this) // create the JobGenerator
val clock = jobGenerator.clock // the JobGenerator's clock
val listenerBus = new StreamingListenerBus() // listener (message) bus
// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null // tracks receivers and received-data metadata on the driver
// A tracker to track all the input stream information as well as processed record number
var inputInfoTracker: InputInfoTracker = null // input stream information tracker
private var eventLoop: EventLoop[JobSchedulerEvent] = null // the event loop
def start(): Unit = synchronized { // main method that starts JobScheduler
if (eventLoop != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()
// attach rate controllers of input streams to receive batch completion updates
for {
inputDStream <- ssc.graph.getInputStreams // input streams
rateController <- inputDStream.rateController // controls the data ingestion rate
} ssc.addStreamingListener(rateController)
listenerBus.start(ssc.sparkContext) // start the listener bus
receiverTracker = new ReceiverTracker(ssc) // create the receiver tracker
inputInfoTracker = new InputInfoTracker(ssc) // create the input info tracker
receiverTracker.start() // start the receiver tracker (launches receivers)
jobGenerator.start() // start the job generator
logInfo("Started JobScheduler")
}
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
private def handleJobStart(job: Job, startTime: Long) {
val jobSet = jobSets.get(job.time)
val isFirstJobOfJobSet = !jobSet.hasStarted
jobSet.handleJobStart(job)
if (isFirstJobOfJobSet) {
// "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
// correct "jobSet.processingStartTime".
listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
}
job.setStartTime(startTime)
listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}
private class JobHandler(job: Job) extends Runnable with Logging {
import JobScheduler._
def run() {
try {
val formattedTime = UIUtils.formatBatchTime(
job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
ssc.sc.setJobDescription(
s"""Streaming job from $batchLinkText""")
ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
// We need to assign `eventLoop` to a temp variable. Otherwise, because
// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
// it's possible that when `post` is called, `eventLoop` happens to null.
var _eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobStarted(job, clock.getTimeMillis()))
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
}
_eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
}
} else {
// JobScheduler has been stopped.
}
} finally {
ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
}
}
}
}
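submitJobSet and JobHandler boil down to two steps: record the JobSet in a map keyed by batch time, then hand each job to a fixed-size pool. A hypothetical sketch of that idea follows. `MiniJob`, `MiniJobSet`, and `MiniJobScheduler` are illustrative names. Listener-bus events, job descriptions, and removal of completed JobSets are left out.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified versions of Job / JobSet: one set of jobs per batch time.
final case class MiniJob(time: Long, outputOpId: Int, body: () => Unit)
final case class MiniJobSet(time: Long, jobs: Seq[MiniJob])

// Simplified scheduler: a fixed-size pool plays the role of spark.streaming.concurrentJobs.
final class MiniJobScheduler(numConcurrentJobs: Int) {
  private val jobSets = new ConcurrentHashMap[Long, MiniJobSet]()
  private val executor = Executors.newFixedThreadPool(numConcurrentJobs)

  def submitJobSet(jobSet: MiniJobSet): Unit =
    if (jobSet.jobs.nonEmpty) { // an empty JobSet is only logged, never executed
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => executor.execute(() => job.body()))
    }

  def trackedBatches: Int = jobSets.size() // this sketch never removes completed sets

  def shutdown(): Boolean = {
    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
  }
}

object SchedulerDemo {
  // Runs in a method so pool threads only touch local state.
  def runDemo(): (List[String], Int) = {
    val log = ListBuffer.empty[String] // written only by the single pool thread
    val scheduler = new MiniJobScheduler(numConcurrentJobs = 1)
    for (t <- List(5000L, 10000L)) {
      val jobs = (0 until 2).map(id => MiniJob(t, id, () => { log += s"batch $t op $id"; () }))
      scheduler.submitJobSet(MiniJobSet(t, jobs))
    }
    scheduler.submitJobSet(MiniJobSet(15000L, Nil)) // no output operations for this batch
    val tracked = scheduler.trackedBatches
    scheduler.shutdown()
    (log.toList, tracked)
  }
}
```

With a pool size of 1, the default for `spark.streaming.concurrentJobs` shown above, jobs run one at a time in submission order. A slow batch therefore delays the batches behind it.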
Notes:
Source: DT_大數據夢工廠 (DT Big Data Dream Factory), Spark release customization series.