Mapreduce學習(一)
MapReduce 介紹
簡單介紹:
MapReduce思想在生活中處處可見。或多或少都曾接觸過這種思想。MapReduce的思想核心
是「分而治之」,適用於大量複雜的任務處理場景(大規模數據處理場景)。
- Map負責「分」,即把複雜的任務分解為若干個「簡單的任務」來並行處理。可以進行拆分的
- 前提是這些小任務可以並行計算,彼此間幾乎沒有依賴關係。
- Reduce負責「合」,即對map階段的結果進行全局匯總。
- MapReduce運行在yarn集群
- 1. ResourceManager
- 2. NodeManager
這兩個階段合起來正是MapReduce思想的體現。
MapReduce 設計構思
MapReduce是一個分散式運算程式的編程框架,核心功能是將用戶編寫的業務邏輯程式碼和自
帶默認組件整合成一個完整的分散式運算程式,並發運行在Hadoop集群上。
MapReduce設計並提供了統一的計算框架,為程式設計師隱藏了絕大多數系統層面的處理細節。
為程式設計師提供一個抽象和高層的編程介面和框架。程式設計師僅需要關心其應用層的具體計算問
題,僅需編寫少量的處理應用本身計算問題的程式程式碼。如何具體完成這個並行計算任務所
相關的諸多系統層細節被隱藏起來,交給計算框架去處理:
Map和Reduce為程式設計師提供了一個清晰的操作介面抽象描述。MapReduce中定義了如下的Map
和Reduce兩個抽象的編程介面,由用戶去編程實現.Map和Reduce,MapReduce處理的數據類型
是<key,value>鍵值對。
Map: (k1; v1) → [(k2; v2)]
Reduce: (k2; [v2]) → [(k3; v3)]
一個完整的mapreduce程式在分散式運行時有三類實例進程:
1. MRAppMaster 負責整個程式的過程調度及狀態協調
2. MapTask 負責map階段的整個數據處理流程
3. ReduceTask 負責reduce階段的整個數據處理流程
MapReduce 編程規範
Map 階段 2 個步驟
1. 設置 InputFormat 類, 將數據切分為 Key-Value(K1和V1) 對, 輸入到第二步
2. 自定義 Map 邏輯, 將第一步的結果轉換成另外的 Key-Value(K2和V2) 對, 輸出結果
Shuffle 階段 4 個步驟
3. 對輸出的 Key-Value 對進行分區
4. 對不同分區的數據按照相同的 Key 排序
5. (可選) 對分組過的數據初步規約, 降低數據的網路拷貝
6. 對數據進行分組, 相同 Key 的 Value 放入一個集合中
Reduce 階段 2 個步驟
7. 對多個 Map 任務的結果進行排序以及合併, 編寫 Reduce 函數實現自己的邏輯, 對輸入的
Key-Value 進行處理, 轉為新的 Key-Value(K3和V3)輸出
8. 設置 OutputFormat 處理並保存 Reduce 輸出的 Key-Value 數據
WordCount單詞統計實戰
程式碼編寫
數據準備:在Hadoop101客戶端上寫一個文件上傳到hadoop
WordCountMapper
package cn.itcast.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
//map方法就是將k1和v1轉為k2和v2
/*
參數:
key:K1 行偏移量
value: v1 每一行的文本數據
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text text = new Text();
LongWritable longWritable = new LongWritable();
//1.將一行的文本數據進行拆分
String[] split = value.toString().split(",");
//2.遍曆數組,組裝K2和V2
for (String word:split){
//3.將K2和V2寫入上下文
text.set(word);
longWritable.set(1);
context.write(text,longWritable);
}
}
}
WordCountReduce
package cn.itcast.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.awt.*;
import java.io.IOException;
public class WordCountReduce extends Reducer<Text,LongWritable,Text,LongWritable> {
//reduce將K2和V2轉為K3和V3,將K3和V3寫入上下文中
/*
參數:
key:新K2
values:結合 新V2
context:表示上下文對象
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//1.便利結合,將集合中的數字相加,得到V3
long count=0;
for (LongWritable value : values) {
count+=value.get();
}
//2.將K3和V3寫入上下文中
context.write(key,new LongWritable(count));
}
}
JobMain
package cn.itcast.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool {
//該方法用於指定一個job任務
@Override
public int run(String[] strings) throws Exception {
//1.創建一個job任務對象
Job job = Job.getInstance(super.getConf(),"wordcount");
//2.配置job任務對象(八個步驟)
//打包jar路徑主類
job.setJarByClass(JobMain.class);
//第一步:指定文件的讀取方式和讀取路徑
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/wordcount"));
//本地測試元數據
//TextInputFormat.addInputPath(job,new Path("file:///E:\\mapreduce\\input"));
//第二步:指定map階段的處理方式和數據類型
job.setMapperClass(WordCountMapper.class);
//設置Map階段K2的類型
job.setMapOutputKeyClass(Text.class);
//設置Map階段V2的類型
job.setMapOutputValueClass(LongWritable.class);
//第三,四,五,六採用默認方式
//第七步:指定reduce階段的處理方式和數據類型
job.setReducerClass(WordCountReduce.class);
//設置K3的類型
job.setOutputKeyClass(Text.class);
//設置V3的類型
job.setOutputValueClass(LongWritable.class);
//第八步:設置輸出類型
job.setOutputFormatClass(TextOutputFormat.class);
//設置輸出的路徑
Path path = new Path("hdfs://hadoop101:8020/wordcount_out");
TextOutputFormat.setOutputPath(job,path);
//本地測試輸出
//TextOutputFormat.setOutputPath(job,new Path("file:///E:\\mapreduce\\output"));
//獲取filesystem
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop101:8020"), new Configuration());
//判斷目錄存在
boolean bl2=fileSystem.exists(path);
if(bl2){
fileSystem.delete(path,true);
}
//等待任務結束
boolean bl=job.waitForCompletion(true);
return bl?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//1.啟動job任務
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
程式運行方式
①打包成jar包,然後上傳到linux伺服器,用命令來運行
hadoop jar 包名 程式主程式路徑
②在程式里進行測試,首先得有本地的存儲測試input的文件路徑,然後定義一個輸出路徑(該路徑必須不存在,否則會報錯)
//本地測試元數據
TextInputFormat.addInputPath(job,newPath("file:///E:\\mapreduce\\input"));
//本地測試輸出
TextOutputFormat.setOutputPath(job,new Path("file:///E:\\mapreduce\\output"));
程式運行結果
源數據文件內容(input):
運行輸出結果(output):