Hbase - 自定義Rowkey規(guī)則

> 在Flink中我們有時候需要分析數(shù)據(jù)1點(diǎn)到2點(diǎn)的范圍,可是經(jīng)過Region又比較慢,這時候我們就可以定制`TableInputFormat`來實(shí)現(xiàn)我們的需求了,我們還可以采用Flink的`DataSet`的方式讀取,另外下面還有`Spark`讀取的例子。

## 使用教程

Md5Util.java

```

import org.apache.commons.codec.binary.Hex;

import java.security.MessageDigest;

import java.security.NoSuchAlgorithmException;

public class Md5Util {

? ? public static String md5(byte[] key) {

? ? ? ? return md5(key, 0, key.length);

? ? }

? ? public static String md5(byte[] key, int offset, int length) {

? ? ? ? try {

? ? ? ? ? ? MessageDigest e = MessageDigest.getInstance("MD5");

? ? ? ? ? ? e.update(key, offset, length);

? ? ? ? ? ? byte[] digest = e.digest();

? ? ? ? ? ? return new String(Hex.encodeHex(digest));

? ? ? ? } catch (NoSuchAlgorithmException var5) {

? ? ? ? ? ? throw new RuntimeException("Error computing MD5 hash", var5);

? ? ? ? }

? ? }

? ? public static String md5(String str) {

? ? ? ? return md5(str.getBytes());

? ? }

? ? public static String md5(String str,int offset, int length) {

? ? ? ? return md5(str.getBytes(),offset,length);

? ? }

}

```

數(shù)據(jù)`Split`方式

```

private Connection connection;

? ? private Admin admin;

? ? @Before

? ? public void init() throws Exception {

? ? ? ? System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

? ? ? ? System.setProperty("sun.security.krb5.debug", "false");

? ? ? ? final String user = "hbase/abc.demo.com@DEMO.COM";

? ? ? ? final String keyPath = "/home/dounine/kerberos/lake.keytab";

? ? ? ? Configuration conf = new Configuration();

? ? ? ? conf.addResource("hbase-site.xml");

? ? ? ? UserGroupInformation.setConfiguration(conf);

? ? ? ? UserGroupInformation.loginUserFromKeytab(user, keyPath);

? ? ? ? connection = ConnectionFactory.createConnection(conf);

? ? ? ? admin = connection.getAdmin();

? ? }

@Test

? ? public void createTable() throws IOException {

? ? ? ? TableName table = TableName.valueOf("logTable1");

? ? ? ? TableDescriptorBuilder tableDesc = TableDescriptorBuilder.newBuilder(table);

? ? ? ? tableDesc.setValue(TableDescriptorBuilder.SPLIT_POLICY,KeyPrefixRegionSplitPolicy.class.getName());

? ? ? ? tableDesc.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY,"2");

? ? ? ? ColumnFamilyDescriptor extCF = ColumnFamilyDescriptorBuilder.newBuilder("ext".getBytes()).build();

? ? ? ? ColumnFamilyDescriptor deviceCF = ColumnFamilyDescriptorBuilder.newBuilder("device".getBytes()).build();

? ? ? ? ColumnFamilyDescriptor locationCF = ColumnFamilyDescriptorBuilder.newBuilder("location".getBytes()).build();

? ? ? ? tableDesc.setColumnFamilies(Arrays.asList(extCF,locationCF,deviceCF));

? ? ? ? try {

? ? ? ? ? ? byte[][] splitKeys = new byte[4][];

? ? ? ? ? ? splitKeys[0] = Bytes.toBytes("00");

? ? ? ? ? ? splitKeys[1] = Bytes.toBytes("40");

? ? ? ? ? ? splitKeys[2] = Bytes.toBytes("80");

? ? ? ? ? ? splitKeys[3] = Bytes.toBytes("c0");

? ? ? ? ? ? admin.createTable(tableDesc.build(),splitKeys);

? ? ? ? } catch (IOException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? }

```

`logTable1`數(shù)據(jù)寫入方式

```

public class HbaseKerberos{

? ? private static final Logger LOGGER = LoggerFactory.getLogger(HbaseKerberos.class);

? ? private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

? ? private static final String TABLE_NAME = "logTable1";

? ? public void insertDataToHbase1(String appKey,List<Log> hasDatas) throws IOException {

? ? ? ? Table table = HbaseUtils.getTable(TABLE_NAME);

? ? ? ? Long sumCount = 0L;

? ? ? ? /**

? ? ? ? * 常規(guī)值

? ? ? ? */

? ? ? ? byte[] extCF = Bytes.toBytes("ext");//CF列族

? ? ? ? Random random = new Random();

? ? ? ? List<Put> rows = new ArrayList<>();

? ? ? ? for (Log logEntity : hasDatas) {

? ? ? ? ? ? JSONObject dataJsonObject = logEntity.getData();

? ? ? ? ? ? JSONObject extJsonObject = dataJsonObject.getJSONObject("ext");

? ? ? ? ? ? String userId = extJsonObject.getString("userId");

? ? ? ? ? ? String timeStr = logEntity.getTime().format(dtf);

? ? ? ? ? ? String md5Str = Md5Util.md5(userId);

? ? ? ? ? ? String rowKey = new StringBuilder()

? ? ? ? ? ? ? ? ? ? .append(md5Str.substring(0,2))//md5出來的前兩位最高為ff,00~ff為256位蝇狼,后期Region可以增加那么多,足夠使用了。

? ? ? ? ? ? ? ? ? ? .append("|")

? ? ? ? ? ? ? ? ? ? .append(timeStr)//時間

? ? ? ? ? ? ? ? ? ? .append("|")

? ? ? ? ? ? ? ? ? ? .append(CrcUtil.getCrcValue(appKey))

? ? ? ? ? ? ? ? ? ? .append("|")

? ? ? ? ? ? ? ? ? ? .append(md5Str.substring(2,8))

? ? ? ? ? ? ? ? ? ? .append("|")

? ? ? ? ? ? ? ? ? ? .append(Md5Util.md5(UUID.randomUUID().toString()).substring(0,2))

? ? ? ? ? ? ? ? ? ? .toString();

? ? ? ? ? ? Put row = new Put(Bytes.toBytes(rowKey));

? ? ? ? ? ? for(String keyName : extJsonObject.keySet()){

? ? ? ? ? ? ? ? String value = extJsonObject.getString(keyName);

? ? ? ? ? ? ? ? if(StringUtils.isNotBlank(value)){

? ? ? ? ? ? ? ? ? ? row.addColumn(extCF, Bytes.toBytes(keyName), Bytes.toBytes(value));

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? row.addColumn(extCF, Bytes.toBytes("time"), Bytes.toBytes(logEntity.getTime().toString()));

? ? ? ? ? ? /**

? ? ? ? ? ? * 設(shè)備信息

? ? ? ? ? ? */

? ? ? ? ? ? putFieldToRow(logEntity.getData(),"device",row);

? ? ? ? ? ? /**

? ? ? ? ? ? * 位置信息

? ? ? ? ? ? */

? ? ? ? ? ? putFieldToRow(logEntity.getData(),"location",row);

? ? ? ? ? ? rows.add(row);

? ? ? ? }

? ? ? ? for(Integer[] durtation : LimitUtil.getLimits(rows.size(),1000)){

? ? ? ? ? ? Object[] results = new Object[(durtation[1]-durtation[0])];

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? table.batch(rows.subList(durtation[0], durtation[1]),results);

? ? ? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? }

? ? ? ? ? ? sumCount += (durtation[1]-durtation[0]);

? ? ? ? }

? ? ? ? LOGGER.info("write data count:" + sumCount);

? ? }

}

```

`logTable1`數(shù)據(jù)

```

00|20180518203401772|2352356512|4519 column=ext:appKey, timestamp=1533646292389, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e? ? ? ? ? ? ? ? ? ? ?

f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203401772|2352356512|4519 column=ext:channelCode, timestamp=1533646292389, value=guanlan-resurrection-002-? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203401772|2352356512|4519 column=ext:createDateTime, timestamp=1533646292389, value=1526646836093? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203401772|2352356512|4519 column=ext:retain, timestamp=1533646292389, value=17670? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203401772|2352356512|4519 column=ext:scene, timestamp=1533646292389, value=1007? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203401772|2352356512|4519 column=ext:shareId, timestamp=1533646292389, value=ogJmG5ItE_nBCS3pg5XCvGotGI1c? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203401772|2352356512|4519 column=ext:time, timestamp=1533646292389, value=2018-05-18T20:34:01? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203401772|2352356512|4519 column=ext:type, timestamp=1533646292389, value=login_in? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203401772|2352356512|4519 column=ext:userId, timestamp=1533646292389, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203406167|2352356512|4519 column=ext:appKey, timestamp=1533646347725, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e? ? ? ? ? ? ? ? ? ? ?

f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203406167|2352356512|4519 column=ext:channelCode, timestamp=1533646347725, value=guanlan-regular-001-? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203406167|2352356512|4519 column=ext:createDateTime, timestamp=1533646347725, value=1526646839075? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203406167|2352356512|4519 column=ext:retain, timestamp=1533646347725, value=17670? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203406167|2352356512|4519 column=ext:shareId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203406167|2352356512|4519 column=ext:time, timestamp=1533646347725, value=2018-05-18T20:34:06? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203406167|2352356512|4519 column=ext:type, timestamp=1533646347725, value=sharesuccess? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203406167|2352356512|4519 column=ext:userId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203407144|2352356512|5ca1 column=ext:appKey, timestamp=1533646294045, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e? ? ? ? ? ? ? ? ? ? ?

c4|bc? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203407144|2352356512|5ca1 column=ext:createDateTime, timestamp=1533646294045, value=1526646849745? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

c4|bc? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203407144|2352356512|5ca1 column=ext:retain, timestamp=1533646294045, value=17670? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

c4|bc? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203407144|2352356512|5ca1 column=ext:scene, timestamp=1533646294045, value=1037? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

c4|bc? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203407144|2352356512|5ca1 column=ext:time, timestamp=1533646294045, value=2018-05-18T20:34:07? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

c4|bc? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

00|20180518203407144|2352356512|5ca1 column=ext:type, timestamp=1533646294045, value=login_in?

```

CustomTableInputFormat.java

```

import org.apache.commons.lang3.StringUtils;

import org.apache.hadoop.hbase.HRegionLocation;

import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.mapreduce.RegionSizeCalculator;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

import org.apache.hadoop.hbase.mapreduce.TableSplit;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.hbase.util.Strings;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.JobContext;

import org.apache.hadoop.net.DNS;

import java.io.IOException;

import java.net.InetAddress;

import java.net.InetSocketAddress;

import java.net.UnknownHostException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

public class CustomTableInputFormat extends TableInputFormat {

? ? private HashMap<InetAddress, String> reverseDNSCacheMap =

? ? ? ? ? ? new HashMap<>();

? ? private List<String> keys = new ArrayList<>();

? ? public CustomTableInputFormat(){

? ? ? ? super();

? ? ? ? for(int i =0;i<256;i++){

? ? ? ? ? ? keys.add(StringUtils.substring("00"+Integer.toHexString(i),-2));

? ? ? ? }

? ? }

? ? @Override

? ? public List<InputSplit> getSplits(JobContext context) throws IOException {

? ? ? ? super.initialize(context);

? ? ? ? TableName tableName = super.getTable().getName();

? ? ? ? RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(getRegionLocator(), getAdmin());

? ? ? ? List<InputSplit> splits = new ArrayList<>();

? ? ? ? for (String key : keys) {

? ? ? ? ? ? HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(key), false);

? ? ? ? ? ? InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());

? ? ? ? ? ? InetAddress regionAddress = isa.getAddress();

? ? ? ? ? ? String regionLocation;

? ? ? ? ? ? regionLocation = reverseDNS(regionAddress);

? ? ? ? ? ? byte[] regionName = location.getRegion().getRegionName();

? ? ? ? ? ? String encodedRegionName = location.getRegion().getEncodedName();

? ? ? ? ? ? long regionSize = sizeCalculator.getRegionSize(regionName);

? ? ? ? ? ? byte[] splitStart = Bytes.add(Bytes.toBytes(key+"|"),this.getScan().getStartRow());

? ? ? ? ? ? byte[] splitStop = Bytes.add(Bytes.toBytes(key+"|"),this.getScan().getStopRow());

? ? ? ? ? ? TableSplit split = new TableSplit(tableName, this.getScan(),

? ? ? ? ? ? ? ? ? ? splitStart, splitStop, regionLocation, encodedRegionName, regionSize);

? ? ? ? ? ? splits.add(split);

? ? ? ? }

? ? ? ? return splits;

? ? }

? ? String reverseDNS(InetAddress ipAddress) throws UnknownHostException {

? ? ? ? String hostName = this.reverseDNSCacheMap.get(ipAddress);

? ? ? ? if (hostName == null) {

? ? ? ? ? ? String ipAddressString = null;

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ipAddressString = DNS.reverseDns(ipAddress, null);

? ? ? ? ? ? } catch (Exception e) {

? ? ? ? ? ? ? ? ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();

? ? ? ? ? ? }

? ? ? ? ? ? if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);

? ? ? ? ? ? hostName = Strings.domainNamePointerToHostName(ipAddressString);

? ? ? ? ? ? this.reverseDNSCacheMap.put(ipAddress, hostName);

? ? ? ? }

? ? ? ? return hostName;

? ? }

}

```

## Flink例子

```

static Configuration conf;

? ? static {

? ? ? ? HadoopKrbLogin.login();

? ? ? ? conf = new Configuration();

? ? ? ? String tableName = "logTable1";

? ? ? ? conf.addResource("hbase-site.xml");

? ? ? ? Scan scan = new Scan();

? ? ? ? scan.setCaching(1000);

? ? ? ? scan.withStartRow("201805182039".getBytes());

? ? ? ? scan.withStopRow("201805182040".getBytes());

? ? ? ? scan.setCacheBlocks(false);

? ? ? ? conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName);

? ? ? ? ClientProtos.Scan proto = null;

? ? ? ? try {

? ? ? ? ? ? proto = ProtobufUtil.toScan(scan);

? ? ? ? } catch (IOException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? ? ? String ScanToString = Base64.encodeBytes(proto.toByteArray());

? ? ? ? conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, ScanToString);

? ? }

? ? public static void main(String[] args) throws Exception {

? ? ? ? final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

? ? ? ? DataSource<Tuple2<ImmutableBytesWritable, Result>> hbase = env.createInput(

? ? ? ? ? ? ? ? HadoopInputs.createHadoopInput(

? ? ? ? ? ? ? ? ? ? ? ? new CustomTableInputFormat(),

? ? ? ? ? ? ? ? ? ? ? ? ImmutableBytesWritable.class,

? ? ? ? ? ? ? ? ? ? ? ? Result.class,

? ? ? ? ? ? ? ? ? ? ? ? Job.getInstance(conf)

? ? ? ? ? ? ? ? )

? ? ? ? );

? ? ? ? DataSet<LogEntity> toTuple = hbase.map(

? ? ? ? ? ? ? ? new MapFunction<Tuple2<ImmutableBytesWritable, Result>, LogEntity>() {

? ? ? ? ? ? ? ? ? ? public LogEntity map(Tuple2<ImmutableBytesWritable, Result> record) throws Exception {

? ? ? ? ? ? ? ? ? ? ? ? Result result = record.f1;

? ? ? ? ? ? ? ? ? ? ? ? return result2Entity(result);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? });

}

private static LogEntity result2Entity(Result result) {

? ? ? ? JSONObject root = new JSONObject();

? ? ? ? JSONObject ext = new JSONObject();

? ? ? ? JSONObject device = new JSONObject();

? ? ? ? JSONObject location = new JSONObject();

? ? ? ? for (Cell cell : result.rawCells()) {

? ? ? ? ? ? byte[] family = CellUtil.cloneFamily(cell);

? ? ? ? ? ? byte[] column = CellUtil.cloneQualifier(cell);

? ? ? ? ? ? byte[] value = CellUtil.cloneValue(cell);

? ? ? ? ? ? String columnName = Bytes.toString(column);

? ? ? ? ? ? if ("ext".equals(Bytes.toString(family))) {

? ? ? ? ? ? ? ? if ("durationTime".equals(columnName)) {

? ? ? ? ? ? ? ? ? ? ext.put(columnName, Bytes.toLong(value));

? ? ? ? ? ? ? ? } else if ("time".equals(columnName)) {

? ? ? ? ? ? ? ? ? ? root.put(columnName, Bytes.toString(value));

? ? ? ? ? ? ? ? ? ? root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ext.put(columnName, Bytes.toString(value));

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } else if ("device".equals(Bytes.toString(family))) {

? ? ? ? ? ? ? ? device.put(columnName, Bytes.toString(value));

? ? ? ? ? ? } else if ("location".equals(Bytes.toString(family))) {

? ? ? ? ? ? ? ? location.put(columnName, Bytes.toString(value));

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? JSONObject data = new JSONObject();

? ? ? ? if (device.keySet().size() > 0) {

? ? ? ? ? ? data.put("device", device);

? ? ? ? }

? ? ? ? if (location.keySet().size() > 0) {

? ? ? ? ? ? data.put("location", location);

? ? ? ? }

? ? ? ? data.put("ext", ext);

? ? ? ? root.put("data", data);

? ? ? ? return JSON.parseObject(root.toString(), LogEntity.class);

? ? }

```

## Spark 例子

```

public class SimpleApp implements Serializable {

 static Configuration cfg = null;

? ? static {

? ? ? ? HadoopKrbLogin.login();

? ? ? ? cfg = new Configuration();

? ? ? ? String tableName = "logTable1";

? ? ? ? cfg.addResource("hbase-site.xml");

? ? ? ? Scan scan = new Scan();

? ? ? ? scan.setCaching(1000);

? ? ? ? scan.withStartRow("201805182039".getBytes());

? ? ? ? scan.withStopRow("201805182040".getBytes());

? ? ? ? scan.setCacheBlocks(false);

? ? ? ? cfg.set(TableInputFormat.INPUT_TABLE, tableName);

? ? ? ? ClientProtos.Scan proto = null;

? ? ? ? try {

? ? ? ? ? ? proto = ProtobufUtil.toScan(scan);

? ? ? ? } catch (IOException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? ? ? String ScanToString = Base64.encodeBytes(proto.toByteArray());

? ? ? ? cfg.set(TableInputFormat.SCAN, ScanToString);

? ? }

public static void main(String[] args) {

SparkConf sparkConf = new SparkConf()

? ? ? ? ? ? ? ? .setMaster("local")

? ? ? ? ? ? ? ? .setAppName("HbaseDemo");

? ? ? ? JavaSparkContext jsc = new JavaSparkContext(sparkConf);

? ? ? ? JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =

? ? ? ? ? ? ? ? jsc.newAPIHadoopRDD(cfg, CustomTableInputFormat.class, ImmutableBytesWritable.class, Result.class);

? ? ? ? // do some transformation

? ? ? ? JavaRDD<LogEntity> rdd1 = hBaseRDD.mapPartitions((FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, LogEntity>)

? ? ? ? ? ? ? ? tuple2Iterator -> {

? ? ? ? ? ? ? ? ? ? List<LogEntity> logEntities = new ArrayList<>();

? ? ? ? ? ? ? ? ? ? while (tuple2Iterator.hasNext()) {

? ? ? ? ? ? ? ? ? ? ? ? Tuple2<ImmutableBytesWritable, Result> tuple = tuple2Iterator.next();

? ? ? ? ? ? ? ? ? ? ? ? Result result = tuple._2;

? ? ? ? ? ? ? ? ? ? ? ? String rowKey = Bytes.toString(result.getRow());

? ? ? ? ? ? ? ? ? ? ? ? logEntities.add(result2Entity(result));

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? return logEntities.iterator();

? ? ? ? ? ? ? ? });

}

private static LogEntity result2Entity(Result result) {

? ? ? ? JSONObject root = new JSONObject();

? ? ? ? JSONObject ext = new JSONObject();

? ? ? ? JSONObject device = new JSONObject();

? ? ? ? JSONObject location = new JSONObject();

? ? ? ? for (Cell cell : result.rawCells()) {

? ? ? ? ? ? byte[] family = CellUtil.cloneFamily(cell);

? ? ? ? ? ? byte[] column = CellUtil.cloneQualifier(cell);

? ? ? ? ? ? byte[] value = CellUtil.cloneValue(cell);

? ? ? ? ? ? String columnName = Bytes.toString(column);

? ? ? ? ? ? if ("ext".equals(Bytes.toString(family))) {

? ? ? ? ? ? ? ? if ("durationTime".equals(columnName)) {

? ? ? ? ? ? ? ? ? ? ext.put(columnName, Bytes.toLong(value));

? ? ? ? ? ? ? ? } else if ("time".equals(columnName)) {

? ? ? ? ? ? ? ? ? ? root.put(columnName, Bytes.toString(value));

? ? ? ? ? ? ? ? ? ? root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ext.put(columnName, Bytes.toString(value));

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } else if ("device".equals(Bytes.toString(family))) {

? ? ? ? ? ? ? ? device.put(columnName, Bytes.toString(value));

? ? ? ? ? ? } else if ("location".equals(Bytes.toString(family))) {

? ? ? ? ? ? ? ? location.put(columnName, Bytes.toString(value));

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? JSONObject data = new JSONObject();

? ? ? ? if (device.keySet().size() > 0) {

? ? ? ? ? ? data.put("device", device);

? ? ? ? }

? ? ? ? if (location.keySet().size() > 0) {

? ? ? ? ? ? data.put("location", location);

? ? ? ? }

? ? ? ? data.put("ext", ext);

? ? ? ? root.put("data", data);

? ? ? ? return JSON.parseObject(root.toString(), LogEntity.class);

? ? }

```

---

![](https://upload-images.jianshu.io/upload_images/9028759-07315bb8dadcd082.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末映九,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子瞎颗,更是在濱河造成了極大的恐慌件甥,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哼拔,死亡現(xiàn)場離奇詭異引有,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)倦逐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進(jìn)店門譬正,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人檬姥,你說我怎么就攤上這事曾我。” “怎么了健民?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵抒巢,是天一觀的道長。 經(jīng)常有香客問我秉犹,道長蛉谜,這世上最難降的妖魔是什么平酿? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮悦陋,結(jié)果婚禮上蜈彼,老公的妹妹穿的比我還像新娘。我一直安慰自己俺驶,他們只是感情好幸逆,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著暮现,像睡著了一般还绘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上栖袋,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天拍顷,我揣著相機(jī)與錄音,去河邊找鬼塘幅。 笑死昔案,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的电媳。 我是一名探鬼主播踏揣,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼匾乓!你這毒婦竟也來了捞稿?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤拼缝,失蹤者是張志新(化名)和其女友劉穎娱局,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體咧七,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡衰齐,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了猪叙。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片娇斩。...
    茶點(diǎn)故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖穴翩,靈堂內(nèi)的尸體忽然破棺而出犬第,到底是詐尸還是另有隱情,我是刑警寧澤芒帕,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布歉嗓,位于F島的核電站,受9級特大地震影響背蟆,放射性物質(zhì)發(fā)生泄漏鉴分。R本人自食惡果不足惜哮幢,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望志珍。 院中可真熱鬧橙垢,春花似錦、人聲如沸伦糯。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽敛纲。三九已至喂击,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間淤翔,已是汗流浹背翰绊。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留旁壮,地道東北人监嗜。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像寡具,于是被迫代替她去往敵國和親秤茅。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,066評論 2 355

推薦閱讀更多精彩內(nèi)容

  • > 要導(dǎo)入大量數(shù)據(jù)童叠,Hbase的BulkLoad是必不可少的,在導(dǎo)入歷史數(shù)據(jù)的時候课幕,我們一般會選擇使用BulkLo...
    kikiki1閱讀 622評論 0 2
  • 在Flink中我們有時候需要分析數(shù)據(jù)1點(diǎn)到2點(diǎn)的范圍,可是經(jīng)過Region又比較慢厦坛,這時候我們就可以定制Table...
    大豬大豬閱讀 1,316評論 0 16
  • 在Flink中我們有時候需要分析數(shù)據(jù)1點(diǎn)到2點(diǎn)的范圍,可是經(jīng)過Region又比較慢,這時候我們就可以定制Table...
    kikiki2閱讀 142評論 0 1
  • 在Flink中我們有時候需要分析數(shù)據(jù)1點(diǎn)到2點(diǎn)的范圍,可是經(jīng)過Region又比較慢乍惊,這時候我們就可以定制Table...
    大豬大豬閱讀 254評論 0 1
  • > 在Flink中我們有時候需要分析數(shù)據(jù)1點(diǎn)到2點(diǎn)的范圍,可是經(jīng)過Region又比較慢杜秸,這時候我們就可以定制`Ta...
    kikiki5閱讀 133評論 0 3