一据沈、前言
在大數據領域,海量存儲與快速檢索方面HBase早已有了自己的一席之地鸠珠。MapReduce計算框架早已對接了HBase巍耗,以HBase作為數據源,完成批量數據的讀寫渐排。而Hive默認底層以MapReduce作為計算引擎炬太,支持 以HBase作為外部表,通過HQL對HBase中的數據進行分析驯耻,Hive On HBase 也是很好的滿足在某些場景下通過SQL對HBase表中的數據進行分析亲族。
如今即MapReduce之后,Spark在大數據領域有著舉足輕重的地位吓歇,無論是跑批孽水,流處理,甚至圖計算等都有它的用武之地城看。因此類似于Hive On HBase這種通過SQL的方式對HBase數據做交互式分析女气。Spark SQL On HBase成為不少用戶的需求。而截至目前Spark并未提供已HBase最為數據源测柠。
二炼鞠、Spark SQL On HBase社區(qū)相關的進展
- hortonworks: Apache HBase Connector
- 華為: Fast SQL on HBase using SparkSQL
- cloudera: SparkOnHBase
三、如何使用Spark SQL On HBase
現在市面上的Spark對接HBase的方式多種多樣轰胁,根據個人感覺谒主,hortonworks公司的不錯,因此本文選擇hortonworks公司開源的對接方式赃阀。
以下是使用步驟:
- 編譯源碼
- 在源碼中找到編譯出來的jar,在提交作業(yè)時指定
- 在提交作業(yè)時霎肯,所使用的HBase jar,必須與編譯源碼時的HBase的版本對應
- 用過HBase用戶去認證
- 通過命令行提交用用程序
以spark-shell為例榛斯,提交應用程序:
spark-shell --master yarn --jars shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar
在spark-shell中先導入相關包观游,并引用sqlContext的命令:
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.{DataFrame, SparkSession}
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
再按如下步驟依次執(zhí)行:
- Define the catalog for the schema mapping:
def catalog = s"""{
|"table":{"namespace":"default","name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
- Prepare the data and populate the HBase table:
case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte
)
object HBaseRecord {
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(
s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s”String$i: $t”,
i.toByte)
}
}
val data = (0 to 255).map { i => HBaseRecord(i, “extra”)}
sc.parallelize(data).
toDF.
write.
options(
Map(HBaseTableCatalog.tableCatalog -> catalog,
HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
注意:在spark-shell中使用粘貼模式(:paste)執(zhí)行case class HBaseRecord
以及object HBaseRecord
。
- Load the DataFrame:
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}
val df = withCatalog(catalog)
- Language integrated query:
val s = df.filter((($"col0" <= "row050" && $"col0" > "row040") ||
$"col0" === "row005"||
$"col0" === "row020"||
$"col0" === "r20"||
$"col0" <= "row005") &&
($"col4" === 1 ||
$"col4" === 42))
.select("col0", "col1", "col4")
s.show
- SQL query:
df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).show