flink elasticsearch sink連接報錯:Failed Elasticsearch bulk request: Connection reset by peer

異常堆棧

java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at org.apache.http.impl.nio.reactor.SessionInputBufferImpl.fill(SessionInputBufferImpl.java:231)
    at org.apache.http.impl.nio.codecs.AbstractMessageParser.fillBuffer(AbstractMessageParser.java:136)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:241)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.io.IOException: Connection reset by peer
        ... 18 more
        Suppressed: java.io.IOException: Connection reset by peer
            ... 18 more

<meta charset="utf-8">

1.長連接請求

以瀏覽器發(fā)起請求為例刊咳,在Request Headers中會包含Connection: keep-alive信息

image

當tomcat收到瀏覽器端的長連接請求后缺猛,如在限制范圍內秋秤,則會在Response Headers中返回Connection: keep-alive以保持連接不斷開

image

如要斷開長連接則在Request Headers中的信息為Connection: close

2址貌、了解了長連接原理后查看源碼ElasticsearchSinkBase.java的創(chuàng)建restclient的open

    @Override
    public void open(Configuration parameters) throws Exception {
        client = callBridge.createClient(userConfig);
        bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
        requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
        failureRequestIndexer = new BufferingNoOpRequestIndexer();
    }

    @Override
    public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
        restClientFactory.configureRestClientBuilder(builder);

        RestHighLevelClient rhlClient = new RestHighLevelClient(builder);

        if (LOG.isInfoEnabled()) {
            LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
        }

        if (!rhlClient.ping(RequestOptions.DEFAULT)) {
            throw new RuntimeException("There are no reachable Elasticsearch nodes!");
        }

        if (LOG.isInfoEnabled()) {
            LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts.toString());
        }

        return rhlClient;
    }

可以看到這里創(chuàng)建了一個RestHighLevelClient,繼續(xù)查看構造函數

 /**
     * Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
     * {@link RestClient} to be used to perform requests.
     */
    public RestHighLevelClient(RestClientBuilder restClientBuilder) {
        this(restClientBuilder, Collections.emptyList());
    }

    /**
     * Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
     * {@link RestClient} to be used to perform requests and parsers for custom response sections added to Elasticsearch through plugins.
     */
    protected RestHighLevelClient(RestClientBuilder restClientBuilder, List<NamedXContentRegistry.Entry> namedXContentEntries) {
        this(restClientBuilder.build(), RestClient::close, namedXContentEntries);
    }

    //進入restClientBuilder.build()

   /**
     * Creates a new {@link RestClient} based on the provided configuration.
     */
    public RestClient build() {
        if (failureListener == null) {
            failureListener = new RestClient.FailureListener();
        }
        CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
            (PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
        RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
                pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
        httpClient.start();
        return restClient;
    }
    //進入 this::createHttpClient
        private CloseableHttpAsyncClient createHttpClient() {
        //default timeouts are all infinite
        RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
                .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
                .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
        if (requestConfigCallback != null) {
            requestConfigBuilder = requestConfigCallback.customizeRequestConfig(requestConfigBuilder);
        }

        try {
            HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
                //default settings for connection pooling may be too constraining
                .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
                .setSSLContext(SSLContext.getDefault())
                .setTargetAuthenticationStrategy(new PersistentCredentialsAuthenticationStrategy());
            if (httpClientConfigCallback != null) {
                httpClientBuilder = httpClientConfigCallback.customizeHttpClient(httpClientBuilder);
            }

            final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
            return AccessController.doPrivileged((PrivilegedAction<CloseableHttpAsyncClient>) finalBuilder::build);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("could not create the default ssl context", e);
        }
    }
    
    // finalBuilder::build可以看到
   ConnectionKeepAliveStrategy keepAliveStrategy = this.keepAliveStrategy;
        if (keepAliveStrategy == null) {
            keepAliveStrategy = DefaultConnectionKeepAliveStrategy.INSTANCE;
        }

如果客戶端使用的是apache httpclient 4.x版本贪绘,默認的keep-alive是讀取response heade中Keep-Alive字段,沒有的話就是無限(代碼中返回-1)兑牡。"If the Keep-Alive header is not present in the response, HttpClient assumes the connection can be kept alive indefinitely",詳細代碼DefaultConnectionKeepAliveStrategy.class類設置的税灌,代碼如下:

@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class DefaultConnectionKeepAliveStrategy implements ConnectionKeepAliveStrategy {

    public static final DefaultConnectionKeepAliveStrategy INSTANCE = new DefaultConnectionKeepAliveStrategy();

    @Override
    public long getKeepAliveDuration(final HttpResponse response, final HttpContext context) {
        Args.notNull(response, "HTTP response");
        final HeaderElementIterator it = new BasicHeaderElementIterator(
                response.headerIterator(HTTP.CONN_KEEP_ALIVE));
        while (it.hasNext()) {
            final HeaderElement he = it.nextElement();
            final String param = he.getName();
            final String value = he.getValue();
            if (value != null && param.equalsIgnoreCase("timeout")) {
                try {
                    return Long.parseLong(value) * 1000;
                } catch(final NumberFormatException ignore) {
                }
            }
        }
        return -1;
    }
}

網上搜了一下這個的含義均函,具體如下


image.png

這里需要自定義實現(xiàn)ConnectionKeepAliveStrategy

public class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {
    /**
     * 最大keep alive的時間(分鐘)
     * 這里默認為10分鐘亿虽,可以根據實際情況設置”呔疲可以觀察客戶端機器狀態(tài)為TIME_WAIT的TCP連接數经柴,如果太多,可以增大此值墩朦。
     */
    private final static long MAX_KEEP_ALIVE_MINUTES = 10;

    public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();

    private CustomConnectionKeepAliveStrategy() {
        super();
    }

    @Override
    public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
        long keepAliveDuration = super.getKeepAliveDuration(response, context);
        // <0 為無限期keepalive,將無限期替換成一個默認的時間
        if (keepAliveDuration < 0) {
            return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
        }
        return keepAliveDuration;
    }
}

// 在客戶端創(chuàng)建的時候設置KeepAlive策略為自定義策略即可
esSinkBuilder.setRestClientFactory(restClientBuilder -> {
            restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE);
                return httpClientBuilder;
            });
        });

參考鏈接:【Elasticsearch】解決Elasticsearch HTTP方式查詢報SocketTimeoutException的問題 Connection reset by peer
兩種由java http長連接(keep-alive)導致的問題
Connection keep alive strategy

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末坯认,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子氓涣,更是在濱河造成了極大的恐慌牛哺,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,651評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件劳吠,死亡現(xiàn)場離奇詭異引润,居然都是意外死亡,警方通過查閱死者的電腦和手機痒玩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評論 3 392
  • 文/潘曉璐 我一進店門淳附,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蠢古,你說我怎么就攤上這事奴曙。” “怎么了草讶?”我有些...
    開封第一講書人閱讀 162,931評論 0 353
  • 文/不壞的土叔 我叫張陵洽糟,是天一觀的道長。 經常有香客問我堕战,道長坤溃,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,218評論 1 292
  • 正文 為了忘掉前任嘱丢,我火速辦了婚禮薪介,結果婚禮上,老公的妹妹穿的比我還像新娘越驻。我一直安慰自己昭灵,他們只是感情好,可當我...
    茶點故事閱讀 67,234評論 6 388
  • 文/花漫 我一把揭開白布伐谈。 她就那樣靜靜地躺著,像睡著了一般试疙。 火紅的嫁衣襯著肌膚如雪诵棵。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,198評論 1 299
  • 那天祝旷,我揣著相機與錄音履澳,去河邊找鬼嘶窄。 笑死,一個胖子當著我的面吹牛距贷,可吹牛的內容都是我干的柄冲。 我是一名探鬼主播,決...
    沈念sama閱讀 40,084評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼忠蝗,長吁一口氣:“原來是場噩夢啊……” “哼现横!你這毒婦竟也來了?” 一聲冷哼從身側響起阁最,我...
    開封第一講書人閱讀 38,926評論 0 274
  • 序言:老撾萬榮一對情侶失蹤戒祠,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后速种,有當地人在樹林里發(fā)現(xiàn)了一具尸體姜盈,經...
    沈念sama閱讀 45,341評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,563評論 2 333
  • 正文 我和宋清朗相戀三年配阵,在試婚紗的時候發(fā)現(xiàn)自己被綠了馏颂。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,731評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡棋傍,死狀恐怖救拉,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情舍沙,我是刑警寧澤近上,帶...
    沈念sama閱讀 35,430評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站拂铡,受9級特大地震影響壹无,放射性物質發(fā)生泄漏。R本人自食惡果不足惜感帅,卻給世界環(huán)境...
    茶點故事閱讀 41,036評論 3 326
  • 文/蒙蒙 一斗锭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧失球,春花似錦岖是、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,676評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至黔牵,卻和暖如春聪轿,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背猾浦。 一陣腳步聲響...
    開封第一講書人閱讀 32,829評論 1 269
  • 我被黑心中介騙來泰國打工陆错, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留灯抛,地道東北人。 一個月前我還...
    沈念sama閱讀 47,743評論 2 368
  • 正文 我出身青樓音瓷,卻偏偏與公主長得像对嚼,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子绳慎,可洞房花燭夜當晚...
    茶點故事閱讀 44,629評論 2 354

推薦閱讀更多精彩內容