写点什么

大数据知识专栏 -MapReduce 自定义分区技术

用户头像
小马哥
关注
发布于: 2021 年 01 月 19 日
大数据知识专栏-MapReduce自定义分区技术

作用


实现数据分类统计


原理


分区处于 MapReduce 的 Shuffle 阶段, 目的是通过 Partitioner 的逻辑, 将相同分类的数据, 分发到同一个 ReduceTask 里面, (因为一个 ReduceTask 产生的数据都写到一个文件里面, 从而做到根据我们的逻辑分类, 将不同类别的数据写到不同文件), 做到分类


Partitioner 的逻辑: 很简单, 通过逻辑判断, 将不同数据打上分类标记, 这样同类的数据发送到同一个 ReduceTask, 设置几个分区就设置启动几个 ReduceTask, 从而产生几个结果文件;


代码


继承抽象类 org.apache.hadoop.mapreduce.Partitioner, 实现 public abstract int getPartition(KEY key, VALUE value, int numPartitions);


示例:


​ 根据文本文件中第 6 列数据将数据拆分到三个文件, 逻辑分别为 fields[5]<15,fields[5]=15,fields[5]>15


思路:


1, Mapper 读取文件, 原样输出;


2, Partitioner 接收 Mapper 的输出, 实现分区逻辑;


3, Reducer 通过 shuffle 得到 Partitioner 的分区数据, 将不同分类的数据写出到三个对应文件;


代码实现:


import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PartitionMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
/** * Mapper逻辑: 将数据原封不动写出去 * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(key,value); }}
复制代码


import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;
/* Mapper Out --> Key:LongWritable Value:Text */public class MyPartitioner extends Partitioner<LongWritable, Text> {
@Override public int getPartition(LongWritable key, Text value, int numPartitions) { //分区逻辑: 按照第六列的值, 小于15, 等于15, 大于15 //样例数据行如下 //2 0 1 2017-07-31 23:15:03 837256 14 4+7+3=14 大,双 0 0.00 0.00 1 0.00 4 1 String[] fields = value.toString().split("\t"); int compare = Integer.valueOf(fields[5]).compareTo(15); int flag = 0; if (compare == -1) { flag = 0; } else if (compare == 0) { flag = 1; } else { flag = 2; } return flag; }}
复制代码


import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionerReducer extends Reducer<LongWritable, Text, NullWritable,Text> {
//逻辑简单, 将数据直接写出到文件 @Override protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(NullWritable.get(),value); } }}
复制代码


import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(super.getConf()); //1, 设置keyvalue的输入处理类 job.setInputFormatClass(TextInputFormat.class); //TextInputFormat.setInputPaths(job,new Path("D:\\Source\\data\\patition_in")); TextInputFormat.addInputPath(job,new Path("D:\\Source\\data\\patition_in")); //2.1, 设置Mapper类 job.setMapperClass(PartitionMapper.class);
//2.2 设置Mapper的输出数据类型 job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class);
//3设置分区, 4排序, 5规约, 6分组 job.setPartitionerClass(MyPartitioner.class); //7, 设置Reducer类 job.setReducerClass(PartitionerReducer.class); // 关键, 设置ReduceTask的数量, 三个区, 注意要设置3个ReduceTask, 当然, 你也可以试试不按照常理设置会有什么效果 job.setNumReduceTasks(3); //8, 设置输出处理类 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("D:\\Source\\data\\patition_result")); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class);
//启动Job等待运行结束 boolean completed = job.waitForCompletion(true); return completed?1:0; }
public static void main(String[] args) { try { int result = ToolRunner.run(new Configuration(), new MainJob(), args); System.exit(result); } catch (Exception e) { e.printStackTrace(); } }}
复制代码


发布于: 2021 年 01 月 19 日阅读数: 21
用户头像

小马哥

关注

自强不息,厚德载物 2018.12.22 加入

像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!

评论

发布
暂无评论
大数据知识专栏-MapReduce自定义分区技术