🍊 This article works through MapReduce with four classic hands-on cases
🍊 The code follows the style of the official Hadoop source and aims to be reasonably clean
🍊 Each case comes with a detailed walkthrough
MapReduce is a programming framework for writing distributed computation programs. Its core job is to combine the user's business-logic code with the framework's own default components into a complete distributed program that runs in parallel on a Hadoop cluster.
Its overall architecture looks like this:

| Stage | Responsibility |
| --- | --- |
| Map | Read the data and perform simple per-record processing |
| Shuffle | Group and merge the Map output by key |
| Reduce | Compute the final result from the shuffled data |
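To make the data flow concrete, here is a minimal sketch in plain Java (no Hadoop involved, purely for intuition) that mimics what the three stages do for a word count:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MiniMapReduceSketch {
    public static void main(String[] args) {
        List<String> lines = List.of("apple banana apple", "banana apple");

        // Map: split each line into words and emit a (word, 1) pair per word
        List<Map.Entry<String, Integer>> mapped = lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .map(word -> Map.entry(word, 1))
                .collect(Collectors.toList());

        // Shuffle: group the pairs by key, e.g. apple -> [1, 1, 1]
        Map<String, List<Integer>> shuffled = mapped.stream()
                .collect(Collectors.groupingBy(Map.Entry::getKey,
                        Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

        // Reduce: sum the grouped values per key and print "word<TAB>count"
        shuffled.forEach((word, ones) ->
                System.out.println(word + "\t" + ones.stream().mapToInt(Integer::intValue).sum()));
    }
}
```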
Count the number of occurrences of each word in a file. In the figure, the left side is the raw data and the right side is the output.
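As a concrete (made-up) example of what the raw data and the result look like:

```
apple banana apple        apple   3
banana apple              banana  2
```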
WordCount is the most basic exercise. Beyond simply meeting the requirement, we want the code to be as clean as possible, so we mainly follow the official WordCount example that ships with Hadoop.
The data flow is as follows.
What needs attention in the Mapper is its generic signature, `Mapper<LongWritable, Text, Text, IntWritable>`: the input key/value pair is <byte offset of the line, the line of text>, and the output pair is <word, 1>.
```java
package com.bcn.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Input:  <byte offset of the line, the line of text>
 * Output: <word, 1>
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Create just one Text and one IntWritable object and reuse them,
    // to avoid allocating new objects for every record
    Text outK = new Text();
    IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Get one line
        String line = value.toString();

        // Split the line into words by space
        String[] words = line.split(" ");

        // Emit <word, 1> for every word
        for (String word : words) {
            outK.set(word);
            context.write(outK, outV);
        }
    }
}
```
For the Reducer, what we care about is again the data flow.
Many readers may wonder why the input here is <Text, Iterable<IntWritable>> rather than a single value: the shuffle phase groups all the <word, 1> pairs that share the same key, so the Reducer receives each word together with the whole list of counts emitted for it, for example <apple, (1, 1, 1)>.
```java
package com.bcn.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum;
    IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum up the counts; the incoming data looks like apple, (1, 1, 1)
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // Output <word, total count>
        outV.set(sum);
        context.write(key, outV);
    }
}
```
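Because the Reducer's input and output types are identical (<Text, IntWritable> both ways), it can also be registered as a Combiner, as the official WordCount example does, so that partial sums are computed on the map side before the shuffle. This is an optional optimization, not part of the code above; it would be a single extra line in the Driver:

```java
// Optional: run the reducer as a combiner on the map side to shrink shuffle traffic.
// This only works because the reducer's input and output key/value types are the same.
job.setCombinerClass(WordCountReducer.class);
```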
Finally we write the driver class, i.e. the main method, which configures the seven standard steps of a MapReduce job; with it in place, the whole program can be run.
```java
package com.bcn.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the configuration and the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Associate the driver with its jar
        job.setJarByClass(WordCountDriver.class);

        // 3. Associate the Mapper and the Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the Mapper output key/value classes
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the final output key/value classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\Hadoop and Spark\\data\\word.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\Hadoop and Spark\\output\\wordCount"));

        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
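The paths above are hard-coded local Windows paths, which is convenient for running the job straight from the IDE in local mode. On a real cluster, the official example reads the locations from the command line instead; a minimal sketch of that variant (the jar name and HDFS paths below are just placeholders):

```java
// Read input/output locations from the command line instead of hard-coding them
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
```

The packaged job would then be launched with something like `hadoop jar wordcount.jar com.bcn.mapreduce.wordcount.WordCountDriver /input/word.txt /output/wordCount`.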
4.1 Problem
4.2 Analysis
4.3 Entity
4.4 Mapper
4.5 Reducer
4.6 Driver
5.1 Problem
5.2 Analysis
5.3 Entity
5.4 Mapper
5.5 Reducer
5.6 Driver