kafka源碼愫讀(3)台谍、SocketServer模塊源碼分析

1须喂、SocketServer模塊說明

SocketServer為kafka的網(wǎng)絡(luò)通信管理模塊,基于Reactor的模式趁蕊,其中包含了1個Acceptor負(fù)責(zé)接受客戶端請求坞生,N個Processor負(fù)責(zé)讀寫數(shù)據(jù),M個Handler來處理業(yè)務(wù)邏輯掷伙。在Acceptor和Processor是己,Processor和Handler之間都有隊(duì)列來緩沖請求。

SocketServer網(wǎng)絡(luò)通信流程圖如下:

SocketServer網(wǎng)絡(luò)通信流程.png

說明:
Acceptor:監(jiān)聽客戶端連接任柜,當(dāng)有客戶端或broker進(jìn)行連接時卒废,將新創(chuàng)建的客戶端連接以輪詢的方式交由某個Processor進(jìn)行讀寫處理;
Processor:接收新建連接宙地,注冊讀事件摔认;處理客戶端的Request;處理broker的Response宅粥;處理客戶端斷開連接事件参袱;限制broker最大連接數(shù)等;
RequestChannel:Request請求處理通道粹胯,客戶端的消息都會入隊(duì)到RequestChannel.requestQueue中等待KafkaRequestHandler進(jìn)行處理蓖柔;
KafkaRequestHandler:請求Request處理線程,其從RequestChannel.requestQueue隊(duì)列中讀取Request并交由KafkaApis進(jìn)行處理风纠;
KafkaApis:協(xié)議處理類况鸣,調(diào)用ReplicaManager、LogManager竹观、GroupManner等對請求進(jìn)行處理镐捧,并將處理結(jié)果以Response的方式入隊(duì)到Processor中,等待Processor將Response發(fā)送給客戶端臭增;

2懂酱、主要處理類說明

2.1、Request及Response說明

Request為客戶端的請求誊抛,其源碼如下:

class Request(
              //Processor 線程的序號列牺,即這個請求是由哪個 Processor 線程接收處理的;
              //當(dāng) Request 被后面的 I/O 線程處理完成后,還要依靠 Processor 線程發(fā)送 Response 給請求發(fā)送方拗窃,
              //因此瞎领,Request 中必須記錄它之前是被哪個 Processor 線程接收的
              val processor: Int,
              // context 是用來標(biāo)識請求上下文信息泌辫,包括請求頭信息、客戶端的連接id九默、客戶端地址震放、權(quán)鑒信等;
              val context: RequestContext,
              //startTimeNanos 記錄了 Request 對象被創(chuàng)建的時間驼修,主要用于各種時間統(tǒng)計(jì)指標(biāo)的計(jì)算殿遂。
              val startTimeNanos: Long,
             //memoryPool 表示源碼定義的一個非阻塞式的內(nèi)存緩沖區(qū),主要作用是避免 Request 對象無限使用內(nèi)存乙各。
              memoryPool: MemoryPool,
              //buffer 是真正保存 Request 對象內(nèi)容的字節(jié)緩沖區(qū)墨礁。
              @volatile private var buffer: ByteBuffer,
              metrics: RequestChannel.Metrics。

Response為broker處理請求后的應(yīng)答觅丰,Response接口定義如下:

//Request為Response對應(yīng)的請求
abstract class Response(val request: Request) {
  locally {
    val nowNs = Time.SYSTEM.nanoseconds
    request.responseCompleteTimeNanos = nowNs
    if (request.apiLocalCompleteTimeNanos == -1L)
      request.apiLocalCompleteTimeNanos = nowNs
  }
    
  //Request對應(yīng)的Processorid  
  def processor: Int = request.processor

  def responseString: Option[String] = Some("")
  
  //發(fā)送完成的回調(diào)
  def onComplete: Option[Send => Unit] = None

  override def toString: String
}

Response有5個具體的子類:

  • SendResponse:Kafka 大多數(shù) Request 處理完成后都需要執(zhí)行一段回調(diào)邏輯饵溅,SendResponse 就是保存返回結(jié)果的 Response 子類。里面最重要的字段是 onCompletionCallback妇萄,即指定處理完成之后的回調(diào)邏輯蜕企。
  • NoResponse:有些 Request 處理完成后無需單獨(dú)執(zhí)行額外的回調(diào)邏輯。NoResponse 就是為這類 Response 準(zhǔn)備的冠句。
  • CloseConnectionResponse:用于出錯后需要關(guān)閉 TCP 連接的場景轻掩,此時返回 CloseConnectionResponse 給 Request 發(fā)送方,顯式地通知它關(guān)閉連接懦底。
  • StartThrottlingResponse:用于通知 Broker 的 Socket Server 組件唇牧,某個 TCP 連接通信通道開始被限流(throttling)。
  • EndThrottlingResponse:與 StartThrottlingResponse 對應(yīng)聚唐,通知 Broker 的 SocketServer 組件某個 TCP 連接通信通道的限流已結(jié)束丐重。

2.2、Acceptor說明

Acceptor主要作用是監(jiān)聽客戶端的連接杆查,當(dāng)有新連接建立后扮惦,其會將新的連接通過輪詢交由某個Processor來進(jìn)行管理。即Acceptor處理連接的ACCEPT事件亲桦,而READ和WRITE事件交由Processor來處理崖蜜;同時,Acceptor也提供了幾個接口來對Processor線程池進(jìn)行管理客峭。

Acceptor定義:

private[kafka] class Acceptor(
        // 配置的Kafka Broker連接信息豫领,比如 PLAINTEXT://localhost:9092。Acceptor需要用endPoint包含的主機(jī)名和端口信息創(chuàng)建Server Socket
        val endPoint: EndPoint,
        //SocketOptions 的 SO_SNDBUF舔琅,即用于設(shè)置出站(Outbound)網(wǎng)絡(luò) I/O 的底層緩沖區(qū)大小等恐。該值默認(rèn)是 Broker 端參數(shù) socket.send.buffer.bytes 的值,即 100KB
        val sendBufferSize: Int,
        //SocketOptions 的 SO_RCVBUF,即用于設(shè)置入站(Inbound)網(wǎng)絡(luò) I/O 的底層緩沖區(qū)大小鼠锈。該值默認(rèn)是 Broker 端參數(shù) socket.receive.buffer.bytes 的值闪檬,即 100KB星著。
        val recvBufferSize: Int,
        // broker 節(jié)點(diǎn) id
         brokerId: Int,
        connectionQuotas: ConnectionQuotas,
        metricPrefix: String) extends AbstractServerThread(connectionQuotas)

Acceptor主要成員:

//Selector對象負(fù)責(zé)執(zhí)行底層實(shí)際I/O操作购笆,如監(jiān)聽連接創(chuàng)建請求、讀寫請求等
private val nioSelector = NSelector.open()
// Broker端創(chuàng)建對應(yīng)的ServerSocketChannel實(shí)例虚循,同時將此連接注冊到selector中同欠,用于監(jiān)聽客戶端的連接
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
//Processor線程數(shù)組,用于處理Request/Response等
private val processors = new ArrayBuffer[Processor]()

Acceptor輪詢流程:

Acceptor輪詢流程.png

處理源碼如下:

def run(): Unit = {
  //將ServerSocket的ACCEPT事件注冊到對應(yīng)的Selector中  
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessorIndex = 0
    while (isRunning) {
      try {
        //監(jiān)聽selector的事件横缔,超時時間為500ms
        val ready = nioSelector.select(500)
        if (ready > 0) {
          //獲取所有事件的SelectionKey  
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          //遍歷SelectionKey
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()

              if (key.isAcceptable) {
                //接受客戶端連接  
                accept(key).foreach { socketChannel =>

                  // Assign the channel to the next processor (using round-robin) to which the
                  // channel can be added without blocking. If newConnections queue is full on
                  // all processors, block until the last one is able to accept a connection.
                  var retriesLeft = synchronized(processors.length)
                  var processor: Processor = null
                  do {
                    retriesLeft -= 1
                    //獲取可用的Processor線程
                    processor = synchronized {
                      // adjust the index (if necessary) and retrieve the processor atomically for
                      // correct behaviour in case the number of processors is reduced dynamically
                      currentProcessorIndex = currentProcessorIndex % processors.length
                      processors(currentProcessorIndex)
                    }
                    currentProcessorIndex += 1
                    //將新連接放入對應(yīng)Processor的newConnections隊(duì)列中铺遂,同時喚醒Processor的Selector
                  } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                }
              } else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
    CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
    shutdownComplete()
  }
}

2.3、Processor說明

Processor為協(xié)議處理線程茎刚,其監(jiān)聽客戶端的讀寫事件襟锐,處理注冊新連接,讀取客戶端請求及發(fā)送客戶端的應(yīng)答等膛锭,是真正進(jìn)行協(xié)議處理的線程粮坞;

2.3.1、主要對象

private[kafka] class Processor(
        val id: Int,
        time: Time,
        maxRequestSize: Int,
        //用于存放Request初狰,用于存放Request
        requestChannel: RequestChannel,
        connectionQuotas: ConnectionQuotas,
        connectionsMaxIdleMs: Long,
        failedAuthenticationDelayMs: Int,
        listenerName: ListenerName,
        securityProtocol: SecurityProtocol,
        config: KafkaConfig,
        metrics: Metrics,
        credentialProvider: CredentialProvider,
        memoryPool: MemoryPool,
        logContext: LogContext,
        connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
         
    //存放新建連接的隊(duì)列莫杈,當(dāng)Acceptor接受新連接時,會將新的ScketServer放入此隊(duì)列中奢入,
    //而Processor會將此隊(duì)列中的SocketServer加入Selector中筝闹,并監(jiān)聽對應(yīng)的讀或?qū)懯录?    private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
    // 這是一個臨時 Response 隊(duì)列。當(dāng) Processor 線程將 Response 返還給 Request 發(fā)送方之后腥光,還要將 Response 放入這個臨時隊(duì)列关顷。
    // 有些 Response 回調(diào)邏輯要在 Response 被發(fā)送回發(fā)送方之后,才能執(zhí)行武福,因此需要暫存在一個臨時隊(duì)列里面议双。
    private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
    //Response隊(duì)列,此隊(duì)列中的Response需要發(fā)送給其對應(yīng)的客戶端艘儒;
    private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
    //Processor對應(yīng)的Selector聋伦,用于監(jiān)聽客戶端的讀寫事件
    private val selector = createSelector(  ChannelBuilders.serverChannelBuilder(listenerName,listenerName == config.interBrokerListenerName,    securityProtocol,    config,    credentialProvider.credentialCache,    credentialProvider.tokenCache,time,logContext))
}

2.3.2、主要流程說明

override def run(): Unit = {
  startupComplete()
  try {
    while (isRunning) {
      try {
        // 1. 處理新連接
        // 遍歷newConnections中的SocketChannel對象界睁,在Selector中注冊對應(yīng)SocketChannel的OP_READ事件
        configureNewConnections()
        // 2. 處理Response隊(duì)列
       // 遍歷responseQueue隊(duì)列觉增,并根據(jù)不同類型的Response進(jìn)行不同處理,如將Response發(fā)送給客戶端翻斟,將Response入隊(duì)到inflightResponses中逾礁,處理限流消息等
        processNewResponses()
        //3、處理底層IO事件
        //通過Selector獲取底層IO事件,讀取客戶端的請求數(shù)據(jù)嘹履,并將將數(shù)據(jù)拼接完整并將數(shù)據(jù)入隊(duì)completedReceives
        poll()
        //4腻扇、解析數(shù)據(jù)及封裝Request
        //遍歷completedReceives,并根據(jù)協(xié)議將數(shù)據(jù)封裝為Request消息砾嫉,并將Request入隊(duì)到RequestChannel的requestQueue中
        processCompletedReceives()
        //5幼苛、處理發(fā)送完成的Response
        //遍歷Selector中的completedSends隊(duì)列,并從inflightResponses中移除對應(yīng)的Response焕刮,同時調(diào)用對應(yīng)Response的回調(diào)舶沿,及處理限流等
        processCompletedSends()
        //6、處理客戶端斷開連接
        //遍歷Selector中的disconnected隊(duì)列配并,移除inflightResponses中對應(yīng)連接的Response括荡;
        processDisconnected()
        //7、最大連接數(shù)限制
        //當(dāng)一個listener的連接數(shù)大于配置的最大連接數(shù)時溉旋,按將要關(guān)閉的連接畸冲、LRU算法空閑超時的連接、任意一個連接的順序關(guān)閉一個連接观腊,以此達(dá)到資源釋放
        closeExcessConnections()
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
        // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. These exceptions are caught and
        // processed by the individual methods above which close the failing channel and continue processing other
        // channels. So this catch block should only ever see ControlThrowables.
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug(s"Closing selector - processor $id")
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
    shutdownComplete()
  }
}

2.4邑闲、RequestChannel說明

RequestChannel 是傳輸 Request/Response 的通道;RequestChannel的requestQueue 會緩存Request恕沫,同時processors線程池中的Processor會讀取各個SocketChannel中的數(shù)據(jù)监憎,封裝為Request并入隊(duì)到requestQueue 隊(duì)列中;

RequestChannel主要參數(shù):

class RequestChannel(
    //requestQueue的最大容量
    val queueSize: Int, 
    val metricNamePrefix : String) extends KafkaMetricsGroup {
    
    //保存Request的隊(duì)列
    private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
    //Processor線程池
    private val processors = new ConcurrentHashMap[Int, Processor]()婶溯、
  }

2.5褥影、KafkaRequestHandler 與 KafkaRequestHandlerPool說明

KafkaRequestHandler為真正處理Request的線程哑芹,其主要是從RequestChannel中獲取Request,并將Request交由KafkaApis進(jìn)行處理;而KafkaRequestHandlerPool是KafkaRequestHandler的池化封裝牡直;

KafkaRequestHandler參數(shù)說明:

class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          //線程池總體線程數(shù)量
                          val totalHandlerThreads: AtomicInteger,
                          //當(dāng)前線程對應(yīng)的RequestChannel
                          val requestChannel: RequestChannel,
                          //處理Request的apis
                          apis: KafkaApis,
                          time: Time) extends Runnable with Logging {}

KafkaRequestHandler處理流程:

def run(): Unit = {
  while (!stopped) {
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    val startSelectTime = time.nanoseconds
    // 從請求隊(duì)列中獲取下一個待處理的請求
    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

    req match {
      // 關(guān)閉線程請求倦青,說明該 Broker 發(fā)起了關(guān)閉操作
      case RequestChannel.ShutdownRequest =>
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        shutdownComplete.countDown()
        return
      // 普通請求
      case request: RequestChannel.Request =>
        try {
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          // 由KafkaApis.handle方法執(zhí)行相應(yīng)處理邏輯
          apis.handle(request)
        } catch {
          // 如果出現(xiàn)嚴(yán)重錯誤羹唠,立即關(guān)閉線程  
          case e: FatalExitError =>
            shutdownComplete.countDown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          // 釋放請求對象占用的內(nèi)存緩沖區(qū)資源  
          request.releaseBuffer()
        }

      case null => // continue
    }
  }
  shutdownComplete.countDown()
}

KafkaRequestHandlerPool主要參數(shù):

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              //線程個數(shù)
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {

  //線程池中線程的個數(shù)
  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
  
  //線程池志鹃,每個線程就是一個KafkaRequestHandler
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

2.6、KafkaApis說明

KafkaApis封裝了實(shí)際協(xié)議的處理邏輯信轿,其中會根據(jù)不同的協(xié)議晃痴,調(diào)用LogManager、ReplicaManager等進(jìn)行協(xié)議處理财忽,同時將處理結(jié)果封裝為Response入隊(duì)到Processor中的responseQueue中倘核;

KafkaApis主要參數(shù):

class KafkaApis(val requestChannel: RequestChannel,// 請求通道
                val replicaManager: ReplicaManager,// 副本管理器,控制集群所有副本的狀態(tài)轉(zhuǎn)換
                val adminManager: AdminManager,// topic即彪、分區(qū)配置等管理器
                val groupCoordinator: GroupCoordinator,// 消費(fèi)者組協(xié)調(diào)器組件
                val txnCoordinator: TransactionCoordinator,// 事務(wù)管理器組件
                val controller: KafkaController, // 控制器組件紧唱,管理與保存原數(shù)據(jù)
                val zkClient: KafkaZkClient,// ZooKeeper客戶端程序,Kafka依賴于該類實(shí)現(xiàn)與ZooKeeper交互
                val brokerId: Int, // broker.id參數(shù)值
                val config: KafkaConfig,// Kafka配置類
                val metadataCache: MetadataCache,// 元數(shù)據(jù)緩存類
                val metrics: Metrics,
                val authorizer: Option[Authorizer],
                val quotas: QuotaManagers,// 配額管理器組件
                val fetchManager: FetchManager,
                brokerTopicStats: BrokerTopicStats,//節(jié)點(diǎn)的主題狀態(tài)信息
                    val clusterId: String,//集群id
                time: Time,
                val tokenManager: DelegationTokenManager) extends Logging {}

KafkaApis根據(jù)不同的協(xié)議進(jìn)行對應(yīng)的處理;

2.7漏益、Data plane 與 Control plane說明

Kafka對數(shù)據(jù)類命令和控制類命令的連接處理通道進(jìn)行了拆分蛹锰,Data Plane處理數(shù)據(jù)類請求,Control plane處理命令類請求绰疤;

class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider)
  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
  // SocketServer實(shí)現(xiàn)BrokerReconfigurable trait表明SocketServer的一些參數(shù)配置是允許動態(tài)修改的
  // 即在Broker不停機(jī)的情況下修改它們
  // SocketServer的請求隊(duì)列長度铜犬,由Broker端參數(shù)queued.max.requests值而定,默認(rèn)值是500
  private val maxQueuedRequests = config.queuedMaxRequests

  private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
  this.logIdent = logContext.logPrefix

  private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
  private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
  private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
  memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
  private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
  // data-plane
  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
  // 處理數(shù)據(jù)類請求的Acceptor線程池峦睡,每套監(jiān)聽器對應(yīng)一個Acceptor線程
  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
  // control-plane
  // 用于處理控制類請求的Processor線程
  // 注意:目前定義了專屬的Processor線程而非線程池處理控制類請求
  // Control plane 的配套資源只有 1 個 Acceptor 線程 + 1 個 Processor 線程 + 1 個深度是 20 的請求隊(duì)列而已翎苫。
  // 一旦你開啟了 Control plane 設(shè)置,其 Processor 線程就只有 1 個榨了,Acceptor 線程也是 1 個。另外攘蔽,你要注意龙屉,它對應(yīng)的 RequestChannel 里面的請求隊(duì)列長度被硬編碼成了 20,而不是一個可配置的值满俗。
  // 這揭示了社區(qū)在這里所做的一個假設(shè):即控制類請求的數(shù)量應(yīng)該遠(yuǎn)遠(yuǎn)小于數(shù)據(jù)類請求转捕,因而不需要為它創(chuàng)建線程池和較深的請求隊(duì)列。
  private var controlPlaneProcessorOpt : Option[Processor] = None
  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
  // 處理控制類請求專屬的RequestChannel對象
  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
    new RequestChannel(20, ControlPlaneMetricPrefix, time))
  // Data plane 和 Control plane 注釋下面分別定義了一組變量唆垃,即 Processor 線程池五芝、Acceptor 線程池和 RequestChannel 實(shí)例。
  // Processor 線程池:即網(wǎng)絡(luò)線程池辕万,負(fù)責(zé)將請求高速地放入到請求隊(duì)列中枢步。
  // Acceptor 線程池:保存了 SocketServer 為每個監(jiān)聽器定義的 Acceptor 線程,此線程負(fù)責(zé)分發(fā)該監(jiān)聽器上的入站連接建立請求渐尿。
  // RequestChannel:承載請求隊(duì)列的請求處理通道醉途。
}

SocketServer在初始化的時候,對兩種通道進(jìn)行區(qū)分砖茸;

創(chuàng)建Data plane:

private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                    endpoints: Seq[EndPoint]): Unit = {
    // 遍歷監(jiān)聽器集合
    endpoints.foreach { endpoint =>
      // 將監(jiān)聽器納入到連接配額管理之下
      connectionQuotas.addListener(config, endpoint.listenerName)
      // 為監(jiān)聽器創(chuàng)建對應(yīng)的Acceptor線程
      val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
      // 為監(jiān)聽器創(chuàng)建多個Processor線程隘擎。具體數(shù)目由num.network.threads決定
      addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
      // 將<監(jiān)聽器,Acceptor線程>對保存起來統(tǒng)一管理
      dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
      info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
    }
  }

創(chuàng)建Control plane:

Control plane 的配套資源只有 1 個 Acceptor 線程 + 1 個 Processor 線程 + 1 個深度是 20 的請求隊(duì)列

private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = {
    // 如果為Control plane配置了監(jiān)聽器
    endpointOpt.foreach { endpoint =>
      // 將監(jiān)聽器納入到連接配額管理之下
      connectionQuotas.addListener(config, endpoint.listenerName)
      // 為監(jiān)聽器創(chuàng)建對應(yīng)的Acceptor線程
      val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
      // 為監(jiān)聽器創(chuàng)建對應(yīng)的Processor線程
      val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
      controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
      controlPlaneProcessorOpt = Some(controlPlaneProcessor)
      val listenerProcessors = new ArrayBuffer[Processor]()
      listenerProcessors += controlPlaneProcessor
      // 將Processor線程添加到控制類請求專屬RequestChannel中
      // 即添加到RequestChannel實(shí)例保存的Processor線程池中
      controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
      nextProcessorId += 1
      // 把Processor對象也添加到Acceptor線程管理的Processor線程池中
      controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
      info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
    }
  }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末凉夯,一起剝皮案震驚了整個濱河市货葬,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌劲够,老刑警劉巖震桶,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異再沧,居然都是意外死亡尼夺,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來淤堵,“玉大人寝衫,你說我怎么就攤上這事」招埃” “怎么了慰毅?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長扎阶。 經(jīng)常有香客問我汹胃,道長,這世上最難降的妖魔是什么东臀? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任着饥,我火速辦了婚禮,結(jié)果婚禮上惰赋,老公的妹妹穿的比我還像新娘宰掉。我一直安慰自己,他們只是感情好赁濒,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布轨奄。 她就那樣靜靜地躺著,像睡著了一般拒炎。 火紅的嫁衣襯著肌膚如雪挪拟。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天击你,我揣著相機(jī)與錄音玉组,去河邊找鬼。 笑死果漾,一個胖子當(dāng)著我的面吹牛球切,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播绒障,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼吨凑,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了户辱?” 一聲冷哼從身側(cè)響起鸵钝,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎庐镐,沒想到半個月后恩商,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡必逆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年怠堪,在試婚紗的時候發(fā)現(xiàn)自己被綠了揽乱。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡粟矿,死狀恐怖凰棉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情陌粹,我是刑警寧澤撒犀,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站掏秩,受9級特大地震影響或舞,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蒙幻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一映凳、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧杆煞,春花似錦魏宽、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽派桩。三九已至构诚,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間铆惑,已是汗流浹背范嘱。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留员魏,地道東北人丑蛤。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像撕阎,于是被迫代替她去往敵國和親受裹。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內(nèi)容