【Spark實戰(zhàn)】Spark之讀寫HBase

1 配置

1.1 開發(fā)環(huán)境:

  • HBase:hbase-1.0.0-cdh5.4.5.tar.gz
  • Hadoop:hadoop-2.6.0-cdh5.4.5.tar.gz
  • ZooKeeper:zookeeper-3.4.5-cdh5.4.5.tar.gz
  • Spark:spark-2.1.0-bin-hadoop2.6

1.2 Spark的配置

  • Jar包:需要HBase的Jar如下(經(jīng)過測試栖茉,正常運行,但是是否存在冗余的Jar并未證實剩胁,若發(fā)現(xiàn)多余的jar可自行進行刪除)
jars
  • spark-env.sh
    添加以下配置:export SPARK_CLASSPATH=/home/hadoop/data/lib1/*
    注:如果使用spark-shell的yarn模式進行測試的話,那么最好每個NodeManager節(jié)點都有配置jars和hbase-site.xml
  • spark-default.sh
spark.yarn.historyServer.address=slave11:18080
spark.history.ui.port=18080
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///tmp/spark/events
spark.history.fs.logDirectory=hdfs:///tmp/spark/events
spark.driver.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer

1.3 數(shù)據(jù)

1)格式: barCode@item@value@standardValue@upperLimit@lowerLimit

01055HAXMTXG10100001@KEY_VOLTAGE_TEC_PWR@1.60@1.62@1.75@1.55
01055HAXMTXG10100001@KEY_VOLTAGE_T_C_PWR@1.22@1.24@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_BC_PWR@1.16@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_11@1.32@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_RC_PWR@1.24@1.25@1.45@0.8
01055HAXMTXG10100001@KEY_VOLTAGE_T_VCC_5V@1.93@1.90@1.95@1.65
01055HAXMTXG10100001@KEY_VOLTAGE_T_VDD3V3@1.59@1.62@1.75@1.55

2 代碼演示

2.1 準(zhǔn)備動作

1)既然是與HBase相關(guān)日月,那么首先需要使用hbase shell來創(chuàng)建一個表

創(chuàng)建表格:create ‘data’,’v’,create ‘data1’,’v’

2)使用spark-shell進行操作,命令如下:

bin/spark-shell --master yarn --deploy-mode client --num-executors 5 --executor-memory 1g --executor-cores 2

代碼演示環(huán)境

3)import 各種類

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64,Bytes}
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.commons.codec.digest.DigestUtils

2.2 代碼實戰(zhàn)

創(chuàng)建conf和table

val conf= HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE,"data1")
val table = new HTable(conf,"data1")

2.2.1 數(shù)據(jù)寫入

格式:

val put = new Put(Bytes.toBytes("rowKey"))
put.add("cf","q","value")

使用for來插入5條數(shù)據(jù)

for(i <- 1 to 5){ var put= new Put(Bytes.toBytes("row"+i));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes("value"+i));table.put(put)}

到hbase shell中查看結(jié)果

hbase_data1表中的數(shù)據(jù)

2.2.2 數(shù)據(jù)讀取

val hbaseRdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])

1)take

hbaseRdd take 1
take_result

2)scan

var scan = new Scan();
scan.addFamily(Bytes.toBytes(“v”));
var proto = ProtobufUtil.toScan(scan)
var scanToString = Base64.encodeBytes(proto.toByteArray());
conf.set(TableInputFormat.SCAN,scanToString)

val datas = hbaseRdd.map( x=>x._2).map{result => (result.getRow,result.getValue(Bytes.toBytes("v"),Bytes.toBytes("value")))}.map(row => (new String(row._1),new String(row._2))).collect.foreach(r => (println(r._1+":"+r._2)))
scan_result

2.3 批量插入

2.3.1 普通插入

1)代碼

val rdd = sc.textFile("/data/produce/2015/2015-03-01.log")
val data = rdd.map(_.split("@")).map{x=>(x(0)+x(1),x(2))}
val result = data.foreachPartition{x => {val conf= HBaseConfiguration.create();conf.set(TableInputFormat.INPUT_TABLE,"data");conf.set("hbase.zookeeper.quorum","slave5,slave6,slave7");conf.set("hbase.zookeeper.property.clientPort","2181");conf.addResource("/home/hadoop/data/lib/hbase-site.xml");val table = new HTable(conf,"data");table.setAutoFlush(false,false);table.setWriteBufferSize(3*1024*1024); x.foreach{y => {
var put= new Put(Bytes.toBytes(y._1));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(y._2));table.put(put)};table.flushCommits}}}

2)執(zhí)行時間如下:7.6 min

執(zhí)行時間

2.3.2 Bulkload

  1. 代碼:
val conf = HBaseConfiguration.create();
val tableName = "data1"
val table = new HTable(conf,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)

lazy val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job,table)

val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));(new ImmutableBytesWritable(kv.getKey),kv)}}

rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration())
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)

2) 執(zhí)行時間:7s

執(zhí)行時間_BulkLoad

3)執(zhí)行結(jié)果:
到hbase shell 中查看 list “data1”

結(jié)果查詢

通過對比我們可以發(fā)現(xiàn)bulkload批量導(dǎo)入所用時間遠遠少于普通導(dǎo)入郭脂,速度提升了60多倍稽荧,當(dāng)然我沒有使用更大的數(shù)據(jù)量測試橘茉,但是我相信導(dǎo)入速度的提升是非常顯著的,強烈建議使用BulkLoad批量導(dǎo)入數(shù)據(jù)到HBase中姨丈。

關(guān)于Spark與Hbase之間操作就寫到這里畅卓,如果有什么地方寫得不對或者運行不了,歡迎指出蟋恬,謝謝

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末翁潘,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子歼争,更是在濱河造成了極大的恐慌拜马,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,997評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件沐绒,死亡現(xiàn)場離奇詭異俩莽,居然都是意外死亡,警方通過查閱死者的電腦和手機乔遮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,603評論 3 392
  • 文/潘曉璐 我一進店門扮超,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人申眼,你說我怎么就攤上這事瞒津。” “怎么了括尸?”我有些...
    開封第一講書人閱讀 163,359評論 0 353
  • 文/不壞的土叔 我叫張陵巷蚪,是天一觀的道長。 經(jīng)常有香客問我濒翻,道長屁柏,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,309評論 1 292
  • 正文 為了忘掉前任有送,我火速辦了婚禮淌喻,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘雀摘。我一直安慰自己裸删,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,346評論 6 390
  • 文/花漫 我一把揭開白布阵赠。 她就那樣靜靜地躺著涯塔,像睡著了一般肌稻。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上匕荸,一...
    開封第一講書人閱讀 51,258評論 1 300
  • 那天爹谭,我揣著相機與錄音,去河邊找鬼榛搔。 笑死诺凡,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的践惑。 我是一名探鬼主播腹泌,決...
    沈念sama閱讀 40,122評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼童本!你這毒婦竟也來了真屯?” 一聲冷哼從身側(cè)響起脸候,我...
    開封第一講書人閱讀 38,970評論 0 275
  • 序言:老撾萬榮一對情侶失蹤穷娱,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后运沦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體泵额,經(jīng)...
    沈念sama閱讀 45,403評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,596評論 3 334
  • 正文 我和宋清朗相戀三年携添,在試婚紗的時候發(fā)現(xiàn)自己被綠了嫁盲。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,769評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡烈掠,死狀恐怖羞秤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情左敌,我是刑警寧澤瘾蛋,帶...
    沈念sama閱讀 35,464評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站矫限,受9級特大地震影響哺哼,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜叼风,卻給世界環(huán)境...
    茶點故事閱讀 41,075評論 3 327
  • 文/蒙蒙 一取董、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧无宿,春花似錦茵汰、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,705評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽坡垫。三九已至,卻和暖如春画侣,著一層夾襖步出監(jiān)牢的瞬間冰悠,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,848評論 1 269
  • 我被黑心中介騙來泰國打工配乱, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留溉卓,地道東北人。 一個月前我還...
    沈念sama閱讀 47,831評論 2 370
  • 正文 我出身青樓搬泥,卻偏偏與公主長得像桑寨,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子忿檩,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,678評論 2 354

推薦閱讀更多精彩內(nèi)容