前言
本文是個人之前紀(jì)錄的MapReduce學(xué)習(xí)筆記踪区,主要涉及到MapReduce基本概念、Hadoop 經(jīng)典示例WordCount的使用解析陈莽、hdfs與hbase的簡單了解使用〕诱疲現(xiàn)在整理了一下分享出來犁享,希望對別人有所幫助坪蚁。
學(xué)習(xí)MapReduce一定要理解這種Map半夷、Reduce的編程模型以及Mapper、Reducer數(shù)據(jù)處理的原理迅细,否則只是一味的復(fù)制粘貼可能比較難上手。
同時學(xué)習(xí)大數(shù)據(jù)的知識淘邻,一定要將自己對分布式的理解研究透徹茵典。
一、概念理解
- MapReduce 是一種線性的可伸縮的編程模型宾舅,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運(yùn)算
- 在MapReduce里统阿,Map處理的是原始數(shù)據(jù),每條數(shù)據(jù)之間互相沒有關(guān)系(這一點一定要注意)筹我。Reduce階段扶平,以key為標(biāo)識,對同一個key下的value進(jìn)行統(tǒng)計蔬蕊,類似{key,[value1,value2……]}
- 可以把MapReduce理解為结澄,把一堆雜亂無章的數(shù)據(jù)按照某種特征歸納起來,然后處理并得到最后的結(jié)果。
Map面對的是雜亂無章的互不相關(guān)的數(shù)據(jù)麻献,它解析每個數(shù)據(jù)们妥,從中提取出key和value,也就是提取了數(shù)據(jù)的特征勉吻。
經(jīng)過MapReduce的Shuffle階段之后监婶,在Reduce階段看到的都是已經(jīng)歸納好的數(shù)據(jù)了,在此基礎(chǔ)上我們可以做進(jìn)一步的處理以便得到結(jié)果齿桃。 - 缺點:不適用于實時計算惑惶,實時計算一般最低都是要求秒級響應(yīng)的,MR很難滿足這個要求短纵,實時計算一般采用storm等流式計算系統(tǒng)
-
MapReduce計算流程
MapReduce計算流程--來源網(wǎng)絡(luò)
二带污、編程模型
- 每個應(yīng)用程序稱為一個作業(yè)(Job),每個Job是由一系列的Mapper和Reducer來完成
- 任務(wù)過程分為兩個階段踩娘,map和reduce階段刮刑,兩個階段都是使用鍵值對(key/value)作為輸入輸出的
- 每個Mapper處理一個Split,每個split對應(yīng)一個map線程养渴。Split中的數(shù)據(jù)作為map的輸入雷绢,map的輸出一定在map端
- Map方法:Map(k1,v1) -> list(k2,v2) ,并行應(yīng)用于每一個輸入的數(shù)據(jù)集理卑,每一次調(diào)用都會產(chǎn)生一個(k2,v2)的隊列 翘紊。
- Reduce方法:Reduce(k2,list(v2)) -> list(k3,v3)。收集map端輸出隊列l(wèi)ist(k2,v2)中有相同key的數(shù)據(jù)對藐唠,把它們聚集在一起帆疟,輸出時形成目的數(shù)據(jù) list(k3,v3)。
- 新舊版本API的區(qū)別:
- 新的api放在:org.apache.hadoop.mapreduce,舊版api放在:org.apache.hadoop.mapred
- 新API使用虛類宇立,舊版使用的是接口踪宠,虛類更加利于擴(kuò)展
三、運(yùn)行機(jī)制
-
輸入分片(input split)
map計算之前妈嘹,MapReduce會根據(jù)輸入文件計算輸入分片(input -> spliting),每個input split針對一個map任務(wù)柳琢。split存儲的并不是數(shù)據(jù),而是一個分片長度和一個記錄數(shù)據(jù)的位置的數(shù)組
-
map階段
map階段的操作一般都是在數(shù)據(jù)存儲節(jié)點上操作润脸,所以有時候為了能夠減輕數(shù)據(jù)傳輸?shù)木W(wǎng)絡(luò)壓力柬脸,可以先combiner階段處理一下數(shù)據(jù),在進(jìn)行reduce
-
combiner階段
此階段是可選的毙驯,不是必須經(jīng)過的一個階段倒堕,combiner其實也是一種reduce操作徽千,可以說combiner是一種本地化的reduce操作顶霞,是map運(yùn)算的后續(xù)操作,可以減輕網(wǎng)絡(luò)傳輸?shù)膲毫σ乇搿5莄ombiner的使用需要注意不要影響到reduce的最終結(jié)果,比如計算平均值的時候如果使用combiner就會影響最終的結(jié)果魂那,但是計算總數(shù)的話則對最終結(jié)果沒影響
-
shuffle階段
將map的輸出作為reduce的輸入蛾号,這個過程就是shuffle,是MapReduce優(yōu)化的重要階段涯雅。
-
reduce階段
reducer階段鲜结,輸入是shuffle階段的輸出,對每個不同的鍵和該鍵對應(yīng)的值的數(shù)據(jù)流進(jìn)行獨(dú)立活逆、并行的處理精刷。
四、WordCount--官方提供的example
代碼
package com.smile.test;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
private static final String INPUT_PATH = "/user/cdh/yjq/input/words.txt";
//hdfs輸出路徑
private static final String OUTPUT_PATH = "/user/cdh/yjq/output/";
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
// Text 實現(xiàn)了BinaryComparable類可以作為key值
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 解析鍵值對
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
String[] paths = {INPUT_PATH,OUTPUT_PATH};
//獲得Configuration配置 Configuration: core-default.xml, core-site.xml
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, paths).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
// 設(shè)置Mapper類
job.setMapperClass(TokenizerMapper.class);
// 設(shè)置Combiner類
job.setCombinerClass(IntSumReducer.class);
// 設(shè)置Reduce類
job.setReducerClass(IntSumReducer.class);
// 設(shè)置輸出key的類型,注意跟reduce的輸出類型保持一致
job.setOutputKeyClass(Text.class);
// 設(shè)置輸出value的類型蔗候,注意跟reduce的輸出類型保持一致
job.setOutputValueClass(IntWritable.class);
// 設(shè)置輸入路徑
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// 設(shè)置輸出路徑
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
解析
MapReduce的輸出路徑一定要保證文件夾不存在怒允,最好的解決方法時在代碼中添加判斷,執(zhí)行之前刪除output文件夾(具體方法見下面的hdfs操作)
MapReduce可以沒有輸出锈遥,但必須設(shè)置輸出路徑
-
MapReduce的輸入路徑可以直接寫hdfs的目錄路徑纫事,然后放在集群下執(zhí)行,
hadoop jar **.jar java類名 參數(shù)1 參數(shù)2 ...
-
Mapper
//map public void map(Object key, Text value, Context context)
前面兩個參數(shù)分別是輸入的key所灸,value丽惶,Context context可以記錄輸入的key和value,context也可以記錄map運(yùn)算的狀態(tài)
map中的context記錄了map執(zhí)行的上下文爬立,在mapper類中钾唬,context可以存儲一些job conf的信息,也就是說context是作為參數(shù)傳遞的載體侠驯。比如runner中configuration的set信息[conf.set(Str, strValue)
]抡秆,map中可以get到[context.getConfiguration().get(Str)
]//setup protected void setup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) //cleanup protected void cleanup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
MapReduce框架內(nèi)的setup和cleanup方法只會執(zhí)行一次,所以一些相關(guān)變量或者是資源的初始化和釋放最好是在setup中執(zhí)行吟策,如果放在map中執(zhí)行儒士,則在解析每一行數(shù)據(jù)的時候都會執(zhí)行一次,嚴(yán)重影響程序運(yùn)行效率檩坚。
-
Reducer
public void reduce(Text key, Iterable<IntWritable> values, Context context)
reduce的輸入也是key/value形式着撩,不過是values,也就是一個key對應(yīng)的一組value效床,例如key,value1;key,value2...
reducer不是必須的,如果用不到reducer階段可以不寫reduce會接收到不同map傳遞過來的數(shù)據(jù) 权谁,并且每個map傳遞過來的數(shù)據(jù)都是有序的剩檀。如果reduce端接收到的數(shù)據(jù)量比較小,那么會存儲在內(nèi)存中旺芽,如果超出緩沖區(qū)大小一定比例沪猴,則會合并后寫到磁盤上
-
調(diào)用 runner
Configuration conf = new Configuration(); //連接hbase辐啄,操作hbase Configuration conf = HBaseConfiguration.create();
MapReduce運(yùn)行之前都要初始化Configuration,主要是讀取MapReduce系統(tǒng)配置运嗜,如core-site.xml壶辜、hdfs-site.xml、mapred-site.xml担租、hbase-site.xml
scan.setCaching(500);
增加緩存讀取條數(shù)(一次RPC調(diào)用返回多行紀(jì)錄砸民,也就是每次從服務(wù)器端讀取的行數(shù)),加快scanner讀取速度奋救,但耗費(fèi)內(nèi)存增加岭参,設(shè)太大會響應(yīng)慢、超時或者OOM尝艘。
setBatch(int batch)
設(shè)置獲取紀(jì)錄的列個數(shù)演侯,默認(rèn)無限制,也就是返回所有的列背亥。實際上就是控制一次next()傳輸多少個columns秒际,如batch為5表示每個result實例返回5個columns
setBatch使用場景為,用客戶端的scanner緩存進(jìn)行批量交互從而提高性能時狡汉,非常大的行可能無法放入客戶端的內(nèi)存娄徊,這時需要用HBase客戶端API中進(jìn)行batching處理。
scan.setCacheBlocks(false);
默認(rèn)是true轴猎,分內(nèi)存嵌莉,緩存和磁盤,三種方式捻脖,一般數(shù)據(jù)的讀取為內(nèi)存->緩存->磁盤锐峭;
setCacheBlocks不適合MapReduce工作:
MR程序為非熱點數(shù)據(jù),不需要緩存可婶,因為Blockcache is LRU沿癞,也就是最近最少訪問算法(扔掉最少訪問的),那么矛渴,前一個請求(比如map讀茸笛铩)讀入Blockcache的所有記錄在后一個請求(新的map讀取)中都沒有用具温,就必須全部被swap蚕涤,那么RegionServer要不斷的進(jìn)行無意義的swapping data,也就是無意義的輸入和輸出BlockCache铣猩,增加了無必要的IO揖铜。而普通讀取時局部查找,或者查找最熱數(shù)據(jù)時达皿,會有提升性能的幫助天吓。
runner方法中可以寫定義多個job贿肩,job會順序執(zhí)行。
五龄寞、常用hadoop fs命令 (類似Linux的文件操作命令汰规,可類比學(xué)習(xí)使用)
-help
功能:輸出這個命令參數(shù)手冊
-ls
功能:顯示目錄信息
示例: hadoop fs -ls /yjq
-mkdir
功能:在hdfs上創(chuàng)建目錄
示例:hadoop fs -mkdir -p /yjq/test
-moveFromLocal
功能:從本地剪切粘貼到hdfs
示例:hadoop fs -moveFromLocal /home/cdh/a.txt /yjq/test
-moveToLocal
功能:從hdfs剪切粘貼到本地
示例:hadoop fs -moveToLocal /yjq/test/a.txt /home/cdh/
-copyFromLocal
功能:從本地文件系統(tǒng)中拷貝文件到hdfs路徑去
示例:hadoop fs -copyFromLocal /home/cdh/a.txt /yjq/test
-copyToLocal
功能:從hdfs拷貝到本地
示例:hadoop fs -copyToLocal /yjq/test/a.txt /home/cdh/
-get
功能:等同于copyToLocal,從hdfs下載文件到本地路徑(.表示當(dāng)前路徑)
示例:hadoop fs -get /yjq/test/a.txt .
-getmerge
功能:合并下載多個文件
示例:將目錄下所有的TXT文件下載到本地物邑,并合并成一個文件
hadoop fs -getmerge /yjq/test/*.txt /home/cdh/test.txt
-put
功能:等同于copyFromLocal
示例:hadoop fs -put /home/cdh/a.txt /yjq/test
-cp
功能:從hdfs的一個路徑拷貝hdfs的另一個路徑
示例: hadoop fs -cp /yjq/test1/a.txt /yjq/test2/
-mv
功能:在hdfs目錄中移動文件
示例: hadoop fs -mv /yjq/test1/a.txt /yjq/test2/
-appendToFile
功能:追加一個文件到已經(jīng)存在的文件末尾(本地文件追加到hdfs)
示例:Hadoop fs -appendToFile /home/cdh/a.txt /yjq/test1/a.txt
-cat
功能:顯示文件內(nèi)容
示例:hadoop fs -cat /yjq/test1/a.txt
-tail
功能:顯示一個文件的末尾
示例:hadoop fs -tail /yjq/test1/a.txt
-text
功能:以字符形式打印一個文件的內(nèi)容
示例:hadoop fs -text /yjq/test1/a.txt
-chgrp溜哮、-chmod、-chown
功能:修改文件所屬權(quán)限
示例:
hadoop fs -chmod 666 /yjq/test1/a.txt
# cdh為用戶名拂封,hadoop為用戶組
hadoop fs -chown cdh:group /yjq/test1/a.txt
-rm
功能:刪除文件或文件夾
示例:hadoop fs -rm -r /yjq/test/a.txt
-df
功能:統(tǒng)計文件系統(tǒng)的可用空間信息
示例:hadoop fs -df -h /
-du
功能:統(tǒng)計文件夾的大小信息
示例:
hadoop fs -du -s -h /yjq/*
-count
功能:統(tǒng)計一個指定目錄下的文件節(jié)點數(shù)量
示例:hadoop fs -count /yjq/
六茬射、HBase 相關(guān)操作
- 簡介
- HBase是一個分布式的、面向列的開源數(shù)據(jù)庫
- 表由行和列組成冒签,列劃分為多個列族/列簇(column family)
- RowKey:是Byte array在抛,是表中每條記錄的“主鍵”,方便快速查找萧恕,Rowkey的設(shè)計非常重要刚梭。
- Column Family:列族,擁有一個名稱(string)票唆,包含一個或者多個相關(guān)列
- Column:屬于某一個columnfamily朴读,familyName:columnName,每條記錄可動態(tài)添加
- Hbase--圖片來源網(wǎng)絡(luò)
-
編碼
Configuration conf = HBaseConfiguration.create();
會自動讀取hbase-site.xml配置文件
Scan scan = new Scan(); scan.setCaching(1000); scan.setStartRow(getBytes(startDate)); scan.setStopRow(getBytes(endDate)); TableMapReduceUtil.initTableMapperJob(HB_TABLE_NAME, scan, NewsStreamUrlMapper.class, Text.class, Text.class, job);
參數(shù):hbase table name走趋,scan衅金,mapper class,outputKeyClass簿煌,outputValueClass氮唯,job
七、hdfs操作
-
運(yùn)算之前清除hdfs上的文件夾
FileSystem fs = FileSystem.get(new Configuration()); Path outputDir = new Path(OUTPUT_PATH); //運(yùn)算之前如果文件夾存在則清除文件夾 if(fs.exists(outputDir)) fs.delete(outputDir, true);
-
HDFS讀流程
- 客戶端向NameNode發(fā)起讀數(shù)據(jù)請求
- NameNode找出距離最近的DataNode節(jié)點信息
- 客戶端從DataNode分塊下載文件
-
HDFS寫流程
- 客戶端向NameNode發(fā)起寫數(shù)據(jù)請求
- 分塊寫入DataNode節(jié)點姨伟,DataNode自動完成副本備份
- DataNode向NameNode匯報存儲完成惩琉,NameNode通知客戶端
八、多表操作
MultiTableInputFormat 支持多個mapper的輸出混合到一個shuffle夺荒,一個reducer瞒渠,其中每個mapper擁有不同的inputFormat和mapper處理類。
所有的mapper需要輸出相同的數(shù)據(jù)類型技扼,對于輸出value伍玖,需要標(biāo)記該value來源,以便reducer識別
List<Scan> scans = new ArrayList<Scan>();
Scan scan1 = new Scan();
scan1.setCaching(100);
scan1.setCacheBlocks(false);
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inTable.getBytes());
scans.add(scan1);
Scan scan2 = new Scan();
scan2.setCaching(100);
scan2.setCacheBlocks(false);
scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inPhoneImsiTable.getBytes());
scans.add(scan2);
TableMapReduceUtil.initTableMapperJob(scans, ReadHbaseMapper.class, Text.class,Result.class, job);
九剿吻、錯誤處理
-
ScannerTimeoutException:org.apache.hadoop.hbase.client.ScannerTimeoutException
這是當(dāng)從服務(wù)器傳輸數(shù)據(jù)到客戶端的時間窍箍,或者客戶端處理數(shù)據(jù)的時間大于了scanner設(shè)置的超時時間,scanner超時報錯,可在客戶端代碼中設(shè)置超時時間
Configuration conf = HBaseConfiguration.create() conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,120000)
如果Mapper階段對每條數(shù)據(jù)的處理時間過長仔燕,可以將scan.setCaching(1000)的值設(shè)置小一點,如果值設(shè)置太大魔招,則處理時間會很長就會出現(xiàn)超時錯誤晰搀。
寫在最后
很久之前寫的學(xué)習(xí)筆記了,資料來源網(wǎng)絡(luò)及項目組內(nèi)的討論办斑,參考文獻(xiàn)就不一一標(biāo)注了外恕,侵刪~
如果您覺得本文對您有幫助,點個贊吧~~