異常堆棧
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at org.apache.http.impl.nio.reactor.SessionInputBufferImpl.fill(SessionInputBufferImpl.java:231)
at org.apache.http.impl.nio.codecs.AbstractMessageParser.fillBuffer(AbstractMessageParser.java:136)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:241)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.io.IOException: Connection reset by peer
... 18 more
Suppressed: java.io.IOException: Connection reset by peer
... 18 more
<meta charset="utf-8">
1.長連接請求
以瀏覽器發(fā)起請求為例刊咳,在Request Headers中會包含Connection: keep-alive信息
當tomcat收到瀏覽器端的長連接請求后缺猛,如在限制范圍內秋秤,則會在Response Headers中返回Connection: keep-alive以保持連接不斷開
如要斷開長連接則在Request Headers中的信息為Connection: close
2址貌、了解了長連接原理后查看源碼ElasticsearchSinkBase.java的創(chuàng)建restclient的open
@Override
public void open(Configuration parameters) throws Exception {
client = callBridge.createClient(userConfig);
bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
failureRequestIndexer = new BufferingNoOpRequestIndexer();
}
@Override
public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
restClientFactory.configureRestClientBuilder(builder);
RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
if (LOG.isInfoEnabled()) {
LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
}
if (!rhlClient.ping(RequestOptions.DEFAULT)) {
throw new RuntimeException("There are no reachable Elasticsearch nodes!");
}
if (LOG.isInfoEnabled()) {
LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts.toString());
}
return rhlClient;
}
可以看到這里創(chuàng)建了一個RestHighLevelClient,繼續(xù)查看構造函數
/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
* {@link RestClient} to be used to perform requests.
*/
public RestHighLevelClient(RestClientBuilder restClientBuilder) {
this(restClientBuilder, Collections.emptyList());
}
/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
* {@link RestClient} to be used to perform requests and parsers for custom response sections added to Elasticsearch through plugins.
*/
protected RestHighLevelClient(RestClientBuilder restClientBuilder, List<NamedXContentRegistry.Entry> namedXContentEntries) {
this(restClientBuilder.build(), RestClient::close, namedXContentEntries);
}
//進入restClientBuilder.build()
/**
* Creates a new {@link RestClient} based on the provided configuration.
*/
public RestClient build() {
if (failureListener == null) {
failureListener = new RestClient.FailureListener();
}
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
(PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
httpClient.start();
return restClient;
}
//進入 this::createHttpClient
private CloseableHttpAsyncClient createHttpClient() {
//default timeouts are all infinite
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
if (requestConfigCallback != null) {
requestConfigBuilder = requestConfigCallback.customizeRequestConfig(requestConfigBuilder);
}
try {
HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
//default settings for connection pooling may be too constraining
.setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
.setSSLContext(SSLContext.getDefault())
.setTargetAuthenticationStrategy(new PersistentCredentialsAuthenticationStrategy());
if (httpClientConfigCallback != null) {
httpClientBuilder = httpClientConfigCallback.customizeHttpClient(httpClientBuilder);
}
final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
return AccessController.doPrivileged((PrivilegedAction<CloseableHttpAsyncClient>) finalBuilder::build);
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("could not create the default ssl context", e);
}
}
// finalBuilder::build可以看到
ConnectionKeepAliveStrategy keepAliveStrategy = this.keepAliveStrategy;
if (keepAliveStrategy == null) {
keepAliveStrategy = DefaultConnectionKeepAliveStrategy.INSTANCE;
}
如果客戶端使用的是apache httpclient 4.x版本贪绘,默認的keep-alive是讀取response heade中Keep-Alive字段,沒有的話就是無限(代碼中返回-1)兑牡。"If the Keep-Alive header is not present in the response, HttpClient assumes the connection can be kept alive indefinitely",詳細代碼DefaultConnectionKeepAliveStrategy.class類設置的税灌,代碼如下:
@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class DefaultConnectionKeepAliveStrategy implements ConnectionKeepAliveStrategy {
public static final DefaultConnectionKeepAliveStrategy INSTANCE = new DefaultConnectionKeepAliveStrategy();
@Override
public long getKeepAliveDuration(final HttpResponse response, final HttpContext context) {
Args.notNull(response, "HTTP response");
final HeaderElementIterator it = new BasicHeaderElementIterator(
response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
final HeaderElement he = it.nextElement();
final String param = he.getName();
final String value = he.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
try {
return Long.parseLong(value) * 1000;
} catch(final NumberFormatException ignore) {
}
}
}
return -1;
}
}
網上搜了一下這個的含義均函,具體如下
這里需要自定義實現(xiàn)ConnectionKeepAliveStrategy
public class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {
/**
* 最大keep alive的時間(分鐘)
* 這里默認為10分鐘亿虽,可以根據實際情況設置”呔疲可以觀察客戶端機器狀態(tài)為TIME_WAIT的TCP連接數经柴,如果太多,可以增大此值墩朦。
*/
private final static long MAX_KEEP_ALIVE_MINUTES = 10;
public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();
private CustomConnectionKeepAliveStrategy() {
super();
}
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
long keepAliveDuration = super.getKeepAliveDuration(response, context);
// <0 為無限期keepalive,將無限期替換成一個默認的時間
if (keepAliveDuration < 0) {
return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
}
return keepAliveDuration;
}
}
// 在客戶端創(chuàng)建的時候設置KeepAlive策略為自定義策略即可
esSinkBuilder.setRestClientFactory(restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE);
return httpClientBuilder;
});
});
參考鏈接:【Elasticsearch】解決Elasticsearch HTTP方式查詢報SocketTimeoutException的問題 Connection reset by peer
兩種由java http長連接(keep-alive)導致的問題
Connection keep alive strategy