HBase Observer中ES增加創建IK mapping

ES中的中文分詞支持改為用IK分詞
在調用java api時,需要指定字段使用IK分詞創建mapping
同時ES還從原來使用的BulkRequestBuilder,改成參數更多更靈活的BulkProcessor。

1.原來的ElasticSearchOperator

package com.xxx.data;


import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;

import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.ImmutableSettings;
//import org.elasticsearch.common.settings.Settings;
//import org.elasticsearch.common.transport.InetSocketTransportAddress;

/**
 * Buffers Elasticsearch update/delete requests in a single shared bulk request and
 * commits them either when the buffer grows past {@link #MAX_BULK_COUNT} actions or
 * when the periodic {@link CommitTimer} fires.
 *
 * <p>All access to the shared buffer is serialized through {@code commitLock}.
 */
public class ElasticSearchOperator {

    // Number of buffered actions that must be exceeded before an add triggers a commit.
    private static final int MAX_BULK_COUNT = 10;
    // Interval between timer-driven commits, in seconds.
    private static final int MAX_COMMIT_INTERVAL = 60 * 5;

    private static Client client = null;
    private static BulkRequestBuilder bulkRequestBuilder = null;

    private static Lock commitLock = new ReentrantLock();

    static {
        // Written against the Elasticsearch 2.3.5 client; the shared transport client
        // is provided by MyTransportClient.
        client = MyTransportClient.client;

        bulkRequestBuilder = newBulk();

        // First timer-driven commit after 10 seconds, then every MAX_COMMIT_INTERVAL seconds.
        Timer timer = new Timer();
        timer.schedule(new CommitTimer(), 10 * 1000, MAX_COMMIT_INTERVAL * 1000);
    }

    /**
     * Creates an empty bulk buffer with refresh enabled.
     *
     * <p>Every buffer goes through this factory so that buffers created after a commit
     * carry the same refresh setting as the very first one.
     *
     * @return a fresh, configured bulk request builder
     */
    private static BulkRequestBuilder newBulk() {
        BulkRequestBuilder fresh = client.prepareBulk();
        fresh.setRefresh(true);
        return fresh;
    }

    /**
     * Commits the buffered actions when their count exceeds {@code threshold}.
     *
     * <p>Must be called with {@code commitLock} held. If the request itself throws
     * (for example the cluster is unreachable) the buffer is left untouched so the
     * next call retries it. If the request completes but reports item failures, the
     * failures are logged and a new buffer is started: re-sending the whole batch
     * would replay the items that already succeeded and let a permanently failing
     * item make the buffer grow without bound.
     *
     * @param threshold commit only when more than this many actions are buffered
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() <= threshold) {
            return;
        }
        BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.err.println("bulk commit had failures: " + bulkResponse.buildFailureMessage());
        }
        bulkRequestBuilder = newBulk();
    }

    /**
     * Adds an update request to the buffer and commits if the buffer is full.
     *
     * @param builder the update request to buffer
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Adds a delete request to the buffer and commits if the buffer is full.
     *
     * @param builder the delete request to buffer
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Periodic task that commits whatever is buffered, so that a quiet period with
     * no new writes does not leave buffered actions uncommitted indefinitely.
     */
    static class CommitTimer extends TimerTask {
        @Override
        public void run() {
            commitLock.lock();
            try {
                // Threshold 0: commit as soon as at least one action is buffered.
                bulkRequest(0);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                commitLock.unlock();
            }
        }
    }

    /**
     * Manual smoke test: buffers ten delete requests against a fixed index/type and
     * prints how many actions are still buffered afterwards.
     */
    private static void test() {
        Config.indexName = "flume-2016-08-10";
        Config.typeName = "tweet";
        for (int i = 10; i < 20; i++) {
            addDeleteBuilderToBulk(client.prepareDelete(Config.indexName, Config.typeName, String.valueOf(i)));
        }

        System.out.println(bulkRequestBuilder.numberOfActions());
    }

    public static void main(String[] args) {
        test();
    }
}

2.改成ElasticSearchBulkProcessor

package com.xxx.data;

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

import java.util.*;

/**
 * Created by lisiyu on 16/9/19.
 */
/**
 * Queues index requests on a shared {@link BulkProcessor} and makes sure the target
 * index/type has a mapping that analyzes the document's fields with the IK analyzer.
 */
public class ElasticSearchBulkProcessor {

    private static Client client = null;
    private static BulkProcessor bulkProcessor = null;

    // Flush after this many buffered requests.
    private static final int MAX_BULK_COUNT = 1000;
    // Flush after this many megabytes of buffered requests.
    private static final int MAX_BULK_SIZE = 1024;
    // Flush at least this often, in seconds.
    private static final int MAX_COMMIT_INTERVAL = 60 * 1;
    // Number of bulk requests allowed in flight concurrently.
    private static final int MAX_CONCURRENT_REQUEST = 2;
    // Initial delay of the exponential retry backoff, in milliseconds.
    private static final int REJECT_EXCEPTION_RETRY_WAIT = 500;
    // Maximum number of retries for a rejected bulk request.
    private static final int REJECT_EXCEPTION_RETRY_TIMES = 3;

    // Keys ("index/type/" and "index/type/field") whose mapping this JVM has already
    // pushed, so the blocking admin calls are not repeated for every document.
    private static final Set<String> MAPPED_KEYS =
            Collections.synchronizedSet(new HashSet<String>());

    static {
        // Written against the Elasticsearch 2.3.5 client.
        client = MyTransportClient.client;

        bulkProcessor = BulkProcessor.builder(client, new LoggingListener())
                .setBulkActions(MAX_BULK_COUNT)
                .setBulkSize(new ByteSizeValue(MAX_BULK_SIZE, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(MAX_COMMIT_INTERVAL))
                .setConcurrentRequests(MAX_CONCURRENT_REQUEST)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(
                                TimeValue.timeValueMillis(REJECT_EXCEPTION_RETRY_WAIT),
                                REJECT_EXCEPTION_RETRY_TIMES))
                .build();
    }

    /**
     * Bulk listener that reports failures instead of discarding them silently.
     */
    private static class LoggingListener implements BulkProcessor.Listener {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            // Nothing to do before a bulk is sent.
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) {
                System.err.println("bulk " + executionId + " had failures: "
                        + response.buildFailureMessage());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            System.err.println("bulk " + executionId + " failed");
            failure.printStackTrace();
        }
    }

    /**
     * Ensures the IK mapping for the request's index/type and then queues the request.
     *
     * <p>A failure while preparing the mapping is logged but does not prevent the
     * document from being queued.
     *
     * @param indexRequest the document to index
     * @param fieldSet     names of the document fields that should use the IK analyzer;
     *                     {@code null} is treated as an empty set
     */
    public static void addIndexRequestToBulkProcessor(IndexRequest indexRequest,Set<String> fieldSet) {
        try {
            System.out.println("index:"+indexRequest.index());
            System.out.println("type:"+indexRequest.type());

            ensureMapping(indexRequest.index(), indexRequest.type(), fieldSet);

            bulkProcessor.add(indexRequest);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Calls {@link #createMapping} only when the index/type or one of the fields has
     * not been mapped by this JVM yet. Failures are logged and not cached, so the next
     * document retries the mapping.
     */
    private static void ensureMapping(String index, String mappingType, Set<String> fieldSet) {
        Set<String> fields = fieldSet == null ? Collections.<String>emptySet() : fieldSet;
        String prefix = index + '/' + mappingType + '/';

        List<String> keys = new ArrayList<String>(fields.size() + 1);
        keys.add(prefix);
        for (String field : fields) {
            keys.add(prefix + field);
        }
        if (MAPPED_KEYS.containsAll(keys)) {
            return;
        }

        try {
            createMapping(index, mappingType, fields);
            MAPPED_KEYS.addAll(keys);
        } catch (Exception ex) {
            // For example another writer created the index between our existence check
            // and our create call. The document itself is still worth indexing.
            ex.printStackTrace();
        }
    }

    /**
     * Creates or updates the mapping so that every field in {@code fieldSet} is a
     * stored string analyzed with the {@code ik} analyzer. If the index does not exist
     * it is created with that mapping; otherwise the mapping is put on the existing index.
     *
     * @param index       index name
     * @param mappingType mapping (document) type
     * @param fieldSet    field names to map; {@code null} is treated as empty
     * @throws Exception if building the mapping or the admin request fails
     */
    public static void createMapping(String index,String mappingType,Set<String> fieldSet)throws Exception{
        boolean indexExists = client.admin().indices()
                .exists(new IndicesExistsRequest(index)).actionGet().isExists();
        XContentBuilder builder = buildIkMapping(fieldSet);

        if (indexExists) {
            System.out.println("index: '"+index+"' is exist!");
            PutMappingRequest mapping = Requests.putMappingRequest(index).type(mappingType).source(builder);
            client.admin().indices().putMapping(mapping).actionGet();
        } else {
            System.out.println("create index: '"+index+"'!");
            client.admin().indices().prepareCreate(index).addMapping(mappingType, builder).get();
        }
    }

    /**
     * Builds the {@code properties} mapping body: a plain stored string {@code id}
     * plus one IK-analyzed stored string per requested field. The body deliberately
     * contains neither the index nor the type name.
     */
    private static XContentBuilder buildIkMapping(Set<String> fieldSet) throws java.io.IOException {
        Set<String> fields = fieldSet == null ? Collections.<String>emptySet() : fieldSet;

        XContentBuilder builder = XContentFactory.jsonBuilder()
                .startObject()
                .startObject("properties");
        // Emit the default "id" property only when the caller does not map "id" itself,
        // otherwise the JSON object would carry the same key twice.
        if (!fields.contains("id")) {
            builder.startObject("id").field("type", "string").field("store", "yes").endObject();
        }
        for (String field : fields) {
            builder.startObject(field)
                    .field("type", "string")
                    .field("store", "yes")
                    .field("analyzer", "ik")
                    .endObject();
        }
        return builder.endObject().endObject();
    }

    /**
     * Manual smoke test: indexes one document through a dedicated processor and waits
     * for it to be flushed before returning.
     */
    public static void test() {
        Client testClient = MyTransportClient.client;
        BulkProcessor testProcessor = BulkProcessor.builder(testClient, new LoggingListener())
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();

        Map<String, Object> json = new HashMap<String, Object>();
        json.put("field", "test");
        testProcessor.add(new IndexRequest("twitter", "tweet", "1111").source(json));

        // Flush the pending request and wait for it; otherwise the JVM may exit
        // before the scheduled flush ever runs.
        try {
            testProcessor.awaitClose(10, java.util.concurrent.TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        test();
    }
}

3.DataSyncObserver類修改

/**
 * After a put, mirrors the row into Elasticsearch through
 * {@code ElasticSearchBulkProcessor}, which also prepares the IK mapping for the
 * row's columns. This replaces the earlier call to
 * {@code ElasticSearchOperator.addUpdateBuilderToBulk}, which did not create an IK mapping.
 *
 * <p>The row key becomes the document id; every cell contributes one field named by
 * its qualifier. Any failure is logged and not propagated.
 */
@Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        try {
            // Decode the row key the same way as qualifiers and values (UTF-8),
            // not with the platform default charset.
            String indexId = Bytes.toString(put.getRow());
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            HashSet<String> fieldNames = new HashSet<String>();
            HashMap<String, Object> json = new HashMap<String, Object>();

            for (List<Cell> cells : familyMap.values()) {
                for (Cell cell : cells) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                    fieldNames.add(key);
                }
            }

            ElasticSearchBulkProcessor.addIndexRequestToBulkProcessor(
                    new IndexRequest(Config.indexName, Config.typeName, indexId).source(json), fieldNames);
            LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName);
        } catch (Exception ex) {
            // Pass the throwable separately so the stack trace is logged.
            LOG.error("observer -- failed to add doc to type: " + Config.typeName, ex);
        }
    }

4.測試

  • 代碼打包
  • jar包上傳到hdfs
  • 創建hbase表,并修改表屬性關聯observer
  • 測試put新數據
  • 查看es中數據
  • 中文分詞測試

{"query":{"query_string":{"query":"拖鞋"}},"highlight":{"require_field_match":false,"explain":true,"fields":{"*":{}}}}

中文分詞測試.jpg

5.程序代碼整體和其余測試等操作可以查看另一篇文章

Sqoop導入HBase,并借助Coprocessor協處理器同步索引到ES

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末黔攒,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子强缘,更是在濱河造成了極大的恐慌督惰,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件旅掂,死亡現(xiàn)場(chǎng)離奇詭異赏胚,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)商虐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)觉阅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人秘车,你說(shuō)我怎么就攤上這事典勇。” “怎么了叮趴?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,435評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵割笙,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我眯亦,道長(zhǎng)伤溉,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,509評(píng)論 1 284
  • 正文 為了忘掉前任妻率,我火速辦了婚禮乱顾,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘舌涨。我一直安慰自己,他們只是感情好扔字,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布囊嘉。 她就那樣靜靜地躺著,像睡著了一般革为。 火紅的嫁衣襯著肌膚如雪扭粱。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,837評(píng)論 1 290
  • 那天震檩,我揣著相機(jī)與錄音琢蛤,去河邊找鬼蜓堕。 笑死,一個(gè)胖子當(dāng)著我的面吹牛博其,可吹牛的內(nèi)容都是我干的套才。 我是一名探鬼主播,決...
    沈念sama閱讀 38,987評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼慕淡,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼背伴!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起峰髓,我...
    開(kāi)封第一講書(shū)人閱讀 37,730評(píng)論 0 267
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤傻寂,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后携兵,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體疾掰,經(jīng)...
    沈念sama閱讀 44,194評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評(píng)論 2 327
  • 正文 我和宋清朗相戀三年徐紧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了静檬。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,664評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡浪汪,死狀恐怖巴柿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情死遭,我是刑警寧澤广恢,帶...
    沈念sama閱讀 34,334評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站呀潭,受9級(jí)特大地震影響钉迷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜钠署,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評(píng)論 3 313
  • 文/蒙蒙 一糠聪、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧谐鼎,春花似錦舰蟆、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,764評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至草戈,卻和暖如春塌鸯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背唐片。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,997評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工丙猬, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留涨颜,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,389評(píng)論 2 360
  • 正文 我出身青樓茧球,卻偏偏與公主長(zhǎng)得像庭瑰,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子袜腥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容