@[toc]
Reference: https://hbase.apache.org/book.html#_sparksql_dataframes
## Introduction
The hbase-spark integration uses the DataSource API introduced in Spark-1.2.0 (SPARK-3247). It bridges the gap between the simple HBase key-value store and complex relational SQL queries, letting users run sophisticated data analysis on HBase with Spark. An HBase DataFrame is a standard Spark DataFrame and can interact with any other data source such as Hive, ORC, Parquet, or JSON. The integration applies key techniques such as partition pruning, column pruning, predicate pushdown, and data locality.

To use the hbase-spark integration connector, you define a catalog that maps the schema between the HBase table and the Spark table, prepare the data and populate the HBase table, and then load the HBase DataFrame. After that, you can query and access the records in the HBase table with SQL.
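As a taste of that last step, the sketch below (my own illustration, not from the reference) registers an HBase-backed DataFrame as a temporary view and queries it with Spark SQL. It assumes a `SparkSession` named `sparkSession` and a DataFrame `df` already loaded through the connector with a catalog like the one defined in the full example later in this article.

```java
// Illustrative sketch: `df` is assumed to have been loaded from HBase via the
// connector, and the view name "table1" matches the table in the catalog below.
df.createOrReplaceTempView("table1");

// Simple predicates like this one are candidates for pushdown into the HBase
// scan, and selecting only two columns lets the connector prune the rest.
Dataset<Row> result = sparkSession.sql(
        "SELECT col0, col4 FROM table1 WHERE col4 > 100");
result.show();
```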
## Building the hbase-spark library
The hbase-spark integration requires the hbase-spark library. I could not find a current release of this artifact anywhere, so I downloaded the source from GitHub, built it myself, and installed it into the local Maven repository:
git clone https://github.com/apache/hbase-connectors.git
cd hbase-connectors/spark/hbase-spark
mvn -Dspark.version=2.4.3 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install
Then add the dependencies to the project's pom.xml:
<dependency>
    <groupId>org.apache.hbase.connectors.spark</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.4</version>
</dependency>
## Fixing the HBase access problem
執(zhí)行代碼時(shí)出現(xiàn)錯(cuò)誤:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/fs/HFileSystem
at cn.com.sjfx.sparkappdemo.Application.main(Application.java:27)
... 6 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.fs.HFileSystem
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
This happens because Spark cannot see the HBase libraries on its classpath. The HBase jars have to be added to Spark when the image is built, so modify the Dockerfile and add the following line:
COPY /hbase-lib/* /spark/jars/
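After rebuilding the image, a quick sanity check (my own sketch, not part of the original setup) is to resolve the formerly missing class from the driver and print which jar it was loaded from:

```java
// Illustrative check: if the HBase jars were copied into /spark/jars, this
// prints the jar that provides the formerly missing class.
// Note: getCodeSource() can be null for classes loaded from the boot classpath.
java.security.CodeSource src = org.apache.hadoop.hbase.fs.HFileSystem.class
        .getProtectionDomain().getCodeSource();
System.out.println(src == null ? "no code source (boot classpath)" : src.getLocation());
```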
## Reading and writing HBase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.spark.HBaseContext;
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class Application {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("demo");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SparkSession sparkSession = SparkSession.builder()
                .sparkContext(jsc.sc())
                .getOrCreate();
        // point the HBase client at the cluster's ZooKeeper quorum
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum",
                "192.168.1.22:15301,192.168.1.22:15302,192.168.1.22:15303");
        // the HBaseContext must be created up front: the connector relies on it when writing
        HBaseContext hBaseContext = new HBaseContext(jsc.sc(), configuration, null);
        // build a test RDD
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 256; i++) {
            data.add(i);
        }
        JavaRDD<Integer> rdd = jsc.parallelize(data);
        JavaRDD<HBaseRecord> rdd1 = rdd.map(i -> new HBaseRecord(i, "extra"));
        rdd1.collect().forEach(System.out::println);
        // create a DataFrame from the RDD
        Dataset<Row> df = sparkSession.createDataFrame(rdd1, HBaseRecord.class);
        // define the catalog that maps DataFrame columns to the HBase row key and column families
        String catalog = "{" +
                "  \"table\":{\"namespace\":\"default\", \"name\":\"table1\"}," +
                "  \"rowkey\":\"key\"," +
                "  \"columns\":{" +
                "    \"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "    \"col1\":{\"cf\":\"cf1\", \"col\":\"col1\", \"type\":\"boolean\"}," +
                "    \"col2\":{\"cf\":\"cf2\", \"col\":\"col2\", \"type\":\"double\"}," +
                "    \"col3\":{\"cf\":\"cf3\", \"col\":\"col3\", \"type\":\"float\"}," +
                "    \"col4\":{\"cf\":\"cf4\", \"col\":\"col4\", \"type\":\"int\"}," +
                "    \"col5\":{\"cf\":\"cf5\", \"col\":\"col5\", \"type\":\"bigint\"}," +
                "    \"col6\":{\"cf\":\"cf6\", \"col\":\"col6\", \"type\":\"smallint\"}," +
                "    \"col7\":{\"cf\":\"cf7\", \"col\":\"col7\", \"type\":\"string\"}," +
                "    \"col8\":{\"cf\":\"cf8\", \"col\":\"col8\", \"type\":\"tinyint\"}" +
                "  }" +
                "}";
        // write the DataFrame to HBase
        df.write()
                .format("org.apache.hadoop.hbase.spark")
                .option(HBaseTableCatalog.tableCatalog(), catalog)
                .option(HBaseTableCatalog.newTable(), "5") // create the table with 5 regions if it does not exist
                .mode(SaveMode.Overwrite) // overwrite existing data
                .save();
        // read the data back
        Dataset<Row> df2 = sparkSession.read()
                .format("org.apache.hadoop.hbase.spark")
                .option(HBaseTableCatalog.tableCatalog(), catalog)
                .load();
        System.out.println("read result: ");
        df2.show();
    }
    // the record class must be serializable
    public static class HBaseRecord implements Serializable {
        private static final long serialVersionUID = 4331526295356820188L;
        // the fields need getters/setters even though they are public:
        // createDataFrame infers the schema through JavaBean getters
        public String col0;
        public Boolean col1;
        public Double col2;
        public Float col3;
        public Integer col4;
        public Long col5;
        public Short col6;
        public String col7;
        public Byte col8;
        public String getCol0() {
            return col0;
        }
        public void setCol0(String col0) {
            this.col0 = col0;
        }
        public Boolean getCol1() {
            return col1;
        }
        public void setCol1(Boolean col1) {
            this.col1 = col1;
        }
        public Double getCol2() {
            return col2;
        }
        public void setCol2(Double col2) {
            this.col2 = col2;
        }
        public Float getCol3() {
            return col3;
        }
        public void setCol3(Float col3) {
            this.col3 = col3;
        }
        public Integer getCol4() {
            return col4;
        }
        public void setCol4(Integer col4) {
            this.col4 = col4;
        }
        public Long getCol5() {
            return col5;
        }
        public void setCol5(Long col5) {
            this.col5 = col5;
        }
        public Short getCol6() {
            return col6;
        }
        public void setCol6(Short col6) {
            this.col6 = col6;
        }
        public String getCol7() {
            return col7;
        }
        public void setCol7(String col7) {
            this.col7 = col7;
        }
        public Byte getCol8() {
            return col8;
        }
        public void setCol8(Byte col8) {
            this.col8 = col8;
        }
        public HBaseRecord(Integer i, String s) {
            col0 = String.format("row%03d", i);
            col1 = i % 2 == 0;
            col2 = Double.valueOf(i);
            col3 = Float.valueOf(i);
            col4 = i;
            col5 = Long.valueOf(i);
            col6 = i.shortValue();
            col7 = "String:" + s;
            col8 = i.byteValue();
        }

        @Override
        public String toString() {
            return "HBaseRecord{" +
                    "col0='" + col0 + '\'' +
                    ", col1=" + col1 +
                    ", col2=" + col2 +
                    ", col3=" + col3 +
                    ", col4=" + col4 +
                    ", col5=" + col5 +
                    ", col6=" + col6 +
                    ", col7='" + col7 + '\'' +
                    ", col8=" + col8 +
                    '}';
        }
    }
}
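The column pruning and predicate pushdown mentioned in the introduction also apply to plain DataFrame operations. The sketch below (illustrative only, reusing `df2` and the column names from the catalog above) reads just three columns and restricts the scan by row key:

```java
// Illustrative sketch continuing from df2 in the example above.
// select() restricts which HBase columns are read (column pruning), and the
// predicate on col0 (the row key) can be translated into a bounded HBase scan.
df2.select("col0", "col1", "col4")
   .filter("col0 >= 'row050' AND col0 < 'row100'")
   .show();
```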