ES中的中文分詞支持改為用IK分詞
在調用java api時,需要指定字段使用IK分詞創建mapping
同時ES還從原來使用的BulkRequestBuilder,改成參數更多更靈活的BulkProcessor。
1.原來的ElasticSearchOperator
package com.xxx.data;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.ImmutableSettings;
//import org.elasticsearch.common.settings.Settings;
//import org.elasticsearch.common.transport.InetSocketTransportAddress;
/**
 * Buffers Elasticsearch update/delete requests in one shared {@link BulkRequestBuilder}
 * and submits them in bulk, either when the buffer grows past {@link #MAX_BULK_COUNT}
 * actions or when the periodic {@link CommitTimer} fires.
 *
 * <p>All access to the shared builder goes through {@link #commitLock}.
 */
public class ElasticSearchOperator {
    // Buffer capacity: a size-triggered flush happens once the buffer holds MORE than this many actions
    private static final int MAX_BULK_COUNT = 10;
    // Maximum commit interval (seconds) used as the period of the flush timer
    private static final int MAX_COMMIT_INTERVAL = 60 * 5;

    private static Client client = null;
    private static BulkRequestBuilder bulkRequestBuilder = null;
    private static Lock commitLock = new ReentrantLock();

    static {
        // elasticsearch1.5.0
        // Settings settings = ImmutableSettings.settingsBuilder()
        // .put("cluster.name", Config.clusterName).build();
        // client = new TransportClient(settings)
        // .addTransportAddress(new InetSocketTransportAddress(
        // Config.nodeHost, Config.nodePort));
        // 2.3.5
        client = MyTransportClient.client;
        bulkRequestBuilder = newBulkRequestBuilder();
        // First flush after 10 s, then every MAX_COMMIT_INTERVAL seconds
        Timer timer = new Timer();
        timer.schedule(new CommitTimer(), 10 * 1000, MAX_COMMIT_INTERVAL * 1000);
    }

    /**
     * Creates an empty bulk builder configured the same way every time, so the
     * refresh flag is not lost when the buffer is replaced after a flush.
     *
     * @return a fresh builder with refresh enabled
     */
    private static BulkRequestBuilder newBulkRequestBuilder() {
        BulkRequestBuilder builder = client.prepareBulk();
        builder.setRefresh(true);
        return builder;
    }

    /**
     * Submits the buffered actions if the buffer holds more than {@code threshold} of them.
     * Callers must hold {@link #commitLock}.
     *
     * <p>Once a response has been received the buffer is always replaced. Previously a
     * response containing item failures kept the old builder, so the same batch (including
     * the items that had already succeeded) was re-sent on every later call and grew
     * without bound. Failed items are now reported and dropped. If the call itself throws
     * (no response at all), the buffer is left untouched and is retried on the next flush.
     *
     * @param threshold number of buffered actions that must be exceeded to trigger a submit
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() <= threshold) {
            return;
        }
        BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.err.println("bulk request had failures: " + bulkResponse.buildFailureMessage());
        }
        bulkRequestBuilder = newBulkRequestBuilder();
    }

    /**
     * Adds an update (index) request to the buffer and flushes if the buffer is full.
     * Exceptions are printed and not propagated.
     *
     * @param builder the update request to buffer
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Adds a delete request to the buffer and flushes if the buffer is full.
     * Exceptions are printed and not propagated.
     *
     * @param builder the delete request to buffer
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Timer task that flushes whatever is buffered (threshold 0), so that a RegionServer
     * that receives no further updates does not leave Elasticsearch out of sync with HBase.
     */
    static class CommitTimer extends TimerTask {
        @Override
        public void run() {
            commitLock.lock();
            try {
                bulkRequest(0);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                commitLock.unlock();
            }
        }
    }

    /** Manual smoke test: buffers ten delete requests and prints how many remain buffered. */
    private static void test() {
        Config.indexName = "flume-2016-08-10";
        Config.typeName = "tweet";
        for (int i = 10; i < 20; i++) {
            Map<String, Object> json = new HashMap<String, Object>();
            json.put("field", "ttt");
            // add
            // addUpdateBuilderToBulk(client.prepareUpdate(Config.indexName, Config.typeName, String.valueOf(i)).setDoc(json).setUpsert(json));
            // delete
            addDeleteBuilderToBulk(client.prepareDelete(Config.indexName, Config.typeName, String.valueOf(i)));
        }
        System.out.println(bulkRequestBuilder.numberOfActions());
    }

    public static void main(String[] args) {
        test();
    }
}
2.改成ElasticSearchBulkProcessor
package com.xxx.data;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.util.*;
/**
 * Created by lisiyu on 16/9/19.
 *
 * <p>Sends index requests to Elasticsearch through a shared {@link BulkProcessor} and,
 * before each request, makes sure the target index/type has a mapping that analyzes the
 * given fields with the IK analyzer.
 */
public class ElasticSearchBulkProcessor {
    private static Client client = null;
    private static BulkProcessor bulkProcessor = null;
    // Buffer capacity (number of requests)
    private static final int MAX_BULK_COUNT = 1000;
    // Buffer capacity (size, MB)
    private static final int MAX_BULK_SIZE = 1024;
    // Maximum commit interval (seconds)
    private static final int MAX_COMMIT_INTERVAL = 60 * 1;
    // Maximum number of concurrent bulk requests
    private static final int MAX_CONCURRENT_REQUEST = 2;
    // Initial wait before retrying a rejected bulk (ms)
    private static final int REJECT_EXCEPTION_RETRY_WAIT = 500;
    // Number of retries for a rejected bulk
    private static final int REJECT_EXCEPTION_RETRY_TIMES = 3;

    static {
        // 2.3.5
        client = MyTransportClient.client;
        bulkProcessor = BulkProcessor.builder(client, new LoggingBulkListener())
                .setBulkActions(MAX_BULK_COUNT)
                .setBulkSize(new ByteSizeValue(MAX_BULK_SIZE, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(MAX_COMMIT_INTERVAL))
                .setConcurrentRequests(MAX_CONCURRENT_REQUEST)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(
                                TimeValue.timeValueMillis(REJECT_EXCEPTION_RETRY_WAIT),
                                REJECT_EXCEPTION_RETRY_TIMES))
                .build();
    }

    /**
     * Bulk listener that reports failures instead of silently discarding them.
     * Successful bulks produce no output.
     */
    private static class LoggingBulkListener implements BulkProcessor.Listener {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            // nothing to do before a bulk is sent
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) {
                System.err.println("bulk " + executionId + " had failures: "
                        + response.buildFailureMessage());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            System.err.println("bulk " + executionId + " failed");
            failure.printStackTrace();
        }
    }

    /**
     * Ensures the IK mapping exists for the request's index/type, then adds the request
     * to the bulk processor. Any exception (including one from the mapping step) is
     * printed and not propagated; in that case the request is not added.
     *
     * <p>NOTE(review): {@link #createMapping} performs synchronous admin calls on every
     * invocation; if this is called per document, consider remembering which
     * index/type/fields have already been mapped.
     *
     * @param indexRequest the document to index
     * @param fieldSet     names of the fields that must be analyzed with IK
     */
    public static void addIndexRequestToBulkProcessor(IndexRequest indexRequest, Set<String> fieldSet) {
        try {
            // Print the index and type of the request
            System.out.println("index:"+indexRequest.index());
            System.out.println("type:"+indexRequest.type());
            // Try to create the index / mapping with the IK Chinese analyzer
            createMapping(indexRequest.index(), indexRequest.type(), fieldSet);
            // Queue the document
            bulkProcessor.add(indexRequest);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Creates or updates the mapping so that every field in {@code fieldSet} is a stored
     * string analyzed with "ik" (see the IK analysis plugin documentation for details).
     * If the index already exists the mapping is put onto it; otherwise the index is
     * created together with the mapping.
     *
     * @param index       index name
     * @param mappingType mapping (document) type
     * @param fieldSet    field names to analyze with IK; {@code null} is treated as empty
     * @throws Exception if building the mapping or any Elasticsearch call fails
     */
    public static void createMapping(String index, String mappingType, Set<String> fieldSet) throws Exception {
        boolean indexExists = client.admin().indices()
                .exists(new IndicesExistsRequest(index)).actionGet().isExists();
        if (indexExists) {
            System.out.println("index: '"+index+"' is exist!");
            PutMappingRequest mapping = Requests.putMappingRequest(index)
                    .type(mappingType)
                    .source(buildIkMapping(fieldSet));
            client.admin().indices().putMapping(mapping).actionGet();
        } else {
            System.out.println("create index: '"+index+"'!");
            client.admin().indices().prepareCreate(index)
                    .addMapping(mappingType, buildIkMapping(fieldSet))
                    .get();
        }
    }

    /**
     * Builds the mapping body shared by the create-index and put-mapping paths:
     * a stored string "id" field plus one stored, IK-analyzed string field per name.
     */
    private static XContentBuilder buildIkMapping(Set<String> fieldSet) throws java.io.IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder()
                .startObject() // note: do not wrap with index and type here
                .startObject("properties")
                .startObject("id").field("type", "string").field("store", "yes").endObject();
        if (fieldSet != null) {
            for (String field : fieldSet) {
                builder.startObject(field)
                        .field("type", "string")
                        .field("store", "yes")
                        .field("analyzer", "ik")
                        .endObject();
            }
        }
        return builder.endObject().endObject();
    }

    /**
     * Manual smoke test: indexes one document through a locally built processor and
     * waits for it to be flushed before returning.
     */
    public static void test() {
        // on startup
        Client client = MyTransportClient.client;
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new LoggingBulkListener())
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("field", "test");
        bulkProcessor.add(new IndexRequest("twitter", "tweet", "1111").source(json));
        // Flush the buffered request and wait for it; without this the document may
        // never be sent because nothing else triggers a flush before the method returns.
        try {
            bulkProcessor.awaitClose(10, java.util.concurrent.TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        test();
    }
}
3.DataSyncObserver類修改
/**
 * Coprocessor hook invoked after a Put: copies every cell of the Put into a JSON
 * document (qualifier -> value, both decoded as strings) and hands it to
 * {@link ElasticSearchBulkProcessor}, together with the set of qualifier names so those
 * fields get an IK mapping. Any failure is logged and never propagated to the caller.
 */
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
    /**
     * Old approach: called ElasticSearchOperator, which did not create a Chinese (IK) mapping.
     */
    // try {
    // String indexId = new String(put.getRow());
    // Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
    //// NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
    // Map<String, Object> json = new HashMap<String, Object>();
    // for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
    // for (Cell cell : entry.getValue()) {
    // String key = Bytes.toString(CellUtil.cloneQualifier(cell));
    // String value = Bytes.toString(CellUtil.cloneValue(cell));
    // json.put(key, value);
    // }
    // }
    // System.out.println();
    // ElasticSearchOperator.addUpdateBuilderToBulk(client.prepareUpdate(Config.indexName, Config.typeName, indexId).setDoc(json).setUpsert(json));
    // LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName);
    // } catch (Exception ex) {
    // LOG.error(ex);
    // }
    /**
     * New approach: calls ElasticSearchBulkProcessor, which creates the IK mapping.
     */
    try {
        // Decode the row key the same way as qualifiers and values below, instead of
        // new String(byte[]), which depends on the JVM's platform default charset.
        String indexId = Bytes.toString(put.getRow());
        NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
        HashSet<String> fieldNames = new HashSet<String>();
        HashMap<String, Object> json = new HashMap<String, Object>();
        for (List<Cell> cells : familyMap.values()) {
            for (Cell cell : cells) {
                String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                json.put(key, value);
                fieldNames.add(key);
            }
        }
        ElasticSearchBulkProcessor.addIndexRequestToBulkProcessor(
                new IndexRequest(Config.indexName, Config.typeName, indexId).source(json),
                fieldNames);
        LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName);
    } catch (Exception ex) {
        LOG.error(ex);
    }
}
4.測試
- 代碼打包
- jar包上傳到hdfs
- 創建hbase表,并修改表屬性關聯observer
- 測試put新數據
- 查看es中數據
- 中文分詞測試
{"query":{"query_string":{"query":"拖鞋"}},"highlight":{"require_field_match":false,"explain":true,"fields":{"*":{}}}}