Kettle插件開發(fā)之Elasticsearch篇

一省撑、開發(fā)背景

知己知彼,百戰(zhàn)不殆俯在。既然要開發(fā)Elasticsearch批量寫入插件丁侄,那我們首先了解下ElasticSearch。
? Elasticsearch是一個實時分布式存儲朝巫、搜索和數(shù)據(jù)分析引擎鸿摇。它讓你以前所未有的速度處理大數(shù) 據(jù)成為可能。它用于全文搜索劈猿、結(jié)構(gòu)化搜索拙吉、分析以及將這三者混合使用潮孽。
? Elasticsearch也使用Java開發(fā)并使用Lucene作為其核心來實現(xiàn)所有索引和搜索的功能,但是它的目的是通過簡單的RESTful API來隱藏Lucene的復(fù)雜性筷黔,從而讓全文搜索變得簡單往史。
? Elasticsearch很快,快到不可思議佛舱。強(qiáng)大的設(shè)計椎例,方便我們通過有限狀態(tài)轉(zhuǎn)換器實現(xiàn)了用于全文檢索的倒排索引,實現(xiàn)了用于存儲數(shù)值數(shù)據(jù)和地理位置數(shù)據(jù)的 BKD 樹请祖,以及用于分析的列存儲订歪。
? Elasticsearch和傳統(tǒng)數(shù)據(jù)庫RDBMS比較


image.png

二、索引創(chuàng)建

基于Kettle環(huán)境平臺肆捕,構(gòu)建app-pentaho-es6或app-pentaho-es7插件刷晋,實現(xiàn)原理是動態(tài)數(shù)據(jù)流字段,自定動態(tài)索引慎陵,來實現(xiàn)ElasticSearch Bulk Insert批量寫入眼虱。所以,我們首先要了解如何基于索引模板(kettle-es)模式席纽,按日期分片創(chuàng)建動態(tài)索引(kettle-es_*)實現(xiàn)寫入捏悬,按別名kettle-es-query來實現(xiàn)索引檢索。具體示例如下:

PUT _template/kettle-es
{
  "index_patterns": [
    "kettle-es_*"
  ],
  "settings": {
    "index": {
      "max_result_window": "100000",
      "number_of_shards": "3",
      "number_of_replicas": "1"
    }
  },
  "aliases": {
    "kettle-es-query": {}
  },
  "mappings": {
    "properties": {
      "agent_ip": {
        "type": "ip"
      },
      "record_id": {
        "type": "integer"
      },
      "start_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "fire_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "end_time": {
        "format": "yyyy-MM-dd HH:mm:ss",
        "type": "date"
      },
      "pid": {
        "type": "keyword"
      },
      "message": {
        "index": false,
        "type": "text"
      }
    }
  }
}

三润梯、ElasticSearch Bulk Insert源代碼介紹

3.1过牙、源代碼目錄結(jié)構(gòu)

無論是app-pentaho-es6或app-pentaho-es7,從代碼目錄結(jié)構(gòu)來看仆救,都是ElasticSearchBulk步驟類抒和、ElasticSearchBulkData數(shù)據(jù)類矫渔、ElasticSearchBulkMeta元數(shù)據(jù)類和ElasticSearchBulkDialog對話框類彤蔽、日志消息提醒配置message。具體可查看源代碼

3.1.1庙洼、目錄結(jié)構(gòu)對比

image.png

3.1.2顿痪、maven配置對比

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
        <build.revision>${project.version}</build.revision>
        <timestamp>${maven.build.timestamp}</timestamp>
        <build.description>${project.description}</build.description>
        <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
        <elasticsearch.version>6.4.2</elasticsearch.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>compile</scope>
        </dependency>
   <dependencies>
<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
        <build.revision>${project.version}</build.revision>
        <timestamp>${maven.build.timestamp}</timestamp>
        <build.description>${project.description}</build.description>
        <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
        <elasticsearch.version>7.2.0</elasticsearch.version>
    </properties>
    <dependencies>
       <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>compile</scope>
        </dependency>
   <dependencies>

3.2、app-pentaho-es6插件

3.2.1油够、TransportClient API說明

app-pentaho-es6插件蚁袭,基于TransportClient使用傳輸模塊遠(yuǎn)程連接到集群。后面石咬,elastic計劃在Elasticsearch 7.0中棄用TransportClient揩悄,并在8.0中完全刪除它。
  獲取Elasticsearch客戶端最常用方法是創(chuàng)建連接到群集的TransportClient鬼悠。它不加入集群删性,而只是獲取一個或多個初始傳輸?shù)刂房髂龋⒃诿總€操作上以循環(huán)方式與它們通信。

3.2.2蹬挺、客戶端初始化

(1)確定PreBuiltTransportClient連接es集群名稱维贺、地址、端口和協(xié)議等信息巴帮,設(shè)置TransportAddress配置
(2)測試制定Index是否正常連接成功溯泣,得到Client

private void initClient() throws UnknownHostException {
        Settings.Builder settingsBuilder = Settings.builder();
        settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
        Map<String, String> tMetaMap = meta.getSettingsMap();
        Iterator<Entry<String, String>> iter = tMetaMap.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<String, String> entry = (Entry<String, String>) iter.next();
            settingsBuilder.put(entry.getKey(),
                    environmentSubstitute(entry.getValue()));
        }
        PreBuiltTransportClient tClient = new PreBuiltTransportClient(
                settingsBuilder.build());
        for (Server server : meta.getServers()) {
            tClient.addTransportAddress(new TransportAddress(InetAddress
                    .getByName(environmentSubstitute(server.getAddress())),
                    server.getPort()));
        }
        client = tClient;
    }

3.2.3、數(shù)據(jù)流處理

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
            throws KettleException {
        Object[] rowData = getRow();
        if (rowData == null) {
            if (currentRequest != null && currentRequest.numberOfActions() > 0) {
                // didn't fill a whole batch
                processBatch(false);
            }
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            setupData();
            currentRequest = client.prepareBulk();
            requestsBuffer = new ArrayList<IndexRequestBuilder>(this.batchSize);
            initFieldIndexes();
        }
        try {
            data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
            return indexRow(data.inputRowMeta, rowData) || !stopOnError;
        } catch (KettleStepException e) {
            throw e;
        } catch (Exception e) {
            rejectAllRows(e.getLocalizedMessage());
            String msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
            logError(msg);
            throw new KettleStepException(msg, e);
        }
    }

3.2.4榕茧、數(shù)據(jù)批次處理

private boolean processBatch(boolean makeNew) throws KettleStepException {
        ActionFuture<BulkResponse> actionFuture = currentRequest.execute();
        boolean responseOk = false;
        BulkResponse response = null;
        try {
            if (timeout != null && timeoutUnit != null) {
                response = actionFuture.actionGet(timeout, timeoutUnit);
            } else {
                response = actionFuture.actionGet();
            }
        } catch (ElasticsearchException e) {
            String msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Error.BatchExecuteFail",
                    e.getLocalizedMessage());
            if (e instanceof ElasticsearchTimeoutException) {
                msg = BaseMessages.getString(PKG,
                        "ElasticSearchBulk.Error.Timeout");
            }
            logError(msg);
            rejectAllRows(msg);
        }
        if (response != null) {
            responseOk = handleResponse(response);
            requestsBuffer.clear();
        } else { // have to assume all failed
            numberOfErrors += currentRequest.numberOfActions();
            setErrors(numberOfErrors);
        }
        // duration += response.getTookInMillis(); //just in trunk..
        if (makeNew) {
            currentRequest = client.prepareBulk();
            data.nextBufferRowIdx = 0;
            data.inputRowBuffer = new Object[batchSize][];
        } else {
            currentRequest = null;
            data.inputRowBuffer = null;
        }
        return responseOk;
    }

3.3垃沦、app-pentaho-es7插件

3.3.1、RestHighLevelClient API說明

app-pentaho-es7插件雪猪,基于Elasticsearch提供的Java高級REST客戶端RestHighLevelClient栏尚,它執(zhí)行HTTP請求而不是序列化的Java請求。Java客戶端主要用途有:
(1)在現(xiàn)有集群上執(zhí)行標(biāo)準(zhǔn)索引只恨,獲取译仗,刪除和搜索操作
(2)在正在運行的集群上執(zhí)行管理任務(wù)

3.3.2、客戶端初始化

(1)使用CredentialsProvider初始化Elasticsearch身份認(rèn)證
(2)確定RestHighLevelClient連接es集群名稱官觅、地址纵菌、端口和協(xié)議等信息,設(shè)置setHttpClientConfigCallback回調(diào)配置
(3)測試制定Index是否正常連接成功休涤,得到Client

private void initClient() throws UnknownHostException {
        Settings.Builder settingsBuilder = Settings.builder();
        settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
        meta.getSettingsMap()
                .entrySet()
                .stream()
                .forEach(
                        (s) -> settingsBuilder.put(s.getKey(),
                                environmentSubstitute(s.getValue())));
        RestHighLevelClient rclient = null;
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(settingsBuilder.get("es.user"),
                        settingsBuilder.get("es.password")));
        for (Server server : meta.getServers()) {
            rclient = new RestHighLevelClient(RestClient.builder(
                    new HttpHost(server.getAddress(), Integer.valueOf(server
                            .getPort()), "http")).setHttpClientConfigCallback(
                    new RestClientBuilder.HttpClientConfigCallback() {
                        public HttpAsyncClientBuilder customizeHttpClient(
                                HttpAsyncClientBuilder httpClientBuilder) {
                            return httpClientBuilder
                                    .setDefaultCredentialsProvider(credentialsProvider);
                        }
                    }));
        }
        client = rclient;
    }

3.3.3咱圆、數(shù)據(jù)流處理

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
            throws KettleException {
        Object[] rowData = getRow();
        if (rowData == null) {
            if (currentRequest != null && currentRequest.numberOfActions() > 0) {
                processBatch(false);
            }
            setOutputDone();
            return false;
        }
        if (first) {
            first = false;
            setupData();
            requestsBuffer = new ArrayList<IndexRequest>(this.batchSize);
            initFieldIndexes();
        }
        try {
            data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
            return indexRow(data.inputRowMeta, rowData) || !stopOnError;
        } catch (KettleStepException e) {
            throw e;
        } catch (Exception e) {
            rejectAllRows(e.getLocalizedMessage());
            String msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
            logError(msg);
            throw new KettleStepException(msg, e);
        }
    }

3.3.4、數(shù)據(jù)批次處理

private boolean processBatch(boolean makeNew) throws KettleStepException {
        BulkResponse response = null;
        // ActionFuture<BulkResponse> actionFuture = currentRequest.execute();
        try {
            response = client.bulk(currentRequest, RequestOptions.DEFAULT);
        } catch (IOException e1) {
            rejectAllRows(e1.getLocalizedMessage());
            String msg = BaseMessages
                    .getString(PKG, "ElasticSearchBulk.Log.Exception",
                            e1.getLocalizedMessage());
            logError(msg);
            throw new KettleStepException(msg, e1);
        }
        boolean responseOk = false;
        if (response != null) {
            responseOk = handleResponse(response);
            requestsBuffer.clear();
        } else { // have to assume all failed
            numberOfErrors += currentRequest.numberOfActions();
            setErrors(numberOfErrors);
        }
        if (makeNew) {
            // currentRequest = client.prepareBulk();
            try {
                client.bulk(currentRequest, RequestOptions.DEFAULT);
            } catch (IOException e1) {
                rejectAllRows(e1.getLocalizedMessage());
                String msg = BaseMessages.getString(PKG,
                        "ElasticSearchBulk.Log.Exception",
                        e1.getLocalizedMessage());
                logError(msg);
                throw new KettleStepException(msg, e1);
            }
            data.nextBufferRowIdx = 0;
            data.inputRowBuffer = new Object[batchSize][];
        } else {
            currentRequest = null;
            data.inputRowBuffer = null;
        }
        return responseOk;
    }

無論服務(wù)端是那個版本的Elasticsearch集群功氨,客戶端必須具有與服務(wù)端群集中的節(jié)點相同的主要版本(例如6.x或7.x)

四序苏、ElasticSearch Bulk Insert使用說明

4.1、General參數(shù)

①Index:動態(tài)索引字段捷凄,索引前綴+動態(tài)日期

②Type:默認(rèn)_doc

③Test Index:在線檢查索引是否存在

④Batch Size:批次大小

⑤Stop on error:遇到錯誤是否終止

⑥Batch Timeout:批次寫入超時時間忱详,單位秒

⑦Id Field:即文檔ID,doc_id

⑧Overwrite if exists:存在是否覆蓋

⑨Output Rows:輸出行

image.png

4.2跺涤、Servers參數(shù)

①Address:Elasticsearch集群地址列表

②Port:匹配端口號

image.png

4.3匈睁、Fields輸出字段

①Name:數(shù)據(jù)流字段

②Target Name:Elasticsearch集群對應(yīng)index,目標(biāo)mapping字段

image.png

4.4桶错、Settings參數(shù)

①cluster.name:集群名稱

②es.user:es鑒權(quán)認(rèn)證用戶名航唆,自定義參數(shù)名

③es.password:es鑒權(quán)認(rèn)證密碼,自定義參數(shù)名

image.png

五院刁、總結(jié)

如果你需要源碼或者了解更多自定義插件及集成方式糯钙,抑或有開發(fā)過程或者使用過程中的任何疑問或建議,請關(guān)注小編"游走在數(shù)據(jù)之間"。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末任岸,一起剝皮案震驚了整個濱河市鸳玩,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌演闭,老刑警劉巖不跟,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異米碰,居然都是意外死亡窝革,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進(jìn)店門吕座,熙熙樓的掌柜王于貴愁眉苦臉地迎上來虐译,“玉大人,你說我怎么就攤上這事吴趴∑岱蹋” “怎么了?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵锣枝,是天一觀的道長厢拭。 經(jīng)常有香客問我,道長撇叁,這世上最難降的妖魔是什么供鸠? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮陨闹,結(jié)果婚禮上楞捂,老公的妹妹穿的比我還像新娘。我一直安慰自己趋厉,他們只是感情好寨闹,可當(dāng)我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著君账,像睡著了一般繁堡。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上杈绸,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天帖蔓,我揣著相機(jī)與錄音矮瘟,去河邊找鬼瞳脓。 笑死,一個胖子當(dāng)著我的面吹牛澈侠,可吹牛的內(nèi)容都是我干的劫侧。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼烧栋!你這毒婦竟也來了写妥?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤审姓,失蹤者是張志新(化名)和其女友劉穎珍特,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體魔吐,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡扎筒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了酬姆。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嗜桌。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖辞色,靈堂內(nèi)的尸體忽然破棺而出骨宠,到底是詐尸還是另有隱情,我是刑警寧澤相满,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布层亿,位于F島的核電站,受9級特大地震影響立美,放射性物質(zhì)發(fā)生泄漏棕所。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一悯辙、第九天 我趴在偏房一處隱蔽的房頂上張望琳省。 院中可真熱鬧,春花似錦躲撰、人聲如沸针贬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽桦他。三九已至,卻和暖如春谆棱,著一層夾襖步出監(jiān)牢的瞬間快压,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工垃瞧, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留蔫劣,地道東北人。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓个从,卻偏偏與公主長得像脉幢,于是被迫代替她去往敵國和親歪沃。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容