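1. Development Background
Know yourself and know your enemy, and you will never be defeated. Since we are going to develop an Elasticsearch bulk-write plugin, let us first get to know Elasticsearch.
- Elasticsearch is a real-time distributed storage, search, and data analytics engine. It makes it possible to process big data at unprecedented speed. It is used for full-text search, structured search, analytics, and any combination of the three.
- Elasticsearch is also written in Java and uses Lucene at its core for all indexing and search functionality, but it aims to hide Lucene's complexity behind a simple RESTful API, which makes full-text search easy.
- Elasticsearch is fast. Its design implements inverted indices with finite state transducers for full-text search, BKD trees for storing numeric and geo data, and a column store for analytics.
- Elasticsearch compared with a traditional relational database (RDBMS)
2. Index Creation
Both plugins, app-pentaho-es6 and app-pentaho-es7, are built on the Kettle platform. They work by taking fields dynamically from the row stream and resolving a user-defined dynamic index name, and on that basis perform the ElasticSearch Bulk Insert write. So we first need to understand how to use an index template (kettle-es) to create date-partitioned dynamic indices (kettle-es_*) for writing, and how to query those indices through the alias kettle-es-query. An example follows: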
PUT _template/kettle-es
{
  "index_patterns": [
    "kettle-es_*"
  ],
  "settings": {
    "index": {
      "max_result_window": "100000",
      "number_of_shards": "3",
      "number_of_replicas": "1"
    }
  },
  "aliases": {
    "kettle-es-query": {}
  },
  "mappings": {
    "properties": {
      "agent_ip": {
        "type": "ip"
      },
      "record_id": {
        "type": "integer"
      },
      "start_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "fire_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "end_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "pid": {
        "type": "keyword"
      },
      "message": {
        "index": false,
        "type": "text"
      }
    }
  }
}
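The template only takes effect when the index name a row is written to matches the kettle-es_* pattern. As a rough illustration of "index prefix + dynamic date", the sketch below builds such a name with plain Java. The class name DailyIndexNames and the yyyy.MM.dd suffix are assumptions made for this example; the article does not state which date format the real transformation uses.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

final class DailyIndexNames {
    // Assumed date suffix; the article does not specify the actual pattern.
    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Builds "<prefix>_<date>", e.g. an index covered by the kettle-es_* template.
    static String forDate(String prefix, LocalDate day) {
        return prefix + "_" + day.format(SUFFIX);
    }

    // True when the name would be picked up by a "<prefix>_*" index pattern.
    static boolean matchesTemplate(String indexName, String prefix) {
        return indexName.startsWith(prefix + "_");
    }

    public static void main(String[] args) {
        String name = forDate("kettle-es", LocalDate.of(2019, 7, 1));
        System.out.println(name); // kettle-es_2019.07.01
        System.out.println(matchesTemplate(name, "kettle-es")); // true
    }
}
```

In a Kettle transformation the same value would typically be computed in an upstream step and passed to the plugin as the Index field.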
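3. ElasticSearch Bulk Insert Source Code Overview
3.1. Source Directory Structure
Whether you look at app-pentaho-es6 or app-pentaho-es7, the source layout is the same: the step class ElasticSearchBulk, the data class ElasticSearchBulkData, the metadata class ElasticSearchBulkMeta, the dialog class ElasticSearchBulkDialog, and the message configuration for log and notification texts. See the source code for details.
3.1.1. Directory Structure Comparison
3.1.2. Maven Configuration Comparison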
<!-- app-pentaho-es6: Java 1.7, Elasticsearch 6.4.2, TransportClient -->
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.7</maven.compiler.source>
  <maven.compiler.target>1.7</maven.compiler.target>
  <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
  <build.revision>${project.version}</build.revision>
  <timestamp>${maven.build.timestamp}</timestamp>
  <build.description>${project.description}</build.description>
  <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
  <elasticsearch.version>6.4.2</elasticsearch.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
<!-- app-pentaho-es7: Java 1.8, Elasticsearch 7.2.0, RestHighLevelClient -->
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
  <build.revision>${project.version}</build.revision>
  <timestamp>${maven.build.timestamp}</timestamp>
  <build.description>${project.description}</build.description>
  <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
  <elasticsearch.version>7.2.0</elasticsearch.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
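3.2. The app-pentaho-es6 Plugin
3.2.1. TransportClient API
The app-pentaho-es6 plugin uses TransportClient, which connects to the cluster remotely through the transport module. Note that Elastic deprecated TransportClient in Elasticsearch 7.0 and planned to remove it completely in 8.0.
With this API, the most common way to obtain an Elasticsearch client is to create a TransportClient connected to the cluster. It does not join the cluster; it simply takes one or more initial transport addresses and communicates with them in round-robin fashion on each action.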
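To make "round-robin on each action" concrete, here is a minimal stand-alone sketch of the selection idea. It is not TransportClient's internal implementation; the class RoundRobinAddresses and the host names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinAddresses {
    private final List<String> addresses;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinAddresses(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        this.addresses = new ArrayList<>(addresses);
    }

    // Returns the next address, cycling through the list; floorMod keeps the
    // index valid even after the counter overflows.
    String next() {
        int i = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(i);
    }

    public static void main(String[] args) {
        // Hypothetical node addresses on the transport port.
        RoundRobinAddresses rr = new RoundRobinAddresses(
                Arrays.asList("es-node1:9300", "es-node2:9300"));
        for (int action = 0; action < 3; action++) {
            System.out.println("action " + action + " -> " + rr.next());
        }
    }
}
```

Each call to next() stands for one client operation, so load is spread evenly over the configured addresses.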
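3.2.2. Client Initialization
(1) Determine the cluster name, addresses, ports, and protocol that PreBuiltTransportClient uses to connect to the ES cluster, and configure the TransportAddress entries.
(2) Test whether the specified index can be reached, which yields the Client.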
private void initClient() throws UnknownHostException {
    // Start from empty settings, then apply the entries configured on the step's Settings tab.
    Settings.Builder settingsBuilder = Settings.builder();
    settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
    Map<String, String> tMetaMap = meta.getSettingsMap();
    for (Entry<String, String> entry : tMetaMap.entrySet()) {
        // environmentSubstitute() resolves Kettle variables in the configured value.
        settingsBuilder.put(entry.getKey(),
                environmentSubstitute(entry.getValue()));
    }
    PreBuiltTransportClient tClient = new PreBuiltTransportClient(
            settingsBuilder.build());
    // Register every configured server as a transport address.
    for (Server server : meta.getServers()) {
        tClient.addTransportAddress(new TransportAddress(
                InetAddress.getByName(environmentSubstitute(server.getAddress())),
                server.getPort()));
    }
    client = tClient;
}
3.2.3. Row Stream Processing
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
        throws KettleException {
    Object[] rowData = getRow();
    if (rowData == null) {
        // No more input: flush whatever is left in a partially filled batch.
        if (currentRequest != null && currentRequest.numberOfActions() > 0) {
            processBatch(false);
        }
        setOutputDone();
        return false;
    }
    if (first) {
        // One-time setup when the first row arrives.
        first = false;
        setupData();
        currentRequest = client.prepareBulk();
        requestsBuffer = new ArrayList<IndexRequestBuilder>(this.batchSize);
        initFieldIndexes();
    }
    try {
        data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
        // Keep processing after a failed row unless "Stop on error" is set.
        return indexRow(data.inputRowMeta, rowData) || !stopOnError;
    } catch (KettleStepException e) {
        throw e;
    } catch (Exception e) {
        rejectAllRows(e.getLocalizedMessage());
        String msg = BaseMessages.getString(PKG,
                "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
        logError(msg);
        throw new KettleStepException(msg, e);
    }
}
3.2.4. Batch Processing
private boolean processBatch(boolean makeNew) throws KettleStepException {
    // Send the accumulated bulk request asynchronously and wait for the result.
    ActionFuture<BulkResponse> actionFuture = currentRequest.execute();
    boolean responseOk = false;
    BulkResponse response = null;
    try {
        if (timeout != null && timeoutUnit != null) {
            // Honor the configured "Batch Timeout".
            response = actionFuture.actionGet(timeout, timeoutUnit);
        } else {
            response = actionFuture.actionGet();
        }
    } catch (ElasticsearchException e) {
        String msg = BaseMessages.getString(PKG,
                "ElasticSearchBulk.Error.BatchExecuteFail",
                e.getLocalizedMessage());
        if (e instanceof ElasticsearchTimeoutException) {
            msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Error.Timeout");
        }
        logError(msg);
        rejectAllRows(msg);
    }
    if (response != null) {
        responseOk = handleResponse(response);
        requestsBuffer.clear();
    } else { // have to assume all failed
        numberOfErrors += currentRequest.numberOfActions();
        setErrors(numberOfErrors);
    }
    // duration += response.getTookInMillis(); //just in trunk..
    if (makeNew) {
        // Prepare an empty bulk request and row buffer for the next batch.
        currentRequest = client.prepareBulk();
        data.nextBufferRowIdx = 0;
        data.inputRowBuffer = new Object[batchSize][];
    } else {
        // Last batch: release the request and the buffer.
        currentRequest = null;
        data.inputRowBuffer = null;
    }
    return responseOk;
}
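Taken together, processRow and processBatch follow a buffer-and-flush pattern: rows accumulate in a bulk request, a full batch is sent and replaced by an empty one, and the remainder is sent when the input ends. The sketch below shows that pattern without any Kettle or Elasticsearch dependency. BatchBuffer and its sink callback are hypothetical names, and the point at which the real plugin flushes a full batch lies inside indexRow, which the article does not show, so that part is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink; // stands in for the bulk write
    private List<T> pending;

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
        this.pending = new ArrayList<>(batchSize);
    }

    // Buffers one row and flushes as soon as the batch is full
    // (comparable to processBatch(true)).
    void add(T row) {
        pending.add(row);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Flushes a partially filled batch at end of input
    // (comparable to processBatch(false)).
    void close() {
        if (!pending.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        List<T> batch = pending;
        pending = new ArrayList<>(batchSize); // fresh buffer for the next batch
        sink.accept(batch);
    }

    public static void main(String[] args) {
        BatchBuffer<String> buffer = new BatchBuffer<>(2,
                batch -> System.out.println("flushing " + batch));
        buffer.add("row-1");
        buffer.add("row-2"); // triggers a flush of two rows
        buffer.add("row-3");
        buffer.close();      // flushes the remaining row
    }
}
```

Swapping in a fresh buffer before calling the sink keeps the flushed batch immutable from the buffer's point of view, which is the same reason the plugin creates a new bulk request after each write.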
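3.3. The app-pentaho-es7 Plugin
3.3.1. RestHighLevelClient API
The app-pentaho-es7 plugin is based on RestHighLevelClient, the Java High Level REST Client provided by Elasticsearch. It executes HTTP requests rather than serialized Java requests. The Java client is mainly used to:
(1) Perform standard index, get, delete, and search operations on an existing cluster
(2) Perform administrative tasks on a running cluster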
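Because the REST client talks HTTP, a bulk write ends up as a request to the _bulk endpoint whose body is newline-delimited JSON: one action line followed by one document line per row. The sketch below assembles such a body by hand to show the shape of the payload. It is an illustration only, not what RestHighLevelClient does internally; BulkBodySketch, the index name, and the sample documents are assumptions, and the helper does no JSON escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BulkBodySketch {
    // Builds the two NDJSON lines for one "index" action: action metadata, then the document.
    // Assumes index, id, and jsonSource need no additional JSON escaping.
    static String indexAction(String index, String id, String jsonSource) {
        return "{\"index\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}\n"
                + jsonSource + "\n";
    }

    // Concatenates the actions for all documents, keyed by document ID.
    static String body(String index, Map<String, String> sourcesById) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : sourcesById.entrySet()) {
            sb.append(indexAction(index, e.getKey(), e.getValue()));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Sample documents using field names from the kettle-es template.
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1", "{\"pid\":\"1001\",\"record_id\":1}");
        docs.put("2", "{\"pid\":\"1002\",\"record_id\":2}");
        System.out.print(body("kettle-es_2019.07.01", docs));
    }
}
```

The body must end with a newline, which is why every line, including the last, is terminated explicitly.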
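3.3.2. Client Initialization
(1) Initialize Elasticsearch authentication with a CredentialsProvider.
(2) Determine the cluster name, addresses, ports, and protocol that RestHighLevelClient uses to connect to the ES cluster, and register the configuration callback through setHttpClientConfigCallback.
(3) Test whether the specified index can be reached, which yields the Client.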
private void initClient() throws UnknownHostException {
    // Apply the entries configured on the step's Settings tab.
    Settings.Builder settingsBuilder = Settings.builder();
    settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
    meta.getSettingsMap().forEach(
            (key, value) -> settingsBuilder.put(key, environmentSubstitute(value)));
    // es.user and es.password are custom keys read from the Settings tab.
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(settingsBuilder.get("es.user"),
                    settingsBuilder.get("es.password")));
    // Collect all servers first so that a single client is created for the whole list.
    // Building a client inside the loop would keep only the last one and leak the others.
    List<HttpHost> hosts = new ArrayList<HttpHost>();
    for (Server server : meta.getServers()) {
        hosts.add(new HttpHost(server.getAddress(),
                Integer.valueOf(server.getPort()), "http"));
    }
    client = new RestHighLevelClient(
            RestClient.builder(hosts.toArray(new HttpHost[0]))
                    .setHttpClientConfigCallback(httpClientBuilder ->
                            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
}
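The CredentialsProvider above hands a user name and password to the underlying HTTP client. Assuming the cluster is protected with HTTP Basic authentication (the article does not say which scheme is enabled), the header that eventually goes over the wire can be sketched with the JDK alone. BasicAuthHeader and the credentials are placeholders for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class BasicAuthHeader {
    // Encodes "user:password" as Base64 and prefixes it with the "Basic" scheme.
    static String of(String user, String password) {
        String pair = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Placeholder credentials; in the plugin they come from es.user / es.password.
        System.out.println("Authorization: " + of("elastic", "changeme"));
    }
}
```

Base64 is an encoding, not encryption, so Basic authentication should only be used over HTTPS; the code above connects with the "http" scheme.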
3.3.3. Row Stream Processing
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
        throws KettleException {
    Object[] rowData = getRow();
    if (rowData == null) {
        // No more input: flush whatever is left in a partially filled batch.
        if (currentRequest != null && currentRequest.numberOfActions() > 0) {
            processBatch(false);
        }
        setOutputDone();
        return false;
    }
    if (first) {
        // One-time setup when the first row arrives.
        first = false;
        setupData();
        // Counterpart of client.prepareBulk() in the es6 version; added here on the
        // assumption that currentRequest is not initialized elsewhere.
        currentRequest = new BulkRequest();
        requestsBuffer = new ArrayList<IndexRequest>(this.batchSize);
        initFieldIndexes();
    }
    try {
        data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
        // Keep processing after a failed row unless "Stop on error" is set.
        return indexRow(data.inputRowMeta, rowData) || !stopOnError;
    } catch (KettleStepException e) {
        throw e;
    } catch (Exception e) {
        rejectAllRows(e.getLocalizedMessage());
        String msg = BaseMessages.getString(PKG,
                "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
        logError(msg);
        throw new KettleStepException(msg, e);
    }
}
3.3.4. Batch Processing
private boolean processBatch(boolean makeNew) throws KettleStepException {
    BulkResponse response = null;
    try {
        // Synchronous bulk call over HTTP; replaces the ActionFuture-based execution in es6.
        response = client.bulk(currentRequest, RequestOptions.DEFAULT);
    } catch (IOException e1) {
        rejectAllRows(e1.getLocalizedMessage());
        String msg = BaseMessages.getString(PKG,
                "ElasticSearchBulk.Log.Exception", e1.getLocalizedMessage());
        logError(msg);
        throw new KettleStepException(msg, e1);
    }
    boolean responseOk = false;
    if (response != null) {
        responseOk = handleResponse(response);
        requestsBuffer.clear();
    } else { // have to assume all failed
        numberOfErrors += currentRequest.numberOfActions();
        setErrors(numberOfErrors);
    }
    if (makeNew) {
        // Start a fresh, empty bulk request for the next batch.
        // Calling client.bulk(currentRequest, ...) again here would resend the batch just written.
        currentRequest = new BulkRequest();
        data.nextBufferRowIdx = 0;
        data.inputRowBuffer = new Object[batchSize][];
    } else {
        // Last batch: release the request and the buffer.
        currentRequest = null;
        data.inputRowBuffer = null;
    }
    return responseOk;
}
Whichever version of Elasticsearch the server-side cluster runs, the client must have the same major version (for example 6.x or 7.x) as the nodes in that cluster.
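This rule is easy to check mechanically before choosing between app-pentaho-es6 and app-pentaho-es7. The sketch below compares only the leading number of two version strings; VersionCheck is a hypothetical helper, and how the server version is obtained is left out.

```java
final class VersionCheck {
    // Extracts the major version, i.e. the number before the first dot.
    static int major(String version) {
        int dot = version.indexOf('.');
        String head = dot < 0 ? version : version.substring(0, dot);
        return Integer.parseInt(head.trim());
    }

    // True when client and server share the same major version.
    static boolean sameMajor(String clientVersion, String serverVersion) {
        return major(clientVersion) == major(serverVersion);
    }

    public static void main(String[] args) {
        System.out.println(sameMajor("6.4.2", "6.8.0")); // true
        System.out.println(sameMajor("6.4.2", "7.2.0")); // false
    }
}
```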
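4. Using ElasticSearch Bulk Insert
4.1. General Parameters
① Index: the dynamic index field, index prefix + dynamic date
② Type: defaults to _doc
③ Test Index: checks online whether the index exists
④ Batch Size: number of rows per batch
⑤ Stop on error: whether to stop when an error occurs
⑥ Batch Timeout: timeout for a batch write, in seconds
⑦ Id Field: the document ID, doc_id
⑧ Overwrite if exists: whether to overwrite a document that already exists
⑨ Output Rows: output rows
4.2. Servers Parameters
① Address: list of Elasticsearch cluster addresses
② Port: the corresponding port number
4.3. Fields (Output Fields)
① Name: field in the row stream
② Target Name: target mapping field in the corresponding Elasticsearch index
4.4. Settings Parameters
① cluster.name: cluster name
② es.user: user name for ES authentication (custom parameter name)
③ es.password: password for ES authentication (custom parameter name)
5. Summary
If you need the source code, want to learn more about custom plugins and ways to integrate them, or have any questions or suggestions about development or usage, follow the author's account "游走在数据之间" ("Wandering Among Data").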