A Deeper Look at How MapReduce Works (Part 2)

1. MapReduce jobs that need no reduce phase

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount03 {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            // keep only records whose 4th field (gender) is "男"; the length check guards against malformed lines
            if (fields.length > 3 && "男".equals(fields[3])) {
                context.write(new Text(fields[3]), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        /**
         * Some jobs need no reduce (aggregation) program.
         * When no aggregation is required, the reduce phase can be skipped entirely.
         * The number of reduce tasks defaults to 1, so it must be set to 0 by hand;
         * if it is not set to 0, a default reduce task is still launched, it just processes no data.
         */
        job.setNumReduceTasks(0);
        job.setJobName("mr03 job");
        job.setJarByClass(WordCount03.class);
        job.setMapperClass(MyMapper.class); // without this the identity Mapper runs instead of MyMapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        Path in = new Path("/word");
        FileInputFormat.addInputPath(job, in);
        Path out = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(out)) {
            fs.delete(out, true); // delete(Path) is deprecated; pass the recursive flag explicitly
        }
        FileOutputFormat.setOutputPath(job, out);
        job.waitForCompletion(true);
    }
}

 

Note:

Some jobs need no reduce (aggregation) program. When no aggregation is required, the reduce phase can be skipped entirely. However, the number of reduce tasks defaults to 1, so it must be set to 0 by hand; if it is not set to 0, a default reduce task is still launched, it just processes no data.
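To make this concrete, here is a minimal sketch of the job's input and output. The sample records below are invented for illustration; the code only assumes comma-separated lines with the gender in the fourth field. Because there are zero reducers, each map task writes its results directly to HDFS, so the output files are named part-m-00000, part-m-00001, and so on, rather than the part-r-* files a reduce phase would produce.

Assumed input in /word (hypothetical sample):
1500100001,張三,22,男,文科一班
1500100002,李四,21,女,文科一班

Resulting /output/part-m-00000 (only the "男" records survive the filter):
男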

2. Join operations in MapReduce (stitching datasets together)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

public class WordCount04 {
    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. Get the InputSplit to find out which file this record came from
            // (context reads from HDFS upstream; downstream it feeds the reducer, or HDFS if there is none)
            InputSplit inputSplit = context.getInputSplit();
            FileSplit fs = (FileSplit) inputSplit;
            String url = fs.getPath().toString();
            // 2. Branch on the source file
            if (url.contains("students")) { // record comes from students.txt
                // use the student id as the key; it is the join condition between the two datasets
                String id = value.toString().split(",")[0];
                // tag each kind of record so the reducer can tell them apart
                String line = "*" + value.toString();
                context.write(new Text(id), new Text(line));
            } else { // record comes from score.txt
                String id = value.toString().split(",")[0];
                String line = "#" + value.toString();
                context.write(new Text(id), new Text(line));
            }
        }
    }

    public static class JoinReduce extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // keep the extracted records outside the loop
            String stuInfo = "";
            ArrayList<String> scores = new ArrayList<String>();
            // extract the records (values mixes lines from students.txt and score.txt)
            for (Text value : values) {
                String line = value.toString();
                if (line.startsWith("*")) { // student record
                    stuInfo = line.substring(1);
                } else { // score record
                    scores.add(line.substring(1));
                }
            }
            /**
             * Variant 1: a plain join of the two tables
             */
            for (String score : scores) {
                String subject = score.split(",")[1];
                String s = score.split(",")[2];
                String end = stuInfo + "," + subject + "," + s;
                context.write(new Text(end), NullWritable.get());
            }
            /**
             * Variant 2: join the two tables and sum the scores while joining
             */
//            long sum = 0L;
//            for (String s : scores) {
//                Integer sc = Integer.valueOf(s.split(",")[2]);
//                sum += sc;
//            }
//            String end = stuInfo + "," + sum;
//            context.write(new Text(end), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJobName("Join MapReduce");
        job.setJarByClass(WordCount04.class);

        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(JoinReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // input and output paths
        FileInputFormat.addInputPath(job, new Path("/word"));
        Path path = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(path)) {
            fs.delete(path, true); // delete(Path) is deprecated; pass the recursive flag explicitly
        }
        FileOutputFormat.setOutputPath(job, path);
        boolean success = job.waitForCompletion(true);
        System.out.println(success ? "join job finished" : "join job failed");
    }
}
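To see what this reduce-side join produces, here is a hypothetical sketch of the data flow. The record layouts below are assumptions for illustration; the code only requires the student id in the first field of both files, the subject in the second field of score.txt, and the score in the third.

students.txt (hypothetical):
1500100001,張三,22,男,文科一班

score.txt (hypothetical):
1500100001,語文,91
1500100001,數學,85

Joined output for key 1500100001:
1500100001,張三,22,男,文科一班,語文,91
1500100001,張三,22,男,文科一班,數學,85

The commented-out Variant 2 would instead emit one summed line per student, e.g. 1500100001,張三,22,男,文科一班,176. Note that the "*" and "#" tags exist only so the reducer can tell the two record types apart. If a key has no matching student record, stuInfo stays empty and the joined lines start with a bare comma, which a production job would want to guard against. The job can then be packaged and submitted as usual, e.g. hadoop jar <your-jar> WordCount04, assuming WordCount04 is the jar's entry point.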