自定義OutputFormat案例

FilterMapper

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

? ? @Override

? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

? ? ? ? context.write(value, NullWritable.get());

? ? }

}


FilterReducer

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

? ? Text k = new Text();

? ? @Override

? ? protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

? ? ? ? String line = key.toString();

? ? ? ? line = line + "\r\n";

? ? ? ? k.set(line);

? ? ? ? //防止有重復的數(shù)據(jù)

? ? ? ? for (NullWritable nullWritable : values) {

? ? ? ? ? ? context.write(key, NullWritable.get());

? ? ? ? }

? ? }

}


FilterOutputFormat

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {

? ? @Override

? ? public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {

? ? ? ? return new FRecordWriter(job);

? ? }

}


FRecordWriter

public class FRecordWriter extends RecordWriter<Text, NullWritable> {

? ? FSDataOutputStream fosatguigu;

? ? FSDataOutputStream fosother;

? ? public FRecordWriter(TaskAttemptContext job) {

? ? ? ? try {

? ? ? ? ? ? // 1 獲取文件系統(tǒng)

? ? ? ? ? ? FileSystem fs = FileSystem.get(job.getConfiguration());

? ? ? ? ? ? // 2 創(chuàng)建輸出到atguigu.log的輸出流

? ? ? ? ? ? fosatguigu = fs.create(new Path("e:/atguigu.log"));

? ? ? ? ? ? // 3 創(chuàng)建輸出到other.log

? ? ? ? ? ? fosother = fs.create(new Path("e:/other.log"));

? ? ? ? } catch (IOException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? }

? ? @Override

? ? public void write(Text key, NullWritable value) throws IOException, InterruptedException {

? ? ? ? //判斷key當中是否有atguigu,如果有寫出到atguigu.log,如果沒有寫出到other.log

? ? ? ? if(key.toString().contains("atguigu")){

? ? ? ? ? ? //atguigu輸出流

? ? ? ? ? ? fosatguigu.write(key.toString().getBytes());

? ? ? ? }else{

? ? ? ? ? ? //other輸出流

? ? ? ? ? ? fosother.write(key.toString().getBytes());

? ? ? ? }

? ? }

? ? @Override

? ? public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

? ? ? ? IOUtils.closeStream(fosatguigu);

? ? ? ? IOUtils.closeStream(fosother);

? ? }

}


FilterDirver

public class FilterDirver {

? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

? ? ? ? //輸入輸出路徑需要根據(jù)自己電腦上的實際的輸入輸出路徑設(shè)置

? ? ? ? args = new String[]{"e:/input/inputoutputformat", "e:/output2"};

? ? ? ? // 1 獲取配置信息

? ? ? ? Configuration conf = new Configuration();

? ? ? ? Job job = Job.getInstance(conf);

? ? ? ? // 2 設(shè)置jar包加載路徑

? ? ? ? job.setJarByClass(FilterDirver.class);

? ? ? ? // 3 加載map/reduce類

? ? ? ? job.setMapperClass(FilterMapper.class);

? ? ? ? job.setReducerClass(FilterReducer.class);

? ? ? ? // 4 設(shè)置map輸出數(shù)據(jù)kv類型

? ? ? ? job.setMapOutputKeyClass(Text.class);

? ? ? ? job.setMapOutputValueClass(NullWritable.class);

? ? ? ? // 5 設(shè)置最終輸出數(shù)據(jù)的kv類型

? ? ? ? job.setOutputKeyClass(Text.class);

? ? ? ? job.setOutputValueClass(NullWritable.class);

? ? ? ? //要將自定義的輸出格式組件設(shè)置到j(luò)ob中

? ? ? ? job.setOutputFormatClass(FilterOutputFormat.class);

? ? ? ? // 6 設(shè)置輸入數(shù)據(jù)和輸出數(shù)據(jù)路徑

? ? ? ? FileInputFormat.setInputPaths(job, new Path(args[0]));

? ? ? ? FileOutputFormat.setOutputPath(job, new Path(args[1]));

? ? ? ? // 8 設(shè)置reduce端的分組

? ? ? ? job.setGroupingComparatorClass(OrderGroupingComparator.class);

? ? ? ? // 7 提交job

? ? ? ? boolean result = job.waitForCompletion(true);

? ? ? ? System.exit(result ? 0 : 1);

? ? }

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末愉烙,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子烹卒,更是在濱河造成了極大的恐慌,老刑警劉巖伴箩,帶你破解...
    沈念sama閱讀 211,348評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異叉谜,居然都是意外死亡错敢,警方通過查閱死者的電腦和手機践樱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,122評論 2 385
  • 文/潘曉璐 我一進店門厂画,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人映胁,你說我怎么就攤上這事木羹〖籽牛” “怎么了解孙?”我有些...
    開封第一講書人閱讀 156,936評論 0 347
  • 文/不壞的土叔 我叫張陵坑填,是天一觀的道長。 經(jīng)常有香客問我弛姜,道長,這世上最難降的妖魔是什么廷臼? 我笑而不...
    開封第一講書人閱讀 56,427評論 1 283
  • 正文 為了忘掉前任寂恬,我火速辦了婚禮初肉,結(jié)果婚禮上饰躲,老公的妹妹穿的比我還像新娘。我一直安慰自己嘹裂,他們只是感情好寄狼,可當我...
    茶點故事閱讀 65,467評論 6 385
  • 文/花漫 我一把揭開白布丁寄。 她就那樣靜靜地躺著狡逢,像睡著了一般拼卵。 火紅的嫁衣襯著肌膚如雪腋腮。 梳的紋絲不亂的頭發(fā)上即寡,一...
    開封第一講書人閱讀 49,785評論 1 290
  • 那天莺丑,我揣著相機與錄音,去河邊找鬼。 笑死梢莽,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播洪鸭,決...
    沈念sama閱讀 38,931評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼拾枣,長吁一口氣:“原來是場噩夢啊……” “哼盒让!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起姨蝴,我...
    開封第一講書人閱讀 37,696評論 0 266
  • 序言:老撾萬榮一對情侶失蹤浮梢,失蹤者是張志新(化名)和其女友劉穎彤路,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體远豺,經(jīng)...
    沈念sama閱讀 44,141評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡棺滞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,483評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了避凝。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,625評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡眨补,死狀恐怖管削,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情撑螺,我是刑警寧澤含思,帶...
    沈念sama閱讀 34,291評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站甘晤,受9級特大地震影響含潘,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜线婚,卻給世界環(huán)境...
    茶點故事閱讀 39,892評論 3 312
  • 文/蒙蒙 一遏弱、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧塞弊,春花似錦漱逸、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至诀黍,卻和暖如春袋坑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背眯勾。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工咒彤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人咒精。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓镶柱,卻偏偏與公主長得像,于是被迫代替她去往敵國和親模叙。 傳聞我的和親對象是個殘疾皇子歇拆,可洞房花燭夜當晚...
    茶點故事閱讀 43,492評論 2 348