【Kafka源碼】SocketServer啟動過程

SocketServer主要用于接收外部的網(wǎng)絡(luò)請求,并把請求添加到請求隊列中。

一、入口

在KafkaServer.scala中的start方法中拗胜,有這樣的入口:

socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()

這塊就是啟動了一個SocketServer,我們具體看一下怒允。

二埂软、構(gòu)造方法

我們看下SocketServer里面包含的參數(shù):

private val endpoints = config.listeners
private val numProcessorThreads = config.numNetworkThreads
private val maxQueuedRequests = config.queuedMaxRequests
private val totalProcessorThreads = numProcessorThreads * endpoints.siz
private val maxConnectionsPerIp = config.maxConnectionsPerIp
private val maxConnectionsPerIpOverrides config.maxConnectionsPerIpOverride
this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "

val requestChannel = new RequestChannel(totalProcessorThreadsmaxQueuedRequests)
private val processors = new Array[Processor](totalProcessorThreads)

private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
private var connectionQuotas: ConnectionQuotas = _

這里面涉及幾個配置內(nèi)容:

  • listeners:默認是PLAINTEXT://:port,前面部分是協(xié)議,可配置為PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL
  • num.network.threads:處理網(wǎng)絡(luò)請求的線程個數(shù)配置,默認是3
  • queued.max.requests:請求隊列的最大個數(shù),默認500
  • max.connections.per.ip:單機IP的最大連接個數(shù)的配置,默認不限制
  • max.connections.per.ip.overrides:針對某個特別的IP的連接個數(shù)限制的重新設(shè)置值.多個IP配置間使用逗號分開,如:host1:500,host2:600

三、啟動SocketServer

啟動的代碼如下:

/**
 * Start the socket server
 */
def startup() {
    this.synchronized {

    //每個ip的連接數(shù)限制
    connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

    val sendBufferSize = config.socketSendBufferBytes
    val recvBufferSize = config.socketReceiveBufferBytes
    val brokerId = config.brokerId

    //這里根據(jù)每一個endpoint(也就是配置的listener的協(xié)議與端口),生成處理的網(wǎng)絡(luò)線程Processor與Acceptor實例.并啟動endpoint對應(yīng)的Acceptor實例.在生成Acceptor的實例時,會同時啟動此實例中對應(yīng)的線程處理實例數(shù)組Processor.
    var processorBeginIndex = 0
    endpoints.values.foreach { endpoint =>
        val protocol = endpoint.protocolType
        val processorEndIndex = processorBeginIndex + numProcessorThreads

        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, protocol)

        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }

    newGauge("NetworkProcessorAvgIdlePercent",
      new Gauge[Double] {
        def value = allMetricNames.map( metricName =>
          metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
      }
    )

    info("Started " + acceptors.size + " acceptor threads")
}

這塊涉及到幾個配置項纫事,主要用于生成socket中的SO_SNDBUF和SO_RCVBUF勘畔。

  • socket.send.buffer.bytes:默認值100kb,這個用于SOCKET發(fā)送數(shù)據(jù)的緩沖區(qū)大小
  • socket.receive.buffer.bytes:默認值100kb,這個用于SOCKET的接收數(shù)據(jù)的緩沖區(qū)大小
  • broker.id

3.1 newProcessor

我們先看下這個簡單的賦值。

protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = {
    new Processor(id,
      time,
      config.socketRequestMaxBytes,
      requestChannel,
      connectionQuotas,
      config.connectionsMaxIdleMs,
      protocol,
      config.values,
      metrics
    )
  }

其實就是Processor的實例生成丽惶,主要涉及幾個配置項:

  • socket.request.max.bytes:設(shè)置每次請求的數(shù)據(jù)大小.默認值,100MB
  • connections.max.idle.ms:默認為10分鐘,用于設(shè)置每個連接最大的空閑回收時間

3.2 Acceptor

每個endPoint對應(yīng)一個Acceptor炫七,也就是每個listener對應(yīng)一個Acceptor。Acceptor主要用于接收網(wǎng)絡(luò)請求钾唬,將請求分發(fā)到processor處理万哪。我們來看下Acceptor的run方法:

def run() {
    //將channel注冊到selector上
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          //這里進行堵塞接收,最多等500ms,如果ready返回的值是0表示還沒有準備好,否則表示準備就緒.表示有通道已經(jīng)被注冊
          val ready = nioSelector.select(500)
          if (ready > 0) {
            //這里得到已經(jīng)準備好的網(wǎng)絡(luò)通道的key的集合
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                //如果selectkey已經(jīng)注冊到accept事件,通過accept函數(shù)與對應(yīng)的線程Processor進行處理.這里表示這個socket的通道包含有一個client端的連接請求.
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread
                //每次接收一個socket請求后,用于處理的線程進行輪詢到一個線程中處理.
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want the
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      swallowError(serverChannel.close())
      swallowError(nioSelector.close())
      shutdownComplete()
    }
  }

下面我們看下accept方法:

  /*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    //得到請求的socket通道
    val socketChannel = serverSocketChannel.accept()
    try {
      //這里檢查當前的IP的連接數(shù)是否已經(jīng)達到了最大的連接數(shù),如果是,直接throw too many connect.
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

      debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
            .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
                  socketChannel.socket.getSendBufferSize, sendBufferSize,
                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))

      //對應(yīng)的processor處理socket通道
      processor.accept(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
        close(socketChannel)
    }
  }

3.3 Processor

上面accept方法中侠驯,調(diào)用到了processor的accept方法,我們看下這個accept方法:

  /**
   * Queue up a new connection for reading
   */
  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

其實就是向隊列中新增了一個socket通道壤圃,等待processor線程處理陵霉。下面我們看下processor是怎么處理的。

override def run() {
    startupComplete()
    while (isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()
        poll()
        processCompletedReceives()
        processCompletedSends()
        processDisconnected()
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
        // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
        case e: ControlThrowable => throw e
        case e: Throwable =>
          error("Processor got uncaught exception.", e)
      }
    }

    debug("Closing selector - processor " + id)
    swallowError(closeAll())
    shutdownComplete()
  }

這塊其實是個門面模式伍绳,里面調(diào)用的內(nèi)容比較多,我們一一看一下乍桂。

3.3.1 configureNewConnections

這塊是從隊列中取一個連接冲杀,并注冊到selector上。

  /**
   * Register any new connections that have been queued up
   */
  private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      val channel = newConnections.poll()
      try {
        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
        val localHost = channel.socket().getLocalAddress.getHostAddress
        val localPort = channel.socket().getLocalPort
        val remoteHost = channel.socket().getInetAddress.getHostAddress
        val remotePort = channel.socket().getPort
        val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
        selector.register(connectionId, channel)
      } catch {
        // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
        // throwables will be caught in processor and logged as uncaught exceptions.
        case NonFatal(e) =>
          // need to close the channel here to avoid a socket leak.
          close(channel)
          error(s"Processor $id closed connection from ${channel.getRemoteAddress}", e)
      }
    }
  }

3.3.2 processNewResponses

private def processNewResponses() {
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
      try {
        curr.responseAction match {
          case RequestChannel.NoOpAction =>
            // There is no response to send to the client, we need to read more pipelined requests
            // that are sitting in the server's socket buffer
            curr.request.updateRequestMetrics
            trace("Socket server received empty response to send, registering for read: " + curr)
            selector.unmute(curr.request.connectionId)
          case RequestChannel.SendAction =>
            sendResponse(curr)
          case RequestChannel.CloseConnectionAction =>
            curr.request.updateRequestMetrics
            trace("Closing socket connection actively according to the response code.")
            close(selector, curr.request.connectionId)
        }
      } finally {
        curr = requestChannel.receiveResponse(id)
      }
    }
  }

3.3.3 poll

  private def poll() {
    try selector.poll(300)
    catch {
      case e @ (_: IllegalStateException | _: IOException) =>
        error(s"Closing processor $id due to illegal state or IO exception")
        swallow(closeAll())
        shutdownComplete()
        throw e
    }
  }
    @Override
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        clear();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        maybeCloseOldestConnection();
    }

這塊主要看一下pollSelectionKeys方法:

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);

            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            lruConnections.put(channel.id(), currentTimeNanos);

            try {

                /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    } else
                        continue;
                }

                /* if channel is not ready finish prepare */
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();

                /* if channel is ready read from any connections that have readable data */
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }

                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

                /* cancel any defunct sockets */
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }

            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
    }

這里開始處理socket通道中的請求,根據(jù)如下幾個流程進行處理:

  • 如果請求中包含有一個isConnectable操作,把這個連接緩存起來.
  • 如果請求中包含有isReadable操作.表示這個client的管道中包含有數(shù)據(jù),需要讀取,接收數(shù)據(jù).
  • 如果包含有isWriteable的操作,表示需要向client端進行寫操作.
  • 最后檢查是否有connect被關(guān)閉的請求或connect連接空閑過期

3.3.4 processCompletedReceives

得到對應(yīng)的請求的Request的實例,并把這個Request通過SocketServer中的RequestChannel的sendRequest的函數(shù),把請求添加到請求的隊列中.等待KafkaApis來進行處理.

private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        val channel = selector.channel(receive.source)
        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
          channel.socketAddress)
        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
        //這是重點6米谩Hㄋ!可以看下KafkaApis對消息的處理憋沿,后續(xù)會分析到
        requestChannel.sendRequest(req)
        selector.mute(receive.source)
      } catch {
        case e @ (_: InvalidRequestException | _: SchemaException) =>
          // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
          error(s"Closing socket for ${receive.source} because of error", e)
          close(selector, receive.source)
      }
    }
  }

3.3.5 processCompletedSends

這里的send完成表示有向client端進行響應(yīng)的寫操作處理完成

  private def processCompletedSends() {
    selector.completedSends.asScala.foreach { send =>
      val resp = inflightResponses.remove(send.destination).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
      }
      resp.request.updateRequestMetrics()
      selector.unmute(send.destination)
    }
  }

3.3.6 processDisconnected

如果socket server中包含有已經(jīng)關(guān)閉的連接,減少這個quotas中對此ip的連接數(shù)的值.
這個情況包含connect處理超時或者說有connect的消息處理錯誤被發(fā)起了close的請求后的處理成功的消息.

  private def processDisconnected() {
    selector.disconnected.asScala.foreach { connectionId =>
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
      // the channel has been closed by the selector but the quotas still need to be updated
      connectionQuotas.dec(InetAddress.getByName(remoteHost))
    }
  }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末旺芽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子辐啄,更是在濱河造成了極大的恐慌采章,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,807評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件壶辜,死亡現(xiàn)場離奇詭異悯舟,居然都是意外死亡,警方通過查閱死者的電腦和手機砸民,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評論 3 399
  • 文/潘曉璐 我一進店門抵怎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人岭参,你說我怎么就攤上這事反惕。” “怎么了演侯?”我有些...
    開封第一講書人閱讀 169,589評論 0 363
  • 文/不壞的土叔 我叫張陵姿染,是天一觀的道長。 經(jīng)常有香客問我蚌本,道長盔粹,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,188評論 1 300
  • 正文 為了忘掉前任程癌,我火速辦了婚禮舷嗡,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘嵌莉。我一直安慰自己进萄,他們只是感情好,可當我...
    茶點故事閱讀 69,185評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著中鼠,像睡著了一般可婶。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上援雇,一...
    開封第一講書人閱讀 52,785評論 1 314
  • 那天矛渴,我揣著相機與錄音,去河邊找鬼惫搏。 笑死具温,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的筐赔。 我是一名探鬼主播铣猩,決...
    沈念sama閱讀 41,220評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼茴丰!你這毒婦竟也來了达皿?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,167評論 0 277
  • 序言:老撾萬榮一對情侶失蹤贿肩,失蹤者是張志新(化名)和其女友劉穎峦椰,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體尸曼,經(jīng)...
    沈念sama閱讀 46,698評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡们何,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,767評論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了控轿。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片冤竹。...
    茶點故事閱讀 40,912評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖茬射,靈堂內(nèi)的尸體忽然破棺而出鹦蠕,到底是詐尸還是另有隱情,我是刑警寧澤在抛,帶...
    沈念sama閱讀 36,572評論 5 351
  • 正文 年R本政府宣布钟病,位于F島的核電站,受9級特大地震影響刚梭,放射性物質(zhì)發(fā)生泄漏肠阱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,254評論 3 336
  • 文/蒙蒙 一朴读、第九天 我趴在偏房一處隱蔽的房頂上張望屹徘。 院中可真熱鬧,春花似錦衅金、人聲如沸噪伊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽鉴吹。三九已至姨伟,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間豆励,已是汗流浹背夺荒。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留肆糕,地道東北人般堆。 一個月前我還...
    沈念sama閱讀 49,359評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像诚啃,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子私沮,可洞房花燭夜當晚...
    茶點故事閱讀 45,922評論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理始赎,服務(wù)發(fā)現(xiàn),斷路器仔燕,智...
    卡卡羅2017閱讀 134,716評論 18 139
  • https://nodejs.org/api/documentation.html 工具模塊 Assert 測試 ...
    KeKeMars閱讀 6,340評論 0 6
  • iPhone的標準推薦是CFNetwork 庫編程造垛,其封裝好的開源庫是 cocoa AsyncSocket庫,用它...
    Ethan_Struggle閱讀 2,247評論 2 12
  • 許久不寫字,發(fā)現(xiàn)寫作水平嚴重退步啊~~~ 以前也是個文藝青年,現(xiàn)在也要寫出詩意的代碼啊~ 沒找到以前寫的詩,咱們還...
    掃帚的影子閱讀 3,245評論 2 8
  • kafka的定義:是一個分布式消息系統(tǒng)晰搀,由LinkedIn使用Scala編寫五辽,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,328評論 1 15