HBase - Custom Rowkey Rules

In Flink we sometimes need to analyze data for a time window, say from 1 o'clock to 2 o'clock, but because the rowkeys are salted with a hash prefix, a plain scan over that window has to sweep every Region and is slow. In that case we can customize TableInputFormat to do exactly what we need. The examples below read the data with Flink's DataSet API, and a Spark reading example is included as well.

How to use it

Md5Util.java

import org.apache.commons.codec.binary.Hex;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/** Utility for computing hex-encoded MD5 digests; the digests are used to salt the rowkeys below. */
public class Md5Util {
    public static String md5(byte[] key) {
        return md5(key, 0, key.length);
    }

    public static String md5(byte[] key, int offset, int length) {
        try {
            MessageDigest e = MessageDigest.getInstance("MD5");
            e.update(key, offset, length);
            byte[] digest = e.digest();
            return new String(Hex.encodeHex(digest));
        } catch (NoSuchAlgorithmException var5) {
            throw new RuntimeException("Error computing MD5 hash", var5);
        }
    }

    public static String md5(String str) {
        return md5(str.getBytes());
    }
    public static String md5(String str,int offset, int length) {
        return md5(str.getBytes(),offset,length);
    }
}
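
For orientation, this utility supplies the salt prefix of every rowkey below: the first two hex characters of MD5(userId). A minimal sketch of that usage (the userId value is just an example lifted from the sample data further down):

String userId = "ogJmG5KRcIxtyg7UmcRHFCn6YiAQ"; // example value from the sample data below
String salt = Md5Util.md5(userId).substring(0, 2); // one of 256 possible prefixes, "00".."ff"
System.out.println(salt);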

Data split scheme

    private Connection connection;
    private Admin admin;

    @Before
    public void init() throws Exception {
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("sun.security.krb5.debug", "false");
        final String user = "hbase/abc.demo.com@DEMO.COM";
        final String keyPath = "/home/dounine/kerberos/lake.keytab";

        Configuration conf = new Configuration();
        conf.addResource("hbase-site.xml");

        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(user, keyPath);

        connection = ConnectionFactory.createConnection(conf);
        admin = connection.getAdmin();
    }

    @Test
    public void createTable() throws IOException {
        TableName table = TableName.valueOf("logTable1");
        TableDescriptorBuilder tableDesc = TableDescriptorBuilder.newBuilder(table);
        tableDesc.setValue(TableDescriptorBuilder.SPLIT_POLICY,KeyPrefixRegionSplitPolicy.class.getName());
        tableDesc.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY,"2");

        ColumnFamilyDescriptor extCF = ColumnFamilyDescriptorBuilder.newBuilder("ext".getBytes()).build();
        ColumnFamilyDescriptor deviceCF = ColumnFamilyDescriptorBuilder.newBuilder("device".getBytes()).build();
        ColumnFamilyDescriptor locationCF = ColumnFamilyDescriptorBuilder.newBuilder("location".getBytes()).build();
        tableDesc.setColumnFamilies(Arrays.asList(extCF,locationCF,deviceCF));
        try {
            byte[][] splitKeys = new byte[4][];
            splitKeys[0] = Bytes.toBytes("00");
            splitKeys[1] = Bytes.toBytes("40");
            splitKeys[2] = Bytes.toBytes("80");
            splitKeys[3] = Bytes.toBytes("c0");
            admin.createTable(tableDesc.build(),splitKeys);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
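
The four split keys pre-split logTable1 into five Regions: one (below "00") that stays essentially empty, plus four that each cover a quarter of the 00~ff prefix space. KeyPrefixRegionSplitPolicy with PREFIX_LENGTH_KEY = 2 keeps rows sharing the same two-character prefix together when a Region later splits on its own. A quick way to verify the layout, reusing the Admin from init() (a hedged sketch, not part of the original; RegionInfo is org.apache.hadoop.hbase.client.RegionInfo):

    @Test
    public void listRegions() throws IOException {
        // print the start/end key of every Region created by the pre-split above
        for (RegionInfo region : admin.getRegions(TableName.valueOf("logTable1"))) {
            System.out.println(Bytes.toString(region.getStartKey()) + " ~ " + Bytes.toString(region.getEndKey()));
        }
    }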

Writing data into logTable1

public class HbaseKerberos {
    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseKerberos.class);
    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
    private static final String TABLE_NAME = "logTable1";

    public void insertDataToHbase1(String appKey,List<Log> hasDatas) throws IOException {
        Table table = HbaseUtils.getTable(TABLE_NAME);
        Long sumCount = 0L;

        /**
         * Common fields
         */
        byte[] extCF = Bytes.toBytes("ext");// the "ext" column family
        Random random = new Random();
        List<Put> rows = new ArrayList<>();
        for (Log logEntity : hasDatas) {
            JSONObject dataJsonObject = logEntity.getData();
            JSONObject extJsonObject = dataJsonObject.getJSONObject("ext");
            String userId = extJsonObject.getString("userId");
            String timeStr = logEntity.getTime().format(dtf);
          
            String md5Str = Md5Util.md5(userId);
            String rowKey = new StringBuilder()
                    .append(md5Str.substring(0,2))// first two hex chars of the MD5, 00~ff (256 values), so up to 256 Regions can be added later -- plenty of headroom
                    .append("|")
                    .append(timeStr)// event time
                    .append("|")
                    .append(CrcUtil.getCrcValue(appKey))
                    .append("|")
                    .append(md5Str.substring(2,8))
                    .append("|")
                    .append(Md5Util.md5(UUID.randomUUID().toString()).substring(0,2))
                    .toString();
            Put row = new Put(Bytes.toBytes(rowKey));

            for(String keyName : extJsonObject.keySet()){
                String value = extJsonObject.getString(keyName);
                if(StringUtils.isNotBlank(value)){
                    row.addColumn(extCF, Bytes.toBytes(keyName), Bytes.toBytes(value));
                }
            }
            row.addColumn(extCF, Bytes.toBytes("time"), Bytes.toBytes(logEntity.getTime().toString()));
      
            /**
             * Device info
             */
            putFieldToRow(logEntity.getData(),"device",row);

            /**
             * Location info
             */
            putFieldToRow(logEntity.getData(),"location",row);

            rows.add(row);
        }
        for(Integer[] duration : LimitUtil.getLimits(rows.size(),1000)){
            Object[] results = new Object[(duration[1]-duration[0])];
            try {
                table.batch(rows.subList(duration[0], duration[1]),results);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sumCount += (duration[1]-duration[0]);
        }
        LOGGER.info("write data count: " + sumCount);
    }
}
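
The helper classes HbaseUtils, CrcUtil, LimitUtil and the putFieldToRow method are not shown in the post. Judging from how they are called above, LimitUtil.getLimits(size, 1000) slices the Put list into windows of at most 1000 rows for table.batch, and putFieldToRow copies one JSON sub-object into the column family of the same name. Hypothetical sketches under those assumptions:

    // Hypothetical: split [0, size) into [start, end) windows of at most `limit` elements.
    public static List<Integer[]> getLimits(int size, int limit) {
        List<Integer[]> windows = new ArrayList<>();
        for (int start = 0; start < size; start += limit) {
            windows.add(new Integer[]{start, Math.min(start + limit, size)});
        }
        return windows;
    }

    // Hypothetical: copy every field of data.<cfName> into the column family of the same name.
    private void putFieldToRow(JSONObject data, String cfName, Put row) {
        JSONObject obj = data.getJSONObject(cfName);
        if (obj == null) {
            return;
        }
        byte[] cf = Bytes.toBytes(cfName);
        for (String key : obj.keySet()) {
            String value = obj.getString(key);
            if (StringUtils.isNotBlank(value)) {
                row.addColumn(cf, Bytes.toBytes(key), Bytes.toBytes(value));
            }
        }
    }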

logTable1 sample data (hbase shell scan output; each rowkey wraps onto the following line)

00|20180518203401772|2352356512|4519 column=ext:appKey, timestamp=1533646292389, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e                      
 f3|f1                                                                                                                                            
 00|20180518203401772|2352356512|4519 column=ext:channelCode, timestamp=1533646292389, value=guanlan-resurrection-002-                            
 f3|f1                                                                                                                                            
 00|20180518203401772|2352356512|4519 column=ext:createDateTime, timestamp=1533646292389, value=1526646836093                                     
 f3|f1                                                                                                                                            
 00|20180518203401772|2352356512|4519 column=ext:retain, timestamp=1533646292389, value=17670                                                     
 f3|f1                                                                                                                                            
 00|20180518203401772|2352356512|4519 column=ext:scene, timestamp=1533646292389, value=1007                                                       
 f3|f1                                                                                                                                            
 00|20180518203401772|2352356512|4519 column=ext:shareId, timestamp=1533646292389, value=ogJmG5ItE_nBCS3pg5XCvGotGI1c                             
 f3|f1                                                                                                                                            
 00|20180518203401772|2352356512|4519 column=ext:time, timestamp=1533646292389, value=2018-05-18T20:34:01                                         
 f3|f1                                                                                                                                            
 00|20180518203401772|2352356512|4519 column=ext:type, timestamp=1533646292389, value=login_in                                                    
 f3|f1                                                                                                                                            
 00|20180518203401772|2352356512|4519 column=ext:userId, timestamp=1533646292389, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ                              
 f3|f1                                                                                                                                            
 00|20180518203406167|2352356512|4519 column=ext:appKey, timestamp=1533646347725, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e                      
 f3|54                                                                                                                                            
 00|20180518203406167|2352356512|4519 column=ext:channelCode, timestamp=1533646347725, value=guanlan-regular-001-                                 
 f3|54                                                                                                                                            
 00|20180518203406167|2352356512|4519 column=ext:createDateTime, timestamp=1533646347725, value=1526646839075                                     
 f3|54                                                                                                                                            
 00|20180518203406167|2352356512|4519 column=ext:retain, timestamp=1533646347725, value=17670                                                     
 f3|54                                                                                                                                            
 00|20180518203406167|2352356512|4519 column=ext:shareId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ                             
 f3|54                                                                                                                                            
 00|20180518203406167|2352356512|4519 column=ext:time, timestamp=1533646347725, value=2018-05-18T20:34:06                                         
 f3|54                                                                                                                                            
 00|20180518203406167|2352356512|4519 column=ext:type, timestamp=1533646347725, value=sharesuccess                                                
 f3|54                                                                                                                                            
 00|20180518203406167|2352356512|4519 column=ext:userId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ                              
 f3|54                                                                                                                                            
 00|20180518203407144|2352356512|5ca1 column=ext:appKey, timestamp=1533646294045, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e                      
 c4|bc                                                                                                                                            
 00|20180518203407144|2352356512|5ca1 column=ext:createDateTime, timestamp=1533646294045, value=1526646849745                                     
 c4|bc                                                                                                                                            
 00|20180518203407144|2352356512|5ca1 column=ext:retain, timestamp=1533646294045, value=17670                                                     
 c4|bc                                                                                                                                            
 00|20180518203407144|2352356512|5ca1 column=ext:scene, timestamp=1533646294045, value=1037                                                       
 c4|bc                                                                                                                                            
 00|20180518203407144|2352356512|5ca1 column=ext:time, timestamp=1533646294045, value=2018-05-18T20:34:07                                         
 c4|bc                                                                                                                                            
 00|20180518203407144|2352356512|5ca1 column=ext:type, timestamp=1533646294045, value=login_in  
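
Note that the shell wraps each rowkey onto the following line, so "00|20180518203401772|2352356512|4519" plus "f3|f1" is one key. Matched against the StringBuilder in insertDataToHbase1, a key decomposes like this:

String rowKey = "00|20180518203401772|2352356512|4519f3|f1"; // one full key from the scan above
String[] parts = rowKey.split("\\|");
// parts[0] = "00"                -> first two hex chars of MD5(userId), the salt prefix
// parts[1] = "20180518203401772" -> event time, yyyyMMddHHmmssSSS
// parts[2] = "2352356512"        -> CrcUtil.getCrcValue(appKey)
// parts[3] = "4519f3"            -> chars 2..8 of MD5(userId)
// parts[4] = "f1"                -> random two-char suffix derived from MD5(UUID)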

CustomTableInputFormat.java

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.RegionSizeCalculator;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.net.DNS;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class CustomTableInputFormat extends TableInputFormat {

    private HashMap<InetAddress, String> reverseDNSCacheMap =
            new HashMap<>();
    private List<String> keys = new ArrayList<>();

    public CustomTableInputFormat(){
        super();
        // pre-compute all 256 two-character hex prefixes: "00", "01", ..., "ff"
        for(int i = 0; i < 256; i++){
            keys.add(StringUtils.substring("00" + Integer.toHexString(i), -2));
        }
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        super.initialize(context);
        TableName tableName = super.getTable().getName();
        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(getRegionLocator(), getAdmin());
        List<InputSplit> splits = new ArrayList<>();

        // one split per salt prefix, routed to the Region that hosts that prefix
        for (String key : keys) {
            HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(key), false);
            InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
            InetAddress regionAddress = isa.getAddress();
            String regionLocation;
            regionLocation = reverseDNS(regionAddress);

            byte[] regionName = location.getRegion().getRegionName();
            String encodedRegionName = location.getRegion().getEncodedName();
            long regionSize = sizeCalculator.getRegionSize(regionName);

            // prepend the salt prefix to the configured scan range, e.g. "3f|" + "201805182039"
            byte[] splitStart = Bytes.add(Bytes.toBytes(key+"|"),this.getScan().getStartRow());
            byte[] splitStop = Bytes.add(Bytes.toBytes(key+"|"),this.getScan().getStopRow());

            TableSplit split = new TableSplit(tableName, this.getScan(),
                    splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
            splits.add(split);
        }
        return splits;
    }

    String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
        String hostName = this.reverseDNSCacheMap.get(ipAddress);
        if (hostName == null) {
            String ipAddressString = null;
            try {
                ipAddressString = DNS.reverseDns(ipAddress, null);
            } catch (Exception e) {
                ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
            }
            if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
            hostName = Strings.domainNamePointerToHostName(ipAddressString);
            this.reverseDNSCacheMap.put(ipAddress, hostName);
        }
        return hostName;
    }
}
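
Compared with the stock TableInputFormat, which emits one split per Region covering its whole key range, getSplits here emits 256 splits, one per salt prefix, each narrowed to the configured scan window. With the scan used in the examples below, the boundaries work out as follows (illustration only):

// For prefix "00" and a scan over [201805182039, 201805182040):
byte[] splitStart = Bytes.add(Bytes.toBytes("00" + "|"), Bytes.toBytes("201805182039"));
byte[] splitStop  = Bytes.add(Bytes.toBytes("00" + "|"), Bytes.toBytes("201805182040"));
// -> rows from "00|201805182039" (inclusive) to "00|201805182040" (exclusive);
//    the same pattern repeats for "01" through "ff", so every Region scans only
//    the requested minute instead of its full key range.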

Flink example

static Configuration conf;

    static {
        HadoopKrbLogin.login();
        conf = new Configuration();
        String tableName = "logTable1";
        conf.addResource("hbase-site.xml");

        Scan scan = new Scan();
        scan.setCaching(1000);
        // start/stop rows contain only the time portion of the rowkey; CustomTableInputFormat prepends every "xx|" salt prefix
        scan.withStartRow("201805182039".getBytes());
        scan.withStopRow("201805182040".getBytes());
        scan.setCacheBlocks(false);
        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName);
        ClientProtos.Scan proto = null;
        try {
            proto = ProtobufUtil.toScan(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, scanToString);
    }

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<ImmutableBytesWritable, Result>> hbase = env.createInput(
                HadoopInputs.createHadoopInput(
                        new CustomTableInputFormat(),
                        ImmutableBytesWritable.class,
                        Result.class,
                        Job.getInstance(conf)
                )
        );

        DataSet<LogEntity> toTuple = hbase.map(
                new MapFunction<Tuple2<ImmutableBytesWritable, Result>, LogEntity>() {
                    public LogEntity map(Tuple2<ImmutableBytesWritable, Result> record) throws Exception {
                        Result result = record.f1;
                        return result2Entity(result);
                    }
                });
    }
    private static LogEntity result2Entity(Result result) {
        JSONObject root = new JSONObject();
        JSONObject ext = new JSONObject();
        JSONObject device = new JSONObject();
        JSONObject location = new JSONObject();
        for (Cell cell : result.rawCells()) {
            byte[] family = CellUtil.cloneFamily(cell);
            byte[] column = CellUtil.cloneQualifier(cell);
            byte[] value = CellUtil.cloneValue(cell);
            String columnName = Bytes.toString(column);
            if ("ext".equals(Bytes.toString(family))) {
                if ("durationTime".equals(columnName)) {
                    ext.put(columnName, Bytes.toLong(value));
                } else if ("time".equals(columnName)) {
                    root.put(columnName, Bytes.toString(value));
                    root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));
                } else {
                    ext.put(columnName, Bytes.toString(value));
                }
            } else if ("device".equals(Bytes.toString(family))) {
                device.put(columnName, Bytes.toString(value));
            } else if ("location".equals(Bytes.toString(family))) {
                location.put(columnName, Bytes.toString(value));
            }
        }
        JSONObject data = new JSONObject();
        if (device.keySet().size() > 0) {
            data.put("device", device);
        }
        if (location.keySet().size() > 0) {
            data.put("location", location);
        }
        data.put("ext", ext);
        root.put("data", data);
        return JSON.parseObject(root.toString(), LogEntity.class);
    }
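
As written, the Flink main() only defines the DataSet and never triggers execution. A minimal, hedged completion would be to add an action at the end of main(), for example:

        // Hypothetical addition at the end of main(): count() triggers execution of the DataSet program.
        System.out.println("matched log count: " + toTuple.count());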

Spark example

public class SimpleApp implements Serializable {
    static Configuration cfg = null;
    static {
        HadoopKrbLogin.login();
        cfg = new Configuration();
        String tableName = "logTable1";

        cfg.addResource("hbase-site.xml");

        Scan scan = new Scan();
        scan.setCaching(1000);
        // start/stop rows contain only the time portion of the rowkey; CustomTableInputFormat prepends every "xx|" salt prefix
        scan.withStartRow("201805182039".getBytes());
        scan.withStopRow("201805182040".getBytes());
        scan.setCacheBlocks(false);
        cfg.set(TableInputFormat.INPUT_TABLE, tableName);
        ClientProtos.Scan proto = null;
        try {
            proto = ProtobufUtil.toScan(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        cfg.set(TableInputFormat.SCAN, scanToString);
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("HbaseDemo");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
                jsc.newAPIHadoopRDD(cfg, CustomTableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        // do some transformation
        JavaRDD<LogEntity> rdd1 = hBaseRDD.mapPartitions((FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, LogEntity>)
                tuple2Iterator -> {
                    List<LogEntity> logEntities = new ArrayList<>();
                    while (tuple2Iterator.hasNext()) {
                        Tuple2<ImmutableBytesWritable, Result> tuple = tuple2Iterator.next();
                        Result result = tuple._2;
                        String rowKey = Bytes.toString(result.getRow());
                        logEntities.add(result2Entity(result));
                    }
                    return logEntities.iterator();
                });

    }
    private static LogEntity result2Entity(Result result) {
        JSONObject root = new JSONObject();
        JSONObject ext = new JSONObject();
        JSONObject device = new JSONObject();
        JSONObject location = new JSONObject();
        for (Cell cell : result.rawCells()) {
            byte[] family = CellUtil.cloneFamily(cell);
            byte[] column = CellUtil.cloneQualifier(cell);
            byte[] value = CellUtil.cloneValue(cell);
            String columnName = Bytes.toString(column);
            if ("ext".equals(Bytes.toString(family))) {
                if ("durationTime".equals(columnName)) {
                    ext.put(columnName, Bytes.toLong(value));
                } else if ("time".equals(columnName)) {
                    root.put(columnName, Bytes.toString(value));
                    root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));
                } else {
                    ext.put(columnName, Bytes.toString(value));
                }
            } else if ("device".equals(Bytes.toString(family))) {
                device.put(columnName, Bytes.toString(value));
            } else if ("location".equals(Bytes.toString(family))) {
                location.put(columnName, Bytes.toString(value));
            }
        }
        JSONObject data = new JSONObject();
        if (device.keySet().size() > 0) {
            data.put("device", device);
        }
        if (location.keySet().size() > 0) {
            data.put("location", location);
        }
        data.put("ext", ext);
        root.put("data", data);
        return JSON.parseObject(root.toString(), LogEntity.class);
    }
}
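
As in the Flink example, the Spark main() stops at a transformation; an action is needed before anything is actually read from HBase. A hedged completion at the end of main():

        // Hypothetical addition at the end of main(): count() is an action and triggers the HBase read.
        System.out.println("matched log count: " + rdd1.count());
        jsc.stop();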
