Spark SQL 訪問Hbase

@[toc]
參考文檔 : https://hbase.apache.org/book.html#_sparksql_dataframes

簡介

hbase-spark integration使用了Spark-1.2.0中引入的DataSource API (SPARK-3247), 它在簡單的HBase KV存儲和復(fù)雜的關(guān)系SQL查詢之間架起橋梁,使用戶能夠使用Spark在HBase上執(zhí)行復(fù)雜的數(shù)據(jù)分析工作。HBase數(shù)據(jù)幀是一個(gè)標(biāo)準(zhǔn)的Spark數(shù)據(jù)幀,能夠與Hive宛逗、ORC找颓、Parquet椿胯、JSON等任何其他數(shù)據(jù)源交互烤黍。HBase Spark集成應(yīng)用了諸如分區(qū)修剪、列修剪量瓜、謂詞下推和數(shù)據(jù)位置等關(guān)鍵技術(shù)。
要使用hbase-spark integration connector途乃,用戶需要為HBase和Spark表之間的模式映射定義Catalog绍傲,準(zhǔn)備數(shù)據(jù)并填充HBase表,然后加載HBase數(shù)據(jù)幀耍共。之后烫饼,用戶可以使用SQL查詢來集成查詢和訪問HBase表中的記錄。

打包生成hbase-spark庫

使用hbase-spark integration需要hbase-spark庫
找了半天沒有找到最新的那個(gè)包, 所以自己去github上面下載代碼打包, 然后安裝到本地倉庫

git clone https://github.com/apache/hbase-connectors.git
cd hbase-connectors/spark/hbase-spark
mvn -Dspark.version=2.4.3 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install

然后在項(xiàng)目pom.xml中添加依賴

        <dependency>
            <groupId>org.apache.hbase.connectors.spark</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>1.0.1</version>
        </dependency>
       <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.4</version>
        </dependency>

解決訪問Hbase問題

執(zhí)行代碼時(shí)出現(xiàn)錯(cuò)誤:

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/fs/HFileSystem
    at cn.com.sjfx.sparkappdemo.Application.main(Application.java:27)
    ... 6 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.fs.HFileSystem
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more

這是因?yàn)閟park無法訪問hbase中的庫造成的, 需要在制作鏡像的時(shí)候把hbase的庫加入到spark中,
修改Dockerfile, 增加如下內(nèi)容:

COPY /hbase-lib/* /spark/jars/

讀寫Hbase

public class Application {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("demo");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SparkSession sparkSession = SparkSession.builder()
                .sparkContext(jsc.sc())
                .getOrCreate();

        //設(shè)置要訪問的hbase的zookeeper
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "192.168.1.22:15301,192.168.1.22:15302,192.168.1.22:15303");
        //一定要?jiǎng)?chuàng)建這個(gè)hbaseContext, 因?yàn)楹竺鎸懭霑r(shí)會用到它
        HBaseContext hBaseContext=new HBaseContext(jsc.sc(),configuration,null);

        //創(chuàng)建一個(gè)測試用的RDD
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 256; i++) {
            data.add(i);
        }
        JavaRDD<Integer> rdd = jsc.parallelize(data);
        JavaRDD<HBaseRecord> rdd1 = rdd.map(i -> new HBaseRecord(i, "extra"));
        rdd1.collect().forEach(System.out::println);
        //根據(jù)RDD創(chuàng)建數(shù)據(jù)幀
        Dataset<Row> df = sparkSession.createDataFrame(rdd1, HBaseRecord.class);

        //定義映射的catalog
        String catalog = "{" +
                "       \"table\":{\"namespace\":\"default\", \"name\":\"table1\"}," +
                "       \"rowkey\":\"key\"," +
                "       \"columns\":{" +
                "         \"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "         \"col1\":{\"cf\":\"cf1\", \"col\":\"col1\", \"type\":\"boolean\"}," +
                "         \"col2\":{\"cf\":\"cf2\", \"col\":\"col2\", \"type\":\"double\"}," +
                "         \"col3\":{\"cf\":\"cf3\", \"col\":\"col3\", \"type\":\"float\"}," +
                "         \"col4\":{\"cf\":\"cf4\", \"col\":\"col4\", \"type\":\"int\"}," +
                "         \"col5\":{\"cf\":\"cf5\", \"col\":\"col5\", \"type\":\"bigint\"}," +
                "         \"col6\":{\"cf\":\"cf6\", \"col\":\"col6\", \"type\":\"smallint\"}," +
                "         \"col7\":{\"cf\":\"cf7\", \"col\":\"col7\", \"type\":\"string\"}," +
                "         \"col8\":{\"cf\":\"cf8\", \"col\":\"col8\", \"type\":\"tinyint\"}" +
                "       }" +
                "     }";
        //寫入數(shù)據(jù)
        df.write()
                .format("org.apache.hadoop.hbase.spark")
                .option(HBaseTableCatalog.tableCatalog(), catalog)
                .option(HBaseTableCatalog.newTable(), "5")  //寫入到5個(gè)分區(qū)
                .mode(SaveMode.Overwrite)  // 覆蓋模式
                .save();
        //讀取數(shù)據(jù)
        Dataset<Row> df2 = sparkSession.read()
                .format("org.apache.hadoop.hbase.spark")
                .option(HBaseTableCatalog.tableCatalog(), catalog)
                .load();
        System.out.println("read result: ");
        df2.show();
    }

    //類需要可序列化
    public static class HBaseRecord implements Serializable {
        private static final long serialVersionUID = 4331526295356820188L;
        //屬性一定要getter/setter, 即使是public
        public String col0;
        public Boolean col1;
        public Double col2;
        public Float col3;
        public Integer col4;
        public Long col5;
        public Short col6;
        public String col7;
        public Byte col8;

        public String getCol0() {
            return col0;
        }

        public void setCol0(String col0) {
            this.col0 = col0;
        }

        public Boolean getCol1() {
            return col1;
        }

        public void setCol1(Boolean col1) {
            this.col1 = col1;
        }

        public Double getCol2() {
            return col2;
        }

        public void setCol2(Double col2) {
            this.col2 = col2;
        }

        public Float getCol3() {
            return col3;
        }

        public void setCol3(Float col3) {
            this.col3 = col3;
        }

        public Integer getCol4() {
            return col4;
        }

        public void setCol4(Integer col4) {
            this.col4 = col4;
        }

        public Long getCol5() {
            return col5;
        }

        public void setCol5(Long col5) {
            this.col5 = col5;
        }

        public Short getCol6() {
            return col6;
        }

        public void setCol6(Short col6) {
            this.col6 = col6;
        }

        public String getCol7() {
            return col7;
        }

        public void setCol7(String col7) {
            this.col7 = col7;
        }

        public Byte getCol8() {
            return col8;
        }

        public void setCol8(Byte col8) {
            this.col8 = col8;
        }

        public HBaseRecord(Integer i, String s) {
            col0 = String.format("row%03d", i);
            col1 = i % 2 == 0;
            col2 = Double.valueOf(i);
            col3 = Float.valueOf(i);
            col4 = i;
            col5 = Long.valueOf(i);
            col6 = i.shortValue();
            col7 = "String:" + s;
            col8 = i.byteValue();
        }

        @Override
        public String toString() {
            return "HBaseRecord{" +
                    "col0='" + col0 + '\'' +
                    ", col1=" + col1 +
                    ", col2=" + col2 +
                    ", col3=" + col3 +
                    ", col4=" + col4 +
                    ", col5=" + col5 +
                    ", col6=" + col6 +
                    ", col7='" + col7 + '\'' +
                    ", col8=" + col8 +
                    '}';
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末试读,一起剝皮案震驚了整個(gè)濱河市杠纵,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌钩骇,老刑警劉巖比藻,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異倘屹,居然都是意外死亡银亲,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進(jìn)店門纽匙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來务蝠,“玉大人,你說我怎么就攤上這事烛缔×蠖危” “怎么了赠尾?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長毅弧。 經(jīng)常有香客問我气嫁,道長,這世上最難降的妖魔是什么够坐? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任寸宵,我火速辦了婚禮,結(jié)果婚禮上元咙,老公的妹妹穿的比我還像新娘梯影。我一直安慰自己,他們只是感情好庶香,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布甲棍。 她就那樣靜靜地躺著,像睡著了一般赶掖。 火紅的嫁衣襯著肌膚如雪感猛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天奢赂,我揣著相機(jī)與錄音陪白,去河邊找鬼。 笑死膳灶,一個(gè)胖子當(dāng)著我的面吹牛咱士,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播轧钓,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼序厉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了毕箍?” 一聲冷哼從身側(cè)響起弛房,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎霉晕,沒想到半個(gè)月后庭再,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡牺堰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年拄轻,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片伟葫。...
    茶點(diǎn)故事閱讀 39,965評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡恨搓,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情斧抱,我是刑警寧澤常拓,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站辉浦,受9級特大地震影響弄抬,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜宪郊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一掂恕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧弛槐,春花似錦懊亡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至叹誉,卻和暖如春鸯两,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背桂对。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工甩卓, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留鸠匀,地道東北人蕉斜。 一個(gè)月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像缀棍,于是被迫代替她去往敵國和親宅此。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容