MapReduce中使用Avro

原文地址:https://itweknow.cn/detail?id=71 岖瑰,歡迎大家訪問休蟹。

上篇文章我們簡要介紹了一下Avro是啥袭蝗,以及其幾種數(shù)據(jù)類型版扩。那么通過這篇文章我們一起來實踐一下Avro在MapReduce中的使用。

前提條件

一個maven項目
Hadoop集群,如果你還沒有安裝的話绍撞,請戳這里,查看之前的文章得院。

說明

本篇文章是一個簡單的用例傻铣,使用的例子是一個txt文件中存儲了大量的學生信息,這些學生有姓名祥绞、年齡非洲、愛好和班級信息,我們要做的事情就是通過MapReduce程序找到各個班級年齡最大的學生蜕径。

項目依賴

我們需要hadoop以及avro相關的包两踏。

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.8.5</version>
</dependency>

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.8.2</version>
</dependency>

Avro模式

前面也說到了每個學生有姓名、年齡兜喻、愛好梦染、班級四個字段的信息,所以我們定義了如下的Avro模式來描述一個學生朴皆。命名為Student.avsc帕识,存放在resources目錄下。

{
    "type": "record",
    "name": "StudentRecord",
    "doc": "A student",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "hobby", "type": "string"},
        {"name": "class", "type": "string"}
    ]
}

Mapper和Reducer

  • Mapper
public class StudentAgeMaxMapper extends Mapper<LongWritable, Text,
        AvroKey<String>, AvroValue<GenericRecord>> {

    private GenericRecord record = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());

    private StudentRecordParser parser = new StudentRecordParser();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException,
            InterruptedException {
        parser.parse(value);
        if (parser.isValid()) {
            // 數(shù)據(jù)合法遂铡。
            record.put("name", parser.getName());
            record.put("age", parser.getAge());
            record.put("hobby", parser.getHobby());
            record.put("class", parser.getClazz());
            context.write(new AvroKey<>(parser.getClazz()), new AvroValue<>(record));
        }
    }
}

上面的代碼中你可以看到我們自定義了一個StudentRecordParser的類來解析一行記錄肮疗,由于篇幅的原因這里就不展示了,你可以在后面提供的源碼中找到扒接。其實不難看出伪货,Map程序主要做的事情就是將我們存放在txt中的記錄解析成一個個的GenericRecord對戲,然后以班級名稱為鍵钾怔,record為值傳遞給Reducer做進一步處理碱呼。

  • Reducer
public class StudentAgeMaxReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>,
        AvroKey<GenericRecord>, NullWritable> {

    @Override
    protected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values,
                          Context context) throws IOException, InterruptedException {

        GenericRecord max = null;
        for (AvroValue<GenericRecord> value : values) {
            GenericRecord record = value.datum();
            if (max == null || ((Integer)max.get("age") <
                    (Integer) record.get("age"))) {
                max = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());
                max.put("name", record.get("name"));
                max.put("age", record.get("age"));
                max.put("hobby", record.get("hobby"));
                max.put("class", record.get("class"));
            }
        }
        context.write(new AvroKey<>(max), NullWritable.get());
    }
}

Reducer的邏輯其實也比較簡單,就是通過循環(huán)比較的方式找到年齡最大的學生蒂教。

驅動程序

public class StudentAgeMaxDriver {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // 注釋1:為了解決在Hadoop集群中運行時我們使用的Avro版本和集群中Avro版本不一致的問題巍举。
        configuration.setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
        Job job = Job.getInstance(configuration);
        job.setJarByClass(StudentAgeMaxDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputValueSchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
        AvroJob.setOutputKeySchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapperClass(StudentAgeMaxMapper.class);
        job.setReducerClass(StudentAgeMaxReducer.class);
        System.exit(job.waitForCompletion(true)?0:1);
    }

}

和之前的MapReduce實戰(zhàn)中實例比較,我們這里使用AvroJob來配置作業(yè)凝垛,AvroJob類主要用來給輸入懊悯、map輸出以及最后輸出數(shù)據(jù)指定Avro模式。

項目打包

在打包的時候我們需要將依賴也打到jar包中梦皮,不然后面在集群中運行的時候會報找不到AvroJob類的錯誤炭分。可通過在pom.xml中添加如下插件來解決打包的問題剑肯。

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>

運行

  1. 準備輸入文件捧毛,input.txt。

    zhangsan    23  music   class1
    lisi    24  pingpong    class2
    wangwu  24  dance   class1
    liuyi   25  music   class1
    chener  25  dance   class2
    zhaoliu 22  dance   class2
    sunqi   22  pingpong    class1
    zhouba  23  music   class2
    wujiu   26  dance   class1
    zhengshi    21  dance   class2
    
  2. 將輸入文件上傳到HDFS上

    hadoop fs -mkdir /input
    hadoop fs -put input.txt /input
    
  3. 將jar拷貝到集群中任意一臺Hadoop機器上。

  4. 運行下面的命令執(zhí)行jar包

    export HADOOP_CLASSPATH=${你的jar包名}
    export HADOOP_USER_CLASSPATH_FIRST=true
    hadoop jar {你的jar包名} {主類路徑} /input /output
    
  5. 將運行結果拷貝到本地

    hadoop fs -copyToLocal /output/part-r-00000.avro part-r-00000.avro
    
  6. 運行結果查看

    root@test:~# java -jar /root/extra-jar/avro-tools-1.8.2.jar tojson part-r-00000.avro
    {"name":"wujiu","age":26,"hobby":"dance","class":"class1"}
    {"name":"chener","age":25,"hobby":"dance","class":"class2"}
    
    

想要項目源碼嗎呀忧?戳這里就有哦师痕。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市而账,隨后出現(xiàn)的幾起案子胰坟,更是在濱河造成了極大的恐慌,老刑警劉巖泞辐,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件笔横,死亡現(xiàn)場離奇詭異,居然都是意外死亡咐吼,警方通過查閱死者的電腦和手機吹缔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來锯茄,“玉大人厢塘,你說我怎么就攤上這事〖∮模” “怎么了俗冻?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長牍颈。 經常有香客問我迄薄,道長,這世上最難降的妖魔是什么煮岁? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任讥蔽,我火速辦了婚禮,結果婚禮上画机,老公的妹妹穿的比我還像新娘冶伞。我一直安慰自己,他們只是感情好步氏,可當我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布响禽。 她就那樣靜靜地躺著,像睡著了一般荚醒。 火紅的嫁衣襯著肌膚如雪芋类。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天界阁,我揣著相機與錄音侯繁,去河邊找鬼。 笑死泡躯,一個胖子當著我的面吹牛贮竟,可吹牛的內容都是我干的丽焊。 我是一名探鬼主播,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼咕别,長吁一口氣:“原來是場噩夢啊……” “哼技健!你這毒婦竟也來了?” 一聲冷哼從身側響起惰拱,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤凫乖,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后弓颈,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡删掀,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年翔冀,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片披泪。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡纤子,死狀恐怖,靈堂內的尸體忽然破棺而出款票,到底是詐尸還是另有隱情控硼,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布艾少,位于F島的核電站卡乾,受9級特大地震影響,放射性物質發(fā)生泄漏缚够。R本人自食惡果不足惜幔妨,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望谍椅。 院中可真熱鬧误堡,春花似錦、人聲如沸雏吭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽杖们。三九已至悉抵,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間摘完,已是汗流浹背基跑。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留描焰,地道東北人媳否。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓栅螟,卻偏偏與公主長得像,于是被迫代替她去往敵國和親篱竭。 傳聞我的和親對象是個殘疾皇子力图,可洞房花燭夜當晚...
    茶點故事閱讀 43,527評論 2 349

推薦閱讀更多精彩內容