Kafka Zero-Copy 使用分析

之前有聽過Zero-Copy 技術(shù)雾鬼,而Kafka是典型的使用者葱轩。網(wǎng)上找了找,竟然沒有找到合適的介紹文章狠毯。正好這段時間正在閱讀Kafka的相關(guān)代碼护糖,于是有了這篇內(nèi)容。這篇文章會簡要介紹Zero-Copy技術(shù)在Kafka的使用情況嚼松,希望能給大家一定借鑒和學(xué)習(xí)樣例嫡良。

前言

Kafka 我個人感覺是性能優(yōu)化的典范。而且使用Scala開發(fā)献酗,代碼寫的也很漂亮的皆刺。重點我覺得有四個

  • NIO
  • Zero Copy
  • 磁盤順序讀寫
  • Queue數(shù)據(jù)結(jié)構(gòu)的極致使用

Zero-Copy 實際的原理,大家還是去Google下凌摄。這篇文章重點會分析這項技術(shù)是怎么被嵌入到Kafa里的。包含兩部分:

  1. Kafka在什么場景下用了這個技術(shù)
  2. Zero-Copy 是如何被調(diào)用漓帅,并且發(fā)揮作用的锨亏。

Kafka在什么場景下使用該技術(shù)

答案是:

消息消費的時候

包括外部Consumer以及Follower 從partiton Leader同步數(shù)據(jù),都是如此忙干。簡單描述就是:

Consumer從Broker獲取文件數(shù)據(jù)的時候器予,直接通過下面的方法進行channel到channel的數(shù)據(jù)傳輸。

java.nio.FileChannel.transferTo(
long position, 
long count,                                
WritableByteChannel target)`

也就是說你的數(shù)據(jù)源是一個Channel,數(shù)據(jù)接收端也是一個Channel(SocketChannel),則通過該方式進行數(shù)據(jù)傳輸捐迫,是直接在內(nèi)核態(tài)進行的乾翔,避免拷貝數(shù)據(jù)導(dǎo)致的內(nèi)核態(tài)和用戶態(tài)的多次切換。

Kafka 如何使用Zero-Copy流程分析

估計看完這段內(nèi)容施戴,你對整個Kafka的數(shù)據(jù)處理流程也差不多了解了個大概反浓。為了避免過于繁雜,以至于將整個Kafka的體系都拖進來赞哗,我們起始點從KafkaApis相關(guān)的類開始雷则。

數(shù)據(jù)的生成

對應(yīng)的類名稱為:

kaka.server.KafkaApis

該類是負責(zé)真正的Kafka業(yè)務(wù)邏輯處理的。在此之前的肪笋,譬如 SocketServer等類似Tomcat服務(wù)器一樣月劈,側(cè)重于交互度迂,屬于框架層次的東西。KafkaApis 則類似于部署在Tomcat里的應(yīng)用猜揪。

def handle(request: RequestChannel.Request) {
       ApiKeys.forId(request.requestId) match {
        case ApiKeys.PRODUCE => handleProducerRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        .....

handle 方法是所有處理的入口惭墓,然后根據(jù)請求的不同,有不同的處理邏輯而姐。這里我們關(guān)注ApiKeys.FETCH這塊腊凶,也就是有消費者要獲取數(shù)據(jù)的邏輯。進入 handleFetchRequest方法毅人,你會看到最后一行代碼如下:

replicaManager.fetchMessages(  
       fetchRequest.maxWait.toLong, 
      fetchRequest.replicaId, 
      fetchRequest.minBytes,  
      authorizedRequestInfo,  
      sendResponseCallback)

ReplicaManager 包含所有主題的所有partition消息吭狡。大部分針對Partition的操作都是通過該類來完成的。

replicaManager.fetchMessages 這個方法非常的長丈莺。我們只關(guān)注一句代碼:

val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)

該方法獲取本地日志信息數(shù)據(jù)划煮。內(nèi)部會調(diào)用kafka.cluster.Log對象的read方法:

log.read(offset, fetchSize, maxOffsetOpt)

Log 對象是啥呢?其實就是對應(yīng)的一個Topic的Partition. 一個Partition是由很多端(Segment)組成的缔俄,這和Lucene非常相似弛秋。一個Segment就是一個文件。實際的數(shù)據(jù)自然是從這里讀到的俐载。代碼如下:

val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)

這里的fetchInfo(FetchDataInfo)對象包含兩個字段:

  • offsetMetadata
  • FileMessageSet

FileMessageSet 其實就是用戶在這個Partition這一次消費能夠拿到的數(shù)據(jù)集合蟹略。當(dāng)然,真實的數(shù)據(jù)還躺在byteBuffer里遏佣,并沒有記在到內(nèi)存中挖炬。FileMessageSet 里面包含了一個很重要的方法:

def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
    ......
    
    val bytesTransferred = (destChannel match {
      case tl: TransportLayer => tl.transferFrom(channel, position, count)
      case dc => channel.transferTo(position, count, dc)
    }).toInt
   
    bytesTransferred
  }

這里我們看到了久違的transferFrom方法。那么這個方法什么時候被調(diào)用呢状婶?我們先擱置下意敛,因為那個是另外一個流程。我們繼續(xù)分析上面的代碼膛虫。也就是接著從這段代碼開始分析:

val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)

獲取到這個信息后草姻,會執(zhí)行如下操作:

val fetchPartitionData = logReadResults.mapValues(result =>  FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
responseCallback(fetchPartitionData)

logReadResults 的信息被包裝成FetchResponsePartitionData, FetchResponsePartitionData 包喊了我們的FileMessageSet 對象。還記得么稍刀,這個對象包含了我們要跟蹤的tranferTo方法撩独。然后FetchResponsePartitionData 會給responseCallback作為參數(shù)進行回調(diào)。

responseCallback 的函數(shù)簽名如下(我去掉了一些我們不關(guān)心的信息):

def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
      val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus

      def fetchResponseCallback(delayTimeMs: Int) {
        val response = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs)
        requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
      }

    }

我們重點關(guān)注這個回調(diào)方法里的fetchResponseCallback账月。 我們會發(fā)現(xiàn)這里 FetchResponsePartitionData 會被封裝成一個FetchResponseSend ,然后由requestChannel發(fā)送出去综膀。

因為Kafka完全應(yīng)用是NIO的異步機制,所以到這里捶障,我們無法再跟進去了僧须,需要從另外一部分開始分析。

數(shù)據(jù)的發(fā)送

前面只是涉及到數(shù)據(jù)的獲取项炼。讀取日志担平,并且獲得對應(yīng)MessageSet對象示绊。MessageSet 是一段數(shù)據(jù)的集合,但是該數(shù)據(jù)沒有真實的被加載暂论。
這里會涉及到Kafka 如何將數(shù)據(jù)發(fā)送回Consumer端面褐。

在SocketServer,也就是負責(zé)和所有的消費者打交道取胎,建立連接的中樞里展哭,會不斷的進行poll操作

override def run() {
    startupComplete()
    while(isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()

首先會注冊新的連接,如果有的話闻蛀。接著就是處理新的響應(yīng)了匪傍。還記得剛剛上面我們通過requestChannelFetchResponseSend發(fā)出來吧。

private def processNewResponses() {
    var curr = requestChannel.receiveResponse(id)
    while(curr != null) {
      try {
        curr.responseAction match {         
          case RequestChannel.SendAction =>
            selector.send(curr.responseSend)
            inflightResponses += (curr.request.connectionId -> curr)
          
        }
      } finally {
        curr = requestChannel.receiveResponse(id)
      }
    }
  }

這里類似的觉痛,processNewResponses方法會先通過send方法把FetchResponseSend注冊到selector上役衡。 這個操作其實做的事情如下:

//SocketServer.scala    
public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        channel.setSend(send);
    }

//KafkaChannel.scala
   public void setSend(Send send) {
         this.send = send;          this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);     
    }

為了方便看代碼,我對代碼做了改寫薪棒。我們看到手蝎,其實send就是做了一個WRITE時間注冊。這個是和NIO機制相關(guān)的俐芯。如果大家看的有障礙棵介,不妨先學(xué)習(xí)下相關(guān)的機制。

回到 SocketServer 的run方法里吧史,也就是上面已經(jīng)貼過的代碼:

  override def run() {
    startupComplete()
    while(isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()

        try {
          selector.poll(300)
        } catch {
          case...
        }

SocketServer 會poll隊列邮辽,一旦對應(yīng)的KafkaChannel 寫操作ready了,就會調(diào)用KafkaChannel的write方法:

//KafkaChannel.scala
public Send write() throws IOException {
        if (send != null && send(send)) 
    }
//
//KafkaChannel.scala
private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

        return send.completed();
    }

依然的贸营,為了減少代碼逆巍,我做了些調(diào)整,其中write會調(diào)用 send方法莽使,對應(yīng)的Send對象其實就是上面我們注冊的FetchResponseSend 對象。

這段代碼里真實發(fā)送數(shù)據(jù)的代碼是send.writeTo(transportLayer);笙僚,

對應(yīng)的writeTo方法為:

private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map {
    case(topic, data) => new TopicDataSend(dest, TopicData(topic,
                                                     data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)}))
    }))
override def writeTo(channel: GatheringByteChannel): Long = {
    .....    
     written += sends.writeTo(channel)
    ....
  }

這里我依然做了代碼簡化芳肌,只讓我們關(guān)注核心的。 這里最后是調(diào)用了sends的writeTo方法肋层,而sends 其實是個MultiSend亿笤。
這個MultiSend 里有兩個東西:

  • topicAndPartition.partition: 分區(qū)
  • message:FetchResponsePartitionData

還記得這個FetchResponsePartitionData 么?我們的MessageSet 就被放在了FetchResponsePartitionData這個對象里栋猖。

TopicDataSend 也包含了sends,該sends 包含了 PartitionDataSend净薛,而 PartitionDataSend則包含了FetchResponsePartitionData。

最后進行writeTo的時候蒲拉,其實是調(diào)用了

//partitionData 就是 FetchResponsePartitionData
//messages 其實就是FileMessageSet
val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)

如果你還記得的話肃拜,F(xiàn)ileMessageSet 也有個writeTo方法痴腌,就是我們之前已經(jīng)提到過的那段代碼:

def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
    ......

    val bytesTransferred = (destChannel match {
      case tl: TransportLayer => tl.transferFrom(channel, position, count)
      case dc => channel.transferTo(position, count, dc)
    }).toInt

    bytesTransferred
  }

終于走到最底層了,最后其實是通過tl.transferFrom(channel, position, count) 來完成最后的數(shù)據(jù)發(fā)送的燃领。這里你可能比較好奇士聪,不應(yīng)該是調(diào)用transferTo 方法么? transferFrom其實是Kafka自己封裝的一個方法,最終里面調(diào)用的也是transerTo:

  @Override
    public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
        return fileChannel.transferTo(position, count, socketChannel);
    }

總結(jié)

Kafka的整個調(diào)用棧還是非常繞的猛蔽。尤其是引入了NIO的事件機制剥悟,有點類似Shuffle,把流程調(diào)用給切斷了,無法簡單通過代碼引用來進行跟蹤曼库。Kafka還有一個非常優(yōu)秀的機制就是DelayQueue機制区岗,我們在分析的過程中,為了方便毁枯,把這塊完全給抹掉了慈缔。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市后众,隨后出現(xiàn)的幾起案子胀糜,更是在濱河造成了極大的恐慌,老刑警劉巖蒂誉,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件教藻,死亡現(xiàn)場離奇詭異,居然都是意外死亡右锨,警方通過查閱死者的電腦和手機括堤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绍移,“玉大人悄窃,你說我怎么就攤上這事□褰眩” “怎么了轧抗?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長瞬测。 經(jīng)常有香客問我横媚,道長,這世上最難降的妖魔是什么月趟? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任灯蝴,我火速辦了婚禮,結(jié)果婚禮上孝宗,老公的妹妹穿的比我還像新娘穷躁。我一直安慰自己,他們只是感情好因妇,可當(dāng)我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布问潭。 她就那樣靜靜地躺著猿诸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪睦授。 梳的紋絲不亂的頭發(fā)上两芳,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天,我揣著相機與錄音去枷,去河邊找鬼怖辆。 笑死,一個胖子當(dāng)著我的面吹牛删顶,可吹牛的內(nèi)容都是我干的竖螃。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼逗余,長吁一口氣:“原來是場噩夢啊……” “哼特咆!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起录粱,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤腻格,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后啥繁,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體菜职,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年旗闽,在試婚紗的時候發(fā)現(xiàn)自己被綠了酬核。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡适室,死狀恐怖嫡意,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情捣辆,我是刑警寧澤蔬螟,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站汽畴,受9級特大地震影響促煮,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜整袁,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望佑吝。 院中可真熱鬧坐昙,春花似錦、人聲如沸芋忿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至痹仙,卻和暖如春是尔,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背开仰。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工拟枚, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人众弓。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓恩溅,卻偏偏與公主長得像,于是被迫代替她去往敵國和親谓娃。 傳聞我的和親對象是個殘疾皇子脚乡,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)滨达,斷路器奶稠,智...
    卡卡羅2017閱讀 134,654評論 18 139
  • kafka的定義:是一個分布式消息系統(tǒng),由LinkedIn使用Scala編寫捡遍,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,317評論 1 15
  • Design 1. Motivation 我們設(shè)計Kafka用來作為統(tǒng)一的平臺來處理大公司可能擁有的所有實時數(shù)據(jù)源...
    BlackManba_24閱讀 1,373評論 0 8
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,827評論 4 54
  • “你有多久沒戀愛了锌订?” “你好像在問我的年齡』颍” 第一次知道“母胎單身狗”這個詞是在一個暑假已經(jīng)做護士的發(fā)小告訴我...
    不萬能少女閱讀 196評論 0 1