Kafka源碼分析-網(wǎng)絡(luò)層-3

  • 這篇將是網(wǎng)絡(luò)層源碼分析的最后一篇
  • 北京的天,無力吐槽啊~
  • 快年底了, 每個(gè)團(tuán)隊(duì)都在旁邊錄制新年寄語,各種口號(hào)~

對(duì)nio的封裝:Selector類

  • 所在文件: clients/src/main/java/org/apache/kafka/commmon/network/Selector.java
  • 源碼中的注釋:

A nioSelector interface for doing non-blocking multi-connection network I/O. This class works with NetworkSend} and NetworkReceive to transmit size-delimited network requests and responses.

  • 重要函數(shù)解析:
    (1) register(String id, SocketChannel socketChannel): 注冊(cè)這個(gè)socketChannel到一個(gè)nio selector, 將其讀事件添加到selector的監(jiān)聽隊(duì)列; 這個(gè)socketChannel通常是服務(wù)器接收到的客戶端的連接:
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);

同時(shí)創(chuàng)建KafkaChannel, 負(fù)責(zé)實(shí)際的數(shù)據(jù)接收和發(fā)送:

KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
key.attach(channel);
this.channels.put(id, channel);

上面的id即為我們?cè)谏掀榻B的非常重要的ConnectionId;
(2) connect: 使用nio的SocketChannel連接到給定的地址,并且注冊(cè)到nio selector,同時(shí)也創(chuàng)建了KafkaChannel,負(fù)責(zé)實(shí)際的數(shù)據(jù)接收和發(fā)送;

SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
socketChannel.connect(address);
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
key.attach(channel);
this.channels.put(id, channel);

(3) poll: 核心函數(shù):

Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing disconnections, initiating new sends, or making progress on in-progress sends or receives.

處理作為客戶端的主動(dòng)連接事件:

if (key.isConnectable()) {
        channel.finishConnect();
         this.connected.add(channel.id());
         this.sensors.connectionCreated.record();
}

處理連接建立或接收后的ssl握手或sasl簽權(quán)操作:

if (channel.isConnected() && !channel.ready())
      channel.prepare();

處理觸發(fā)的讀事件:

if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
          NetworkReceive networkReceive;
          while ((networkReceive = channel.read()) != null)
                     addToStagedReceives(channel, networkReceive);
}

使用一個(gè)while循環(huán)力求每次讀事件觸發(fā)時(shí)都讀盡可能多的數(shù)據(jù);
channel.read()里會(huì)作拆包處理(后面會(huì)講到),返回非null表示當(dāng)前返回的NetworkReceive里包含了完整的應(yīng)用層協(xié)議數(shù)據(jù);
處理觸發(fā)的寫事件:

if (channel.ready() && key.isWritable()) {
         Send send = channel.write();
          if (send != null) {
                 this.completedSends.add(send);
                  this.sensors.recordBytesSent(channel.id(), send.size());
          }
}

需要發(fā)送數(shù)據(jù)通過調(diào)用Selector::send方法,設(shè)置封裝了寫數(shù)據(jù)的NetworkSend,再將這個(gè)NetworkSend通過KafkaChannel::setSend接口設(shè)置到KafkaChannel,同時(shí)將寫事件添加到selector的監(jiān)聽隊(duì)列中,等待寫事件被觸發(fā)后,通過KafkaChannel::write將數(shù)據(jù)發(fā)送出去;
addToCompletedReceives()
將當(dāng)前接收到的完整的的request到添加到completedReceives中,上一篇中介紹的SocketServer會(huì)作completedReceives中取出這些request作處理;

封裝對(duì)單個(gè)連接的讀寫操作:KafkaChannel類

  • 所在文件: clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
  • 包括transportLayer和authenticator, 完成ssh握手,sasl簽權(quán),數(shù)據(jù)的接收和發(fā)送;

傳輸層:TransportLayer類

  • 所在文件 clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
  • 兩個(gè)子類: PlanintextTransportLayer和SslTransportLayer
  • PlanintextTransportLayer的實(shí)現(xiàn)主要是通過NetworkReceive和NetworkSend;
  • SslTransportLayer的實(shí)現(xiàn)主要是通過SocketChannel,ByteBuffers和SSLEngine實(shí)際了加密數(shù)據(jù)的接收和發(fā)送(看到ssl就頭大啊,這部分先忽略~~~);

Kafka協(xié)議的包結(jié)構(gòu):

  • 前4個(gè)字節(jié)固定, 值是后面的實(shí)際數(shù)據(jù)的長(zhǎng)度;
  • NetworkReceive: 接收時(shí)先接收4個(gè)字節(jié), 獲取到長(zhǎng)度,然后再接收實(shí)際的數(shù)據(jù);
  • NetworkSend: 發(fā)送時(shí)實(shí)際數(shù)據(jù)前先加上4個(gè)字節(jié)的數(shù)據(jù)長(zhǎng)度再發(fā)送;

上圖:

selector.png

下一篇開始講Kafka的服務(wù)端配置

Kafka源碼分析-配置文件

Kafka源碼分析-匯總
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末豁辉,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖毒嫡,帶你破解...
    沈念sama閱讀 222,252評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異借杰,居然都是意外死亡粪狼,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門纯衍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來栋齿,“玉大人,你說我怎么就攤上這事襟诸⊥叨拢” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵歌亲,是天一觀的道長(zhǎng)菇用。 經(jīng)常有香客問我,道長(zhǎng)陷揪,這世上最難降的妖魔是什么惋鸥? 我笑而不...
    開封第一講書人閱讀 59,869評(píng)論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮悍缠,結(jié)果婚禮上卦绣,老公的妹妹穿的比我還像新娘。我一直安慰自己飞蚓,他們只是感情好滤港,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著趴拧,像睡著了一般蜗搔。 火紅的嫁衣襯著肌膚如雪劲藐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,475評(píng)論 1 312
  • 那天樟凄,我揣著相機(jī)與錄音聘芜,去河邊找鬼。 笑死缝龄,一個(gè)胖子當(dāng)著我的面吹牛汰现,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播叔壤,決...
    沈念sama閱讀 41,010評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼瞎饲,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了炼绘?” 一聲冷哼從身側(cè)響起嗅战,我...
    開封第一講書人閱讀 39,924評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎俺亮,沒想到半個(gè)月后驮捍,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,469評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡脚曾,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評(píng)論 3 342
  • 正文 我和宋清朗相戀三年东且,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片本讥。...
    茶點(diǎn)故事閱讀 40,680評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡珊泳,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出拷沸,到底是詐尸還是另有隱情色查,我是刑警寧澤,帶...
    沈念sama閱讀 36,362評(píng)論 5 351
  • 正文 年R本政府宣布撞芍,位于F島的核電站秧了,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏勤庐。R本人自食惡果不足惜示惊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望愉镰。 院中可真熱鬧米罚,春花似錦、人聲如沸丈探。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至隘竭,卻和暖如春塘秦,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背动看。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工尊剔, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留售睹,地道東北人氢哮。 一個(gè)月前我還...
    沈念sama閱讀 49,099評(píng)論 3 378
  • 正文 我出身青樓崇众,卻偏偏與公主長(zhǎng)得像迁杨,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子玩焰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • kafka的定義:是一個(gè)分布式消息系統(tǒng)荠呐,由LinkedIn使用Scala編寫挥吵,用作LinkedIn的活動(dòng)流(Act...
    時(shí)待吾閱讀 5,328評(píng)論 1 15
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理篷店,服務(wù)發(fā)現(xiàn)祭椰,斷路器,智...
    卡卡羅2017閱讀 134,711評(píng)論 18 139
  • 6.2 Channel實(shí)現(xiàn) ![Netty_Channel類圖][2] Channel的類圖比較清晰疲陕。我們主要分析...
    Hypercube閱讀 8,533評(píng)論 6 19
  • Java NIO(New IO)是從Java 1.4版本開始引入的一個(gè)新的IO API方淤,可以替代標(biāo)準(zhǔn)的Java I...
    JackChen1024閱讀 7,557評(píng)論 1 143
  • 許久不寫字,發(fā)現(xiàn)寫作水平嚴(yán)重退步啊~~~ 以前也是個(gè)文藝青年,現(xiàn)在也要寫出詩意的代碼啊~ 沒找到以前寫的詩,咱們還...
    掃帚的影子閱讀 3,245評(píng)論 2 8