Kafka基礎(chǔ)組件和輔助類庫(kù)簡(jiǎn)介

  • 在正式開始扒代碼之前, 先來(lái)個(gè)開胃菜,簡(jiǎn)單介紹一下kafka的基礎(chǔ)組件和一些代碼實(shí)現(xiàn)中用到的基礎(chǔ)類庫(kù)

Kafka基礎(chǔ)組件概述

  • KafkaServer是整個(gè)Kafka的核心組件土砂,里面包含了kafka對(duì)外提供功能的所有角色围小;
  • 一圖頂千言:
kafkaserver1.png

Kafka輔助類庫(kù)簡(jiǎn)介

KafkaScheduler

  • 所在文件: core/src/main/scala/kafka/utils/KafkaScheduler.scala
  • 功能: 接收需周期性執(zhí)行的任務(wù)和延遲作務(wù)的添加, 使用一組thread pool來(lái)執(zhí)行具體的任務(wù);
  • 實(shí)現(xiàn): 封裝了 java.util.concurrent.ScheduledThreadPoolExecutor;
  • 接口(原有注釋已經(jīng)很清晰):
/**
   * Initialize this scheduler so it is ready to accept scheduling of tasks
   */
  def startup()
  
  /**
   * Shutdown this scheduler. When this method is complete no more executions of background tasks will occur. 
   * This includes tasks scheduled with a delayed execution.
   */
  def shutdown()
  
  /**
   * Check if the scheduler has been started
   */
  def isStarted: Boolean
  
  /**
   * Schedule a task
   * @param name The name of this task
   * @param delay The amount of time to wait before the first execution
   * @param period The period with which to execute the task. If < 0 the task will execute only once.
   * @param unit The unit for the preceding times.
   */
  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)

ZkUtils

  • 所在文件: core/scr/main/scala/kafka/utils/ZkUtils.scala
  • 功能: 封裝了可能用到的對(duì)zk上節(jié)點(diǎn)的創(chuàng)建,讀,寫,解析(主要是json)操作;
  • 實(shí)現(xiàn): 使用了一個(gè)小眾的類庫(kù) I0Itec 來(lái)操作zk;
  • 涉及到以下zk節(jié)點(diǎn):
  val ConsumersPath = "/consumers"
  val BrokerIdsPath = "/brokers/ids"
  val BrokerTopicsPath = "/brokers/topics"
  val ControllerPath = "/controller"
  val ControllerEpochPath = "/controller_epoch"
  val ReassignPartitionsPath = "/admin/reassign_partitions"
  val DeleteTopicsPath = "/admin/delete_topics"
  val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
  val BrokerSequenceIdPath = "/brokers/seqid"
  val IsrChangeNotificationPath = "/isr_change_notification"
  val EntityConfigPath = "/config"
  val EntityConfigChangesPath = "/config/changes"

Pool

  • 所在文件: core/src/main/scala/kafka/utils/Pool.scala
  • 功能: 簡(jiǎn)單的并發(fā)對(duì)象池;
  • 實(shí)現(xiàn): 對(duì)ConcurrentHashMap的封裝;
  • getAndMaybePut實(shí)現(xiàn)小技巧, 使用了double check技術(shù), 在有值的情況下降低鎖的開銷;
def getAndMaybePut(key: K) = {
    if (valueFactory.isEmpty)
      throw new KafkaException("Empty value factory in pool.")
    val curr = pool.get(key)
    if (curr == null) {
      createLock synchronized {
        val curr = pool.get(key)
        if (curr == null)
          pool.put(key, valueFactory.get(key))
        pool.get(key)
      }
    }
    else
      curr
  }

Logging

  • 所在文件: core/src/main/scala/kafka/utils/Logging.scala
  • 功能: 定義了trait Logging 供其他類繼承,方便寫日志;
  • 實(shí)現(xiàn): 對(duì)org.apache.log4j.Logger的封裝;

FileLock

  • 所在文件: core/src/main/scala/kafka/utils/FileLock.scala
  • 功能: 文件鎖, 相當(dāng)于linux的/usr/bin/lockf;
  • 實(shí)現(xiàn): 使用java.nio.channels.FileLock實(shí)現(xiàn);

ByteBounderBlockingQueue

  • 所在文件: core/src/main/scala/kafkak/utils/ByteBoundedBlockingQueue.scala;
  • 功能: 阻塞隊(duì)列, 隊(duì)列滿的衡量標(biāo)準(zhǔn)有兩條: 隊(duì)列內(nèi)元素個(gè)數(shù)達(dá)到了上限, 隊(duì)列內(nèi)所有元素的size之各達(dá)到了上限;
  • 實(shí)現(xiàn): 使用java.util.concurrent.LinkedBlockingQueue實(shí)現(xiàn), 加上了對(duì)隊(duì)列內(nèi)已有元素size大小的check;
  • 接口:
def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean
def offer(e: E): Boolean
def put(e: E): Boolean
def poll(timeout: Long, unit: TimeUnit)
def poll()
def take(): E
...

DelayedItem

  • 所在文件: core/src/main/scala/kafaka/utils/DelayedItem.scala
  • 功能: 定義了可以放入到DelayQueue隊(duì)列的對(duì)象;
  • 實(shí)現(xiàn): 實(shí)現(xiàn)了Delayed接口;

先寫這么多吧,其他的遇到的時(shí)候再來(lái)分析,不得不感嘆java的類庫(kù)真是豐富啊~~~

# 下一篇我們來(lái)開始介紹Kafka的Request和Response

Kafka源碼分析-匯總
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末们豌,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子记罚,更是在濱河造成了極大的恐慌群井,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件毫胜,死亡現(xiàn)場(chǎng)離奇詭異书斜,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)酵使,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門荐吉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人口渔,你說(shuō)我怎么就攤上這事样屠。” “怎么了?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵痪欲,是天一觀的道長(zhǎng)悦穿。 經(jīng)常有香客問(wèn)我,道長(zhǎng)业踢,這世上最難降的妖魔是什么栗柒? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮知举,結(jié)果婚禮上瞬沦,老公的妹妹穿的比我還像新娘。我一直安慰自己雇锡,他們只是感情好逛钻,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著锰提,像睡著了一般曙痘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上立肘,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天屡江,我揣著相機(jī)與錄音,去河邊找鬼赛不。 笑死,一個(gè)胖子當(dāng)著我的面吹牛罢洲,可吹牛的內(nèi)容都是我干的踢故。 我是一名探鬼主播,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼惹苗,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼殿较!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起桩蓉,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤淋纲,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后院究,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體洽瞬,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年业汰,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了伙窃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡样漆,死狀恐怖为障,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤鳍怨,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布呻右,位于F島的核電站,受9級(jí)特大地震影響鞋喇,放射性物質(zhì)發(fā)生泄漏声滥。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一确徙、第九天 我趴在偏房一處隱蔽的房頂上張望醒串。 院中可真熱鬧,春花似錦鄙皇、人聲如沸芜赌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)缠沈。三九已至,卻和暖如春错蝴,著一層夾襖步出監(jiān)牢的瞬間洲愤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工顷锰, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留柬赐,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓官紫,卻偏偏與公主長(zhǎng)得像肛宋,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子束世,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容