MapReduce 练习案例 3 - 自定义分区
发布于: 2021 年 01 月 26 日

案例 3: 手机号码分区
3.1 需求
在案例一的基础上,继续完善,将不同的手机号分到不同的数据文件的当中去,需要自定义分区来实现,这里我们自定义来模拟分区,将以下数字开头的手机号进行分开
135 开头数据到一个分区文件
136 开头数据到一个分区文件
137 开头数据到一个分区文件
其他分区
3.2 思路
1, 在案例 1 的基础上, 添加分区;
2, 分区实现: 使用 MapReduce 的自定义分区技术, 实现 Partitioner 逻辑;
3, 在 Job 启动类中设置 job 的分区类.
3.3 代码
案例 1 中代码基础上(代码略, 请翻看前面博文), 添加自定义 Paritioner 类
Partitioner 类
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;
public class PhoneNumPartitioner extends Partitioner<Text, Text> { /** * 分区逻辑 * 135 开头数据到一个分区文件 * 136 开头数据到一个分区文件 * 137 开头数据到一个分区文件 * 其他分区 * * @param key : Mapper任务的输出key * @param value: : Mapper任务的输出value * @param numPartitions: 分几个区, 就需要有几个ReduceTask, 这里设置分区个数 * @return */ @Override public int getPartition(Text key, Text value, int numPartitions) { String phoneNum = key.toString(); if (phoneNum.startsWith("135")) { return 0; } else if (phoneNum.startsWith("136")) { return 1; } else if (phoneNum.startsWith("137")) { return 2; } else { return 3; } }}复制代码
Job 启动类
关键两行代码: 星号位置
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception {
//1,创建一个Job类 Job job = Job.getInstance(super.getConf(), "Example1_job");
//2, 设置输入类,输入路径 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1"));
//3, 设置Mapper类, map输出类型 job.setMapperClass(Example1Mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //** 设置分区类 ** job.setPartitionerClass(PhoneNumPartitioner.class);
//4, 设置Reducer类, reduce输出类型 job.setReducerClass(Example1Reducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Flow.class);
//** 设置ReduceTask的个数 ** job.setNumReduceTasks(4);
//5, 设置输出类, 输出路径 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1_partitioner_result"));
//6, 启动Job, 等待Job执行 boolean completion = job.waitForCompletion(true); return completion?1:0; }
public static void main(String[] args) { int run = 0; try { run = ToolRunner.run(new Configuration(), new MainJob(), args); } catch (Exception e) { e.printStackTrace(); } System.exit(run); }}复制代码
输出结果
计数器显示
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30 File System Counters FILE: Number of bytes read=45014 FILE: Number of bytes written=2461514 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=23 Map output records=23 Map output bytes=2830 Map output materialized bytes=2900 Input split bytes=112 Combine input records=0 Combine output records=0 Reduce input groups=21 Reduce shuffle bytes=2900 Reduce input records=23 Reduce output records=21 Spilled Records=46 Shuffled Maps =4 Failed Shuffles=0 Merged Map outputs=4 GC time elapsed (ms)=9 Total committed heap usage (bytes)=956825600 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=2583 File Output Format Counters Bytes Written=604
Process finished with exit code 1
复制代码
结果文件显示
2021/01/26 17:58 59 part-r-000002021/01/26 17:58 85 part-r-000012021/01/26 17:58 75 part-r-000022021/01/26 17:58 337 part-r-000032021/01/26 17:58 0 _SUCCESS复制代码
打开其中一个文件
13480253104 3 3 180 18013823070001 6 3 360 18013826544101 4 0 264 013922314466 12 12 3008 372013925057413 69 63 11058 4824313926251106 4 0 240 0......复制代码
划线
评论
复制
发布于: 2021 年 01 月 26 日阅读数: 20
版权声明: 本文为 InfoQ 作者【小马哥】的原创文章。
原文链接:【http://xie.infoq.cn/article/0e1e9e9170e950f3dbcc7d008】。文章转载请联系作者。
小马哥
关注
自强不息,厚德载物 2018.12.22 加入
像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!











评论