- NetworkClient: 顧名思義哈, 用于網(wǎng)絡(luò)連接,消息發(fā)送的客戶端封裝, 源碼中的注釋:
A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.
- 用在何處:
1. kafka本身實(shí)現(xiàn)了java版的producer和consumer,里面的網(wǎng)絡(luò)連接,請(qǐng)求發(fā)送均使用NetworkClient實(shí)現(xiàn);
2. KafkaController中controller與其他broker的通訊,使用NetworkClient實(shí)現(xiàn);
InFlightRequests類
- 所在文件: clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
- 實(shí)現(xiàn)了request的集合, 包括正在發(fā)送的和已經(jīng)發(fā)送的但還沒(méi)有接收到response的request;
- 主要成員變量:
private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
針對(duì)每個(gè)連接使用Deque<ClientRequest>
數(shù)據(jù)結(jié)構(gòu)來(lái)保存所有的request;Deque<ClientRequest>
是個(gè)雙端隊(duì)列; - 添加新的request, 新的reqeust總是通過(guò)
addFirst
放到隊(duì)首
public void add(ClientRequest request) {
Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
if (reqs == null) {
reqs = new ArrayDeque<>();
this.requests.put(request.request().destination(), reqs);
}
reqs.addFirst(request);
}
- 取出最早發(fā)送的request, 通過(guò)
pollLast()
取出
public ClientRequest completeNext(String node) {
return requestQueue(node).pollLast();
}
-
public boolean canSendMore(String node)
決定是否可以通過(guò)NetworkClient來(lái)發(fā)送請(qǐng)求
對(duì)于通過(guò)NetworkClient來(lái)發(fā)送的request, 如果之前發(fā)送的請(qǐng)求并沒(méi)有通過(guò)底層socket實(shí)際發(fā)送完成, 是不允許發(fā)送新的request的
public boolean canSendMore(String node) {
Deque<ClientRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
```
# ClusterConnectionStates
* 所在文件:clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
* 記錄到各個(gè)broker node的連接狀態(tài):
`private final Map<String, NodeConnectionState> nodeState`
* 對(duì)同一node的兩次連接有一定的時(shí)間間隔限制, 即采用延遲連接:
`private final long reconnectBackoffMs`
* 連接狀態(tài)有如下三種:
```
ConnectionState.DISCONNECTED -- 未連接
ConnectionState.DISCONNECTING -- 正在連接
ConnectionState.CONNECTED -- 已連接
```
* `canConnect`: 判斷是否允許連接到node:如果從未連接過(guò)或者連接當(dāng)前是斷開(kāi)的并且距離上次連接的間隔大于`reconnectBackoffMs`藐不, 則允許連接铣减;
```
public boolean canConnect(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null)
return true;
else
return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
}
```
# NetworkClien類
* 所在文件: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
* 非線程安全
* 繼承自 `KafkaClient`
* 使用了 `org.apache.kafka.common.network.Selector`來(lái)處理網(wǎng)絡(luò)IO, [詳情點(diǎn)這里 => Kafka源碼分析-網(wǎng)絡(luò)層](http://www.reibang.com/p/8cbc7618abcb)
* 簡(jiǎn)單講這個(gè)類用來(lái)管理一個(gè)到broker node的連接曼验,請(qǐng)求發(fā)送和響應(yīng)接收:
>A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.
* 核心函數(shù) `poll`
使用`selector.poll`來(lái)處理實(shí)現(xiàn)的socket讀寫事件;
```
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
```
經(jīng)過(guò)`selector.poll`的調(diào)用,所有**發(fā)送完成的requet**, **接收完成的response**, **所有斷開(kāi)的連接**椿浓, **所有新建成功的連接**都將放到`selector`中相應(yīng)的隊(duì)列里充择;
* 處理發(fā)送完成的request
```
private void handleCompletedSends(List<ClientResponse> responses, long now) {
// if no response is expected then when the send is completed, return it
for (Send send : this.selector.completedSends()) {
ClientRequest request = this.inFlightRequests.lastSent(send.destination());
if (!request.expectResponse()) {
this.inFlightRequests.completeLastSent(send.destination());
responses.add(new ClientResponse(request, now, false, null));
}
}
}
```
對(duì)于不需要回應(yīng)response的請(qǐng)求,將從`ifFlightRequests`中刪除;
* 處理接收到的response
```
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
ClientRequest req = inFlightRequests.completeNext(source);
ResponseHeader header = ResponseHeader.parse(receive.payload());
// Always expect the response version id to be the same as the request version id
short apiKey = req.request().header().apiKey();
short apiVer = req.request().header().apiVersion();
Struct body = ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
correlate(req.request().header(), header);
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
}
}
```
如果是`metadata`的更新response,則調(diào)用`metadataUpdater.maybeHandleCompletedReceive` 處理metadata的更新;
* 處理新建的連接
```
private void handleConnections() {
for (String node : this.selector.connected()) {
log.debug("Completed connection to node {}", node);
this.connectionStates.connected(node);
}
}
```
* 處理所有的`handle***`函數(shù)返回的responses
```
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
```
# NetworkClientBlockingOps
* 所在文件: core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
* 利用非阻塞的`NetworkClient`的方法, 實(shí)現(xiàn)了阻塞的方法;
* 阻塞直到`Client.ready`
```
def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
if (client.isReady(node, now))
true
else if (client.connectionFailed(node))
throw new IOException(s"Connection to $node failed")
else false
}
}
```
* 阻塞發(fā)送request直到收到response
```
def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
client.send(request, time.milliseconds())
pollUntilFound(timeout) { case (responses, _) =>
val response = responses.find { response =>
response.request.request.header.correlationId == request.request.header.correlationId
}
response.foreach { r =>
if (r.wasDisconnected) {
val destination = request.request.destination
throw new IOException(s"Connection to $destination was disconnected before the response was read")
}
}
response
}
}
```
##### [Kafka源碼分析-匯總](http://www.reibang.com/p/aa274f8fe00f)