InputFormat數(shù)據(jù)輸入
切片與MapTask并行度決定機(jī)制
問題引出
MapTask的并行度決定Map階段的任務(wù)處理并發(fā)度,從而影響整個(gè)job的處理速度
思考:1G的數(shù)據(jù)献酗,啟動(dòng)8個(gè)MapTask嘁信,可以提高集群的并發(fā)處理能力李根。那么1K的數(shù)據(jù)粪小,也啟動(dòng)8個(gè)MapTask椒楣,會(huì)提高集群性能嗎赴蝇?MapTask并行任務(wù)是不是越多越好呢恢总?哪些因素影響了MapTask的并行度迎罗?MapTask并行度決定機(jī)制
數(shù)據(jù)塊:Block是HDFS物理上把數(shù)據(jù)分成一塊一塊。數(shù)據(jù)塊是HDFS上數(shù)據(jù)存儲(chǔ)單位
數(shù)據(jù)切片:數(shù)據(jù)切片只是在邏輯上對(duì)輸入進(jìn)行分片离熏,并不會(huì)在磁盤上將其切分成片進(jìn)行存儲(chǔ)佳谦。數(shù)據(jù)切片是MapReduce程序計(jì)算輸入數(shù)據(jù)的單位,一個(gè)切片會(huì)對(duì)應(yīng)啟動(dòng)一個(gè)MapTask
job提交流程源碼和切片源碼解析
- job提交流程源碼
waitForCompletion()
submit();
// 1建立連接
connect();
// 1)創(chuàng)建提交Job的代理
new Cluster(getConfiguration());
// (1)判斷是本地運(yùn)行環(huán)境還是yarn集群運(yùn)行環(huán)境
initialize(jobTrackAddr, conf);
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 1)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)獲取jobid 滋戳,并創(chuàng)建Job路徑
JobID jobId = submitClient.getNewJobID();
// 3)拷貝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)計(jì)算切片钻蔑,生成切片規(guī)劃文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向Stag路徑寫XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交Job,返回提交狀態(tài)
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
FileInputFormat切片源碼解析
- 程序先找到數(shù)據(jù)存儲(chǔ)的目錄
- 開始遍歷處理(規(guī)劃切片)目錄下的每一個(gè)文件
- 遍歷第一個(gè)文件
(3.1)獲取文件大小,fs.sizeOf()
(3.2)計(jì)算切片大小
computeSplitSize(Math.max(minSize, Math.min(blockSize, maxSize))) = blockSize = 128M
(3.3)默認(rèn)情況下奸鸯,切片大小為blockSize
(3.4)開始切咪笑,形成第一個(gè)切片,0——128M娄涩,第二個(gè)切片窗怒,129——256M,第三個(gè)切片蓄拣,256——300M
每次切片時(shí)扬虚,要判斷切完剩下的部分是否大于塊的1.1倍,如果不大于球恤,那么就切分為一塊
(3.5)將切片信息寫到一個(gè)切片規(guī)劃文件里
(3.6)整個(gè)切片核心過程在getSplit()方法中完成
(3.7)InputSplit只記錄了切片的元數(shù)據(jù)信息辜昵,比如起始位置,長(zhǎng)度咽斧,以及所在的節(jié)點(diǎn)列表等 - 提交切面規(guī)劃文件到Y(jié)ARN上堪置,YARN的MrAppMaster就可以根據(jù)切片規(guī)劃文件計(jì)算開啟幾個(gè)MapTask了
FileInputFormat切片機(jī)制
- 切片機(jī)制
(1)簡(jiǎn)單的按照文件的內(nèi)容長(zhǎng)度切分
(2)切片大小躬存,默認(rèn)等于Block的大小
(3)切片時(shí)不考慮數(shù)據(jù)集整體,而是針對(duì)每一個(gè)文件進(jìn)行切片 -
案例分析
image.png
FileInputFormat切片大小的參數(shù)配置
源碼中計(jì)算切片大小的公式
Math.max(minSize, Math.min(maxSize, blockSize));
mapreduce.input.fileinputformat.split.minsize = 1
mapreduce.input.fileinputformat.split.maxsize = Long.MAX_VALUE
因此舀锨,默認(rèn)情況下岭洲,切片大小為blockSize切片大小設(shè)置
maxSize(切片最大值):參數(shù)如果調(diào)的比blockSize小,則會(huì)讓切片變小坎匿,而且就等于配置的這個(gè)參數(shù)的值
minSize(切片最小值):參數(shù)調(diào)的比blockSize大盾剩,則可以讓切片變得比blockSize大獲取切片信息API
// 獲取切片的文件名稱
String name = inputSplit.getPath().getName();
// 根據(jù)文件類型獲取切片信息
FileSplit inputSplit = (FileSplit)context.getInputSplit();
TextInputFormat
FileInputFormat實(shí)現(xiàn)類
思考:在運(yùn)行MapReduce程序時(shí),輸入的文件格式包括:基于行的日志文件替蔬、二進(jìn)制格式文件彪腔、數(shù)據(jù)庫表等。那么进栽,針對(duì)不同的數(shù)據(jù)類型,MapReduce是如何讀取這些數(shù)據(jù)的呢恭垦?
FileInputFormat常見的接口實(shí)現(xiàn)類包括:TextInputFormat快毛、KeyValueTextInputFormat、NLineIntpuFormat番挺、CombineTextInputFormat和自定義的InputFormatTextInputFormat
TextInputFormat是默認(rèn)的FileInputFormat實(shí)現(xiàn)類唠帝。按行讀取每條記錄。鍵是存儲(chǔ)該行在整個(gè)文件中的起始字節(jié)偏移量玄柏,LongWritable類型襟衰。值是這行的內(nèi)容,不包括任何行終止符(回車粪摘、換行)瀑晒,Text類型
CombineTextInputFormat切片機(jī)制
框架默認(rèn)的TextInputFormat切片機(jī)制是對(duì)任務(wù)按文件規(guī)劃切片,不管文件多小徘意,都會(huì)是一個(gè)單獨(dú)的切片苔悦,都會(huì)交給一個(gè)MapTask,這樣如果有大量的小文件椎咧,就會(huì)產(chǎn)生大量的MapTask玖详,處理效率極其低下
- 應(yīng)用場(chǎng)景
CombineTextInputFormat用于小文件過多的場(chǎng)景,它可以將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中勤讽,這樣蟋座,多個(gè)小文件就可以交給一個(gè)MapTask處理 - 虛擬存儲(chǔ)切片最大值設(shè)置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M
注意:虛擬存儲(chǔ)切片最大值設(shè)置最好根據(jù)實(shí)際的小文件大小情況來設(shè)置具體的值 -
切片機(jī)制
生成切片過程包括:虛擬存儲(chǔ)過程和切片過程兩部分
image.png
(1)虛擬存儲(chǔ)過程:
將輸入目錄下所有文件大小,一次和設(shè)置的setMaxInputSplitSize值比較脚牍,如果不大于設(shè)置的最大值向臀,邏輯上劃分一個(gè)塊。如果輸入文件大于設(shè)置的最大值且大于兩倍莫矗,那么以最大值切割一塊飒硅;當(dāng)剩余數(shù)據(jù)大小超過設(shè)置的最大值且不大于最大值兩倍砂缩,此時(shí)將文件均分為2個(gè)虛擬存儲(chǔ)塊(防止出現(xiàn)太小切片)
例如setMaxInputSplitSize值為4M,輸入文件大小為8.02M三娩,則先邏輯上分成一個(gè)4M庵芭。剩余的大小為4.02M,如果按照4M邏輯劃分雀监,就會(huì)出現(xiàn)0.02M的小虛擬存儲(chǔ)塊双吆,所以將剩余的4.02M切分成2.01M和2.01M的兩個(gè)文件
(2)切片過程:
(a)判斷虛擬存儲(chǔ)的文件大小是否大于setMaxInputSplitSize值,大于等于則單獨(dú)形成一個(gè)切片
(b)如果不大于則跟下一個(gè)虛擬存儲(chǔ)文件進(jìn)行合并会前,共同形成一個(gè)切片
(c)測(cè)試舉例:有4個(gè)小文件好乐,大小分別為:1.7M、5.1M瓦宜、3.4M以及6.8M蔚万,這四個(gè)小文件,則虛擬存儲(chǔ)之后會(huì)形成6個(gè)文件塊临庇,大小分別為1.7M反璃、(2.55M、2.55M)假夺、3.4M淮蜈、(3.4M、3.4M)已卷,最終形成3個(gè)切片梧田,大小分別是(1.7+2.55)M、(2.55+3.4)M侧蘸、(3.4+3.4)M
MapReduce工作流程
Shuffle機(jī)制
Shuffle機(jī)制
Map之后裁眯,Reduce之前的數(shù)據(jù)處理過程稱為Shuffle
Partition分區(qū)
問題引出
要求將統(tǒng)計(jì)結(jié)果按照條件輸出到不同文件中(分區(qū))。比如:將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(分區(qū))默認(rèn)Partition分區(qū)
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public HashPartitioner() {
}
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & 2147483647) % numReduceTasks;
}
}
默認(rèn)分區(qū)是根據(jù)key的hashCode()對(duì)ReduceTasks個(gè)數(shù)取模得到的闺魏,用戶沒法控制哪個(gè)key存儲(chǔ)到哪個(gè)分區(qū)
- 自定義Partitioner步驟
(1)自定義類繼承Partitioner未状,重寫getPartition()方法
public class ClustomPartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions){
// 控制分區(qū)代碼邏輯
return partition;
}
}
(2)在job驅(qū)動(dòng)中,設(shè)置自定義Partitioner
job.setPartitionerClass(ClustomPartitioner.class);
(3)自定義Partitioner后析桥,要根據(jù)自定義的Partitioner的邏輯設(shè)置響應(yīng)數(shù)量的ReduceTask
job.setReduceTasks(5);
Partition分區(qū)案例實(shí)操
- 需求
將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中
(1)輸入數(shù)據(jù)
1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200
(2)期望輸出數(shù)據(jù)
手機(jī)號(hào)136司草、137、138泡仗、139開頭都分別放到一個(gè)獨(dú)立的文件中埋虹,其他開頭的放到一個(gè)文件中
-
分析
image.png 編碼
Partitioner
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
String phone = key.toString();
String prePhone = phone.substring(0, 3);
int partition;
switch (prePhone){
case "136":
partition = 0;
break;
case "137":
partition = 1;
break;
case "138":
partition = 2;
break;
case "139":
partition = 3;
break;
default:
partition = 4;
}
return partition;
}
}
修改Driver
public class FlowCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 獲取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 設(shè)置jar路徑
job.setJarByClass(FlowCountDriver.class);
// 關(guān)聯(lián)mapper、reducer
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 設(shè)置map階段輸出的類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 設(shè)置最終輸出的類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 綁定Partitioner
job.setPartitionerClass(ProvincePartitioner.class);
// 設(shè)置reduce任務(wù)數(shù)娩怎,根據(jù)自定義Partitioner確定
job.setNumReduceTasks(5);
// 設(shè)置輸入輸出路徑
FileInputFormat.setInputPaths(job, new Path("data/phone_data.txt"));
FileOutputFormat.setOutputPath(job, new Path("phone_result2"));
// 提交job
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
分區(qū)總結(jié)
(1)如果ReduceTask的數(shù)量 > getPartition()的結(jié)果數(shù)搔课,則會(huì)多產(chǎn)生幾個(gè)空的輸出文件
(2)如果1 < ReduceTask < getPartition()的結(jié)果數(shù),則有一部分分區(qū)數(shù)據(jù)無處安放截亦,拋異常
(3)如果ReduceTask的數(shù)量 = 1袍啡,則不管MapTask段輸出多少個(gè)分區(qū)文件踩官,最終結(jié)果都會(huì)交給這一個(gè)ReduceTask,最終也就只會(huì)產(chǎn)生一個(gè)結(jié)果文件
(4)分區(qū)號(hào)必須從0開始境输,逐一累加案例分析
例如:自定義分區(qū)數(shù)為5蔗牡,則
(1)job.setNumReduceTasks(1); 正常運(yùn)行,只產(chǎn)生一個(gè)輸出文件
(2)job.setNumReduceTasks(2); 拋異常
(3)job.setNumReduceTasks(5); 與預(yù)期一樣
(4)job.setNumReduceTasks(6); 程序正常運(yùn)行,但會(huì)產(chǎn)生一個(gè)空文件
WritableComparable排序
概述
排序是MapReduce框架中最重要的操作之一
MapTask和ReduceTask均會(huì)對(duì)數(shù)據(jù)按照key進(jìn)行排序信粮,該操作屬于Hadoop的默認(rèn)行為黔攒。任何應(yīng)用程序中的數(shù)據(jù)均會(huì)被排序,而不管邏輯上是否需要
默認(rèn)排序是按照字段順序排序欺旧,且實(shí)現(xiàn)該排序的方法是快速排序
對(duì)于MapTask,它會(huì)將處理的結(jié)果暫時(shí)放到環(huán)形緩沖區(qū)中蛤签,當(dāng)環(huán)形緩沖區(qū)使用率達(dá)到一定閾值后辞友,在對(duì)緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次快速排序,并將這些有序數(shù)據(jù)溢寫到磁盤上震肮,而當(dāng)數(shù)據(jù)處理完畢后称龙,他會(huì)對(duì)磁盤上所有文件進(jìn)行歸并排序
對(duì)于ReduceTask,他從每個(gè)MapTask上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件戳晌,如果文件大小超過一定閾值鲫尊,則溢寫到磁盤上,否則存儲(chǔ)在內(nèi)存中沦偎,如果磁盤上文件數(shù)目達(dá)到一定閾值疫向,則進(jìn)行一次歸并排序以生成一個(gè)更大的文件;如果內(nèi)存中文件大小或者數(shù)目超過一定閾值豪嚎,則進(jìn)行一次合并后將數(shù)據(jù)溢寫到磁盤上搔驼。當(dāng)所有數(shù)據(jù)拷貝完畢后,ReduceTask同意對(duì)內(nèi)存和磁盤上的所有數(shù)據(jù)進(jìn)行一次歸并排序侈询。
分類排序
- 部分排序
MapReduce根據(jù)輸入記錄的鍵對(duì)數(shù)據(jù)排序舌涨。保證輸出的每個(gè)文件內(nèi)部有序 - 全排序
最終輸出結(jié)果只有一個(gè)文件,且文件內(nèi)部有序扔字。實(shí)現(xiàn)方式是只設(shè)置一個(gè)ReduceTask囊嘉。但該方法在處理大型文件時(shí)效率極低温技,因?yàn)橐慌_(tái)機(jī)器處理所有文件,完全喪失了MapReduce所提供的并行架構(gòu) - 輔助排序
在Reduce端對(duì)key進(jìn)行分組扭粱。應(yīng)用于:在接收的key為bean對(duì)象時(shí)舵鳞,想讓一個(gè)或幾個(gè)字段相同(全部字段不同)的key進(jìn)入到同一個(gè)reduce方法,可以采用分組排序 - 二次排序
在自定義排序過程中焊刹,如果compareTo中的判斷條件為兩個(gè)即為二次排序
自定義WritableComparable原理分析
bean對(duì)象作為key傳輸系任,需要實(shí)現(xiàn)WriableComparable接口,重寫compareTo方法虐块,就可以實(shí)現(xiàn)排序
@Override
public int compareTo(FlowBean bean) {
int result;
if(this.sumFlow > bean.getSumFlow()) {
result = -1;
} else if(this.sumFlow < bean.getSumFlow()) {
result = 1;
} else {
result = 0;
}
return resule;
}
WritableComparable排序案例實(shí)操(全排序)
需求
根據(jù)手機(jī)號(hào)的案例俩滥,將結(jié)果根據(jù)總流量倒序排序-
需求分析
image.png 編碼
FlowBean
在原有基礎(chǔ)上,改為實(shí)現(xiàn)WriableComparable接口贺奠,并重寫compareTo方法
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long sumFlow;
// 空參構(gòu)造
public FlowBean() {
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
@Override
public int compareTo(FlowBean bean) {
int result;
if (this.sumFlow > bean.getSumFlow()) {
result = -1;
} else if(this.sumFlow < bean.getSumFlow()) {
result = 1;
} else {
result = 0;
}
return result;
}
}
Mapper
public class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private Text outValue = new Text();
private FlowBean outKey = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 獲取一行
String line = value.toString();
// 切分
String[] split = line.split("\t");
// 封裝
String phone = split[0];
outValue.set(phone);
long up = Long.parseLong(split[1]);
long down = Long.parseLong(split[2]);
long sum = Long.parseLong(split[3]);
outKey.setUpFlow(up);
outKey.setDownFlow(down);
outKey.setSumFlow(sum);
// 寫出
context.write(outKey, outValue);
}
}
Reducer
public class FlowCountReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
private FlowBean outValue = new FlowBean();
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 循環(huán)遍歷霜旧,防止不同手機(jī)號(hào)流量相同,循環(huán)過程中寫出
for (Text value : values) {
context.write(value, key);
}
}
}
Driver
注意修改Mapper的輸出類型儡率,并將之前案例的輸出作為輸入
public class FlowCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 獲取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 設(shè)置jar路徑
job.setJarByClass(FlowCountDriver.class);
// 關(guān)聯(lián)mapper挂据、reducer
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 設(shè)置map階段輸出的類型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 設(shè)置最終輸出的類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 設(shè)置輸入輸出路徑
FileInputFormat.setInputPaths(job, new Path("phone_result"));
FileOutputFormat.setOutputPath(job, new Path("phone_compare_result"));
// 提交job
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
Combiner合并
Combiner是MapReduce程序中Mapper和Reducer之外的一種組件
Combiner組件的父類就是Reducer
Combiner和Reducer的區(qū)別在于運(yùn)行的位置
Combiner是在每一個(gè)MapTask所在的節(jié)點(diǎn)運(yùn)行
Reducer是接收全局所有Mapper的輸出結(jié)果Combiner的意義就是對(duì)每一個(gè)MapTask的輸出進(jìn)行局部匯總,以減小網(wǎng)絡(luò)傳輸量
Combiner能夠應(yīng)用的前提是不能影響最終的業(yè)務(wù)邏輯儿普,而且崎逃,Combiner的輸出KV應(yīng)該和Reducer的輸入KV類型對(duì)應(yīng)起來
自定義Combiner實(shí)現(xiàn)步驟
(1)自定義一個(gè)Combiner繼承Reducer,重寫reduce方法
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outValue.set(sum);
context.write(key, outValue);
}
}
(2)在驅(qū)動(dòng)類中進(jìn)行設(shè)置
job.setCombinerClass(WordCountCombiner.class);
另一種實(shí)現(xiàn)方式:
我們發(fā)現(xiàn)新編寫的Combiner和原始的Reducer內(nèi)容一致眉孩,而Combiner和Reducer都繼承自Reducer个绍,所以我們可以將原有的Reducer作為Combiner使用
job.setCombinerClass(WordCountReducer.class)
OutputFormat數(shù)據(jù)輸出
OutputFormat接口實(shí)現(xiàn)類
OutputFormat是MapReduce輸出的基類,所有實(shí)現(xiàn)MapReduce輸出都實(shí)現(xiàn)了OutputFormat接口浪汪。下面我們介紹幾種常見的OutputForamt實(shí)現(xiàn)類
-
OutputFormat實(shí)現(xiàn)類
image.png
- 默認(rèn)輸出格式TextOutputFormat
- 自定義OutputFormat
應(yīng)用場(chǎng)景:輸出數(shù)據(jù)到MySQL/HBASE/ElasticSearch等存儲(chǔ)中
步驟:
(1) 自定義一個(gè)類繼承OutputFormat
(2)改寫RecordWriter巴柿,具體改寫輸出數(shù)據(jù)的方法write()
自定義OutputFormat案例
- 需求
過濾輸入的log日志,包含atguigu的網(wǎng)站輸出到atguigu.log死遭,不包含的輸出的other.log
數(shù)據(jù)
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.taobao.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
-
需求分析
image.png 編碼
Mapper
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 不作任何處理
context.write(value, NullWritable.get());
}
}
Reducer
public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// 相同key的數(shù)據(jù)會(huì)被聚合广恢,循環(huán)寫出,防止丟失
for (NullWritable value : values) {
context.write(key, value);
}
}
}
OutputFormat
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
LogRecordWriter logRecordWriter = new LogRecordWriter(job);
return logRecordWriter;
}
}
RecordWriter
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
private FSDataOutputStream atguiguOut;
private FSDataOutputStream otherOut;
public LogRecordWriter(TaskAttemptContext job) {
// 創(chuàng)建兩條流
try {
FileSystem fileSystem = FileSystem.get(job.getConfiguration());
atguiguOut = fileSystem.create(new Path("log/atguigu.log"));
otherOut = fileSystem.create(new Path("log/other.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
// 具體怎么寫
String line = key.toString();
if (line.contains("atguigu")) {
atguiguOut.writeBytes(line + "\n");
} else {
otherOut.writeBytes(line + "\n");
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStreams(atguiguOut, otherOut);
}
}
Driver
public class LogDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 獲取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 設(shè)置jar路徑
job.setJarByClass(LogDriver.class);
// 關(guān)聯(lián)mapper呀潭、reducer
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
// 設(shè)置map輸出的KV類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 設(shè)置最終輸出的KV類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 配置自定義OutputFormat
job.setOutputFormatClass(LogOutputFormat.class);
// 設(shè)置輸入路徑和輸出路徑
FileInputFormat.setInputPaths(job, new Path("data/log.txt"));
/*
雖然我們?cè)O(shè)置了自定義的OutputFormat钉迷,但是自定義的OutputFormat繼承自FileOutputFormat
而FileOutputFormat要輸出一個(gè)_SUCCESS文件,所以還是要指定輸出路徑
*/
FileOutputFormat.setOutputPath(job, new Path("log"));
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
MapReduce內(nèi)核源碼解析
MapTask工作機(jī)制
- Read階段:MapTask通過用戶編寫的RecordReader钠署,從輸入InputSplit中解析出一個(gè)個(gè)key/value篷牌。
- Map階段:該節(jié)點(diǎn)主要是將解析出的key/value交給用戶編寫map()函數(shù)處理,并產(chǎn)生一系列新的key/value踏幻。
- Collect收集階段:在用戶編寫map()函數(shù)中枷颊,當(dāng)數(shù)據(jù)處理完成后,一般會(huì)調(diào)用OutputCollector.collect()輸出結(jié)果。在該函數(shù)內(nèi)部夭苗,它會(huì)將生成的key/value分區(qū)(調(diào)用Partitioner)信卡,并寫入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中。
- Spill階段:即“溢寫”题造,當(dāng)環(huán)形緩沖區(qū)滿后傍菇,MapReduce會(huì)將數(shù)據(jù)寫到本地磁盤上,生成一個(gè)臨時(shí)文件界赔。需要注意的是丢习,將數(shù)據(jù)寫入本地磁盤之前,先要對(duì)數(shù)據(jù)進(jìn)行一次本地排序淮悼,并在必要時(shí)對(duì)數(shù)據(jù)進(jìn)行合并咐低、壓縮等操作。
ReduceTask工作機(jī)制
- Copy階段:ReduceTask從各個(gè)MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù)袜腥,并針對(duì)某一片數(shù)據(jù)见擦,如果其大小超過一定閾值,則寫到磁盤上羹令,否則直接放到內(nèi)存中
- Sort階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí)鲤屡,ReduceTask啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并,以防止內(nèi)存使用過多或磁盤上文件過多福侈。按照MapReduce語義酒来,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起肪凛,Hadoop采用了基于排序的策略役首。由于各個(gè)MapTask已經(jīng)實(shí)現(xiàn)對(duì)自己的處理結(jié)果進(jìn)行了局部排序,因此显拜,ReduceTask只需對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可。
- Reduce階段:reduce()函數(shù)將計(jì)算結(jié)果寫到HDFS上
ReduceTask并行度決定機(jī)制
回顧:MapTask并行度有切片個(gè)數(shù)決定爹袁,切片個(gè)數(shù)由輸入文件和切片規(guī)則決定远荠。
思考:ReduceTask并行度由誰決定
- 設(shè)置ReduceTask并行度個(gè)數(shù)
ReduceTask的并行度同樣影響整個(gè)Job的執(zhí)行并發(fā)讀和執(zhí)行效率,但與MapTask的并發(fā)數(shù)由切片數(shù)決定不同失息,ReduceTask數(shù)量的決定是可以直接手動(dòng)設(shè)置:
// 默認(rèn)值為1譬淳,手動(dòng)設(shè)置為4
job.setNumReduceTasks(4);
-
實(shí)驗(yàn):測(cè)試ReduceTask多少合適
(1)實(shí)驗(yàn)環(huán)境
1個(gè)Master節(jié)點(diǎn),16個(gè)Slave節(jié)點(diǎn)盹兢,CPU:8GHZ邻梆,內(nèi)存2G
(2)實(shí)驗(yàn)結(jié)果
image.png 注意事項(xiàng)
(1)ReduceTask=0,表示沒有Reduce階段绎秒,輸出文件個(gè)數(shù)和Map個(gè)數(shù)一致浦妄。
(2)ReduceTask默認(rèn)值就是1,所以輸出文件個(gè)數(shù)為1
(3)如果數(shù)據(jù)分布不均勻,就有可能在Reduce階段產(chǎn)生數(shù)據(jù)傾斜
(4)ReduceTask數(shù)量并不是任意設(shè)置剂娄,還要考慮業(yè)務(wù)邏輯需求蠢涝,有些情況下,需要計(jì)算全局匯總結(jié)果阅懦,就只能有一個(gè)ReduceTask
(5)具體多少個(gè)ReduceTask和二,需要根據(jù)集群性能而定
(6)如果分區(qū)數(shù)不是1,但是ReduceTask為1耳胎,不執(zhí)行分區(qū)過程惯吕,因?yàn)樵贛apTask源碼中,執(zhí)行分區(qū)的前提是先判斷ReduceTask個(gè)數(shù)是否為1怕午,不大于1肯定不執(zhí)行
MapTask & ReduceTask源碼跟蹤
MapTask
context.write(k, NullWritable.get()); //自定義的 map 方法的寫出废登,進(jìn)入
output.write(key, value);
//MapTask727 行,收集方法诗轻,進(jìn)入兩次
collector.collect(key, value,partitioner.getPartition(key, value, partitions));
HashPartitioner(); //默認(rèn)分區(qū)器
collect() //MapTask1082 行 map 端所有的 kv 全部寫出后會(huì)走下面的 close 方法
close() //MapTask732 行
collector.flush() // 溢出刷寫方法钳宪,MapTask735 行,提前打個(gè)斷點(diǎn)扳炬,進(jìn)入
sortAndSpill() //溢寫排序吏颖,MapTask1505 行,進(jìn)入
sorter.sort() QuickSort //溢寫排序方法恨樟,MapTask1625 行半醉,進(jìn)入
mergeParts(); //合并文件,MapTask1527 行劝术,進(jìn)入
collector.close(); //MapTask739 行,收集器關(guān)閉,即將進(jìn)入 ReduceTask
ReduceTask
if (isMapOrReduce()) //reduceTask324 行缩多,提前打斷點(diǎn)
initialize() // reduceTask333 行,進(jìn)入
init(shuffleContext); // reduceTask375 行,走到這需要先給下面的打斷點(diǎn)
totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl 第 120 行,提前打斷點(diǎn)
merger = createMergeManager(context); //合并方法养晋,Shuffle 第 80 行
// MergeManagerImpl 第 232 235 行衬吆,提前打斷點(diǎn)
this.inMemoryMerger = createInMemoryMerger(); //內(nèi)存合并
this.onDiskMerger = new OnDiskMerger(this); //磁盤合并
rIter = shuffleConsumerPlugin.run();
eventFetcher.start(); //開始抓取數(shù)據(jù),Shuffle 第 107 行绳泉,提前打斷點(diǎn)
eventFetcher.shutDown(); //抓取結(jié)束逊抡,Shuffle 第 141 行,提前打斷點(diǎn)
copyPhase.complete(); //copy 階段完成零酪,Shuffle 第 151 行
taskStatus.setPhase(TaskStatus.Phase.SORT); //開始排序階段冒嫡,Shuffle 第 152 行
sortPhase.complete(); //排序階段完成,即將進(jìn)入 reduce 階段 reduceTask382 行
reduce(); //reduce 階段調(diào)用的就是我們自定義的 reduce 方法四苇,會(huì)被調(diào)用多次
cleanup(context); //reduce 完成之前孝凌,會(huì)最后調(diào)用一次 Reducer 里面的 cleanup 方法
Join多種應(yīng)用
Reduce Join
Map端的主要工作:為來自不同表或文件的KV對(duì),打標(biāo)簽以區(qū)別不同來源的記錄月腋。然后用連接字段作為key蟀架,其余部分和新加的標(biāo)志作為value瓣赂,最后進(jìn)行輸出
Reduce端的主要工作:在Reduce端以連接字段作為key的分組已經(jīng)完成,我們只需要在每一個(gè)分組當(dāng)中將那些來源于不同文件的記錄(在Map階段已經(jīng)打標(biāo)志)分開辜窑,最后進(jìn)行合并就可以了
Reduce Join案例
-
需求
輸入數(shù)據(jù)
image.png
order.txt
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
pd.txt
01 小米
02 華為
03 格力
期望輸出
-
需求分析
通過將關(guān)聯(lián)條件作為Map輸出的key钩述,將兩表滿足join條件的數(shù)據(jù)并攜帶數(shù)據(jù)所來源的文件信息,發(fā)往同一個(gè)ReduceTask穆碎,在Reduce中進(jìn)行數(shù)據(jù)的串聯(lián)
image.png 編碼
TableBean
public class TableBean implements Writable {
// 訂單id
private String id;
// 商品id
private String pid;
// 商品數(shù)量
private int amount;
// 商品名稱
private String pname;
// 標(biāo)志字段 order pd
private String flag;
public TableBean() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
id = in.readUTF();
pid = in.readUTF();
amount = in.readInt();
pname = in.readUTF();
flag = in.readUTF();
}
@Override
public String toString() {
return id + "\t" + pname + "\t" + amount;
}
}
Mapper
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
private String fileName;
private Text outKey = new Text();
private TableBean outValue = new TableBean();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 初始化order pd
FileSplit inputSplit = (FileSplit) context.getInputSplit();
fileName = inputSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 獲取一行
String line = value.toString();
System.out.println(line);
// 判斷屬于哪個(gè)文件
if (fileName.contains("order")) {
// 切分牙勘, 防止多個(gè)文件切分邏輯不同
String[] split = line.split("\t");
// 訂單
/*
1001 01 1
1002 02 2
*/
// 封裝
System.out.println(Arrays.toString(split));
String id = split[0];
String pid = split[1];
int amount = Integer.parseInt(split[2]);
outKey.set(pid);
outValue.setId(id);
outValue.setPid(pid);
outValue.setAmount(amount);
outValue.setPname("");
outValue.setFlag("order");
} else {
// 切分
String[] split = line.split("\t");
// 產(chǎn)品
/*
01 小米
02 華為
*/
// 封裝
String pid = split[0];
String pname = split[1];
outKey.set(pid);
outValue.setId("");
outValue.setPid(pid);
outValue.setAmount(-1);
outValue.setPname(pname);
outValue.setFlag("pd");
}
// 寫出
context.write(outKey, outValue);
}
}
Reducer
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
/*
01 1001 1 order
01 1004 4 order
01 小米 pd
*/
// 初始化存儲(chǔ)
List<TableBean> orderBeans = new ArrayList<>();
TableBean pdBean = new TableBean();
// 遍歷
for (TableBean value : values) {
if ("order".equals(value.getFlag())) {
// 訂單
TableBean tempBean = new TableBean();
try {
BeanUtils.copyProperties(tempBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
orderBeans.add(tempBean);
} else {
// 商品
try {
BeanUtils.copyProperties(pdBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
// 遍歷orderBeans,為每個(gè)設(shè)置產(chǎn)品名
for (TableBean orderBean : orderBeans) {
orderBean.setPname(pdBean.getPname());
context.write(orderBean, NullWritable.get());
}
}
}
Driver
public class TableDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 獲取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 設(shè)置jar路徑
job.setJarByClass(TableDriver.class);
// 關(guān)聯(lián)mapper所禀、reducer
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
// 設(shè)置map輸出的KV類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
// 設(shè)置最終輸出的KV類型
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
// 設(shè)置輸入路徑和輸出路徑
FileInputFormat.setInputPaths(job, new Path("data/join"));
FileOutputFormat.setOutputPath(job, new Path("join_result"));
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
- 總結(jié)
缺點(diǎn):這種方式方面,合并的操作在Reduce階段完成,Reduce端的處理壓力太大色徘,Map節(jié)點(diǎn)的運(yùn)算負(fù)載很低恭金,資源利用率不高,而且在Reduce階段會(huì)產(chǎn)生數(shù)據(jù)傾斜
解決方式:使用MapJoin
Map Join
使用場(chǎng)景
Map Join適用于一張表很大褂策,一張表很小的場(chǎng)景優(yōu)點(diǎn)
思考:在Reduce端處理過多的表横腿,非常容易產(chǎn)生數(shù)據(jù)傾斜,怎么辦斤寂?
在Map端緩存多張表耿焊,提前處理業(yè)務(wù)邏輯,這樣增加Map端業(yè)務(wù)遍搞,減少Reduce端數(shù)據(jù)的壓力罗侯,盡可能的減少數(shù)據(jù)傾斜具體辦法:采用DistributedCache
(1)在Mapper的setup階段,將文件讀取到緩存集合中
(2)在Driver驅(qū)動(dòng)類中加載緩存
// 緩存普通文件到Task運(yùn)行的節(jié)點(diǎn)
job.addCacheFile(new URI("file://cache/xxx.txt"));
// 集群環(huán)境運(yùn)行需要設(shè)置HDFS路徑
job.addCacheFile(new URI("hdfs://ip:port/cache/xxx.txt"))
Map Join案例
需求同Reduce Join
-
需求分析
image.png 編碼
Driver
public class MapJoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 獲取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 設(shè)置jar路徑
job.setJarByClass(MapJoinDriver.class);
// 關(guān)聯(lián)mapper溪猿,不需要reducer
job.setMapperClass(MapJoinMapper.class);
job.setNumReduceTasks(0);
// 設(shè)置緩存
job.addCacheFile(new URI("cache/pd.txt"));
// 設(shè)置map輸出的KV類型
job.setMapOutputKeyClass(TableBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 設(shè)置最終輸出的KV類型
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
// 設(shè)置輸入路徑和輸出路徑
FileInputFormat.setInputPaths(job, new Path("data/join"));
FileOutputFormat.setOutputPath(job, new Path("map_join_result"));
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Mapper
public class MapJoinMapper extends Mapper<LongWritable, Text, TableBean, NullWritable> {
private Map<String, String> cache = new HashMap<>();
private TableBean outKey = new TableBean();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 讀取緩存
URI[] cacheFiles = context.getCacheFiles();
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream fsDataInputStream = fs.open(new Path(cacheFiles[0]));
// 從流中讀取數(shù)據(jù)
List<String> lines = IOUtils.readLines(fsDataInputStream, StandardCharsets.UTF_8);
for (String line : lines) {
String[] split = line.split("\t");
cache.put(split[0], split[1]);
}
// 關(guān)流
org.apache.hadoop.io.IOUtils.closeStreams(fsDataInputStream, fs);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 處理order.txt
// 獲取一行
String line = value.toString();
// 切分
String[] split = line.split("\t");
/*
1001 01 1
1002 02 2
*/
// 根據(jù)pid獲取緩存中的pname
// 封裝
outKey.setId(split[0]);
outKey.setPname(cache.get(split[1]));
outKey.setAmount(Integer.parseInt(split[2]));
// 寫出
context.write(outKey, NullWritable.get());
}
}
MapReduce開發(fā)總結(jié)
- 輸入數(shù)據(jù)接口:InputFormat
(1)默認(rèn)使用的實(shí)現(xiàn)類是:TextInputFormat
(2)TextInputFormat的功能邏輯是:一次讀取一行文本钩杰,然后將該行的起始偏移量作為key,行內(nèi)容作為value返回
(3)CombineTextInputFormat可以把多個(gè)小文件合并成一個(gè)切片處理诊县,提高處理效率 - 邏輯處理接口:Mapper
用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中的三個(gè)方法:map()讲弄、setup()、cleanup() - Partition分區(qū)
(1)有默認(rèn)實(shí)現(xiàn)HashPartitioner依痊,邏輯是根據(jù)key的哈希值和numReduce來返回一個(gè)分區(qū)號(hào):key.hashCode() & Integer.MAX_VALUE % numReduces
(2)如果業(yè)務(wù)上有特別的需求避除,可以自定義分區(qū) - Comparable排序
(1)當(dāng)我們用自定義的對(duì)象作為key來輸出時(shí),就必須要實(shí)現(xiàn)WritableComparable接口抗悍,重寫其中的compareTo方法
(2)部分排序:對(duì)最終輸出的每一個(gè)文件進(jìn)行內(nèi)部排序
(3)全排序:對(duì)所有數(shù)據(jù)進(jìn)行排序,通常只有一個(gè)Reduce
(4)二次排序:排序的條件有兩個(gè) - Combiner
不影響最終的業(yè)務(wù)邏輯
提前聚合map钳枕,解決數(shù)據(jù)傾斜 - Reducer
用戶的業(yè)務(wù)邏輯
setup()缴渊、reduce()、clearup() - OutputFormat
(1)默認(rèn)使用的實(shí)現(xiàn)類是:TextOutputFormat
(2)TextOutputFormat的功能邏輯:將每個(gè)KV對(duì)項(xiàng)目表文件中寫一行
(3)自定義OutputFormat