在某些業(yè)務(wù)場(chǎng)景中驮吱,索引中的某些文檔已經(jīng)失去意義茧妒,我們需要對(duì)這部分索引文檔進(jìn)行刪除,本來僅僅是java調(diào)用一個(gè)批量刪除的接口就能完成左冬,但是在使用過程中還是遇到一些小問題桐筏,在此記錄一下自己的處理方法。
需求
在一個(gè)檢索服務(wù)當(dāng)中拇砰,我們可能需要從檢索結(jié)果中篩選出一部分文檔進(jìn)行刪除梅忌,在一般操作中狰腌,直接調(diào)用Java Client的刪除接口即可完成索引文檔的刪除,但在業(yè)務(wù)中牧氮,因?yàn)闄z索文檔數(shù)的原因琼腔,導(dǎo)致部分需要?jiǎng)h除的文檔未被檢索出來,所以當(dāng)被檢索出來的文檔被刪除時(shí)踱葛,需要重復(fù)執(zhí)行一次檢索丹莲,然后再判斷是否有文檔需要?jiǎng)h除。
循環(huán)檢索刪除索引文檔的函數(shù):
public List<String> deleteIndexesByString(String str){
boolean deleteFlag = true;
List<String> result = Lists.newArrayList();
while (deleteFlag) {
Optional<List<String>> deletedIndexesIdsOpt = deleteIndexesByString(question);
// 刪除中有錯(cuò)誤
if (!deletedIndexesOpt.isPresent()) {
log.error("delete indexes by string occurred a error", e);
return Lists.newArrayList();
}
List<String> deletedIndexesIds = deletedIndexesIdsOpt.get();
result.addAll(deletedIndexesIds);
// 如果沒有刪除索引尸诽,則結(jié)束循環(huán)
if (deletedIndexesIds.size() == 0) {
deleteFlag = false;
}
}
return result;
}
/**
* 檢索文檔甥材,并計(jì)算之后刪除部分文檔
**/
public Optional<List<String>> deleteIndexesByString(String str){
// 根據(jù)句子在elastic search庫(kù)中檢索索引文檔
List<Index> indexes =indexService.queryIndexByString(str);
log.debug("query indexes from es:[{}]", indexes);
// 通過重新計(jì)算索引文檔的分?jǐn)?shù), 獲取要?jiǎng)h除的文檔Id
List<String> deleteIndexIds=rankService.getDeleteIndexIds(indexes, question);
// 批量刪除索引文檔
List<BulkItemResponse.Failure> failures = indexService.bulkDeleteIndexDocs(indexName, type, deleteIndexIds);
// 刪除沒成功
if(!failures.isEmpty()){
return Optional.empty();
}
// 刪除成功
return Optional.of(deleteIndexIds);
}
批量刪除索引文檔的函數(shù):
private List<BulkItemResponse.Failure> bulkDeleteIndexDocs(String indexName, String type, List<String> ids) {
List<BulkItemResponse.Failure> failures = new LinkedList<>();
BulkRequestBuilder bulkRequest = getClient().prepareBulk();
for (String id : ids) {
bulkRequest.add(getClient().prepareDelete(indexName, type, id));
}
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
for (BulkItemResponse single :
bulkResponse.getItems()) {
if (single.isFailed()) {
failures.add(single.getFailure());
}
}
}
return failures;
}
問題
在上述代碼邏輯中逊谋,運(yùn)行檢索刪除時(shí)擂达,許多索引文檔被重復(fù)刪除土铺,即bulkDeleteIndexDocs
方法在執(zhí)行完畢之后胶滋,elastic search庫(kù)中的索引并未同步更新。
解決
在剛開始悲敷,以為是bulkDeleteIndexDocs
中的bulkRequest.get();
方法是異步執(zhí)行的究恤,所以在執(zhí)行完這個(gè)方法之后,循環(huán)又執(zhí)行了許多遍后德。但是經(jīng)查文檔之后部宿,發(fā)現(xiàn)該方法為同步執(zhí)行的方法, 異步另有其他接口client.bulkAsync(request, RequestOptions.DEFAULT, listener);
瓢湃。
既是同步執(zhí)行方法理张,但是又多遍執(zhí)行檢索刪除操作,只能以為批量刪除之后绵患,elastic search索引庫(kù)并未及時(shí)更新雾叭,多方查閱找不到較好辦法來強(qiáng)制更新,所以在執(zhí)行完批量操作之后暫時(shí)使線程睡眠1000ms來等待elastic search索引庫(kù)更新完成再繼續(xù)下一次循環(huán)落蝙,此種方法雖能使檢索刪除操作次數(shù)減少织狐,但僅僅是權(quán)益之際,還得另尋方法筏勒。
private List<BulkItemResponse.Failure> bulkDeleteIndexDocs(String indexName, String type, List<String> ids) {
List<BulkItemResponse.Failure> failures = new LinkedList<>();
BulkRequestBuilder bulkRequest = getClient().prepareBulk();
for (String id : ids) {
bulkRequest.add(getClient().prepareDelete(indexName, type, id));
}
BulkResponse bulkResponse = bulkRequest.get();
// 進(jìn)程睡眠1000毫秒移迫,使elastic search索引值更新
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("thread sleep error!");
}
if (bulkResponse.hasFailures()) {
for (BulkItemResponse single :
bulkResponse.getItems()) {
if (single.isFailed()) {
failures.add(single.getFailure());
}
}
}
return failures;
}