写点什么

大数据知识专栏 -MapReduce 自定义计数器技术

用户头像
小马哥
关注
发布于: 2021 年 01 月 20 日
大数据知识专栏 -MapReduce 自定义计数器技术

MapReduce 计数器

作用

1, 观察 MapReduce 的运行过程, 相当于日志;

2, 用于根据计数器的数据指标进行优化作业;

3, 辅助诊断系统故障等.

使用

1, MapReduce 内置计数器

给我们提供了一部分计数器, 如下:

2, 自定义计数器


如果内置计数器不满足我们统计需求, 还可以灵活的创建自定义技术器. 获取计数器的方式有两种, 通过 Context 获取或者枚举的方式创建, 都非常简单易操作, 使用任意一种都可以.

方式 1: context.getCounter(分类, 名称)


Mapper 类代码:


import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PartitionMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
/** * Mapper逻辑: 将数据原封不动写出去 * 目的: 测试计数器创建于使用 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1,创建自定义的计数器 Counter mapperCounter = context.getCounter("MyGroup", "MyGroup_SelfDefinitedCounter_Mapper"); //2, 对计数器进行累加计数: 这里的逻辑是执行一次map函数即计数一次 mapperCounter.increment(1); context.write(key,value); }}
复制代码


Reducer 类代码:


import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionerReducer extends Reducer<LongWritable, Text, NullWritable,Text> { /** * Reducer的逻辑: 直接写出Map传递过来的数据 * 目的: 测试计数器的使用 */ @Override protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //1, 创建自定义的计数器 Counter reducerCounter = context.getCounter("MyGroup", "MyGroup_SelfDefinitedCounter_Reducer"); for (Text value : values) { //2, 对计数器进行累加: 计算逻辑 - 有一个value就计数1次 reducerCounter.increment(1); context.write(NullWritable.get(),value); } }}
复制代码


Job 启动代码:


import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(super.getConf()); //1, 设置keyvalue的输入处理类 job.setInputFormatClass(TextInputFormat.class);// TextInputFormat.setInputPaths(job,new Path("D:\\Source\\data\\patition_in")); TextInputFormat.addInputPath(job,new Path("D:\\Source\\data\\patition_in")); //2.1, 设置Mapper类 job.setMapperClass(PartitionMapper.class);
//2.2 设置Mapper的输出数据类型 job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class);
//3设置分区, 4排序, 5规约, 6分组 job.setPartitionerClass(MyPartitioner.class); //7, 设置Reducer类 job.setReducerClass(PartitionerReducer.class); job.setNumReduceTasks(3); //8, 设置输出处理类 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("D:\\Source\\data\\patition_result")); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class);
//启动Job等待运行结束 boolean completed = job.waitForCompletion(true); return completed?1:0; }
public static void main(String[] args) { try { int result = ToolRunner.run(new Configuration(), new MainJob(), args); System.exit(result); } catch (Exception e) { e.printStackTrace(); } }}
复制代码



方式 2: 通过 enum 枚举类型来定义计数器

代码示例: Mapper 与 MainJob 与上面代码一样, 这里只修改了 Reducer 端的计数器作为与方式 1 的对比


import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionerReducer extends Reducer<LongWritable, Text, NullWritable, Text> { //1, 通过枚举来定义计数器 , public enum MyCounterGroup { //第一统计项, 可以有任意个, 这里用一个作为示例. MY_ENUM_COUNTER_REDUCER }
/** * Reducer的逻辑: 直接写出Map传递过来的数据 * 目的: 测试计数器的使用 */ @Override protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // Counter reducerCounter = context.getCounter("MyGroup", "MyGroup_SelfDefinitedCounter_Reducer");
for (Text value : values) { //2, 对计数器进行累加: 计算逻辑 - 有一个value就计数1次 //reducerCounter.increment(1); context.getCounter(MyCounterGroup.MY_ENUM_COUNTER_REDUCER).increment(1); context.write(NullWritable.get(), value); } }}
复制代码



发布于: 2021 年 01 月 20 日阅读数: 69
用户头像

小马哥

关注

自强不息,厚德载物 2018.12.22 加入

像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!

评论

发布
暂无评论
大数据知识专栏 -MapReduce 自定义计数器技术