大数据知识专栏 -MapReduce 自定义计数器技术
发布于: 2021 年 01 月 20 日
MapReduce 计数器
作用
1, 观察 MapReduce 的运行过程, 相当于日志;
2, 用于根据计数器的数据指标进行优化作业;
3, 辅助诊断系统故障等.
使用
1, MapReduce 内置计数器
给我们提供了一部分计数器, 如下:
2, 自定义计数器
如果内置计数器不满足我们统计需求, 还可以灵活的创建自定义技术器. 获取计数器的方式有两种, 通过 Context 获取或者枚举的方式创建, 都非常简单易操作, 使用任意一种都可以.
方式 1: context.getCounter(分类, 名称)
Mapper 类代码:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PartitionMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
/**
* Mapper逻辑: 将数据原封不动写出去
* 目的: 测试计数器创建于使用
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1,创建自定义的计数器
Counter mapperCounter = context.getCounter("MyGroup", "MyGroup_SelfDefinitedCounter_Mapper");
//2, 对计数器进行累加计数: 这里的逻辑是执行一次map函数即计数一次
mapperCounter.increment(1);
context.write(key,value);
}
}
复制代码
Reducer 类代码:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionerReducer extends Reducer<LongWritable, Text, NullWritable,Text> {
/**
* Reducer的逻辑: 直接写出Map传递过来的数据
* 目的: 测试计数器的使用
*/
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//1, 创建自定义的计数器
Counter reducerCounter = context.getCounter("MyGroup", "MyGroup_SelfDefinitedCounter_Reducer");
for (Text value : values) {
//2, 对计数器进行累加: 计算逻辑 - 有一个value就计数1次
reducerCounter.increment(1);
context.write(NullWritable.get(),value);
}
}
}
复制代码
Job 启动代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf());
//1, 设置keyvalue的输入处理类
job.setInputFormatClass(TextInputFormat.class);
// TextInputFormat.setInputPaths(job,new Path("D:\\Source\\data\\patition_in"));
TextInputFormat.addInputPath(job,new Path("D:\\Source\\data\\patition_in"));
//2.1, 设置Mapper类
job.setMapperClass(PartitionMapper.class);
//2.2 设置Mapper的输出数据类型
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
//3设置分区, 4排序, 5规约, 6分组
job.setPartitionerClass(MyPartitioner.class);
//7, 设置Reducer类
job.setReducerClass(PartitionerReducer.class);
job.setNumReduceTasks(3);
//8, 设置输出处理类
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("D:\\Source\\data\\patition_result"));
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
//启动Job等待运行结束
boolean completed = job.waitForCompletion(true);
return completed?1:0;
}
public static void main(String[] args) {
try {
int result = ToolRunner.run(new Configuration(), new MainJob(), args);
System.exit(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
方式 2: 通过 enum 枚举类型来定义计数器
代码示例: Mapper 与 MainJob 与上面代码一样, 这里只修改了 Reducer 端的计数器作为与方式 1 的对比
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionerReducer extends Reducer<LongWritable, Text, NullWritable, Text> {
//1, 通过枚举来定义计数器 ,
public enum MyCounterGroup {
//第一统计项, 可以有任意个, 这里用一个作为示例.
MY_ENUM_COUNTER_REDUCER
}
/**
* Reducer的逻辑: 直接写出Map传递过来的数据
* 目的: 测试计数器的使用
*/
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// Counter reducerCounter = context.getCounter("MyGroup", "MyGroup_SelfDefinitedCounter_Reducer");
for (Text value : values) {
//2, 对计数器进行累加: 计算逻辑 - 有一个value就计数1次
//reducerCounter.increment(1);
context.getCounter(MyCounterGroup.MY_ENUM_COUNTER_REDUCER).increment(1);
context.write(NullWritable.get(), value);
}
}
}
复制代码
划线
评论
复制
发布于: 2021 年 01 月 20 日阅读数: 69
版权声明: 本文为 InfoQ 作者【小马哥】的原创文章。
原文链接:【http://xie.infoq.cn/article/b46def7a1937162b6e67ef868】。文章转载请联系作者。
小马哥
关注
自强不息,厚德载物 2018.12.22 加入
像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!
评论