Hbase二級索引(BaseRegionObserver 協(xié)處理器)

概述

HBase 是一款基于 Hadoop 的 key-value 數(shù)據(jù)庫,它提供了對 HDFS 上數(shù)據(jù)的高效隨機(jī)讀寫服務(wù)霎奢,完美地填補(bǔ)了 Hadoop MapReduce 僅適于批處理的缺陷偏瓤,正在被越來越多的用戶使用。作為 HBase 的一項(xiàng)重要特性椰憋,Coprocessor 在 HBase 0.92 版本中被加入厅克,并廣受歡迎

利用協(xié)處理器,用戶可以編寫運(yùn)行在 HBase Server 端的代碼橙依。HBase 支持兩種類型的協(xié)處理器证舟,Endpoint 和 Observer硕旗。Endpoint 協(xié)處理器類似傳統(tǒng)數(shù)據(jù)庫中的存儲過程,客戶端可以調(diào)用這些 Endpoint 協(xié)處理器執(zhí)行一段 Server 端代碼女责,并將 Server 端代碼的結(jié)果返回給客戶端進(jìn)一步處理漆枚,最常見的用法就是進(jìn)行聚集操作。如果沒有協(xié)處理器抵知,當(dāng)用戶需要找出一張表中的最大數(shù)據(jù)墙基,即 max 聚合操作,就必須進(jìn)行全表掃描刷喜,在客戶端代碼內(nèi)遍歷掃描結(jié)果残制,并執(zhí)行求最大值的操作。這樣的方法無法利用底層集群的并發(fā)能力掖疮,而將所有計算都集中到 Client 端統(tǒng)一執(zhí)行初茶,勢必效率低下。利用 Coprocessor浊闪,用戶可以將求最大值的代碼部署到 HBase Server 端恼布,HBase 將利用底層 cluster 的多個節(jié)點(diǎn)并發(fā)執(zhí)行求最大值的操作。即在每個 Region 范圍內(nèi)執(zhí)行求最大值的代碼搁宾,將每個 Region 的最大值在 Region Server 端計算出折汞,僅僅將該 max 值返回給客戶端。在客戶端進(jìn)一步將多個 Region 的最大值進(jìn)一步處理而找到其中的最大值盖腿。這樣整體的執(zhí)行效率就會提高很多字支。

另外一種協(xié)處理器叫做 Observer Coprocessor,這種協(xié)處理器類似于傳統(tǒng)數(shù)據(jù)庫中的觸發(fā)器奸忽,當(dāng)發(fā)生某些事件的時候這類協(xié)處理器會被 Server 端調(diào)用堕伪。Observer Coprocessor 就是一些散布在 HBase Server 端代碼中的 hook 鉤子,在固定的事件發(fā)生時被調(diào)用栗菜。比如:put 操作之前有鉤子函數(shù) prePut欠雌,該函數(shù)在 put 操作執(zhí)行前會被 Region Server 調(diào)用;在 put 操作之后則有 postPut 鉤子函數(shù)疙筹。

開發(fā)環(huán)境

  • maven-3.3.9
  • jdk 1.7
  • cdh-hbase-1.2.0
  • myeclipse 10

hbase協(xié)處理器加載

進(jìn)入hbase命令行

# hbase shell

hbase(main):> disable 'test'    

hbase(main):> alter 'test',CONFIGURATION => {'hbase.table.sanity.checks'=>'false'}         //-----》建立表后富俄,執(zhí)行一次就行

hbase(main):> alter 'test','coprocessor'=>'hdfs:///code/jars/regionObserver-put5.jar|com.hbase.observer.App|1001'   //----》加載jar包

hbase(main):> alter 'test', METHOD => 'table_att_unset', NAME => 'coprocessor$1'  //--------》卸載jar包

hbase(main):> desc 'test'    //-------》查看表的屬性描述

hbase(main):> enable 'test'

完整工程代碼

package com.hbase.observer;

/**
 * hbase 二級索引
 * @author wing
 * @createTime 2017-4-7 
 */
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class App extends BaseRegionObserver {

    private HTablePool pool = null;

    private final static String SOURCE_TABLE = "test";

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        pool = new HTablePool(env.getConfiguration(), 10);
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> c,
            Get get, List<Cell> results) throws IOException {
        HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
        String newRowkey = Bytes.toString(get.getRow());
        String pre = newRowkey.substring(0, 1);

        if (pre.equals("t")) {
            String[] splits = newRowkey.split("_");
            String prepre = splits[0].substring(1, 3);
            String timestamp = splits[0].substring(3);
            String uid = splits[1];
            String mid = "";
            for (int i = 2; i < splits.length; i++) {
                mid += splits[i];
                mid += "_";
            }
            mid = mid.substring(0, mid.length() - 1);
            String rowkey = prepre + uid + "_" + timestamp + "_" + mid;
            System.out.println(rowkey);
            Get realget = new Get(rowkey.getBytes());
            Result result = table.get(realget);

            List<Cell> cells = result.listCells();
            results.clear();
            for (Cell cell : cells) {
                results.add(cell);
            }

        }
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
            Put put, WALEdit edit, Durability durability) throws IOException {
        try {

            String rowkey = Bytes.toString(put.getRow());
            HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));

            String pre = rowkey.substring(0, 2);
            if (pre.equals("aa") || pre.equals("ab") || pre.equals("ac")
                    || pre.equals("ba") || pre.equals("bb") || pre.equals("bc")
                    || pre.equals("ca") || pre.equals("cb") || pre.equals("cc")) {
                String[] splits = rowkey.split("_");
                String uid = splits[0].substring(2);
                String timestamp = splits[1];
                String mid = "";
                for (int i = 2; i < splits.length; i++) {
                    mid += splits[i];
                    mid += "_";
                }
                mid = mid.substring(0, mid.length() - 1);
                String newRowkey = "t" + pre + timestamp + "_" + uid + "_"
                        + mid;
                System.out.println(newRowkey);
                Put indexput2 = new Put(newRowkey.getBytes());
                indexput2.addColumn("relation".getBytes(),
                        "column10".getBytes(), "45".getBytes());
                table.put(indexput2);

            }
            table.close();

        } catch (Exception ex) {

        }

    }

    @Override
    public boolean postScannerNext(
            ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s,
            List<Result> results, int limit, boolean hasMore)
            throws IOException {
        HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
        List<Result> newresults = new ArrayList<Result>();
        for (Result result : results) {
            String newRowkey = Bytes.toString(result.getRow());

            String pre = newRowkey.substring(0, 1);

            if (pre.equals("t")) {
                String[] splits = newRowkey.split("_");
                String prepre = splits[0].substring(1, 3);
                String timestamp = splits[0].substring(3);
                String uid = splits[1];
                String mid = "";
                for (int i = 2; i < splits.length; i++) {
                    mid += splits[i];
                    mid += "_";
                }
                mid = mid.substring(0, mid.length() - 1);
                String rowkey = prepre + uid + "_" + timestamp + "_" + mid;

                Get realget = new Get(rowkey.getBytes());
                Result newresult = table.get(realget);

                newresults.add(newresult);
            }

        }
         results.clear();
        for (Result result : newresults) {
            results.add(result);
        }

        return hasMore;

    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        pool.close();
    }
    
}

通過maven工程打包后上傳到hdfs相應(yīng)目錄,再通過命令加載jar包而咆。
即可完成二級索引筋粗。

  • 當(dāng)用戶put操作時前标,會將原rowkey,轉(zhuǎn)換為新的rowkey,再存一份索引嘁灯。
  • 當(dāng)用戶get操作時,會將rowkey映射為實(shí)際的rowkey,再根據(jù)實(shí)際的rowkey獲取實(shí)際的結(jié)果。
  • 當(dāng)用戶執(zhí)行scanner操作時望迎,會將scanner的結(jié)果映射為實(shí)際rowkey的結(jié)果,返回給用戶凌外。

通過hbase的BaseRegionObserver 協(xié)處理器辩尊,可以封裝處理很多hbase操作。

BaseRegionObserver的java接口(注意hbase版本)
https://hbase.apache.org/1.2/apidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末康辑,一起剝皮案震驚了整個濱河市摄欲,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌疮薇,老刑警劉巖胸墙,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異惦辛,居然都是意外死亡劳秋,警方通過查閱死者的電腦和手機(jī)仓手,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進(jìn)店門胖齐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人嗽冒,你說我怎么就攤上這事呀伙。” “怎么了添坊?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵剿另,是天一觀的道長。 經(jīng)常有香客問我贬蛙,道長雨女,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任阳准,我火速辦了婚禮氛堕,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘野蝇。我一直安慰自己讼稚,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布绕沈。 她就那樣靜靜地躺著锐想,像睡著了一般。 火紅的嫁衣襯著肌膚如雪乍狐。 梳的紋絲不亂的頭發(fā)上赠摇,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼蝉稳。 笑死抒蚜,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的耘戚。 我是一名探鬼主播嗡髓,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼收津!你這毒婦竟也來了饿这?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤撞秋,失蹤者是張志新(化名)和其女友劉穎长捧,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吻贿,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡串结,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了舅列。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肌割。...
    茶點(diǎn)故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖帐要,靈堂內(nèi)的尸體忽然破棺而出把敞,到底是詐尸還是另有隱情,我是刑警寧澤榨惠,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布奋早,位于F島的核電站,受9級特大地震影響赠橙,放射性物質(zhì)發(fā)生泄漏耽装。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一期揪、第九天 我趴在偏房一處隱蔽的房頂上張望掉奄。 院中可真熱鬧,春花似錦横侦、人聲如沸挥萌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽引瀑。三九已至,卻和暖如春榨馁,著一層夾襖步出監(jiān)牢的瞬間憨栽,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留屑柔,地道東北人屡萤。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像掸宛,于是被迫代替她去往敵國和親死陆。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容

  • HBase那些事 @(大數(shù)據(jù)工程學(xué)院)[HBase, Hadoop, 優(yōu)化, HadoopChen, hbase]...
    分癡閱讀 3,946評論 3 17
  • 最近在逐步跟進(jìn)Hbase的相關(guān)工作唧瘾,由于之前對Hbase并不怎么了解措译,因此系統(tǒng)地學(xué)習(xí)了下Hbase,為了加深對Hb...
    飛鴻無痕閱讀 50,243評論 19 272
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理饰序,服務(wù)發(fā)現(xiàn)领虹,斷路器,智...
    卡卡羅2017閱讀 134,693評論 18 139
  • Hbase架構(gòu)與原理 HBase是一個分布式的求豫、面向列的開源數(shù)據(jù)庫塌衰,該技術(shù)來源于 Fay Chang所撰寫的Goo...
    全能程序猿閱讀 86,298評論 2 37
  • HBase存儲架構(gòu)圖 HBase Master 為Region server分配region 負(fù)責(zé)Region s...
    kimibob閱讀 5,588評論 0 52