MapReduce之自定義分區器Partitioner

@

問題引出

要求將統計結果按照條件輸出到不同文件中(分區)。

比如:將統計結果按照手機歸屬地不同省份輸出到不同文件中(分區)

默認Partitioner分區

public class HashPartitioner<K,V> extends Partitioner<K,V>{
	public int getPartition(K key,V value, int numReduceTasks){
		return (key.hashCode() & Integer.MAX VALUE) & numReduceTasks;
	}
}
  • 默認分區是根據keyhashCodeReduceTasks個數取模得到的。
  • 用戶沒法控制哪個key存儲到哪個分區。

自定義Partitioner步驟

  1. 自定義類繼承Partitioner,重寫getPartition()方法
public class CustomPartitioner extends Partitioner<Text,FlowBea>{
	@Override 
	public int getPartition(Text key,FlowBean value,int numPartitions){
		//控制分區代碼邏輯
		……
		return partition;
	}
}
  1. 在Job驅動類中,設置自定義Partitioner
job.setPartitionerClass(CustomPartitioner.class)
  1. 自定義Partition後,要根據自定義Partitioner的邏輯設置相應數量的ReduceTask
 job.setNumReduceTask(5);//假設需要分5個區

Partition分區案例實操

將統計結果按照手機歸屬地不同省份輸出到不同文件中(分區)

輸入數據:
在這裡插入圖片描述

期望輸出數據:
手機號136、137、138、139開頭都分別放到一個獨立的4個文件中,其他開頭的放到一個文件中。所以總共分為5個文件,也就是五個區。

相比於之前的自定義flowbean,這次自定義分區,只需要多編寫一個分區器,以及在job驅動類中設置分區器,mapper和reducer類不改變

MyPartitioner.java

/*
 * KEY, VALUE: Mapper輸出的Key-value類型
 */
public class MyPartitioner extends Partitioner<Text, FlowBean>{

	// 計算分區  numPartitions為總的分區數,reduceTask的數量
	// 分區號必須為int型的值,且必須符合 0<= partitionNum < numPartitions
	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {
		
		String suffix = key.toString().substring(0, 3);//前開後閉,取手機號前三位數
		
		int partitionNum=0;//分區編號
		
		
		switch (suffix) {
		case "136":
			partitionNum=numPartitions-1;//由於分區編號不能大於分區總數,所以用這種方法比較好
			break;
		case "137":
			partitionNum=numPartitions-2;
			break;
		case "138":
			partitionNum=numPartitions-3;
			break;
		case "139":
			partitionNum=numPartitions-4;
			break;

		default:
			break;
		}

		return partitionNum;
	}

}

FlowBeanDriver.java

public class FlowBeanDriver {
	
	public static void main(String[] args) throws Exception {
		
		Path inputPath=new Path("e:/mrinput/flowbean");
		Path outputPath=new Path("e:/mroutput/partitionflowbean");
		
		//作為整個Job的配置
		Configuration conf = new Configuration();
		
		//保證輸出目錄不存在
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		
		// ①創建Job
		Job job = Job.getInstance(conf);
		
		// ②設置Job
		// 設置Job運行的Mapper,Reducer類型,Mapper,Reducer輸出的key-value類型
		job.setMapperClass(FlowBeanMapper.class);
		job.setReducerClass(FlowBeanReducer.class);
		
		// Job需要根據Mapper和Reducer輸出的Key-value類型準備序列化器,通過序列化器對輸出的key-value進行序列化和反序列化
		// 如果Mapper和Reducer輸出的Key-value類型一致,直接設置Job最終的輸出類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		// 設置輸入目錄和輸出目錄
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// 設置ReduceTask的數量為5
		job.setNumReduceTasks(5);
		
		// 設置使用自定義的分區器
		job.setPartitionerClass(MyPartitioner.class);
		
		// ③運行Job
		job.waitForCompletion(true);
		
	}
}

FlowBeanMapper.java

/*
 * 1. 統計手機號(String)的上行(long,int),下行(long,int),總流量(long,int)
 * 
 * 手機號為key,Bean{上行(long,int),下行(long,int),總流量(long,int)}為value
 * 		
 * 
 * 
 * 
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
	
	private Text out_key=new Text();
	private FlowBean out_value=new FlowBean();
	
	// (0,1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200)
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		
		String[] words = value.toString().split("\t");
		
		//封裝手機號
		out_key.set(words[1]);
		// 封裝上行
		out_value.setUpFlow(Long.parseLong(words[words.length-3]));
		// 封裝下行
		out_value.setDownFlow(Long.parseLong(words[words.length-2]));

		context.write(out_key, out_value);
	}
}

FlowBeanReducer.java

public class FlowBeanReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
	
	private FlowBean out_value=new FlowBean();
	
	@Override
	protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		
		long sumUpFlow=0;
		long sumDownFlow=0;
		
		for (FlowBean flowBean : values) {
			
			sumUpFlow+=flowBean.getUpFlow();
			sumDownFlow+=flowBean.getDownFlow();
			
		}
		
		out_value.setUpFlow(sumUpFlow);
		out_value.setDownFlow(sumDownFlow);
		out_value.setSumFlow(sumDownFlow+sumUpFlow);
		
		context.write(key, out_value);
		
	}
}

FlowBean.java

public class FlowBean implements Writable{
	
	private long upFlow;
	private long downFlow;
	private long sumFlow;
	
	public FlowBean() {
		
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	// 序列化   在寫出屬性時,如果為引用數據類型,屬性不能為null
	@Override
	public void write(DataOutput out) throws IOException {
		
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
		
		
	}

	//反序列化   序列化和反序列化的順序要一致
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow=in.readLong();
		downFlow=in.readLong();
		sumFlow=in.readLong();
		
	}

	@Override
	public String toString() {
		return  upFlow + "\t" + downFlow + "\t" + sumFlow;
	}
}

輸出結果:
總共五個文件
在這裡插入圖片描述
一號區:
在這裡插入圖片描述
二號區:
在這裡插入圖片描述
三號區:
在這裡插入圖片描述

四號區:
在這裡插入圖片描述

其他號碼為第五號區:
在這裡插入圖片描述

分區總結

  • 如果ReduceTask的數量 > getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx
  • 如果Reduceask的數量 < getPartition的結果數,則有一部分分區數據無處安放,會Exception
  • 如果ReduceTask的數量 = 1,則不管MapTask端輸出多少個分區文件,最終結果都交給這一個ReduceTask,最終也就只會產生一個結果文件partr-00000

以剛才的案例分析:
例如:假設自定義分區數為5,則

  • job.setlNlurmReduce Task(1);會正常運行,只不過會產生一個輸出文件
  • job.setlNlunReduce Task(2),會報錯
  • job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件
Tags: