最近進(jìn)行 HBase 表跨集群遷移鸟妙,使用組內(nèi)同事給的方案 : bulkload谆棱,但是 bulkload 完之后出現(xiàn)了一系列預(yù)料之外的問題,記錄如下:
hbase 表跨集群遷移步驟
- 目標(biāo)集群創(chuàng)建 HBase 表圆仔,未進(jìn)行預(yù)分區(qū)垃瞧;
- 源集群 hbase 表對應(yīng)的 hdfs 路徑創(chuàng)建 snaphost ,注意此 snapshot 是 hdfs snaphost坪郭;
- distcp 拷貝 hdfs 快照數(shù)據(jù)到目標(biāo)集群
- 目標(biāo)集群遍歷拷貝數(shù)據(jù)目錄个从,逐個 bulkload 到 hbase
問題
bulkload 結(jié)束后,這張 700 億行的大表歪沃,只有一個 region嗦锐,花了近一周的時間 minor compaction,region split 好沪曙;
原先以為奕污,拷貝的數(shù)據(jù)里面除了列族數(shù)據(jù),還包含 region 信息(/region_id/.regioninfo)液走,本來以為 bulkload 會自動處理 region碳默,通過了解了一番源碼發(fā)現(xiàn)贾陷,事情并非如此。
源碼分析 bulkload 過程
1 初始化一個線程池嘱根,線程池 corePoolSize 來源于參數(shù)配置 hbase.loadincremental.threads.max髓废,如果未配置,默認(rèn)取 jvm 可以用到的處理器的個數(shù)(Runtime.getRuntime().availableProcessors())该抒。
2 遍歷搜索過濾出 HFile 文件:遍歷目錄慌洪,搜索 hfile。途中經(jīng)過一系列校驗(yàn)凑保,判斷是否有 families冈爹,family name 是否合法性,跳過非 hfile 文件欧引,例如 _ 開頭的文件频伤,引用,HFileLink维咸,最后判斷 HFile 格式的有效性。
掃描過程中會檢查 HFile 文件的大小是否超出 region 大小的閾值(hbase.hregion.max.filesize惠爽,未配置的話默認(rèn)是 10G)癌蓖,如果超出閾值,會打印提示這可能會導(dǎo)致出現(xiàn) oversplitting 的問題婚肆。
將遍歷后的 hfile 以對象 LoadQueueItem(byte[] family, Path hfilePath) 的方式放入隊(duì)列 :Deque租副。
這一步就把 .regioninfo 就排除掉了,所以這個拷貝過來的 region 信息對于 bulkload 是無用了较性。
famliy 存在性校驗(yàn):再經(jīng)過一次篩選用僧,判斷是否有獲取到的 family 是否是即將導(dǎo)入 HBase 表中的 family。
3 groupOrSplitPhase 階段
這個階段判斷 hfile 判斷應(yīng)該寫到表的哪個 region赞咙,如果跨 region 了责循,需要進(jìn)行 split。
獲取當(dāng)前表所有 region 的 startkeys endkeys
final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
拿到即將導(dǎo)入的 hfile 的 startkey攀操,通過二分查找算法在 startkey 列表里面搜索院仿,如果搜到匹配的 startkey 直接返回?cái)?shù)組索引值,沒搜到速和,返回插入點(diǎn)歹垫,插入點(diǎn)是第一個大于值的索引位置。
int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
Bytes.BYTES_COMPARATOR);
if (idx < 0) {
// not on boundary, returns -(insertion index). Calculate region it
// would be in.
idx = -(idx + 1) - 1;
}
final int indexForCallable = idx;
通過這個索引值颠放,判斷是否有跨 region 的 hfile排惨,有的話需要 split。怎么判斷是否不需要 split碰凶,也就是有合適的 region暮芭,滿足其中兩個條件之一即可:
- hfile 的 endkey 小于表 region 的 endkey
- 表 region endeky 為空鹿驼,說明是最后一個 region ,理所當(dāng)然可以寫入
boolean lastKeyInRange =
Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
if (!lastKeyInRange) {
// split key 即為匹配 region 的 endkey
List<LoadQueueItem> lqis = splitStoreFile(item, table,
startEndKeys.getFirst()[indexForCallable],
startEndKeys.getSecond()[indexForCallable]);
return lqis;
}
// group regions.
regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
"region. Splitting...");
拆分是把當(dāng)前 HFile 拆分成兩半谴麦,top 和 bottom 兩部分蠢沿,保留元數(shù)據(jù),重建 bloom 過濾等匾效,生成新的 HFile 舷蟀,拆分策略是:根據(jù)匹配 region 的 endkey 的位置拆分成兩個。
/**
* Split a storefile into a top and bottom half, maintaining
* the metadata, recreating bloom filters, etc.
*/
static void splitStoreFile(
Configuration conf, Path inFile,
HColumnDescriptor familyDesc, byte[] splitKey,
Path bottomOut, Path topOut) throws IOException
{
// Open reader with no block cache, and not in-memory
Reference topReference = Reference.createTopReference(splitKey);
Reference bottomReference = Reference.createBottomReference(splitKey);
copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
}
bulkLoadPhase:bulkload 階段
計(jì)算出 region 信息之后面哼,就是正式的 load 階段野宜,最終定位到 HStore 里面的 bulkLoadFile 方法
通過 StoreFile reader 讀取 StoreFile ,獲取寫鎖魔策,往 storefile 中新增數(shù)據(jù)匈子。
private void bulkLoadHFile(StoreFile sf) throws IOException {
StoreFile.Reader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
// Append the new storefile into the list
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
// the lock.
this.lock.writeLock().unlock();
}
notifyChangedReadersObservers();
LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
if (LOG.isTraceEnabled()) {
String traceMessage = "BULK LOAD time,size,store size,store files ["
+ EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
+ "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
LOG.trace(traceMessage);
}
}
結(jié)論
對于待遷移的 HBase 大表, bulkload 前盡可能在建表時做好預(yù)分區(qū)