前言:本篇文章主要介紹自己工作中對(duì)HBase數(shù)據(jù)寫入的一些淺淺的認(rèn)識(shí)和理解尼酿。最近剛接到一個(gè)需求是將Hive的數(shù)據(jù)加工處理成用戶標(biāo)簽數(shù)據(jù)存于HBase中,那么問題來了秉撇,對(duì)于HBase小白來說自己怎么去實(shí)現(xiàn)呢?Hive -> HBase的解決方法有多種,只能依靠搜索引擎來幫助自己了匙监,下面這幅圖或許能夠體現(xiàn)寫這篇文章的用意。
步入正題:
上篇文章已經(jīng)簡單介紹了有關(guān)HBase原理的一些東西,其中有一部分講解了HBase寫流程的情況小作。對(duì)于每天千萬的數(shù)據(jù)怎么能夠高效的寫入HBase呢亭姥?首先想到的就是python提供的通過Thrift訪問HBase的庫Happybase。通過Python API來去操作HBase簡單便捷顾稀。下面是自己的二兩測試案例达罗。
方案1:hive sql + python + hbase
-
第一步
將Hive中的數(shù)據(jù)通過Hive SQL加工處理成自己想要的結(jié)果。落地成文件(格式:yyyy-MM-dd.data)
##query.sql 查詢sql語句
use ods_hfjydb;
SELECT
s.student_intention_id,
date(s.create_time) create_time,
d.value
FROM view_student s
left join ddic d
ON s.know_origin=d.code
WHERE d.type='TP016
group by s.student_intention_id,date(s.create_time),d.value
limit 5000000 ;
-
第二步
通過Python API操作HBase静秆,將數(shù)據(jù)寫入相應(yīng)表中粮揉。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os,sys
import happybase
#建立連接
conn = happybase.Connection("hostname")
#獲取table
tab_app_data = conn.table("table-name")
#批處理操作
b_app_data = tab_app_data.batch()
#名單進(jìn)線
for line in sys.stdin:
parts = line.strip('\n').split("\t")
row_key = parts[0]
kv = {}
k = "i:2#" + parts[1] + "#student_source"
kv[k] = parts[2]
tab_app_data.put(row_key , kv)
b_app_data.send()
conn.close()
方案2:hive +presto + python + hbase
方案1,代碼量少而且容易理解抚笔,在當(dāng)前數(shù)據(jù)量少的情況下滔蝉,使用上面解決思路還是可取的,經(jīng)測試500萬數(shù)據(jù)從查詢到入庫大概25min左右塔沃。如果只使用Python + SQL 直接coding,還有沒有更好的方式呢蝠引?既然日常使用presto查詢hive數(shù)據(jù),為何不直接使用presto +python將數(shù)據(jù)入庫呢蛀柴!至少查詢速度方面螃概,presto相比hive查詢速度還是有很大優(yōu)勢的,下面貼上自己測試的code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os,sys
import datetime
import happybase
from pyhive import presto
#presto獲得連接
p_connection = presto.connect(host='hostname',port='8334')
cursor = p_connection.cursor()
#hbase獲得連接
h_connection = happybase.Connection("hostname")
tab_app_data = h_connection.table("table_name")
#b_app_data = tab_app_data.batch(batch_size=1000)
b_app_data = tab_app_data.batch()
def execute_batch_to_hbase(dt):
#需要執(zhí)行的sql語句
sql = "SELECT s.student_intention_id,substr(s.create_time,1,10) create_time,d.value " + \
"FROM dwd_db_hfjydb.view_student s " + \
"left join dwd_db_hfjydb.ddic d " + \
"ON s.know_origin=d.code " + \
"WHERE d.type='TP016' " + \
"group by s.student_intention_id,substr(s.create_time,1,10),d.value limit 5000000"
#獲取查詢的數(shù)據(jù)
cursor.execute(sql)
rows = cursor.fetchall()
for item in rows:
student_intention_id = str(item[0])
dt = item[1]
source_name = item[2]
#獲取的數(shù)據(jù)
print str(student_intention_id) + " # " + dt + " # " + source_name
#hbase數(shù)據(jù)準(zhǔn)備
row_key = student_intention_id
kv = {}
k = "i:2#" + dt + "#student_source"
kv[k] = source_name
tab_app_data.put(row_key , kv)
b_app_data.send()
h_connection.close()
#校驗(yàn)參數(shù)格式是否正確
def validate(date_text):
try:
datetime.datetime.strptime(date_text, '%Y-%m-%d')
return date_text
except ValueError:
raise ValueError("Incorrect data format, should be YYYY-MM-DD")
if __name__ == '__main__':
if (len(sys.argv) < 2):
raise Exception,u"arguments needed !"
dt = validate(sys.argv[1])
execute_batch_to_hbase(dt)
結(jié)論1:方案1和方案2都可以幫助自己達(dá)成目標(biāo),但是耗時(shí)大概都在min左右鸽疾。但是要考慮的問題有:執(zhí)行時(shí)間效率以及頻繁的對(duì)hbase寫操作和對(duì)集群網(wǎng)絡(luò)的影響吊洼。
上面的兩種方案其寫入數(shù)據(jù)的過程在下圖。 上篇文章已經(jīng)詳細(xì)的介紹了制肮,為了和接下來的要寫的內(nèi)容作對(duì)比冒窍,先畫一個(gè)簡單的草圖。
腳步不能聽 go on! >>>>>>>
下面或許是大家最常使用的方式了豺鼻,直接使用spark操作hive將數(shù)據(jù)入庫到HBase综液,對(duì)于在開發(fā)領(lǐng)域征戰(zhàn)多年的各位,應(yīng)該都實(shí)現(xiàn)過儒飒,至于基于Hadoop新版API(rdd.saveAsNewAPIHadoopDataset(job.getConfiguration))和基于hadoop舊版API實(shí)現(xiàn)(rdd.saveAsHadoopDataset(jobConf))這兩種方式谬莹,不做討論,有興趣的同學(xué)可以自行測試。下面是批量導(dǎo)數(shù)據(jù)到HBase的code(以HFile的方式寫入HBase):
import com.hfjy.bigdata.caculate.utils.{ConfigUtil, DingDingUtils, HBaseUtils, HdfsUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable
val sparkSession = SparkSession.builder().appName("HBaseHFileBulkLoadThree").enableHiveSupport()
.getOrCreate()
//HBase配置
val conf = HBaseUtils.getConfiguration
val tableName = HBaseUtils.getHBaseTableName
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat2.configureIncrementalLoad(job, tableName)
//Hive SQL查詢出500萬數(shù)據(jù)
val query_sql = s"SELECT s.student_intention_id, " +
"substr(s.create_time,1,10) create_time, " +
"CAST(d.value AS string ) source_value " +
"FROM dwd_db_hfjydb.view_student s " +
"left join dwd_db_hfjydb.ddic d " +
"ON s.know_origin=d.code " +
"WHERE d.type='TP016' " +
"group by s.student_intention_id,substr(s.create_time,1,10),d.value limit 5000000 "
val resultDF: DataFrame = sparkSession.sql(query_sql)
val commonConf = ConfigUtil.getAppCommonConfig()
val hdfs_path = commonConf.getProperty("hbase.tmp.save.path") + "/" + label_id
//取出DataFrame的所有字段的列名進(jìn)行排序
val columnsNames = resultDF.columns.drop(1).sorted
//改過程重點(diǎn)是保證RowKey有序已經(jīng)Column Name有序附帽。
val resultRDD: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDF.rdd.repartition(30).sortBy(row => row.getString(0)).map(f = row => {
var list: Seq[KeyValue] = List()
//RowKey
val row_key = row.getString(0)
//列簇
val columnFamily = "i"
//列名
var columnName: String = null
//列值
var columnValue: String = null
var kv: KeyValue = null
val sb = new StringBuffer()
for (i <- 0 to (columnsNames.length - 1)) {
if (i <= columnsNames.length - 2) {
sb.append( "2#" + row.getAs[String](columnsNames(i)) + "#student_source")
}
if (i == columnsNames.length - 1) {
columnValue = row.getAs[String](columnsNames(i))
}
}
columnName = sb.toString
kv = new KeyValue(Bytes.toBytes(row_key), columnFamily.getBytes, columnName.getBytes(), columnValue.getBytes())
//將新的kv加在list后面(不能反 需要整體有序)
list = list :+ kv
(new ImmutableBytesWritable(Bytes.toBytes(row_key)), list)
})
//要使用saveAsNewAPIHadoopFile 該方法埠戳,必須保證處理的結(jié)果數(shù)據(jù)格式為: RDD[(ImmutableBytesWritable, KeyValue)]
//對(duì)當(dāng)前的數(shù)據(jù)做整體有序處理,包括RowKey的有序已經(jīng)Column Name的有序蕉扮。
val finalResultRDD: RDD[(ImmutableBytesWritable, KeyValue)] = resultRDD.flatMapValues(s => s.iterator)
//判斷HDFS上是否有該目錄整胃,如果有則刪除
val fileSystem = HdfsUtil.getFileSystem
fileSystem.delete(new Path(hdfs_path), true)
//保存HFiles到HDFS上
finalResultRDD.saveAsNewAPIHadoopFile(hdfs_path, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
//Bulk load HFiles to Hbase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path(hdfs_path), table)
上面的實(shí)現(xiàn)過程是將數(shù)據(jù)先寫到HFile里面,然后再導(dǎo)入到表中喳钟,打包放到集群中跑爪模,配置合適的[executor-memory,executor-cores,num-executors]處理完500萬的數(shù)據(jù)大概耗時(shí)2min左右。下圖是其寫入數(shù)據(jù)的過程荚藻。
結(jié)論:
- 方案1和方案2的數(shù)據(jù)寫入過程是要經(jīng)過MemStore屋灌,其數(shù)據(jù)在MemStore中做的最重要的任務(wù)就是對(duì)數(shù)據(jù)進(jìn)行排序。所以在開發(fā)中用戶可以不用關(guān)心順序的問題应狱。然而最后一種方案共郭,數(shù)據(jù)是直接寫到HFile中的不經(jīng)過MemStore,在寫入之前就是開發(fā)者重點(diǎn)注意的就是對(duì)RowKey以及Column Name的排序疾呻。如果不排序除嘹,數(shù)據(jù)是無法寫入HFile中的。會(huì)報(bào)下面的錯(cuò)誤岸蜗。
java.io.IOException: Added a key not lexically larger than previous. Current cell = xxxxxxxxxxxxxxxx, lastCell =xxxxxxxxxxxxx
- 效率問題是文章開頭第一幅圖片所表達(dá)的也是也是本文寫作的目的尉咕。方案1和方案2的方式寫入數(shù)據(jù)的效率比最后一種方案效率要低很多。具體在開發(fā)中璃岳,各位可以結(jié)合公司的技術(shù)架構(gòu)年缎,數(shù)據(jù)量的大小,對(duì)網(wǎng)絡(luò)影響的可控性以及對(duì)HBase寫操作的時(shí)候?qū)Base對(duì)外提供服務(wù)的影響相結(jié)合來去選擇從而達(dá)到自己的目標(biāo)铃慷。
- 通過以上的測試单芜,也讓自己對(duì)HBase 寫數(shù)據(jù)的原理有了更進(jìn)一步的認(rèn)識(shí)和理解。