最近在做spark和hbase的相關(guān)項(xiàng)目牍白。暫且將其分為兩部分:一是利用spark streaming消費(fèi)前臺(tái)推到kafka中的消息溜族,進(jìn)行簡單處理后寫入到hbase;然后就是利用spark讀取hbase末捣,將結(jié)果組裝成json进每,再利用spark SQL進(jìn)行計(jì)算篷就。
介紹一下環(huán)境:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<org.scala.version>2.11.2</org.scala.version>
<org.spark.version>2.0.2</org.spark.version>
<org.kafka.version>0.10.2.1</org.kafka.version>
<org.apache.hbase.version>1.2.2</org.apache.hbase.version>
</properties>
這里先來討論后半部分射亏,spark讀取hbase近忙。
首先,了解過hbase的都應(yīng)該知道智润,它是一No SQL的非關(guān)系型數(shù)據(jù)及舍。與我們平時(shí)常見的MySQL和Oracle不同,No SQL最大的特點(diǎn)就是不支持事務(wù)窟绷,對(duì)于關(guān)系型數(shù)據(jù)庫輕松加隨意的join啊锯玛、groupby啊什么的,都不擅長兼蜈。不過hbase既然這么火攘残,肯定有其道理。我這里之所以采用它为狸,最重要的就是因?yàn)椋阂皇菙?shù)據(jù)量大歼郭,項(xiàng)目還沒上線,不過預(yù)測(cè)日增量有上百g辐棒,二來呢hbase提供了java api病曾,以前搞過,get和scan的效率還是很給力的漾根。再加上我們記錄的用戶行為信息泰涂,根本不需要更新操作,我只要能寫進(jìn)去辐怕,拿出來就行啦逼蒙。
廢話不多數(shù),下面上代碼:
先要組裝hbase client
先要引入配置文件
private val config = ConfigFactory.load()
private val conn = getConnection
具體的application.conf如下
spark{
master="local[*]"
appName="KafkaConsumer"
}
kafka {
topics = "topic007"
brokers = "192.168.1.97:9092,192.168.1.98:9092,192.168.1.99:9092,192.168.1.106:9092,192.168.1.107:9092,192.168.1.108:9092"
group = "groupid"
}
hbase{
port = "2181"
quorum = "master1.hadoop,slave2.hadoop,slave3.hadoop,slave4.hadoop,slave5.hadoop,slave6.hadoop"
tableName = "test"
}
大家根據(jù)自己的設(shè)置自行修改啊秘蛇。
然后就是hbase相關(guān)的api調(diào)用啦其做,具體如下:
/**
* 掃描HBase并返回結(jié)果
* @param tableName 表名
* @param filter 過濾條件
* @param startRow 起始行鍵
* @param stopRow 結(jié)束行鍵
* @return 掃描結(jié)果
*/
def scan(tableName: String, filter: Filter, startRow: String, stopRow: String): List[Map[String, String]] = {
val s = buildScan(filter, startRow, stopRow)
val t = conn.getTable(TableName.valueOf(tableName))
scan(t, s)
}
/**
* 執(zhí)行掃描
* @param table 表
* @param scan scan
*/
private def scan(table: Table, scan: Scan): List[Map[String, String]] = {
val scanner = table.getScanner(scan)
val ite = scanner.iterator()
val result = new ListBuffer[Map[String, String]]
while (ite.hasNext){
val map = new mutable.ListMap[String, String]
ite.next().listCells().foreach(c => map += readCell(c))
result += map.toMap
}
result.toList
}
/**
* 讀取單元格
* @param cell 單元格
*/
private def readCell(cell: Cell) = {
val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
val value = Bytes.toString(CellUtil.cloneValue(cell))
(qualifier, value)
}
/**
* 構(gòu)建Scan實(shí)例
* @param filter 過濾條件
* @param startRow 起始行鍵
* @param stopRow 結(jié)束行鍵
*/
private def buildScan(filter: Filter, startRow: String, stopRow: String): Scan ={
val scan = new Scan()
scan.setMaxVersions()
scan.setCaching(2000)
scan.setCacheBlocks(false)
if(filter != null)
scan.setFilter(filter)
if(startRow != null)
scan.setStartRow(Bytes.toBytes(startRow))
if(stopRow != null)
scan.setStopRow(Bytes.toBytes(stopRow))
scan
}
/**
* 獲取鏈接
*/
private def getConnection: Connection = {
val conf = HBaseConfiguration.create()
conf.set(HConstants.ZOOKEEPER_QUORUM, config.getString("hbase.quorum"))
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, config.getString("hbase.port"))
ConnectionFactory.createConnection(conf)
}
以上就是調(diào)用hbase 的scan api做的事,具體的條件設(shè)置大家可以自行查看官方文檔赁还,按需配置妖泄。
scan. 這個(gè)啊, scan. 那個(gè)啊艘策,蹈胡,,自己看著弄就行http://hbase.apache.org/apidocs/index.html
然后是我的驅(qū)動(dòng)類:
這里就要介紹一下業(yè)務(wù)了朋蔫。以我《Hbase權(quán)威指南》兩個(gè)星期的閱讀理解結(jié)合我們的實(shí)際需求來講罚渐,我把rowkey設(shè)計(jì)為 "token_querytime" 的形式。例如:p4064d445c9f4ff4d536dfeae965aa95_1503364335426
token是什么呢驯妄,據(jù)我們的PHP前端工程師+技術(shù)總監(jiān)來說荷并,就是用戶的訪問行為,具體咋產(chǎn)生我也不知道青扔。源织。翩伪。總之谈息,我的目標(biāo)就是缘屹,前端傳過來用戶要查看的某段時(shí)間內(nèi)的某頁面上的各種訪問行為,也就是token和querytime的各種組合侠仇,我從hbase中給他拿出來計(jì)算好就行轻姿,所以我這樣設(shè)計(jì)了rowkey。來看看我是怎么拿的:
def getDF(spark: SparkSession, filter: String, startRow: String, stopRow: String): DataFrame = {
val filter1 = new PrefixFilter(Bytes.toBytes(filter))
val results = HBaseClient.scan("test", filter1, startRow, stopRow)
val jsonString = results.map(e => JSONObject(e).toString())
val jsonRDD = spark.sparkContext.parallelize(jsonString)
val df = spark.read.json(jsonRDD)
df
}
里邊的“test” 是我的表名逻炊,這里我寫死了互亮。startRow和stopRow傳入的就是開始和結(jié)束的rowkey,filter可以為null嗅骄。
之后就可以對(duì)著結(jié)果各種蹂躪啦胳挎,只要是DataFrame支持的,什么姿勢(shì)都行溺森。嘿嘿??
(注:吐槽簡書一句慕爬,感覺對(duì)代碼的支持很不好啊,從idea粘貼過來各種不行)