Spark與HBase的整合

前言

之前因?yàn)閮H僅是把HBase當(dāng)成一個(gè)可橫向擴(kuò)展并且具有持久化能力的KV數(shù)據(jù)庫(kù)龙致,所以只用在了指標(biāo)存儲(chǔ)上势告,參看很早之前的一篇文章基于HBase做Storm 實(shí)時(shí)計(jì)算指標(biāo)存儲(chǔ)啦逆。這次將HBase用在了用戶行為存儲(chǔ)上百姓,因?yàn)镽owkey的過(guò)濾功能也很不錯(cuò),可以很方便的把按人或者內(nèi)容的維度過(guò)濾出所有的行為瓢阴。從某種意義上畅蹂,HBase的是一個(gè)有且僅有一個(gè)多字段復(fù)合索引的存儲(chǔ)引擎。

雖然我比較推崇實(shí)時(shí)計(jì)算荣恐,不過(guò)補(bǔ)數(shù)據(jù)或者計(jì)算歷史數(shù)據(jù)啥的液斜,批處理還是少不了的。對(duì)于歷史數(shù)據(jù)的計(jì)算叠穆,其實(shí)我是有兩個(gè)選擇的少漆,一個(gè)是基于HBase的已經(jīng)存儲(chǔ)好的行為數(shù)據(jù)進(jìn)行計(jì)算,或者基于Hive的原始數(shù)據(jù)進(jìn)行計(jì)算硼被,最終選擇了前者示损,這就涉及到Spark(StreamingPro) 對(duì)HBase的批處理操作了。

整合過(guò)程

和Spark 整合嚷硫,意味著最好能有Schema(Mapping),因?yàn)镈ataframe 以及SQL API 都要求你有Schema检访。 遺憾的是HBase 有沒有Schema取決于使用者和場(chǎng)景。通常SparkOnHBase的庫(kù)都要求你定義一個(gè)Mapping(Schema),比如hortonworks的 SHC(https://github.com/hortonworks-spark/shc) 就要求你定義一個(gè)如下的配置:

{
"rowkey":"key",
"table":{"namespace":"default", "name":"pi_user_log", "tableCoder":"PrimitiveType"},
"columns":{"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"col1":{"cf":"f","col":"col1", "type":"string"}
}
}

看上面的定義已經(jīng)還是很容易看出來(lái)的论巍。對(duì)HBase的一個(gè)列族和列取一個(gè)名字烛谊,這樣就可以在Spark的DataSource API使用了,關(guān)于如何開發(fā)Spark DataSource API可以參考我的這篇文章利用 Spark DataSource API 實(shí)現(xiàn)Rest數(shù)據(jù)源中使用嘉汰,SHC大體實(shí)現(xiàn)的就是這個(gè)API。現(xiàn)在你可以這么用了:

 val cat = "{\n\"rowkey\":\"key\",\"table\":{\"namespace\":\"default\", \"name\":\"pi_user_log\", \"tableCoder\":\"PrimitiveType\"},\n\"columns\":{\"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},\n\"28360592\":{\"cf\":\"f\",\"col\":\"28360592\", \"type\":\"string\"}\n}\n}"
    val cc = sqlContext
      .read
      .options(Map(HBaseTableCatalog.tableCatalog -> cat))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

不過(guò)當(dāng)你有成千上萬(wàn)個(gè)列状勤,那么這個(gè)就無(wú)解了鞋怀,你不大可能一一定義,而且很多時(shí)候使用者也不知道會(huì)有哪些列持搜,列名甚至可能是一個(gè)時(shí)間戳密似。我們現(xiàn)在好幾種情況都遇到了,所以都需要解決:

  1. 自動(dòng)獲取HBase里所有的列形成Schema,這樣就不需要用戶配置了葫盼。
  2. 規(guī)定HBase只有兩個(gè)列残腌,一個(gè)rowkey,一個(gè) content,content 是一個(gè)map,包含所有以列族+列名為key,對(duì)應(yīng)內(nèi)容為value。

先說(shuō)說(shuō)第二種方案(因?yàn)槠鋵?shí)第一種方案也要依賴于第二種方案):

{
        "name": "batch.sources",
        "params": [
          {
            "inputTableName": "log1",
            "format": "org.apache.spark.sql.execution.datasources.hbase.raw",
            "path": "-",
            "outputTable": "log1"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select rowkey,json_value_collect(content) as actionList from log1",
            "outputTableName":"finalTable"
          }
        ]
      },

首先我們配置了一個(gè)HBase的表抛猫,叫l(wèi)og1,當(dāng)然蟆盹,這里是因?yàn)槌绦蛲ㄟ^(guò)hbase-site.xml獲得HBase的鏈接,所以配置上你看不到HBase相關(guān)的信息闺金。接著呢逾滥,在SQL 里你就可以對(duì)content 做處理了。我這里是把content 轉(zhuǎn)化成了JSON格式字符串败匹。再之后你就可以自己寫一個(gè)UDF函數(shù)之類的做處理了寨昙,從而實(shí)現(xiàn)你復(fù)雜的業(yè)務(wù)邏輯。我們其實(shí)每個(gè)字段里存儲(chǔ)的都是JSON掀亩,所以我其實(shí)不關(guān)心列名舔哪,只要讓我拿到所有的列就好。而上面的例子正好能夠滿足我這個(gè)需求了槽棍。

而且實(shí)現(xiàn)這個(gè)HBase DataSource 也很簡(jiǎn)單捉蚤,核心邏輯大體如下:

case class HBaseRelation(
                          parameters: Map[String, String],
                          userSpecifiedschema: Option[StructType]
                        )(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan with Logging {

  val hbaseConf = HBaseConfiguration.create()


  def buildScan(): RDD[Row] = {
    hbaseConf.set(TableInputFormat.INPUT_TABLE, parameters("inputTableName"))
    val hBaseRDD = sqlContext.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map { line =>
        val rowKey = Bytes.toString(line._2.getRow)

        import net.liftweb.{json => SJSon}
        implicit val formats = SJSon.Serialization.formats(SJSon.NoTypeHints)

        val content = line._2.getMap.navigableKeySet().flatMap { f =>
          line._2.getFamilyMap(f).map { c =>
            (Bytes.toString(f) + ":" + Bytes.toString(c._1), Bytes.toString(c._2))
          }
        }.toMap

        val contentStr = SJSon.Serialization.write(content)

        Row.fromSeq(Seq(UTF8String.fromString(rowKey), UTF8String.fromString(contentStr)))
      }
    hBaseRDD
  }
}

那么我們回過(guò)頭來(lái),如何讓Spark自動(dòng)發(fā)現(xiàn)Schema呢刹泄?大體你還是需要過(guò)濾所有數(shù)據(jù)得到列的合集外里,然后形成Schema的,成本開銷很大特石。我們也可以先將我們的數(shù)據(jù)轉(zhuǎn)化為JSON格式盅蝗,然后就可以利用Spark已經(jīng)支持的JSON格式來(lái)自動(dòng)推倒Schema的能力了。

總體而言姆蘸,其實(shí)并不太鼓勵(lì)大家使用Spark 對(duì)HBase進(jìn)行批處理墩莫,因?yàn)檫@很容易讓HBase過(guò)載,比如內(nèi)存溢出導(dǎo)致RegionServer 掛掉,最遺憾的地方是一旦RegionServer 掛掉了逞敷,會(huì)有一段時(shí)間讀寫不可用狂秦,而HBase 又很容易作為實(shí)時(shí)在線程序的存儲(chǔ),所以影響很大推捐。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末裂问,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子牛柒,更是在濱河造成了極大的恐慌堪簿,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件皮壁,死亡現(xiàn)場(chǎng)離奇詭異椭更,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)蛾魄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門虑瀑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)湿滓,“玉大人,你說(shuō)我怎么就攤上這事舌狗∵窗拢” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵把夸,是天一觀的道長(zhǎng)而线。 經(jīng)常有香客問(wèn)我,道長(zhǎng)恋日,這世上最難降的妖魔是什么膀篮? 我笑而不...
    開封第一講書人閱讀 58,339評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮岂膳,結(jié)果婚禮上誓竿,老公的妹妹穿的比我還像新娘。我一直安慰自己谈截,他們只是感情好筷屡,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評(píng)論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著簸喂,像睡著了一般毙死。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上喻鳄,一...
    開封第一講書人閱讀 51,287評(píng)論 1 301
  • 那天扼倘,我揣著相機(jī)與錄音,去河邊找鬼除呵。 笑死再菊,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的颜曾。 我是一名探鬼主播纠拔,決...
    沈念sama閱讀 40,130評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼泛豪!你這毒婦竟也來(lái)了稠诲?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,985評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤诡曙,失蹤者是張志新(化名)和其女友劉穎吕粹,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體岗仑,經(jīng)...
    沈念sama閱讀 45,420評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評(píng)論 3 334
  • 正文 我和宋清朗相戀三年聚请,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了荠雕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片稳其。...
    茶點(diǎn)故事閱讀 39,779評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖炸卑,靈堂內(nèi)的尸體忽然破棺而出既鞠,到底是詐尸還是另有隱情,我是刑警寧澤盖文,帶...
    沈念sama閱讀 35,477評(píng)論 5 345
  • 正文 年R本政府宣布嘱蛋,位于F島的核電站,受9級(jí)特大地震影響五续,放射性物質(zhì)發(fā)生泄漏洒敏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評(píng)論 3 328
  • 文/蒙蒙 一疙驾、第九天 我趴在偏房一處隱蔽的房頂上張望凶伙。 院中可真熱鬧,春花似錦它碎、人聲如沸函荣。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)傻挂。三九已至,卻和暖如春挖息,著一層夾襖步出監(jiān)牢的瞬間金拒,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工旋讹, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留殖蚕,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,876評(píng)論 2 370
  • 正文 我出身青樓沉迹,卻偏偏與公主長(zhǎng)得像睦疫,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子鞭呕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容