SocketServer is mainly responsible for accepting external network requests and adding them to the request queue.
1. Entry Point
In the startup method of KafkaServer.scala there is this entry point:
socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()
This just starts a SocketServer; let's look at it in detail.
2. Constructor
Let's look at the fields defined inside SocketServer:
private val endpoints = config.listeners
private val numProcessorThreads = config.numNetworkThreads
private val maxQueuedRequests = config.queuedMaxRequests
private val totalProcessorThreads = numProcessorThreads * endpoints.size
private val maxConnectionsPerIp = config.maxConnectionsPerIp
private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
private val processors = new Array[Processor](totalProcessorThreads)
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
private var connectionQuotas: ConnectionQuotas = _
These fields map to several configuration options (an illustrative server.properties sketch follows this list):
- listeners: defaults to PLAINTEXT://:port; the prefix is the security protocol and may be PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL
- num.network.threads: the number of threads handling network requests, default 3
- queued.max.requests: the maximum number of requests held in the request queue, default 500
- max.connections.per.ip: the maximum number of connections allowed from a single IP, unlimited by default
- max.connections.per.ip.overrides: per-IP overrides of that connection limit; multiple entries are separated by commas, e.g. host1:500,host2:600
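As an illustration (the ports and the 1000 limit below are example values, not defaults or recommendations), these options could appear in server.properties as follows; any key that is omitted falls back to the default listed above. Note also that totalProcessorThreads = numProcessorThreads * endpoints.size, so with the two listeners below and num.network.threads=3 the server would create 6 Processor instances in total.
listeners=PLAINTEXT://:9092,SSL://:9093
num.network.threads=3
queued.max.requests=500
max.connections.per.ip=1000
max.connections.per.ip.overrides=host1:500,host2:600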
3. Starting the SocketServer
The startup code is as follows:
/**
* Start the socket server
*/
def startup() {
this.synchronized {
// per-IP connection limit
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
// For each endpoint (i.e., the protocol and port of each configured listener), create its Processor network threads and an Acceptor instance, then start that endpoint's Acceptor; constructing the Acceptor also starts the Processor threads assigned to it.
var processorBeginIndex = 0
endpoints.values.foreach { endpoint =>
val protocol = endpoint.protocolType
val processorEndIndex = processorBeginIndex + numProcessorThreads
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i, connectionQuotas, protocol)
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
acceptor.awaitStartup()
processorBeginIndex = processorEndIndex
}
}
newGauge("NetworkProcessorAvgIdlePercent",
new Gauge[Double] {
def value = allMetricNames.map( metricName =>
metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
}
)
info("Started " + acceptors.size + " acceptor threads")
}
This part involves a few more configuration items, mainly used to set the SO_SNDBUF and SO_RCVBUF sizes of the sockets:
- socket.send.buffer.bytes: default 100 KB; the send buffer (SO_SNDBUF) size for the sockets
- socket.receive.buffer.bytes: default 100 KB; the receive buffer (SO_RCVBUF) size for the sockets
- broker.id: the id of the current broker
3.1 newProcessor
Let's first look at this simple factory method.
protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = {
new Processor(id,
time,
config.socketRequestMaxBytes,
requestChannel,
connectionQuotas,
config.connectionsMaxIdleMs,
protocol,
config.values,
metrics
)
}
It simply constructs a Processor instance, which involves a few configuration items:
- socket.request.max.bytes: the maximum size of a single request, default 100 MB
- connections.max.idle.ms: default 10 minutes; the maximum time a connection may stay idle before it is closed
3.2 Acceptor
Each EndPoint has its own Acceptor, i.e. one Acceptor per configured listener. The Acceptor accepts incoming connections and dispatches them to the Processors. Let's look at the Acceptor's run method:
def run() {
// register the server channel with the selector for OP_ACCEPT
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
while (isRunning) {
try {
// block for at most 500 ms; a return value of 0 means nothing is ready yet, a positive value means some registered channels are ready
val ready = nioSelector.select(500)
if (ready > 0) {
// get the selection keys of the channels that are ready
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
// if the key is acceptable, this server channel has a pending client connection request; hand it to accept() together with the current Processor
if (key.isAcceptable)
accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// round robin to the next processor thread
// after each accepted connection, round-robin to the next Processor thread
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
catch {
// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
// to a select operation on a specific channel or a bad request. We don't want
// the broker to stop responding to requests from other clients in these scenarios.
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
} finally {
debug("Closing server socket and selector.")
swallowError(serverChannel.close())
swallowError(nioSelector.close())
shutdownComplete()
}
}
Next, let's look at the accept method:
/*
* Accept a new connection
*/
def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
// get the socket channel of the incoming connection
val socketChannel = serverSocketChannel.accept()
try {
// check whether this IP has already reached its maximum number of connections; if so, a TooManyConnectionsException is thrown
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
socketChannel.socket().setSendBufferSize(sendBufferSize)
debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
.format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
socketChannel.socket.getSendBufferSize, sendBufferSize,
socketChannel.socket.getReceiveBufferSize, recvBufferSize))
// hand the socket channel to the chosen Processor
processor.accept(socketChannel)
} catch {
case e: TooManyConnectionsException =>
info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
close(socketChannel)
}
}
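Internally, connectionQuotas.inc keeps a per-address counter, applies any max.connections.per.ip.overrides entry for that address, and throws TooManyConnectionsException once the limit is reached; dec (used later when a connection closes) decrements the same counter. A simplified sketch of ConnectionQuotas, not the exact source (field names and details are approximations):
import java.net.InetAddress
import scala.collection.mutable
import kafka.network.TooManyConnectionsException // carries the rejected ip and the limit

class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
  // resolve override host names to addresses once, up front
  private val overrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
  private val counts = mutable.Map[InetAddress, Int]()

  def inc(address: InetAddress) {
    counts.synchronized {
      val count = counts.getOrElse(address, 0) // connections already open from this address
      counts.put(address, count + 1)
      val max = overrides.getOrElse(address, defaultMax)
      if (count >= max)
        throw new TooManyConnectionsException(address, max)
    }
  }

  def dec(address: InetAddress) {
    counts.synchronized {
      val count = counts(address)
      if (count == 1) counts.remove(address) else counts.put(address, count - 1)
    }
  }
}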
3.3 Processor
The accept method above calls processor.accept; let's look at that method:
/**
* Queue up a new connection for reading
*/
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}
It simply adds the socket channel to the Processor's newConnections queue and wakes up the Processor's selector; the queue itself is sketched below.
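For context (based on this version's Processor code, but treat it as a sketch rather than a full listing), newConnections is a lock-free FIFO, so Acceptor threads can enqueue accepted channels without blocking, and wakeup() interrupts the Processor's blocking selector poll so the new connection is registered promptly.
// field inside Processor (java.util.concurrent.ConcurrentLinkedQueue, java.nio.channels.SocketChannel)
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
Now let's see how the Processor's run method consumes this queue and drives the rest of the processing.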
override def run() {
startupComplete()
while (isRunning) {
try {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()
poll()
processCompletedReceives()
processCompletedSends()
processDisconnected()
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
// or request. This behavior might need to be reviewed if we see an exception that needs the entire broker to stop.
case e: ControlThrowable => throw e
case e: Throwable =>
error("Processor got uncaught exception.", e)
}
}
debug("Closing selector - processor " + id)
swallowError(closeAll())
shutdownComplete()
}
This run loop is essentially a facade: it simply calls a series of helper methods in turn, so let's go through them one by one.
3.3.1 configureNewConnections
This takes connections off the newConnections queue one by one and registers each of them with the Processor's selector.
/**
* Register any new connections that have been queued up
*/
private def configureNewConnections() {
while (!newConnections.isEmpty) {
val channel = newConnections.poll()
try {
debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
val localHost = channel.socket().getLocalAddress.getHostAddress
val localPort = channel.socket().getLocalPort
val remoteHost = channel.socket().getInetAddress.getHostAddress
val remotePort = channel.socket().getPort
val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
selector.register(connectionId, channel)
} catch {
// We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
// throwables will be caught in processor and logged as uncaught exceptions.
case NonFatal(e) =>
// need to close the channel here to avoid a socket leak.
close(channel)
error(s"Processor $id closed connection from ${channel.getRemoteAddress}", e)
}
}
}
3.3.2 processNewResponses
This drains the responses that have been queued for this processor on the RequestChannel: NoOpAction means there is nothing to send back, so the connection is unmuted in order to read more pipelined requests; SendAction writes the response back to the client; CloseConnectionAction closes the connection.
private def processNewResponses() {
var curr = requestChannel.receiveResponse(id)
while (curr != null) {
try {
curr.responseAction match {
case RequestChannel.NoOpAction =>
// There is no response to send to the client, we need to read more pipelined requests
// that are sitting in the server's socket buffer
curr.request.updateRequestMetrics
trace("Socket server received empty response to send, registering for read: " + curr)
selector.unmute(curr.request.connectionId)
case RequestChannel.SendAction =>
sendResponse(curr)
case RequestChannel.CloseConnectionAction =>
curr.request.updateRequestMetrics
trace("Closing socket connection actively according to the response code.")
close(selector, curr.request.connectionId)
}
} finally {
curr = requestChannel.receiveResponse(id)
}
}
}
3.3.3 poll
This calls the selector's poll with a 300 ms timeout; it performs the actual network I/O and fills the completed-receives, completed-sends and disconnected lists that the following steps consume.
private def poll() {
try selector.poll(300)
catch {
case e @ (_: IllegalStateException | _: IOException) =>
error(s"Closing processor $id due to illegal state or IO exception")
swallow(closeAll())
shutdownComplete()
throw e
}
}
selector.poll delegates to poll in org.apache.kafka.common.network.Selector (Java):
@Override
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
clear();
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
timeout = 0;
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
currentTimeNanos = endSelect;
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
pollSelectionKeys(this.nioSelector.selectedKeys(), false);
pollSelectionKeys(immediatelyConnectedKeys, true);
}
addToCompletedReceives();
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
maybeCloseOldestConnection();
}
The interesting part here is the pollSelectionKeys method:
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
lruConnections.put(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake (either normally or immediately) */
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
} else
continue;
}
/* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready())
channel.prepare();
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
/* cancel any defunct sockets */
if (!key.isValid()) {
close(channel);
this.disconnected.add(channel.id());
}
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id());
}
}
}
This is where the events on each ready socket channel are handled, in the following order:
- If the key is connectable (or the channel connected immediately), the connection is finished and the channel id is added to the connected set.
- If the key is readable, the client channel has data available; it is read and the received data is staged.
- If the key is writable, pending response data is written back to the client and recorded in completedSends.
- Finally, invalid keys are cancelled and their channels closed and marked disconnected; afterwards (back in poll), maybeCloseOldestConnection expires idle connections.
3.3.4 processCompletedReceives
For each completed receive, a RequestChannel.Request instance is built and passed to the sendRequest function of the SocketServer's RequestChannel, which adds it to the request queue where it waits for KafkaApis to process it.
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
try {
val channel = selector.channel(receive.source)
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
channel.socketAddress)
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
// This is the key step: the request is queued for KafkaApis; how KafkaApis handles it will be analyzed later
requestChannel.sendRequest(req)
selector.mute(receive.source)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) =>
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error(s"Closing socket for ${receive.source} because of error", e)
close(selector, receive.source)
}
}
}
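For reference, the RequestChannel constructed earlier (with totalProcessorThreads and queued.max.requests) is essentially one bounded blocking request queue shared by all processors plus one response queue per processor; the sendRequest call above blocks when the request queue is full. A simplified sketch, not the exact source (Request and Response are the case classes from the real RequestChannel companion object; other details are approximations):
import java.util.concurrent._

class RequestChannel(val numProcessors: Int, val queueSize: Int) {
  // requests produced by Processors, consumed by the request handler threads that call KafkaApis
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  // responses produced by the handlers, consumed by each Processor in processNewResponses
  private val responseQueues =
    Array.fill[BlockingQueue[RequestChannel.Response]](numProcessors)(new LinkedBlockingQueue[RequestChannel.Response]())

  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request) // blocks once queued.max.requests is reached
  }

  def sendResponse(response: RequestChannel.Response) {
    responseQueues(response.processor).put(response)
  }

  // used by Processor.processNewResponses (section 3.3.2); non-blocking
  def receiveResponse(processor: Int): RequestChannel.Response =
    responseQueues(processor).poll()

  // used by the request handler threads feeding KafkaApis
  def receiveRequest(timeout: Long): RequestChannel.Request =
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
}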
3.3.5 processCompletedSends
A completed send means a response has been fully written back to the client: the corresponding entry is removed from inflightResponses and the connection is unmuted so the next request can be read.
private def processCompletedSends() {
selector.completedSends.asScala.foreach { send =>
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
resp.request.updateRequestMetrics()
selector.unmute(send.destination)
}
}
3.3.6 processDisconnected
For every connection that the selector reports as closed, the per-IP count kept in connectionQuotas is decremented (and any in-flight response for that connection is dropped).
This covers connections that were expired as idle as well as connections that were actively closed after an error (for example via a CloseConnectionAction).
private def processDisconnected() {
selector.disconnected.asScala.foreach { connectionId =>
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost
inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
// the channel has been closed by the selector but the quotas still need to be updated
connectionQuotas.dec(InetAddress.getByName(remoteHost))
}
}