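- Before digging into the code, here is an appetizer: a brief introduction to Kafka's basic components and to some of the utility libraries used in the implementation.
Overview of Kafka's Basic Components
- KafkaServer is the core component of Kafka; it contains every role through which Kafka exposes its functionality to the outside;
- A picture is worth a thousand words: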
kafkaserver1.png
Introduction to Kafka's Utility Libraries
KafkaScheduler
- File: core/src/main/scala/kafka/utils/KafkaScheduler.scala
- Purpose: accepts periodic tasks and delayed tasks, and runs them on a pool of threads;
- Implementation: wraps java.util.concurrent.ScheduledThreadPoolExecutor;
- Interface (the original comments are already clear):
/**
* Initialize this scheduler so it is ready to accept scheduling of tasks
*/
def startup()
/**
* Shutdown this scheduler. When this method is complete no more executions of background tasks will occur.
* This includes tasks scheduled with a delayed execution.
*/
def shutdown()
/**
* Check if the scheduler has been started
*/
def isStarted: Boolean
/**
* Schedule a task
* @param name The name of this task
* @param delay The amount of time to wait before the first execution
* @param period The period with which to execute the task. If < 0 the task will execute only once.
* @param unit The unit for the preceding times.
*/
def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)
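To make the "wraps ScheduledThreadPoolExecutor" point concrete, here is a minimal sketch of how such a wrapper could look. SimpleScheduler is a hypothetical name and this is not the actual KafkaScheduler code; it only assumes the four methods listed above and the standard JDK executor API.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

// Hypothetical, simplified stand-in for KafkaScheduler (illustration only).
class SimpleScheduler(threads: Int) {
  @volatile private var executor: ScheduledThreadPoolExecutor = null

  def startup(): Unit = this.synchronized {
    if (isStarted) throw new IllegalStateException("Scheduler already started.")
    executor = new ScheduledThreadPoolExecutor(threads)
    // Drop pending periodic and delayed tasks once shutdown() is called.
    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
  }

  def shutdown(): Unit = this.synchronized {
    if (isStarted) {
      executor.shutdown()
      executor.awaitTermination(1, TimeUnit.DAYS)
      executor = null
    }
  }

  def isStarted: Boolean = executor != null

  def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1,
               unit: TimeUnit = TimeUnit.MILLISECONDS): Unit = {
    if (!isStarted) throw new IllegalStateException("Scheduler is not running.")
    val task = new Runnable {
      // Swallow exceptions so that one failure does not cancel a periodic task.
      def run(): Unit =
        try fun()
        catch { case t: Throwable => System.err.println(s"Task '$name' failed: $t") }
    }
    // A negative period means "run once"; note that the executor itself
    // rejects a period of exactly 0 with IllegalArgumentException.
    if (period >= 0) executor.scheduleAtFixedRate(task, delay, period, unit)
    else executor.schedule(task, delay, unit)
  }
}
```

A caller would invoke startup(), register tasks with schedule(...), and call shutdown() when done; the two shutdown-policy setters are what make delayed tasks stop as well, matching the comment on shutdown() above.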
ZkUtils
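- File: core/src/main/scala/kafka/utils/ZkUtils.scala
- Purpose: wraps the operations that may be needed on ZooKeeper nodes: create, read, write, and parse (mostly JSON);
- Implementation: uses the lesser-known I0Itec ZkClient library to talk to ZooKeeper;
- The following ZooKeeper nodes are involved: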
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
val DeleteTopicsPath = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
val BrokerSequenceIdPath = "/brokers/seqid"
val IsrChangeNotificationPath = "/isr_change_notification"
val EntityConfigPath = "/config"
val EntityConfigChangesPath = "/config/changes"
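The constants above are only the roots; per-broker and per-topic nodes hang below them. The sketch below shows how child paths could be derived, assuming the usual layout of /brokers/ids/<brokerId> and /brokers/topics/<topic>/partitions/<partition>/state. The object and helper names are hypothetical and chosen for illustration, not copied from ZkUtils.

```scala
// Hypothetical helpers illustrating how child paths are built from the roots.
object ZkPathsSketch {
  val BrokerIdsPath = "/brokers/ids"
  val BrokerTopicsPath = "/brokers/topics"

  // One node per registered broker.
  def brokerPath(brokerId: Int): String = s"$BrokerIdsPath/$brokerId"

  // One node per topic, with its partitions underneath.
  def topicPath(topic: String): String = s"$BrokerTopicsPath/$topic"
  def topicPartitionsPath(topic: String): String = s"${topicPath(topic)}/partitions"
  def partitionStatePath(topic: String, partition: Int): String =
    s"${topicPartitionsPath(topic)}/$partition/state"
}
```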
Pool
- File: core/src/main/scala/kafka/utils/Pool.scala
- Purpose: a simple concurrent object pool;
- Implementation: a wrapper around ConcurrentHashMap;
- getAndMaybePut uses a small trick, double-checked locking, to lower the locking cost when the value already exists;
def getAndMaybePut(key: K) = {
  if (valueFactory.isEmpty)
    throw new KafkaException("Empty value factory in pool.")
  val curr = pool.get(key)
  if (curr == null) {
    createLock synchronized {
      val curr = pool.get(key)
      if (curr == null)
        pool.put(key, valueFactory.get(key))
      pool.get(key)
    }
  }
  else
    curr
}
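The double-check idea can be tried out in isolation. The following is a self-contained sketch, not the Kafka class itself: SimplePool is a hypothetical name, and the value factory is passed as a plain function rather than an Option.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, stripped-down pool showing double-checked creation.
class SimplePool[K, V <: AnyRef](valueFactory: K => V) {
  private val pool = new ConcurrentHashMap[K, V]
  private val createLock = new Object

  def getAndMaybePut(key: K): V = {
    val curr = pool.get(key)            // first check: lock-free fast path
    if (curr != null) curr
    else createLock.synchronized {
      val again = pool.get(key)         // second check: another thread may have won
      if (again != null) again
      else {
        val created = valueFactory(key) // runs at most once per key
        pool.put(key, created)
        created
      }
    }
  }
}
```

Readers that find an existing value never touch createLock; only the first access per key pays for synchronization. On Java 8 and later, ConcurrentHashMap.computeIfAbsent gives a comparable at-most-once guarantee without a separate lock.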
Logging
- File: core/src/main/scala/kafka/utils/Logging.scala
- Purpose: defines trait Logging for other classes to mix in, which makes writing logs convenient;
- Implementation: a wrapper around org.apache.log4j.Logger;
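As a rough illustration of the mix-in pattern, here is a sketch of a logging trait. To keep it self-contained it uses java.util.logging from the JDK instead of log4j, so it is a stand-in and not the Kafka trait; SimpleLogging and its method set are assumptions made for the example.

```scala
import java.util.logging.{Level, Logger}

// Hypothetical logging mix-in; java.util.logging stands in for log4j here.
trait SimpleLogging {
  // One logger per concrete class, created lazily on first use.
  protected lazy val logger: Logger = Logger.getLogger(getClass.getName)

  // By-name parameters: the message is only built if the level is enabled.
  def debug(msg: => String): Unit =
    if (logger.isLoggable(Level.FINE)) logger.fine(msg)

  def info(msg: => String): Unit =
    if (logger.isLoggable(Level.INFO)) logger.info(msg)

  def error(msg: => String, e: Throwable): Unit =
    if (logger.isLoggable(Level.SEVERE)) logger.log(Level.SEVERE, msg, e)
}
```

A class then simply writes class Foo extends SimpleLogging and calls info("...") directly. The by-name msg parameter means an expensive string is never constructed when its level is switched off.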
FileLock
- File: core/src/main/scala/kafka/utils/FileLock.scala
- Purpose: a file lock, comparable to /usr/bin/lockf on Linux;
- Implementation: built on java.nio.channels.FileLock;
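A small sketch of a lock file built on java.nio.channels.FileLock follows. SimpleFileLock and its method names are hypothetical; the sketch only relies on FileChannel.tryLock, which returns null when another process holds the lock and throws OverlappingFileLockException when the lock is already held inside the same JVM.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.channels.{FileLock => NioFileLock, OverlappingFileLockException}

// Hypothetical wrapper around a java.nio file lock (illustration only).
class SimpleFileLock(val file: File) {
  file.createNewFile() // no-op if the file already exists
  private val channel = new RandomAccessFile(file, "rw").getChannel
  private var flock: NioFileLock = null

  // Non-blocking attempt; returns true if this instance now holds the lock.
  def tryLock(): Boolean = this.synchronized {
    try {
      flock = channel.tryLock() // null if another process holds the lock
      flock != null
    } catch {
      // The lock is already held somewhere inside this JVM.
      case _: OverlappingFileLockException => false
    }
  }

  def unlock(): Unit = this.synchronized {
    if (flock != null) { flock.release(); flock = null }
  }

  // Release the lock (if held) and close the underlying channel.
  def destroy(): Unit = this.synchronized {
    unlock()
    channel.close()
  }
}
```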
ByteBoundedBlockingQueue
- File: core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
- Purpose: a blocking queue that counts as full under either of two conditions: the number of elements has reached its limit, or the sum of the sizes of all elements has reached its limit;
- Implementation: built on java.util.concurrent.LinkedBlockingQueue, with an added check on the total size of the elements already in the queue;
- Interface:
def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean
def offer(e: E): Boolean
def put(e: E): Boolean
def poll(timeout: Long, unit: TimeUnit): E
def poll(): E
def take(): E
...
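The two "full" conditions can be sketched in a few lines. The class below is a hypothetical, non-blocking simplification (only offer and poll, with poll returning an Option), assuming the caller supplies a sizeOf function; it is not the Kafka implementation, which also provides the blocking and timed variants listed above.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical simplification: a queue bounded by element count and by bytes.
class SimpleByteBoundedQueue[E](maxCount: Int, maxBytes: Int, sizeOf: E => Int) {
  private val queue = new LinkedBlockingQueue[E](maxCount) // enforces the count limit
  private val currentBytes = new AtomicInteger(0)          // enforces the byte limit
  private val putLock = new Object

  // Non-blocking insert; false if either limit has been reached.
  def offer(e: E): Boolean = putLock.synchronized {
    if (currentBytes.get >= maxBytes) false
    else if (queue.offer(e)) { currentBytes.addAndGet(sizeOf(e)); true }
    else false
  }

  // Non-blocking removal; gives the bytes of the removed element back.
  def poll(): Option[E] = {
    val e = Option(queue.poll())
    e.foreach(x => currentBytes.addAndGet(-sizeOf(x)))
    e
  }

  def byteSize: Int = currentBytes.get
  def size: Int = queue.size
}
```

Note that in this sketch the byte limit is checked before the insert, so the last accepted element may push the total past maxBytes; further inserts are refused until enough elements have been polled.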
DelayedItem
- File: core/src/main/scala/kafka/utils/DelayedItem.scala
- Purpose: defines an object that can be placed into a DelayQueue;
- Implementation: implements the Delayed interface;
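For readers who have not used DelayQueue, here is a minimal sketch of an element type that satisfies the Delayed contract. SimpleDelayedItem is a hypothetical name and the millisecond-based constructor is an assumption for the example, not the signature of Kafka's DelayedItem.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical element type that a DelayQueue can hold.
class SimpleDelayedItem[T](val item: T, delayMs: Long) extends Delayed {
  // Absolute wall-clock time at which the item becomes available.
  private val dueMs = System.currentTimeMillis + delayMs

  // Remaining delay; zero or negative means the item has expired.
  def getDelay(unit: TimeUnit): Long =
    unit.convert(dueMs - System.currentTimeMillis, TimeUnit.MILLISECONDS)

  // DelayQueue orders its elements with this: the soonest item comes first.
  def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS),
                           other.getDelay(TimeUnit.MILLISECONDS))
}
```

Put into a DelayQueue[SimpleDelayedItem[String]], such items come out of take() in order of expiry rather than insertion, and poll() returns null while nothing has expired yet.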