Map-Side Reduce - the Combiner
Purpose
A Map task can produce a large amount of intermediate data, and during the shuffle phase all of it is transferred over the network to the nodes running Reduce tasks. A Combiner merges a Map task's output locally before it is shuffled, reducing the amount of data sent over the network. For example, two ("hadoop", 1) pairs emitted by one mapper become a single ("hadoop", 2) pair before transfer.
Essence
A Combiner is a Reduce that runs on the Map side: one runs inside every Mapper task. In fact, the parent class of a Combiner is Reducer; the only difference from a Reduce task is where it runs.
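This is visible directly in the API: Job.setCombinerClass accepts any Reducer subclass, which is why a Reducer class can be passed to it unchanged. The signature below is from org.apache.hadoop.mapreduce.Job (quoted from memory, so treat it as approximate):
public void setCombinerClass(Class<? extends Reducer> cls) throws IllegalStateException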
Usage
1. Create a custom Combiner: since the whole point is to cut down the data shuffled over the network, the reduce logic can usually be reused directly as the combiner.
2. Register it on the Job: job.setCombinerClass(XxxReducer.class);
How to observe the effect: the job counters (two counter dumps are compared below, and a sketch after the driver code shows how to read them programmatically).
Example
Requirement: word count.
Sample data: each line deliberately repeats its words, so that a single block is likely to contain duplicate keys; this lets us verify that the Combiner merges data inside the mapper task using our custom logic.
Here the Combiner's merge logic is simply our custom Reducer, reused as-is.
hadoop,hdfs,mapreduce,spark,hadoop,hdfs,mapreduce,spark
hive,impala,sparkSQL,kylin,hive,impala,sparkSQL,kylin
flume,kafka,flume,kafka
Mapper logic: split each line on the "," delimiter.
Reducer logic: count the words.
The program is simple, so the code for those two classes is omitted here (similar versions appear in the previous posts in this series). The focus is on how to configure the Combiner on the Job, in step (3) below: comment the line out or leave it in to compare the two runs.
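For reference, here is a minimal sketch of what BaseMapper and BaseReducer could look like. The class names match the driver below, but the method bodies are reconstructions from the stated logic (split on ",", then count), not the original code:

// BaseMapper.java: splits each line on "," and emits (word, 1)
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class BaseMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Text word = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split(",")) {
            word.set(token);
            context.write(word, one);
        }
    }
}

// BaseReducer.java: sums the counts per word; because this logic is
// associative and commutative, the same class can double as the Combiner
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class BaseReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}

The driver: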
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), MainJob.class.getName());
        // Required when running on a cluster: package the job into a jar
        job.setJarByClass(MainJob.class);
        // 1. Input format class and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("D:\\Source\\data\\data_in_combiner"));
        // 2. Mapper class and its output key/value types
        job.setMapperClass(BaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 3. Shuffle stage: partition, sort, combine, group
        // Comment the next line out to measure the shuffle volume without a
        // Combiner; uncomment it to measure with a Combiner.
        //job.setCombinerClass(BaseReducer.class); // <---- run the reduce logic on the map side as the combiner
        // 7. Reducer class and its output key/value types
        job.setReducerClass(BaseReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 8. Output format class and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D:\\Source\\data\\demo_out_combiner"));
        boolean completion = job.waitForCompletion(true);
        return completion ? 0 : 1;
    }

    public static void main(String[] args) {
        MainJob mainJob = new MainJob();
        try {
            Configuration configuration = new Configuration();
            // Run locally for this test
            configuration.set("mapreduce.framework.name", "local");
            configuration.set("yarn.resourcemanager.hostname", "local");
            int run = ToolRunner.run(configuration, mainJob, args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
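As mentioned above, counters are the way to observe the Combiner's effect. Besides reading the console log, they can also be queried programmatically once the job finishes. A minimal sketch, assuming it is placed inside run() after job.waitForCompletion(true); TaskCounter is the standard enum behind the "Map-Reduce Framework" counters shown below:

import org.apache.hadoop.mapreduce.TaskCounter;

// inside run(), after job.waitForCompletion(true) returns:
long combineIn  = job.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOut = job.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
long shuffled   = job.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES).getValue();
System.out.println("combine in=" + combineIn
        + ", combine out=" + combineOut
        + ", reduce shuffle bytes=" + shuffled);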
Experiment: without the Combiner, the counters look like this:
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
    File System Counters
        FILE: Number of bytes read=1320
        FILE: Number of bytes written=980957
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=3
        Map output records=20
        Map output bytes=294
        Map output materialized bytes=340
        Input split bytes=112
        Combine input records=0      <--
        Combine output records=0     <--
        Reduce input groups=10
        Reduce shuffle bytes=340     <--
        Reduce input records=20      <--
        Reduce output records=10
        Spilled Records=40
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        Total committed heap usage (bytes)=382730240
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=137
    File Output Format Counters
        Bytes Written=99
With the Combiner enabled, the counters look like this:
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
    File System Counters
        FILE: Number of bytes read=986
        FILE: Number of bytes written=981340
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=3
        Map output records=20
        Map output bytes=294
        Map output materialized bytes=173
        Input split bytes=112
        Combine input records=20     <--
        Combine output records=10    <--
        Reduce input groups=10
        Reduce shuffle bytes=173     <--
        Reduce input records=10      <--
        Reduce output records=10
        Spilled Records=20
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        Total committed heap usage (bytes)=382730240
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=137
    File Output Format Counters
        Bytes Written=99
Comparing the counters of the two runs (the lines marked <--), the difference in shuffled data is clear: Combine input/output records go from 0/0 to 20/10, Reduce input records drop from 20 to 10, and Reduce shuffle bytes drop from 340 to 173. On production-scale data, a Combiner can therefore be a very worthwhile performance optimization. One caveat: reusing the reduce logic as the combiner is only correct when that logic is associative and commutative (like summing counts), because the framework may run the combiner zero, one, or several times.