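As we all know, joining two tables is easy with a SQL JOIN statement, and when the two tables differ greatly in size, the usual tuning advice is to put the small table on the left of the join. Why does that help? When the join runs as MapReduce, the small table can be loaded into memory first, and each input record is then checked against it; if a match is found, the joined record is written out. This post shows two ways to join data with MapReduce: map join and reduce join.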
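The idea above is a classic hash join: build an in-memory map from the small table, then stream the large table past it. Below is a minimal, Hadoop-free sketch of that idea in plain Java. It assumes the comma-separated layouts used by the mappers later in this post (customer: `cid,name,telephone`; order: `orderId,cid,price,productName`, where the `orderId` column name is inferred); the class name `MapSideHashJoin` and the sample rows are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hadoop-free sketch of the map-side (hash) join idea.
// Assumed layouts: customer = cid,name,telephone ; order = orderId,cid,price,productName
class MapSideHashJoin {

    // Build phase: index the small table by its join key (cid, column 0).
    static Map<String, String> buildIndex(List<String> customerLines) {
        Map<String, String> index = new HashMap<>();
        for (String line : customerLines) {
            if (line.trim().isEmpty()) continue;
            index.put(line.split(",")[0], line);
        }
        return index;
    }

    // Probe phase: stream the large table and look each row's cid (column 1) up in memory.
    static List<String> join(Map<String, String> index, List<String> orderLines) {
        List<String> joined = new ArrayList<>();
        for (String line : orderLines) {
            String[] fields = line.split(",");
            if (fields.length < 2) continue;
            String customer = index.get(fields[1]);
            if (customer != null) { // inner join: orders without a customer are dropped
                joined.add(customer + "," + line);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows, for illustration only.
        Map<String, String> index = buildIndex(List.of("1,Alice,555-0001", "2,Bob,555-0002"));
        List<String> out = join(index, List.of("100,1,9.99,pen", "101,3,5.00,ink", "102,2,3.50,paper"));
        // Prints 1,Alice,555-0001,100,1,9.99,pen and 2,Bob,555-0002,102,2,3.50,paper
        out.forEach(System.out::println);
    }
}
```

Only the small table has to fit in memory; the large one is read one line at a time, which is why the small side is the one that gets cached.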
1. First, the data:
[Figure: the two data files, customer.txt and order.txt]
Upload both data files to HDFS:
[Figure: the two files on HDFS]
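2. Map-side join with customer as the small table: customer.txt is shipped through the distributed cache and loaded into memory, while order.txt is read as the job input. First, create the driver class skeleton: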
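```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapJoinRM extends Configured implements Tool {
    // Small table (customer) loaded into memory: cid -> full customer line
    static Map<String, String> customerMap = new HashMap<String, String>();

    // JoinMapper (step 3) goes here as a static nested class

    public int run(String[] args) throws Exception {
        // 1) Get the configuration
        Configuration configuration = this.getConf();
        // 2) Create the job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) Input: the large table (order)
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job, path);
        // 3.2) Mapper
        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 3.3) Reducer: none, the join is finished in the map phase
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 3.4) Distributed cache: the small table (customer)
        URI uri = new URI(args[2]);
        job.addCacheFile(uri);
        // 3.5) Output
        Path output = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, output);
        // 4) Submit and wait
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) {
        args = new String[]{
            "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/order.txt",   // input (large table)
            "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output66",    // output directory
            "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/customer.txt" // cached small table
        };
        Configuration configuration = new Configuration();
        try {
            // Delete the output directory if it already exists
            Path fileOutputPath = new Path(args[1]);
            FileSystem fileSystem = fileOutputPath.getFileSystem(configuration);
            if (fileSystem.exists(fileOutputPath)) {
                fileSystem.delete(fileOutputPath, true);
            }
            int status = ToolRunner.run(configuration, new MapJoinRM(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
```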
3. Implement the Mapper subclass, which loads the cached customer data in setup() and performs the join in map():
```java
// Static nested class of MapJoinRM
public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the cached small table (customer.txt) into memory once per map task
        Configuration configuration = context.getConfiguration();
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);
        FileSystem fileSystem = path.getFileSystem(configuration);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fileSystem.open(path), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().length() > 0) {
                    // customer line: cid,name,telephone -> keyed on cid
                    customerMap.put(line.split(",")[0], line);
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // order line: orderId,cid,price,productName -> join on the cid column only,
        // so an order id that happens to equal a customer id cannot cause a false match
        String lineValue = value.toString();
        String[] fields = lineValue.split(",");
        if (fields.length < 2) return;
        String customerInfo = customerMap.get(fields[1]);
        if (customerInfo != null) {
            outputKey.set(fields[1]);
            outputValue.set(customerInfo + "," + lineValue);
            context.write(outputKey, outputValue);
        }
    }
}
```
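A map join should take the customer id from its fixed column rather than testing every token of the order line: if an order id happens to equal some customer id, a token scan matches the wrong customer. The small JDK-only sketch below contrasts the two lookups, assuming the order layout `orderId,cid,price,productName`; `JoinKeyLookup` and the sample data are hypothetical.

```java
import java.util.Map;
import java.util.StringTokenizer;

// Hadoop-free comparison of two ways to find the join key in an order line.
// Assumed order layout: orderId,cid,price,productName
class JoinKeyLookup {

    // Scan every token and return the first one that is a known customer id.
    static String firstMatchingToken(Map<String, String> customers, String orderLine) {
        StringTokenizer tokens = new StringTokenizer(orderLine, ",");
        while (tokens.hasMoreTokens()) {
            String token = tokens.nextToken();
            if (customers.containsKey(token)) {
                return token;
            }
        }
        return null;
    }

    // Read the customer id from its fixed column (index 1).
    static String fixedColumn(Map<String, String> customers, String orderLine) {
        String[] fields = orderLine.split(",");
        if (fields.length < 2) return null;
        return customers.containsKey(fields[1]) ? fields[1] : null;
    }

    public static void main(String[] args) {
        Map<String, String> customers = Map.of("1", "1,Alice,555-0001", "2", "2,Bob,555-0002");
        // This order belongs to customer 1, but its order id "2" is also a customer id.
        String order = "2,1,9.99,pen";
        System.out.println(firstMatchingToken(customers, order)); // 2 (wrong customer)
        System.out.println(fixedColumn(customers, order));        // 1
    }
}
```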
4. Run the program and view the join result from the console:
```bash
bin/hdfs dfs -text /user/hdfs/output66/part*
```
The result looks like this:
[Figure: map-join output]
That is all it takes to join a large table with a small one. Next, we do the join on the reduce side.
5. In a reduce-side join, records from both tables reach the reducer together. To tell them apart, we define a custom Writable type with a tag field that records which table each value came from, so the reducer can separate customer records from order records:
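```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Map output value for the reduce-side join: a table tag plus the record payload
public class DataJoionWritable implements Writable {
    private String tag;  // which table the record came from (see DataCommon)
    private String data; // the columns carried over from that table

    // Hadoop creates Writables reflectively, so a no-arg constructor is required
    public DataJoionWritable() {
    }

    public DataJoionWritable(String tag, String data) {
        this.set(tag, data);
    }

    public void set(String tag, String data) {
        this.tag = tag;
        this.data = data;
    }

    // Serialize; the field order must match readFields()
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.getTag());
        dataOutput.writeUTF(this.getData());
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.setTag(dataInput.readUTF());
        this.setData(dataInput.readUTF());
    }

    public String getTag() { return tag; }
    public void setTag(String tag) { this.tag = tag; }
    public String getData() { return data; }
    public void setData(String data) { this.data = data; }

    @Override
    public String toString() {
        return "DataJoionWritable{" +
                "tag='" + tag + '\'' +
                ", data='" + data + '\'' +
                '}';
    }
}
```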
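Hadoop moves `DataJoionWritable` values from map to reduce by calling `write()` and later `readFields()` on an instance built with the no-arg constructor (which Hadoop may reuse), so both methods must handle the fields in the same order. The sketch below mimics that contract with plain JDK streams so it runs without a Hadoop dependency; `TaggedRecord` is a hypothetical stand-in, not part of the original code. Note that `writeUTF` throws `NullPointerException` for a null string, so both fields must be set before a record is written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// JDK-only stand-in for DataJoionWritable: same write/readFields contract,
// without the org.apache.hadoop.io.Writable interface.
class TaggedRecord {
    private String tag;
    private String data;

    TaggedRecord() { } // no-arg constructor, used before readFields()

    TaggedRecord(String tag, String data) {
        this.tag = tag;
        this.data = data;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(tag);
        out.writeUTF(data);
    }

    // Must read the fields in exactly the order write() emitted them.
    void readFields(DataInput in) throws IOException {
        tag = in.readUTF();
        data = in.readUTF();
    }

    String getTag() { return tag; }
    String getData() { return data; }

    // Serialize to bytes and read back into a new instance.
    static TaggedRecord roundTrip(TaggedRecord record) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            record.write(out);
        }
        TaggedRecord copy = new TaggedRecord();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy.readFields(in);
        }
        return copy;
    }
}
```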
6. For convenience, put the tag values in a small constants class:
```java
// Tag values stored in DataJoionWritable to mark each record's source table
public class DataCommon {
    public final static String CUSTOMER = "customer";
    public final static String ORDER = "order";
}
```
7. Create the driver class skeleton:
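```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoinMR extends Configured implements Tool {
    // JoinMapper2 and JoinReduce2 (steps 8 and 9) go here as static nested classes

    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) Get the configuration
        Configuration configuration = this.getConf();
        // 2) Create the job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) Input: a directory holding both the customer and the order file
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job, path);
        // 3.2) Mapper: emits cid -> tagged record
        job.setMapperClass(JoinMapper2.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataJoionWritable.class);
        // 3.3) Reducer: joins the records that share a cid
        job.setReducerClass(JoinReduce2.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // 3.4) Output
        Path output = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, output);
        // 4) Submit and wait
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) {
        // The datas directory already contains the two data files to be joined
        args = new String[]{
            "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/datas",
            "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output100"
        };
        Configuration configuration = new Configuration();
        try {
            // Delete the output directory if it already exists
            Path fileOutputPath = new Path(args[1]);
            FileSystem fileSystem = fileOutputPath.getFileSystem(configuration);
            if (fileSystem.exists(fileOutputPath)) {
                fileSystem.delete(fileOutputPath, true);
            }
            int status = ToolRunner.run(configuration, new ReduceJoinMR(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
```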
8. Next, implement the Mapper's processing logic:
```java
// Static nested class of ReduceJoinMR
public static class JoinMapper2 extends Mapper<LongWritable, Text, Text, DataJoionWritable> {
    private Text outputKey = new Text();
    private DataJoionWritable outputValue = new DataJoionWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] values = value.toString().split(",");
        if (values.length == 3) {
            // customer line: cid,name,telephone
            String cid = values[0];
            String name = values[1];
            String telephone = values[2];
            outputKey.set(cid);
            outputValue.set(DataCommon.CUSTOMER, name + "," + telephone);
        } else if (values.length == 4) {
            // order line: orderId,cid,price,productName
            String cid = values[1];
            String price = values[2];
            String productName = values[3];
            outputKey.set(cid);
            outputValue.set(DataCommon.ORDER, productName + "," + price);
        } else {
            // skip lines that match neither layout
            return;
        }
        // key = cid, so a customer and all of their orders meet in the same reduce() call
        context.write(outputKey, outputValue);
    }
}
```
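`JoinMapper2` decides which table a line came from purely by its field count. Here is the same dispatch pulled out into a plain Java function so it can be tried without a cluster; the class `TagByFieldCount` and its `{key, tag, data}` return shape are hypothetical.

```java
// Hadoop-free sketch of JoinMapper2's dispatch: the number of comma-separated
// fields decides the source table. Returns {key, tag, data}, or null for lines
// that match neither layout.
class TagByFieldCount {
    static final String CUSTOMER = "customer"; // same values as DataCommon
    static final String ORDER = "order";

    static String[] tag(String line) {
        String[] v = line.split(",");
        if (v.length == 3) { // customer: cid,name,telephone
            return new String[] {v[0], CUSTOMER, v[1] + "," + v[2]};
        }
        if (v.length == 4) { // order: orderId,cid,price,productName
            return new String[] {v[1], ORDER, v[3] + "," + v[2]};
        }
        return null;         // skipped, like the early return in map()
    }
}
```

Field counting is fragile: a customer name containing a comma, or any change to either file's layout, would misroute records. One common alternative, assuming the default `TextInputFormat`, is to tag by input file name instead, for example with `((FileSplit) context.getInputSplit()).getPath().getName()` using `org.apache.hadoop.mapreduce.lib.input.FileSplit`.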
9. Join the data in the reducer:
```java
// Static nested class of ReduceJoinMR
public static class JoinReduce2 extends Reducer<Text, DataJoionWritable, NullWritable, Text> {
    private Text outputValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<DataJoionWritable> values, Context context) throws IOException, InterruptedException {
        String customerInfo = null;
        List<String> orderList = new ArrayList<String>();
        // Hadoop reuses one value object while iterating, so copy out the String fields
        for (DataJoionWritable dataJoinWritable : values) {
            if (DataCommon.CUSTOMER.equals(dataJoinWritable.getTag())) {
                customerInfo = dataJoinWritable.getData();
            } else if (DataCommon.ORDER.equals(dataJoinWritable.getTag())) {
                orderList.add(dataJoinWritable.getData());
            }
        }
        // Inner join: orders with no matching customer are dropped
        if (customerInfo == null) {
            return;
        }
        for (String orderInfo : orderList) {
            outputValue.set(key.toString() + "," + customerInfo + "," + orderInfo);
            context.write(NullWritable.get(), outputValue);
        }
    }
}
```
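To see what the shuffle contributes, here is a Hadoop-free simulation: tagged `{key, tag, data}` records are grouped by key (and sorted, which for ASCII keys matches Hadoop's ordering of `Text` keys), then each group goes through the same logic as `JoinReduce2.reduce()`. `ReduceSideJoinSketch` and the sample records are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hadoop-free simulation of the reduce-side join: group tagged records by key
// (the shuffle's job), then apply JoinReduce2's per-key logic.
// Each input row is {key, tag, data}.
class ReduceSideJoinSketch {

    // Per-key logic: one customer row, any number of order rows -> inner join.
    static List<String> reduce(String key, List<String[]> values) {
        String customerInfo = null;
        List<String> orders = new ArrayList<>();
        for (String[] v : values) {
            if ("customer".equals(v[1])) customerInfo = v[2];
            else if ("order".equals(v[1])) orders.add(v[2]);
        }
        List<String> out = new ArrayList<>();
        if (customerInfo == null) return out; // orders without a customer are dropped
        for (String order : orders) {
            out.add(key + "," + customerInfo + "," + order);
        }
        return out;
    }

    static List<String> run(List<String[]> mapped) {
        // Simulated shuffle: group values by key; TreeMap keeps keys sorted.
        Map<String, List<String[]>> groups = new TreeMap<>();
        for (String[] record : mapped) {
            groups.computeIfAbsent(record[0], k -> new ArrayList<>()).add(record);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String[]>> e : groups.entrySet()) {
            out.addAll(reduce(e.getKey(), e.getValue()));
        }
        return out;
    }
}
```

Because the customer record can arrive anywhere in the value list, the reducer buffers every order for a key in memory. A common refinement is a secondary sort (a composite key with a custom partitioner and grouping comparator) so the customer record always arrives first and orders can be written as they stream past; this post does not implement that.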
10. Viewing the output with the command gives the following result:
[Figure: reduce-join output]
Time was short, so this post is mostly pasted code; I will polish it later. Thanks to my teacher for the approach.