Hadoop是個高效的工具
介紹了Hadoop的由來和組成锄禽,當然提供給你用來分析的數(shù)據(jù),以及最快的方式靴姿。更重要的是描述第一個Hadoop工程的詳細編寫過程沃但。
1.認識Hadoop
1.1解決高速增長的存儲空間和讀取速度不匹配的問題
引入了多個磁盤同時讀取數(shù)據(jù)的技術。但為實現(xiàn)這個技術佛吓,又有兩個問題需要解決:
- 硬盤故障問題
- 各種分布式系統(tǒng)允許結合不同來源的數(shù)據(jù)進行分析宵晚,很難保證其正確性。
而在Hadoop中對這兩個問題都做到處理和解決维雇。對于第一個問題淤刃,常用的做法是保存數(shù)據(jù)副本(replica),Hadoop文件系統(tǒng)(HDFS吱型, Hadoop Distributed FileSystem)的使用原理類似逸贾,略有不同。第二個問題Hadoop中引入了MapReduce模型津滞,模型抽象出了硬盤讀寫問題并將其轉換為對一個數(shù)據(jù)集的計算铝侵,同時也具備較高的可靠性。
MapReduce 是一種線性的可伸縮編程模型触徐。使用者要寫兩個函數(shù)咪鲜,分別是Map函數(shù)和Reduce函數(shù),每個函數(shù)定義從一個鍵值對集合到另一個鍵值對集合到映射撞鹉。性能方面疟丙,MapReduce盡量在計算節(jié)點上存儲數(shù)據(jù)颖侄,以實現(xiàn)數(shù)據(jù)的本地快速訪問,數(shù)據(jù)本地化是MapReduce的核心特征享郊,從而獲得更好的性能览祖。另外有多種基于MapReduce的高級查詢語言(Pig和Hive)供使用。穩(wěn)定性上炊琉,MapReduce采用無共享(shared-nothing)框架展蒂,實現(xiàn)了失敗檢測,所有使用者無需擔心系統(tǒng)的部分失效問題温自。
1.2.氣象數(shù)據(jù)下載
書中的數(shù)據(jù)分析實例使用的是ncdc的氣象數(shù)據(jù)玄货,在手動編寫程序之前皇钞,首先要準備好這些數(shù)據(jù)悼泌。最開始找到了ncdc的ftp站點ftp://ftp.ncdc.noaa.gov/pub/data/
,下載經(jīng)常性的出現(xiàn)斷線夹界,下載速度異常緩慢馆里。所以不得不重新搜索新的源,最終找到了https://www1.ncdc.noaa.gov/pub/data/noaa
這個地址可柿,但是在下載時卻不像ftp可以批量下載鸠踪。
而只能通過腳本去抓去數(shù)據(jù)。這個腳本實現(xiàn)的功能是复斥,按年份批量下載對應地址的壓縮包营密,并將這些數(shù)據(jù)按年份保存。值得一說的是這個shell腳本使用了并行下載方式目锭,節(jié)省了大量的時間评汰。
#! /bin/bash
for i in {1901..2019}
do {
mkdir -p /Users/macos/noaaData/$i
wget --execute robots=off -r -np -nH --cut-dirs=4 -R index.html* https://www1.ncdc.noaa.gov/pub/data/noaa/$i/ -P /Users/macos/noaaData/$i
}&
done
2.第一個Hadoop工程
2.1 安裝并運行Hadoop
下載最新2.8.1版本
具體安裝方式和配置過程參考官方文檔 http://hadoop.apache.org/docs/current/
進入hadoop-x.x.x/sbin目錄下運行star-all腳本(中間需要輸入root密碼)
啟動成功驗證:
打開瀏覽器:
http://192.168.8.88:50070 (hdfs管理界面)顯示active活躍狀態(tài)
http://192.168.8.88:8088 (yarn管理界面)
以上兩個地址正常顯示,則說明啟動成功痢虹。
2.2 Hadoop程序編寫
MapReduce任務過程分為兩個處理階段:
- map階段
- reduce階段
每個階段都以鍵值對作為輸入和輸出被去,類型可供選擇。兩個處理階段需要分別編寫相應的函數(shù)方法奖唯,并加上運行作業(yè)的代碼惨缆。
新建Maven項目
- 在pom.xml文件中增加以下依賴關系
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
- 編寫Map函數(shù)、Reduce函數(shù)和調用執(zhí)行代碼
Map函數(shù)
完成功能:在天氣數(shù)據(jù)中截取溫度數(shù)據(jù)丰捷。并寫入到contex中為Reduce方法準備好數(shù)據(jù)坯墨。
// cc MaxTemperatureMapper Mapper for maximum temperature example
// vv MaxTemperatureMapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
// ^^ MaxTemperatureMapper
Reduce函數(shù)
完成功能:根據(jù)Map函數(shù)傳遞來的數(shù)據(jù)計算最大值,并輸出年份和最高溫度的鍵值對病往。
// cc MaxTemperatureReducer Reducer for maximum temperature example
// vv MaxTemperatureReducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
// ^^ MaxTemperatureReducer
main方法:
完成功能:創(chuàng)建運行Job畅蹂,傳遞數(shù)據(jù)目錄并設置Map和Reduce對應class;同時設置輸出鍵值對格式荣恐。
// cc MaxTemperature Application to find the maximum temperature in the weather dataset
// vv MaxTemperature
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
// ^^ MaxTemperature
- 設置數(shù)據(jù)輸入輸出目錄
在Run configurations中Program arguments輸入框中液斜,設置數(shù)據(jù)目錄和輸出目錄的絕對路徑累贤。
運行會在輸出目錄下生成兩個文件:
_SUCCESS
part-r-00000
第二個文件為我們需要的運行結果如下:
1948 342
1949 311
...
到此我們對Hadoop工程有了一個初步認識,并成功運行了我們的第一個項目少漆。好了臼膏,這篇分享就到這了,感興趣可以持續(xù)關注博客更新哦??