Spark 操作hbase(構(gòu)建一個支持更新和快速檢索的數(shù)據(jù)庫)

一告匠、背景

在用戶畫像的系統(tǒng)中,需要將用戶ID的拉通結(jié)果表和用戶標(biāo)簽的結(jié)果表存入Hbase中。
組件如下:

  1. Spark 2.0
  2. hbase 1.2 
  3. hadoop 2.6

因而提出以下幾個問題:

1. 如何使用Spark 大批量地晾匠、快速地導(dǎo)入初始化數(shù)據(jù)?
2. 如何從Hbase 快速地梯刚、大批量地查詢數(shù)據(jù)凉馆?
3. Hbase 的的存取效率和RowKey 設(shè)計(jì),Region個數(shù)之間的關(guān)系亡资?
4. 如何根據(jù)RowKey快速地澜共、大批量地刪除數(shù)據(jù)?
5. 如何解決版本兼容問題锥腻?
6. 如何實(shí)現(xiàn)Hbase Join 運(yùn)算嗦董?

Spark 作為內(nèi)存計(jì)算引擎強(qiáng)于計(jì)算,Hbase作為KV 數(shù)據(jù)存儲強(qiáng)于存儲瘦黑。在數(shù)據(jù)倉庫項(xiàng)目中京革,一般會將數(shù)據(jù)存儲為parquet 來支持?jǐn)?shù)據(jù)查詢分析和后續(xù)應(yīng)用奇唤,但包括hive,parquet 以及Spark 本身不支持對數(shù)據(jù)更新。因此匹摇,是否可以將Spark 視為數(shù)據(jù)庫計(jì)算引擎咬扇,而Hbase 視為數(shù)據(jù)庫底層的數(shù)據(jù)存儲結(jié)構(gòu)。從而實(shí)現(xiàn)一個基于Spark+Hbase 的支持?jǐn)?shù)據(jù)更新和快速檢索的“關(guān)系型數(shù)據(jù)庫”来惧?

二冗栗、實(shí)現(xiàn)

1. 使用Spark 批量導(dǎo)入數(shù)據(jù)到Hbase

  方案一:使用BulkPut, 通過Spark partition 并發(fā)的put 數(shù)據(jù)到Hbase
  方案二:使用Spark rdd 算子:saveAsNewAPIHadoopDataset
  方案三:預(yù)生成Hfile 文件,使用doBulkLoad 加載到Hbase Region

3.0 版本的Hbase 由于已經(jīng)集成了Spark-on hbase供搀。同時支持方案一和方案二隅居。
若是sbt 項(xiàng)目,可再build.sbt 中配置如下依賴
"org.apache.hbase" % "hbase-spark" % "2.0.0-alpha3"
然后調(diào)用對應(yīng)的API即可葛虐√ピ矗或者參考https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark/2.0.0-alpha3 路徑配置最新的依賴。

但是可能會存在問題:

  1. 當(dāng)前hbase 集群版本為 1.2.0 屿脐。在集成Spark On Hbase 包之后涕蚤,該依賴包最低支持的hbase 2.0.0-alpha-1 因此會存在版本兼容的問題。
  2. spark On hbase 引用了hbase 服務(wù)端的類的诵,這些類通過依賴無法下載
  3. spark on hbase 是用Logging 類spark 1.3 以后的版本已經(jīng)不在支持万栅。
  4. 目前不需要應(yīng)用spark-stream on hbase, 代碼和依賴很冗余

因此需要對 hbase-Spark 進(jìn)行改造,以便支持在目前的版本下運(yùn)行西疤。
從hbase-spark 源碼和實(shí)例中可看到批量的操作都基于一個類:
因而重寫HbaseContext

2. Spark 從Hbase 快速檢索的方法

  方案一:通過并發(fā)得Get 檢索數(shù)據(jù)烦粒,Spark 對數(shù)據(jù)分區(qū),創(chuàng)建Get,獲取數(shù)據(jù)
  方案二:通過Scan 全表掃描Hbase 數(shù)據(jù)

BulkGet 的思路是對Spark RDD 數(shù)據(jù)分區(qū)代赁,構(gòu)建檢索的Hbase RowKey,
再充分利用Spark天然的分布式內(nèi)存計(jì)算優(yōu)勢扰她,并發(fā)的Get 數(shù)據(jù)。當(dāng)需要平衡好 從Hbase 中沒批返回得數(shù)據(jù)量芭碍。過多會造成網(wǎng)絡(luò)擁塞徒役,過少會造成頻繁得創(chuàng)建Hbase RPC,數(shù)據(jù)返回緩慢窖壕。
Scan 得思路是創(chuàng)建和Hbase 表region 個數(shù)一致的 spark 分區(qū)忧勿,每個分區(qū)接受一個region 的數(shù)據(jù)。通過關(guān)閉Scan 過程 參數(shù)setCacheBlocks(false) 達(dá)到近視直接掃描Hbase 底層Hfile 文件的性能瞻讽。

3. Hbase Join 的問題

  方案一:通過bulkGet 獲取指定數(shù)據(jù)嵌套循環(huán)或廣播
  方案二:通過快速Scan 利用SparkSQL 實(shí)現(xiàn)Join 運(yùn)算

如果Scan的足夠快狐蜕,讀取Hbase 數(shù)據(jù)如同讀取parquet. 因此,提高Scan 速度卸夕,再轉(zhuǎn)換為Spark SQL 就可以解決Hbase 不支持join 運(yùn)算的問題

封裝HbaseETL

功能特色

  1. Spark 快速大批量得存取 Hbase
  2. 支持隱式的RDD 調(diào)用
  3. Hbase的快速掃描和SparkSQL 實(shí)現(xiàn)Hbase Join

性能說明:

在一個 3臺 64c,128G 內(nèi)存上的hbase 集群上測試:

  1. BulkLoad 一個40G 文件 4分鐘(regions = 50實(shí)際時間和region 個數(shù)有關(guān))
  2. bulkGet 10000000 條數(shù)據(jù)從1 的表中時間為 1 分鐘
  3. bulkDelete 10000000 調(diào)數(shù)據(jù)從1 的表中時間為 1 分鐘

使用方法

sbt 打包引入到項(xiàng)目中层释,參照 HbaseSuit 實(shí)現(xiàn)

使用場景

  1. Hbase 作為前端數(shù)據(jù)快速檢索的數(shù)據(jù)庫
    • 數(shù)據(jù)源為hive 表
    • 數(shù)據(jù)源為關(guān)系型數(shù)據(jù)庫
    • 參考DataBaseSuit.scala 實(shí)現(xiàn)

例如將hive 表的數(shù)據(jù)增量寫入到Hbase

  def insertOrUpdate = {
    val rdd = spark.read.table("").rdd
    hc.bulkLoadThinRows[Row](rdd,
      tableName,
      r => {
        val rawPK = new StringBuilder
        for(c<- table_PK) rawPK.append(r.getAs[String](c))
        val rk = rowKeyByMD5(rawPK.toString)
        val familyQualifiersValues = new FamiliesQualifiersValues
        val fq = familyQualifierToByte
        for(c<- fq) {
          val family = c._1
          val qualifier = c._2
          val value = Bytes.toBytes(r.getAs[String](c._3))
          familyQualifiersValues += (family, qualifier, value)
        }
        (new ByteArrayWrapper(rk), familyQualifiersValues)
      },
      10)
  }
  1. Hbase 作為支持?jǐn)?shù)據(jù)檢索、更新的Spark運(yùn)行數(shù)據(jù)庫
    • bulkLoad 更新
    • bulkGet 查詢快集,Spark SQL Join 解決Hbase 不支持Join 的問題
    • BulkDelete 數(shù)據(jù)刪除
    • 參考 HbaseSuit.scala 實(shí)現(xiàn)

例如向Hbase 批量導(dǎo)入數(shù)據(jù)

def initDate() = {
    // 清空贡羔,并重新創(chuàng)建表
    createTable
    // 準(zhǔn)備數(shù)據(jù)廉白,rdd 處理
    import spark.implicits._
    val rdd = spark.sql("select * from hive.graph").map(x => {
      val sid = x.getString(0)
      val id = x.getString(1)
      val idType = x.getString(3)
      (sid, id, idType)
    }).rdd
    // bulk load
    hc.bulkLoadThinRows[(String, String, String)](rdd,
      "lenovo:GRAPH",
      t => {
        val rowKey = rowKeyByMD5(t._2, t._3)
        val familyQualifiersValues = new FamiliesQualifiersValues
        val pk = t._2 + "|" + t._3
        // Hbase 存入兩列,一列 PK 存 業(yè)務(wù)主鍵乖寒,一列 s 存 superid
        val column = List(("pk", pk), ("s", t._1))
        column.foreach(f => {
          val family: Array[Byte] = Bytes.toBytes(columnFamily.head)
          val qualifier = Bytes.toBytes(f._1)
          val value: Array[Byte] = Bytes.toBytes(f._2)
          familyQualifiersValues += (family, qualifier, value)
        })
        (new ByteArrayWrapper(rowKey), familyQualifiersValues)
      },
      10
    )
  }
  1. ETL工具
    • 封裝的初始程序
    • bulkGet
    • bulkDelete

例如作為ETL工具操作Hbase

// Hbase 表定義
  val nameSpace = "lenovo"
  val tableName = "GRAPH"
  val columnFamily = Seq("cf")
  // 獲取源表得Schema 信息
  val columns = spark.sql("select * from hive.graph").schema.map(_.name)
  val schema = Schema(nameSpace,tableName,columnFamily,columns,50)
  val data = spark.sql("select * from hive.graph").rdd
  // (sid:String,id:String,idType:String)
  // 創(chuàng)建Hbase Table 實(shí)例
   val ht = HbaseTable(spark,hc,schema)
  // 初始化數(shù)據(jù)測試
  ht.tableInit[Row](data,mkRowKey,mkHbaseRow)
  // 構(gòu)造HbaseTable的rowkey 規(guī)則
  def mkRowKey(r:Row):Array[Byte] = {
    // 業(yè)務(wù)要求 id+idtype 的Md5 作為主鍵
    val rawRK = r.getAs[String]("id") + r.getAs[String]("idType")
    rowKeyByMD5(rawRK)
  }
  // 構(gòu)造HbaseTable的row的規(guī)則
  def mkHbaseRow(r:Row):FamiliesQualifiersValues = {
    val rk = this.mkRowKey(r)
    val familyQualifiersValues = new FamiliesQualifiersValues
    var i = 0
    for(c<-schema.familyQualifierToByte.toList) {
      val family = c._1
      val qualifier = c._2
      val value: Array[Byte] = schema.strToBytes(r.getString(i))
      familyQualifiersValues += (family, qualifier, value)
      i = i + 1
    }
    familyQualifiersValues
  }
  1. Hbase Join 問題
    • 快速的Scan
    • 使用Spark SQL解決Hbase Join 問題

例如

 // 快速Scan 獲取Hbase 數(shù)據(jù)
 ht.tableInit[Row](data, mkRowKey, mkHbaseRow)
  
 //SparkSQL 實(shí)現(xiàn) Join 
  import spark.implicits._
  // fixme Scan 返回得結(jié)構(gòu)為SparkRow
  val t1 = ht.hbaseScan.toDF("id","idtype")
  val t2 = spark.sql("select * from t1")
  
  val join = t1.join(t2,Seq("id","idtype"))

聲明

  1. 該ETL工具改造自 hbase-Spark 并對其中BulkLoad 方法重新實(shí)現(xiàn)
  2. 如有任務(wù)問題請聯(lián)系作者郵箱 huanghl0817@gmail.com

Github 上實(shí)現(xiàn):https://github.com/Smallhi/example

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末猴蹂,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子楣嘁,更是在濱河造成了極大的恐慌磅轻,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,888評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件逐虚,死亡現(xiàn)場離奇詭異聋溜,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)叭爱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評論 3 399
  • 文/潘曉璐 我一進(jìn)店門撮躁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人买雾,你說我怎么就攤上這事把曼。” “怎么了漓穿?”我有些...
    開封第一講書人閱讀 168,386評論 0 360
  • 文/不壞的土叔 我叫張陵嗤军,是天一觀的道長。 經(jīng)常有香客問我晃危,道長叙赚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,726評論 1 297
  • 正文 為了忘掉前任山害,我火速辦了婚禮,結(jié)果婚禮上沿量,老公的妹妹穿的比我還像新娘浪慌。我一直安慰自己,他們只是感情好朴则,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,729評論 6 397
  • 文/花漫 我一把揭開白布权纤。 她就那樣靜靜地躺著,像睡著了一般乌妒。 火紅的嫁衣襯著肌膚如雪汹想。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,337評論 1 310
  • 那天撤蚊,我揣著相機(jī)與錄音古掏,去河邊找鬼。 笑死侦啸,一個胖子當(dāng)著我的面吹牛槽唾,可吹牛的內(nèi)容都是我干的丧枪。 我是一名探鬼主播,決...
    沈念sama閱讀 40,902評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼庞萍,長吁一口氣:“原來是場噩夢啊……” “哼拧烦!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起钝计,我...
    開封第一講書人閱讀 39,807評論 0 276
  • 序言:老撾萬榮一對情侶失蹤恋博,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后私恬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體债沮,經(jīng)...
    沈念sama閱讀 46,349評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,439評論 3 340
  • 正文 我和宋清朗相戀三年践付,在試婚紗的時候發(fā)現(xiàn)自己被綠了秦士。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,567評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡永高,死狀恐怖隧土,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情命爬,我是刑警寧澤曹傀,帶...
    沈念sama閱讀 36,242評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站饲宛,受9級特大地震影響皆愉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜艇抠,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,933評論 3 334
  • 文/蒙蒙 一幕庐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧家淤,春花似錦异剥、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,420評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至青伤,卻和暖如春督怜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背狠角。 一陣腳步聲響...
    開封第一講書人閱讀 33,531評論 1 272
  • 我被黑心中介騙來泰國打工号杠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人丰歌。 一個月前我還...
    沈念sama閱讀 48,995評論 3 377
  • 正文 我出身青樓究流,卻偏偏與公主長得像辣吃,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子芬探,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,585評論 2 359

推薦閱讀更多精彩內(nèi)容