大数据知识专栏 -MapReduce 自定义计数器技术
发布于: 2021 年 01 月 20 日

MapReduce 计数器
作用
1, 观察 MapReduce 的运行过程, 相当于日志;
2, 用于根据计数器的数据指标进行优化作业;
3, 辅助诊断系统故障等.
使用
1, MapReduce 内置计数器
给我们提供了一部分计数器, 如下:
 
 2, 自定义计数器
如果内置计数器不满足我们统计需求, 还可以灵活的创建自定义技术器. 获取计数器的方式有两种, 通过 Context 获取或者枚举的方式创建, 都非常简单易操作, 使用任意一种都可以.
方式 1: context.getCounter(分类, 名称)
Mapper 类代码:
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PartitionMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    /**     * Mapper逻辑: 将数据原封不动写出去     * 目的: 测试计数器创建于使用     */    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        //1,创建自定义的计数器        Counter mapperCounter = context.getCounter("MyGroup", "MyGroup_SelfDefinitedCounter_Mapper");        //2, 对计数器进行累加计数: 这里的逻辑是执行一次map函数即计数一次        mapperCounter.increment(1);        context.write(key,value);    }}复制代码
 Reducer 类代码:
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionerReducer extends Reducer<LongWritable, Text, NullWritable,Text> {    /**     * Reducer的逻辑: 直接写出Map传递过来的数据     *  目的: 测试计数器的使用     */    @Override    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {        //1, 创建自定义的计数器        Counter reducerCounter = context.getCounter("MyGroup", "MyGroup_SelfDefinitedCounter_Reducer");        for (Text value : values) {            //2, 对计数器进行累加: 计算逻辑 - 有一个value就计数1次            reducerCounter.increment(1);            context.write(NullWritable.get(),value);        }    }}复制代码
 Job 启动代码:
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool {    @Override    public int run(String[] args) throws Exception {        Job job = Job.getInstance(super.getConf());        //1, 设置keyvalue的输入处理类        job.setInputFormatClass(TextInputFormat.class);//        TextInputFormat.setInputPaths(job,new Path("D:\\Source\\data\\patition_in"));        TextInputFormat.addInputPath(job,new Path("D:\\Source\\data\\patition_in"));        //2.1, 设置Mapper类        job.setMapperClass(PartitionMapper.class);
        //2.2 设置Mapper的输出数据类型        job.setMapOutputKeyClass(LongWritable.class);        job.setMapOutputValueClass(Text.class);
        //3设置分区, 4排序, 5规约, 6分组        job.setPartitionerClass(MyPartitioner.class);        //7, 设置Reducer类        job.setReducerClass(PartitionerReducer.class);        job.setNumReduceTasks(3);        //8, 设置输出处理类        job.setOutputFormatClass(TextOutputFormat.class);        TextOutputFormat.setOutputPath(job,new Path("D:\\Source\\data\\patition_result"));        job.setOutputKeyClass(NullWritable.class);        job.setOutputValueClass(Text.class);
        //启动Job等待运行结束        boolean completed = job.waitForCompletion(true);        return completed?1:0;    }
    public static void main(String[] args) {        try {            int result = ToolRunner.run(new Configuration(), new MainJob(), args);            System.exit(result);        } catch (Exception e) {            e.printStackTrace();        }    }}复制代码
  
 方式 2: 通过 enum 枚举类型来定义计数器
代码示例: Mapper 与 MainJob 与上面代码一样, 这里只修改了 Reducer 端的计数器作为与方式 1 的对比
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionerReducer extends Reducer<LongWritable, Text, NullWritable, Text> {    //1, 通过枚举来定义计数器 ,    public enum MyCounterGroup {        //第一统计项, 可以有任意个, 这里用一个作为示例.        MY_ENUM_COUNTER_REDUCER    }
    /**     * Reducer的逻辑: 直接写出Map传递过来的数据     * 目的: 测试计数器的使用     */    @Override    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {        //   Counter reducerCounter = context.getCounter("MyGroup", "MyGroup_SelfDefinitedCounter_Reducer");
        for (Text value : values) {            //2, 对计数器进行累加: 计算逻辑 - 有一个value就计数1次            //reducerCounter.increment(1);            context.getCounter(MyCounterGroup.MY_ENUM_COUNTER_REDUCER).increment(1);            context.write(NullWritable.get(), value);        }    }}复制代码
  
 划线
评论
复制
发布于: 2021 年 01 月 20 日阅读数: 69
版权声明: 本文为 InfoQ 作者【小马哥】的原创文章。
原文链接:【http://xie.infoq.cn/article/b46def7a1937162b6e67ef868】。文章转载请联系作者。


小马哥
关注
自强不息,厚德载物 2018.12.22 加入
像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!











 
    
评论