1 Background
The principles of the reservoir sampling algorithm are not covered here; this article focuses on one of its application scenarios. For the algorithm itself, refer to a write-up on reservoir sampling.
2 Problem
When writing a Spark job against a very large HBase table (on the order of 10 billion rows) with only a small allocation of cores and memory, calling repartition to rebalance the data triggers heavy shuffle network IO and can easily take the whole application down. A gentler workaround is reservoir sampling: it can draw a sample without knowing the total data volume in advance, and the sampled rowkeys define approximately even partitions. Concretely, first scan HBase once and reservoir-sample a set of rowkeys, sort them, then issue a separate bounded scan for each resulting rowkey range. Because each scan now touches only a small slice of the data, the application no longer crashes. This trades performance for availability.
Solutions:
- Use the reservoir sampling algorithm (Section 3.1).
- Pre-split the HBase table into regions with a suitable split policy so that no single region grows too large, then scan region by region (Section 3.2; a pre-split sketch follows this list).
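For the second option, here is a minimal sketch of creating a pre-split table with the HBase 1.x admin API. It reuses the demo's table and family names for a hypothetical new table; the split points are placeholders and should be chosen to match the real rowkey distribution:

package hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class PreSplitDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = conn.getAdmin()) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("myedge"));
            desc.addFamily(new HColumnDescriptor("g"));
            // Three split points on the first rowkey byte -> four regions of
            // roughly equal keyspace; adjust to the actual key distribution.
            byte[][] splitKeys = {
                    new byte[]{(byte) 0x40},
                    new byte[]{(byte) 0x80},
                    new byte[]{(byte) 0xC0}
            };
            admin.createTable(desc, splitKeys);
        }
    }
}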
3 Code
3.1 Reservoir sampling demo code
Here is a basic demo. First, create a wrapper class for byte arrays so that rowkeys can be compared and sorted:
package hbase;

import org.apache.hadoop.hbase.util.Bytes;

/**
 * Created by zhangkai12 on 2018/6/27.
 * Wraps a rowkey byte[] so that samples can be sorted with Bytes.compareTo.
 */
public class MyBytes implements Comparable<MyBytes> {
    private byte[] bytes;

    public MyBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    public byte[] getBytes() {
        return bytes;
    }

    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    @Override
    public int compareTo(MyBytes o) {
        return Bytes.compareTo(this.bytes, o.getBytes());
    }
}
Next, create a basic HBase helper class:
package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

public class HbaseUtil {
    static Configuration conf = HBaseConfiguration.create();
    private static Connection connection;

    // Note: the constructor must run once before getConn(), otherwise the
    // connection is created without a ZooKeeper quorum configured.
    public HbaseUtil(String zkUrl) {
        conf.set("hbase.zookeeper.quorum", zkUrl);
        conf.set("hbase.client.operation.timeout", "60000");
    }

    public static synchronized Connection getConn() {
        if (connection == null) {
            try {
                connection = ConnectionFactory.createConnection(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return connection;
    }

    public static Table getTable(String tableName) throws IOException {
        return getConn().getTable(TableName.valueOf(tableName));
    }

    public static void closeTable(Table table) {
        if (table == null) {
            return;
        }
        try {
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
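A minimal usage sketch (the ZooKeeper address is a placeholder; the table name comes from the demo that follows):

package hbase;

import org.apache.hadoop.hbase.client.Table;

public class HbaseUtilDemo {
    public static void main(String[] args) throws Exception {
        new HbaseUtil("zk-host:2181"); // placeholder quorum; sets the shared config
        Table table = HbaseUtil.getTable("myedge");
        // ... scan / get ...
        HbaseUtil.closeTable(table);
    }
}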
Finally, the sampling class itself. The sampling function is a Java port of Spark's Scala implementation:
package hbase;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;

/**
 * Created by zhangkai12 on 2018/6/27.
 */
public class Sample {
    // Empty start/stop rows tell HBase to scan from the first row and to the
    // last row of the table; a fixed sentinel such as sixteen 0xFF bytes would
    // silently miss any rowkey that sorts beyond it.
    private static final byte[] ROW_START = HConstants.EMPTY_START_ROW;
    private static final byte[] ROW_END = HConstants.EMPTY_END_ROW;
    public static void main(String[] args) {
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = HbaseUtil.getTable("myedge");
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("g"));
            scanner = table.getScanner(scan);
            Iterator<Result> iterator = scanner.iterator();
            byte[][] rowKeys = reservoirSampleAndCount(iterator, 20, 100);
            System.out.println("----------------------sort before----------------------");
            Arrays.stream(rowKeys).forEach(res -> System.out.println(" value " + Arrays.toString(res)));
            System.out.println("----------------------sort after----------------------");
            List<MyBytes> collect = Arrays.stream(rowKeys)
                    .map(MyBytes::new)
                    .sorted()
                    .collect(Collectors.toList());
            collect.forEach(res -> System.out.println("****" + Arrays.toString(res.getBytes())));
            // The sorted samples act as split points: scan each half-open
            // range [start, sample) in turn so every row is counted exactly once.
            byte[] start = ROW_START;
            int sumNum = 0;
            for (MyBytes sample : collect) {
                byte[] end = sample.getBytes();
                sumNum += scanWithStartAndEndRow(start, end);
                start = end;
            }
            // Final range: from the last split point (or the table start if the
            // sample is empty) to the end of the table.
            sumNum += scanWithStartAndEndRow(start, ROW_END);
            // Compute the total without sampling, as a cross-check.
            int numNotSample = getSumWithNOSampleMethod();
            System.out.println(sumNum + "---------------" + numNotSample);
            System.out.println(sumNum == numNotSample);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != scanner) {
                scanner.close();
            }
            if (null != table) {
                HbaseUtil.closeTable(table);
            }
        }
    }
    /**
     * Algorithm:
     *   Put the first k items of S into the reservoir.
     *   For each item S[j] with j >= k:
     *     draw a random integer r in [0, j];
     *     if r < k, replace the r-th item of the reservoir with S[j].
     */
    public static byte[][] reservoirSampleAndCount(Iterator<Result> input, int k, long seed) {
        byte[][] reservoir = new byte[k][];
        // Fill the reservoir with the first k rowkeys.
        int i = 0;
        while (i < k && input.hasNext()) {
            reservoir[i] = input.next().getRow();
            i++;
        }
        if (i < k) {
            // Fewer than k rows in total: return a trimmed copy.
            byte[][] trimReservoir = new byte[i][];
            System.arraycopy(reservoir, 0, trimReservoir, 0, i);
            return trimReservoir;
        } else {
            Random random = new Random(seed);
            while (input.hasNext()) {
                byte[] item = input.next().getRow();
                // The current item has 0-based index i, so draw uniformly from
                // [0, i] inclusive; nextInt(i) alone would always replace an
                // element instead of doing so with probability k / (i + 1).
                int replacementIndex = random.nextInt(i + 1);
                if (replacementIndex < k) {
                    reservoir[replacementIndex] = item;
                }
                i++;
            }
            return reservoir;
        }
    }
    private static int scanWithStartAndEndRow(byte[] start, byte[] end) {
        int num = 0;
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = HbaseUtil.getTable("myedge");
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("g"));
            // The start row is inclusive, the stop row exclusive.
            scan.setStartRow(start);
            scan.setStopRow(end);
            scanner = table.getScanner(scan);
            for (Result next : scanner) {
                byte[] row = next.getRow();
                System.out.println(Arrays.toString(row) + "---" + Bytes.toString(row));
                num++;
            }
            System.out.println("---num-----: " + num);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != scanner) {
                scanner.close();
            }
            if (null != table) {
                HbaseUtil.closeTable(table);
            }
        }
        return num;
    }
    private static int getSumWithNOSampleMethod() {
        int num = 0;
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = HbaseUtil.getTable("myedge");
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("g"));
            scanner = table.getScanner(scan);
            // Full-table scan: count every row without sampling.
            Iterator<Result> iterator = scanner.iterator();
            while (iterator.hasNext()) {
                iterator.next();
                num++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != scanner) {
                scanner.close();
            }
            if (null != table) {
                HbaseUtil.closeTable(table);
            }
        }
        return num;
    }
}
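To sanity-check the sampling logic without an HBase cluster, here is a generic variant of the same function; the class and method names are illustrative, not part of the demo above:

package hbase;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;

public class SampleSelfTest {
    // Generic reservoir sampling: returns k items chosen uniformly from the stream.
    public static <T> List<T> reservoirSample(Iterator<T> input, int k, long seed) {
        List<T> reservoir = new ArrayList<>(k);
        int i = 0;
        while (i < k && input.hasNext()) {
            reservoir.add(input.next());
            i++;
        }
        Random random = new Random(seed);
        while (input.hasNext()) {
            T item = input.next();
            int replacementIndex = random.nextInt(i + 1); // uniform over [0, i]
            if (replacementIndex < k) {
                reservoir.set(replacementIndex, item);
            }
            i++;
        }
        return reservoir;
    }

    public static void main(String[] args) {
        Iterator<Integer> data = IntStream.range(0, 10_000).iterator();
        System.out.println(reservoirSample(data, 20, 100));
    }
}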
3.2 Processing data region by region
package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Arrays;
import java.util.List;

/**
 * Created by zhangkai12 on 2018/6/27.
 */
public class HbaseRegion {
    public static void main(String[] args) {
        try {
            checkTable("Janus321");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void checkTable(String tabName) throws Exception {
        TableName tn = TableName.valueOf(tabName);
        Configuration config = HBaseConfiguration.create();
        Connection connection = null;
        Admin admin = null;
        Table table = null;
        try {
            connection = ConnectionFactory.createConnection(config);
            admin = connection.getAdmin();
            if (!admin.tableExists(tn)) {
                return;
            }
            table = connection.getTable(tn);
            List<HRegionInfo> lr = admin.getTableRegions(tn);
            if (lr == null) {
                System.out.println("No region found for table " + tabName);
                return;
            }
            // Walk the table one region at a time, bounding each scan by the
            // region's start and end keys so no single scan covers the whole table.
            int i = 1;
            for (HRegionInfo regionInfo : lr) {
                byte[] startRowkey = regionInfo.getStartKey();
                System.out.println("----start----" + Bytes.toString(startRowkey));
                byte[] endKey = regionInfo.getEndKey();
                System.out.println("----end----" + Bytes.toString(endKey));
                Scan sc = new Scan();
                sc.setBatch(1);
                sc.setStartRow(startRowkey);
                sc.setStopRow(endKey);
                ResultScanner scanner = null;
                try {
                    scanner = table.getScanner(sc);
                    for (Result next : scanner) {
                        System.out.println("region " + i + ": " + Arrays.toString(next.getRow()));
                    }
                } finally {
                    if (null != scanner) {
                        scanner.close();
                    }
                }
                i++;
            }
        } catch (Exception e) {
            // Do not swallow errors silently.
            e.printStackTrace();
        } finally {
            if (null != table) {
                table.close();
            }
            if (null != admin) {
                admin.close();
            }
            if (null != connection) {
                connection.close();
            }
        }
    }
}
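As a side note, the same per-region boundaries can be obtained without the Admin API. A minimal sketch using RegionLocator (the class and method names around it are illustrative; the caller supplies an open Connection):

package hbase;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Pair;

import java.io.IOException;

public class RegionKeys {
    // Fetches per-region [start, end) keys via RegionLocator instead of Admin.getTableRegions.
    static Pair<byte[][], byte[][]> startEndKeys(Connection connection, String tabName) throws IOException {
        try (RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tabName))) {
            return locator.getStartEndKeys();
        }
    }
}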
4 Reservoir sampling results
[Figure: reservoir sampling result]
The result figure shows that the total row count from the sampled ranged scans matches the count from the single unsampled scan, confirming that splitting the scan at the sampled rowkeys neither loses nor double-counts data.