> When importing large volumes of data into HBase, BulkLoad is indispensable, and it is the usual choice when loading historical data. BulkLoad writes HFiles directly and hands them to the region servers, bypassing the normal write path (WAL and MemStore), which is why it is so fast, and we can use Spark's compute power to generate those files quickly.
![](https://upload-images.jianshu.io/upload_images/9028759-df09619803f62d95.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
## Usage
1. Import the dependencies
```
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.1.3.0.0.0-1634'
// keep spark-core on the same version as spark-sql
compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.1.3.0.0.0-1634'
compile group: 'org.apache.hbase', name: 'hbase-it', version: '2.0.0.3.0.0.0-1634'
```
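These coordinates are vendor (HDP 3.0) builds, as the `-1634` suffix suggests. On a stock Apache stack the equivalent dependencies would look roughly like the sketch below; the exact versions and the choice of the `hbase-mapreduce` module are my assumptions, so match them to whatever your cluster actually runs:
```
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.1'
compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.1'
compile group: 'org.apache.hbase', name: 'hbase-mapreduce', version: '2.0.0'
```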
2. Create the table and its column family (in the HBase shell)
```
create 'test_log','ext'
```
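For loads at the scale this post targets, it usually pays to pre-split the table so the generated HFiles spread across many regions instead of all landing in one. A hedged sketch; the split points here are made up and should be derived from your actual row-key distribution:
```
create 'test_log', 'ext', SPLITS => ['2', '4', '6', '8', 'a', 'c', 'e']
```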
3. Write the core code
BulkLoad.scala
```
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
// in HBase 1.x this class lives in org.apache.hadoop.hbase.mapreduce instead
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object BulkLoad {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      //.setMaster("local[12]")
      .setAppName("HbaseBulkLoad")
    val spark = SparkSession
      .builder
      .config(sparkConf)
      .getOrCreate()
    val sc = spark.sparkContext

    // Sample data standing in for the 20 billion rows a real load would have
    val datas = List(
      ("abc", ("ext", "type", "login")),
      ("ccc", ("ext", "type", "logout"))
    )
    val dataRdd = sc.parallelize(datas)

    // Convert each record into the (rowKey, KeyValue) pair HFileOutputFormat2 expects.
    // Note: HFiles must be written in row-key order; this tiny sample is already
    // sorted, but real data needs an explicit sort first (see the sketch below).
    val output = dataRdd.map { x =>
      val rowKey = Bytes.toBytes(x._1)
      val immutableRowKey = new ImmutableBytesWritable(rowKey)
      val colFam = x._2._1
      val colName = x._2._2
      val colValue = x._2._3
      val kv = new KeyValue(
        rowKey,
        Bytes.toBytes(colFam),
        Bytes.toBytes(colName),
        Bytes.toBytes(colValue.toString)
      )
      (immutableRowKey, kv)
    }

    val hConf = HBaseConfiguration.create()
    hConf.addResource("hbase-site.xml")
    val hTableName = "test_log"
    hConf.set("hbase.mapreduce.hfileoutputformat.table.name", hTableName)
    val tableName = TableName.valueOf(hTableName)
    val conn = ConnectionFactory.createConnection(hConf)
    val table = conn.getTable(tableName)
    val regionLocator = conn.getRegionLocator(tableName)

    // Write the HFiles to HDFS ...
    val hFileOutput = "/tmp/h_file"
    output.saveAsNewAPIHadoopFile(hFileOutput,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      hConf
    )

    // ... then hand them over to the region servers
    val bulkLoader = new LoadIncrementalHFiles(hConf)
    bulkLoader.doBulkLoad(new Path(hFileOutput), conn.getAdmin, table, regionLocator)
    conn.close()
  }
}
```
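One caveat worth underlining: HFileOutputFormat2 expects KeyValues in strict row-key order, and saveAsNewAPIHadoopFile does not sort for you; the two-row sample above just happens to be ordered. For real data you would sort first. A minimal sketch, under the assumption that row keys are ASCII (so String order matches HBase's byte-wise order); the names here are mine, not from the original project:
```
// Sort on the raw string key *before* mapping to (ImmutableBytesWritable, KeyValue):
// strings shuffle cheaply, and for ASCII keys lexicographic order equals the
// byte-wise order HFiles require.
val sortedRdd = dataRdd.sortByKey()

// Then build the output pairs from sortedRdd exactly as above and write the HFiles.
val sortedOutput = sortedRdd.map { x =>
  val rowKey = Bytes.toBytes(x._1)
  (new ImmutableBytesWritable(rowKey),
    new KeyValue(rowKey, Bytes.toBytes(x._2._1), Bytes.toBytes(x._2._2), Bytes.toBytes(x._2._3)))
}
```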
4. Submit the Spark job
```
spark-submit --master yarn \
    --conf spark.yarn.tokens.hbase.enabled=true \
    --class com.dounine.hbase.BulkLoad \
    --executor-memory 2G \
    --num-executors 2 \
    --driver-memory 2G \
    --executor-cores 2 \
    build/libs/hbase-data-insert-1.0.0-SNAPSHOT-all.jar
```
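Once the job finishes, a quick sanity check from the HBase shell should show the two sample rows; on a real multi-billion-row table you would of course keep the LIMIT:
```
scan 'test_log', {LIMIT => 10}
```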
## Complete project source
[https://github.com/dounine/hbase-data-insert/blob/master/src/main/scala/com/dounine/hbase/BulkLoad.scala](https://github.com/dounine/hbase-data-insert/blob/master/src/main/scala/com/dounine/hbase/BulkLoad.scala)