Part 1: Design Considerations for Starting Receivers
1. Spark Streaming continuously receives data from external data sources through Receivers and reports the received data to the Driver; each batch duration then generates jobs based on the reported data. Receivers are launched on different machines, and each receiver effectively corresponds to one partition. Because Spark Core sees nothing special about these jobs, it schedules them like any others, so several Receivers may end up starting inside the same Executor. That leads to load imbalance; moreover, if the Executor itself fails, the task may fail to start, the whole job fails, and therefore the receiver fails to start.
Starting a Receiver
1. From Spark Core's point of view, Receivers start without Spark Core knowing anything about them: a Receiver is started through a Job, runs on an Executor, and is executed by a task.
2. Usually there is only one Receiver, but InputDStreams can be created for several different data sources.
3. When a Receiver is started, it is in effect one partition, launched by a single Job that contains RDD transformations and actions. As the timer keeps firing, data keeps being received, and the data received in each time interval is again effectively one partition.
4. This design gives rise to the following problems:
(1) With multiple InputDStreams you must start multiple Receivers, and each Receiver again corresponds to a partition. Ideally the Receivers would start on different machines, but from Spark Core's point of view this is just an ordinary application; it cannot see that Receivers are special, so it launches them like any normal Job and may well start several Receivers on one Executor, which leads to load imbalance (see the sketch after this list).
(2) A Receiver may fail to start, yet as long as the cluster is alive, starting a Receiver should never fail.
(3) Looking at the runtime behavior: if one Receiver is one partition started by a single Task, then when that Task fails the corresponding Receiver fails with it. A failed Receiver therefore has very serious consequences. So how does Spark Streaming guard against this? The Spark Streaming source code answers with three decisions:
First, Spark starts each Receiver with its own Job, which goes the furthest toward guaranteeing load balancing.
Second, Spark Streaming decides which Executors each Receiver runs on before the Receiver ever runs; the placement is fixed up front.
Third, if a Receiver fails to start, the Job is not considered failed; internally, the Receiver is simply restarted.
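To make problem (1) concrete, here is a minimal sketch, assuming an already-created StreamingContext named ssc and purely illustrative hostnames and ports: two socket input streams mean two Receivers, each of which Spark Streaming starts with its own job, ideally on different Executors.

// Two InputDStreams => two Receivers. Hosts and ports are illustrative only.
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
// The union is processed as one logical stream, but each source keeps its own Receiver.
stream1.union(stream2).print()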
1. When StreamingContext's start method is called, JobScheduler's start method gets invoked:
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // Started in a child thread: it performs the local initialization work
            // without blocking the main thread.
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
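For orientation, here is a minimal driver sketch (app name, host, and port are illustrative) that exercises the path above: calling ssc.start() flips the state to ACTIVE and runs scheduler.start() on the "streaming-start" thread.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("ReceiverStartupDemo")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.socketTextStream("localhost", 9999).print()
ssc.start()            // INITIALIZED -> ACTIVE; JobScheduler.start() is invoked inside
ssc.awaitTermination()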
2. In JobScheduler's start method, ReceiverTracker's start method is called, and that is where Receivers get started.
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the receiverTracker
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
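The rate controllers attached above are simply StreamingListeners on the same listener bus. As a hedged illustration of that hook (the printed metric is only an example), user code can register its own listener on ssc:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    // The same callback mechanism the rate controllers use to observe batch completion.
    println(s"Batch done, total delay: ${batch.batchInfo.totalDelay.getOrElse(-1L)} ms")
  }
})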
3. ReceiverTracker's start method sets up an RPC endpoint. Why? Because the ReceiverTracker monitors every Receiver in the cluster, and the Receivers in turn report their status to the ReceiverTrackerEndpoint: the data they receive, lifecycle events, and so on.
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }
  // Receivers are launched based on the receiver input streams.
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
4. Concrete Receiver instances are obtained from the ReceiverInputDStreams (on the Driver side) and then distributed to the Worker nodes. One ReceiverInputDStream produces exactly one Receiver.
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // One input source (receiverInputDStream) produces exactly one Receiver
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // endpoint here is the ReceiverTrackerEndpoint constructed in ReceiverTracker.start above
  endpoint.send(StartAllReceivers(receivers))
}
5. runDummySparkJob() makes sure that all nodes are alive and helps avoid all the receivers being clustered on a single node.
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
The getReceiver() method of ReceiverInputDStream obtains the receiver object, which is then sent to a worker node, where the receiver is instantiated and starts receiving data.

def getReceiver(): Receiver[T] // returns a Receiver object
6. Following the inheritance hierarchy, here is the getReceiver method of SocketInputDStream:
def getReceiver(): Receiver[T] = {
  new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
onStart starts a daemon thread that calls the receive method:
private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }
receive opens a socket and starts receiving data:
  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while (!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}
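Every ReceiverInputDStream follows this pattern. As a hedged sketch, a user-defined receiver (the class name ConstantReceiver is made up for illustration) implements the same onStart contract as SocketReceiver and is wired in through ssc.receiverStream:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ConstantReceiver(value: String, storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) {

  def onStart(): Unit = {
    // As in SocketReceiver, do the blocking work on a daemon thread so onStart returns quickly.
    new Thread("Constant Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        while (!isStopped()) {
          store(value)     // hands data to the ReceiverSupervisor, which turns it into blocks
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = { } // the loop above exits once isStopped() turns true
}

// val ticks = ssc.receiverStream(new ConstantReceiver("tick", StorageLevel.MEMORY_ONLY))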
7. The ReceiverTrackerEndpoint source is as follows:
/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv)
  extends ThreadSafeRpcEndpoint {

  // TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
  private val submitJobThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))

  private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))

  @volatile private var active: Boolean = true

  override def receive: PartialFunction[Any, Unit] = {
    // Local messages
    case StartAllReceivers(receivers) =>
      // schedulingPolicy is the scheduling policy;
      // receivers are the receivers to be started;
      // getExecutors returns the list of Executors in the cluster;
      // scheduleReceivers determines which Executors each receiver may run on.
      val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
      for (receiver <- receivers) {
        // scheduledLocations maps a receiver id to the Executors that may run that Receiver
        val executors = scheduledLocations(receiver.streamId)
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        // At this point the Receivers to start and the Executors they run on are both fixed.
        // Loop over the receivers, passing them to startReceiver one at a time.
        startReceiver(receiver, executors)
      }
    // Handles the RestartReceiver message, restarting a Receiver
    case RestartReceiver(receiver) =>
      // Old scheduled executors minus the ones that are not active any more
      // If the Receiver failed, drop dead executors from the candidate list.
      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
      // Re-acquire Executors
      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
          // Try global scheduling again
          // The earlier scheduling round left a list of candidate Executors for this Receiver
          oldScheduledExecutors
        } else {
          // If the candidate Executors are used up, rerun rescheduleReceiver to get new ones.
          val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
          // Clear "scheduledLocations" to indicate we are going to do local scheduling
          val newReceiverInfo = oldReceiverInfo.copy(
            state = ReceiverState.INACTIVE, scheduledLocations = None)
          receiverTrackingInfos(receiver.streamId) = newReceiverInfo
          schedulingPolicy.rescheduleReceiver(
            receiver.streamId,
            receiver.preferredLocation,
            receiverTrackingInfos,
            getExecutors)
        }
      // Assume there is one receiver restarting at one time, so we don't need to update
      // receiverTrackingInfos
      // Call startReceiver again
      startReceiver(receiver, scheduledLocations)
    case c: CleanupOldBlocks =>
      receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
    case UpdateReceiverRateLimit(streamUID, newRate) =>
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
    // Remote messages
    case ReportError(streamId, message, error) =>
      reportError(streamId, message, error)
  }
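The internals of schedulingPolicy.scheduleReceivers are beyond this walkthrough, but the intent is easy to sketch. The following is a simplified stand-in, not Spark's actual ReceiverSchedulingPolicy: it spreads receivers round-robin over the executor list so that no single Executor hosts them all.

// Simplified illustration only; the real policy also honors preferredLocation and load balance.
def scheduleRoundRobin(
    receiverIds: Seq[Int],
    executors: Seq[String]): Map[Int, Seq[String]] = {
  require(executors.nonEmpty, "no executors registered")
  receiverIds.zipWithIndex.map { case (id, i) =>
    id -> Seq(executors(i % executors.size)) // one candidate executor per receiver
  }.toMap
}

// scheduleRoundRobin(Seq(0, 1, 2), Seq("exec-1", "exec-2"))
// => Map(0 -> Seq("exec-1"), 1 -> Seq("exec-2"), 2 -> Seq("exec-1"))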
8. As the comments show, Spark Streaming itself specifies which Executors a receiver runs on; the placement is not left to Spark Core's Task scheduling.
Spark starts Receivers via submitJob, and a running application usually has many Receivers. So does one submitted job start a single Receiver, or are all Receivers started through one job?
In ReceiverTracker's receive method, the first argument to startReceiver is a single receiver: the for loop keeps taking one receiver out and calling startReceiver on it. From this we can conclude that one Job starts exactly one Receiver.
If a Receiver fails to start, the job is not treated as failed; instead a message is sent back to the ReceiverTrackerEndpoint to restart the Receiver. This guarantees the Receivers do get started, and unlike starting Receivers from plain Tasks, the restarts are not capped by the task retry count.
private def startReceiver(
    receiver: Receiver[_],
    // scheduledLocations specifies the concrete physical machines to run on
    scheduledLocations: Seq[TaskLocation]): Unit = {
  // Check whether the tracker state still allows starting the Receiver
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  // If the Receiver should not be started, call onReceiverJobFinish()
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // startReceiverFunc encapsulates the action of starting the receiver on the worker node
  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // ReceiverSupervisorImpl supervises the Receiver and handles writing the received data
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // Restarting a receiver must go back through the scheduling above (a fresh
        // schedule), not through Task retry.
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  // receiverId shows that the RDD holds exactly one receiver
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  // Starting each Receiver triggers its own Job; it is not one job whose tasks start all
  // the Receivers, and an application usually has many Receivers.
  // SparkContext.submitJob is called, so starting a Receiver launches a Spark job.
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      // shouldStartReceiver is true by default
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        // Send the RestartReceiver message
        self.send(RestartReceiver(receiver))
      }
  // Jobs are submitted on a thread pool, so Receivers can be started concurrently.
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
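The central trick, submitting a job over a one-partition RDD and reacting to its completion asynchronously, can be shown in isolation. A minimal sketch, assuming an existing SparkContext named sc; the payload and handlers are illustrative:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val oneElementRDD = sc.makeRDD(Seq("payload"), 1) // one element, one partition
val future = sc.submitJob[String, Unit, Unit](
  oneElementRDD,
  (iter: Iterator[String]) => iter.foreach(p => println(s"long-running work on $p")),
  Seq(0),       // run only partition 0
  (_, _) => (), // per-partition result handler
  ())           // overall result
future.onComplete {
  case Success(_) => println("job ended; ReceiverTracker would resubmit here")
  case Failure(e) => println(s"job failed: $e")
}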
9. When a Receiver fails to start, the ReceiverTrackerEndpoint is asked to run a new Spark Job to start the Receiver:
/**
 * This message will trigger ReceiverTrackerEndpoint to restart a Spark job for the receiver.
 */
private[streaming] case class RestartReceiver(receiver: Receiver[_])
  extends ReceiverTrackerLocalMessage
10. When a Receiver is shut down, its Spark Job does not need to be restarted:
/**
 * Call when a receiver is terminated. It means we won't restart its Spark job.
 */
private def onReceiverJobFinish(receiverId: Int): Unit = {
  receiverJobExitLatch.countDown()
  // foreach removes the receiver from receiverTrackingInfos
  receiverTrackingInfos.remove(receiverId).foreach { receiverTrackingInfo =>
    if (receiverTrackingInfo.state == ReceiverState.ACTIVE) {
      logWarning(s"Receiver $receiverId exited but didn't deregister")
    }
  }
}
11. supervisor.start(): the subclass ReceiverSupervisorImpl does not define a start method, so the start method of the parent class ReceiverSupervisor is the one that runs.
/** Start the supervisor */
def start() {
  onStart() // the concrete implementation is provided by the subclass
  startReceiver()
}
The onStart method source is as follows:
/**
 * Called when supervisor is started.
 * Note that this must be called before the receiver.onStart() is called to ensure
 * things like [[BlockGenerator]]s are started before the receiver starts sending data.
 */
protected def onStart() { }
Its concrete implementation is the onStart method of the subclass ReceiverSupervisorImpl:
override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}
The start method invoked here is BlockGenerator's start method:
/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
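How often blockIntervalTimer cuts the received stream into blocks is governed by spark.streaming.blockInterval (200ms by default), and each block becomes one task in the batch's job. A hedged tuning sketch:

val conf = new SparkConf()
  .setAppName("ReceiverStartupDemo")
  // A smaller interval yields more blocks per batch, hence more tasks and parallelism.
  .set("spark.streaming.blockInterval", "100ms")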
Notes:
Source material: DT Big Data Dream Factory (Spark release customization).
For more exclusive content, follow the WeChat public account DT_Spark.
If you are interested in big data and Spark, you can listen to the free Spark public classes given by Wang Jialin every evening at 20:00, in YY room 68917580.