Map-Side Reduce - the Combiner
Purpose
A Map task can produce a large amount of intermediate data, and during the shuffle phase all of it is sent over the network to the nodes running the Reduce tasks. The Combiner's job is to pre-aggregate the Map output on the map side first, reducing the amount of data that has to cross the network.
Essence
A Combiner is a map-side reduce: it runs inside every Mapper task. In fact, the Combiner's parent class is Reducer itself; the only difference from a Reduce task is where it runs.
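Concretely, declaring a combiner looks exactly like declaring a reducer. A minimal sketch (the class name WordCountCombiner is illustrative, not from the original code):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A combiner is just a Reducer subclass. Its output key/value types must
// equal its input key/value types, because combiner output is fed back
// into the shuffle and eventually into the real reducer.
public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    // reduce() pre-aggregates values on the map side, exactly like a normal reducer
}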
Usage
1. Create a custom Combiner. Since the goal is only to cut down the data shuffled across the network, the reduce logic can usually be reused directly as the combiner, provided the operation is commutative and associative (as summation is) and the map output types match the reduce input types.
2. Register the Combiner on the Job: job.setCombinerClass(XxxReducer.class);
How to observe the effect: the job counters.
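Besides reading the console log, the relevant counters can also be inspected programmatically once the job finishes. A minimal sketch (assumed usage, placed after job.waitForCompletion(true) in the driver; TaskCounter is Hadoop's built-in counter enum):

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCounter;

// Inside run(), after job.waitForCompletion(true) returns:
Counters counters = job.getCounters();
long combineIn    = counters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOut   = counters.findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
long shuffleBytes = counters.findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES).getValue();
System.out.println("combine in=" + combineIn
        + ", combine out=" + combineOut
        + ", reduce shuffle bytes=" + shuffleBytes);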
Example
Requirement: word count.
Sample data: words are deliberately repeated within each line so that a single block is likely to contain duplicates; this makes it easy to verify that the Combiner merges data inside the Mapper task using our custom logic.
Here the Combiner's merge logic is simply our custom Reducer, reused as-is.
hadoop,hdfs,mapreduce,spark,hadoop,hdfs,mapreduce,spark
hive,impala,sparkSQL,kylin,hive,impala,sparkSQL,kylin
flume,kafka,flume,kafka
Mapper logic: split each line on the "," delimiter.
Reducer logic: sum the counts per word.
Both classes are simple, so their code is omitted here; similar implementations appear in earlier posts in this series (a sketch of both is included below for reference). This post focuses on how to configure the Combiner on the Job, in step (3) of the driver: comment the line out or back in to run the two tests.
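For completeness, here is a minimal sketch of what BaseMapper and BaseReducer might look like. This is an assumption based on the description above, not the original post's code; only the class names and type signatures are taken from the driver below. Each class would live in its own file.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Emits (word, 1) for every comma-separated token in a line.
public class BaseMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split(",")) {
            word.set(token);
            context.write(word, ONE);
        }
    }
}

// Sums the counts per word. It can double as the Combiner because
// summation is commutative and associative, and its input and output
// types are identical.
public class BaseReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}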
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), MainJob.class.getName());

        // When running on a cluster, the job must be packaged into a jar
        job.setJarByClass(MainJob.class);

        // 1. Input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("D:\\Source\\data\\data_in_combiner"));

        // 2. Mapper class and its output key/value types
        job.setMapperClass(BaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 3. Shuffle phase settings: partitioning, sorting, combining, grouping.
        //    Leave the next line commented out to measure the shuffle volume
        //    without a Combiner; uncomment it to measure with a Combiner.
        //job.setCombinerClass(BaseReducer.class); // <-- reuse the reduce logic on the map side as the combiner

        // 7. Reducer class and its output key/value types
        job.setReducerClass(BaseReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 8. Output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D:\\Source\\data\\demo_out_combiner"));

        boolean completion = job.waitForCompletion(true);
        return completion ? 0 : 1;
    }

    public static void main(String[] args) {
        MainJob mainJob = new MainJob();
        try {
            Configuration configuration = new Configuration();
            // Run with the local job runner instead of a YARN cluster
            configuration.set("mapreduce.framework.name", "local");
            configuration.set("yarn.resourcemanager.hostname", "local");
            int run = ToolRunner.run(configuration, mainJob, args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Experiment: without the Combiner, the counters look like this:
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
    File System Counters
        FILE: Number of bytes read=1320
        FILE: Number of bytes written=980957
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=3
        Map output records=20
        Map output bytes=294
        Map output materialized bytes=340
        Input split bytes=112
        Combine input records=0      <--
        Combine output records=0     <--
        Reduce input groups=10
        Reduce shuffle bytes=340     <--
        Reduce input records=20      <--
        Reduce output records=10
        Spilled Records=40
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        Total committed heap usage (bytes)=382730240
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=137
    File Output Format Counters
        Bytes Written=99
With the Combiner enabled, the counters look like this:
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
    File System Counters
        FILE: Number of bytes read=986
        FILE: Number of bytes written=981340
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=3
        Map output records=20
        Map output bytes=294
        Map output materialized bytes=173
        Input split bytes=112
        Combine input records=20     <--
        Combine output records=10    <--
        Reduce input groups=10
        Reduce shuffle bytes=173     <--
        Reduce input records=10      <--
        Reduce output records=10
        Spilled Records=20
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        Total committed heap usage (bytes)=382730240
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=137
    File Output Format Counters
        Bytes Written=99
Comparing the counters of the two runs (the lines marked <-- above), the difference in shuffle volume is clear: with the Combiner, the 20 map output records are merged down to 10 before the shuffle (Combine input records=20, Combine output records=10), and Reduce shuffle bytes drops from 340 to 173. On production-scale data, a Combiner can therefore be a very worthwhile performance optimization.