概述
HBase 是一款基于 Hadoop 的 key-value 數(shù)據(jù)庫,它提供了對 HDFS 上數(shù)據(jù)的高效隨機(jī)讀寫服務(wù)霎奢,完美地填補(bǔ)了 Hadoop MapReduce 僅適于批處理的缺陷偏瓤,正在被越來越多的用戶使用。作為 HBase 的一項(xiàng)重要特性椰憋,Coprocessor 在 HBase 0.92 版本中被加入厅克,并廣受歡迎
利用協(xié)處理器,用戶可以編寫運(yùn)行在 HBase Server 端的代碼橙依。HBase 支持兩種類型的協(xié)處理器证舟,Endpoint 和 Observer硕旗。Endpoint 協(xié)處理器類似傳統(tǒng)數(shù)據(jù)庫中的存儲過程,客戶端可以調(diào)用這些 Endpoint 協(xié)處理器執(zhí)行一段 Server 端代碼女责,并將 Server 端代碼的結(jié)果返回給客戶端進(jìn)一步處理漆枚,最常見的用法就是進(jìn)行聚集操作。如果沒有協(xié)處理器抵知,當(dāng)用戶需要找出一張表中的最大數(shù)據(jù)墙基,即 max 聚合操作,就必須進(jìn)行全表掃描刷喜,在客戶端代碼內(nèi)遍歷掃描結(jié)果残制,并執(zhí)行求最大值的操作。這樣的方法無法利用底層集群的并發(fā)能力掖疮,而將所有計算都集中到 Client 端統(tǒng)一執(zhí)行初茶,勢必效率低下。利用 Coprocessor浊闪,用戶可以將求最大值的代碼部署到 HBase Server 端恼布,HBase 將利用底層 cluster 的多個節(jié)點(diǎn)并發(fā)執(zhí)行求最大值的操作。即在每個 Region 范圍內(nèi)執(zhí)行求最大值的代碼搁宾,將每個 Region 的最大值在 Region Server 端計算出折汞,僅僅將該 max 值返回給客戶端。在客戶端進(jìn)一步將多個 Region 的最大值進(jìn)一步處理而找到其中的最大值盖腿。這樣整體的執(zhí)行效率就會提高很多字支。
另外一種協(xié)處理器叫做 Observer Coprocessor,這種協(xié)處理器類似于傳統(tǒng)數(shù)據(jù)庫中的觸發(fā)器奸忽,當(dāng)發(fā)生某些事件的時候這類協(xié)處理器會被 Server 端調(diào)用堕伪。Observer Coprocessor 就是一些散布在 HBase Server 端代碼中的 hook 鉤子,在固定的事件發(fā)生時被調(diào)用栗菜。比如:put 操作之前有鉤子函數(shù) prePut欠雌,該函數(shù)在 put 操作執(zhí)行前會被 Region Server 調(diào)用;在 put 操作之后則有 postPut 鉤子函數(shù)疙筹。
開發(fā)環(huán)境
- maven-3.3.9
- jdk 1.7
- cdh-hbase-1.2.0
- myeclipse 10
hbase協(xié)處理器加載
進(jìn)入hbase命令行
# hbase shell
hbase(main):> disable 'test'
hbase(main):> alter 'test',CONFIGURATION => {'hbase.table.sanity.checks'=>'false'} //-----》建立表后富俄,執(zhí)行一次就行
hbase(main):> alter 'test','coprocessor'=>'hdfs:///code/jars/regionObserver-put5.jar|com.hbase.observer.App|1001' //----》加載jar包
hbase(main):> alter 'test', METHOD => 'table_att_unset', NAME => 'coprocessor$1' //--------》卸載jar包
hbase(main):> desc 'test' //-------》查看表的屬性描述
hbase(main):> enable 'test'
完整工程代碼
package com.hbase.observer;
/**
* hbase 二級索引
* @author wing
* @createTime 2017-4-7
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
public class App extends BaseRegionObserver {
private HTablePool pool = null;
private final static String SOURCE_TABLE = "test";
@Override
public void start(CoprocessorEnvironment env) throws IOException {
pool = new HTablePool(env.getConfiguration(), 10);
}
@Override
public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> c,
Get get, List<Cell> results) throws IOException {
HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
String newRowkey = Bytes.toString(get.getRow());
String pre = newRowkey.substring(0, 1);
if (pre.equals("t")) {
String[] splits = newRowkey.split("_");
String prepre = splits[0].substring(1, 3);
String timestamp = splits[0].substring(3);
String uid = splits[1];
String mid = "";
for (int i = 2; i < splits.length; i++) {
mid += splits[i];
mid += "_";
}
mid = mid.substring(0, mid.length() - 1);
String rowkey = prepre + uid + "_" + timestamp + "_" + mid;
System.out.println(rowkey);
Get realget = new Get(rowkey.getBytes());
Result result = table.get(realget);
List<Cell> cells = result.listCells();
results.clear();
for (Cell cell : cells) {
results.add(cell);
}
}
}
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
Put put, WALEdit edit, Durability durability) throws IOException {
try {
String rowkey = Bytes.toString(put.getRow());
HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
String pre = rowkey.substring(0, 2);
if (pre.equals("aa") || pre.equals("ab") || pre.equals("ac")
|| pre.equals("ba") || pre.equals("bb") || pre.equals("bc")
|| pre.equals("ca") || pre.equals("cb") || pre.equals("cc")) {
String[] splits = rowkey.split("_");
String uid = splits[0].substring(2);
String timestamp = splits[1];
String mid = "";
for (int i = 2; i < splits.length; i++) {
mid += splits[i];
mid += "_";
}
mid = mid.substring(0, mid.length() - 1);
String newRowkey = "t" + pre + timestamp + "_" + uid + "_"
+ mid;
System.out.println(newRowkey);
Put indexput2 = new Put(newRowkey.getBytes());
indexput2.addColumn("relation".getBytes(),
"column10".getBytes(), "45".getBytes());
table.put(indexput2);
}
table.close();
} catch (Exception ex) {
}
}
@Override
public boolean postScannerNext(
ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s,
List<Result> results, int limit, boolean hasMore)
throws IOException {
HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
List<Result> newresults = new ArrayList<Result>();
for (Result result : results) {
String newRowkey = Bytes.toString(result.getRow());
String pre = newRowkey.substring(0, 1);
if (pre.equals("t")) {
String[] splits = newRowkey.split("_");
String prepre = splits[0].substring(1, 3);
String timestamp = splits[0].substring(3);
String uid = splits[1];
String mid = "";
for (int i = 2; i < splits.length; i++) {
mid += splits[i];
mid += "_";
}
mid = mid.substring(0, mid.length() - 1);
String rowkey = prepre + uid + "_" + timestamp + "_" + mid;
Get realget = new Get(rowkey.getBytes());
Result newresult = table.get(realget);
newresults.add(newresult);
}
}
results.clear();
for (Result result : newresults) {
results.add(result);
}
return hasMore;
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
pool.close();
}
}
通過maven工程打包后上傳到hdfs相應(yīng)目錄,再通過命令加載jar包而咆。
即可完成二級索引筋粗。
- 當(dāng)用戶put操作時前标,會將原rowkey,轉(zhuǎn)換為新的rowkey,再存一份索引嘁灯。
- 當(dāng)用戶get操作時,會將rowkey映射為實(shí)際的rowkey,再根據(jù)實(shí)際的rowkey獲取實(shí)際的結(jié)果。
- 當(dāng)用戶執(zhí)行scanner操作時望迎,會將scanner的結(jié)果映射為實(shí)際rowkey的結(jié)果,返回給用戶凌外。
通過hbase的BaseRegionObserver 協(xié)處理器辩尊,可以封裝處理很多hbase操作。
BaseRegionObserver的java接口(注意hbase版本)
https://hbase.apache.org/1.2/apidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html