MapReduce入門實戰
MapReduce 思想
MapReduce 是 Google 提出的一個軟件架構,用於大規模數據集的並行運算。概率「Map(映射)」和「Reduce(歸約)」以及它們的思想都是從函數式編程語言借鑒的,還有從矢量編程語言借來的特性。
當前的軟件實現是指定一個「Map」函數,用來把一組鍵值對映射成一組新的鍵值對,指定並發的「Reduce」函數,用來保證所有映射的鍵值對中的每一個都共享相同的鍵組。
Hadoop MapReduce 的任務過程分為兩個階段:
- Map 階段:把大任務分解為若干個小任務來並行處理。這些任務可以並行計算,彼此之間沒有依賴關係。
- Reduce 階段:對 map 階段的結果進行全局匯總。
Hadoop 序列化
為什麼要序列化?
序列化是我們通過網絡通信傳輸數據時或者把對象持久化到文件,需要把對象序列化成二進制的結構。
觀察源碼時發現自定義 Mapper 類與自定義 Reducer 類都有泛型類約束,比如自定義 Mapper 有四個泛型參數,但是都不是 Java 基本類型。
為什麼 Hadoop 要選擇建立自己的序列化格式而不使用 java 自帶 serializable?
- 序列化在分佈式程序中非常重要,在 Hadoop 中,集群中多個節點的進程間的通信是通過 RPC(遠程過程調用:RemoteProcedureCall)實現;RPC 將消息序列化成二進制流發送到遠程節點,遠程節點再將接收到的二進制數據反序列化為原始的消息,因此 RPC 往往追求如下特點:
- 數據更緊湊,能充分利用網絡帶寬資源
- 快速:序列化和反序列化的性能開銷更低
- Hadoop 使用的是自己的序列化格式 Writable,它比 java 的序列化 serialization 更緊湊速度更快。一個對象使用 Serializable 序列化後,會攜帶很多額外信息比如校驗信息,Header,繼承體系等
Java 基本類型與 Hadoop 常用序列化類型
Java 基本類型 | Hadoop Writable 類型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
map | MapWritable |
array | ArrayWritable |
基本的序列化類型往往不能滿足需求,比如我們常常需要傳遞一些自定義的 bean 對象。在 Hadoop 中為了實現自定義對象序列化需要實現 Writable 接口。
- 實現 Writable 接口
- 有無參構造函數
- 重寫序列化 write 方法和反序列化 readFields 方法。(注意序列化和反序列化的字段順序必須完全一致)
- 如果自定義 Bean 對象需要放在 Mapper 輸出 KV 中的 K 裏面,那麼該對象還需要實現 Comparable 接口,因為 MapReduce 框架中的 Shuffle 過程要求 key 必須能排序
案例實戰
需求:下面有一個水果攤老闆的一個售賣記錄,這三列分別是:水果名稱、水果重量、還有總價。我們需要統計每個水果的總重量和重價。
蘋果 3 12
李子 4 8
蘋果 2 8
桃子 4 20
香蕉 2 4
火龍果 1 4
- 配置 Hadoop 環境變量
- 導入 maven 依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop-version}</version>
</dependency>
- 編寫保存售賣記錄的實體類
@Setter
@Getter
public class FruitsRecord implements Writable {
private int weight;
private double totalPrice;
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(weight);
out.writeDouble(totalPrice);
}
@Override
public void readFields(DataInput in) throws IOException {
this.weight = in.readInt();
this.totalPrice = in.readDouble();
}
@Override
public String toString() {
return "FruitsRecord{" +
"weight=" + weight +
", totalPrice=" + totalPrice +
'}';
}
}
- 編寫 Mapper 類
import com.mmc.hadoop.bean.FruitsRecord;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FruitsMapper extends Mapper<LongWritable,Text,Text, FruitsRecord> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FruitsRecord>.Context context) throws IOException, InterruptedException {
//獲取一行的數據
String line = value.toString();
String[] fields = line.split(" ");
Text outKey= new Text(fields[0]);
FruitsRecord fruitsRecord=new FruitsRecord();
fruitsRecord.setWeight(Integer.parseInt(fields[1]));
fruitsRecord.setTotalPrice(Double.parseDouble(fields[2]));
context.write(outKey,fruitsRecord);
}
}
- 編寫 Reduce 類
import com.mmc.hadoop.bean.FruitsRecord;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FruitsReducer extends Reducer<Text, FruitsRecord,Text,FruitsRecord> {
@Override
protected void reduce(Text key, Iterable<FruitsRecord> values, Reducer<Text, FruitsRecord, Text, FruitsRecord>.Context context) throws IOException, InterruptedException {
int totalWeight = 0;
double totalPrice =0;
for (FruitsRecord fruitsRecord : values){
totalWeight += fruitsRecord.getWeight();
totalPrice+= fruitsRecord.getTotalPrice();
}
FruitsRecord fruitsRecord = new FruitsRecord();
fruitsRecord.setWeight(totalWeight);
fruitsRecord.setTotalPrice(totalPrice);
context.write(key, fruitsRecord);
}
}
- 編寫 Driver 類
import com.mmc.hadoop.bean.FruitsRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FruitsDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// System.setProperty("java.library.path","d://");
Configuration conf = new Configuration();
Job job=Job.getInstance(conf,"FruitsDriver");
//指定本程序的jar包所在的路徑
job.setJarByClass(FruitsDriver.class);
//指定本業務job要使用的mapper/Reducer業務類
job.setMapperClass(FruitsMapper.class);
job.setReducerClass(FruitsReducer.class);
//指定mapper輸出數據的kv類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FruitsRecord.class);
//指定reduce輸出數據的kv類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FruitsRecord.class);
//指定job的輸入文件目錄和輸出目錄
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit( result ? 0: 1);
}
}
總結:
Mapper 裏面,Mapper 類的四個泛型分別為入參的 KV 和出參的 KV。Reduce 裏面的也有4個泛型,分別為入參的KV和出參的KV。Reduce入參的 KV 與 Mapper 裏面出參的 KV 類型是對應的。只不過 Reduce 的入參的 Value 類型是集合類型的。
時序圖如下:
運行任務
本地模式
直接在 IDEA 中運行驅動類即可。因為程序里輸入文件路徑和輸出文件路徑是取的 main 函數里的 args。所以運行的時候需要指定參數。
遇到的問題
問題 1:
問題 2:創建目錄錯誤
org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String
解決方案:
兩個問題都是 windows 的 hadoop/bin 目錄下缺少文件導致的。文件下載路徑://github.com/cdarlint/winutils
- 找到對應版本的 hadoop.dll 和 winutils.exe 下載下來放到 hadoop/bin 目錄下。
- C: windows\System32 放入 hadoop.dll 文件
- 重啟電腦
輸出目錄:
打開結果文件 part-r-00000:
李子 FruitsRecord{weight=4, totalPrice=8.0}
桃子 FruitsRecord{weight=4, totalPrice=20.0}
火龍果 FruitsRecord{weight=1, totalPrice=4.0}
蘋果 FruitsRecord{weight=5, totalPrice=20.0}
香蕉 FruitsRecord{weight=2, totalPrice=4.0}
Yarn 集群模式
- 把程序打包成 jar 包,上傳到 linux
- 將測試的 txt 上傳到 HDFS 上面
- 啟動 Hadoop 集群
- 使用 Hadoop 命令提交任務運行
hadoop jar wc.jar com.mmc.hadoop.FruitsDriver
/user/input /user/output