Problem: the cluster's existing ingestion program, which loads source files directly into HBase, started to back up. Tuning and modifying the program did not help, so a quick alternative was needed.
Solution: the ingestion pipeline also has a branch that writes the same source files to HDFS. The plan was therefore to use the data already in HDFS as the source, generate HFiles with a MapReduce job, and load them into HBase via bulk load, which avoids having to re-clean the raw data.
The program developed for this is shown below.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HFileGenerator {

    public static class HFileMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String symbol = "_";
            if (line == null || line.isEmpty()) {
                return;
            }
            // Source records are pipe-delimited; -1 keeps trailing empty fields.
            String[] items = line.split("\\|", -1);
            // Compose the rowkey from the first five fields, as required by the business logic.
            byte[] row = Bytes.toBytes(items[0] + symbol + items[1] + symbol + items[2]
                    + symbol + items[3] + symbol + items[4]);
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(row);
            // Store the whole line in column f1:column1.
            KeyValue kv = new KeyValue(row,
                    "f1".getBytes(), "column1".getBytes(),
                    System.currentTimeMillis(), Bytes.toBytes(line));
            context.write(rowkey, kv);
        }
    }
    public static void main(String[] args) throws Exception {
        Connection connection = null;
        Table table = null;
        try {
            Configuration conf = HBaseConfiguration.create();
            connection = ConnectionFactory.createConnection(conf);
            table = connection.getTable(TableName.valueOf("hbase_test"));

            Job job = Job.getInstance(conf);
            job.setJobName("HFile bulk load test");
            job.setJarByClass(HFileGenerator.class);
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(KeyValue.class);
            job.setMapperClass(HFileMapper.class);
            job.setReducerClass(KeyValueSortReducer.class);
            job.setPartitionerClass(SimpleTotalOrderPartitioner.class);

            // Delete the output directory if it already exists.
            Path outputPath = new Path("hdfs://lip1:8020/user/lipeng/hbase/output");
            FileSystem fileSystem = outputPath.getFileSystem(conf);
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }

            FileInputFormat.addInputPath(job, new Path("hdfs://lip1:8020/user/lipeng/hbase/input"));
            FileOutputFormat.setOutputPath(job, outputPath);
            // Configures the partitioner, reducer and output format for incremental load
            // according to the table's current region boundaries.
            HFileOutputFormat.configureIncrementalLoad(job, (HTable) table);

            if (job.waitForCompletion(true)) {
                FsShell shell = new FsShell(conf);
                try {
                    // Grant 777 on the output directory so the HBase user can move the HFiles.
                    shell.run(new String[]{"-chmod", "-R", "777", "hdfs://lip1:8020/user/lipeng/hbase/output"});
                } catch (Exception e) {
                    throw new IOException(e);
                }
                // Bulk load the generated HFiles into the HBase table.
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(outputPath, (HTable) table);
            } else {
                System.exit(1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                table.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}
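The bulk load path assumes the target table hbase_test with column family f1 already exists; its region boundaries at submission time determine how configureIncrementalLoad partitions the HFiles. If the table still has to be created, a minimal sketch using the HBase 1.x Admin API might look like the following; the split keys are hypothetical and should be chosen to match the real rowkey distribution.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateTestTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf("hbase_test");
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("f1"));
                // Pre-split so that configureIncrementalLoad produces more than one
                // reducer/region; these split keys are only an example.
                byte[][] splits = new byte[][]{
                        Bytes.toBytes("3"), Bytes.toBytes("6"), Bytes.toBytes("9")
                };
                admin.createTable(desc, splits);
            }
        }
    }
}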
When running the job, the HBase classpath must be added to Hadoop's hadoop-env.sh; otherwise the job fails with ClassNotFound errors for the HBase classes.
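For example, one common way to do this (assuming the hbase command is available on the nodes) is to append a line like the following to hadoop-env.sh; the exact mechanism may differ depending on your distribution.

# hadoop-env.sh: make the HBase jars visible to the MapReduce job
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$(hbase classpath)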