一.Spark streaming Job 架構(gòu)
SparkStreaming框架會自動啟動Job并每隔BatchDuration時間會自動觸發(fā)Job的調(diào)用粗截。
Spark Streaming的Job 分為兩大類:
-
每隔BatchInterval時間片就會產(chǎn)生的一個個Job极祸,這里的Job并不是Spark Core中的Job遥金,它只是基于DStreamGraph而生成的RDD的DAG而已蒜田;從Java角度講相當于Runnable接口的實現(xiàn)類冲粤,要想運行Job需要將Job提交給JobScheduler,在JobScheduler內(nèi)部會通過線程池的方式創(chuàng)建運行Job的一個個線程厢呵,當找到一個空閑的線程后會將Job提交到集群運行(其實是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運行)襟铭。為什么使用線程池呢?
a. Job根據(jù)BatchInterval不斷生成,為了減少線程創(chuàng)建而帶來的效率提升我們需要使用線程池(這和在Executor中通過啟動線程池的方式來執(zhí)行Task有異曲同工之妙)暖呕;
b. 如果Job的運行設(shè)置為FAIR公平調(diào)度的方式典徊,這個時候也需要多線程的支持咐汞;
上面Job提交的Spark Job本身碉考。單從這個時刻來看挺身,此次的Job和Spark core中的Job沒有任何的區(qū)別。
理解Spark Streaming的Job的整個架構(gòu)和運行機制對于精通Spark Streaming是至關(guān)重要的墙贱。
我們運行以下的程序惨撇,通過這個程序的運行過程進一步加深理解Spark Streaming流處理的Job的執(zhí)行的過程府寒,代碼如下:
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;
public class ConnectionPool {
private static LinkedList<Connection> connectionQueue;
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public synchronized static Connection getConnection() {
try {
if(connectionQueue == null) {
connectionQueue = new LinkedList<Connection>();
for(int i = 0; i < 5; i++) {
Connection conn = DriverManager.getConnection(
"jdbc:mysql://Master:3306/sparkstreaming",
"root",
"778899..");
connectionQueue.push(conn);
}
}
} catch (Exception e) {
e.printStackTrace();
}
return connectionQueue.poll();
}
public static void returnConnection(Connection conn) {
connectionQueue.push(conn);
}
}
第二部分: 通過sparkstreaming 將網(wǎng)絡(luò)產(chǎn)生的數(shù)據(jù)進行統(tǒng)計統(tǒng)計,并將結(jié)果寫入mysql數(shù)據(jù)庫
object OnlineForeachRDD2DB {
def main(args: Array[String]){
/**
* 第1步:創(chuàng)建Spark的配置對象SparkConf株搔,設(shè)置Spark程序的運行時的配置信息纤房,
* 例如說通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置
* 為local,則代表Spark程序在本地運行捌刮,特別適合于機器配置條件非常差(例如
* 只有1G的內(nèi)存)的初學者
*/
val conf = new SparkConf() //創(chuàng)建SparkConf對象
conf.setAppName("OnlineForeachRDD") //設(shè)置應用程序的名稱绅作,在程序運行的監(jiān)控界面可以看到名稱
conf.setMaster("spark://Master:7077") //此時蛾派,程序在Spark集群
conf.setMaster("local[6]")
//設(shè)置batchDuration時間間隔來控制Job生成的頻率并且創(chuàng)建Spark Streaming執(zhí)行的入口
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("Master", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD{ rdd =>
rdd.foreachPartition{ partitionOfRecords => {
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
val stmt = connection.createStatement();
stmt.executeUpdate(sql);
})
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}}
}
/**
* 在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進行消息循環(huán)稍算,
* 在JobScheduler的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker糊探,并且調(diào)用JobGenerator和
* ReceiverTacker的start方法:
* 1河闰,JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job
* 2姜性,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動
* ReceiverSupervisor),在Receiver收到數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把
* 數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker弃酌,在ReceiverTracker內(nèi)部會通過
* ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息每個BatchInterval會產(chǎn)生一個具體的Job妓湘,
* 其實這里的Job不是Spark Core中所指的Job乌询,它只是基于DStreamGraph而生成的RDD的DAG
* 而已,從Java角度講唬党,相當于Runnable接口實例驶拱,此時要想運行Job需要提交給JobScheduler沮趣,
* 在JobScheduler中通過線程池的方式找到一個單獨的線程來提交Job到集群運行(其實是在線程中
* 基于RDD的Action觸發(fā)真正的作業(yè)的運行)房铭,
* 為什么使用線程池呢温眉?
* 1类溢,作業(yè)不斷生成露懒,所以為了提升效率懈词,我們需要線程池辩诞;這和在Executor中通過線程池執(zhí)行Task
* 有異曲同工之妙译暂;
* 2,有可能設(shè)置了Job的FAIR公平調(diào)度的方式崎脉,這個時候也需要多線程的支持伯顶。
*/
ssc.start()
ssc.awaitTermination()
}
}
代碼中以注釋的方式描述了Spakr job 啟動的過程,下面結(jié)合源碼做進一步分析:
StreamingContext的start()方法:
/**
* Start the execution of the streams.
*
* @throws IllegalStateException if the StreamingContext is already stopped.
*/
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
savedProperties.set(SerializationUtils.clone(
sparkContext.localProperties.get()).asInstanceOf[Properties])
scheduler.start()
}
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state = StreamingContextState.STOPPED
throw e
}
StreamingContext.setActiveContext(this)
}
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
case STOPPED =>
throw new IllegalStateException("StreamingContext has already been stopped")
}
}
可以看到StreamingContext的start()的方法中調(diào)用了scheduler.start(),其scheduler 是JobScheduler的對象,該對象在StreamingContext創(chuàng)建是被實例化:
private[streaming] val scheduler = new JobScheduler(this)
接下來在JobScheduler.start()內(nèi)部實例化EventLoop,并執(zhí)行EventLoop.start()進行消息循環(huán)汪厨,在JobScheduler.start()內(nèi)部構(gòu)造ReceiverTacker劫乱,并且調(diào)用JobGenerator和ReceiverTacker的start方法:
JobGenerator的start()方法中會調(diào)用startFirstTime()方法和restart()方法
最終調(diào)用generateJobs()方法不斷生成job:
ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動 ReceiverSupervisor)狭吼,在Receiver收到數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息.過程如圖所示:
源碼如下:(注意紅色字體部分代碼)
每個BatchInterval會產(chǎn)生一個具體的Job,其實這里的Job不是Spark Core中所指的Job殖妇,它只是基于DStreamGraph而生成的RDD的DAG而已刁笙,從Java角度講,相當于Runnable接口實例谦趣,此時要想運行Job需要提交給JobScheduler疲吸, 在JobScheduler中通過線程池的方式找到一個單獨的線程來提交Job到集群運行(其實是在線程中 基于RDD的Action觸發(fā)真正的作業(yè)的運行)
二 Spark Streaming Job容錯架構(gòu)和運行機制
Spark容錯分為:Driver級別的容錯和Executor級別的容錯。
- 在Driver級別的容錯具體為DAG生成的模板前鹅,即DStreamGraph摘悴,RecevierTracker中存儲的元數(shù)據(jù)信息和JobScheduler中存儲的Job進行的進度情況等信息,只要通過checkpoint就可以了蹂喻,每個Job生成之前進行checkpoint葱椭,在Job生成之后再進行checkpoint,如果出錯的話就從checkpoint中恢復口四。
- 在Executor級別的容錯具體為接收數(shù)據(jù)的安全性和任務(wù)執(zhí)行的安全性孵运。在接收數(shù)據(jù)安全性方面,一種方式是Spark Streaming接收到數(shù)據(jù)默認為MEMORY_AND_DISK_2的方式蔓彩,在兩臺機器的內(nèi)存中掐松,如果一臺機器上的Executor掛了,立即切換到另一臺機器上的Executor粪小,這種方式一般情況下非炒蠡牵可靠且沒有切換時間。另外一種方式是WAL(Write Ahead Log)探膊,在數(shù)據(jù)到來時先通過WAL機制將數(shù)據(jù)進行日志記錄杠愧,如果有問題則從日志記錄中恢復,然后再把數(shù)據(jù)存到Executor中逞壁,再進行其他副本的復制流济。WAL這種方式對性能有影響,在生產(chǎn)環(huán)境中不常用腌闯,一般使用Kafka存儲绳瘟,Spark Streaming接收到數(shù)據(jù)丟失時可以從Kafka中回放。在任務(wù)執(zhí)行的安全性方面姿骏,靠RDD的容錯糖声。
Spark Streaming的容錯機制是基于RDD的容錯機制。
主要表現(xiàn)為:
1. checkpoint
2. 基于血統(tǒng)(lineage)的高度容錯機制
3. 出錯了之后會從出錯的位置從新計算分瘦,而不會導致重復計算