Hbase Scan 主要流程分析.md

Hbase Scan 流程分析

公司在集群在從0.94.6升到0.98.6-cdh5.2.0后, 原來(lái)執(zhí)行的hbase scan 任務(wù)出現(xiàn)很多問(wèn)題.

表現(xiàn)在:

  • setBatch() 與filter 不兼容, 導(dǎo)致代碼需要修改(刪掉setBatch())

  • scan 效率變慢, 并且經(jīng)常超時(shí). mapper 報(bào)OutOfOrderException. 影響任務(wù)執(zhí)行效率.

所以有必要在理解scan 流程的基礎(chǔ)上, 進(jìn)行優(yōu)化.

從應(yīng)用hbase角度來(lái)講, 需要理解scan 幾個(gè)配置. 包括setCaching() setBatch(), 以及scan的過(guò)程.

另外一個(gè), 是過(guò)濾器如何工作. 在哪一步發(fā)揮作用.

1, Hbase MR

  • Hbase MR 主流程代碼

Configurationconf= getConf();

Scanscan= buildScan(conf);

//初始化認(rèn)證

TableMapReduceUtil.initCredentials(job);

// run job

TableMapReduceUtil.initTableMapperJob(tableName,scan, HbaseSearchCheckerMapper.class, Text.class, Text.class,job);

  • 創(chuàng)建Scan實(shí)例

Scanscan=newScan();

scan.setCaching(1000);

scan.setCacheBlocks(false);// no read cache

scan.addFamily(Bytes.toBytes("t"));

  • 和普通MR程序一樣定義mapper類 map 函數(shù)

public void map(ImmutableBytesWritable key, Result value, Context context)

throws IOException, InterruptedException {

String rowKey = null;

for (Cell kv : value.rawCells()) {

if(rowKey == null) {

rowKey = Bytes.toString(CellUtil.cloneRow(kv));

}

count ++;

}

}

以上是Hbase MR 程序. 非常簡(jiǎn)單.

但是在執(zhí)行過(guò)程中, scan 實(shí)例的設(shè)置影響最后的性能. 一般情況下, 可以通過(guò)過(guò)濾器filter, 來(lái)優(yōu)化scan.

所以有必要了解scan 的流程.

2, Scan 流程

Hbase 結(jié)構(gòu)
  • Hbase 通過(guò)Region Server 管理數(shù)據(jù)

  • Hbase 作為列式數(shù)據(jù)庫(kù), 不同種類數(shù)據(jù)通過(guò)CF管理 [ 分為不同目錄](méi) ,數(shù)據(jù)保存在HFile中. 內(nèi)存中數(shù)據(jù)在flush前保留在MemStore中.

  • Hbase RowKey 結(jié)構(gòu). rowKey 是有序的.

Scan client 流程
  • Scan 類

hbase-client 下package:org.apache.hadoop.hbase.client

這個(gè)相當(dāng)于是配置類, 作用是構(gòu)造用戶scan 的條件. 也就是用戶buildScan時(shí)的配置.

比如用戶需要scan的column, column會(huì)轉(zhuǎn)化為如下結(jié)構(gòu).


Map> fams = scan.getFamilyMap();

注意下這倆函數(shù).


publicScan addFamily(byte[]family) {

familyMap.remove(family);

familyMap.put(family,null);

returnthis;

}

publicScan addColumn(byte[]family,byte[]qualifier) {

NavigableSet set = familyMap.get(family);

if(set ==null) {

set =newTreeSet(Bytes.BYTES_COMPARATOR);

}

if(qualifier ==null) {

qualifier = HConstants.EMPTY_BYTE_ARRAY;

}

set.add(qualifier);

familyMap.put(family, set);

returnthis;

}

這里有個(gè)坑. 如果你調(diào)用addFamily, 然后在用addColumn, 那么顯然addFamily等于是無(wú)效的. 需要注意.

  • ClientScanner 類

這個(gè)類是封裝了scan請(qǐng)求, 以及返回結(jié)構(gòu).

在構(gòu)造這個(gè)類的時(shí)候, 調(diào)用nextScanner 構(gòu)造


ScannerCallable callable = getScannerCallable(localStartKey, nbRows);

發(fā)起scan請(qǐng)求:callable.call():


this.scannerId = openScanner();

request = RequestConverter.buildScanRequest(scannerId, caching,false, nextCallSeq);

response = getStub().scan(controller, request);

// Results are returned via controller

CellScanner cellScanner = controller.cellScanner();

rrs = ResponseConverter.getResults(cellScanner, response);

client 發(fā)起scan請(qǐng)求, 并接受返回的結(jié)果.

Scan Server 流程
  • Scanner 在server 結(jié)構(gòu)

1), 請(qǐng)求到RegionServer 構(gòu)造 RegionScanner

2), Region Server 管理一堆ColumnFamily. 構(gòu)造StoreFileScanner, 包括MemStoreScanner

3),StoreFile 管理一堆HFile, 構(gòu)造HFileScanner. 這個(gè)是實(shí)際讀取數(shù)據(jù)的地方.

這里有一個(gè)問(wèn)題. 過(guò)濾器是在哪一步來(lái)執(zhí)行的?

  • RegionSanner

實(shí)際在HRegion中, 通過(guò)RegionScannerImpl構(gòu)造

HRegion:


protectedRegionScanner getScanner(Scan scan,

List additionalScanners)throwsIOException {

startRegionOperation(Operation.SCAN);

try{

// Verify families are all valid

prepareScanner(scan);

if(scan.hasFamilies()) {

for(byte[] family : scan.getFamilyMap().keySet()) {

checkFamily(family);

}

}

returninstantiateRegionScanner(scan, additionalScanners);

}finally{

closeRegionOperation(Operation.SCAN);

}

}

//instantiateRegionScanner 函數(shù)里:

returnnewRegionScannerImpl(scan, additionalScanners,this);

  • StoreScanner 類:

RegionScannerImpl 構(gòu)造StoreScanner:


for(Map.Entry> entry :

scan.getFamilyMap().entrySet()) {

Store store = stores.get(entry.getKey());

KeyValueScanner scanner = store.getScanner(scan, entry.getValue(),this.readPt);

if(this.filter ==null|| !scan.doLoadColumnFamiliesOnDemand()

||this.filter.isFamilyEssential(entry.getKey())) {

scanners.add(scanner);

}else{

joinedScanners.add(scanner);

}

}

  • Server Scan 處理流程

1), HRegionServer 獲取到Scan的RPC請(qǐng)求

前面client提到:

response = getStub().scan(controller, request);

getStub() 返回的是ClientService.BlockingInterface

HRegionServer 的定義:

public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,

所以HRegionServer 其實(shí)是繼承自一個(gè)protobuf 類. 可以方便的交換數(shù)據(jù). 并且定義了scan接口. 調(diào)用方式可能是通過(guò)序列化, 反射的方式來(lái)執(zhí)行.這個(gè)后面再說(shuō).

2), scan 流程


// 初始化

region = getRegion(request.getRegion());

ClientProtos.Scan protoScan = request.getScan();

Scan scan = ProtobufUtil.toScan(protoScan);

region.prepareScanner(scan);

RegionScannerscanner = region.getScanner(scan);

// scan 函數(shù):

region.startRegionOperation(Operation.SCAN);

while (i < rows) {

// Stop collecting results if maxScannerResultSize is set and we have exceeded it

if ((maxScannerResultSize < Long.MAX_VALUE) &&

(currentScanResultSize >= maxResultSize)) {

break;

}

// Collect values to be returned here

boolean moreRows = scanner.nextRaw(values);

if (!values.isEmpty()) {

for (Cell cell : values) {

KeyValue kv = KeyValueUtil.ensureKeyValue(cell);

currentScanResultSize += kv.heapSize();

totalKvSize += kv.getLength();

}

results.add(Result.create(values));

i++;

}

if (!moreRows) {

break;

}

values.clear();

}

注意上面的rows 就是scan.setCaching(rows)的設(shè)置的.

核心的代碼是 RegionScannerImp 實(shí)現(xiàn)的nextRaw函數(shù)

nextRaw 調(diào)用nextInternal 函數(shù)


while (true) {

// First, check if we are at a stop row. If so, there are no more results.

if (stopRow) {

if (filter != null && filter.hasFilterRow()) {

filter.filterRowCells(results);

}

return false;

}

// Check if rowkey filter wants to exclude this row. If so, loop to next.

// Technically, if we hit limits before on this row, we don't need this call.

if (filterRowKey(currentRow, offset, length)) {

boolean moreRows = nextRow(currentRow, offset, length);

if (!moreRows) return false;

results.clear();

continue;

}

KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,

length);

// Ok, we are good, let's try to get some results from the main heap.

if (nextKv == KV_LIMIT) {

if (this.filter != null && filter.hasFilterRow()) {

throw new IncompatibleFilterException(

"Filter whose hasFilterRow() returns true is incompatible with scan with limit!");

}

return true; // We hit the limit.

}

FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;

if (filter != null && filter.hasFilterRow()) {

ret = filter.filterRowCellsWithRet(results);

}

if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {

results.clear();

boolean moreRows = nextRow(currentRow, offset, length);

if (!moreRows) return false;

// This row was totally filtered out, if this is NOT the last row,

// we should continue on. Otherwise, nothing else to do.

if (!stopRow) continue;

return false;

}

} else {

// Populating from the joined heap was stopped by limits, populate some more.

populateFromJoinedHeap(results, limit);

}

}

// nextRow 代碼:

protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {

assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";

KeyValue next;

while ((next = this.storeHeap.peek()) != null &&

next.matchingRow(currentRow, offset, length)) {

this.storeHeap.next(MOCKED_LIST);

}

resetFilters();

// Calling the hook in CP which allows it to do a fast forward

return this.region.getCoprocessorHost() == null

|| this.region.getCoprocessorHost()

.postScannerFilterRow(this, currentRow, offset, length);

}

注意, 這里的limit 是setBatch(N)設(shè)置的.

也就是說(shuō), 這里的limit是不能設(shè)置的. 否則也會(huì)報(bào)錯(cuò).

但是如果不設(shè)置, scan 會(huì)找到一個(gè)rowKey 一行所有的列. 對(duì)于某些較大的數(shù)據(jù), 就會(huì)非常慢.

如上代碼里, 過(guò)濾器在哪一步執(zhí)行也非常明顯了.

那么之前提到的StoreScanner 在哪里呢, 也就是真正去讀HFile文件的地方呢?

答案來(lái)了:


// scanners 就是之前定義的CF scanner

this.storeHeap =newKeyValueHeap(scanners, region.comparator);

//this.storeHeap.peek():

我們知道, StoreScanner 下管理很多的HFile. 這相當(dāng)于是一個(gè)多路歸并拉數(shù)據(jù)的算法.

這塊的調(diào)用比較復(fù)雜. 但是看StoreScanner下next() 函數(shù)

這里是個(gè)遞歸的調(diào)用:


LOOP: while((kv = this.heap.peek()) != null) {

if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.

checkScanOrder(prevKV, kv, comparator);

prevKV = kv;

ScanQueryMatcher.MatchCode qcode = matcher.match(kv);

switch(qcode) {

case INCLUDE:

case INCLUDE_AND_SEEK_NEXT_ROW:

case INCLUDE_AND_SEEK_NEXT_COL:

this.countPerRow++;

if (storeLimit > -1 &&

this.countPerRow > (storeLimit + storeOffset)) {

// do what SEEK_NEXT_ROW does.

if (!matcher.moreRowsMayExistAfter(kv)) {

return false;

}

seekToNextRow(kv);

break LOOP;

}

// add to results only if we have skipped #storeOffset kvs

// also update metric accordingly

if (this.countPerRow > storeOffset) {

outResult.add(kv);

count++;

}

if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {

if (!matcher.moreRowsMayExistAfter(kv)) {

return false;

}

seekToNextRow(kv);

} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {

seekAsDirection(matcher.getKeyForNextColumn(kv));

} else {

this.heap.next();

}

if (limit > 0 && (count == limit)) {

break LOOP;

}

continue;

default:

throw new RuntimeException("UNEXPECTED");

}

}

注意storeLimit變量


java:

// set storeLimit

this.storeLimit = scan.getMaxResultsPerColumnFamily();

也就是, 在這里, 可以通過(guò)這個(gè)給函數(shù). 在實(shí)際scan table的時(shí)候, 對(duì)那些column 非常多的行, 做過(guò)濾. 實(shí)際上不需要所有的行都讀.

這樣可以近似的加快數(shù)據(jù)的統(tǒng)計(jì).

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末侵状,一起剝皮案震驚了整個(gè)濱河市智嚷,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌吊洼,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,561評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件集灌,死亡現(xiàn)場(chǎng)離奇詭異随珠,居然都是意外死亡蝶俱,警方通過(guò)查閱死者的電腦和手機(jī)笛匙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)侨把,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)犀变,“玉大人,你說(shuō)我怎么就攤上這事秋柄』裰Γ” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,162評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵骇笔,是天一觀的道長(zhǎng)映琳。 經(jīng)常有香客問(wèn)我,道長(zhǎng)蜘拉,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,470評(píng)論 1 283
  • 正文 為了忘掉前任有鹿,我火速辦了婚禮旭旭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘葱跋。我一直安慰自己持寄,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,550評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布娱俺。 她就那樣靜靜地躺著稍味,像睡著了一般。 火紅的嫁衣襯著肌膚如雪荠卷。 梳的紋絲不亂的頭發(fā)上模庐,一...
    開(kāi)封第一講書(shū)人閱讀 49,806評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音油宜,去河邊找鬼掂碱。 笑死,一個(gè)胖子當(dāng)著我的面吹牛慎冤,可吹牛的內(nèi)容都是我干的疼燥。 我是一名探鬼主播,決...
    沈念sama閱讀 38,951評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼蚁堤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼醉者!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起披诗,我...
    開(kāi)封第一講書(shū)人閱讀 37,712評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤撬即,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后藤巢,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體搞莺,經(jīng)...
    沈念sama閱讀 44,166評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,510評(píng)論 2 327
  • 正文 我和宋清朗相戀三年掂咒,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了才沧。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片迈喉。...
    茶點(diǎn)故事閱讀 38,643評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖温圆,靈堂內(nèi)的尸體忽然破棺而出挨摸,到底是詐尸還是另有隱情,我是刑警寧澤岁歉,帶...
    沈念sama閱讀 34,306評(píng)論 4 330
  • 正文 年R本政府宣布得运,位于F島的核電站,受9級(jí)特大地震影響锅移,放射性物質(zhì)發(fā)生泄漏熔掺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,930評(píng)論 3 313
  • 文/蒙蒙 一非剃、第九天 我趴在偏房一處隱蔽的房頂上張望置逻。 院中可真熱鬧,春花似錦备绽、人聲如沸券坞。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,745評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)恨锚。三九已至,卻和暖如春倍靡,著一層夾襖步出監(jiān)牢的瞬間猴伶,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,983評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工塌西, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蜗顽,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,351評(píng)論 2 360
  • 正文 我出身青樓雨让,卻偏偏與公主長(zhǎng)得像雇盖,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子栖忠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,509評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容