spark讀取hbase為DataFrame后利用SQL進(jìn)行計(jì)算

最近在做spark和hbase的相關(guān)項(xiàng)目牍白。暫且將其分為兩部分:一是利用spark streaming消費(fèi)前臺(tái)推到kafka中的消息溜族,進(jìn)行簡單處理后寫入到hbase;然后就是利用spark讀取hbase末捣,將結(jié)果組裝成json进每,再利用spark SQL進(jìn)行計(jì)算篷就。

介紹一下環(huán)境:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<org.scala.version>2.11.2</org.scala.version>
<org.spark.version>2.0.2</org.spark.version>
<org.kafka.version>0.10.2.1</org.kafka.version>
<org.apache.hbase.version>1.2.2</org.apache.hbase.version>
</properties>

這里先來討論后半部分射亏,spark讀取hbase近忙。

首先,了解過hbase的都應(yīng)該知道智润,它是一No SQL的非關(guān)系型數(shù)據(jù)及舍。與我們平時(shí)常見的MySQL和Oracle不同,No SQL最大的特點(diǎn)就是不支持事務(wù)窟绷,對(duì)于關(guān)系型數(shù)據(jù)庫輕松加隨意的join啊锯玛、groupby啊什么的,都不擅長兼蜈。不過hbase既然這么火攘残,肯定有其道理。我這里之所以采用它为狸,最重要的就是因?yàn)椋阂皇菙?shù)據(jù)量大歼郭,項(xiàng)目還沒上線,不過預(yù)測(cè)日增量有上百g辐棒,二來呢hbase提供了java api病曾,以前搞過,get和scan的效率還是很給力的漾根。再加上我們記錄的用戶行為信息泰涂,根本不需要更新操作,我只要能寫進(jìn)去辐怕,拿出來就行啦逼蒙。

廢話不多數(shù),下面上代碼:

先要組裝hbase client

先要引入配置文件

private val config = ConfigFactory.load()
private val conn = getConnection

具體的application.conf如下
spark{
master="local[*]"
appName="KafkaConsumer"
}
kafka {
topics = "topic007"
brokers = "192.168.1.97:9092,192.168.1.98:9092,192.168.1.99:9092,192.168.1.106:9092,192.168.1.107:9092,192.168.1.108:9092"
group = "groupid"
}
hbase{
port = "2181"
quorum = "master1.hadoop,slave2.hadoop,slave3.hadoop,slave4.hadoop,slave5.hadoop,slave6.hadoop"
tableName = "test"
}

大家根據(jù)自己的設(shè)置自行修改啊秘蛇。

然后就是hbase相關(guān)的api調(diào)用啦其做,具體如下:

/**
  * 掃描HBase并返回結(jié)果
  * @param tableName 表名
  * @param filter 過濾條件
  * @param startRow 起始行鍵
  * @param stopRow 結(jié)束行鍵
  * @return 掃描結(jié)果
  */
  def scan(tableName: String, filter: Filter, startRow: String, stopRow: String): List[Map[String, String]] = {
    val s = buildScan(filter, startRow, stopRow)
    val t = conn.getTable(TableName.valueOf(tableName))
    scan(t, s)
  }

/**
  * 執(zhí)行掃描
  * @param table 表
  * @param scan scan
  */
private def scan(table: Table, scan: Scan): List[Map[String, String]] = {
  val scanner = table.getScanner(scan)
  val ite = scanner.iterator()
  val result = new ListBuffer[Map[String, String]]
  while (ite.hasNext){
    val map = new mutable.ListMap[String, String]
    ite.next().listCells().foreach(c => map += readCell(c))
    result += map.toMap
  }
  result.toList
}

/**
  * 讀取單元格
  * @param cell 單元格
  */
private def readCell(cell: Cell) = {
  val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
  val value = Bytes.toString(CellUtil.cloneValue(cell))
  (qualifier, value)
}

/**
  * 構(gòu)建Scan實(shí)例
  * @param filter 過濾條件
  * @param startRow 起始行鍵
  * @param stopRow 結(jié)束行鍵
  */
private def buildScan(filter: Filter, startRow: String, stopRow: String): Scan ={
  val scan = new Scan()
  scan.setMaxVersions()
  scan.setCaching(2000)
  scan.setCacheBlocks(false)
  if(filter != null)
    scan.setFilter(filter)
  if(startRow != null)
    scan.setStartRow(Bytes.toBytes(startRow))
  if(stopRow != null)
    scan.setStopRow(Bytes.toBytes(stopRow))
  scan
}

/**
  * 獲取鏈接
  */
private def getConnection: Connection = {
  val conf = HBaseConfiguration.create()
  conf.set(HConstants.ZOOKEEPER_QUORUM, config.getString("hbase.quorum"))
  conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, config.getString("hbase.port"))
  ConnectionFactory.createConnection(conf)
}

以上就是調(diào)用hbase 的scan api做的事,具體的條件設(shè)置大家可以自行查看官方文檔赁还,按需配置妖泄。
scan. 這個(gè)啊, scan. 那個(gè)啊艘策,蹈胡,,自己看著弄就行http://hbase.apache.org/apidocs/index.html

然后是我的驅(qū)動(dòng)類:
這里就要介紹一下業(yè)務(wù)了朋蔫。以我《Hbase權(quán)威指南》兩個(gè)星期的閱讀理解結(jié)合我們的實(shí)際需求來講罚渐,我把rowkey設(shè)計(jì)為 "token_querytime" 的形式。例如:p4064d445c9f4ff4d536dfeae965aa95_1503364335426

token是什么呢驯妄,據(jù)我們的PHP前端工程師+技術(shù)總監(jiān)來說荷并,就是用戶的訪問行為,具體咋產(chǎn)生我也不知道青扔。源织。翩伪。總之谈息,我的目標(biāo)就是缘屹,前端傳過來用戶要查看的某段時(shí)間內(nèi)的某頁面上的各種訪問行為,也就是token和querytime的各種組合侠仇,我從hbase中給他拿出來計(jì)算好就行轻姿,所以我這樣設(shè)計(jì)了rowkey。來看看我是怎么拿的:

def getDF(spark: SparkSession, filter: String, startRow: String, stopRow: String): DataFrame = {
  val filter1 = new PrefixFilter(Bytes.toBytes(filter))
  val results = HBaseClient.scan("test", filter1, startRow, stopRow)
  val jsonString = results.map(e => JSONObject(e).toString())
  val jsonRDD = spark.sparkContext.parallelize(jsonString)
  val df = spark.read.json(jsonRDD)
  df
}

里邊的“test” 是我的表名逻炊,這里我寫死了互亮。startRow和stopRow傳入的就是開始和結(jié)束的rowkey,filter可以為null嗅骄。

之后就可以對(duì)著結(jié)果各種蹂躪啦胳挎,只要是DataFrame支持的,什么姿勢(shì)都行溺森。嘿嘿??

(注:吐槽簡書一句慕爬,感覺對(duì)代碼的支持很不好啊,從idea粘貼過來各種不行)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末屏积,一起剝皮案震驚了整個(gè)濱河市医窿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌炊林,老刑警劉巖姥卢,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異渣聚,居然都是意外死亡独榴,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門奕枝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來棺榔,“玉大人,你說我怎么就攤上這事隘道≈⑿” “怎么了?”我有些...
    開封第一講書人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵谭梗,是天一觀的道長忘晤。 經(jīng)常有香客問我,道長激捏,這世上最難降的妖魔是什么设塔? 我笑而不...
    開封第一講書人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮远舅,結(jié)果婚禮上闰蛔,老公的妹妹穿的比我還像新娘竞思。我一直安慰自己,他們只是感情好钞护,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著爆办,像睡著了一般难咕。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上距辆,一...
    開封第一講書人閱讀 51,763評(píng)論 1 307
  • 那天余佃,我揣著相機(jī)與錄音,去河邊找鬼跨算。 笑死爆土,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的诸蚕。 我是一名探鬼主播步势,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼背犯!你這毒婦竟也來了坏瘩?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤漠魏,失蹤者是張志新(化名)和其女友劉穎倔矾,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柱锹,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡哪自,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了禁熏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片壤巷。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖匹层,靈堂內(nèi)的尸體忽然破棺而出隙笆,到底是詐尸還是另有隱情,我是刑警寧澤升筏,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布撑柔,位于F島的核電站,受9級(jí)特大地震影響您访,放射性物質(zhì)發(fā)生泄漏铅忿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一灵汪、第九天 我趴在偏房一處隱蔽的房頂上張望檀训。 院中可真熱鬧柑潦,春花似錦、人聲如沸峻凫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽荧琼。三九已至譬胎,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間命锄,已是汗流浹背堰乔。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留脐恩,地道東北人镐侯。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像驶冒,于是被迫代替她去往敵國和親苟翻。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 之前的有點(diǎn)忘記了,這里在云筆記拿出來再玩玩.看不懂的可以留言 大家可以嘗試下Ambari來配置Hadoop的相關(guān)環(huán)...
    HT_Jonson閱讀 2,960評(píng)論 0 50
  • HBase那些事 @(大數(shù)據(jù)工程學(xué)院)[HBase, Hadoop, 優(yōu)化, HadoopChen, hbase]...
    分癡閱讀 3,944評(píng)論 3 17
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理只怎,服務(wù)發(fā)現(xiàn)袜瞬,斷路器,智...
    卡卡羅2017閱讀 134,672評(píng)論 18 139
  • Spark學(xué)習(xí)筆記 Data Source->Kafka->Spark Streaming->Parquet->S...
    哎喲喂嘍閱讀 6,625評(píng)論 0 51
  • Spark SQL, DataFrames and Datasets Guide Overview SQL Dat...
    Joyyx閱讀 8,328評(píng)論 0 16