1 .MapReduce的運(yùn)行機(jī)制詳解
1.1:MapTask 工作機(jī)制
簡單概述:inputFile通過split被邏輯切分為多個(gè)split文件革砸,通過Record按行讀取內(nèi)容給map(用戶自己實(shí)現(xiàn)的)進(jìn)行處理蜻势,數(shù)據(jù)被map處理結(jié)束之后交給OutputCollector收集器譬圣,對(duì)其結(jié)果key進(jìn)行分區(qū)(默認(rèn)使用hash分區(qū)),然后寫入buffer超陆,每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū),存儲(chǔ)著map的輸出結(jié)果蟀俊,當(dāng)緩沖區(qū)快滿的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存放到磁盤,當(dāng)整個(gè)map task結(jié)束后再對(duì)磁盤中這個(gè)map task產(chǎn)生的所有臨時(shí)文件做合并订雾,生成最終的正式輸出文件肢预,然后等待reduce task來拉數(shù)據(jù)
詳細(xì)步驟
讀取數(shù)據(jù)組件 InputFormat (默認(rèn) TextInputFormat) 會(huì)通過
getSplits
方法對(duì)輸入目錄中文件進(jìn)行邏輯切片規(guī)劃得到block
, 有多少個(gè)block
就對(duì)應(yīng)啟動(dòng)多少個(gè)MapTask
.將輸入文件切分為
block
之后, 由RecordReader
對(duì)象 (默認(rèn)是LineRecordReader) 進(jìn)行讀取, 以\n
作為分隔符, 讀取一行數(shù)據(jù), 返回<key,value>
. Key 表示每行首字符偏移值, Value 表示這一行文本內(nèi)容讀取
block
返回<key,value>
, 進(jìn)入用戶自己繼承的 Mapper 類中葬燎,執(zhí)行用戶重寫的 map 函數(shù), RecordReader 讀取一行這里調(diào)用一次-
Mapper 邏輯結(jié)束之后, 將 Mapper 的每條結(jié)果通過
context.write
進(jìn)行collect數(shù)據(jù)收集. 在 collect 中, 會(huì)先對(duì)其進(jìn)行分區(qū)處理误甚,默認(rèn)使用 HashPartitionerMapReduce 提供
Partitioner
接口, 它的作用就是根據(jù)Key
或Value
及Reducer
的數(shù)量來決定當(dāng)前的這對(duì)輸出數(shù)據(jù)最終應(yīng)該交由哪個(gè)Reduce task
處理, 默認(rèn)對(duì) Key Hash 后再以 Reducer 數(shù)量取模. 默認(rèn)的取模方式只是為了平均 Reducer 的處理能力, 如果用戶自己對(duì) Partitioner 有需求, 可以訂制并設(shè)置到 Job 上
-
接下來, 會(huì)將數(shù)據(jù)寫入內(nèi)存, 內(nèi)存中這片區(qū)域叫做環(huán)形緩沖區(qū), 緩沖區(qū)的作用是批量收集 Mapper 結(jié)果, 減少磁盤 IO 的影響. 我們的 Key/Value 對(duì)以及 Partition 的結(jié)果都會(huì)被寫入緩沖區(qū). 當(dāng)然, 寫入之前,Key 與 Value 值都會(huì)被序列化成字節(jié)數(shù)組
環(huán)形緩沖區(qū)其實(shí)是一個(gè)數(shù)組, 數(shù)組中存放著 Key, Value 的序列化數(shù)據(jù)和 Key, Value 的元數(shù)據(jù)信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及 Value 的長度. 環(huán)形結(jié)構(gòu)是一個(gè)抽象概念
緩沖區(qū)是有大小限制, 默認(rèn)是 100MB. 當(dāng) Mapper 的輸出結(jié)果很多時(shí), 就可能會(huì)撐爆內(nèi)存, 所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時(shí)寫入磁盤, 然后重新利用這塊緩沖區(qū). 這個(gè)從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為 Spill, 中文可譯為溢寫. 這個(gè)溢寫是由單獨(dú)線程來完成, 不影響往緩沖區(qū)寫 Mapper 結(jié)果的線程. 溢寫線程啟動(dòng)時(shí)不應(yīng)該阻止 Mapper 的結(jié)果輸出, 所以整個(gè)緩沖區(qū)有個(gè)溢寫的比例
spill.percent
. 這個(gè)比例默認(rèn)是 0.8, 也就是當(dāng)緩沖區(qū)的數(shù)據(jù)已經(jīng)達(dá)到閾值buffer size * spill percent = 100MB * 0.8 = 80MB
, 溢寫線程啟動(dòng), 鎖定這 80MB 的內(nèi)存, 執(zhí)行溢寫過程. Mapper 的輸出結(jié)果還可以往剩下的 20MB 內(nèi)存中寫, 互不影響
-
當(dāng)溢寫線程啟動(dòng)后, 需要對(duì)這 80MB 空間內(nèi)的 Key 做排序 (Sort). 排序是 MapReduce 模型默認(rèn)的行為, 這里的排序也是對(duì)序列化的字節(jié)做的排序
如果 Job 設(shè)置過 Combiner, 那么現(xiàn)在就是使用 Combiner 的時(shí)候了. 將有相同 Key 的 Key/Value 對(duì)的 Value 加起來, 減少溢寫到磁盤的數(shù)據(jù)量. Combiner 會(huì)優(yōu)化 MapReduce 的中間結(jié)果, 所以它在整個(gè)模型中會(huì)多次使用
那哪些場景才能使用 Combiner 呢? 從這里分析, Combiner 的輸出是 Reducer 的輸入, Combiner 絕不能改變最終的計(jì)算結(jié)果. Combiner 只應(yīng)該用于那種 Reduce 的輸入 Key/Value 與輸出 Key/Value 類型完全一致, 且不影響最終結(jié)果的場景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它對(duì) Job 執(zhí)行效率有幫助, 反之會(huì)影響 Reducer 的最終結(jié)果
合并溢寫文件, 每次溢寫會(huì)在磁盤上生成一個(gè)臨時(shí)文件 (寫之前判斷是否有 Combiner), 如果 Mapper 的輸出結(jié)果真的很大, 有多次這樣的溢寫發(fā)生, 磁盤上相應(yīng)的就會(huì)有多個(gè)臨時(shí)文件存在. 當(dāng)整個(gè)數(shù)據(jù)處理結(jié)束之后開始對(duì)磁盤中的臨時(shí)文件進(jìn)行 Merge 合并, 因?yàn)樽罱K的文件只有一個(gè), 寫入磁盤, 并且為這個(gè)文件提供了一個(gè)索引文件, 以記錄每個(gè)reduce對(duì)應(yīng)數(shù)據(jù)的偏移量
配置
配置 | 默認(rèn)值 | 解釋 |
---|---|---|
mapreduce.task.io.sort.mb |
100 | 設(shè)置環(huán)型緩沖區(qū)的內(nèi)存值大小 |
mapreduce.map.sort.spill.percent |
0.8 | 設(shè)置溢寫的比例 |
mapreduce.cluster.local.dir |
${hadoop.tmp.dir}/mapred/local |
溢寫數(shù)據(jù)目錄 |
mapreduce.task.io.sort.factor |
10 | 設(shè)置一次合并多少個(gè)溢寫文件 |
1.2 :ReduceTask 工作機(jī)制
Reduce 大致分為 copy、sort、reduce 三個(gè)階段垢油,重點(diǎn)在前兩個(gè)階段。copy 階段包含一個(gè) eventFetcher 來獲取已完成的 map 列表冈钦,由 Fetcher 線程去 copy 數(shù)據(jù),在此過程中會(huì)啟動(dòng)兩個(gè) merge 線程李请,分別為 inMemoryMerger 和 onDiskMerger瞧筛,分別將內(nèi)存中的數(shù)據(jù) merge 到磁盤和將磁盤中的數(shù)據(jù)進(jìn)行 merge。待數(shù)據(jù) copy 完成之后导盅,copy 階段就完成了较幌,開始進(jìn)行 sort 階段,sort 階段主要是執(zhí)行 finalMerge 操作白翻,純粹的 sort 階段乍炉,完成之后就是 reduce 階段,調(diào)用用戶定義的 reduce 函數(shù)進(jìn)行處理
詳細(xì)步驟
-
Copy階段
滤馍,簡單地拉取數(shù)據(jù)岛琼。Reduce進(jìn)程啟動(dòng)一些數(shù)據(jù)copy線程(Fetcher),通過HTTP方式請(qǐng)求maptask獲取屬于自己的文件巢株。 -
Merge階段
槐瑞。這里的merge如map端的merge動(dòng)作,只是數(shù)組中存放的是不同map端copy來的數(shù)值阁苞。Copy過來的數(shù)據(jù)會(huì)先放入內(nèi)存緩沖區(qū)中困檩,這里的緩沖區(qū)大小要比map端的更為靈活。merge有三種形式:內(nèi)存到內(nèi)存那槽;內(nèi)存到磁盤窗看;磁盤到磁盤。默認(rèn)情況下第一種形式不啟用倦炒。當(dāng)內(nèi)存中的數(shù)據(jù)量到達(dá)一定閾值,就啟動(dòng)內(nèi)存到磁盤的merge软瞎。與map 端類似逢唤,這也是溢寫的過程拉讯,這個(gè)過程中如果你設(shè)置有Combiner,也是會(huì)啟用的鳖藕,然后在磁盤中生成了眾多的溢寫文件魔慷。第二種merge方式一直在運(yùn)行,直到?jīng)]有map端的數(shù)據(jù)時(shí)才結(jié)束著恩,然后啟動(dòng)第三種磁盤到磁盤的merge方式生成最終的文件院尔。 -
合并排序
。把分散的數(shù)據(jù)合并成一個(gè)大的數(shù)據(jù)后喉誊,還會(huì)再對(duì)合并后的數(shù)據(jù)排序邀摆。 -
對(duì)排序后的鍵值對(duì)調(diào)用reduce方法
,鍵相等的鍵值對(duì)調(diào)用一次reduce方法伍茄,每次調(diào)用會(huì)產(chǎn)生零個(gè)或者多個(gè)鍵值對(duì)栋盹,最后把這些輸出的鍵值對(duì)寫入到HDFS文件中。
1.3:Shuffle 過程
map 階段處理的數(shù)據(jù)如何傳遞給 reduce 階段敷矫,是 MapReduce 框架中最關(guān)鍵的一個(gè)流程例获,這個(gè)流程就叫 shuffle
shuffle: 洗牌、發(fā)牌 ——(核心機(jī)制:數(shù)據(jù)分區(qū)曹仗,排序榨汤,分組,規(guī)約怎茫,合并等過程)
shuffle 是 Mapreduce 的核心收壕,它分布在 Mapreduce 的 map 階段和 reduce 階段。一般把從 Map 產(chǎn)生輸出開始到 Reduce 取得數(shù)據(jù)作為輸入之前的過程稱作 shuffle遭居。
-
Collect階段
:將 MapTask 的結(jié)果輸出到默認(rèn)大小為 100M 的環(huán)形緩沖區(qū)啼器,保存的是 key/value,Partition 分區(qū)信息等俱萍。 -
Spill階段
:當(dāng)內(nèi)存中的數(shù)據(jù)量達(dá)到一定的閥值的時(shí)候端壳,就會(huì)將數(shù)據(jù)寫入本地磁盤,在將數(shù)據(jù)寫入磁盤之前需要對(duì)數(shù)據(jù)進(jìn)行一次排序的操作枪蘑,如果配置了 combiner损谦,還會(huì)將有相同分區(qū)號(hào)和 key 的數(shù)據(jù)進(jìn)行排序。 -
Merge階段
:把所有溢出的臨時(shí)文件進(jìn)行一次合并操作岳颇,以確保一個(gè) MapTask 最終只產(chǎn)生一個(gè)中間數(shù)據(jù)文件照捡。 -
Copy階段
:ReduceTask 啟動(dòng) Fetcher 線程到已經(jīng)完成 MapTask 的節(jié)點(diǎn)上復(fù)制一份屬于自己的數(shù)據(jù),這些數(shù)據(jù)默認(rèn)會(huì)保存在內(nèi)存的緩沖區(qū)中话侧,當(dāng)內(nèi)存的緩沖區(qū)達(dá)到一定的閥值的時(shí)候栗精,就會(huì)將數(shù)據(jù)寫到磁盤之上。 -
Merge階段
:在 ReduceTask 遠(yuǎn)程復(fù)制數(shù)據(jù)的同時(shí),會(huì)在后臺(tái)開啟兩個(gè)線程對(duì)內(nèi)存到本地的數(shù)據(jù)文件進(jìn)行合并操作悲立。 -
Sort階段
:在對(duì)數(shù)據(jù)進(jìn)行合并的同時(shí)鹿寨,會(huì)進(jìn)行排序操作,由于 MapTask 階段已經(jīng)對(duì)數(shù)據(jù)進(jìn)行了局部的排序薪夕,ReduceTask 只需保證 Copy 的數(shù)據(jù)的最終整體有效性即可脚草。
Shuffle 中的緩沖區(qū)大小會(huì)影響到 mapreduce 程序的執(zhí)行效率,原則上說原献,緩沖區(qū)越大馏慨,磁盤io的次數(shù)越少,執(zhí)行速度就越快
緩沖區(qū)的大小可以通過參數(shù)調(diào)整, 參數(shù):mapreduce.task.io.sort.mb 默認(rèn)100M
2. 案例: Reduce 端實(shí)現(xiàn) JOIN
2.1. 需求
假如數(shù)據(jù)量巨大姑隅,兩表的數(shù)據(jù)是以文件的形式存儲(chǔ)在 HDFS 中, 需要用 MapReduce 程序來實(shí)現(xiàn)以下 SQL 查詢運(yùn)算
select a.id,a.date,b.name,b.category_id,b.price from t_order a left join t_product b on a.pid = b.id
商品表
id | pname | category_id | price |
---|---|---|---|
P0001 | 小米5 | 1000 | 2000 |
P0002 | 錘子T1 | 1000 | 3000 |
訂單數(shù)據(jù)表
id | date | pid | amount |
---|---|---|---|
1001 | 20150710 | P0001 | 2 |
1002 | 20150710 | P0002 | 3 |
2.2 實(shí)現(xiàn)步驟
通過將關(guān)聯(lián)的條件作為map輸出的key写隶,將兩表滿足join條件的數(shù)據(jù)并攜帶數(shù)據(jù)所來源的文件信息,發(fā)往同一個(gè)reduce task粤策,在reduce中進(jìn)行數(shù)據(jù)的串聯(lián)
Step 1: 定義 Mapper
public class ReduceJoinMapper extends Mapper<LongWritable,Text,Text,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1:判斷數(shù)據(jù)來自哪個(gè)文件
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
if(fileName.equals("product.txt")){
//數(shù)據(jù)來自商品表
//2:將K1和V1轉(zhuǎn)為K2和V2,寫入上下文中
String[] split = value.toString().split(",");
String productId = split[0];
context.write(new Text(productId), value);
}else{
//數(shù)據(jù)來自訂單表
//2:將K1和V1轉(zhuǎn)為K2和V2,寫入上下文中
String[] split = value.toString().split(",");
String productId = split[2];
context.write(new Text(productId), value);
}
}
}
Step 2: 定義 Reducer
public class ReduceJoinMapper extends Mapper<LongWritable,Text,Text,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1:判斷數(shù)據(jù)來自哪個(gè)文件
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
if(fileName.equals("product.txt")){
//數(shù)據(jù)來自商品表
//2:將K1和V1轉(zhuǎn)為K2和V2,寫入上下文中
String[] split = value.toString().split(",");
String productId = split[0];
context.write(new Text(productId), value);
}else{
//數(shù)據(jù)來自訂單表
//2:將K1和V1轉(zhuǎn)為K2和V2,寫入上下文中
String[] split = value.toString().split(",");
String productId = split[2];
context.write(new Text(productId), value);
}
}
}
Step 3: 定義主類
public class ReduceJoinReducer extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//1:遍歷集合,獲取V3 (first +second)
String first = "";
String second = "";
for (Text value : values) {
if(value.toString().startsWith("p")){
first = value.toString();
}else{
second += value.toString();
}
}
//2:將K3和V3寫入上下文中
context.write(key, new Text(first+"\t"+second));
}
}
3. 案例: Map端實(shí)現(xiàn) JOIN
3.1 概述
? 適用于關(guān)聯(lián)表中有小表的情形.
? 使用分布式緩存,可以將小表分發(fā)到所有的map節(jié)點(diǎn)樟澜,這樣,map節(jié)點(diǎn)就可以在本地對(duì)自己所讀到的大表數(shù)據(jù)進(jìn)行join并輸出最終結(jié)果叮盘,可以大大提高join操作的并發(fā)度秩贰,加快處理速度
3.2 實(shí)現(xiàn)步驟
先在mapper類中預(yù)先定義好小表,進(jìn)行join
引入實(shí)際場景中的解決方案:一次加載數(shù)據(jù)庫或者用
Step 1:定義Mapper
public class MapJoinMapper extends Mapper<LongWritable,Text,Text,Text>{
private HashMap<String, String> map = new HashMap<>();
//第一件事情:將分布式緩存的小表數(shù)據(jù)讀取到本地Map集合(只需要做一次)
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//1:獲取分布式緩存文件列表
URI[] cacheFiles = context.getCacheFiles();
//2:獲取指定的分布式緩存文件的文件系統(tǒng)(FileSystem)
FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
//3:獲取文件的輸入流
FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));
//4:讀取文件內(nèi)容, 并將數(shù)據(jù)存入Map集合
//4.1 將字節(jié)輸入流轉(zhuǎn)為字符緩沖流FSDataInputStream --->BufferedReader
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
//4.2 讀取小表文件內(nèi)容,以行位單位,并將讀取的數(shù)據(jù)存入map集合
String line = null;
while((line = bufferedReader.readLine()) != null){
String[] split = line.split(",");
map.put(split[0], line);
}
//5:關(guān)閉流
bufferedReader.close();
fileSystem.close();
}
//第二件事情:對(duì)大表的處理業(yè)務(wù)邏輯,而且要實(shí)現(xiàn)大表和小表的join操作
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1:從行文本數(shù)據(jù)中獲取商品的id: p0001 , p0002 得到了K2
String[] split = value.toString().split(",");
String productId = split[2]; //K2
//2:在Map集合中,將商品的id作為鍵,獲取值(商品的行文本數(shù)據(jù)) ,將value和值拼接,得到V2
String productLine = map.get(productId);
String valueLine = productLine+"\t"+value.toString(); //V2
//3:將K2和V2寫入上下文中
context.write(new Text(productId), new Text(valueLine));
}
}
Step 2:定義主類
public class JobMain extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
//1:獲取job對(duì)象
Job job = Job.getInstance(super.getConf(), "map_join_job");
//2:設(shè)置job對(duì)象(將小表放在分布式緩存中)
//將小表放在分布式緩存中
// DistributedCache.addCacheFile(new URI("hdfs://node01:8020/cache_file/product.txt"), super.getConf());
job.addCacheFile(new URI("hdfs://node01:8020/cache_file/product.txt"));
//第一步:設(shè)置輸入類和輸入的路徑
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\map_join_input"));
//第二步:設(shè)置Mapper類和數(shù)據(jù)類型
job.setMapperClass(MapJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//第八步:設(shè)置輸出類和輸出路徑
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\map_join_out"));
//3:等待任務(wù)結(jié)束
boolean bl = job.waitForCompletion(true);
return bl ? 0 :1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//啟動(dòng)job任務(wù)
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
4. 案例:求共同好友
4.1 需求分析
以下是qq的好友列表數(shù)據(jù)柔吼,冒號(hào)前是一個(gè)用戶毒费,冒號(hào)后是該用戶的所有好友(數(shù)據(jù)中的好友關(guān)系是單向的)
A:B,C,D,F,E,O
B:A,C,E,K
C:A,B,D,E,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰愈魏?
4.2 實(shí)現(xiàn)步驟
第一步:代碼實(shí)現(xiàn)
Mapper類
public class Step1Mapper extends Mapper<LongWritable,Text,Text,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1:以冒號(hào)拆分行文本數(shù)據(jù): 冒號(hào)左邊就是V2
String[] split = value.toString().split(":");
String userStr = split[0];
//2:將冒號(hào)右邊的字符串以逗號(hào)拆分,每個(gè)成員就是K2
String[] split1 = split[1].split(",");
for (String s : split1) {
//3:將K2和v2寫入上下文中
context.write(new Text(s), new Text(userStr));
}
}
}
Reducer類:
public class Step1Reducer extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//1:遍歷集合,并將每一個(gè)元素拼接,得到K3
StringBuffer buffer = new StringBuffer();
for (Text value : values) {
buffer.append(value.toString()).append("-");
}
//2:K2就是V3
//3:將K3和V3寫入上下文中
context.write(new Text(buffer.toString()), key);
}
}
JobMain:
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//1:獲取Job對(duì)象
Job job = Job.getInstance(super.getConf(), "common_friends_step1_job");
//2:設(shè)置job任務(wù)
//第一步:設(shè)置輸入類和輸入路徑
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\common_friends_step1_input"));
//第二步:設(shè)置Mapper類和數(shù)據(jù)類型
job.setMapperClass(Step1Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//第三,四,五,六
//第七步:設(shè)置Reducer類和數(shù)據(jù)類型
job.setReducerClass(Step1Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//第八步:設(shè)置輸出類和輸出的路徑
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\common_friends_step1_out"));
//3:等待job任務(wù)結(jié)束
boolean bl = job.waitForCompletion(true);
return bl ? 0: 1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//啟動(dòng)job任務(wù)
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
第二步:代碼實(shí)現(xiàn)
Mapper類
public class Step2Mapper extends Mapper<LongWritable,Text,Text,Text> {
/*
K1 V1
0 A-F-C-J-E- B
----------------------------------
K2 V2
A-C B
A-E B
A-F B
C-E B
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1:拆分行文本數(shù)據(jù),結(jié)果的第二部分可以得到V2
String[] split = value.toString().split("\t");
String friendStr =split[1];
//2:繼續(xù)以'-'為分隔符拆分行文本數(shù)據(jù)第一部分,得到數(shù)組
String[] userArray = split[0].split("-");
//3:對(duì)數(shù)組做一個(gè)排序
Arrays.sort(userArray);
//4:對(duì)數(shù)組中的元素進(jìn)行兩兩組合,得到K2
/*
A-E-C -----> A C E
A C E
A C E
*/
for (int i = 0; i <userArray.length -1 ; i++) {
for (int j = i+1; j < userArray.length ; j++) {
//5:將K2和V2寫入上下文中
context.write(new Text(userArray[i] +"-"+userArray[j]), new Text(friendStr));
}
}
}
}
Reducer類:
public class Step2Reducer extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//1:原來的K2就是K3
//2:將集合進(jìn)行遍歷,將集合中的元素拼接,得到V3
StringBuffer buffer = new StringBuffer();
for (Text value : values) {
buffer.append(value.toString()).append("-");
}
//3:將K3和V3寫入上下文中
context.write(key, new Text(buffer.toString()));
}
}
JobMain:
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//1:獲取Job對(duì)象
Job job = Job.getInstance(super.getConf(), "common_friends_step2_job");
//2:設(shè)置job任務(wù)
//第一步:設(shè)置輸入類和輸入路徑
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\out\\common_friends_step1_out"));
//第二步:設(shè)置Mapper類和數(shù)據(jù)類型
job.setMapperClass(Step2Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//第三,四,五,六
//第七步:設(shè)置Reducer類和數(shù)據(jù)類型
job.setReducerClass(Step2Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//第八步:設(shè)置輸出類和輸出的路徑
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\common_friends_step2_out"));
//3:等待job任務(wù)結(jié)束
boolean bl = job.waitForCompletion(true);
return bl ? 0: 1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//啟動(dòng)job任務(wù)
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}