HBase之?dāng)?shù)據(jù)寫入再認(rèn)識(shí)

前言:本篇文章主要介紹自己工作中對(duì)HBase數(shù)據(jù)寫入的一些淺淺的認(rèn)識(shí)和理解尼酿。最近剛接到一個(gè)需求是將Hive的數(shù)據(jù)加工處理成用戶標(biāo)簽數(shù)據(jù)存于HBase中,那么問題來了秉撇,對(duì)于HBase小白來說自己怎么去實(shí)現(xiàn)呢?Hive -> HBase的解決方法有多種,只能依靠搜索引擎來幫助自己了匙监,下面這幅圖或許能夠體現(xiàn)寫這篇文章的用意。


《怎么更高效的往hbase寫數(shù)據(jù)?》

步入正題:

上篇文章已經(jīng)簡單介紹了有關(guān)HBase原理的一些東西,其中有一部分講解了HBase寫流程的情況小作。對(duì)于每天千萬的數(shù)據(jù)怎么能夠高效的寫入HBase呢亭姥?首先想到的就是python提供的通過Thrift訪問HBase的庫Happybase。通過Python API來去操作HBase簡單便捷顾稀。下面是自己的二兩測試案例达罗。

方案1:hive sql + python + hbase

  • 第一步

將Hive中的數(shù)據(jù)通過Hive SQL加工處理成自己想要的結(jié)果。落地成文件(格式:yyyy-MM-dd.data)

##query.sql 查詢sql語句
use ods_hfjydb;
SELECT
s.student_intention_id,
date(s.create_time) create_time,
d.value
FROM view_student s
left join ddic d
ON s.know_origin=d.code
WHERE d.type='TP016
group by s.student_intention_id,date(s.create_time),d.value 
limit 5000000 ;
  • 第二步

通過Python API操作HBase静秆,將數(shù)據(jù)寫入相應(yīng)表中粮揉。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os,sys
import happybase

#建立連接
conn = happybase.Connection("hostname")
#獲取table
tab_app_data = conn.table("table-name")
#批處理操作
b_app_data = tab_app_data.batch()

#名單進(jìn)線
for line in sys.stdin:
        parts = line.strip('\n').split("\t")
        row_key = parts[0]

        kv = {}
        k = "i:2#" + parts[1] + "#student_source"
        kv[k] = parts[2]
        tab_app_data.put(row_key , kv)

b_app_data.send()
conn.close()

方案2:hive +presto + python + hbase

方案1,代碼量少而且容易理解抚笔,在當(dāng)前數(shù)據(jù)量少的情況下滔蝉,使用上面解決思路還是可取的,經(jīng)測試500萬數(shù)據(jù)從查詢到入庫大概25min左右塔沃。如果只使用Python + SQL 直接coding,還有沒有更好的方式呢蝠引?既然日常使用presto查詢hive數(shù)據(jù),為何不直接使用presto +python將數(shù)據(jù)入庫呢蛀柴!至少查詢速度方面螃概,presto相比hive查詢速度還是有很大優(yōu)勢的,下面貼上自己測試的code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os,sys
import datetime
import happybase
from pyhive import presto

#presto獲得連接
p_connection = presto.connect(host='hostname',port='8334')
cursor = p_connection.cursor()

#hbase獲得連接
h_connection = happybase.Connection("hostname")
tab_app_data = h_connection.table("table_name")
#b_app_data = tab_app_data.batch(batch_size=1000)
b_app_data = tab_app_data.batch()

def execute_batch_to_hbase(dt):
    #需要執(zhí)行的sql語句
    sql = "SELECT s.student_intention_id,substr(s.create_time,1,10) create_time,d.value " + \
    "FROM dwd_db_hfjydb.view_student s " + \
    "left join dwd_db_hfjydb.ddic d " + \
    "ON s.know_origin=d.code " + \
    "WHERE d.type='TP016' " + \
    "group by s.student_intention_id,substr(s.create_time,1,10),d.value  limit 5000000" 
    
    #獲取查詢的數(shù)據(jù)
    cursor.execute(sql)
    rows = cursor.fetchall()

    for item in rows:
        student_intention_id = str(item[0])
        dt = item[1]
        source_name = item[2]

        #獲取的數(shù)據(jù)  
        print str(student_intention_id) + " # " + dt + " # " + source_name
    
        #hbase數(shù)據(jù)準(zhǔn)備
        row_key = student_intention_id
        kv = {}
        k = "i:2#" + dt + "#student_source"
        kv[k] = source_name
        tab_app_data.put(row_key , kv)

    b_app_data.send()

    h_connection.close()

#校驗(yàn)參數(shù)格式是否正確
def validate(date_text):
    try:
        datetime.datetime.strptime(date_text, '%Y-%m-%d')
        return date_text
    except ValueError:
        raise ValueError("Incorrect data format, should be YYYY-MM-DD")

if __name__ == '__main__':
    if (len(sys.argv) < 2):
        raise Exception,u"arguments needed !"
        
    dt = validate(sys.argv[1])
    
    execute_batch_to_hbase(dt)

結(jié)論1:方案1和方案2都可以幫助自己達(dá)成目標(biāo),但是耗時(shí)大概都在min左右鸽疾。但是要考慮的問題有:執(zhí)行時(shí)間效率以及頻繁的對(duì)hbase寫操作和對(duì)集群網(wǎng)絡(luò)的影響吊洼。
上面的兩種方案其寫入數(shù)據(jù)的過程在下圖。 上篇文章已經(jīng)詳細(xì)的介紹了制肮,為了和接下來的要寫的內(nèi)容作對(duì)比冒窍,先畫一個(gè)簡單的草圖。

腳步不能聽 go on! >>>>>>>

下面或許是大家最常使用的方式了豺鼻,直接使用spark操作hive將數(shù)據(jù)入庫到HBase综液,對(duì)于在開發(fā)領(lǐng)域征戰(zhàn)多年的各位,應(yīng)該都實(shí)現(xiàn)過儒飒,至于基于Hadoop新版API(rdd.saveAsNewAPIHadoopDataset(job.getConfiguration))和基于hadoop舊版API實(shí)現(xiàn)(rdd.saveAsHadoopDataset(jobConf))這兩種方式谬莹,不做討論,有興趣的同學(xué)可以自行測試。下面是批量導(dǎo)數(shù)據(jù)到HBase的code(以HFile的方式寫入HBase):

import com.hfjy.bigdata.caculate.utils.{ConfigUtil, DingDingUtils, HBaseUtils, HdfsUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.collection.mutable

val sparkSession = SparkSession.builder().appName("HBaseHFileBulkLoadThree").enableHiveSupport()
      .getOrCreate()

//HBase配置     
val conf = HBaseUtils.getConfiguration
val tableName = HBaseUtils.getHBaseTableName

val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])

HFileOutputFormat2.configureIncrementalLoad(job, tableName)

//Hive SQL查詢出500萬數(shù)據(jù)
val query_sql = s"SELECT s.student_intention_id, " +
      "substr(s.create_time,1,10) create_time, " +
      "CAST(d.value  AS string ) source_value " +
      "FROM dwd_db_hfjydb.view_student s  " +
      "left join dwd_db_hfjydb.ddic d  " +
      "ON s.know_origin=d.code  " +
      "WHERE d.type='TP016'  " +
      "group by s.student_intention_id,substr(s.create_time,1,10),d.value  limit 5000000 "

val resultDF: DataFrame = sparkSession.sql(query_sql)

val commonConf = ConfigUtil.getAppCommonConfig()
val hdfs_path = commonConf.getProperty("hbase.tmp.save.path") + "/" + label_id

//取出DataFrame的所有字段的列名進(jìn)行排序
val columnsNames = resultDF.columns.drop(1).sorted

//改過程重點(diǎn)是保證RowKey有序已經(jīng)Column Name有序附帽。
val resultRDD: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDF.rdd.repartition(30).sortBy(row => row.getString(0)).map(f = row => {

      var list: Seq[KeyValue] = List()
      //RowKey
      val row_key = row.getString(0)
      //列簇
      val columnFamily = "i"
      //列名
      var columnName: String = null
      //列值
      var columnValue: String = null

      var kv: KeyValue = null

      val sb = new StringBuffer()

      for (i <- 0 to (columnsNames.length - 1)) {
        if (i <= columnsNames.length - 2) {
          sb.append( "2#" + row.getAs[String](columnsNames(i)) + "#student_source")
        }
        if (i == columnsNames.length - 1) {
          columnValue = row.getAs[String](columnsNames(i))
        }
      }
      columnName = sb.toString

      kv = new KeyValue(Bytes.toBytes(row_key), columnFamily.getBytes, columnName.getBytes(), columnValue.getBytes())

      //將新的kv加在list后面(不能反 需要整體有序)
      list = list :+ kv

      (new ImmutableBytesWritable(Bytes.toBytes(row_key)), list)
    })

 //要使用saveAsNewAPIHadoopFile 該方法埠戳,必須保證處理的結(jié)果數(shù)據(jù)格式為: RDD[(ImmutableBytesWritable, KeyValue)]
 //對(duì)當(dāng)前的數(shù)據(jù)做整體有序處理,包括RowKey的有序已經(jīng)Column Name的有序蕉扮。
 val finalResultRDD: RDD[(ImmutableBytesWritable, KeyValue)] = resultRDD.flatMapValues(s => s.iterator)

 //判斷HDFS上是否有該目錄整胃,如果有則刪除
 val fileSystem = HdfsUtil.getFileSystem
 fileSystem.delete(new Path(hdfs_path), true)

 //保存HFiles到HDFS上
 finalResultRDD.saveAsNewAPIHadoopFile(hdfs_path, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)

 //Bulk load HFiles to Hbase
 val bulkLoader = new LoadIncrementalHFiles(conf)
 bulkLoader.doBulkLoad(new Path(hdfs_path), table)

上面的實(shí)現(xiàn)過程是將數(shù)據(jù)先寫到HFile里面,然后再導(dǎo)入到表中喳钟,打包放到集群中跑爪模,配置合適的[executor-memory,executor-cores,num-executors]處理完500萬的數(shù)據(jù)大概耗時(shí)2min左右。下圖是其寫入數(shù)據(jù)的過程荚藻。


結(jié)論:

  • 方案1和方案2的數(shù)據(jù)寫入過程是要經(jīng)過MemStore屋灌,其數(shù)據(jù)在MemStore中做的最重要的任務(wù)就是對(duì)數(shù)據(jù)進(jìn)行排序。所以在開發(fā)中用戶可以不用關(guān)心順序的問題应狱。然而最后一種方案共郭,數(shù)據(jù)是直接寫到HFile中的不經(jīng)過MemStore,在寫入之前就是開發(fā)者重點(diǎn)注意的就是對(duì)RowKey以及Column Name的排序疾呻。如果不排序除嘹,數(shù)據(jù)是無法寫入HFile中的。會(huì)報(bào)下面的錯(cuò)誤岸蜗。
java.io.IOException: Added a key not lexically larger than previous. Current cell = xxxxxxxxxxxxxxxx, lastCell =xxxxxxxxxxxxx
  • 效率問題是文章開頭第一幅圖片所表達(dá)的也是也是本文寫作的目的尉咕。方案1和方案2的方式寫入數(shù)據(jù)的效率比最后一種方案效率要低很多。具體在開發(fā)中璃岳,各位可以結(jié)合公司的技術(shù)架構(gòu)年缎,數(shù)據(jù)量的大小,對(duì)網(wǎng)絡(luò)影響的可控性以及對(duì)HBase寫操作的時(shí)候?qū)Base對(duì)外提供服務(wù)的影響相結(jié)合來去選擇從而達(dá)到自己的目標(biāo)铃慷。
  • 通過以上的測試单芜,也讓自己對(duì)HBase 寫數(shù)據(jù)的原理有了更進(jìn)一步的認(rèn)識(shí)和理解。

>>本篇文章僅僅記錄自己的學(xué)習(xí)過程犁柜,文章中如有錯(cuò)誤或不妥之處洲鸠,請(qǐng)留言,謝謝馋缅!<<
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末扒腕,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子萤悴,更是在濱河造成了極大的恐慌簿透,老刑警劉巖攻锰,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件秘噪,死亡現(xiàn)場離奇詭異绍傲,居然都是意外死亡宿礁,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門凫碌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來唤蔗,“玉大人,你說我怎么就攤上這事柳沙⊙颐穑” “怎么了?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵赂鲤,是天一觀的道長噪径。 經(jīng)常有香客問我,道長数初,這世上最難降的妖魔是什么找爱? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮泡孩,結(jié)果婚禮上车摄,老公的妹妹穿的比我還像新娘。我一直安慰自己仑鸥,他們只是感情好吮播,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著眼俊,像睡著了一般意狠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上疮胖,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天环戈,我揣著相機(jī)與錄音,去河邊找鬼澎灸。 笑死谷市,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的击孩。 我是一名探鬼主播迫悠,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼巩梢!你這毒婦竟也來了创泄?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤括蝠,失蹤者是張志新(化名)和其女友劉穎鞠抑,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體忌警,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡搁拙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年秒梳,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片箕速。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡酪碘,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出盐茎,到底是詐尸還是另有隱情兴垦,我是刑警寧澤,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布字柠,位于F島的核電站探越,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏窑业。R本人自食惡果不足惜钦幔,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望常柄。 院中可真熱鬧鲤氢,春花似錦、人聲如沸拐纱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽秸架。三九已至揍庄,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間东抹,已是汗流浹背蚂子。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留缭黔,地道東北人食茎。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像馏谨,于是被迫代替她去往敵國和親别渔。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Zookeeper用于集群主備切換惧互。 YARN讓集群具備更好的擴(kuò)展性哎媚。 Spark沒有存儲(chǔ)能力。 Spark的Ma...
    Yobhel閱讀 7,269評(píng)論 0 34
  • 【什么是大數(shù)據(jù)喊儡、大數(shù)據(jù)技術(shù)】 大數(shù)據(jù),又稱巨量資料艾猜,指的是所涉及的數(shù)據(jù)資料量規(guī)模巨大到無法在合理時(shí)間內(nèi)通過傳統(tǒng)的應(yīng)...
    kimibob閱讀 2,743評(píng)論 0 51
  • 才發(fā)現(xiàn)我高一時(shí)正是他大一時(shí)买喧,同樣是新環(huán)境捻悯,同樣是對(duì)未來充滿著無限的期待,同樣是在一年末收獲不同的愛情淤毛,但高一和大一...
    水面清圓一一風(fēng)荷舉蘇幕遮閱讀 158評(píng)論 0 0
  • 2018年1月8日今缚,五位學(xué)員繼續(xù)到華興中學(xué)跟崗學(xué)習(xí)。上午钱床,體衛(wèi)處孫主任與李干事介紹了體衛(wèi)藝工作荚斯;觀摩了學(xué)校監(jiān)控室埠居。...
    龍正中學(xué)張春城閱讀 402評(píng)論 0 0
  • 下載和安裝grafana查牌,此處例子為使用homebrew方式安裝: 第二部安裝grafana最新版本 當(dāng)出現(xiàn)如下信...
    時(shí)彬斌閱讀 6,578評(píng)論 0 2