- 在kafka中有很多操作需要延遲等待, 比如客戶端發(fā)送數(shù)據(jù)到達leader后, 根據(jù)設(shè)置ack方式不同,需要等待其replicas返回ack, 那這個ack就需要延遲等待;對于一個拉取操作, 需要延遲等待期望拉取的字節(jié)數(shù)準備好;
- 有延遲操作, 那必然會存在操作的超時處理. 還記得我們上一篇Kafka中的時間輪中對Timer的分析吧, 這里的延遲操作需要使用它來實現(xiàn);
DelayedOperation
- 所在文件: core/src/main/scala/kafka/server/DelayedOperation.scala
- 這是個抽象類, 所有具體的延遲任務都需要繼承這個類;
- 同時每個延遲任務必然存在操作的超時, 那么其超時操作是通過將對象放到Kafka中的時間輪中的Timer中處理, 因此這個類又繼承自
TimerTask
; -
private val completed = new AtomicBoolean(false)
: 原子變量, 標識當前operation是否已完成; -
def forceComplete(): Boolean
: 強制完成操作;
if (completed.compareAndSet(false, true)) {
// cancel the timeout timer
cancel()
onComplete()
true
} else {
false
}
分兩種情況:
- 當前操作已經(jīng)完成,則不再需要強制完成,返回false;
- 當前操作未完成, 則首先在
Timer
中取消這個定時任務, 然后回調(diào)onComplete
-
override def run(): Unit
: 實現(xiàn)的是TimerTask
的方法, 當超時時會執(zhí)行此操作:
if (forceComplete())
onExpiration()
里面的操作比較簡單, 調(diào)用forceComplete
, 如果成功,表明是真的超時了,回調(diào)onExpiration
;
- 需要由子類實現(xiàn)的方法:
-
def onExpiration(): Unit
: 超時后的回調(diào)處理; -
def onComplete(): Unit
: 操作完成后的回調(diào)處理; -
def tryComplete(): Boolean
: 在放入到Timer
前, 先嘗試著執(zhí)行一下這個操作, 看是否可以完成, 如果可以就不用放到Timer
里了, 這是為了確保任務都盡快完成作的一個優(yōu)化;
Watchers
- 所在文件: core/src/main/scala/kafka/server/DelayedOperation.scala
- 對于一個延遲任務, 一般會有兩個操作加持在身:
- 上面說的作為超時任務放在
Timer
中; - 與某些事件關(guān)聯(lián)在一起, 可以關(guān)聯(lián)多個事件, 當這些事件中的某一個發(fā)生時, 這個任務即可認為是完成;這個就是
Watchers
類要完成的工作;
-
class Watchers(val key: Any)
: 構(gòu)造時需要一個參數(shù)key
, 你可以理解成是一個事件; -
private[this] val operations = new LinkedList[T]()
: 用于存放和這個key
關(guān)聯(lián)的所有操作,一個key
可以關(guān)聯(lián)多個操作, 同時一個操作也可以被多個key
關(guān)聯(lián)(即位于多個Watchers
對象中) -
def purgeCompleted(): Int
: 刪除鏈表中所有已經(jīng)完成的操作
var purged = 0
operations synchronized {
val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) {
iter.remove()
purged += 1
}
}
}
if (operations.size == 0)
removeKeyIfEmpty(key, this)
purged
}
-
def tryCompleteWatched(): Int
:
var completed = 0
operations synchronized {
val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) {
// another thread has completed this operation, just remove it
iter.remove()
} else if (curr synchronized curr.tryComplete()) {
completed += 1
iter.remove()
}
}
}
if (operations.size == 0)
removeKeyIfEmpty(key, this)
completed
掃描整個鏈表:
- 如果任務已完成,則移除;
- 如果任務未完成, 調(diào)用
tryComplete
嘗試立即完成, 如果可以完成, 則移除;
- 添加任務:
def watch(t: T) {
operations synchronized operations.add(t)
}
DelayedOperationPurgatory
- 所在文件: core/src/main/scala/kafka/server/DelayedOperation.scala
- 終于要揭開我們的謎之煉獄啦, 源碼里的注釋如下:
A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations
實際上就是用來通過Timer和Watchers來管理一批延遲任務;
-
private[this] val timeoutTimer = new Timer(executor)
: 用來處理加入的作務的超時行為; -
private val expirationReaper = new ExpiredOperationReaper()
:
private class ExpiredOperationReaper extends ShutdownableThread(
"ExpirationReaper-%d".format(brokerId),
false) {
override def doWork() {
timeoutTimer.advanceClock(200L)
if (estimatedTotalOperations.get - delayed > purgeInterval) {
estimatedTotalOperations.getAndSet(delayed)
debug("Begin purging watch lists")
val purged = allWatchers.map(_.purgeCompleted()).sum
debug("Purged %d elements from watch lists.".format(purged))
}
}
}
-
timeoutTimer.advanceClock(200L)
: 驅(qū)動Timer向前走, pop出超時的延遲任務; -
val purged = allWatchers.map(_.purgeCompleted()).sum
: 利用閾值(purgeInterval)來周期性地從Watchers
中清理掉已經(jīng)完成的任務;
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean
: 將operation和一系列的事件(key)關(guān)聯(lián)起來, 然后調(diào)用tryComplete
嘗試立即完成該操作,如果不能完成,加入到Timer中;def checkAndComplete(key: Any): Int
: 按key找到相應的Watchers對象, 然后調(diào)用其tryCompleteWatched()
, 說明上面用;