??Kafka 服務端通過Kafka.scala
的主函數(shù)main
方法啟動疹尾。KafkaServerStartable
類提供讀取配置文件颖医、啟動/停止服務的方法量没。而啟動/停止服務最終調(diào)用的是KafkaServer
的startup/shutdown
方法丑罪。
啟動流程
- 啟動 zk 客戶端芍耘。
- 啟動動態(tài)配置佳谦。
- 啟動調(diào)度線程池戴涝。
- 啟動日志管理器的后臺線程,包括日志清理钻蔑、日志刷盤啥刻、日志刪除、日志壓縮咪笑。
-
啟動 NIO Socket 服務可帽。
- 初始化一個接收器
Acceptor
,即啟動 NIO Socket蒲肋。 - 添加
num.network.threads
個接收器到請求通道RequestChannel
的處理器緩存ConcurrentHashMap
蘑拯,key 為遞增編號,value 為處理器Processor
兜粘。 -
Acceptor
執(zhí)行CountDownLatch.await
等待通知啟動申窘。 - 緩存
Acceptor
到ConcurrentHashMap
,key 為EndPoint
孔轴,value 為Acceptor
剃法。
- 初始化一個接收器
- 啟動副本管理器。
- 在 zk 注冊 broker路鹰。
- 啟動控制器贷洲。
- 啟動組協(xié)調(diào)器。
- 啟動事務協(xié)調(diào)器晋柱。
- 初始化
KafkaApis
优构。 -
初始化處理器線程緩存池。
- 啟動
num.io.threads
個請求處理器線程KafkaRequestHandler
雁竞。 - 從阻塞隊列
ArrayBlockingQueue
獲取請求钦椭,調(diào)用KafkaApis.handle
方法拧额,進行集中處理請求。
- 啟動
-
啟動處理器線程彪腔。
- 首先
CountDownLatch.countDown
通知喚醒Acceptor
線程侥锦。- 使用
NIO.select
輪詢。 - 如果有可接收就緒的事件德挣,則將當前的
SocketChannel
加入緩存隊列ConcurrentLinkedQueue
- 使用
- 從上述緩存隊列取出
SocketChannel
恭垦,綁定到KafkaChannel
。 - 將接收到的請求緩存到限長阻塞隊列
ArrayBlockingQueue
- 首先
請求處理流程
服務端請求處理流程
詳細源碼分析
Acceptor 線程
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 注冊接收事件
startupComplete() // 通知 Acceptor 線程
var currentProcessor = 0
while (isRunning) {
val ready = nioSelector.select(500) // 輪詢事件
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
val key = iter.next
iter.remove()
if (key.isAcceptable) { // 有可接受事件
val processor = synchronized {
currentProcessor = currentProcessor % processors.size
processors(currentProcessor) // 緩存 Processor
}
accept(key, processor) // 將 SocketChannel 緩存到隊列
}
}
}
}
}
Processor 線程
override def run() {
startupComplete() // CountDownLatch.countDown 喚醒 Acceptor 線程格嗅。
while (isRunning) {
configureNewConnections() // 從緩存隊列取出 SocketChannel番挺,綁定到 KafkaChannel
processNewResponses() // 處理返回客戶端的響應
poll() // Kafka.Selector 輪詢讀取/寫入事件
processCompletedReceives() // 處理客戶端的請求,放到阻塞隊列
processCompletedSends() // 處理返回客戶端響應后的回調(diào)
processDisconnected() // 斷開連接后的處理
}
}
KafkaRequestHandler 線程阻塞隊列
def run() {
while (!stopped) {
val startSelectTime = time.nanoseconds
// 從阻塞隊列拉取請求
val req = requestChannel.receiveRequest(300)
req match {
case request: RequestChannel.Request =>
try {
apis.handle(request) // 調(diào)用`KafkaApis.handle`方法吗浩,進行集中處理請求建芙。
}
}
}
}
KSelector
??參考客戶端源碼分析。