MapReduce 實現(xiàn) join 文件數(shù)據(jù)(四)

我們都知道澎嚣,當對兩個表進行關聯(lián)的時候可以用sql的join語句簡單的去實現(xiàn),并且如果兩張表的數(shù)據(jù)查詢非常大瘟芝,那么一般會講小表放在左邊易桃,可以達到優(yōu)化的作用,為何呢锌俱?其實我們在使用mapreduce的時候小表可以先加載到內(nèi)存中晤郑,然后再與輸入數(shù)據(jù)進行對比,如果匹配成功則關聯(lián)輸出贸宏。今天我們將介紹使用mapreduce中mapjoin與reducejoin兩種方式對數(shù)據(jù)的關聯(lián)并輸出贩汉。
一、先看數(shù)據(jù):
image.png
我們分別將兩個數(shù)據(jù)文件放到hdfs上:
image.png
二锚赤、以 order 作為小表在 map 中進行 join,首先我們創(chuàng)建驅(qū)動類框架:
public class MapJoinRM extends Configured implements Tool {

    //加載到內(nèi)存中的對象
    static Map<String, String> customerMap = new HashMap<String, String>();

    public int run(String[] args) throws Exception {

        //driver
        //1) 獲取配置對象
        Configuration configuration = this.getConf();

        //2) 創(chuàng)建任務對象
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        //3.1) 設置輸入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job, path);

        //3.2) map 的設置
        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //3.3 reduce 設置

        //3.4 添加緩存
        URI uri = new URI(args[2]);
        job.addCacheFile(uri);

        //3.5 設置輸出
        Path output = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, output);

        //4. 提交
        boolean sucess = job.waitForCompletion(true);
        return sucess ? 0 : 1;
    }

    public static void main(String[] args) {

        args = new String[]{
                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/order.txt",
                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output66",
                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/customer.txt"
        };

        Configuration configuration = new Configuration();
        try {
            //判斷是否已經(jīng)存在路徑
            Path fileOutputPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if(fileSystem.exists(fileOutputPath)){
                fileSystem.delete(fileOutputPath, true);
            }

            int status = ToolRunner.run(configuration, new MapJoinRM(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

三褐鸥、實現(xiàn) mapper 子類處理緩存數(shù)據(jù)以及關聯(lián)邏輯的實現(xiàn):
public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{

        private Text outputKey = new Text();
        private Text outputValue = new Text();
  
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //緩存數(shù)據(jù)的處理
            Configuration configuration = context.getConfiguration();
            URI[] uri = Job.getInstance(configuration).getCacheFiles();
            Path path = new Path(uri[0]);
            FileSystem fileSystem = FileSystem.get(configuration);
            InputStream inputStream = fileSystem.open(path);

            InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
            BufferedReader bufferedReader = new BufferedReader(inputStreamReader);

            String line = null;
            while((line = bufferedReader.readLine()) != null){
                if(line.trim().length() > 0){
                    customerMap.put(line.split(",")[0], line);
                }
            }

            bufferedReader.close();
            inputStreamReader.close();
            inputStream.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String lineValue = value.toString();
            StringTokenizer stringTokenizer = new StringTokenizer(lineValue, ",");
            while(stringTokenizer.hasMoreTokens()){
                String wordValue = stringTokenizer.nextToken();
                if(customerMap.get(wordValue) != null){
                    outputKey.set(wordValue);
                    outputValue.set(customerMap.get(wordValue) + lineValue);
                    context.write(outputKey, outputValue);
                    break;
                }
            }

        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

        }
    }
四线脚、運行程序并在控制臺中命令查看關聯(lián)結果:
bin/hdfs dfs -text /user/hdfs/output66/part*

運行結果如圖:


image.png

大小表的關聯(lián)就這么簡單,接下來我們使用 reduce 的進行 join

五、由于在 reduce 中進行 join 的話是同時加載兩個數(shù)據(jù)進來的浑侥,為了區(qū)分從 map 中傳進來的數(shù)據(jù)姊舵,我們要自定義一個類型,設置一個變量用于標識是哪張表的數(shù)據(jù)寓落,這樣我們在reduce中才能區(qū)分哪些數(shù)據(jù)是屬于哪張表的:
public class DataJoionWritable implements Writable {

    private String tag;
    private String data;

    public DataJoionWritable() {
    }

    public DataJoionWritable(String tag, String data) {
       this.set(tag, data);
    }

    public void set(String tag, String data){
        this.tag = tag;
        this.data = data;
    }

    public void write(DataOutput dataOutput) throws IOException {

        dataOutput.writeUTF(this.getTag());
        dataOutput.writeUTF(this.getData());

    }

    public void readFields(DataInput dataInput) throws IOException {

        this.setTag(dataInput.readUTF());
        this.setData(dataInput.readUTF());

    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "DataJoionWritable{" +
                "tag='" + tag + '\'' +
                ", data='" + data + '\'' +
                '}';
    }

}
六括丁、為了方便使用表示常量我們創(chuàng)建一個常用類:
public class DataCommon {

    public final static String CUSTOMER = "customer";
    public final static String ORDER = "order";

}
七、創(chuàng)建驅(qū)動類的通用框架:
public class ReduceJoinMR extends Configured implements Tool {

    public int run(String args[]) throws IOException, ClassNotFoundException, InterruptedException {

        //driver
        //1) 獲取配置對象
        Configuration configuration = this.getConf();

        //2) 創(chuàng)建任務對象
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        //3.1) 設置輸入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job, path);

        //3.2) map 的設置
        job.setMapperClass(JoinMapper2.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataJoionWritable.class);

        //3.3 reduce 設置
        job.setReducerClass(JoinReduce2.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        //3.4 設置輸出
        Path output = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, output);

        //4. 提交
        boolean sucess = job.waitForCompletion(true);
        return sucess ? 0 : 1;
    }


    public static void main(String[] args) {
        //datas目錄下有已存在要關聯(lián)的兩個數(shù)據(jù)文件
        args = new String[]{
                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/datas",
                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output100"
        };

        Configuration configuration = new Configuration();
        try {
            //判斷是否已經(jīng)存在路徑
            Path fileOutputPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if(fileSystem.exists(fileOutputPath)){
                fileSystem.delete(fileOutputPath, true);
            }

            int status = ToolRunner.run(configuration, new ReduceJoinMR(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

八伶选、接下來我們開始實現(xiàn) Mapper 的數(shù)據(jù)邏輯的處理:
public static class JoinMapper2 extends Mapper<LongWritable, Text, Text, DataJoionWritable>{

        private Text outputKey = new Text();
        DataJoionWritable outputValue = new DataJoionWritable();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] values = value.toString().split(",");
            if((3 != values.length) && (4 != values.length)) return;

            //customer
            if(3 == values.length){
                String cid = values[0];
                String name = values[1];
                String telphone = values[2];
                outputKey.set(cid);
                outputValue.set(DataCommon.CUSTOMER,name + ","+telphone);
            }

            //order
            if(4 == values.length){
                String cid = values[1];
                String price = values[2];
                String productName = values[3];
                outputKey.set(cid);
                outputValue.set(DataCommon.ORDER,productName + ","+price);
            }

            context.write(outputKey,outputValue);

        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

        }
    }

九史飞、使用 reduce 對數(shù)據(jù)的關聯(lián)處理:
public static class JoinReduce2 extends Reducer<Text, DataJoionWritable, NullWritable, Text>{

        private  Text outputValue = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

        }

        @Override
        protected void reduce(Text key, Iterable<DataJoionWritable> values, Context context) throws IOException, InterruptedException {

            String customerInfo = null;
            List<String> orderList = new ArrayList<String>();

            for (DataJoionWritable dataJoinWritable : values){
                if(DataCommon.CUSTOMER.equals(dataJoinWritable.getTag())){
                    customerInfo = dataJoinWritable.getData();
                }
                else if(DataCommon.ORDER.equals(dataJoinWritable.getTag())){
                    orderList.add(dataJoinWritable.getData());
                }
            }

            for (String orderInfo : orderList){
                if(customerInfo == null) continue;
                outputValue.set(key.toString() +","+ customerInfo + ","+ orderInfo);
                context.write(NullWritable.get(),outputValue);
            }

        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

        }
    }

十、使用命令查詢結果如下:
image.png

由于時間過于緊迫仰税,基本上就粘貼代碼了构资,后續(xù)會優(yōu)化,在此感謝老師的思路陨簇。吐绵。。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末河绽,一起剝皮案震驚了整個濱河市己单,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌耙饰,老刑警劉巖纹笼,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異榔幸,居然都是意外死亡允乐,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進店門削咆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來牍疏,“玉大人,你說我怎么就攤上這事拨齐×墼桑” “怎么了?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵瞻惋,是天一觀的道長厦滤。 經(jīng)常有香客問我,道長歼狼,這世上最難降的妖魔是什么掏导? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮羽峰,結果婚禮上趟咆,老公的妹妹穿的比我還像新娘添瓷。我一直安慰自己,他們只是感情好值纱,可當我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布鳞贷。 她就那樣靜靜地躺著,像睡著了一般虐唠。 火紅的嫁衣襯著肌膚如雪搀愧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天疆偿,我揣著相機與錄音咱筛,去河邊找鬼。 笑死翁脆,一個胖子當著我的面吹牛眷蚓,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播反番,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼沙热,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了罢缸?” 一聲冷哼從身側響起篙贸,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎枫疆,沒想到半個月后爵川,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡息楔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年寝贡,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片值依。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡圃泡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出愿险,到底是詐尸還是另有隱情颇蜡,我是刑警寧澤,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布辆亏,位于F島的核電站风秤,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏扮叨。R本人自食惡果不足惜缤弦,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望彻磁。 院中可真熱鬧碍沐,春花似錦惦费、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽恍箭。三九已至刻恭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間扯夭,已是汗流浹背鳍贾。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留交洗,地道東北人骑科。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像构拳,于是被迫代替她去往敵國和親咆爽。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,066評論 2 355

推薦閱讀更多精彩內(nèi)容