There are three common ways to write data into HBase:
1. Writing through the HBase client API
2. Writing through a MapReduce job
3. Writing through the bulkload tool
The first two approaches become very inefficient at large write volumes, because both go through the RegionServers: incoming data is first written to the MemStore, which is flushed to disk as HFiles once it reaches a threshold; too many HFiles trigger compactions, and oversized regions trigger splits. All of these factors drag down write throughput, so neither approach is a good fit for large-scale ingestion.
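For reference, a write through the client API (method 1) looks like the minimal sketch below; the table name is only illustrative, and every such Put travels through a RegionServer's WAL and MemStore:

// minimal sketch of the client-API write path (method 1)
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TableName.valueOf("person"));
Put put = new Put(Bytes.toBytes("rowkey-0001"));
put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
table.put(put); // goes through the RegionServer (WAL + MemStore)
table.close();
conn.close();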
Bulkload solves exactly this problem: it writes the data directly into HFiles and, once they are complete, tells HBase to load them. This bypasses the costly write path described above and greatly improves ingestion throughput. The rest of this article walks through loading data with bulkload.
Here we create a person table through the HBase shell with two column families, "basic" and "social"; the result is shown in the figure below:
注:因?yàn)檫@里講解的是bulkload加載數(shù)據(jù),因此對于rowkey的設(shè)計(jì)沒有做過多的處理
The input source is a txt file in the following format:
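A hypothetical sample line, matching the field order the mapper below expects (rowkey,name,idnum,phone,age,birth,sex,address,company):

0001,zhangsan,110101199001011234,13800000000,28,1990-01-01,male,beijing,zjc-corp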
An MR job generates the HFiles, and bulkload then loads them into HBase. The MR job code is as follows:
package com.zjc.spark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* Created by zjc on 2018/12/26.
*/
public class sparkApplication10 {
    public static class bulkLoad extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input line: rowkey,name,idnum,phone,age,birth,sex,address,company
            String[] strs = value.toString().split(",");
            String rowKey = strs[0];
            ImmutableBytesWritable ibw = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(strs[1]));
            put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(strs[4]));
            put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("birth"), Bytes.toBytes(strs[5]));
            put.addColumn(Bytes.toBytes("social"), Bytes.toBytes("idnum"), Bytes.toBytes(strs[2]));
            put.addColumn(Bytes.toBytes("social"), Bytes.toBytes("phone"), Bytes.toBytes(strs[3]));
            put.addColumn(Bytes.toBytes("social"), Bytes.toBytes("sex"), Bytes.toBytes(strs[6]));
            put.addColumn(Bytes.toBytes("social"), Bytes.toBytes("address"), Bytes.toBytes(strs[7]));
            put.addColumn(Bytes.toBytes("social"), Bytes.toBytes("company"), Bytes.toBytes(strs[8]));
            context.write(ibw, put);
        }
    }

    public static Configuration conf = null;
    public static Connection conn = null;
    public static Table t = null;
    public static RegionLocator locator = null;
    public static Admin admin = null;

    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "your-zookeeper-quorum");
    }

    public static void main(String[] args) {
        try {
            // obtain the HBase table resources
            conn = ConnectionFactory.createConnection(conf);
            t = conn.getTable(TableName.valueOf("person"));
            locator = conn.getRegionLocator(TableName.valueOf("person"));
            admin = conn.getAdmin();
            // define an MR job; pass conf so the job carries the HBase/ZooKeeper settings
            Job job = Job.getInstance(conf);
            job.setJarByClass(sparkApplication10.class);
            // set the mapper and its output key/value types
            job.setMapperClass(sparkApplication10.bulkLoad.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            // set the input and output file formats
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);
            // set the input and output paths
            FileInputFormat.addInputPath(job, new Path("/test6/"));
            FileOutputFormat.setOutputPath(job, new Path("/tmp/"));
            // configure bulk load: wires in the reducer, total-order partitioner and
            // HFile output so the generated HFiles match the table's region boundaries
            HFileOutputFormat2.configureIncrementalLoad(job, t, locator);
            boolean b = job.waitForCompletion(true);
            // ask HBase to load the generated HFiles, but only if the job succeeded
            if (b) {
                LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
                load.doBulkLoad(new Path("/tmp/"), admin, t, locator);
            }
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            // close resources
            try {
                if (t != null) t.close();
                if (locator != null) locator.close();
                if (admin != null) admin.close();
                if (conn != null) conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
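A side note: the LoadIncrementalHFiles class used above is deprecated on HBase 2.2+, which replaces it with BulkLoadHFiles. A minimal sketch of the newer call, assuming the same table and output directory (not verified against every 2.x minor version):

import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
// HBase 2.2+ alternative to LoadIncrementalHFiles.doBulkLoad
BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
loader.bulkLoad(TableName.valueOf("person"), new Path("/tmp/"));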
Then package the project into a jar and submit it to YARN. The execution result is as follows:
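The submission command would look roughly like the following; the jar name is assumed, and the main class comes from the code above:

hadoop jar bulkload-demo.jar com.zjc.spark.sparkApplication10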
Check in the HBase shell whether the data was imported:
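For example, a quick scan and row count in the shell:

scan 'person', {LIMIT => 5}
count 'person'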
At this point the import is done. In my own test, loading 70 million rows into HBase on three VMs with default configuration took under 40 minutes. This article used an MR job to import the data; bulkload can also be driven from Spark, though the official documentation implements it mostly in Scala. I'll look into a Java implementation when I get the chance.