MapReduce之自定義InputFormat

在企業開發中,Hadoop框架自帶的InputFormat類型不能滿足所有應用場景,需要自定義InputFormat來解決實際問題。
自定義InputFormat步驟如下:

  • (1)自定義一個類繼承FilelnputFormat
  • (2)自定義一個類繼承RecordReader,實現一次讀取一個完整文件,將文件名為key,文件內容為value。
  • (3)在輸出時使用SequenceFileOutPutFormat輸出合併文件。

無論HDFS還是MapReduce,在處理小文件時效率都非常低,但又難免面臨處理大量小文件的場景,此時,就需要有相應解決方案。可以自定義InputFormat實現小文件的合併

1. 需求

將多個小文件合併成一個SequenceFile文件(SequenceFile文件是Hadoop用來存儲二進位形式的key-value(bytes) 對的文件格式),SequenceFile裡面存儲著多個文件,存儲的形式為文件路徑+名稱為key,文件內容為value

(1)輸入數據
在這裡插入圖片描述
(2)期望輸出文件格式
在這裡插入圖片描述

2. 需求分析

  1. 自定義一個類繼承FileInputFormat
    (1)重寫isSplitable()方法,返回false,讓文件不可切,整個文件作為1片
    (2)重寫createRecordReader(),返回自定義的RecordReader對象

  2. 自定義一個類繼承RecordReader
    在RecordReader中,nextKeyValue()是最重要的方法,返回當前讀取到的key-value,如果讀到返回true,調用Mapper的map()來處理,否則返回false

3. 編寫程式

MyInputFormat.java

/*
 * 1. 改變切片策略,一個文件固定切1片,通過指定文件不可切
 * 
 * 2. 提供RR ,這個RR讀取切片的文件名作為key,讀取切片的內容封裝到bytes作為value
 */
public class MyInputFormat extends FileInputFormat {

	@Override
	public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		return new MyRecordReader();
	}
	
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}
}

MyRecordReader.java

/*
 * RecordReader從MapTask處理的當前切片中讀取數據
 * 
 * XXXContext都是Job的上下文,通過XXXContext可以獲取Job的配置Configuration對象
 */
public class MyRecordReader extends RecordReader {
	
	private Text key;
	private BytesWritable value;
	
	private String filename;
	private int length;
	
	private FileSystem fs;
	private Path path;
	
	private FSDataInputStream is;
	
	private boolean flag=true;

	// MyRecordReader在創建後,在進入Mapper的run()之前,自動調用
	// 文件的所有內容設置為1個切片,切片的長度等於文件的長度
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

		FileSplit fileSplit=(FileSplit) split;
		
		filename=fileSplit.getPath().getName();
		
		length=(int) fileSplit.getLength();
		
		path=fileSplit.getPath();
		
		//獲取當前Job的配置對象
		Configuration conf = context.getConfiguration();
		
		//獲取當前Job使用的文件系統
		fs=FileSystem.get(conf);
		
		 is = fs.open(path);
		
	}

	// 讀取一組輸入的key-value,讀到返回true,否則返回false
	// 將文件的名稱封裝為key,將文件的內容封裝為BytesWritable類型的value,返回true
	// 第二次調用nextKeyValue()返回false
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		
		if (flag) {
			
			//實例化對象
			if (key==null) {
				key=new Text();
			}
			
			if (value==null) {
				value=new BytesWritable();
			}
			
			//賦值
			//將文件名封裝到key中
			key.set(filename);
			
			// 將文件的內容讀取到BytesWritable中
			byte [] content=new byte[length];
			
			IOUtils.readFully(is, content, 0, length);
			
			value.set(content, 0, length);
			
			flag=false;
			
			return true;
			
		}
		return false;
	}

	//返回當前讀取到的key-value中的key
	@Override
	public Object getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	//返回當前讀取到的key-value中的value
	@Override
	public Object getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	//返回讀取切片的進度
	@Override
	public float getProgress() throws IOException, InterruptedException {
		return 0;
	}

	// 在Mapper的輸入關閉時調用,清理工作
	@Override
	public void close() throws IOException {
		if (is != null) {
			IOUtils.closeStream(is);
		}	
		if (fs !=null) {
			fs.close();
		}
	}
}

CustomIFMapper.java

public class CustomIFMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{

}

CustomIFReducer.java

public class CustomIFReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{

}

CustomIFDriver.java

public class CustomIFDriver {
	
	public static void main(String[] args) throws Exception {
		
		Path inputPath=new Path("e:/mrinput/custom");
		Path outputPath=new Path("e:/mroutput/custom");
		
		//作為整個Job的配置
		Configuration conf = new Configuration();
		//保證輸出目錄不存在
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		
		// 創建Job
		Job job = Job.getInstance(conf);

		// 設置Job運行的Mapper,Reducer類型,Mapper,Reducer輸出的key-value類型
		job.setMapperClass(CustomIFMapper.class);
		job.setReducerClass(CustomIFReducer.class);
		
		// Job需要根據Mapper和Reducer輸出的Key-value類型準備序列化器,通過序列化器對輸出的key-value進行序列化和反序列化
		// 如果Mapper和Reducer輸出的Key-value類型一致,直接設置Job最終的輸出類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(BytesWritable.class);
		
		// 設置輸入目錄和輸出目錄
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// 設置輸入和輸出格式
		job.setInputFormatClass(MyInputFormat.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		
		// ③運行Job
		job.waitForCompletion(true);
			
	}
}
Tags: