VI吼具、 HBase-MR

一横辆、 官方HBase-MapReduce

查看HBase的MapReduce任務(wù)的所需的依賴
bin/hbase mapredcp
執(zhí)行環(huán)境變量的導(dǎo)入
export HBASE_HOME=/opt/module/hbase-1.3.1
export HADOOP_CLASSPATH=${HBASE_HOME}/bin/hbase mapredcp

  • 統(tǒng)計(jì)Student表中有多少行數(shù)據(jù)

yarn jar ../hbase-1.3.1/lib/hbase-server-1.3.1.jar rowcounter zhangsan

image.png

  • 使用MapReduce將本地?cái)?shù)據(jù)導(dǎo)入到HBase

image.png

在HDFS中上傳abc.tsv文件
[root@bigdata111 input]# hdfs dfs -mkdir /hbase_input
[root@bigdata111 input]# hdfs dfs -put abc.tsv /hbase_input/
yarn jar ../hbase-1.3.1/lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,cf:name,cf:age test hdfs://bigdata111:9000/hbase_input
image.png

二讲衫、 自己編寫實(shí)現(xiàn)Hbase2Hbase

  • Mapper

package top.gujm.hbase_h2h;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/**
 * 只需要cf:age列
 */
public class Mapper extends TableMapper<ImmutableBytesWritable, Put> {
    Logger logger = LoggerFactory.getLogger(Mapper.class);
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        List<Cell> columnCells = value.getColumnCells(Bytes.toBytes("cf"), Bytes.toBytes("age"));
        logger.info(Bytes.toString(key.get())+"的cf:age列個(gè)數(shù):"+columnCells.size());
        if(columnCells.size() > 0){
            Put put = new Put(key.get());
            put.add(columnCells.get(0));
            context.write(key, put);
        }
    }
}
  • Reducer

package top.gujm.hbase_h2h;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put put : values){
            context.write(NullWritable.get(), put);
        }
    }
}
  • Driver

package top.gujm.hbase_h2h;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(getConf(), Driver.class.getSimpleName());
        job.setJarByClass(Driver.class);
        TableMapReduceUtil.initTableMapperJob(
                "test",
                new Scan(),
                Mapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job
        );
        TableMapReduceUtil.initTableReducerJob("test2", Reducer.class, job);
        job.setNumReduceTasks(1);
        boolean flag = job.waitForCompletion(true);
        return flag ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(HBaseConfiguration.create(), new Driver(), args);

    }
}
  • 打包并上傳到linux伴箩,然后執(zhí)行下面命令運(yùn)行

yarn jar FlumeCustom-1.0-SNAPSHOT.jar top.gujm.hbase_h2h.Driver

  • 結(jié)果

image.png

三、 自己編寫實(shí)現(xiàn)hdfs2hbase

  • Mapper

package top.gujm.hbase_hdfs2hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //第一列rowkey绝葡,第二列name深碱,第三列age
        String[] fields = value.toString().split("\t");
        byte[] rowkey = Bytes.toBytes(fields[0]);
        Put put = new Put(rowkey);
        byte[] family = Bytes.toBytes("cf");
        byte[] columnName = Bytes.toBytes("name");
        byte[] columnAge = Bytes.toBytes("age");
        put.addColumn(family, columnName, Bytes.toBytes(fields[1]));
        put.addColumn(family, columnAge, Bytes.toBytes(fields[2]));
        context.write(new ImmutableBytesWritable(rowkey), put);
    }
}
  • Reducer

package top.gujm.hbase_hdfs2hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put put : values){
            context.write(NullWritable.get(), put);
        }
    }
}
  • Driver

package top.gujm.hbase_hdfs2hbase;

import com.sun.org.apache.xerces.internal.dom.PSVIAttrNSImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {

    public int run(String[] strings) throws Exception {

        Job job = Job.getInstance(getConf());
        job.setJarByClass(Driver.class);

        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        TableMapReduceUtil.initTableReducerJob("test", Reducer.class, job);

        FileInputFormat.setInputPaths(job, "/hbase_input/abc.tsv");

        job.setNumReduceTasks(1);
        boolean f = job.waitForCompletion(true);
        return f ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new Driver(), args);
    }
}
  • 打包上傳運(yùn)行

yarn jar FlumeCustom-1.0-SNAPSHOT.jar top.gujm.hbase_hdfs2hbase.Driver

  • 結(jié)果

image.png

四、 自己編寫實(shí)現(xiàn)hdfs2hbase

  • Mapper

package top.gujm.hbase_hbase2hdfs;


import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import java.io.IOException;

public class Mapper extends TableMapper<ImmutableBytesWritable, Result> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
  • Reducer

package top.gujm.hbase_hbase2hdfs;


import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class Reducer extends org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, Result, Text, Text> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Result> values, Context context) throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer();
        int i = 0;
        for (Result result : values){
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                sb.append(i == 0 ? "" : ",");
                sb.append(
                        Bytes.toString(CellUtil.cloneFamily(cell)) + ":" +
                                Bytes.toString(CellUtil.cloneQualifier(cell)) + "=" +
                                Bytes.toString(CellUtil.cloneValue(cell))
                );
            }
            context.write(new Text(Bytes.toString(key.get())), new Text(sb.toString()));
        }
    }
}
  • Driver

package top.gujm.hbase_hbase2hdfs;


import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {
    public int run(String[] strings) throws Exception {

        Job job = Job.getInstance(getConf());
        job.setJarByClass(Driver.class);

        TableMapReduceUtil.initTableMapperJob(
                "test",
                new Scan(),
                Mapper.class,
                ImmutableBytesWritable.class,
                Result.class,
                job
        );

        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);

        FileOutputFormat.setOutputPath(job, new Path("/out"));

        job.waitForCompletion(true);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(HBaseConfiguration.create(), new Driver(), args);
    }
}
  • 測試數(shù)據(jù)

[圖片上傳失敗...(image-c1be27-1569657812536)]

  • 結(jié)果

image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末藏畅,一起剝皮案震驚了整個(gè)濱河市敷硅,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌愉阎,老刑警劉巖绞蹦,帶你破解...
    沈念sama閱讀 216,744評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異榜旦,居然都是意外死亡幽七,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評論 3 392
  • 文/潘曉璐 我一進(jìn)店門溅呢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來澡屡,“玉大人,你說我怎么就攤上這事咐旧∈火模” “怎么了?”我有些...
    開封第一講書人閱讀 163,105評論 0 353
  • 文/不壞的土叔 我叫張陵铣墨,是天一觀的道長室埋。 經(jīng)常有香客問我,道長踏兜,這世上最難降的妖魔是什么词顾? 我笑而不...
    開封第一講書人閱讀 58,242評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮碱妆,結(jié)果婚禮上肉盹,老公的妹妹穿的比我還像新娘。我一直安慰自己疹尾,他們只是感情好上忍,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,269評論 6 389
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著纳本,像睡著了一般窍蓝。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上繁成,一...
    開封第一講書人閱讀 51,215評論 1 299
  • 那天吓笙,我揣著相機(jī)與錄音,去河邊找鬼巾腕。 笑死面睛,一個(gè)胖子當(dāng)著我的面吹牛絮蒿,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播叁鉴,決...
    沈念sama閱讀 40,096評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼土涝,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了幌墓?” 一聲冷哼從身側(cè)響起但壮,我...
    開封第一講書人閱讀 38,939評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎常侣,沒想到半個(gè)月后蜡饵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,354評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡胳施,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,573評論 2 333
  • 正文 我和宋清朗相戀三年验残,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片巾乳。...
    茶點(diǎn)故事閱讀 39,745評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖鸟召,靈堂內(nèi)的尸體忽然破棺而出胆绊,到底是詐尸還是另有隱情,我是刑警寧澤欧募,帶...
    沈念sama閱讀 35,448評論 5 344
  • 正文 年R本政府宣布压状,位于F島的核電站,受9級特大地震影響跟继,放射性物質(zhì)發(fā)生泄漏种冬。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,048評論 3 327
  • 文/蒙蒙 一舔糖、第九天 我趴在偏房一處隱蔽的房頂上張望娱两。 院中可真熱鬧,春花似錦金吗、人聲如沸十兢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽旱物。三九已至,卻和暖如春卫袒,著一層夾襖步出監(jiān)牢的瞬間宵呛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評論 1 269
  • 我被黑心中介騙來泰國打工夕凝, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留宝穗,地道東北人户秤。 一個(gè)月前我還...
    沈念sama閱讀 47,776評論 2 369
  • 正文 我出身青樓,卻偏偏與公主長得像讽营,于是被迫代替她去往敵國和親虎忌。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,652評論 2 354

推薦閱讀更多精彩內(nèi)容