1. SocketServer Module Overview
SocketServer is Kafka's network communication module. It follows the Reactor pattern: one Acceptor accepts client connections, N Processors handle network reads and writes, and M Handlers execute the business logic. Queues sit between the Acceptor and the Processors, and between the Processors and the Handlers, to buffer requests.
The SocketServer network communication flow diagram is as follows:
Notes (a minimal sketch of the resulting hand-off queues follows this list):
Acceptor: listens for client connections; when a client or another broker connects, it hands the new connection to a Processor, chosen round-robin, for read/write handling;
Processor: accepts new connections and registers read events; processes client Requests; processes broker Responses; handles client disconnect events; enforces the broker's maximum connection limits; etc.;
RequestChannel: the Request processing channel; client messages are enqueued into RequestChannel.requestQueue to await processing by a KafkaRequestHandler;
KafkaRequestHandler: the Request processing thread; it reads Requests from the RequestChannel.requestQueue and hands them to KafkaApis for processing;
KafkaApis: the protocol handling class; it calls ReplicaManager, LogManager, GroupCoordinator, and others to process a request, and enqueues the result as a Response into the owning Processor, which then sends the Response to the client;
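To make the three hand-off points above concrete, here is a minimal, hypothetical sketch of the queueing structure (simplified stand-in types, not Kafka's real classes):
import java.nio.channels.SocketChannel
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingDeque}

object ReactorQueuesSketch {
  case class Request(processorId: Int, payload: Array[Byte])   // hypothetical simplified type
  case class Response(processorId: Int, payload: Array[Byte])  // hypothetical simplified type

  // Acceptor -> Processor: each Processor has its own bounded newConnections queue
  val newConnections = new ArrayBlockingQueue[SocketChannel](20)
  // Processor -> Handler: the shared request queue inside RequestChannel
  val requestQueue = new ArrayBlockingQueue[Request](500)
  // Handler -> Processor: each Processor has its own response queue
  val responseQueue = new LinkedBlockingDeque[Response]()
}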
2懂酱、主要處理類說明
2.1、Request及Response說明
Request為客戶端的請求誊抛,其源碼如下:
class Request(
//Index of the Processor thread that received this request;
//after the I/O threads finish processing the Request, the same Processor thread
//must send the Response back to the requester,
//so the Request has to record which Processor thread originally accepted it
val processor: Int,
// context carries the request context: the request header, the client connection id,
// the client address, authentication information, etc.
val context: RequestContext,
//startTimeNanos records when the Request object was created; it is mainly used to
//compute various time-based metrics.
val startTimeNanos: Long,
//memoryPool is a non-blocking memory pool defined in the source, whose main
//purpose is to keep Request objects from using unbounded memory.
memoryPool: MemoryPool,
//buffer is the byte buffer that holds the actual Request content.
@volatile private var buffer: ByteBuffer,
metrics: RequestChannel.Metrics) extends BaseRequest {
// ... (methods omitted)
}
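The buffer and memoryPool fields work together for memory accounting: once a request has been fully handled, its buffer is returned to the pool. A sketch of Request.releaseBuffer, close to the 2.x sources this walkthrough appears to follow:
// inside RequestChannel.Request (abridged)
def releaseBuffer(): Unit = {
  if (buffer != null) {
    memoryPool.release(buffer)  // hand the bytes back to the shared pool
    buffer = null               // drop the reference so it cannot be reused
  }
}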
A Response is the broker's reply after it has processed a request. The Response base class is defined as follows:
//request is the Request this Response answers
abstract class Response(val request: Request) {
locally {
val nowNs = Time.SYSTEM.nanoseconds
request.responseCompleteTimeNanos = nowNs
if (request.apiLocalCompleteTimeNanos == -1L)
request.apiLocalCompleteTimeNanos = nowNs
}
//id of the Processor that owns the corresponding Request
def processor: Int = request.processor
def responseString: Option[String] = Some("")
//callback invoked when the send completes
def onComplete: Option[Send => Unit] = None
override def toString: String
}
Response has five concrete subclasses:
- SendResponse: most Requests need a piece of callback logic to run after processing completes; SendResponse is the Response subclass that carries the returned result. Its most important field is the completion callback, which specifies the logic to run after the send finishes (a sketch follows this list).
- NoResponse: for Requests that need no extra callback logic after processing.
- CloseConnectionResponse: used when an error requires closing the TCP connection; returning CloseConnectionResponse to the requester explicitly tells it to close the connection.
- StartThrottlingResponse: notifies the broker's SocketServer component that a given TCP connection channel has started being throttled.
- EndThrottlingResponse: the counterpart of StartThrottlingResponse; notifies the broker's SocketServer component that throttling of a given TCP connection channel has ended.
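For reference, a sketch of SendResponse close to the 2.x sources (field names may differ slightly across versions):
class SendResponse(request: Request,
                   val responseSend: Send,                       // the bytes to write back
                   val responseAsString: Option[String],         // only set when request logging is enabled
                   val onCompleteCallback: Option[Send => Unit]) // logic to run after the send finishes
  extends Response(request) {
  override def responseString: Option[String] = responseAsString
  override def onComplete: Option[Send => Unit] = onCompleteCallback
}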
2.2 Acceptor
The Acceptor's main job is to listen for client connections. When a new connection is established, it hands the connection to one of the Processors, chosen round-robin, for further management. In other words, the Acceptor handles the connection's ACCEPT event, while READ and WRITE events are handled by the Processors. The Acceptor also exposes a few methods for managing its Processor thread pool.
Acceptor definition:
private[kafka] class Acceptor(
// The configured broker listener, e.g. PLAINTEXT://localhost:9092. The Acceptor uses
// the host name and port in endPoint to create its server socket
val endPoint: EndPoint,
//SocketOptions SO_SNDBUF: the size of the low-level buffer for outbound network I/O.
//Defaults to the broker-side parameter socket.send.buffer.bytes, i.e. 100KB
val sendBufferSize: Int,
//SocketOptions SO_RCVBUF: the size of the low-level buffer for inbound network I/O.
//Defaults to the broker-side parameter socket.receive.buffer.bytes, i.e. 100KB.
val recvBufferSize: Int,
// broker node id
brokerId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String) extends AbstractServerThread(connectionQuotas)
Main Acceptor members:
//The Selector performs the actual low-level I/O: listening for connection requests, reads, writes, etc.
private val nioSelector = NSelector.open()
// The ServerSocketChannel created on the broker side and registered with the selector,
// used to listen for client connections
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
//The Processor thread array that handles Requests/Responses
private val processors = new ArrayBuffer[Processor]()
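openServerSocket itself is straightforward; a sketch close to the 2.x sources (error handling omitted):
private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
  val socketAddress =
    if (host == null || host.trim.isEmpty) new InetSocketAddress(port)
    else new InetSocketAddress(host, port)
  val serverChannel = ServerSocketChannel.open()
  // non-blocking mode: accept() is driven by the Acceptor's selector loop
  serverChannel.configureBlocking(false)
  if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
    serverChannel.socket().setReceiveBufferSize(recvBufferSize)
  serverChannel.socket.bind(socketAddress)
  serverChannel
}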
Acceptor polling flow:
The processing source is as follows:
def run(): Unit = {
//register the ServerSocket's ACCEPT event with the corresponding Selector
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessorIndex = 0
while (isRunning) {
try {
//wait for selector events, with a 500ms timeout
val ready = nioSelector.select(500)
if (ready > 0) {
//get the SelectionKeys for all ready events
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
//iterate over the SelectionKeys
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable) {
//accept the client connection
accept(key).foreach { socketChannel =>
// Assign the channel to the next processor (using round-robin) to which the
// channel can be added without blocking. If newConnections queue is full on
// all processors, block until the last one is able to accept a connection.
var retriesLeft = synchronized(processors.length)
var processor: Processor = null
do {
retriesLeft -= 1
//pick an available Processor thread
processor = synchronized {
// adjust the index (if necessary) and retrieve the processor atomically for
// correct behaviour in case the number of processors is reduced dynamically
currentProcessorIndex = currentProcessorIndex % processors.length
processors(currentProcessorIndex)
}
currentProcessorIndex += 1
//put the new connection into the chosen Processor's newConnections queue and wake up that Processor's Selector
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
}
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
catch {
// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
// to a select operation on a specific channel or a bad request. We don't want
// the broker to stop responding to requests from other clients in these scenarios.
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
} finally {
debug("Closing server socket and selector.")
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
shutdownComplete()
}
}
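assignNewConnection above ultimately calls Processor.accept, which tries a non-blocking offer first and only blocks once all Processors have been tried (retriesLeft == 0). A sketch close to the 2.x sources:
def accept(socketChannel: SocketChannel,
           mayBlock: Boolean,
           acceptorIdlePercentMeter: Meter): Boolean = {
  val accepted = {
    if (newConnections.offer(socketChannel))
      true                       // queue had room: hand off without blocking
    else if (mayBlock) {         // last retry: block until the queue has room
      val startNs = time.nanoseconds
      newConnections.put(socketChannel)
      acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs)
      true
    } else
      false                      // queue full: caller moves on to the next Processor
  }
  if (accepted)
    wakeup()                     // wake the Processor's Selector so it registers the channel
  accepted
}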
2.3 Processor
The Processor is the protocol-handling thread. It listens for client read/write events, registers new connections, reads client requests, and sends responses back to clients; it is the thread that actually does the protocol work.
2.3.1 Main Fields
private[kafka] class Processor(
val id: Int,
time: Time,
maxRequestSize: Int,
//the RequestChannel into which parsed Requests are enqueued
requestChannel: RequestChannel,
connectionQuotas: ConnectionQuotas,
connectionsMaxIdleMs: Long,
failedAuthenticationDelayMs: Int,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
config: KafkaConfig,
metrics: Metrics,
credentialProvider: CredentialProvider,
memoryPool: MemoryPool,
logContext: LogContext,
connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
//Queue of newly accepted connections. When the Acceptor accepts a new connection,
//it puts the new SocketChannel into this queue, and the Processor later registers
//each queued SocketChannel with its Selector and listens for the corresponding read/write events
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
// A temporary holding map for Responses. After the Processor hands a Response to the
// network layer for sending, it also puts the Response into this map, because some
// Response callbacks may only run after the Response has actually reached the
// requester, so they must be parked here until the send completes.
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
//The Response queue; Responses in this queue are waiting to be sent to their clients;
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
//The Processor's Selector, used to listen for client read/write events
private val selector = createSelector(
  ChannelBuilders.serverChannelBuilder(listenerName,
    listenerName == config.interBrokerListenerName,
    securityProtocol, config, credentialProvider.credentialCache,
    credentialProvider.tokenCache, time, logContext))
}
2.3.2 Main Processing Loop
override def run(): Unit = {
startupComplete()
try {
while (isRunning) {
try {
// 1. Handle new connections
// Iterate over the SocketChannels in newConnections and register each channel's OP_READ event with the Selector
configureNewConnections()
// 2. Process the Response queue
// Iterate over responseQueue and handle each Response according to its type: send it to the client,
// enqueue it into inflightResponses, handle throttling messages, etc.
processNewResponses()
// 3. Handle low-level I/O events
// Use the Selector to collect I/O events, read the clients' request data, reassemble it into
// complete payloads, and enqueue them into completedReceives
poll()
// 4. Parse the data and build Requests
// Iterate over completedReceives, wrap the data into Request objects according to the protocol,
// and enqueue the Requests into RequestChannel's requestQueue
processCompletedReceives()
// 5. Handle Responses whose send has completed
// Iterate over the Selector's completedSends queue, remove the corresponding Responses from
// inflightResponses, invoke each Response's callback, handle throttling, etc.
processCompletedSends()
// 6. Handle client disconnects
// Iterate over the Selector's disconnected queue and remove the affected connections' Responses
// from inflightResponses;
processDisconnected()
// 7. Enforce the maximum connection limit
// When a listener has more connections than configured, close one connection, preferring
// connections already marked for closing, then the least recently used idle connection,
// then any connection, so that resources are released
closeExcessConnections()
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
// reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. These exceptions are caught and
// processed by the individual methods above which close the failing channel and continue processing other
// channels. So this catch block should only ever see ControlThrowables.
case e: Throwable => processException("Processor got uncaught exception.", e)
}
}
} finally {
debug(s"Closing selector - processor $id")
CoreUtils.swallow(closeAll(), this, Level.ERROR)
shutdownComplete()
}
}
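Step 1 (configureNewConnections) drains up to connectionQueueSize channels from newConnections and registers each with this Processor's Selector, which subscribes to OP_READ so client requests can be read. A sketch close to the 2.x sources (error handling omitted):
private def configureNewConnections(): Unit = {
  var connectionsProcessed = 0
  while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
    val channel = newConnections.poll()
    // build a connection id from the socket and register the channel for reads
    selector.register(connectionId(channel.socket), channel)
    connectionsProcessed += 1
  }
}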
2.4邑闲、RequestChannel說明
RequestChannel 是傳輸 Request/Response 的通道;RequestChannel的requestQueue 會緩存Request恕沫,同時processors線程池中的Processor會讀取各個SocketChannel中的數(shù)據(jù)监憎,封裝為Request并入隊(duì)到requestQueue 隊(duì)列中;
RequestChannel主要參數(shù):
class RequestChannel(
//maximum capacity of requestQueue
val queueSize: Int,
val metricNamePrefix : String) extends KafkaMetricsGroup {
//the queue that holds Requests
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
//the Processor pool
private val processors = new ConcurrentHashMap[Int, Processor]()
}
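Both sides of the channel are thin wrappers over requestQueue, plus a lookup into the Processor pool for the return path; a sketch of the main methods, close to the 2.x sources:
// called by a Processor after it has parsed a complete Request
def sendRequest(request: RequestChannel.Request): Unit =
  requestQueue.put(request)

// called by a KafkaRequestHandler; blocks for up to `timeout` ms for the next Request
def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
  requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

// called when a Response is ready: route it back to the Processor that owns the connection
def sendResponse(response: RequestChannel.Response): Unit = {
  val processor = processors.get(response.processor)
  // the processor may be null if it was removed dynamically in the meantime;
  // in that case the connection is already closed and the response is dropped
  if (processor != null)
    processor.enqueueResponse(response)
}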
2.5褥影、KafkaRequestHandler 與 KafkaRequestHandlerPool說明
KafkaRequestHandler為真正處理Request的線程哑芹,其主要是從RequestChannel中獲取Request,并將Request交由KafkaApis進(jìn)行處理;而KafkaRequestHandlerPool是KafkaRequestHandler的池化封裝牡直;
KafkaRequestHandler參數(shù)說明:
class KafkaRequestHandler(id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
//total number of threads in the handler pool
val totalHandlerThreads: AtomicInteger,
//the RequestChannel this thread reads Requests from
val requestChannel: RequestChannel,
//the KafkaApis that processes Requests
apis: KafkaApis,
time: Time) extends Runnable with Logging {}
KafkaRequestHandler processing flow:
def run(): Unit = {
while (!stopped) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = time.nanoseconds
// take the next request to process from the request queue
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
req match {
// a shutdown request: this broker has initiated shutdown
case RequestChannel.ShutdownRequest =>
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
shutdownComplete.countDown()
return
// a normal request
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
// KafkaApis.handle executes the corresponding processing logic
apis.handle(request)
} catch {
// on a fatal error, shut the thread down immediately
case e: FatalExitError =>
shutdownComplete.countDown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
} finally {
// release the memory buffer held by the request object
request.releaseBuffer()
}
case null => // continue
}
}
shutdownComplete.countDown()
}
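A worked example of the idle-meter discounting in the code above, with hypothetical numbers:
object IdleMeterExample extends App {
  val totalThreads = 8               // totalHandlerThreads.get
  val idleNanos = 240L * 1000 * 1000 // this thread waited 240 ms in receiveRequest
  // The meter's time window is shared by all threads, so each thread records
  // only its share; the aggregate reading stays the pool-wide average idle ratio.
  println(s"marked: ${idleNanos / totalThreads} ns") // 30 ms worth of idle time
}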
Main KafkaRequestHandlerPool parameters:
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
//number of threads
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
//number of threads in the pool
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
//the thread pool; each element is a KafkaRequestHandler
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
}
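The createHandler called in the loop above builds one KafkaRequestHandler and starts it as a daemon thread; a sketch close to the 2.x sources:
def createHandler(id: Int): Unit = synchronized {
  // every handler shares the same idle meter, pool-size counter, channel, and KafkaApis
  runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
  KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}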
2.6 KafkaApis
KafkaApis encapsulates the actual protocol-handling logic. Depending on the protocol, it calls LogManager, ReplicaManager, and others to process the request, then wraps the result as a Response and enqueues it into the owning Processor's responseQueue;
Main KafkaApis parameters:
class KafkaApis(val requestChannel: RequestChannel,// request channel
val replicaManager: ReplicaManager,// replica manager, drives state transitions of all replicas in the cluster
val adminManager: AdminManager,// manager for topic and partition configuration, etc.
val groupCoordinator: GroupCoordinator,// consumer group coordinator component
val txnCoordinator: TransactionCoordinator,// transaction coordinator component
val controller: KafkaController, // controller component, manages and stores cluster metadata
val zkClient: KafkaZkClient,// ZooKeeper client; Kafka relies on this class for all ZooKeeper interaction
val brokerId: Int, // the broker.id parameter value
val config: KafkaConfig,// Kafka configuration class
val metadataCache: MetadataCache,// metadata cache class
val metrics: Metrics,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,// quota manager component
val fetchManager: FetchManager,
brokerTopicStats: BrokerTopicStats,//per-broker topic statistics
val clusterId: String,//cluster id
time: Time,
val tokenManager: DelegationTokenManager) extends Logging {}
KafkaApis dispatches each request to the corresponding handler based on its protocol (API key);
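the dispatch is a single match on the request header's API key. An abridged sketch of KafkaApis.handle, close to the 2.x sources:
def handle(request: RequestChannel.Request): Unit = {
  try {
    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      // ... one case per ApiKeys value (remaining cases omitted)
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => handleError(request, e) // send an error Response back
  } finally {
    // record when local processing of this request finished
    request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}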
2.7漏益、Data plane 與 Control plane說明
Kafka對數(shù)據(jù)類命令和控制類命令的連接處理通道進(jìn)行了拆分蛹锰,Data Plane處理數(shù)據(jù)類請求,Control plane處理命令類請求绰疤;
class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
// SocketServer implements the BrokerReconfigurable trait, meaning some of its
// configuration parameters can be modified dynamically,
// i.e. while the broker stays up
// Maximum length of SocketServer's request queue, set by the broker-side
// parameter queued.max.requests; the default is 500
private val maxQueuedRequests = config.queuedMaxRequests
private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
this.logIdent = logContext.logPrefix
private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
// Acceptor threads that handle data-class requests; one Acceptor thread per listener
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
// control-plane
// The Processor thread that handles control-class requests
// Note: currently a single dedicated Processor thread, not a thread pool, handles control requests
// The control plane's resources are just 1 Acceptor thread + 1 Processor thread + 1 request queue of depth 20.
// Once the control plane is enabled, it gets exactly one Processor thread and one Acceptor thread; note also
// that the request queue length in its RequestChannel is hard-coded to 20 rather than being configurable.
// This reflects an assumption made by the community: control requests should be far fewer than
// data requests, so they need neither a thread pool nor a deep request queue.
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
// the RequestChannel dedicated to control-class requests
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneMetricPrefix, time))
// Under the data-plane and control-plane comments above, a matching set of variables is defined:
// a Processor pool, an Acceptor pool, and a RequestChannel instance.
// Processor pool: the network thread pool, responsible for quickly putting requests into the request queue.
// Acceptor pool: holds the Acceptor thread SocketServer creates for each listener;
// that thread dispatches inbound connection-establishment requests on its listener.
// RequestChannel: the request processing channel that carries the request queue.
}
During initialization, SocketServer sets the two planes up separately;
Creating the data plane:
private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit = {
// iterate over the listener set
endpoints.foreach { endpoint =>
// put the listener under connection quota management
connectionQuotas.addListener(config, endpoint.listenerName)
// create the Acceptor thread for this listener
val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
// create multiple Processor threads for this listener; the exact count is set by num.network.threads
addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
// store the <listener, Acceptor thread> pair for unified management
dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
}
}
Creating the control plane:
The control plane gets only 1 Acceptor thread + 1 Processor thread + 1 request queue of depth 20:
private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = {
// if a listener is configured for the control plane
endpointOpt.foreach { endpoint =>
// put the listener under connection quota management
connectionQuotas.addListener(config, endpoint.listenerName)
// create the Acceptor thread for this listener
val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
// create the single Processor thread for this listener
val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
controlPlaneProcessorOpt = Some(controlPlaneProcessor)
val listenerProcessors = new ArrayBuffer[Processor]()
listenerProcessors += controlPlaneProcessor
// add the Processor thread to the RequestChannel dedicated to control requests,
// i.e. to the Processor pool kept by that RequestChannel instance
controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
nextProcessorId += 1
// also add the Processor to the Processor pool managed by the Acceptor thread
controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
}
}
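To actually enable the control plane, the broker must be given a dedicated listener via the control.plane.listener.name configuration; for example (hypothetical hosts and ports):
listeners=INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093,CONTROLLER://192.1.1.8:9094
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROLLER:SSL
control.plane.listener.name=CONTROLLER
With this configuration, controller-to-broker requests flow through the CONTROLLER listener's Acceptor/Processor pair and its depth-20 RequestChannel, isolated from data traffic on the other listeners.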