> 在Flink中我們有時候需要分析數(shù)據(jù)1點(diǎn)到2點(diǎn)的范圍,可是經(jīng)過Region又比較慢,這時候我們就可以定制`TableInputFormat`來實(shí)現(xiàn)我們的需求了,我們還可以采用Flink的`DataSet`的方式讀取,另外下面還有`Spark`讀取的例子。
## 使用教程
Md5Util.java
```
import org.apache.commons.codec.binary.Hex;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
public class Md5Util {
? ? public static String md5(byte[] key) {
? ? ? ? return md5(key, 0, key.length);
? ? }
? ? public static String md5(byte[] key, int offset, int length) {
? ? ? ? try {
? ? ? ? ? ? MessageDigest e = MessageDigest.getInstance("MD5");
? ? ? ? ? ? e.update(key, offset, length);
? ? ? ? ? ? byte[] digest = e.digest();
? ? ? ? ? ? return new String(Hex.encodeHex(digest));
? ? ? ? } catch (NoSuchAlgorithmException var5) {
? ? ? ? ? ? throw new RuntimeException("Error computing MD5 hash", var5);
? ? ? ? }
? ? }
? ? public static String md5(String str) {
? ? ? ? return md5(str.getBytes());
? ? }
? ? public static String md5(String str,int offset, int length) {
? ? ? ? return md5(str.getBytes(),offset,length);
? ? }
}
```
數(shù)據(jù)`Split`方式
```
private Connection connection;
? ? private Admin admin;
? ? @Before
? ? public void init() throws Exception {
? ? ? ? System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
? ? ? ? System.setProperty("sun.security.krb5.debug", "false");
? ? ? ? final String user = "hbase/abc.demo.com@DEMO.COM";
? ? ? ? final String keyPath = "/home/dounine/kerberos/lake.keytab";
? ? ? ? Configuration conf = new Configuration();
? ? ? ? conf.addResource("hbase-site.xml");
? ? ? ? UserGroupInformation.setConfiguration(conf);
? ? ? ? UserGroupInformation.loginUserFromKeytab(user, keyPath);
? ? ? ? connection = ConnectionFactory.createConnection(conf);
? ? ? ? admin = connection.getAdmin();
? ? }
@Test
? ? public void createTable() throws IOException {
? ? ? ? TableName table = TableName.valueOf("logTable1");
? ? ? ? TableDescriptorBuilder tableDesc = TableDescriptorBuilder.newBuilder(table);
? ? ? ? tableDesc.setValue(TableDescriptorBuilder.SPLIT_POLICY,KeyPrefixRegionSplitPolicy.class.getName());
? ? ? ? tableDesc.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY,"2");
? ? ? ? ColumnFamilyDescriptor extCF = ColumnFamilyDescriptorBuilder.newBuilder("ext".getBytes()).build();
? ? ? ? ColumnFamilyDescriptor deviceCF = ColumnFamilyDescriptorBuilder.newBuilder("device".getBytes()).build();
? ? ? ? ColumnFamilyDescriptor locationCF = ColumnFamilyDescriptorBuilder.newBuilder("location".getBytes()).build();
? ? ? ? tableDesc.setColumnFamilies(Arrays.asList(extCF,locationCF,deviceCF));
? ? ? ? try {
? ? ? ? ? ? byte[][] splitKeys = new byte[4][];
? ? ? ? ? ? splitKeys[0] = Bytes.toBytes("00");
? ? ? ? ? ? splitKeys[1] = Bytes.toBytes("40");
? ? ? ? ? ? splitKeys[2] = Bytes.toBytes("80");
? ? ? ? ? ? splitKeys[3] = Bytes.toBytes("c0");
? ? ? ? ? ? admin.createTable(tableDesc.build(),splitKeys);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
```
`logTable1`數(shù)據(jù)寫入方式
```
public class HbaseKerberos{
? ? private static final Logger LOGGER = LoggerFactory.getLogger(HbaseKerberos.class);
? ? private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
? ? private static final String TABLE_NAME = "logTable1";
? ? public void insertDataToHbase1(String appKey,List<Log> hasDatas) throws IOException {
? ? ? ? Table table = HbaseUtils.getTable(TABLE_NAME);
? ? ? ? Long sumCount = 0L;
? ? ? ? /**
? ? ? ? * 常規(guī)值
? ? ? ? */
? ? ? ? byte[] extCF = Bytes.toBytes("ext");//CF列族
? ? ? ? Random random = new Random();
? ? ? ? List<Put> rows = new ArrayList<>();
? ? ? ? for (Log logEntity : hasDatas) {
? ? ? ? ? ? JSONObject dataJsonObject = logEntity.getData();
? ? ? ? ? ? JSONObject extJsonObject = dataJsonObject.getJSONObject("ext");
? ? ? ? ? ? String userId = extJsonObject.getString("userId");
? ? ? ? ? ? String timeStr = logEntity.getTime().format(dtf);
? ? ? ? ? ? String md5Str = Md5Util.md5(userId);
? ? ? ? ? ? String rowKey = new StringBuilder()
? ? ? ? ? ? ? ? ? ? .append(md5Str.substring(0,2))//md5出來的前兩位最高為ff,00~ff為256位蝇狼,后期Region可以增加那么多,足夠使用了。
? ? ? ? ? ? ? ? ? ? .append("|")
? ? ? ? ? ? ? ? ? ? .append(timeStr)//時間
? ? ? ? ? ? ? ? ? ? .append("|")
? ? ? ? ? ? ? ? ? ? .append(CrcUtil.getCrcValue(appKey))
? ? ? ? ? ? ? ? ? ? .append("|")
? ? ? ? ? ? ? ? ? ? .append(md5Str.substring(2,8))
? ? ? ? ? ? ? ? ? ? .append("|")
? ? ? ? ? ? ? ? ? ? .append(Md5Util.md5(UUID.randomUUID().toString()).substring(0,2))
? ? ? ? ? ? ? ? ? ? .toString();
? ? ? ? ? ? Put row = new Put(Bytes.toBytes(rowKey));
? ? ? ? ? ? for(String keyName : extJsonObject.keySet()){
? ? ? ? ? ? ? ? String value = extJsonObject.getString(keyName);
? ? ? ? ? ? ? ? if(StringUtils.isNotBlank(value)){
? ? ? ? ? ? ? ? ? ? row.addColumn(extCF, Bytes.toBytes(keyName), Bytes.toBytes(value));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? row.addColumn(extCF, Bytes.toBytes("time"), Bytes.toBytes(logEntity.getTime().toString()));
? ? ? ? ? ? /**
? ? ? ? ? ? * 設(shè)備信息
? ? ? ? ? ? */
? ? ? ? ? ? putFieldToRow(logEntity.getData(),"device",row);
? ? ? ? ? ? /**
? ? ? ? ? ? * 位置信息
? ? ? ? ? ? */
? ? ? ? ? ? putFieldToRow(logEntity.getData(),"location",row);
? ? ? ? ? ? rows.add(row);
? ? ? ? }
? ? ? ? for(Integer[] durtation : LimitUtil.getLimits(rows.size(),1000)){
? ? ? ? ? ? Object[] results = new Object[(durtation[1]-durtation[0])];
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? table.batch(rows.subList(durtation[0], durtation[1]),results);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? ? ? sumCount += (durtation[1]-durtation[0]);
? ? ? ? }
? ? ? ? LOGGER.info("write data count:" + sumCount);
? ? }
}
```
`logTable1`數(shù)據(jù)
```
00|20180518203401772|2352356512|4519 column=ext:appKey, timestamp=1533646292389, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e? ? ? ? ? ? ? ? ? ? ?
f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203401772|2352356512|4519 column=ext:channelCode, timestamp=1533646292389, value=guanlan-resurrection-002-? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203401772|2352356512|4519 column=ext:createDateTime, timestamp=1533646292389, value=1526646836093? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203401772|2352356512|4519 column=ext:retain, timestamp=1533646292389, value=17670? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203401772|2352356512|4519 column=ext:scene, timestamp=1533646292389, value=1007? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203401772|2352356512|4519 column=ext:shareId, timestamp=1533646292389, value=ogJmG5ItE_nBCS3pg5XCvGotGI1c? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203401772|2352356512|4519 column=ext:time, timestamp=1533646292389, value=2018-05-18T20:34:01? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203401772|2352356512|4519 column=ext:type, timestamp=1533646292389, value=login_in? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203401772|2352356512|4519 column=ext:userId, timestamp=1533646292389, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|f1? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203406167|2352356512|4519 column=ext:appKey, timestamp=1533646347725, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e? ? ? ? ? ? ? ? ? ? ?
f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203406167|2352356512|4519 column=ext:channelCode, timestamp=1533646347725, value=guanlan-regular-001-? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203406167|2352356512|4519 column=ext:createDateTime, timestamp=1533646347725, value=1526646839075? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203406167|2352356512|4519 column=ext:retain, timestamp=1533646347725, value=17670? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203406167|2352356512|4519 column=ext:shareId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203406167|2352356512|4519 column=ext:time, timestamp=1533646347725, value=2018-05-18T20:34:06? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203406167|2352356512|4519 column=ext:type, timestamp=1533646347725, value=sharesuccess? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203406167|2352356512|4519 column=ext:userId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
f3|54? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203407144|2352356512|5ca1 column=ext:appKey, timestamp=1533646294045, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e? ? ? ? ? ? ? ? ? ? ?
c4|bc? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203407144|2352356512|5ca1 column=ext:createDateTime, timestamp=1533646294045, value=1526646849745? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
c4|bc? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203407144|2352356512|5ca1 column=ext:retain, timestamp=1533646294045, value=17670? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
c4|bc? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203407144|2352356512|5ca1 column=ext:scene, timestamp=1533646294045, value=1037? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
c4|bc? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203407144|2352356512|5ca1 column=ext:time, timestamp=1533646294045, value=2018-05-18T20:34:07? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
c4|bc? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
00|20180518203407144|2352356512|5ca1 column=ext:type, timestamp=1533646294045, value=login_in?
```
CustomTableInputFormat.java
```
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.RegionSizeCalculator;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.net.DNS;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class CustomTableInputFormat extends TableInputFormat {
? ? private HashMap<InetAddress, String> reverseDNSCacheMap =
? ? ? ? ? ? new HashMap<>();
? ? private List<String> keys = new ArrayList<>();
? ? public CustomTableInputFormat(){
? ? ? ? super();
? ? ? ? for(int i =0;i<256;i++){
? ? ? ? ? ? keys.add(StringUtils.substring("00"+Integer.toHexString(i),-2));
? ? ? ? }
? ? }
? ? @Override
? ? public List<InputSplit> getSplits(JobContext context) throws IOException {
? ? ? ? super.initialize(context);
? ? ? ? TableName tableName = super.getTable().getName();
? ? ? ? RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(getRegionLocator(), getAdmin());
? ? ? ? List<InputSplit> splits = new ArrayList<>();
? ? ? ? for (String key : keys) {
? ? ? ? ? ? HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(key), false);
? ? ? ? ? ? InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
? ? ? ? ? ? InetAddress regionAddress = isa.getAddress();
? ? ? ? ? ? String regionLocation;
? ? ? ? ? ? regionLocation = reverseDNS(regionAddress);
? ? ? ? ? ? byte[] regionName = location.getRegion().getRegionName();
? ? ? ? ? ? String encodedRegionName = location.getRegion().getEncodedName();
? ? ? ? ? ? long regionSize = sizeCalculator.getRegionSize(regionName);
? ? ? ? ? ? byte[] splitStart = Bytes.add(Bytes.toBytes(key+"|"),this.getScan().getStartRow());
? ? ? ? ? ? byte[] splitStop = Bytes.add(Bytes.toBytes(key+"|"),this.getScan().getStopRow());
? ? ? ? ? ? TableSplit split = new TableSplit(tableName, this.getScan(),
? ? ? ? ? ? ? ? ? ? splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
? ? ? ? ? ? splits.add(split);
? ? ? ? }
? ? ? ? return splits;
? ? }
? ? String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
? ? ? ? String hostName = this.reverseDNSCacheMap.get(ipAddress);
? ? ? ? if (hostName == null) {
? ? ? ? ? ? String ipAddressString = null;
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ipAddressString = DNS.reverseDns(ipAddress, null);
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
? ? ? ? ? ? }
? ? ? ? ? ? if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
? ? ? ? ? ? hostName = Strings.domainNamePointerToHostName(ipAddressString);
? ? ? ? ? ? this.reverseDNSCacheMap.put(ipAddress, hostName);
? ? ? ? }
? ? ? ? return hostName;
? ? }
}
```
## Flink例子
```
static Configuration conf;
? ? static {
? ? ? ? HadoopKrbLogin.login();
? ? ? ? conf = new Configuration();
? ? ? ? String tableName = "logTable1";
? ? ? ? conf.addResource("hbase-site.xml");
? ? ? ? Scan scan = new Scan();
? ? ? ? scan.setCaching(1000);
? ? ? ? scan.withStartRow("201805182039".getBytes());
? ? ? ? scan.withStopRow("201805182040".getBytes());
? ? ? ? scan.setCacheBlocks(false);
? ? ? ? conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName);
? ? ? ? ClientProtos.Scan proto = null;
? ? ? ? try {
? ? ? ? ? ? proto = ProtobufUtil.toScan(scan);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? String ScanToString = Base64.encodeBytes(proto.toByteArray());
? ? ? ? conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, ScanToString);
? ? }
? ? public static void main(String[] args) throws Exception {
? ? ? ? final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
? ? ? ? DataSource<Tuple2<ImmutableBytesWritable, Result>> hbase = env.createInput(
? ? ? ? ? ? ? ? HadoopInputs.createHadoopInput(
? ? ? ? ? ? ? ? ? ? ? ? new CustomTableInputFormat(),
? ? ? ? ? ? ? ? ? ? ? ? ImmutableBytesWritable.class,
? ? ? ? ? ? ? ? ? ? ? ? Result.class,
? ? ? ? ? ? ? ? ? ? ? ? Job.getInstance(conf)
? ? ? ? ? ? ? ? )
? ? ? ? );
? ? ? ? DataSet<LogEntity> toTuple = hbase.map(
? ? ? ? ? ? ? ? new MapFunction<Tuple2<ImmutableBytesWritable, Result>, LogEntity>() {
? ? ? ? ? ? ? ? ? ? public LogEntity map(Tuple2<ImmutableBytesWritable, Result> record) throws Exception {
? ? ? ? ? ? ? ? ? ? ? ? Result result = record.f1;
? ? ? ? ? ? ? ? ? ? ? ? return result2Entity(result);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
}
private static LogEntity result2Entity(Result result) {
? ? ? ? JSONObject root = new JSONObject();
? ? ? ? JSONObject ext = new JSONObject();
? ? ? ? JSONObject device = new JSONObject();
? ? ? ? JSONObject location = new JSONObject();
? ? ? ? for (Cell cell : result.rawCells()) {
? ? ? ? ? ? byte[] family = CellUtil.cloneFamily(cell);
? ? ? ? ? ? byte[] column = CellUtil.cloneQualifier(cell);
? ? ? ? ? ? byte[] value = CellUtil.cloneValue(cell);
? ? ? ? ? ? String columnName = Bytes.toString(column);
? ? ? ? ? ? if ("ext".equals(Bytes.toString(family))) {
? ? ? ? ? ? ? ? if ("durationTime".equals(columnName)) {
? ? ? ? ? ? ? ? ? ? ext.put(columnName, Bytes.toLong(value));
? ? ? ? ? ? ? ? } else if ("time".equals(columnName)) {
? ? ? ? ? ? ? ? ? ? root.put(columnName, Bytes.toString(value));
? ? ? ? ? ? ? ? ? ? root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ext.put(columnName, Bytes.toString(value));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } else if ("device".equals(Bytes.toString(family))) {
? ? ? ? ? ? ? ? device.put(columnName, Bytes.toString(value));
? ? ? ? ? ? } else if ("location".equals(Bytes.toString(family))) {
? ? ? ? ? ? ? ? location.put(columnName, Bytes.toString(value));
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? JSONObject data = new JSONObject();
? ? ? ? if (device.keySet().size() > 0) {
? ? ? ? ? ? data.put("device", device);
? ? ? ? }
? ? ? ? if (location.keySet().size() > 0) {
? ? ? ? ? ? data.put("location", location);
? ? ? ? }
? ? ? ? data.put("ext", ext);
? ? ? ? root.put("data", data);
? ? ? ? return JSON.parseObject(root.toString(), LogEntity.class);
? ? }
```
## Spark 例子
```
public class SimpleApp implements Serializable {
static Configuration cfg = null;
? ? static {
? ? ? ? HadoopKrbLogin.login();
? ? ? ? cfg = new Configuration();
? ? ? ? String tableName = "logTable1";
? ? ? ? cfg.addResource("hbase-site.xml");
? ? ? ? Scan scan = new Scan();
? ? ? ? scan.setCaching(1000);
? ? ? ? scan.withStartRow("201805182039".getBytes());
? ? ? ? scan.withStopRow("201805182040".getBytes());
? ? ? ? scan.setCacheBlocks(false);
? ? ? ? cfg.set(TableInputFormat.INPUT_TABLE, tableName);
? ? ? ? ClientProtos.Scan proto = null;
? ? ? ? try {
? ? ? ? ? ? proto = ProtobufUtil.toScan(scan);
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? String ScanToString = Base64.encodeBytes(proto.toByteArray());
? ? ? ? cfg.set(TableInputFormat.SCAN, ScanToString);
? ? }
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
? ? ? ? ? ? ? ? .setMaster("local")
? ? ? ? ? ? ? ? .setAppName("HbaseDemo");
? ? ? ? JavaSparkContext jsc = new JavaSparkContext(sparkConf);
? ? ? ? JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
? ? ? ? ? ? ? ? jsc.newAPIHadoopRDD(cfg, CustomTableInputFormat.class, ImmutableBytesWritable.class, Result.class);
? ? ? ? // do some transformation
? ? ? ? JavaRDD<LogEntity> rdd1 = hBaseRDD.mapPartitions((FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, LogEntity>)
? ? ? ? ? ? ? ? tuple2Iterator -> {
? ? ? ? ? ? ? ? ? ? List<LogEntity> logEntities = new ArrayList<>();
? ? ? ? ? ? ? ? ? ? while (tuple2Iterator.hasNext()) {
? ? ? ? ? ? ? ? ? ? ? ? Tuple2<ImmutableBytesWritable, Result> tuple = tuple2Iterator.next();
? ? ? ? ? ? ? ? ? ? ? ? Result result = tuple._2;
? ? ? ? ? ? ? ? ? ? ? ? String rowKey = Bytes.toString(result.getRow());
? ? ? ? ? ? ? ? ? ? ? ? logEntities.add(result2Entity(result));
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? return logEntities.iterator();
? ? ? ? ? ? ? ? });
}
private static LogEntity result2Entity(Result result) {
? ? ? ? JSONObject root = new JSONObject();
? ? ? ? JSONObject ext = new JSONObject();
? ? ? ? JSONObject device = new JSONObject();
? ? ? ? JSONObject location = new JSONObject();
? ? ? ? for (Cell cell : result.rawCells()) {
? ? ? ? ? ? byte[] family = CellUtil.cloneFamily(cell);
? ? ? ? ? ? byte[] column = CellUtil.cloneQualifier(cell);
? ? ? ? ? ? byte[] value = CellUtil.cloneValue(cell);
? ? ? ? ? ? String columnName = Bytes.toString(column);
? ? ? ? ? ? if ("ext".equals(Bytes.toString(family))) {
? ? ? ? ? ? ? ? if ("durationTime".equals(columnName)) {
? ? ? ? ? ? ? ? ? ? ext.put(columnName, Bytes.toLong(value));
? ? ? ? ? ? ? ? } else if ("time".equals(columnName)) {
? ? ? ? ? ? ? ? ? ? root.put(columnName, Bytes.toString(value));
? ? ? ? ? ? ? ? ? ? root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ext.put(columnName, Bytes.toString(value));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } else if ("device".equals(Bytes.toString(family))) {
? ? ? ? ? ? ? ? device.put(columnName, Bytes.toString(value));
? ? ? ? ? ? } else if ("location".equals(Bytes.toString(family))) {
? ? ? ? ? ? ? ? location.put(columnName, Bytes.toString(value));
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? JSONObject data = new JSONObject();
? ? ? ? if (device.keySet().size() > 0) {
? ? ? ? ? ? data.put("device", device);
? ? ? ? }
? ? ? ? if (location.keySet().size() > 0) {
? ? ? ? ? ? data.put("location", location);
? ? ? ? }
? ? ? ? data.put("ext", ext);
? ? ? ? root.put("data", data);
? ? ? ? return JSON.parseObject(root.toString(), LogEntity.class);
? ? }
```
---
