The basic idea of MapReduce
Let's start with a simple example. Say three people are playing Dou Dizhu (a Chinese card game) and want to check whether the deck is complete. The simplest way is to have one person count whether there are 54 cards (traditional single-machine computation). Alternatively, each of the three players takes a pile and counts it separately (the Map phase), and then the three subtotals are added together (the Reduce phase).
So the idea behind MapReduce is "divide and conquer" plus "aggregate". When the data volume is too large for one machine to handle, we use many machines and process it as a distributed cluster.
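Purely as an illustration (plain Java, no Hadoop involved), here is a minimal sketch of the same "count each pile, then add up the piles" idea; the class name and the pile split are made up for this example:

import java.util.Arrays;
import java.util.List;

public class CardCountSketch {
    public static void main(String[] args) {
        // The deck has been dealt into three piles (in the analogy, one per player).
        List<int[]> piles = Arrays.asList(new int[17], new int[17], new int[20]);

        // "Map" phase: each pile is counted on its own (could happen on different machines).
        int[] partialCounts = piles.stream().mapToInt(pile -> pile.length).toArray();

        // "Reduce" phase: the partial counts are summed into one total.
        int total = Arrays.stream(partialCounts).sum();
        System.out.println("total cards = " + total); // 54
    }
}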
Many articles translate Map and Reduce literally as "mapping" and "reduction". Really, the idea of Map is just that each worker takes charge of its own slice of the data (divide and conquer), and the idea of Reduce is to aggregate the results of those slices. Why translate it so stiffly? (To make big data sound mysterious on purpose?)
Getting started with MapReduce programming
The programming model is still quite simple; it consists of 8 steps: InputFormat reads and parses the input, Map, the four shuffle steps (partition, sort, combine, group), Reduce, and finally OutputFormat writes the result. The numbered comments in the Job class below follow this same numbering.
We'll use the simplest example, word count (billed as the HelloWorld of big data), and get it running first to see what happens before going any further.
Following the MapReduce idea there are two main pieces to write, the Mapper and the Reducer; Hadoop implements everything else for us. Get hands-on first, then study the principles.
MapReduce has two run modes: 1) cluster mode (production); 2) local mode (for experimenting and learning).
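What differs between the two modes is mostly configuration (plus, in cluster mode, packaging the code as a jar and submitting it with the hadoop command). A minimal sketch, where the HDFS address is only a placeholder:

import org.apache.hadoop.conf.Configuration;

public class RunModeConfig {
    // Local mode: the whole job runs inside the current JVM against the local file system.
    public static Configuration localMode() {
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "local");
        return conf;
    }

    // Cluster mode: submit the job to YARN and read/write HDFS.
    public static Configuration clusterMode() {
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("fs.defaultFS", "hdfs://namenode-host:8020"); // placeholder NameNode address
        return conf;
    }
}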
Prerequisites:
1. Download a Hadoop distribution, unpack it locally, and add it to your environment variables (e.g. HADOOP_HOME pointing at the unpacked directory).
2. Download hadoop.dll and put it in Hadoop's bin directory (needed when running on Windows).
Create a Maven project and import the dependencies
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.10.1</version>
    </dependency>
Data file D:\Source\data\data_in\xx.txt (placed under the input directory that the Job below reads from)
 hello,world,hadoop
 hive,sqoop,flume,hello
 kitty,tom,jerry,world
 hadoop
 
Start writing the code
Step 1: Create the Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BaseMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The input key is the byte offset of the line; the value is one line of text.
        String[] words = value.toString().split(",");
        Text keyout = new Text();
        LongWritable valueout = new LongWritable(1);
        for (String word : words) {
            // Emit a <word, 1> pair for every word on the line.
            keyout.set(word);
            context.write(keyout, valueout);
        }
    }
}
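Taking the sample file above as input, the first line hello,world,hadoop would make this Mapper emit three key/value pairs, one per word:

<hello, 1>
<world, 1>
<hadoop, 1>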
 
Step 2: Create the Reducer class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class BaseReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // values holds all the 1s emitted by the Mappers for this word; add them up.
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
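Before reduce is called, the framework has already grouped the map output by key, so for the word hello this Reducer receives <hello, [1, 1]> and writes <hello, 2>. Assuming the four-line sample file shown earlier, the job's final output (sorted by key, the default) would be:

flume   1
hadoop  2
hello   2
hive    1
jerry   1
kitty   1
sqoop   1
tom     1
world   2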
 
Step 3: Create the Job driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), MainJob.class.getName());

        // When running on a cluster, the job has to be packaged as a jar.
        job.setJarByClass(MainJob.class);

        // 1. Input format class that reads and parses the input files
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("D:\\Source\\data\\data_in"));

        // 2. Mapper class and its output key/value types
        job.setMapperClass(BaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 3-6. Shuffle stage: partition, sort, combine, group (the defaults are used here)

        // 7. Reducer class and its output key/value types
        job.setReducerClass(BaseReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 8. Output format class and output directory (must not exist before the run)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D:\\Source\\data\\demo_result1"));

        // Launch the MapReduce job and wait for it to finish
        boolean completion = job.waitForCompletion(true);
        return completion ? 0 : 1;
    }

    public static void main(String[] args) {
        MainJob mainJob = new MainJob();
        try {
            Configuration configuration = new Configuration();
            configuration.set("mapreduce.framework.name", "local");
            configuration.set("yarn.resourcemanager.hostname", "local");
            int run = ToolRunner.run(configuration, mainJob, args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
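The shuffle-related comment above (steps 3-6) is also where a Combiner could be plugged in as an optional optimization: it pre-aggregates the 1s on the map side so that less data is shuffled across the network. The line below is not part of the original job; it is a sketch of what could be added inside run(), and reusing BaseReducer works here only because it simply sums its values:

// Optional step 5 (combine): reuse BaseReducer as a map-side combiner.
job.setCombinerClass(BaseReducer.class);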
 