Purpose
Classify data and compute statistics per category.
Principle
Partitioning takes place in the Shuffle phase of MapReduce. Its purpose is to use the Partitioner's logic to route records of the same category to the same ReduceTask. Because each ReduceTask writes all of its output to a single file, this is what lets us classify records by our own logic and write each category to its own file.
The Partitioner's logic is simple: a conditional check tags each record with a partition number, so records of the same category are sent to the same ReduceTask. The number of partitions you define determines how many ReduceTasks to start, and therefore how many result files are produced.
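If no custom Partitioner is configured, Hadoop falls back to the built-in HashPartitioner, which spreads keys across ReduceTasks purely by hash. A minimal sketch of that default behaviour (the class name DefaultStylePartitioner is made up for illustration) shows what our custom logic will replace:

import org.apache.hadoop.mapreduce.Partitioner;

public class DefaultStylePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numPartitions) {
        // Same idea as Hadoop's built-in HashPartitioner: mask off the sign bit
        // of the key's hash, then take it modulo the number of ReduceTasks.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

A custom Partitioner like the one below swaps this hash-based spread for explicit business logic, so the partition number carries meaning.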
Code
Extend the abstract class org.apache.hadoop.mapreduce.Partitioner and implement public abstract int getPartition(KEY key, VALUE value, int numPartitions);
Example:
Split the records of a text file into three files according to the value of column 6; the three cases are fields[5] < 15, fields[5] == 15, and fields[5] > 15.
Approach:
1. The Mapper reads the file and emits every record unchanged;
2. The Partitioner receives the Mapper output and applies the partition logic;
3. The Reducer receives the partitioned data through shuffle and writes each category to its own file, giving three output files;
Code implementation:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartitionMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    /**
     * Mapper logic: write each record out unchanged.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/*
 * Mapper output --> Key: LongWritable, Value: Text
 */
public class MyPartitioner extends Partitioner<LongWritable, Text> {
    @Override
    public int getPartition(LongWritable key, Text value, int numPartitions) {
        // Partition logic: compare the value of column 6 with 15 (less than, equal to, greater than).
        // A sample data line:
        // 2 0 1 2017-07-31 23:15:03 837256 14 4+7+3=14 大,双 0 0.00 0.00 1 0.00 4 1
        String[] fields = value.toString().split("\t");
        int column6 = Integer.parseInt(fields[5]);
        if (column6 < 15) {
            return 0;
        } else if (column6 == 15) {
            return 1;
        } else {
            return 2;
        }
    }
}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartitionerReducer extends Reducer<LongWritable, Text, NullWritable, Text> {
    // The logic is trivial: write the records straight out to the output file.
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf());
        // 1. Set the input format that turns the file into key/value pairs.
        job.setInputFormatClass(TextInputFormat.class);
        //TextInputFormat.setInputPaths(job, new Path("D:\\Source\\data\\patition_in"));
        TextInputFormat.addInputPath(job, new Path("D:\\Source\\data\\patition_in"));
        // 2.1 Set the Mapper class.
        job.setMapperClass(PartitionMapper.class);
        // 2.2 Set the Mapper output key/value types.
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        // 3. Partitioning (steps 4 sorting, 5 combining, 6 grouping keep their defaults).
        job.setPartitionerClass(MyPartitioner.class);
        // 7. Set the Reducer class.
        job.setReducerClass(PartitionerReducer.class);
        // Key point: there are three partitions, so set three ReduceTasks.
        // (You can also try a number that does not match and see what happens.)
        job.setNumReduceTasks(3);
        // 8. Set the output format.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D:\\Source\\data\\patition_result"));
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // Submit the job and wait for it to finish.
        boolean completed = job.waitForCompletion(true);
        return completed ? 0 : 1;
    }

    public static void main(String[] args) {
        try {
            int result = ToolRunner.run(new Configuration(), new MainJob(), args);
            System.exit(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
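With job.setNumReduceTasks(3), the output directory should contain one file per ReduceTask (part-r-00000, part-r-00001 and part-r-00002), holding the fields[5] < 15, == 15 and > 15 records respectively.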