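HBase Scan Flow Analysis
After our company's cluster was upgraded from 0.94.6 to 0.98.6-cdh5.2.0, the HBase scan jobs that used to run fine started hitting many problems.
The symptoms:
setBatch() is incompatible with filters, so the code had to be modified (setBatch() was removed).
Scans became slower and timed out frequently. Mappers reported OutOfOrderException, which hurt job throughput.
So it is necessary to understand the scan flow first and optimize on that basis.
From the application side, we need to understand several scan settings, including setCaching() and setBatch(), as well as the scan process itself.
The other question is how filters work, and at which step they take effect.
1, HBase MR
- Main flow of an HBase MR job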
Configuration conf = getConf();
Scan scan = buildScan(conf);
// initialize credentials
TableMapReduceUtil.initCredentials(job);
// run job
TableMapReduceUtil.initTableMapperJob(tableName, scan, HbaseSearchCheckerMapper.class, Text.class, Text.class, job);
- Creating the Scan instance
Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false); // no read cache
scan.addFamily(Bytes.toBytes("t"));
- Define the mapper class's map function, as in any ordinary MR program
public void map(ImmutableBytesWritable key, Result value, Context context)
    throws IOException, InterruptedException {
  String rowKey = null;
  for (Cell kv : value.rawCells()) {
    if (rowKey == null) {
      rowKey = Bytes.toString(CellUtil.cloneRow(kv));
    }
    count++;
  }
}
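That is the whole HBase MR program; it is very simple.
During execution, however, the settings on the Scan instance determine the final performance. In general, a scan can be optimized with a filter.
So it is necessary to understand the scan flow.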
2, Scan flow
HBase structure
HBase manages data through Region Servers.
HBase is a column-oriented database: different kinds of data are managed by column family (CF), each in its own directory, and the data is stored in HFiles. In-memory data stays in the MemStore until it is flushed.
HBase RowKey structure: row keys are sorted.
Scan client flow
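- Scan class
In hbase-client, package: org.apache.hadoop.hbase.client
This is effectively a configuration class. Its job is to build the user's scan conditions, i.e. whatever the user configures in buildScan.
For example, the columns the user wants to scan are converted into the following structure.
Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
Note these two functions.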
public Scan addFamily(byte[] family) {
  familyMap.remove(family);
  familyMap.put(family, null);
  return this;
}
public Scan addColumn(byte[] family, byte[] qualifier) {
  NavigableSet<byte[]> set = familyMap.get(family);
  if (set == null) {
    set = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
  }
  if (qualifier == null) {
    qualifier = HConstants.EMPTY_BYTE_ARRAY;
  }
  set.add(qualifier);
  familyMap.put(family, set);
  return this;
}
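There is a pitfall here. If you call addFamily and then addColumn on the same family, addFamily is effectively nullified: the entry changes from null (every column of the family) to a set holding only that qualifier. The reverse order also loses information, because addFamily removes the qualifiers collected so far. Watch out for this.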
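The pitfall can be reproduced without an HBase cluster. The following is a minimal stand-alone sketch, not the HBase class itself: FamilyMapDemo, describe and the BYTES comparator are names made up for illustration, and the comparator only stands in for Bytes.BYTES_COMPARATOR.

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Stand-alone model of Scan's familyMap (hypothetical class, for illustration only). */
final class FamilyMapDemo {
    // byte[] has no natural ordering, so the map needs an explicit lexicographic comparator.
    static final Comparator<byte[]> BYTES = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;
    };

    private final Map<byte[], NavigableSet<byte[]>> familyMap =
            new TreeMap<byte[], NavigableSet<byte[]>>(BYTES);

    FamilyMapDemo addFamily(byte[] family) {
        familyMap.remove(family);
        familyMap.put(family, null); // null means "every column of this family"
        return this;
    }

    FamilyMapDemo addColumn(byte[] family, byte[] qualifier) {
        NavigableSet<byte[]> set = familyMap.get(family);
        if (set == null) {
            set = new TreeSet<byte[]>(BYTES);
        }
        set.add(qualifier == null ? new byte[0] : qualifier);
        familyMap.put(family, set);
        return this;
    }

    /** "NONE" if the family is absent, "ALL" for a whole-family entry, else the qualifier count. */
    String describe(byte[] family) {
        if (!familyMap.containsKey(family)) return "NONE";
        NavigableSet<byte[]> set = familyMap.get(family);
        return set == null ? "ALL" : String.valueOf(set.size());
    }

    public static void main(String[] args) {
        byte[] cf = "t".getBytes(StandardCharsets.UTF_8);
        byte[] q = "q1".getBytes(StandardCharsets.UTF_8);
        System.out.println(new FamilyMapDemo().addFamily(cf).describe(cf));                  // ALL
        System.out.println(new FamilyMapDemo().addFamily(cf).addColumn(cf, q).describe(cf)); // 1
        System.out.println(new FamilyMapDemo().addColumn(cf, q).addFamily(cf).describe(cf)); // ALL
    }
}
```

In both mixed orders the later call wins, so it seems safest to pick one style per family: either addFamily alone, or only addColumn calls.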
- ClientScanner class
This class wraps the scan request and the returned results.
When the class is constructed, it calls nextScanner, which builds:
ScannerCallable callable = getScannerCallable(localStartKey, nbRows);
The scan request is issued in callable.call():
this.scannerId = openScanner();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
response = getStub().scan(controller, request);
// Results are returned via controller
CellScanner cellScanner = controller.cellScanner();
rrs = ResponseConverter.getResults(cellScanner, response);
The client issues the scan request and receives the returned results.
Scan server flow
- Scanner structure on the server
1), The request reaches the RegionServer, which builds a RegionScanner.
2), A region manages several column families (Stores). A StoreScanner is built for each one, covering its StoreFileScanners and the MemStoreScanner.
3), A Store manages a set of HFiles, and an HFileScanner is built for each. This is where data is actually read.
One question here: at which step are filters executed?
- RegionScanner
It is actually built in HRegion, via RegionScannerImpl.
HRegion:
protected RegionScanner getScanner(Scan scan,
    List<KeyValueScanner> additionalScanners) throws IOException {
  startRegionOperation(Operation.SCAN);
  try {
    // Verify families are all valid
    prepareScanner(scan);
    if (scan.hasFamilies()) {
      for (byte[] family : scan.getFamilyMap().keySet()) {
        checkFamily(family);
      }
    }
    return instantiateRegionScanner(scan, additionalScanners);
  } finally {
    closeRegionOperation(Operation.SCAN);
  }
}
// inside instantiateRegionScanner:
return new RegionScannerImpl(scan, additionalScanners, this);
- StoreScanner class:
RegionScannerImpl builds the StoreScanners:
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
    scan.getFamilyMap().entrySet()) {
  Store store = stores.get(entry.getKey());
  KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
  if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
      || this.filter.isFamilyEssential(entry.getKey())) {
    scanners.add(scanner);
  } else {
    joinedScanners.add(scanner);
  }
}
- Scan handling flow on the server
1), HRegionServer receives the scan RPC request
As mentioned in the client section:
response = getStub().scan(controller, request);
getStub() returns a ClientService.BlockingInterface
The definition of HRegionServer:
public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
So HRegionServer actually implements a protobuf-generated service interface, which makes exchanging data convenient and also defines the scan method. The call is probably dispatched through serialization and reflection; more on that later.
2), Scan flow
// initialization
region = getRegion(request.getRegion());
ClientProtos.Scan protoScan = request.getScan();
Scan scan = ProtobufUtil.toScan(protoScan);
region.prepareScanner(scan);
RegionScanner scanner = region.getScanner(scan);
// the scan function:
region.startRegionOperation(Operation.SCAN);
while (i < rows) {
  // Stop collecting results if maxScannerResultSize is set and we have exceeded it
  if ((maxScannerResultSize < Long.MAX_VALUE) &&
      (currentScanResultSize >= maxResultSize)) {
    break;
  }
  // Collect values to be returned here
  boolean moreRows = scanner.nextRaw(values);
  if (!values.isEmpty()) {
    for (Cell cell : values) {
      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
      currentScanResultSize += kv.heapSize();
      totalKvSize += kv.getLength();
    }
    results.add(Result.create(values));
    i++;
  }
  if (!moreRows) {
    break;
  }
  values.clear();
}
Note that rows above is the value set by scan.setCaching(rows): it caps how many results one scan RPC returns.
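To get a feel for what the caching value means for the number of round trips, here is a rough sketch of that server loop with the size check and filters left out. CachingDemo, nextBatch and countCalls are hypothetical names, and the "stop on a short batch" rule is a simplification of what the real client does.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of "at most `caching` rows per scan RPC" (hypothetical class). */
final class CachingDemo {
    /** Mimics the server loop: collect at most `caching` rows, starting at index `from`. */
    static List<Integer> nextBatch(List<Integer> regionRows, int from, int caching) {
        List<Integer> results = new ArrayList<>();
        int i = 0;
        while (i < caching && from + i < regionRows.size()) {
            results.add(regionRows.get(from + i));
            i++;
        }
        return results;
    }

    /** Counts the calls needed to drain `totalRows` rows; a short batch ends the scan. */
    static int countCalls(int totalRows, int caching) {
        List<Integer> rows = new ArrayList<>();
        for (int r = 0; r < totalRows; r++) {
            rows.add(r);
        }
        int calls = 0;
        int pos = 0;
        while (true) {
            List<Integer> batch = nextBatch(rows, pos, caching);
            calls++;
            pos += batch.size();
            if (batch.size() < caching) {
                break; // fewer rows than requested: nothing left to read
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(countCalls(10000, 1));    // 10001
        System.out.println(countCalls(10000, 1000)); // 11
    }
}
```

A larger caching value cuts the number of calls sharply, but each call then does more work on the server before it returns, so on wide rows or with selective filters a very large value may itself contribute to the timeouts described at the top.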
The core code is the nextRaw function implemented by RegionScannerImpl.
nextRaw calls the nextInternal function:
while (true) {
  // ...
  // Check if we were getting data from the joinedHeap and hit the limit.
  // If not, then it's main path - getting results from storeHeap.
  if (joinedContinuationRow == null) {
    // First, check if we are at a stop row. If so, there are no more results.
    if (stopRow) {
      if (filter != null && filter.hasFilterRow()) {
        filter.filterRowCells(results);
      }
      return false;
    }
    // Check if rowkey filter wants to exclude this row. If so, loop to next.
    // Technically, if we hit limits before on this row, we don't need this call.
    if (filterRowKey(currentRow, offset, length)) {
      boolean moreRows = nextRow(currentRow, offset, length);
      if (!moreRows) return false;
      results.clear();
      continue;
    }
    // Ok, we are good, let's try to get some results from the main heap.
    KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
        length);
    if (nextKv == KV_LIMIT) {
      if (this.filter != null && filter.hasFilterRow()) {
        throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
      }
      return true; // We hit the limit.
    }
    // ...
    FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
    if (filter != null && filter.hasFilterRow()) {
      ret = filter.filterRowCellsWithRet(results);
    }
    if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
      results.clear();
      boolean moreRows = nextRow(currentRow, offset, length);
      if (!moreRows) return false;
      // This row was totally filtered out, if this is NOT the last row,
      // we should continue on. Otherwise, nothing else to do.
      if (!stopRow) continue;
      return false;
    }
    // ...
  } else {
    // Populating from the joined heap was stopped by limits, populate some more.
    populateFromJoinedHeap(results, limit);
  }
  // ...
}
// nextRow code:
protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
  assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
  KeyValue next;
  while ((next = this.storeHeap.peek()) != null &&
      next.matchingRow(currentRow, offset, length)) {
    this.storeHeap.next(MOCKED_LIST);
  }
  resetFilters();
  // Calling the hook in CP which allows it to do a fast forward
  return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost()
          .postScannerFilterRow(this, currentRow, offset, length);
}
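Note that the limit here is the value set by setBatch(N).
In other words, when the filter's hasFilterRow() returns true, the limit must not be set; otherwise the scan fails with IncompatibleFilterException.
But if it is not set, the scan fetches every column of a row for each row key. For very wide rows this becomes very slow.
The code above also makes it clear where filters run: the row-key filter (filterRowKey) is applied before any cells of the row are read, and the row-level filters (filterRowCells, filterRow) are applied after populateResult has collected the row's cells.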
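The interaction between the batch limit and a row-level filter can be sketched as a toy model. This is only an approximation of the KV_LIMIT branch shown earlier: BatchLimitDemo and readRow are hypothetical names, cells are plain strings, and IllegalStateException stands in for IncompatibleFilterException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of how a batch limit splits one row (hypothetical class, not HBase code). */
final class BatchLimitDemo {
    /**
     * Splits one row's cells into partial results of at most `batch` cells.
     * batch <= 0 means "no limit": the whole row comes back as a single result.
     * If a chunk reaches the limit while a row-level filter is present, the read is
     * rejected, mirroring the KV_LIMIT check in nextInternal.
     */
    static List<List<String>> readRow(List<String> cells, int batch, boolean hasFilterRow) {
        List<List<String>> out = new ArrayList<>();
        if (batch <= 0) {
            out.add(new ArrayList<>(cells));
            return out;
        }
        for (int start = 0; start < cells.size(); start += batch) {
            int end = Math.min(start + batch, cells.size());
            List<String> part = new ArrayList<>(cells.subList(start, end));
            if (part.size() == batch && hasFilterRow) {
                throw new IllegalStateException(
                        "row-level filter is incompatible with a scan limit");
            }
            out.add(part);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> row = Arrays.asList("c1", "c2", "c3", "c4", "c5");
        System.out.println(readRow(row, 2, false)); // [[c1, c2], [c3, c4], [c5]]
        System.out.println(readRow(row, 0, true));  // [[c1, c2, c3, c4, c5]]
    }
}
```

The reason for the rejection is visible in the model: a row-level filter has to see the whole row to decide, but with a limit each call only ever holds one chunk of it.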
So where is the StoreScanner mentioned earlier, i.e. the place where HFiles are actually read?
Here is the answer:
// scanners are the CF scanners defined earlier
this.storeHeap = new KeyValueHeap(scanners, region.comparator);
// this.storeHeap.peek():
As we know, a StoreScanner manages many HFiles. This amounts to a multi-way merge algorithm for pulling data.
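The multi-way merge idea can be shown with a plain java.util.PriorityQueue. This is a sketch of the general technique only, not of KeyValueHeap's actual implementation: MergeHeapDemo and PeekingScanner are hypothetical names, and keys are plain strings instead of KeyValues.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** K-way merge over sorted inputs (hypothetical class illustrating the heap idea). */
final class MergeHeapDemo {
    /** One sorted input (think: one HFile or the MemStore) with a one-element lookahead. */
    static final class PeekingScanner {
        private final Iterator<String> it;
        private String current;

        PeekingScanner(List<String> sortedKeys) {
            this.it = sortedKeys.iterator();
            this.current = it.hasNext() ? it.next() : null;
        }

        String peek() {
            return current;
        }

        String next() {
            String out = current;
            current = it.hasNext() ? it.next() : null;
            return out;
        }
    }

    /** Merges already-sorted inputs into one sorted list, using a heap ordered by each input's head. */
    static List<String> merge(List<List<String>> sortedInputs) {
        PriorityQueue<PeekingScanner> heap =
                new PriorityQueue<PeekingScanner>((a, b) -> a.peek().compareTo(b.peek()));
        for (List<String> input : sortedInputs) {
            PeekingScanner s = new PeekingScanner(input);
            if (s.peek() != null) {
                heap.add(s); // empty inputs never enter the heap
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            PeekingScanner top = heap.poll(); // the scanner whose head is smallest
            merged.add(top.next());
            if (top.peek() != null) {
                heap.add(top); // re-insert so it is ordered by its new head
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> inputs = Arrays.asList(
                Arrays.asList("a", "d", "g"),
                Arrays.asList("b", "e"),
                Arrays.asList("c", "f"));
        System.out.println(merge(inputs)); // [a, b, c, d, e, f, g]
    }
}
```

Because every input is already sorted, only the head of each one has to be compared, so the merged stream comes out in row-key order without loading any input fully.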
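The call chain here is fairly complex, but look at the next() function in StoreScanner.
The call is effectively recursive: the region-level heap calls StoreScanner.next(), which in turn pulls from its own heap: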
LOOP: while ((kv = this.heap.peek()) != null) {
  if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
  checkScanOrder(prevKV, kv, comparator);
  prevKV = kv;
  ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
  switch (qcode) {
    case INCLUDE:
    case INCLUDE_AND_SEEK_NEXT_ROW:
    case INCLUDE_AND_SEEK_NEXT_COL:
      this.countPerRow++;
      if (storeLimit > -1 &&
          this.countPerRow > (storeLimit + storeOffset)) {
        // do what SEEK_NEXT_ROW does.
        if (!matcher.moreRowsMayExistAfter(kv)) {
          return false;
        }
        seekToNextRow(kv);
        break LOOP;
      }
      // add to results only if we have skipped #storeOffset kvs
      // also update metric accordingly
      if (this.countPerRow > storeOffset) {
        outResult.add(kv);
        count++;
      }
      if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
        if (!matcher.moreRowsMayExistAfter(kv)) {
          return false;
        }
        seekToNextRow(kv);
      } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
        seekAsDirection(matcher.getKeyForNextColumn(kv));
      } else {
        this.heap.next();
      }
      if (limit > 0 && (count == limit)) {
        break LOOP;
      }
      continue;
    // ... (other MatchCode cases omitted)
    default:
      throw new RuntimeException("UNEXPECTED");
  }
}
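Note the storeLimit variable:
// set storeLimit
this.storeLimit = scan.getMaxResultsPerColumnFamily();
That is, through this setting (Scan.setMaxResultsPerColumnFamily) you can cap, while actually scanning the table, rows that have a very large number of columns. There is no need to read everything in those rows.
This speeds up approximate statistics over the data.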
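The effect of storeLimit and storeOffset on one row can be sketched in a few lines. This mirrors only the countPerRow logic of the loop above, for a single row of a single column family; StoreLimitDemo and capRow are hypothetical names and cells are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the per-row, per-family cap in StoreScanner (hypothetical class). */
final class StoreLimitDemo {
    /**
     * Skips the first `storeOffset` cells of the row, then keeps at most `storeLimit` cells.
     * storeLimit = -1 means "no cap".
     */
    static List<String> capRow(List<String> rowCells, int storeLimit, int storeOffset) {
        List<String> out = new ArrayList<>();
        int countPerRow = 0;
        for (String cell : rowCells) {
            countPerRow++;
            if (storeLimit > -1 && countPerRow > storeLimit + storeOffset) {
                break; // the real scanner seeks to the next row here instead of reading on
            }
            if (countPerRow > storeOffset) {
                out.add(cell);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> row = Arrays.asList("c1", "c2", "c3", "c4", "c5", "c6");
        System.out.println(capRow(row, 3, 0));  // [c1, c2, c3]
        System.out.println(capRow(row, 3, 2));  // [c3, c4, c5]
        System.out.println(capRow(row, -1, 0)); // [c1, c2, c3, c4, c5, c6]
    }
}
```

On the client side this corresponds to calling scan.setMaxResultsPerColumnFamily(n) when building the Scan. The cap applies per row and per column family, so a job that only counts rows or samples columns can skip most of a very wide row.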