借鑒水塘抽樣算法的一種解決思想

1背景

關于水塘抽樣的算法原理此處不再說明了, 本文重點是針對它的一種應用場景, 具體算法原理可參考水塘抽樣算法原理

2問題:

在編寫Spark程序時, 鑒于內存等資源不夠, 然而Hbase數據量又十分巨大(100億數據, 申請資源Spark核數以及內存較小), 此時在Spark應用程序中調用了repartition進行重新分區(qū), 導致了大量Shuffle 網絡IO, 很有可能使得Spark應用程序癱瘓. 因此想到一種委婉的解決方式, 使用水塘抽樣算法, 它可以在一開始不知道數據總量的情況下進行抽樣, 最終得到接近均勻的分區(qū). 具體解決原理是先進行掃描Hbase進行水塘抽樣,抽樣出rowKey, 然后分別針對這些rowKey進行單獨scan掃描處理, 由于處理的數據量小了就可以保證應用程序不崩潰. 此處是保證可用性犧牲了性能.
解決方式:

  • 使用水塘抽樣算法
  • Hbase預分region,并配置合適的region 分隔算法, 保證每個region數據量不要太大, 按region掃描

3代碼

3.1水塘抽樣解決代碼

此處給出一個基本的Demo,

首先創(chuàng)建一個字節(jié)數組封裝類, 用于比較字節(jié)數組(rowKey)

package hbase;

import org.apache.hadoop.hbase.util.Bytes;

/**
 * Created by zhangkai12 on 2018/6/27.
 */
public class MyBytes implements Comparable<MyBytes> {
    private byte[] bytes;

    public MyBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    public byte[] getBytes() {
        return bytes;
    }

    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    @Override
    public int compareTo(MyBytes o) {
        return Bytes.compareTo(this.bytes, o.getBytes());
    }
}

其次創(chuàng)建一個Hbase基本處理類

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import java.io.IOException;

public class HbaseUtil {

    static Configuration conf = HBaseConfiguration.create();
    private static Connection connection;

    public HbaseUtil(String zkUrl) {
        conf.set("hbase.zookeeper.quorum", zkUrl);
        conf.set("hbase.client.operation.timeout", "60000");
    }

    public static synchronized Connection getConn() {
        if (connection == null) {
            try {
                connection = ConnectionFactory.createConnection(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return connection;
    }

    public static Table getTable(String tableName) throws IOException {
        Table table = getConn().getTable(TableName.valueOf(tableName));
        return table;
    }

    public static void closeTable(Table table) {
        if (table == null) {
            return;
        }
        try {
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

最后就是最終的抽樣算法類, 此處的抽樣函數直接將Spark的scala代碼移植成java版本的.

package hbase;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Created by zhangkai12 on 2018/6/27.
 */
public class Sample {
    private static final byte[] ROW_END = new byte[]{-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1};
    private static final byte[] ROW_START = {0};
    public static void main(String[] args) {
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = HbaseUtil.getTable("myedge");
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("g"));
            scanner = table.getScanner(scan);
            Iterator<Result> iterator = scanner.iterator();
            byte[][] rowKeys = reservoirSampleAndCount(iterator, 20, 100);

            System.out.println("----------------------sort before----------------------");
            Arrays.stream(rowKeys).forEach(res -> {
                System.out.println(" value " + Arrays.toString(res));
            });

            System.out.println("----------------------sort after----------------------");
            Stream<MyBytes> sorted = Arrays.stream(rowKeys).map(res -> new MyBytes(res)).sorted();
            Arrays.stream(rowKeys).map(res -> new MyBytes(res)).sorted().forEach(res -> {
                System.out.println("****" + Arrays.toString(res.getBytes()));
            });
            List<MyBytes> collect = sorted.collect(Collectors.toList());
            byte[] start = ROW_START;
            byte[] end;
            int sumNum = 0;
            if (null != collect) {
                for (int i = 0; i < collect.size() ; i++) {
                    end = collect.get(i).getBytes();
                    sumNum += scanWithStartAndEndRow(start,end);
                    start = end ;
                }
                start = collect.get(collect.size()-1).getBytes();
                end = ROW_END;
                sumNum += scanWithStartAndEndRow(start,end);
            }


            /**
             * 不使用抽樣方法計算所有的sum
             */
            int numNotSample = getSumWithNOSampleMethod();
            System.out.println(sumNum + "---------------" + numNotSample);
            boolean judge = (sumNum == numNotSample);
            System.out.println(judge);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != scanner) {
                scanner.close();
            }
            if (null != table) {
                HbaseUtil.closeTable(table);
            }
        }
    }


    /**
     *  算法描述
     *  從S中抽取首k項放入「水塘」中
     對于每一個S[j]項(j ≥ k):
     隨機產生一個范圍0到j的整數r
     若 r < k 則把水塘中的第r項換成S[j]項
     */
    public static byte[][] reservoirSampleAndCount(Iterator<Result> input, int k, long seed) {
        byte[][] reservoir = new byte[k][];
        int i = 0;
        while (i < k && input.hasNext()) {
            Result next = input.next();
            reservoir[i] = next.getRow();
            i ++;
        }
        if (i < k) {
            byte[][] trimReservoir = new byte[i][];
            System.arraycopy(reservoir, 0, trimReservoir, 0, i);
            return trimReservoir;
        } else {
            Random random = new Random(seed);
            while (input.hasNext()) {
                byte[] item = input.next().getRow();
                int replacementIndex = random.nextInt(i);
                if (replacementIndex < k) {
                    reservoir[replacementIndex] = item;
                }
                i ++;
            }
            return reservoir;
        }
    }


    private static int scanWithStartAndEndRow(byte[] start, byte[] end) {
        int num = 0;
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = HbaseUtil.getTable("myedge");
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("g"));
            scan.setStartRow(start);
            scan.setStopRow(end);
            scanner = table.getScanner(scan);
            Iterator<Result> iterator = scanner.iterator();
            while (iterator.hasNext()) {
                Result next = iterator.next();
                byte[] row = next.getRow();
                String s = Bytes.toString(row);
                System.out.println(Arrays.toString(row) + "---" + s);
                num ++;
            }
            System.out.println("---num-----: " + num);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != scanner) {
                scanner.close();
            }
            if (null != table) {
                HbaseUtil.closeTable(table);
            }
        }
        return num;
    }

    private static int getSumWithNOSampleMethod() {
        int num = 0;
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = HbaseUtil.getTable("myedge");
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("g"));
            scanner = table.getScanner(scan);
            Iterator<Result> iterator = scanner.iterator();
            while (iterator.hasNext()) {
                Result next = iterator.next();
                num ++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != scanner) {
                scanner.close();
            }
            if (null != table) {
                HbaseUtil.closeTable(table);
            }
        }
        return num;
    }
}

3.2 按Region進行數據處理

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Created by zhangkai12 on 2018/6/27.
 */
public class HbaseRegion {

    public static void main(String[] args) {
        try {
            checkTable("Janus321");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void checkTable(String tabName) throws Exception {
        TableName tn = TableName.valueOf(tabName);
        Configuration config = HBaseConfiguration.create();
        HRegionInfo regionInfo;
        Connection connection = null;
        Admin admin = null;
        Table table = null;
        try  {
            connection = ConnectionFactory.createConnection(config);
            admin = connection.getAdmin();
            table = connection.getTable(tn);

            if (!admin.tableExists(TableName.valueOf(tabName))) {
                return;
            }
            List<HRegionInfo> lr = admin.getTableRegions(tn);
            Result r = null;
            if (lr == null) {
                System.out.print("No region found for table " + tabName);
            }
            // 遍歷表的每個region
            Iterator<HRegionInfo> ir = lr.iterator();
            int i = 1;
            while (ir.hasNext()) {
                regionInfo = ir.next();
                ResultScanner scanner = null;
                byte[] startRowkey = regionInfo.getStartKey();
                System.out.println("----start----" + Bytes.toString(startRowkey));
                byte[] endKey = regionInfo.getEndKey();
                System.out.println("----end----" + Bytes.toString(endKey));
                Scan sc = new Scan();
                sc.setBatch(1);
                sc.setStartRow(startRowkey);
                sc.setStopRow(endKey);
                try {
                    scanner = table.getScanner(sc);
                    Iterator<Result> iterator = scanner.iterator();
                    while (iterator.hasNext()) {
                        Result next = iterator.next();
                        byte[] row = next.getRow();
                        System.out.println("第" + i + " 批 " + Arrays.toString(row));
                    }
                } finally {
                    if (null != scanner) {
                        scanner.close();
                    }
                }
                i ++;
            }
        }catch (Exception e) {

        } finally {
            if (null != table) {
                table.close();
            }
            if (null != admin) {
                admin.close();
            }
            if (null != connection) {
                connection.close();
            }
        }
    }
}

4水塘抽樣結果圖

水塘抽樣結果圖

根據水塘抽樣結果圖可以發(fā)現(xiàn), 我們通過抽樣算法處理的數據總量和不使用抽樣算法的數據總量是一致的,也就保證了算法的準確性.

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市树肃,隨后出現(xiàn)的幾起案子滑沧,更是在濱河造成了極大的恐慌泳赋,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蟋字,死亡現(xiàn)場離奇詭異沉御,居然都是意外死亡,警方通過查閱死者的電腦和手機屿储,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進店門贿讹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人够掠,你說我怎么就攤上這事民褂。” “怎么了疯潭?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵赊堪,是天一觀的道長。 經常有香客問我竖哩,道長哭廉,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任相叁,我火速辦了婚禮遵绰,結果婚禮上辽幌,老公的妹妹穿的比我還像新娘。我一直安慰自己椿访,他們只是感情好乌企,可當我...
    茶點故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著成玫,像睡著了一般加酵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上哭当,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天猪腕,我揣著相機與錄音,去河邊找鬼荣病。 笑死码撰,一個胖子當著我的面吹牛,可吹牛的內容都是我干的个盆。 我是一名探鬼主播脖岛,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼颊亮!你這毒婦竟也來了柴梆?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤终惑,失蹤者是張志新(化名)和其女友劉穎绍在,沒想到半個月后,有當地人在樹林里發(fā)現(xiàn)了一具尸體雹有,經...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡偿渡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了霸奕。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片溜宽。...
    茶點故事閱讀 40,115評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖质帅,靈堂內的尸體忽然破棺而出适揉,到底是詐尸還是另有隱情,我是刑警寧澤煤惩,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布嫉嘀,位于F島的核電站,受9級特大地震影響魄揉,放射性物質發(fā)生泄漏剪侮。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一洛退、第九天 我趴在偏房一處隱蔽的房頂上張望瓣俯。 院中可真熱鬧红淡,春花似錦、人聲如沸降铸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽推掸。三九已至,卻和暖如春驻仅,著一層夾襖步出監(jiān)牢的瞬間谅畅,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工噪服, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留毡泻,地道東北人。 一個月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓粘优,卻偏偏與公主長得像仇味,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子雹顺,可洞房花燭夜當晚...
    茶點故事閱讀 45,055評論 2 355

推薦閱讀更多精彩內容