------------------------------
Spark Streaming Overview
Spark Streaming Initialization Process
Analysis of the Spark Streaming Receiver Startup Process
Analysis of the Spark Streaming Data Preparation Phase (Receiver-Based)
Analysis of the Spark Streaming Data Computation Phase
Spark Streaming Backpressure Analysis
Analysis of the Spark Streaming Executor DynamicAllocation Mechanism
------------------------------
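1. Spark Streaming Data Preparation Flow
Spark Streaming processes data in two phases: the data preparation phase and the data computation phase. The two phases are functionally independent and are connected only through the data they share. This article walks through the Spark Streaming source code to show how the data preparation phase works.
The data preparation phase receives the incoming data, slices it by time into data sets (blocks), and distributes the sliced data. Receiving and converting the data involves the following key steps: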
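1. The Receiver receives the external data stream and hands each record to a BlockGenerator, which stores it in an ArrayBuffer (currentBuffer). Before a record is stored, a permit must be acquired. The permit rate is the maximum ingestion rate. It is set by "spark.streaming.receiver.maxRate", and since Spark 1.5 it can instead be computed automatically by backpressure when backpressure is enabled. Each stored record consumes one permit, and if no permit is available, receiving blocks.
2. The BlockGenerator owns a timer. At every block interval, the timer takes the data out of the ArrayBuffer, wraps it into a Block, puts the Block into blocksForPushing (a blocking ArrayBlockingQueue), and replaces the ArrayBuffer with an empty one.
3. The BlockGenerator's blockPushingThread takes blocks from the blocking queue and passes them to the ReceiverSupervisor through the onPushBlock callback of its listener.
4. When the ReceiverSupervisor receives a block, it stores the block's data through the BlockManager and reports the storage result to the ReceiverTracker.
5. When the ReceiverTracker receives the report, it appends the block information to the queue of unallocated blocks (streamIdToUnallocatedBlockQueues). The block waits there until the JobGenerator assigns it to an RDD while generating a job.
Step 1 runs continuously; steps 2 to 5 repeat once per block interval.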
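To make the five steps concrete, here is a minimal single-threaded Scala model of the pipeline. It is a sketch, not Spark's code. MiniBlock, MiniTracker, MiniSupervisor and MiniBlockGenerator are hypothetical names. Rate limiting is left out, and the timer and the pushing thread are replaced by explicit tick() and pushAll() calls so the flow can be traced step by step.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical simplified block: the start of its interval plus its records.
case class MiniBlock(streamId: Int, startTime: Long, records: List[Any])

// Stand-in for ReceiverTracker (step 5): unallocated blocks, queued per stream.
class MiniTracker {
  val unallocated = mutable.HashMap.empty[Int, mutable.Queue[MiniBlock]]
  def addBlock(b: MiniBlock): Unit =
    unallocated.getOrElseUpdate(b.streamId, mutable.Queue.empty[MiniBlock]) += b
}

// Stand-in for ReceiverSupervisor (step 4): "store" the block, then report it.
class MiniSupervisor(tracker: MiniTracker) {
  val stored = ArrayBuffer.empty[MiniBlock] // plays the role of the BlockManager
  def onPushBlock(b: MiniBlock): Unit = {
    stored += b
    tracker.addBlock(b)
  }
}

// Stand-in for BlockGenerator (steps 1 to 3).
class MiniBlockGenerator(streamId: Int, blockIntervalMs: Long, listener: MiniSupervisor) {
  private var currentBuffer = ArrayBuffer.empty[Any]
  private val blocksForPushing = new ArrayBlockingQueue[MiniBlock](10)

  // Step 1: the receiver appends one record at a time (rate limiting omitted).
  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  // Step 2: on each timer tick, swap in an empty buffer and queue the old one.
  def tick(time: Long): Unit = {
    val filled = synchronized {
      val b = currentBuffer
      currentBuffer = ArrayBuffer.empty[Any]
      b
    }
    if (filled.nonEmpty) // empty interval: no block is generated
      blocksForPushing.put(MiniBlock(streamId, time - blockIntervalMs, filled.toList))
  }

  // Step 3: the pushing side drains the queue and notifies its listener.
  def pushAll(): Unit = {
    var b = blocksForPushing.poll()
    while (b != null) {
      listener.onPushBlock(b)
      b = blocksForPushing.poll()
    }
  }
}

val tracker = new MiniTracker
val supervisor = new MiniSupervisor(tracker)
val generator = new MiniBlockGenerator(streamId = 0, blockIntervalMs = 200L, listener = supervisor)
Seq("a", "b", "c").foreach(r => generator.addData(r))
generator.tick(200L) // closes [0, 200): block with a, b, c
generator.tick(400L) // nothing arrived in [200, 400): no block
generator.addData("d")
generator.tick(600L) // closes [400, 600): block with d
generator.pushAll()
println(tracker.unallocated(0).map(b => (b.startTime, b.records)))
```

The tick at 400 produces no block, which mirrors the empty-buffer check in updateCurrentBuffer.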
2. Source Code Analysis
We use the WordCount application as the running example. The program is listed in "Spark Streaming Overview".
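2.1 Data Reception
Once the Receiver has started, it begins receiving data from the external source and storing it. The SocketReceiver used by the WordCount program actively pulls data from the socket. The relevant SocketReceiver code, followed by the store() method it inherits from Receiver, is shown below: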
def receive() {
  try {
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next())
    }
    ......
}
/**
 * Store a single item of received data to Spark's memory.
 * These single items will be aggregated together into data blocks before
 * being pushed into Spark's memory.
 */
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}
The supervisor's pushSingle() is implemented as follows:
/** Push a single record of received data into block generator. */
def pushSingle(data: Any) {
  defaultBlockGenerator.addData(data)
}
pushSingle() calls addData() on defaultBlockGenerator, which is a BlockGenerator, to append the record to currentBuffer. BlockGenerator.addData is implemented as follows:
/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
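In this code, waitToPush() throttles the receiving rate and is tied to the backpressure mechanism, which is analyzed in detail in "Spark Streaming Backpressure Analysis". Once a permit has been acquired, the record is appended to currentBuffer, where it waits for further processing.
The Receiver repeats this process continuously, receiving data and appending it to currentBuffer.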
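waitToPush() takes one permit per record from a rate limiter. In Spark, BlockGenerator inherits it from Spark's RateLimiter class, which calls acquire() on a Guava RateLimiter whose rate is maxRate or, with backpressure, a value updated at runtime. The sketch below only models the effect. SimClock, FakeClock and SimpleRateLimiter are hypothetical names. Permits are spaced evenly at 1000 / maxRate ms, and the fake clock lets the blocking be traced without real sleeps. Guava's smoothing behavior is not reproduced.

```scala
// Hypothetical clock abstraction so the sketch can be traced without sleeping.
trait SimClock {
  def now(): Long
  def sleep(ms: Long): Unit
}

// Fake clock: "sleeping" just advances the time.
class FakeClock(private var t: Long = 0L) extends SimClock {
  def now(): Long = t
  def sleep(ms: Long): Unit = t += ms
}

// Hypothetical limiter: one permit per record, permits spaced 1000 / maxRatePerSec ms apart.
// Simplification: no burst credit is accumulated while the receiver is idle.
class SimpleRateLimiter(maxRatePerSec: Long, clock: SimClock) {
  require(maxRatePerSec > 0, "maxRatePerSec must be positive")
  private val intervalMs = 1000.0 / maxRatePerSec
  private var nextFreeMs = clock.now().toDouble

  /** Waits until a permit is available and returns how long it waited, in ms. */
  def waitToPush(): Long = synchronized {
    val now = clock.now()
    val waitMs = math.max(0L, math.ceil(nextFreeMs - now).toLong)
    if (waitMs > 0) clock.sleep(waitMs) // the receiving thread blocks here
    nextFreeMs = math.max(nextFreeMs, now.toDouble) + intervalMs
    waitMs
  }
}

val clock = new FakeClock()
val limiter = new SimpleRateLimiter(maxRatePerSec = 5, clock = clock)
val waits = (1 to 5).map(_ => limiter.waitToPush())
println(s"waits = $waits, elapsed = ${clock.now()} ms")
```

At 5 records per second, the first permit is free and each later one waits 200 ms, so five records take 800 ms.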
2.2 Data Slicing
As mentioned in "Analysis of the Spark Streaming Receiver Startup Process", starting a Receiver creates a ReceiverSupervisorImpl. The ReceiverSupervisorImpl in turn creates and starts a BlockGenerator, which slices the data stream received by the Receiver. Slicing is driven by a timer whose period is set by "spark.streaming.blockInterval" (default 200ms).
The start method of BlockGenerator is implemented as follows:
/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
Here:
- blockIntervalTimer is a timer that runs its scheduled task periodically:
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
- blockPushingThread is a separate thread that continually takes packaged blocks out of the blocking queue:
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
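blockIntervalTimer drives everything that follows, so it helps to see when it fires. The sketch below assumes that ticks are aligned to multiples of the period, strictly after the current time (as far as we can tell, this is how RecurringTimer computes its start time). tickTimes and approxBlocksPerBatch are hypothetical helpers. The second helper shows a consequence noted in the Spark tuning guide. Each receiver produces roughly batchInterval / blockInterval blocks per batch, and each block becomes one partition, and hence one task, of the batch's BlockRDD. With the 200 ms default and a 2 s batch, that is 10.

```scala
// Hypothetical helper: the first tick is the next multiple of the period strictly
// after nowMs (assumes nowMs >= 0); later ticks follow one period apart.
def tickTimes(nowMs: Long, periodMs: Long, count: Int): Seq[Long] = {
  require(nowMs >= 0 && periodMs > 0 && count >= 0)
  val first = (nowMs / periodMs + 1) * periodMs
  (0 until count).map(i => first + i * periodMs)
}

// Hypothetical helper: roughly how many blocks (= BlockRDD partitions = tasks) one
// receiver contributes per batch; intervals in which nothing arrived produce no block.
def approxBlocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
  require(batchIntervalMs > 0 && blockIntervalMs > 0)
  batchIntervalMs / blockIntervalMs
}

val blockIntervalMs = 200L // default of spark.streaming.blockInterval
val ticks = tickTimes(nowMs = 1050L, periodMs = blockIntervalMs, count = 3)
// A tick at time t closes [t - blockIntervalMs, t); updateCurrentBuffer names the block
// after the interval's start, i.e. StreamBlockId(receiverId, t - blockIntervalMs).
val blockStartTimes = ticks.map(_ - blockIntervalMs)
println(s"ticks = $ticks, block start times = $blockStartTimes")
println(approxBlocksPerBatch(batchIntervalMs = 2000L, blockIntervalMs = blockIntervalMs))
```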
2.2.1 Splitting the Data Stream
RecurringTimer is a timer that calls updateCurrentBuffer every blockIntervalMs. updateCurrentBuffer packages the data in currentBuffer into a block and adds it to the blocking queue blocksForPushing.
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) { // if the buffer is empty, no block is generated
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }
    if (newBlock != null) {
      blocksForPushing.put(newBlock) // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
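The "put is blocking when queue is full" comment matters. blocksForPushing is a bounded ArrayBlockingQueue, so if the pushing side falls behind, the timer thread stalls in put() until a slot frees up. The JDK-only sketch below demonstrates that behavior. The capacity of 2 is arbitrary, strings stand in for Block objects, and BoundedQueueDemo and QueueDemoResult are hypothetical names.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}

// What the demo observed on the "timer" and "pushing" sides.
final case class QueueDemoResult(offerOnFull: Boolean, putDoneWhileFull: Boolean,
                                 firstTaken: String, putDoneAfterTake: Boolean,
                                 remaining: List[String])

// Kept inside a class so the helper thread only touches local state.
final class BoundedQueueDemo {
  def run(): QueueDemoResult = {
    val blocksForPushing = new ArrayBlockingQueue[String](2) // capacity 2 is arbitrary
    blocksForPushing.put("block-0")
    blocksForPushing.put("block-1")

    // A non-blocking offer fails on a full queue; put() waits instead.
    val offerOnFull = blocksForPushing.offer("block-x")

    val putDone = new CountDownLatch(1)
    val timerThread = new Thread(() => {
      blocksForPushing.put("block-2") // blocks: the queue is full
      putDone.countDown()
    })
    timerThread.start()

    // Nobody is consuming yet, so the "timer" is still stuck in put().
    val putDoneWhileFull = putDone.await(200, TimeUnit.MILLISECONDS)

    // The "pushing thread" takes one block, which frees a slot and unblocks put().
    val firstTaken = blocksForPushing.take()
    val putDoneAfterTake = putDone.await(5, TimeUnit.SECONDS)
    timerThread.join()

    val remaining = List(blocksForPushing.poll(), blocksForPushing.poll())
    QueueDemoResult(offerOnFull, putDoneWhileFull, firstTaken, putDoneAfterTake, remaining)
  }
}

val result = new BoundedQueueDemo().run()
println(result)
```

updateCurrentBuffer has already swapped in a fresh currentBuffer before it calls put(). The receiver can therefore keep appending records while the timer thread waits; what stalls is the cutting of new blocks.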
2.2.2 Data Transfer
Once the blockPushingThread starts, it runs keepPushingBlocks(). This method takes sliced blocks out of the blocking queue and forwards them through defaultBlockGeneratorListener (defined in ReceiverSupervisorImpl) for the subsequent storage and distribution steps.
/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")
  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }
  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }
    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}
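keepPushingBlocks polls with a 10 ms timeout instead of calling take(), so the loop regularly re-checks areBlocksBeingGenerated. Once generation stops, it drains whatever is left. The JDK-only sketch below reproduces that shape. PushingLoopDemo is a hypothetical name, an AtomicBoolean stands in for the state check, and strings stand in for Block objects.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical demo of the keepPushingBlocks loop.
final class PushingLoopDemo(numBlocks: Int) {
  private val blocksForPushing = new ArrayBlockingQueue[String](numBlocks max 1)
  private val generating = new AtomicBoolean(true) // stands in for state != StoppedGeneratingBlocks
  private val pushed = ArrayBuffer.empty[String]   // written only by the pushing thread

  // Same shape as keepPushingBlocks: a timed poll lets the loop re-check the flag,
  // and once generation has stopped, whatever is left in the queue is drained.
  private def keepPushingBlocks(): Unit = {
    while (generating.get()) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)).foreach(block => pushed += block)
    }
    while (!blocksForPushing.isEmpty) {
      pushed += blocksForPushing.take()
    }
  }

  def run(): List[String] = {
    val pushingThread = new Thread(() => keepPushingBlocks())
    pushingThread.start()
    (1 to numBlocks).foreach(i => blocksForPushing.put(s"block-$i")) // the "timer" produces
    generating.set(false) // stop generating: no more puts after this point
    pushingThread.join()  // join() makes the thread's writes to `pushed` visible here
    pushed.toList
  }
}

val pushedBlocks = new PushingLoopDemo(numBlocks = 50).run()
println(s"pushed ${pushedBlocks.size} blocks, first = ${pushedBlocks.head}, last = ${pushedBlocks.last}")
```

The reference element type is deliberate. With ArrayBlockingQueue[Int], a timed-out poll() returns null, which Scala would unbox to 0 before Option(...) could turn it into None.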
pushBlock is implemented as follows:
private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}
2.3 Block Storage and Reporting
When the BlockGeneratorListener receives an onPushBlock event, it stores the transferred block and reports it to the ReceiverTracker.
2.3.1 Block Storage
After receiving an onPushBlock event, the BlockGeneratorListener goes through a chain of calls that ends in pushAndReportBlock, which stores the block. pushAndReportBlock is implemented as follows:
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
Here the data is stored as a block through receivedBlockHandler. ReceivedBlockHandler has two implementations:
- WriteAheadLogBasedBlockHandler, used when the write-ahead log (WAL) is enabled;
- BlockManagerBasedBlockHandler, used by default.
BlockManagerBasedBlockHandler uses the BlockManager API to store the data on the node where the Receiver runs. It also stores replicas on other Executors, according to the replication count of the StorageLevel. The replication method is shown below:
/**
 * Replicate block to another node. Note that this is a blocking call that returns after
 * the block has been replicated.
 */
private def replicate(
    blockId: BlockId,
    data: ChunkedByteBuffer,
    level: StorageLevel,
    classTag: ClassTag[_]): Unit = {
  ......
  var peersForReplication = blockReplicationPolicy.prioritize(
    blockManagerId,
    getPeers(false),
    mutable.HashSet.empty,
    blockId,
    numPeersToReplicateTo)
  ......
}
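The choice between the two handlers, and the number of peers that replicate() is asked for (numPeersToReplicateTo), can be sketched as follows. Only two pieces come from Spark: the configuration key spark.streaming.receiver.writeAheadLog.enable, and the rule that replication - 1 copies go to peers while one copy stays on the receiver's node. HandlerChoice, chooseHandler and peersToReplicateTo are hypothetical names. socketTextStream, used by WordCount, defaults to StorageLevel.MEMORY_AND_DISK_SER_2, whose replication is 2.

```scala
// Hypothetical model of which ReceivedBlockHandler a receiver ends up with.
sealed trait HandlerChoice
case object BlockManagerBased extends HandlerChoice
case object WriteAheadLogBased extends HandlerChoice

def chooseHandler(conf: Map[String, String]): HandlerChoice =
  if (conf.getOrElse("spark.streaming.receiver.writeAheadLog.enable", "false").toBoolean)
    WriteAheadLogBased
  else
    BlockManagerBased

// One copy is kept on the receiver's node; the remaining replication - 1 go to peers.
def peersToReplicateTo(replication: Int): Int = {
  require(replication >= 1, "replication must be at least 1")
  replication - 1
}

println(chooseHandler(Map.empty))
println(chooseHandler(Map("spark.streaming.receiver.writeAheadLog.enable" -> "true")))
println(peersToReplicateTo(2)) // e.g. MEMORY_AND_DISK_SER_2
```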
The replication policy (RandomBlockReplicationPolicy by default) chooses peers by random sampling:
/**
 * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation,
 * that just makes sure we put blocks on different hosts, if possible
 *
 * @param blockManagerId Id of the current BlockManager for self identification
 * @param peers A list of peers of a BlockManager
 * @param peersReplicatedTo Set of peers already replicated to
 * @param blockId BlockId of the block being replicated. This can be used as a source of
 *                randomness if needed.
 * @return A prioritized list of peers. Lower the index of a peer, higher its priority
 */
override def prioritize(
    blockManagerId: BlockManagerId,
    peers: Seq[BlockManagerId],
    peersReplicatedTo: mutable.HashSet[BlockManagerId],
    blockId: BlockId,
    numReplicas: Int): List[BlockManagerId] = {
  val random = new Random(blockId.hashCode)
  logDebug(s"Input peers : ${peers.mkString(", ")}")
  val prioritizedPeers = if (peers.size > numReplicas) {
    getSampleIds(peers.size, numReplicas, random).map(peers(_))
  } else {
    if (peers.size < numReplicas) {
      logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer/s.")
    }
    random.shuffle(peers).toList
  }
  logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
  prioritizedPeers
}
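getSampleIds is not shown above. A standard way to draw m distinct indices out of n with only m random numbers is Robert Floyd's sampling algorithm, which we assume here. sampleIndices and pickPeers are hypothetical helpers that follow the structure of prioritize, including seeding the Random with the block id's hash. Peers are plain strings instead of BlockManagerIds.

```scala
import scala.collection.mutable
import scala.util.Random

// Robert Floyd's sampling algorithm: m distinct indices out of 0 until n,
// using exactly m calls to the random generator.
def sampleIndices(n: Int, m: Int, r: Random): List[Int] = {
  require(0 <= m && m <= n, s"cannot sample $m out of $n")
  val chosen = mutable.LinkedHashSet.empty[Int]
  for (j <- n - m + 1 to n) {    // classic 1-based formulation
    val t = r.nextInt(j) + 1     // uniform in 1..j
    if (chosen.contains(t)) chosen += j else chosen += t
  }
  chosen.toList.map(_ - 1)       // convert to 0-based indices
}

// Hypothetical helper following the structure of prioritize(): sample when there
// are more peers than replicas, otherwise shuffle all peers.
def pickPeers(peers: Seq[String], blockId: String, numReplicas: Int): List[String] = {
  val random = new Random(blockId.hashCode)
  if (peers.size > numReplicas) sampleIndices(peers.size, numReplicas, random).map(peers(_))
  else random.shuffle(peers).toList
}

val peers = Seq("exec-1", "exec-2", "exec-3", "exec-4", "exec-5")
val replicaTargets = pickPeers(peers, blockId = "input-0-1000", numReplicas = 1)
println(replicaTargets)
```

Seeding with the block id's hash makes the choice reproducible for a given block, while different blocks are spread across different peers.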
2.3.2 Block Reporting
Once the block has been stored and its replicas have been created, the block is reported to the ReceiverTracker through trackerEndpoint:
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
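When ReceiverTrackerEndpoint receives the AddBlock message, it calls receivedBlockTracker to add the block information to the queue streamIdToUnallocatedBlockQueues, where it waits to be used for job generation. If write-ahead-log batching is enabled on the driver, the call runs on walBatchingThreadPool; otherwise it runs directly: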
case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }
/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
The addBlock method of receivedBlockTracker is implemented as follows:
/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
/** Get the queue of received blocks belonging to a particular stream */
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
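This completes the data preparation phase. When the next batch generates its jobs, the block information held in streamIdToUnallocatedBlockQueues is taken out and wrapped into RDDs, and the registered block information is moved to timeToAllocatedBlocks.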
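The hand-off from the data preparation phase to job generation can be modeled as below. SketchBlockTracker and BlockInfo are hypothetical simplifications of ReceivedBlockTracker and ReceivedBlockInfo, with no write-ahead log and with batch times as plain Long milliseconds. The allocation step moves all unallocated blocks of each stream into timeToAllocatedBlocks once per batch time. It is an assumption modeled on ReceivedBlockTracker.allocateBlocksToBatch, which this article does not show.

```scala
import scala.collection.mutable

// Hypothetical simplified block info.
case class BlockInfo(streamId: Int, blockId: String, numRecords: Long)

class SketchBlockTracker(streamIds: Seq[Int]) {
  private val streamIdToUnallocatedBlockQueues = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]
  private val timeToAllocatedBlocks = mutable.HashMap.empty[Long, Map[Int, Seq[BlockInfo]]]
  private var lastAllocatedBatchTime: Option[Long] = None

  private def queueFor(streamId: Int): mutable.Queue[BlockInfo] =
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfo])

  // Data preparation phase: called for every AddBlock message.
  def addBlock(info: BlockInfo): Boolean = synchronized { queueFor(info.streamId) += info; true }

  // Job generation: move every unallocated block into this batch, at most once per batch time.
  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    if (lastAllocatedBatchTime.forall(batchTime > _)) {
      val blocks = streamIds.map(id => id -> queueFor(id).dequeueAll(_ => true).toList).toMap
      timeToAllocatedBlocks(batchTime) = blocks
      lastAllocatedBatchTime = Some(batchTime)
    }
  }

  def blocksOfBatch(batchTime: Long, streamId: Int): Seq[BlockInfo] = synchronized {
    timeToAllocatedBlocks.get(batchTime).flatMap(_.get(streamId)).getOrElse(Seq.empty)
  }

  def unallocatedCount(streamId: Int): Int = synchronized { queueFor(streamId).size }
}

val tracker = new SketchBlockTracker(Seq(0))
tracker.addBlock(BlockInfo(0, "input-0-0", 3))
tracker.addBlock(BlockInfo(0, "input-0-200", 1))
tracker.allocateBlocksToBatch(2000L)
tracker.addBlock(BlockInfo(0, "input-0-2000", 5)) // arrives after the cut: next batch
println(tracker.blocksOfBatch(2000L, 0).map(_.blockId))
println(tracker.unallocatedCount(0))
```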