原文地址:https://itweknow.cn/detail?id=71 岖瑰,歡迎大家訪問休蟹。
上篇文章我們簡要介紹了一下Avro是啥袭蝗,以及其幾種數(shù)據(jù)類型版扩。那么通過這篇文章我們一起來實踐一下Avro在MapReduce中的使用。
前提條件
一個maven項目
Hadoop集群,如果你還沒有安裝的話绍撞,請戳這里,查看之前的文章得院。
說明
本篇文章是一個簡單的用例傻铣,使用的例子是一個txt文件中存儲了大量的學生信息,這些學生有姓名祥绞、年齡非洲、愛好和班級信息,我們要做的事情就是通過MapReduce程序找到各個班級年齡最大的學生蜕径。
項目依賴
我們需要hadoop以及avro相關的包两踏。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.8.2</version>
</dependency>
Avro模式
前面也說到了每個學生有姓名、年齡兜喻、愛好梦染、班級四個字段的信息,所以我們定義了如下的Avro模式來描述一個學生朴皆。命名為Student.avsc帕识,存放在resources目錄下。
{
"type": "record",
"name": "StudentRecord",
"doc": "A student",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "hobby", "type": "string"},
{"name": "class", "type": "string"}
]
}
Mapper和Reducer
- Mapper
public class StudentAgeMaxMapper extends Mapper<LongWritable, Text,
AvroKey<String>, AvroValue<GenericRecord>> {
private GenericRecord record = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());
private StudentRecordParser parser = new StudentRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
parser.parse(value);
if (parser.isValid()) {
// 數(shù)據(jù)合法遂铡。
record.put("name", parser.getName());
record.put("age", parser.getAge());
record.put("hobby", parser.getHobby());
record.put("class", parser.getClazz());
context.write(new AvroKey<>(parser.getClazz()), new AvroValue<>(record));
}
}
}
上面的代碼中你可以看到我們自定義了一個StudentRecordParser的類來解析一行記錄肮疗,由于篇幅的原因這里就不展示了,你可以在后面提供的源碼中找到扒接。其實不難看出伪货,Map程序主要做的事情就是將我們存放在txt中的記錄解析成一個個的GenericRecord對戲,然后以班級名稱為鍵钾怔,record為值傳遞給Reducer做進一步處理碱呼。
- Reducer
public class StudentAgeMaxReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>,
AvroKey<GenericRecord>, NullWritable> {
@Override
protected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values,
Context context) throws IOException, InterruptedException {
GenericRecord max = null;
for (AvroValue<GenericRecord> value : values) {
GenericRecord record = value.datum();
if (max == null || ((Integer)max.get("age") <
(Integer) record.get("age"))) {
max = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());
max.put("name", record.get("name"));
max.put("age", record.get("age"));
max.put("hobby", record.get("hobby"));
max.put("class", record.get("class"));
}
}
context.write(new AvroKey<>(max), NullWritable.get());
}
}
Reducer的邏輯其實也比較簡單,就是通過循環(huán)比較的方式找到年齡最大的學生蒂教。
驅動程序
public class StudentAgeMaxDriver {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 注釋1:為了解決在Hadoop集群中運行時我們使用的Avro版本和集群中Avro版本不一致的問題巍举。
configuration.setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
Job job = Job.getInstance(configuration);
job.setJarByClass(StudentAgeMaxDriver.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
AvroJob.setOutputKeySchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapperClass(StudentAgeMaxMapper.class);
job.setReducerClass(StudentAgeMaxReducer.class);
System.exit(job.waitForCompletion(true)?0:1);
}
}
和之前的MapReduce實戰(zhàn)中實例比較,我們這里使用AvroJob來配置作業(yè)凝垛,AvroJob類主要用來給輸入懊悯、map輸出以及最后輸出數(shù)據(jù)指定Avro模式。
項目打包
在打包的時候我們需要將依賴也打到jar包中梦皮,不然后面在集群中運行的時候會報找不到AvroJob類的錯誤炭分。可通過在pom.xml中添加如下插件來解決打包的問題剑肯。
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
運行
-
準備輸入文件捧毛,input.txt。
zhangsan 23 music class1 lisi 24 pingpong class2 wangwu 24 dance class1 liuyi 25 music class1 chener 25 dance class2 zhaoliu 22 dance class2 sunqi 22 pingpong class1 zhouba 23 music class2 wujiu 26 dance class1 zhengshi 21 dance class2
-
將輸入文件上傳到HDFS上
hadoop fs -mkdir /input hadoop fs -put input.txt /input
將jar拷貝到集群中任意一臺Hadoop機器上。
-
運行下面的命令執(zhí)行jar包
export HADOOP_CLASSPATH=${你的jar包名} export HADOOP_USER_CLASSPATH_FIRST=true hadoop jar {你的jar包名} {主類路徑} /input /output
-
將運行結果拷貝到本地
hadoop fs -copyToLocal /output/part-r-00000.avro part-r-00000.avro
-
運行結果查看
root@test:~# java -jar /root/extra-jar/avro-tools-1.8.2.jar tojson part-r-00000.avro {"name":"wujiu","age":26,"hobby":"dance","class":"class1"} {"name":"chener","age":25,"hobby":"dance","class":"class2"}
想要項目源碼嗎呀忧?戳這里就有哦师痕。