使用MapReduce運行WordCount案例

@

一、準備數據

注意:準備的數據的格式必須是文本,每個單詞之間使用製表符分割。編碼必須是utf-8無bom
在這裡插入圖片描述

二、MR的編程規範

MR的編程只需要將自定義的組件和系統默認組件進行組合,組合之後運行即可!

三、編程步驟

①Map階段的核心處理邏輯需要編寫在Mapper
②Reduce階段的核心處理邏輯需要編寫在Reducer
③將編寫的Mapper和Reducer進行組合,組合成一個Job
④對Job進行設置,設置後運行

四、編寫程式

WCMapper.java

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	
	private Text out_key=new Text();
	private IntWritable out_value=new IntWritable(1);//每個單詞出現一次記為1
	
	// 針對輸入的每個 keyin-valuein調用一次   (0,hello	hi	hello	hi)
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws Exception {
	
		System.out.println("keyin:"+key+"----keyout:"+value);
		
		String[] words = value.toString().split("\t");
		
		for (String word : words) {
			out_key.set(word);
			//寫出數據(單詞,1)
			context.write(out_key, out_value);
		}
			
	}
}

Mapper程式解讀

  1. 導包時,需注意導入 org.apache.hadoop.mapreduce包下的類(hadoop2.0的新api)

  2. 自定義的類必須符合MR的Mapper的規範

  3. 在MR中,只能處理key-value格式的數據
    KEYIN, VALUEIN: mapper輸入的k-v類型,由當前Job的InputFormat的RecordReader決定!封裝輸入的key-value由RecordReader自動進行,不可自定義。
    KEYOUT, VALUEOUT: mapper輸出的k-v類型,可自定義

  4. InputFormat的作用:
    ①驗證輸入目錄中的文件格式,是否符合當前Job的要求
    ②生成切片,每個切片都會交給一個MapTask處理
    ③提供RecordReader,由RR從切片中讀取記錄,交給Mapper進行處理

方法: List<InputSplit> getSplits: 切片
RecordReader<K,V> createRecordReader: 創建RecordReader

默認hadoop使用的是TextInputFormat,TextInputFormat使用LineRecordReader

  1. 在Hadoop中,如果有Reduce階段。通常key-value都需要實現序列化協議!
    MapTask處理後的key-value,只是一個階段性的結果!
    這些key-value需要傳輸到ReduceTask所在的機器!
    將一個對象通過序列化技術,序列化到一個文件中,經過網路傳輸到另外一台機器,
    再使用反序列化技術,從文件中讀取數據,還原為對象是最快捷的方式!

java的序列化協議: Serializable
特點:不僅保存對象的屬性值,類型,還會保存大量的包的結構,子父類和介面的繼承資訊,很笨重。
hadoop開發了一款輕量級的序列化協議: Writable機制!

WCReducer.java

/* 
 *KEYIN, VALUEIN: Mapper輸出的keyout-valueout
 *KEYOUT, VALUEOUT: 自定義		
 */		
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	
	private IntWritable out_value=new IntWritable();
	
	// reduce一次處理一組數據,key相同的視為一組
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws Exception {
		
		int sum=0;
		
		for (IntWritable intWritable : values) {
			sum += intWritable.get();
		}
		
		out_value.set(sum);
		
		//將累加的值寫出
		context.write(key, out_value);
		
	}
}

WCDriver.java

/*
 * 1.啟動這個執行緒,運行Job
 * 
 * 2.本地模式主要用於測試程式是否正確!	
 */
public class WCDriver {
	
	public static void main(String[] args) throws Exception {
		
		//默認使用本地的文件系統
		Path inputPath=new Path("e:/mrinput/wordcount");
		Path outputPath=new Path("e:/mroutput/wordcount");
		
		/*Path inputPath=new Path("/wordcount");
		Path outputPath=new Path("/mroutput/wordcount");*/
		
		//作為整個Job的配置
		Configuration conf = new Configuration();
		
		//使用HDFS
		/*conf.set("fs.defaultFS", "hdfs://hadoop101:9000");
		
		// 在YARN上運行
		conf.set("mapreduce.framework.name", "yarn");
		// RM所在的機器
		conf.set("yarn.resourcemanager.hostname", "hadoop102");*/
		
		//一定要保證輸出目錄不存在
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		
		// ①創建Job
		Job job = Job.getInstance(conf);
		
		// 告訴NM運行時,MR中Job所在的Jar包在哪裡
		//job.setJar("MapReduce-0.0.1-SNAPSHOT.jar");
		// 將某個類所在地jar包作為job的jar包
		job.setJarByClass(WCDriver.class);
			
		// 為Job創建一個名字
		job.setJobName("wordcount");
		
		// ②設置Job
		// 設置Job運行的Mapper,Reducer類型,Mapper,Reducer輸出的key-value類型
		job.setMapperClass(WCMapper.class);
		job.setReducerClass(WCReducer.class);
		
		// Job需要根據Mapper和Reducer輸出的Key-value類型準備序列化器,通過序列化器對輸出的key-value進行序列化和反序列化
		// 如果Mapper和Reducer輸出的Key-value類型一致,直接設置Job最終的輸出類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 設置輸入目錄和輸出目錄
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// ③運行Job
		job.waitForCompletion(true);
			
	}
}

Tags: