大数据知识专栏 -MapReduce 自定义排序技术
作用
将对象持久化或者在网络中传输, 需要将其转换成字节流, 这就是序列化; 反之, 从磁盘或者网络中将字节流读取转换成为对象, 这就是反序列化;
hadoop 开发了自己的序列化和反序列化, 为什么没有使用 Java 已经实现的解决方式呢?
因为, Java 将对象进行序列化方式相对重量级, 一个对象序列化之后会附带很多额外信息,例如,校验信息, 继承关系等, 不便于网络传输. 于是 hadoop 重新实现了一套解决方案, 精简高效, 需要哪个属性就传递哪个属性, 大大减少了网络开销.
实现方案
Writable 接口是 Hadoop 的序列化接口, 一个类需要实现序列化, 只需要实现这个接口
WritableComparable 是 Writable 的子接口, 在实现序列化的同时, 也实现了对象的比较, 可以用于对象数据的排序
案例实战
需求
对学生成绩进行由大到小排序, 如果成绩相同的学生, 按照名字进行字母正序排序
数据样例
name 制表符 score
Mark 100
Tom 100
Lucy 88
Lily 90
HaiMeimei 100
LiLei 98
MaYun 59
MaHuateng 60
思路
1.1, 自定义 Student 类, 实现 WritableComparable 接口;
1.2, 接口的比较方法 compareTo()方法, 首先针对成绩做大道小排序, 相同则按照名字进行字母正序排序.
2, 创建 Mapper 类
3, 创建 Job 启动类
代码实现
JavaBean 类
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Student implements WritableComparable<Student> {
private String name;
private Integer score;
public Student() {
}
public Student(String name, Integer score) {
this.name = name;
this.score = score;
}
/**
* 排序逻辑位置
*
* @param stu
* @return
*/
@Override
public int compareTo(Student stu) {
//1, 比较逻辑: 首先比较两个Student对象的Score大小,这里计算的正序, 需求是逆序, 所辖else逻辑里面* -1
int compare = this.score.compareTo(stu.getScore());
//2, 比较逻辑, 如果上面的Score比较相同, 那么比较两个Student的name属性, 需求是正序
if (compare == 0) {
return this.name.compareTo(stu.getName());
} else {
return -1 * compare;
}
}
/**
* 序列化逻辑位置
*
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
// 代表写出文本数据
out.writeUTF(name);
// 代表写出整形数据
out.writeInt(score);
}
/**
* 反序列化逻辑
*
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
// 代表读入文本数据
this.name = in.readUTF();
// 代表读入整形数据
this.score = in.readInt();
}
// 下面是JavaBean的get/set方法
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getScore() {
return score;
}
public void setScore(Integer score) {
this.score = score;
}
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
", score=" + score +
'}';
}
}
Mapper 类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SortMapper extends Mapper<LongWritable, Text,Student, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1, 拆分读入的行数据
String[] nameScore = value.toString().split("\t");
//2, 将数据封装成为Student对象
Student student = new Student(nameScore[0], Integer.valueOf(nameScore[1]));
//3, 将对象作为key写出到文件
/**
* 由于MapReduce的编程模型中会自动针对数据key进行排序, 而Student类中又实现了Hadoop的WritableComparable接口中的compareTo方法
* 所以, 会按照我们自定义的逻辑进行排序输出
*/
context.write(student,NullWritable.get());
}
}
Job 启动类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "TestDataSort_job");
//1, 设置输入相关: 处理类, 输入路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("D:\\Source\\data\\data_in_sort"));
//2, 设置Mapper类, Mapper的输出数据类型
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(Student.class);
job.setMapOutputValueClass(NullWritable.class);
//3-6, 设置是否分区, 排序, 规约, 分组
//7, 设置Reducer类, Reducer任务的输出数据类型
job.setOutputKeyClass(Student.class);
job.setOutputValueClass(NullWritable.class);
//8, 设置输出处理相关: 处理类, 输出路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("D:\\Source\\data\\data_out_sort"));
//启动任务, 等待执行
boolean completion = job.waitForCompletion(true);
return completion ? 1 : 0;
}
public static void main(String[] args) throws Exception {
int runState = ToolRunner.run(new Configuration(), new MainJob(), args);
System.exit(runState);
}
}
输出结果
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
File System Counters
FILE: Number of bytes read=766
FILE: Number of bytes written=974785
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=8
Map output records=8
Map output bytes=91
Map output materialized bytes=113
Input split bytes=116
Combine input records=0
Combine output records=0
Reduce input groups=8
Reduce shuffle bytes=113
Reduce input records=8
Reduce output records=8
Spilled Records=16
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
Total committed heap usage (bytes)=382730240
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=84
File Output Format Counters
Bytes Written=274
Student{name='HaiMeimei', score=100}
Student{name='Mark', score=100}
Student{name='Tom', score=100}
Student{name='LiLei', score=98}
Student{name='Lily', score=90}
Student{name='Lucy', score=88}
Student{name='MaHuateng', score=60}
Student{name='MaYun', score=59}
作为演示排序, 上面直接输出了 JavaBean,也就是其 toString()方法.可以看到已经按照 score 进行逆序排序, score 相同, 则按照 name 字母序.
版权声明: 本文为 InfoQ 作者【小马哥】的原创文章。
原文链接:【http://xie.infoq.cn/article/93cc723612a02330871c70a72】。文章转载请联系作者。
小马哥
自强不息,厚德载物 2018.12.22 加入
像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!
评论