Es7.x使用RestHighLevelClient進(jìn)行增刪改和批量操作

  1. 引入依賴
  2. 初始化RestHighLevelClient和BulkProcessor對象
  3. 增刪改操作
    3.1 數(shù)據(jù)準(zhǔn)備
    3.2 單條數(shù)據(jù)異步插入
    3.3 單條數(shù)據(jù)同步插入
    3.4 批量插入
    3.5 更新操作
    3.6 帶條件的更新語句
    3.7 批量更新
    3.8 刪除操作
    3.9 條件刪除

Java層面操作elasticSearch7.x以清,為了便于操作饱狂,不集成Spring嗦锐,使用main方法進(jìn)行調(diào)用淹朋。

1. 引入依賴

        <!--解決:java.lang.NoClassDefFoundError: org/elasticsearch/common/xcontent/DeprecationHandler-->
        <!-- elasticsearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.5.1</version>
        </dependency>

        <!-- elasticsearch-rest-client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.5.1</version>
        </dependency>

        <!-- elasticsearch-rest-high-level-client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.5.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

2. 初始化RestHighLevelClient和BulkProcessor對象

RestHighLevelClientRestHighLevelClient是官方指定的連接API。另外一個是TransportClient而叼,但是TransportClient這個是已經(jīng)廢棄不用的塔粒,所以會在ES8.0之后完全移除品追,也就是說8.0之后就無法使用了。

@Slf4j
public class EsTest {

    //es操作客戶端
    private static RestHighLevelClient restHighLevelClient;
    //批量操作的對象
    private static BulkProcessor bulkProcessor;

    static {
        List<HttpHost> httpHosts = new ArrayList<>();
        //填充數(shù)據(jù)
        httpHosts.add(new HttpHost("172.26.17.11", 9200));
        httpHosts.add(new HttpHost("172.26.17.11", 9201));
        httpHosts.add(new HttpHost("172.26.17.11", 9202));
        //填充host節(jié)點(diǎn)
        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));

        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(1000);
            requestConfigBuilder.setSocketTimeout(1000);
            requestConfigBuilder.setConnectionRequestTimeout(1000);
            return requestConfigBuilder;
        });

        //填充用戶名密碼
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userName", "password"));

        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(30);
            httpClientBuilder.setMaxConnPerRoute(30);
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            return httpClientBuilder;
        });

        restHighLevelClient = new RestHighLevelClient(builder);
    }

    static {
        bulkProcessor=createBulkProcessor();
    }

    private static BulkProcessor createBulkProcessor() {

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("1. 【beforeBulk】批次[{}] 攜帶 {} 請求數(shù)量", executionId, request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  BulkResponse response) {
                if (!response.hasFailures()) {
                    log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
                } else {
                    BulkItemResponse[] items = response.getItems();
                    for (BulkItemResponse item : items) {
                        if (item.isFailed()) {
                            log.info("2. 【afterBulk-失敗】批量 [{}] 出現(xiàn)異常的原因 : {}", executionId, item.getFailureMessage());
                            break;
                        }
                    }
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  Throwable failure) {

                List<DocWriteRequest<?>> requests = request.requests();
                List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
                log.error("3. 【afterBulk-failure失敗】es執(zhí)行bluk失敗,失敗的esId為:{}", esIds, failure);
            }
        };

        BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
            restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
        }), listener);
        //到達(dá)10000條時刷新
        builder.setBulkActions(10000);
        //內(nèi)存到達(dá)8M時刷新
        builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
        //設(shè)置的刷新間隔10s
        builder.setFlushInterval(TimeValue.timeValueSeconds(10));
        //設(shè)置允許執(zhí)行的并發(fā)請求數(shù)鸦泳。
        builder.setConcurrentRequests(8);
        //設(shè)置重試策略
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
        return builder.build();
    }
}

整個項(xiàng)目可以共用一個BulkProcessor银锻,可以配置多種刷新策略,將數(shù)據(jù)由內(nèi)存刷新到es中做鹰。

3. 增刪改操作

3.1 數(shù)據(jù)準(zhǔn)備

PUT test_demo

PUT test_demo/_mapping
{
  "properties":{
    "title":{
      "type":"text"
    },
    "tag":{
      "type":"keyword"
    },
    "publishTime":{
      "type":"date",
      "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
    }
  }
}

GET test_demo/_search
{
  "query": {
    "match_all": {}
  }
}

3.2 單條數(shù)據(jù)異步插入

    public static void testAsyncSingle() {
        IndexRequest indexRequest = new IndexRequest("test_demo");
        DemoDto demoDto = new DemoDto(2001L, "印度新冠疫情失控", "世界", new Date());
        indexRequest.source(JSON.toJSONString(demoDto), XContentType.JSON);
        indexRequest.timeout(TimeValue.timeValueSeconds(1));
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        //數(shù)據(jù)為存儲而不是更新
        indexRequest.create(false);
        indexRequest.id(demoDto.getId() + "");
        restHighLevelClient.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        log.error("將id為:{}的數(shù)據(jù)存入ES時存在失敗的分片击纬,原因?yàn)椋簕}", indexRequest.id(), failure.getCause());
                    }
                }
            }

            @Override
            public void onFailure(Exception e) {
                log.error("{}:存儲es時異常,數(shù)據(jù)信息為", indexRequest.id(), e);
            }
        });
    }

3.3 單條數(shù)據(jù)同步插入

    public static void testSingleAdd() throws IOException {
        IndexRequest indexRequest = new IndexRequest("test_demo");
        DemoDto demoDto = new DemoDto(3001L, "es單數(shù)據(jù)同步插入2", "IT", new Date());
        indexRequest.source(JSON.toJSONString(demoDto), XContentType.JSON);
        indexRequest.id("3001");
        indexRequest.timeout(TimeValue.timeValueSeconds(1));
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        indexRequest.create(true);
        indexRequest.id(demoDto.getId() + "");
        restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    }
  1. indexRequest.id(demoDto.getId() + ""); —— 填充"_id"字段钾麸。
    image.png
  2. indexRequest.create(true);——設(shè)置操作類型
    public IndexRequest create(boolean create) {
        if (create) {
            return opType(OpType.CREATE);
        } else {
            return opType(OpType.INDEX);
        }
    }
  • OpType.CREATE:當(dāng)存在相同的_id時更振,插入會出現(xiàn)異常;
  • OpType.INDEX:當(dāng)存在相同_id時喂走,插入會進(jìn)行覆蓋殃饿;

當(dāng)設(shè)置OpType.CREATE時相同id插入異常看出芋肠,es進(jìn)行了樂觀鎖控制并發(fā)寫沖突乎芳。

Elasticsearch exception [type=version_conflict_engine_exception, reason=[3001]: version conflict, document already exists (current version [3])]

3.4 批量插入

由于設(shè)置了BulkProcessor對象,可以將數(shù)據(jù)設(shè)置到BulkProcessor對象中,根據(jù)策略批量的刷新到Es中奈惑。

    /**
     * 批量插入
     */
    public static void testBatch() {
        List<IndexRequest> indexRequests = new ArrayList<>();

        ArrayList<DemoDto> demoDtos = new ArrayList<>();


        demoDtos.add(new DemoDto(1001L, "中國是中國人的中國", "中國", new Date()));
        demoDtos.add(new DemoDto(1002L, "2008年奧運(yùn)會", "體育", new Date()));


        demoDtos.forEach(e -> {
            IndexRequest request = new IndexRequest("test_demo");
            //填充id
            request.id(e.getId() + "");
            //先不修改id
            request.source(JSON.toJSONString(e), XContentType.JSON);
            request.opType(DocWriteRequest.OpType.CREATE);
            indexRequests.add(request);
        });
        indexRequests.forEach(bulkProcessor::add);
    }

3.5 更新操作

更新操作傳入的doc為map對象吭净,而不是json字符串,否則會拋出異常肴甸。

    public static void testSingleUpdate() throws IOException {

        UpdateRequest updateRequest = new UpdateRequest("test_demo", "3001");

        Map<String, Object> kvs = new HashMap<>();
        kvs.put("title", "es單數(shù)據(jù)更新啦寂殉!");
        updateRequest.doc(kvs);
        updateRequest.timeout(TimeValue.timeValueSeconds(1));
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        //數(shù)據(jù)為存儲而不是更新
        restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
    }

3.6 帶條件的更新語句

    public static void testSingleUpdateQuery() throws IOException {


        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
        updateByQueryRequest.indices("test_demo");

        updateByQueryRequest.setQuery(new TermQueryBuilder("id", 3001));

        updateByQueryRequest.setScript(new Script(ScriptType.INLINE,
                "painless",
                "ctx._source.tag='電腦'", Collections.emptyMap()));
        //數(shù)據(jù)為存儲而不是更新
        restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
    }

3.7 批量更新

    /**
     * 批量更新
     */
    private static void testBatchUpdate() {

        List<UpdateRequest> updateRequests=new ArrayList<>();

        //更新的數(shù)據(jù)
        List<DemoDto> params=new ArrayList<>();
        params.add(new DemoDto(2001L));
        params.add(new DemoDto(3001L));

        params.forEach(e->{
            //獲取id
            UpdateRequest updateRequest = new UpdateRequest();
            updateRequest.index("test_demo");
            //更新的id
            updateRequest.id(e.getId()+"");
            //更新的數(shù)據(jù)
            Map<String,Object> map=new HashMap<>();
            map.put("title","美國社會動蕩");

            updateRequest.doc(map);
            updateRequests.add(updateRequest);
        });
        updateRequests.forEach(bulkProcessor::add);
    }

3.8 刪除操作

    /**
     * 單個刪除
     */
    private static void testSingleDel() throws IOException {
        DeleteRequest deleteRequest=new DeleteRequest();
        deleteRequest.index("test_demo");
        deleteRequest.id("3001");
        restHighLevelClient.delete(deleteRequest,RequestOptions.DEFAULT);
    }

3.9 條件刪除

    /**
     * 單個條件刪除
     */
    private static void testSingleDelQuery() throws IOException {
        DeleteByQueryRequest deleteByQueryRequest=new DeleteByQueryRequest();
        deleteByQueryRequest.indices("test_demo");
        deleteByQueryRequest.setQuery(new MatchQueryBuilder("title","國年"));
        //分詞式刪除
        restHighLevelClient.deleteByQuery(deleteByQueryRequest,RequestOptions.DEFAULT);
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市原在,隨后出現(xiàn)的幾起案子友扰,更是在濱河造成了極大的恐慌,老刑警劉巖庶柿,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件村怪,死亡現(xiàn)場離奇詭異,居然都是意外死亡浮庐,警方通過查閱死者的電腦和手機(jī)甚负,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來审残,“玉大人梭域,你說我怎么就攤上這事〗两危” “怎么了病涨?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長介时。 經(jīng)常有香客問我没宾,道長,這世上最難降的妖魔是什么沸柔? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任循衰,我火速辦了婚禮,結(jié)果婚禮上褐澎,老公的妹妹穿的比我還像新娘会钝。我一直安慰自己,他們只是感情好工三,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布迁酸。 她就那樣靜靜地躺著,像睡著了一般俭正。 火紅的嫁衣襯著肌膚如雪奸鬓。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天掸读,我揣著相機(jī)與錄音串远,去河邊找鬼宏多。 笑死,一個胖子當(dāng)著我的面吹牛澡罚,可吹牛的內(nèi)容都是我干的伸但。 我是一名探鬼主播,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼留搔,長吁一口氣:“原來是場噩夢啊……” “哼更胖!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起隔显,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤却妨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后括眠,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體管呵,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年哺窄,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片账锹。...
    茶點(diǎn)故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡萌业,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出奸柬,到底是詐尸還是另有隱情生年,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布廓奕,位于F島的核電站抱婉,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏桌粉。R本人自食惡果不足惜蒸绩,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望铃肯。 院中可真熱鬧患亿,春花似錦、人聲如沸押逼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽挑格。三九已至咙冗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間漂彤,已是汗流浹背雾消。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工灾搏, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人仪或。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓确镊,卻偏偏與公主長得像,于是被迫代替她去往敵國和親范删。 傳聞我的和親對象是個殘疾皇子蕾域,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容