一告匠、背景
在用戶畫像的系統(tǒng)中,需要將用戶ID的拉通結(jié)果表和用戶標(biāo)簽的結(jié)果表存入Hbase中。
組件如下:
1. Spark 2.0
2. hbase 1.2
3. hadoop 2.6
因而提出以下幾個問題:
1. 如何使用Spark 大批量地晾匠、快速地導(dǎo)入初始化數(shù)據(jù)?
2. 如何從Hbase 快速地梯刚、大批量地查詢數(shù)據(jù)凉馆?
3. Hbase 的的存取效率和RowKey 設(shè)計(jì),Region個數(shù)之間的關(guān)系亡资?
4. 如何根據(jù)RowKey快速地澜共、大批量地刪除數(shù)據(jù)?
5. 如何解決版本兼容問題锥腻?
6. 如何實(shí)現(xiàn)Hbase Join 運(yùn)算嗦董?
Spark 作為內(nèi)存計(jì)算引擎強(qiáng)于計(jì)算,Hbase作為KV 數(shù)據(jù)存儲強(qiáng)于存儲瘦黑。在數(shù)據(jù)倉庫項(xiàng)目中京革,一般會將數(shù)據(jù)存儲為parquet 來支持?jǐn)?shù)據(jù)查詢分析和后續(xù)應(yīng)用奇唤,但包括hive,parquet 以及Spark 本身不支持對數(shù)據(jù)更新。因此匹摇,是否可以將Spark 視為數(shù)據(jù)庫計(jì)算引擎咬扇,而Hbase 視為數(shù)據(jù)庫底層的數(shù)據(jù)存儲結(jié)構(gòu)。從而實(shí)現(xiàn)一個基于Spark+Hbase 的支持?jǐn)?shù)據(jù)更新和快速檢索的“關(guān)系型數(shù)據(jù)庫”来惧?
二冗栗、實(shí)現(xiàn)
1. 使用Spark 批量導(dǎo)入數(shù)據(jù)到Hbase
方案一:使用BulkPut, 通過Spark partition 并發(fā)的put 數(shù)據(jù)到Hbase
方案二:使用Spark rdd 算子:saveAsNewAPIHadoopDataset
方案三:預(yù)生成Hfile 文件,使用doBulkLoad 加載到Hbase Region
3.0 版本的Hbase 由于已經(jīng)集成了Spark-on hbase供搀。同時支持方案一和方案二隅居。
若是sbt 項(xiàng)目,可再build.sbt 中配置如下依賴
"org.apache.hbase" % "hbase-spark" % "2.0.0-alpha3"
然后調(diào)用對應(yīng)的API即可葛虐√ピ矗或者參考https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark/2.0.0-alpha3 路徑配置最新的依賴。
但是可能會存在問題:
- 當(dāng)前hbase 集群版本為 1.2.0 屿脐。在集成Spark On Hbase 包之后涕蚤,該依賴包最低支持的hbase 2.0.0-alpha-1 因此會存在版本兼容的問題。
- spark On hbase 引用了hbase 服務(wù)端的類的诵,這些類通過依賴無法下載
- spark on hbase 是用Logging 類spark 1.3 以后的版本已經(jīng)不在支持万栅。
- 目前不需要應(yīng)用spark-stream on hbase, 代碼和依賴很冗余
因此需要對 hbase-Spark 進(jìn)行改造,以便支持在目前的版本下運(yùn)行西疤。
從hbase-spark 源碼和實(shí)例中可看到批量的操作都基于一個類:
因而重寫HbaseContext
2. Spark 從Hbase 快速檢索的方法
方案一:通過并發(fā)得Get 檢索數(shù)據(jù)烦粒,Spark 對數(shù)據(jù)分區(qū),創(chuàng)建Get,獲取數(shù)據(jù)
方案二:通過Scan 全表掃描Hbase 數(shù)據(jù)
BulkGet 的思路是對Spark RDD 數(shù)據(jù)分區(qū)代赁,構(gòu)建檢索的Hbase RowKey,
再充分利用Spark天然的分布式內(nèi)存計(jì)算優(yōu)勢扰她,并發(fā)的Get 數(shù)據(jù)。當(dāng)需要平衡好 從Hbase 中沒批返回得數(shù)據(jù)量芭碍。過多會造成網(wǎng)絡(luò)擁塞徒役,過少會造成頻繁得創(chuàng)建Hbase RPC,數(shù)據(jù)返回緩慢窖壕。
Scan 得思路是創(chuàng)建和Hbase 表region 個數(shù)一致的 spark 分區(qū)忧勿,每個分區(qū)接受一個region 的數(shù)據(jù)。通過關(guān)閉Scan 過程 參數(shù)setCacheBlocks(false) 達(dá)到近視直接掃描Hbase 底層Hfile 文件的性能瞻讽。
3. Hbase Join 的問題
方案一:通過bulkGet 獲取指定數(shù)據(jù)嵌套循環(huán)或廣播
方案二:通過快速Scan 利用SparkSQL 實(shí)現(xiàn)Join 運(yùn)算
如果Scan的足夠快狐蜕,讀取Hbase 數(shù)據(jù)如同讀取parquet. 因此,提高Scan 速度卸夕,再轉(zhuǎn)換為Spark SQL 就可以解決Hbase 不支持join 運(yùn)算的問題
封裝HbaseETL
功能特色
- Spark 快速大批量得存取 Hbase
- 支持隱式的RDD 調(diào)用
- Hbase的快速掃描和SparkSQL 實(shí)現(xiàn)Hbase Join
性能說明:
在一個 3臺 64c,128G 內(nèi)存上的hbase 集群上測試:
- BulkLoad 一個40G 文件 4分鐘(regions = 50實(shí)際時間和region 個數(shù)有關(guān))
- bulkGet 10000000 條數(shù)據(jù)從1 的表中時間為 1 分鐘
- bulkDelete 10000000 調(diào)數(shù)據(jù)從1 的表中時間為 1 分鐘
使用方法
sbt 打包引入到項(xiàng)目中层释,參照 HbaseSuit 實(shí)現(xiàn)
使用場景
- Hbase 作為前端數(shù)據(jù)快速檢索的數(shù)據(jù)庫
- 數(shù)據(jù)源為hive 表
- 數(shù)據(jù)源為關(guān)系型數(shù)據(jù)庫
- 參考DataBaseSuit.scala 實(shí)現(xiàn)
例如將hive 表的數(shù)據(jù)增量寫入到Hbase
def insertOrUpdate = {
val rdd = spark.read.table("").rdd
hc.bulkLoadThinRows[Row](rdd,
tableName,
r => {
val rawPK = new StringBuilder
for(c<- table_PK) rawPK.append(r.getAs[String](c))
val rk = rowKeyByMD5(rawPK.toString)
val familyQualifiersValues = new FamiliesQualifiersValues
val fq = familyQualifierToByte
for(c<- fq) {
val family = c._1
val qualifier = c._2
val value = Bytes.toBytes(r.getAs[String](c._3))
familyQualifiersValues += (family, qualifier, value)
}
(new ByteArrayWrapper(rk), familyQualifiersValues)
},
10)
}
- Hbase 作為支持?jǐn)?shù)據(jù)檢索、更新的Spark運(yùn)行數(shù)據(jù)庫
- bulkLoad 更新
- bulkGet 查詢快集,Spark SQL Join 解決Hbase 不支持Join 的問題
- BulkDelete 數(shù)據(jù)刪除
- 參考 HbaseSuit.scala 實(shí)現(xiàn)
例如向Hbase 批量導(dǎo)入數(shù)據(jù)
def initDate() = {
// 清空贡羔,并重新創(chuàng)建表
createTable
// 準(zhǔn)備數(shù)據(jù)廉白,rdd 處理
import spark.implicits._
val rdd = spark.sql("select * from hive.graph").map(x => {
val sid = x.getString(0)
val id = x.getString(1)
val idType = x.getString(3)
(sid, id, idType)
}).rdd
// bulk load
hc.bulkLoadThinRows[(String, String, String)](rdd,
"lenovo:GRAPH",
t => {
val rowKey = rowKeyByMD5(t._2, t._3)
val familyQualifiersValues = new FamiliesQualifiersValues
val pk = t._2 + "|" + t._3
// Hbase 存入兩列,一列 PK 存 業(yè)務(wù)主鍵乖寒,一列 s 存 superid
val column = List(("pk", pk), ("s", t._1))
column.foreach(f => {
val family: Array[Byte] = Bytes.toBytes(columnFamily.head)
val qualifier = Bytes.toBytes(f._1)
val value: Array[Byte] = Bytes.toBytes(f._2)
familyQualifiersValues += (family, qualifier, value)
})
(new ByteArrayWrapper(rowKey), familyQualifiersValues)
},
10
)
}
- ETL工具
- 封裝的初始程序
- bulkGet
- bulkDelete
例如作為ETL工具操作Hbase
// Hbase 表定義
val nameSpace = "lenovo"
val tableName = "GRAPH"
val columnFamily = Seq("cf")
// 獲取源表得Schema 信息
val columns = spark.sql("select * from hive.graph").schema.map(_.name)
val schema = Schema(nameSpace,tableName,columnFamily,columns,50)
val data = spark.sql("select * from hive.graph").rdd
// (sid:String,id:String,idType:String)
// 創(chuàng)建Hbase Table 實(shí)例
val ht = HbaseTable(spark,hc,schema)
// 初始化數(shù)據(jù)測試
ht.tableInit[Row](data,mkRowKey,mkHbaseRow)
// 構(gòu)造HbaseTable的rowkey 規(guī)則
def mkRowKey(r:Row):Array[Byte] = {
// 業(yè)務(wù)要求 id+idtype 的Md5 作為主鍵
val rawRK = r.getAs[String]("id") + r.getAs[String]("idType")
rowKeyByMD5(rawRK)
}
// 構(gòu)造HbaseTable的row的規(guī)則
def mkHbaseRow(r:Row):FamiliesQualifiersValues = {
val rk = this.mkRowKey(r)
val familyQualifiersValues = new FamiliesQualifiersValues
var i = 0
for(c<-schema.familyQualifierToByte.toList) {
val family = c._1
val qualifier = c._2
val value: Array[Byte] = schema.strToBytes(r.getString(i))
familyQualifiersValues += (family, qualifier, value)
i = i + 1
}
familyQualifiersValues
}
- Hbase Join 問題
- 快速的Scan
- 使用Spark SQL解決Hbase Join 問題
例如
// 快速Scan 獲取Hbase 數(shù)據(jù)
ht.tableInit[Row](data, mkRowKey, mkHbaseRow)
//SparkSQL 實(shí)現(xiàn) Join
import spark.implicits._
// fixme Scan 返回得結(jié)構(gòu)為SparkRow
val t1 = ht.hbaseScan.toDF("id","idtype")
val t2 = spark.sql("select * from t1")
val join = t1.join(t2,Seq("id","idtype"))
聲明
- 該ETL工具改造自 hbase-Spark 并對其中BulkLoad 方法重新實(shí)現(xiàn)
- 如有任務(wù)問題請聯(lián)系作者郵箱 huanghl0817@gmail.com
Github 上實(shí)現(xiàn):https://github.com/Smallhi/example