1.附一個數據示例:
2.第一種方法
/**
 * Renames a category, tag or type value by searching for the matching documents,
 * editing each one in memory and saving it back through the repository.
 *
 * <p>For tags and types the old value is removed and the new value is added only
 * when the document does not already contain it, so a rename never produces a
 * duplicate entry. Documents whose field is {@code null} are left unchanged
 * instead of aborting the whole run with a {@link NullPointerException}.
 *
 * @param type  1 = category, 2 = tags, 3 = types; any other value returns without doing anything
 * @param old   the value currently stored
 * @param reset the value that replaces {@code old}
 */
private void batchUpdateTask(int type, String old, String reset) {
    log.info("-----開始更新ES 類型:" + type + "-----" + old);
    // Build the search that selects the documents to rewrite.
    NativeSearchQueryBuilder searchQueryBuilder = new NativeSearchQueryBuilder();
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    switch (type) {
        case 1:
            boolQueryBuilder.must(QueryBuilders.matchQuery("category", old));
            break;
        case 2:
            boolQueryBuilder.must(QueryBuilders.matchQuery("tags.keyword", old));
            break;
        case 3:
            boolQueryBuilder.must(QueryBuilders.matchQuery("types.keyword", old));
            break;
        default:
            return;
    }
    searchQueryBuilder.withQuery(boolQueryBuilder);
    // NOTE(review): no pageable is set on this query, so presumably only the client's
    // default first page of hits is returned and processed per call - confirm, and
    // page or scroll if more matches than that are expected.
    NativeSearchQuery searchQuery = searchQueryBuilder.build();
    SearchHits<EsDataInfo> search = elasticsearchTemplate.search(searchQuery, EsDataInfo.class);
    if (!search.getSearchHits().isEmpty()) {
        int i = 0;
        for (SearchHit<EsDataInfo> searchHit : search.getSearchHits()) {
            EsDataInfo info = searchHit.getContent();
            switch (type) {
                case 1:
                    if (info.getCategory() != null) {
                        info.setCategory(info.getCategory().replace(old, reset));
                    }
                    break;
                case 2:
                    info.setTags(swapListValue(info.getTags(), old, reset));
                    break;
                case 3:
                    info.setTypes(swapListValue(info.getTypes(), old, reset));
                    break;
                default:
            }
            i++;
            esDataInfoRepository.save(info);
        }
        log.info("更新了:" + i + "條");
    }
    log.info("-----結(jié)束更新ES 類型:" + type + "-----" + old);
}

/**
 * Removes the first occurrence of {@code old} from {@code values} and makes sure
 * {@code reset} is present exactly as often as it already was, or once if it was absent.
 *
 * @param values the list to edit in place; may be {@code null}
 * @param old    the value to remove
 * @param reset  the value to add when it is not already in the list
 * @return the same list instance that was passed in ({@code null} stays {@code null})
 */
private static List<String> swapListValue(List<String> values, String old, String reset) {
    if (values == null) {
        return null;
    }
    values.remove(old);
    // Skip the add when the new value is already there, otherwise it would be stored twice.
    if (!values.contains(reset)) {
        values.add(reset);
    }
    return values;
}
3.第二種方法
// Injected via @Resource. updateByQuery calls getRHLClient() on it to obtain the
// RestHighLevelClient that sends the update-by-query request.
@Resource
private ElasticsearchRestClient elasticsearchRestClient;
/**
 * Renames a category, tag or type value with a single update-by-query request,
 * letting Elasticsearch rewrite the matching documents through a Painless script.
 *
 * <p>The old and new values are handed to the script as {@code params.old} and
 * {@code params.reset} instead of being concatenated into the script source. A value
 * containing a quote can therefore no longer break or alter the script, and the same
 * script source is reused for every rename instead of being compiled per value pair.
 *
 * @param type  1 = category, 2 = tags, 3 = types; any other value returns without doing anything
 * @param old   the value currently stored
 * @param reset the value that replaces {@code old}
 */
private void batchUpdateTask2(int type, String old, String reset) {
    log.info("-----batchUpdateTask2開始更新ES 類型:" + type + "-----" + old);
    if (old != null && old.equals(reset)) {
        // Nothing to rename. Running the tags script with identical values would
        // delete the tag, because the "new value already present" branch removes it.
        log.info("batchUpdateTask2 skipped: old and new value are identical");
        return;
    }
    // Fully qualified because this block cannot see or extend the file's import list.
    java.util.Map<String, Object> params = new java.util.HashMap<>();
    params.put("old", old);
    params.put("reset", reset);
    switch (type) {
        case 1:
            MatchPhraseQueryBuilder category = new MatchPhraseQueryBuilder("category", old);
            Script script1 = new Script(ScriptType.INLINE, "painless",
                    "if (ctx._source.category == params.old) {"
                            + " ctx._source.category = params.reset;"
                            + " }",
                    params);
            updateByQuery(category, script1);
            break;
        case 2:
            MatchPhraseQueryBuilder tags = new MatchPhraseQueryBuilder("tags.keyword", old);
            // If the new value already exists in the list, only the old value is removed;
            // otherwise the old value is replaced in place.
            Script script2 = new Script(ScriptType.INLINE, "painless",
                    "if (ctx._source.tags != null && ctx._source.tags.contains(params.old)) {\n"
                            + "  for (int i = 0; i < ctx._source.tags.length; ++i) {\n"
                            + "    if (ctx._source.tags[i] == params.old) {\n"
                            + "      if (ctx._source.tags.contains(params.reset)) {\n"
                            // Step back after removing so the element that shifts into
                            // slot i is not skipped by the loop increment.
                            + "        ctx._source.tags.remove(i);\n"
                            + "        --i;\n"
                            + "      } else {\n"
                            + "        ctx._source.tags[i] = params.reset;\n"
                            + "      }\n"
                            + "    }\n"
                            + "  }\n"
                            + "}",
                    params);
            updateByQuery(tags, script2);
            break;
        case 3:
            MatchPhraseQueryBuilder types = new MatchPhraseQueryBuilder("types.keyword", old);
            Script script3 = new Script(ScriptType.INLINE, "painless",
                    "if (ctx._source.types != null && ctx._source.types.contains(params.old)) {\n"
                            + "  for (int i = 0; i < ctx._source.types.length; ++i) {\n"
                            + "    if (ctx._source.types[i] == params.old) {\n"
                            + "      ctx._source.types[i] = params.reset;\n"
                            + "    }\n"
                            + "  }\n"
                            + "}",
                    params);
            updateByQuery(types, script3);
            break;
        default:
            return;
    }
    log.info("-----batchUpdateTask2結(jié)束更新ES 類型:" + type + "-----" + old);
}
/**
 * Sends an update-by-query request against the {@code datainfo_dev} index and logs
 * the total reported by the response.
 *
 * @param queryBuilder selects the documents the script is applied to
 * @param script       the script executed for every selected document
 */
private void updateByQuery(QueryBuilder queryBuilder, Script script) {
    final TimeValue scrollKeepAlive = TimeValue.timeValueMinutes(10);
    final TimeValue requestTimeout = TimeValue.timeValueMinutes(2);

    // The constructor takes the target index name(s): none, one or several.
    UpdateByQueryRequest updateRequest = new UpdateByQueryRequest("datainfo_dev");

    // What to update and how.
    updateRequest.setQuery(queryBuilder);
    updateRequest.setScript(script);

    // Keep going when a document hits a version conflict.
    updateRequest.setConflicts("proceed");

    // Sizing: cap on processed documents, documents per batch, number of slices.
    updateRequest.setMaxDocs(9000);
    updateRequest.setBatchSize(1000);
    updateRequest.setSlices(2);

    // How long the scroll search context is kept alive.
    updateRequest.setScroll(scrollKeepAlive);
    // Routing is intentionally left unset; updateRequest.setRouting(...) would restrict
    // the request to the shards matching that routing value.

    // Optional settings: request timeout, and refresh the index afterwards.
    updateRequest.setTimeout(requestTimeout);
    updateRequest.setRefresh(true);

    RestHighLevelClient restClient = elasticsearchRestClient.getRHLClient();
    try {
        BulkByScrollResponse result = restClient.updateByQuery(updateRequest, RequestOptions.DEFAULT);
        log.info("更新了:" + result.getTotal() + "條");
    } catch (IOException ioFailure) {
        log.error("updateByQuery error!", ioFailure);
    }
}
4.調用示例
/**
 * Asynchronously renames a category, tag or type value in Elasticsearch.
 *
 * <p>The work is submitted to the executor returned by
 * {@code SingleThreadPool.getExecutorService()}; this method returns as soon as the
 * task is queued. The task waits three seconds and then calls
 * {@code batchUpdateTask2}. Failures inside the task are logged, not propagated.
 *
 * @param type  1 = category, 2 = tag, 3 = type
 * @param old   the old value
 * @param reset the new value
 */
public void batchUpdateName(int type, String old, String reset) {
    ExecutorService executorService = SingleThreadPool.getExecutorService();
    executorService.execute(() -> {
        try {
            // NOTE(review): presumably this delay lets the originating change settle
            // before Elasticsearch is rewritten - confirm the reason with the caller.
            Thread.sleep(3 * 1000);
            batchUpdateTask2(type, old, reset);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pooled worker thread can still observe
            // the interruption; the update is skipped.
            Thread.currentThread().interrupt();
            log.error("batchUpdateName error.", e);
        } catch (Exception e) {
            log.error("batchUpdateName error.", e);
        }
    });
}