写点什么

大数据知识专栏 -MapReduce 自定义排序技术

用户头像
小马哥
关注
发布于: 2021 年 01 月 20 日
大数据知识专栏 -MapReduce 自定义排序技术

作用

将对象持久化或者在网络中传输, 需要将其转换成字节流, 这就是序列化; 反之, 从磁盘或者网络中将字节流读取转换成为对象, 这就是反序列化;

​ hadoop 开发了自己的序列化和反序列化, 为什么没有使用 Java 已经实现的解决方式呢?

​ 因为, Java 将对象进行序列化方式相对重量级, 一个对象序列化之后会附带很多额外信息,例如,校验信息, 继承关系等, 不便于网络传输. 于是 hadoop 重新实现了一套解决方案, 精简高效, 需要哪个属性就传递哪个属性, 大大减少了网络开销.

实现方案


Writable 接口是 Hadoop 的序列化接口, 一个类需要实现序列化, 只需要实现这个接口

WritableComparable 是 Writable 的子接口, 在实现序列化的同时, 也实现了对象的比较, 可以用于对象数据的排序

案例实战

需求

​ 对学生成绩进行由大到小排序, 如果成绩相同的学生, 按照名字进行字母正序排序

数据样例

name 制表符 score

Mark	100Tom	100Lucy	88Lily	90HaiMeimei	100LiLei	98MaYun	59MaHuateng	60
复制代码

思路

1.1, 自定义 Student 类, 实现 WritableComparable 接口;

1.2, 接口的比较方法 compareTo()方法, 首先针对成绩做大道小排序, 相同则按照名字进行字母正序排序.

2, 创建 Mapper 类

3, 创建 Job 启动类

代码实现

JavaBean 类

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
public class Student implements WritableComparable<Student> { private String name; private Integer score;
public Student() { }
public Student(String name, Integer score) { this.name = name; this.score = score; }
/** * 排序逻辑位置 * * @param stu * @return */ @Override public int compareTo(Student stu) { //1, 比较逻辑: 首先比较两个Student对象的Score大小,这里计算的正序, 需求是逆序, 所辖else逻辑里面* -1 int compare = this.score.compareTo(stu.getScore()); //2, 比较逻辑, 如果上面的Score比较相同, 那么比较两个Student的name属性, 需求是正序 if (compare == 0) { return this.name.compareTo(stu.getName()); } else { return -1 * compare; } }
/** * 序列化逻辑位置 * * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { // 代表写出文本数据 out.writeUTF(name); // 代表写出整形数据 out.writeInt(score); }
/** * 反序列化逻辑 * * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { // 代表读入文本数据 this.name = in.readUTF(); // 代表读入整形数据 this.score = in.readInt(); }
// 下面是JavaBean的get/set方法 public String getName() { return name; }
public void setName(String name) { this.name = name; }
public Integer getScore() { return score; }
public void setScore(Integer score) { this.score = score; }
@Override public String toString() { return "Student{" + "name='" + name + '\'' + ", score=" + score + '}'; }}
复制代码


Mapper 类

import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SortMapper extends Mapper<LongWritable, Text,Student, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1, 拆分读入的行数据 String[] nameScore = value.toString().split("\t");
//2, 将数据封装成为Student对象 Student student = new Student(nameScore[0], Integer.valueOf(nameScore[1]));
//3, 将对象作为key写出到文件 /** * 由于MapReduce的编程模型中会自动针对数据key进行排序, 而Student类中又实现了Hadoop的WritableComparable接口中的compareTo方法 * 所以, 会按照我们自定义的逻辑进行排序输出 */ context.write(student,NullWritable.get()); }}
复制代码


Job 启动类

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool {
@Override public int run(String[] args) throws Exception { Job job = Job.getInstance(super.getConf(), "TestDataSort_job"); //1, 设置输入相关: 处理类, 输入路径 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("D:\\Source\\data\\data_in_sort")); //2, 设置Mapper类, Mapper的输出数据类型 job.setMapperClass(SortMapper.class); job.setMapOutputKeyClass(Student.class); job.setMapOutputValueClass(NullWritable.class); //3-6, 设置是否分区, 排序, 规约, 分组
//7, 设置Reducer类, Reducer任务的输出数据类型
job.setOutputKeyClass(Student.class); job.setOutputValueClass(NullWritable.class); //8, 设置输出处理相关: 处理类, 输出路径 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("D:\\Source\\data\\data_out_sort")); //启动任务, 等待执行 boolean completion = job.waitForCompletion(true); return completion ? 1 : 0; }
public static void main(String[] args) throws Exception { int runState = ToolRunner.run(new Configuration(), new MainJob(), args); System.exit(runState); }}
复制代码

输出结果

[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30	File System Counters		FILE: Number of bytes read=766		FILE: Number of bytes written=974785		FILE: Number of read operations=0		FILE: Number of large read operations=0		FILE: Number of write operations=0	Map-Reduce Framework		Map input records=8		Map output records=8		Map output bytes=91		Map output materialized bytes=113		Input split bytes=116		Combine input records=0		Combine output records=0		Reduce input groups=8		Reduce shuffle bytes=113		Reduce input records=8		Reduce output records=8		Spilled Records=16		Shuffled Maps =1		Failed Shuffles=0		Merged Map outputs=1		GC time elapsed (ms)=0		Total committed heap usage (bytes)=382730240	Shuffle Errors		BAD_ID=0		CONNECTION=0		IO_ERROR=0		WRONG_LENGTH=0		WRONG_MAP=0		WRONG_REDUCE=0	File Input Format Counters 		Bytes Read=84	File Output Format Counters 		Bytes Written=274
复制代码


Student{name='HaiMeimei', score=100}Student{name='Mark', score=100}Student{name='Tom', score=100}Student{name='LiLei', score=98}Student{name='Lily', score=90}Student{name='Lucy', score=88}Student{name='MaHuateng', score=60}Student{name='MaYun', score=59}
复制代码

作为演示排序, 上面直接输出了 JavaBean,也就是其 toString()方法.可以看到已经按照 score 进行逆序排序, score 相同, 则按照 name 字母序.


发布于: 2021 年 01 月 20 日阅读数: 23
用户头像

小马哥

关注

自强不息,厚德载物 2018.12.22 加入

像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!

评论

发布
暂无评论
大数据知识专栏 -MapReduce 自定义排序技术