前言
之前因?yàn)閮H僅是把HBase當(dāng)成一個(gè)可橫向擴(kuò)展并且具有持久化能力的KV數(shù)據(jù)庫(kù)龙致,所以只用在了指標(biāo)存儲(chǔ)上势告,參看很早之前的一篇文章基于HBase做Storm 實(shí)時(shí)計(jì)算指標(biāo)存儲(chǔ)啦逆。這次將HBase用在了用戶行為存儲(chǔ)上百姓,因?yàn)镽owkey的過(guò)濾功能也很不錯(cuò),可以很方便的把按人或者內(nèi)容的維度過(guò)濾出所有的行為瓢阴。從某種意義上畅蹂,HBase的是一個(gè)有且僅有一個(gè)多字段復(fù)合索引的存儲(chǔ)引擎。
雖然我比較推崇實(shí)時(shí)計(jì)算荣恐,不過(guò)補(bǔ)數(shù)據(jù)或者計(jì)算歷史數(shù)據(jù)啥的液斜,批處理還是少不了的。對(duì)于歷史數(shù)據(jù)的計(jì)算叠穆,其實(shí)我是有兩個(gè)選擇的少漆,一個(gè)是基于HBase的已經(jīng)存儲(chǔ)好的行為數(shù)據(jù)進(jìn)行計(jì)算,或者基于Hive的原始數(shù)據(jù)進(jìn)行計(jì)算硼被,最終選擇了前者示损,這就涉及到Spark(StreamingPro) 對(duì)HBase的批處理操作了。
整合過(guò)程
和Spark 整合嚷硫,意味著最好能有Schema(Mapping),因?yàn)镈ataframe 以及SQL API 都要求你有Schema检访。 遺憾的是HBase 有沒有Schema取決于使用者和場(chǎng)景。通常SparkOnHBase的庫(kù)都要求你定義一個(gè)Mapping(Schema),比如hortonworks的 SHC(https://github.com/hortonworks-spark/shc) 就要求你定義一個(gè)如下的配置:
{
"rowkey":"key",
"table":{"namespace":"default", "name":"pi_user_log", "tableCoder":"PrimitiveType"},
"columns":{"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"col1":{"cf":"f","col":"col1", "type":"string"}
}
}
看上面的定義已經(jīng)還是很容易看出來(lái)的论巍。對(duì)HBase的一個(gè)列族和列取一個(gè)名字烛谊,這樣就可以在Spark的DataSource API使用了,關(guān)于如何開發(fā)Spark DataSource API可以參考我的這篇文章利用 Spark DataSource API 實(shí)現(xiàn)Rest數(shù)據(jù)源中使用嘉汰,SHC大體實(shí)現(xiàn)的就是這個(gè)API。現(xiàn)在你可以這么用了:
val cat = "{\n\"rowkey\":\"key\",\"table\":{\"namespace\":\"default\", \"name\":\"pi_user_log\", \"tableCoder\":\"PrimitiveType\"},\n\"columns\":{\"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},\n\"28360592\":{\"cf\":\"f\",\"col\":\"28360592\", \"type\":\"string\"}\n}\n}"
val cc = sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog -> cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
不過(guò)當(dāng)你有成千上萬(wàn)個(gè)列状勤,那么這個(gè)就無(wú)解了鞋怀,你不大可能一一定義,而且很多時(shí)候使用者也不知道會(huì)有哪些列持搜,列名甚至可能是一個(gè)時(shí)間戳密似。我們現(xiàn)在好幾種情況都遇到了,所以都需要解決:
- 自動(dòng)獲取HBase里所有的列形成Schema,這樣就不需要用戶配置了葫盼。
- 規(guī)定HBase只有兩個(gè)列残腌,一個(gè)rowkey,一個(gè) content,content 是一個(gè)map,包含所有以列族+列名為key,對(duì)應(yīng)內(nèi)容為value。
先說(shuō)說(shuō)第二種方案(因?yàn)槠鋵?shí)第一種方案也要依賴于第二種方案):
{
"name": "batch.sources",
"params": [
{
"inputTableName": "log1",
"format": "org.apache.spark.sql.execution.datasources.hbase.raw",
"path": "-",
"outputTable": "log1"
}
]
},
{
"name": "batch.sql",
"params": [
{
"sql": "select rowkey,json_value_collect(content) as actionList from log1",
"outputTableName":"finalTable"
}
]
},
首先我們配置了一個(gè)HBase的表抛猫,叫l(wèi)og1,當(dāng)然蟆盹,這里是因?yàn)槌绦蛲ㄟ^(guò)hbase-site.xml獲得HBase的鏈接,所以配置上你看不到HBase相關(guān)的信息闺金。接著呢逾滥,在SQL 里你就可以對(duì)content 做處理了。我這里是把content 轉(zhuǎn)化成了JSON格式字符串败匹。再之后你就可以自己寫一個(gè)UDF函數(shù)之類的做處理了寨昙,從而實(shí)現(xiàn)你復(fù)雜的業(yè)務(wù)邏輯。我們其實(shí)每個(gè)字段里存儲(chǔ)的都是JSON掀亩,所以我其實(shí)不關(guān)心列名舔哪,只要讓我拿到所有的列就好。而上面的例子正好能夠滿足我這個(gè)需求了槽棍。
而且實(shí)現(xiàn)這個(gè)HBase DataSource 也很簡(jiǎn)單捉蚤,核心邏輯大體如下:
case class HBaseRelation(
parameters: Map[String, String],
userSpecifiedschema: Option[StructType]
)(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan with Logging {
val hbaseConf = HBaseConfiguration.create()
def buildScan(): RDD[Row] = {
hbaseConf.set(TableInputFormat.INPUT_TABLE, parameters("inputTableName"))
val hBaseRDD = sqlContext.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
.map { line =>
val rowKey = Bytes.toString(line._2.getRow)
import net.liftweb.{json => SJSon}
implicit val formats = SJSon.Serialization.formats(SJSon.NoTypeHints)
val content = line._2.getMap.navigableKeySet().flatMap { f =>
line._2.getFamilyMap(f).map { c =>
(Bytes.toString(f) + ":" + Bytes.toString(c._1), Bytes.toString(c._2))
}
}.toMap
val contentStr = SJSon.Serialization.write(content)
Row.fromSeq(Seq(UTF8String.fromString(rowKey), UTF8String.fromString(contentStr)))
}
hBaseRDD
}
}
那么我們回過(guò)頭來(lái),如何讓Spark自動(dòng)發(fā)現(xiàn)Schema呢刹泄?大體你還是需要過(guò)濾所有數(shù)據(jù)得到列的合集外里,然后形成Schema的,成本開銷很大特石。我們也可以先將我們的數(shù)據(jù)轉(zhuǎn)化為JSON格式盅蝗,然后就可以利用Spark已經(jīng)支持的JSON格式來(lái)自動(dòng)推倒Schema的能力了。
總體而言姆蘸,其實(shí)并不太鼓勵(lì)大家使用Spark 對(duì)HBase進(jìn)行批處理墩莫,因?yàn)檫@很容易讓HBase過(guò)載,比如內(nèi)存溢出導(dǎo)致RegionServer 掛掉,最遺憾的地方是一旦RegionServer 掛掉了逞敷,會(huì)有一段時(shí)間讀寫不可用狂秦,而HBase 又很容易作為實(shí)時(shí)在線程序的存儲(chǔ),所以影響很大推捐。