写点什么

大数据知识专栏 - MapReduce 的 Combiner 实现 shuffle 调优

用户头像
小马哥
关注
发布于: 2021 年 01 月 20 日
大数据知识专栏 - MapReduce的Combiner实现shuffle调优

Map 端的 Reduce - Combiner

作用

​ Map 任务中会产生大量的数据, 在 shuffle 阶段, 这些数据通过网络传递到 Reduce 任务节点, Combiner 的作用即通过在 Map 任务中输出的数据先进行一次合并, 减少网络中传输的数据量.

本质

​ Map 端的 Reduce, 是在每一个 Mapper 端都会运行, 其实, Combiner 的父类正是 Reducer.和 Reduce 任务不同的是运行位置.

使用

​ 1, 创建自定义 Combiner: 解决的是减少网络传输数据的目的, 那么用 reduce 的逻辑直接用作 combiner 即可

​ 2, 在 Job 中设置 Combiner: Job.setCombinerClass(XxxReducer.class);


观察效果的手段: 计数器


案例

需求: 单词计数

样例数据: 故意让一行中数据重复, 目的, 尽量让一个 block 中有重复的数据, 这样, 测试 Combiner 会在 mapper 任务中使用自定义的逻辑合并数据.


​ 这里 Combiner 合并数据的逻辑直接使用的我们自定义的 Reducer.


hadoop,hdfs,mapreduce,spark,hadoop,hdfs,mapreduce,sparkhive,impala,sparkSQL,kylin,hive,impala,sparkSQL,kylinflume,kafka,flume,kafka
复制代码


Mapper 逻辑: 根据分隔符","拆分数据

Reducer 逻辑: 单词计数

程序简单, 所以上面两个代码逻辑省略, 前面几篇博文中有类似, 可以翻看, 这里只介绍如何在 Job 中配置 Combiner, 如下第(3)步中: 打开或者关闭注释做测试


import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool {
@Override public int run(String[] strings) throws Exception { Job job = Job.getInstance(super.getConf(), MainJob.class.getName());
//集群运行时候: 要打包 job.setJarByClass(MainJob.class); //1, 读取输入文件解析类 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.setInputPaths(job,new Path("D:\\Source\\data\\data_in_combiner")); //2, 设置Mapper类 job.setMapperClass(BaseMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class);
//3, 设置shuffle阶段的分区, 排序, 规约, 分组 // 注释掉测试无Combiner的shuffle数据量; 去掉注释测试有Combiner的shuffle数据量 //job.setCombinerClass(BaseReducer.class); //<---- 直接在map端执行reduce的逻辑作为combiner
//7, 设置Reducer类 job.setReducerClass(BaseReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class);
//8, 设置文件输出类以及输出地址 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("D:\\Source\\data\\demo_out_combiner"));
boolean completion = job.waitForCompletion(true); return completion?0:1; }
public static void main(String[] args) { MainJob mainJob = new MainJob(); try { Configuration configuration = new Configuration(); configuration.set("mapreduce.framework.name","local"); configuration.set("yarn.resourcemanager.hostname","local"); int run = ToolRunner.run(configuration, mainJob, args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } }}
复制代码

试验: 不使用 Combiner 时, 计数器如下

[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30	File System Counters		FILE: Number of bytes read=1320		FILE: Number of bytes written=980957		FILE: Number of read operations=0		FILE: Number of large read operations=0		FILE: Number of write operations=0	Map-Reduce Framework		Map input records=3		Map output records=20		Map output bytes=294		Map output materialized bytes=340		Input split bytes=112		Combine input records=0		<--		Combine output records=0	<--		Reduce input groups=10				Reduce shuffle bytes=340	<--		Reduce input records=20		<--		Reduce output records=10		Spilled Records=40		Shuffled Maps =1		Failed Shuffles=0		Merged Map outputs=1		GC time elapsed (ms)=0		Total committed heap usage (bytes)=382730240	Shuffle Errors		BAD_ID=0		CONNECTION=0		IO_ERROR=0		WRONG_LENGTH=0		WRONG_MAP=0		WRONG_REDUCE=0	File Input Format Counters 		Bytes Read=137	File Output Format Counters 		Bytes Written=99
复制代码


使用 Combiner 的时候, 计数器如下

[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30	File System Counters		FILE: Number of bytes read=986		FILE: Number of bytes written=981340		FILE: Number of read operations=0		FILE: Number of large read operations=0		FILE: Number of write operations=0	Map-Reduce Framework		Map input records=3		Map output records=20		Map output bytes=294		Map output materialized bytes=173		Input split bytes=112		Combine input records=20	<--		Combine output records=10	<--		Reduce input groups=10		Reduce shuffle bytes=173	<--		Reduce input records=10		<--		Reduce output records=10		Spilled Records=20				Shuffled Maps =1		Failed Shuffles=0		Merged Map outputs=1		GC time elapsed (ms)=0		Total committed heap usage (bytes)=382730240	Shuffle Errors		BAD_ID=0		CONNECTION=0		IO_ERROR=0		WRONG_LENGTH=0		WRONG_MAP=0		WRONG_REDUCE=0	File Input Format Counters 		Bytes Read=137	File Output Format Counters 		Bytes Written=99
复制代码


对比两个 Job 执行的计数器, 如上箭头(<--)标识, shuffle 的数据量会有明显区别. 在生产中大数据量情况下, 使用 Combiner 对于性能调优还是非常有意义的.


发布于: 2021 年 01 月 20 日阅读数: 19
用户头像

小马哥

关注

自强不息,厚德载物 2018.12.22 加入

像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!

评论

发布
暂无评论
大数据知识专栏 - MapReduce的Combiner实现shuffle调优