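The basic idea of MapReduce
Let's start with a simple example. Suppose three of us are playing Dou Dizhu and want to check that the deck is complete. The simplest approach is to have one person count whether there are 54 cards (traditional single-machine computing). Alternatively, each of the three takes a pile and counts their own (the Map phase), and then the three counts are added together (the Reduce phase).
So the idea behind MapReduce is "divide and conquer" + "aggregate". When the data is too large for one machine to process, use several machines and process it as a distributed cluster.
Many Chinese articles translate Map and Reduce literally as 映射 (mapping) and 规约 (reduction). In fact, the idea of Map is simply that each worker takes charge of one piece, which is divide and conquer, and the idea of Reduce is to aggregate the results of that division. Why translate it so stiffly? (To make big data seem mysterious on purpose?)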
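The card-counting analogy can be written down in a few lines of plain Java. This is only a toy sketch with no Hadoop involved: the class name CardCountDemo, its methods, and the "card-N" labels are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class CardCountDemo {

    /** Deals the deck round-robin into one pile per player (the "divide" step). */
    static List<List<String>> deal(List<String> deck, int players) {
        List<List<String>> piles = new ArrayList<>();
        for (int p = 0; p < players; p++) {
            piles.add(new ArrayList<>());
        }
        for (int i = 0; i < deck.size(); i++) {
            piles.get(i % players).add(deck.get(i));
        }
        return piles;
    }

    /** Map: each player counts only their own pile. */
    static int countPile(List<String> pile) {
        return pile.size();
    }

    /** Reduce: add the per-player counts into one total. */
    static int countDeck(List<String> deck, int players) {
        int total = 0;
        for (List<String> pile : deal(deck, players)) {
            total += countPile(pile);
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> deck = new ArrayList<>();
        for (int i = 0; i < 54; i++) {
            deck.add("card-" + i); // hypothetical card labels
        }
        int total = countDeck(deck, 3);
        System.out.println("total = " + total + ", complete = " + (total == 54));
        // prints: total = 54, complete = true
    }
}
```

In a real cluster the three "players" would be separate machines working at the same time; here they run one after another, which is enough to show the split-then-sum shape.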
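Getting started with MapReduce programming
The pattern is still quite simple: it consists of 8 steps.
We'll use the simplest example, word count (often called the HelloWorld of big data), so you can get it running and see what happens first.
Following the MapReduce idea, there are two main steps, the Mapper and the Reducer; Hadoop implements the rest for us. Let's practice first and then look at the principles.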
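Before touching Hadoop, it may help to see the same flow in memory. The sketch below is a plain-Java imitation, not Hadoop code: WordCountSketch and its method names are hypothetical, the two input lines are made-up sample data, and the shuffle method stands in for the grouping and sorting that Hadoop does between the Mapper and the Reducer.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WordCountSketch {

    /** Map: turn one comma-separated line into (word, 1) pairs. */
    static List<Map.Entry<String, Long>> map(String line) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String word : line.split(",")) {
            out.add(new SimpleEntry<>(word, 1L));
        }
        return out;
    }

    /** Shuffle stand-in: group values by key; TreeMap keeps the keys sorted. */
    static Map<String, List<Long>> shuffle(List<Map.Entry<String, Long>> pairs) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Map.Entry<String, Long> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    /** Reduce: sum all the values collected for one key. */
    static long reduce(List<Long> values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return sum;
    }

    /** Runs map, shuffle, and reduce in sequence over the given lines. */
    static Map<String, Long> wordCount(List<String> lines) {
        List<Map.Entry<String, Long>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, List<Long>> group : shuffle(pairs).entrySet()) {
            result.put(group.getKey(), reduce(group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("hello,world", "hello,hadoop")));
        // prints: {hadoop=1, hello=2, world=1}
    }
}
```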
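MapReduce has two run modes: 1. cluster mode (production); 2. local mode (experimenting and learning).
Prerequisites (for running locally on Windows):
1. Download a Hadoop distribution, put it on your local machine, and add it to your environment variables;
2. Download hadoop.dll and put it in Hadoop's bin directory.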
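A quick way to confirm both prerequisites is a small check like the one below. It is only a sketch: it assumes the environment variable is named HADOOP_HOME (the text above does not name it), and HadoopEnvCheck and hasHadoopDll are hypothetical helper names.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class HadoopEnvCheck {

    /** Returns true if <hadoopHome>/bin/hadoop.dll exists as a regular file. */
    static boolean hasHadoopDll(Path hadoopHome) {
        return Files.isRegularFile(hadoopHome.resolve("bin").resolve("hadoop.dll"));
    }

    public static void main(String[] args) {
        // Assumption: the Hadoop install directory is exposed as HADOOP_HOME.
        String home = System.getenv("HADOOP_HOME");
        if (home == null || home.isEmpty()) {
            System.out.println("HADOOP_HOME is not set");
            return;
        }
        System.out.println("HADOOP_HOME = " + home);
        System.out.println("bin/hadoop.dll present: " + hasHadoopDll(Paths.get(home)));
    }
}
```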
Create a Maven project and import the dependencies
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.10.1</version>
</dependency>
Data file, placed in the job's input directory: D:\\Source\\data\\data_in\\xx.txt
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop
Start writing the code
Step 1: create the Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BaseMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input value is one line of text; split it into words on commas.
        String[] words = value.toString().split(",");
        Text keyout = new Text();
        LongWritable valueout = new LongWritable(1);
        for (String word : words) {
            // Emit (word, 1) for every word on the line.
            keyout.set(word);
            context.write(keyout, valueout);
        }
    }
}
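With TextInputFormat, the LongWritable key handed to map is the byte offset at which the line starts in the file, and the Text value is the line's content, which is why the Mapper above never looks at the key. The plain-Java sketch below only mimics that record layout: LineRecordSketch and toRecords are hypothetical names rather than Hadoop APIs, and it assumes UTF-8 text with '\n' line endings.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class LineRecordSketch {

    /** Maps each line's starting byte offset to the line's text, in file order. */
    static Map<Long, String> toRecords(String content) {
        Map<Long, String> records = new LinkedHashMap<>();
        if (content.isEmpty()) {
            return records;
        }
        long offset = 0;
        for (String line : content.split("\n")) {
            records.put(offset, line);
            // Advance past the line's bytes plus the one-byte '\n' terminator.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(toRecords("hello,world\nhello,hadoop\n"));
        // prints: {0=hello,world, 12=hello,hadoop}
    }
}
```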
Step 2: create the Reducer class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class BaseReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Use long rather than int so the sum matches LongWritable and cannot be silently narrowed.
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // Emit (word, total count) once per distinct word.
        context.write(key, new LongWritable(sum));
    }
}
Step 3: create the Job launcher class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), MainJob.class.getName());

        // Required when running on a cluster, where the job is packaged as a jar
        job.setJarByClass(MainJob.class);

        // 1. Input format class and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("D:\\Source\\data\\data_in"));

        // 2. Mapper class and its output key/value types
        job.setMapperClass(BaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 3-6. Shuffle phase: partitioning, sorting, combining, grouping (Hadoop defaults are used here)

        // 7. Reducer class and the final output key/value types
        job.setReducerClass(BaseReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 8. Output format class and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D:\\Source\\data\\demo_result1"));

        // Submit the MapReduce job and wait for it to finish
        boolean completion = job.waitForCompletion(true);
        return completion ? 0 : 1;
    }

    public static void main(String[] args) {
        MainJob mainJob = new MainJob();
        try {
            Configuration configuration = new Configuration();
            // Run in local mode instead of submitting to a cluster
            configuration.set("mapreduce.framework.name", "local");
            configuration.set("yarn.resourcemanager.hostname", "local");
            int run = ToolRunner.run(configuration, mainJob, args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
            // Exit with a non-zero code so a failure is not reported as success
            System.exit(1);
        }
    }
}
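Hadoop refuses to start the job if the output directory already exists, so delete D:\\Source\\data\\demo_result1 before re-running. After a successful run, and assuming the default single reducer and the default TextOutputFormat separator, the directory should contain a file named part-r-00000 with one word, a tab, and a count per line. The sketch below reads such a file back into a sorted map; OutputReader and parse are hypothetical names, and the default path is only a guess based on the output path used above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class OutputReader {

    /** Parses "word<TAB>count" lines into a map sorted by word. */
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            int tab = line.lastIndexOf('\t');
            if (tab < 0) {
                throw new IllegalArgumentException("Expected word<TAB>count but got: " + line);
            }
            counts.put(line.substring(0, tab), Long.parseLong(line.substring(tab + 1)));
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a single reducer wrote part-r-00000 into the job's output directory.
        String file = args.length > 0 ? args[0] : "D:\\Source\\data\\demo_result1\\part-r-00000";
        System.out.println(parse(Files.readAllLines(Paths.get(file))));
    }
}
```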