Note: this post is reposted from my personal blog (Spark: Using WeakReference to Clean Up Objects).
I recently saw a question on Stack Overflow asking when Spark reclaims variables such as Accumulator or Broadcast. While reading the source code I came across this interesting mechanism.
Spark ContextCleaner
When Spark is idle (no tasks are running), log lines like the following are easy to spot:
19/02/12 05:19:51 INFO Spark Context Cleaner org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54): Cleaned accumulator 108284023
19/02/12 05:19:51 INFO Spark Context Cleaner org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54): Cleaned accumulator 108283658
Tracing these lines back to the source shows that when SparkContext starts, it initializes a ContextCleaner and starts a cleaningThread as a daemon thread. This thread loops continuously and cleans up RDDs, Broadcast variables, Accumulators, and other objects that are no longer in use.
This raises a question: taking Accumulator as an example, when an Accumulator is no longer used (nothing references it any more), how does ContextCleaner find out?
First, look at the overall cleanup process:
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
  .map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
  reference.foreach { ref =>
    logDebug("Got cleaning task " + ref.task)
    referenceBuffer.remove(ref)
    ref.task match {
      case CleanRDD(rddId) =>
        doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
      case CleanShuffle(shuffleId) =>
        doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
      case CleanBroadcast(broadcastId) =>
        doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
      case CleanAccum(accId) =>
        doCleanupAccum(accId, blocking = blockOnCleanupTasks)
      case CleanCheckpoint(rddId) =>
        doCleanCheckpoint(rddId)
    }
  }
}
As the code shows, ContextCleaner finds the cleanup tasks for objects that need to be reclaimed (such as CleanAccum) through a referenceQueue. Next, starting from referenceQueue, let's look at what a WeakReference actually is in the JVM.
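The first line of the excerpt wraps `referenceQueue.remove(timeout)` in `Option` because that call returns null when nothing is enqueued within the timeout. Below is a minimal Java sketch of the same polling step. The class name `QueuePollDemo`, the method `pollOnce`, and the 100 ms timeout are made up for illustration and are not part of Spark.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Optional;

// Hypothetical demo class, not Spark code.
class QueuePollDemo {

    /** Waits up to timeoutMs for an enqueued reference; empty if none arrived in time. */
    static Optional<Reference<?>> pollOnce(ReferenceQueue<Object> queue, long timeoutMs) {
        try {
            // remove(timeout) blocks for at most timeoutMs and returns null on timeout.
            // Note that a timeout of 0 would block indefinitely.
            Reference<?> ref = queue.remove(timeoutMs);
            return Optional.ofNullable(ref);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // No reference was ever registered with this queue, so the poll times out.
        ReferenceQueue<Object> queue = new ReferenceQueue<>();
        System.out.println(pollOnce(queue, 100).isPresent()); // prints "false"
    }
}
```

Polling with a timeout, rather than blocking forever, lets the loop re-check its stop condition at regular intervals.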
Java WeakReference
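Java distinguishes several kinds of references:
- StrongReference: the ordinary references we create every day are of this kind. An object is not collected as long as it is strongly reachable.
- WeakReference: for example Spark's CleanupTaskWeakReference(task, objectForCleanup, referenceQueue). If the referent (objectForCleanup) is reachable only through WeakReference objects, the GC can reclaim it, after which the WeakReference itself (which still carries the task) is put into the referenceQueue.
- SoftReference: stronger than a WeakReference. The referent can be reclaimed, but not necessarily in the next GC; the collector usually keeps it until memory runs short.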
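The difference between a strong and a weak reference can be observed directly. The following is a small sketch, not taken from Spark: the class `WeakRefDemo` and its method names are hypothetical. It also relies on `System.gc()`, which is only a hint to the JVM, so the code retries a bounded number of times instead of assuming a single call is enough.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

// Hypothetical demo class, not Spark code.
class WeakRefDemo {

    /** Requests GC repeatedly until a reference is enqueued or the attempts run out. */
    static Reference<?> awaitEnqueued(ReferenceQueue<Object> queue) {
        try {
            for (int i = 0; i < 50; i++) {
                System.gc(); // only a hint, hence the retry loop
                Reference<?> ref = queue.remove(100);
                if (ref != null) {
                    return ref;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }

    /** True if the weak reference was cleared and enqueued once its referent was dropped. */
    static boolean weakReferentIsCollected() {
        ReferenceQueue<Object> queue = new ReferenceQueue<>();
        Object referent = new Object();
        WeakReference<Object> weak = new WeakReference<>(referent, queue);
        referent = null; // drop the only strong reference
        Reference<?> enqueued = awaitEnqueued(queue);
        // The queue hands back the WeakReference object itself, not the referent.
        return enqueued == weak && weak.get() == null;
    }

    /** True if a referent that is still strongly referenced survives a GC request. */
    static boolean stronglyHeldReferentSurvives() {
        ReferenceQueue<Object> queue = new ReferenceQueue<>();
        Object referent = new Object();
        WeakReference<Object> weak = new WeakReference<>(referent, queue);
        System.gc();
        boolean nothingEnqueued = queue.poll() == null;
        // 'referent' is still used here, so it stayed strongly reachable throughout.
        return nothingEnqueued && weak.get() == referent;
    }

    public static void main(String[] args) {
        System.out.println("strongly held referent survives: " + stronglyHeldReferentSurvives());
        System.out.println("weak referent collected: " + weakReferentIsCollected());
    }
}
```

On a JVM that honors explicit GC requests, both methods are expected to return true; a JVM started with explicit GC disabled may never enqueue the weak reference.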
So ContextCleaner uses a WeakReference together with a referenceQueue to reclaim objects. Registering an Accumulator also calls registerForCleanup:
/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
  referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}
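The referenceBuffer keeps the WeakReference objects themselves strongly reachable, so that they are not garbage collected before they have been processed.
Spark describes the cleanup for a registered Accumulator as a CleanupTask (CleanAccum, which carries the accumulator ID) and creates a WeakReference that points at the Accumulator and carries that task. Once the Accumulator is no longer referenced and has been garbage collected, the WeakReference is put into the referenceQueue, where the cleaningThread picks it up and runs the task (see the cleanup code above).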
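To tie the pieces together, here is a simplified, self-contained Java sketch of the same pattern: a weak reference subclass that carries a task, a buffer that keeps the references alive, and a daemon thread that drains the queue. It is an illustration under assumptions, not Spark's implementation. `MiniCleaner`, `TaskWeakReference`, `taskId`, and the `cleaned` list are hypothetical names, and the "cleanup" merely records the task ID.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified cleaner; names and behavior are illustrative only.
class MiniCleaner {

    /** Weakly references the tracked object while strongly holding the task ID. */
    static final class TaskWeakReference extends WeakReference<Object> {
        final long taskId;

        TaskWeakReference(long taskId, Object referent, ReferenceQueue<Object> queue) {
            super(referent, queue);
            this.taskId = taskId;
        }
    }

    private final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
    // Keeps the reference objects reachable; an unreachable reference is never enqueued.
    private final Set<TaskWeakReference> referenceBuffer = ConcurrentHashMap.newKeySet();
    final List<Long> cleaned = new CopyOnWriteArrayList<>();
    private volatile boolean stopped = false;
    private final Thread cleaningThread = new Thread(this::keepCleaning, "Mini Cleaner");

    void start() {
        cleaningThread.setDaemon(true); // must not keep the JVM alive
        cleaningThread.start();
    }

    void stop() {
        stopped = true;
        cleaningThread.interrupt();
    }

    void registerForCleanup(Object objectForCleanup, long taskId) {
        referenceBuffer.add(new TaskWeakReference(taskId, objectForCleanup, referenceQueue));
    }

    private void keepCleaning() {
        while (!stopped) {
            try {
                Reference<?> ref = referenceQueue.remove(100); // null on timeout
                if (ref instanceof TaskWeakReference) {
                    TaskWeakReference taskRef = (TaskWeakReference) ref;
                    referenceBuffer.remove(taskRef);
                    cleaned.add(taskRef.taskId); // stand-in for the real cleanup work
                }
            } catch (InterruptedException e) {
                // Interrupted by stop(); the loop condition ends the thread.
            }
        }
    }

    /** Registers an object, drops it, and reports whether its task was eventually run. */
    static boolean demo() {
        MiniCleaner cleaner = new MiniCleaner();
        cleaner.start();
        Object accumulator = new Object();
        cleaner.registerForCleanup(accumulator, 42L);
        accumulator = null; // no strong reference to the tracked object remains
        try {
            for (int i = 0; i < 50 && cleaner.cleaned.isEmpty(); i++) {
                System.gc(); // only a hint, so retry a bounded number of times
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            cleaner.stop();
        }
        return cleaner.cleaned.contains(42L);
    }

    public static void main(String[] args) {
        System.out.println("task 42 cleaned: " + demo());
    }
}
```

The task must not hold a strong reference to the tracked object (here it is just an ID), otherwise the object would stay reachable through the buffered reference and would never be collected.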