写点什么

MapReduce 排序以及序列化实践

发布于: 44 分钟前

序列化(Serialization)是指把结构化对象转化为字节流。


反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。


当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。


Java 的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输;所以,hadoop 自己开发了一套序列化机制(Writable),精简,高效。不用像 java 对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。


Writable 是 Hadoop 的序列化格式,hadoop 定义了这样一个 Writable 接口。 一个类要支持可序列化只需实现这个接口即可。


另外 Writable 有一个子接口是 WritableComparable,writableComparable 是既可实现序列化,也可以对 key 进行比较,我们这里可以通过自定义 key 实现 WritableComparable 来实现我们的排序功能


需求说明:


目前有如下两列数据, 要求第一列按照字典顺序进行排列,第一列相同的时候,第二列按照升序进行排列


数据格式如下:a 1a 9b 3a 7b 8b 10a 5a 9


实现思路:


将 map 端输出的<key,value>中的 key 和 value 组合成一个新的 key(称为 newKey),value 值不变。这里就变成<(key,value),value>,在针对 newKey 排序的时候,如果 key 相同,就再对 value 进行排序。


第一步: 自定义数据类型以及比较器


import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
public class SortBean implements WritableComparable<SortBean>{
private String first ;private Integer second;

public String getFirst() { return first;}
public void setFirst(String first) { this.first = first;}
public Integer getSecond() { return second;}
public void setSecond(Integer second) { this.second = second;}
// 在最后输出的时候, 按照什么格式来输出@Overridepublic String toString() { return first+"\t"+second;}// 执行排序的方法@Overridepublic int compareTo(SortBean o) { // 先按照第一列进行排序, 如果第一列相同, 按照第二列倒序排序 int i = this.first.compareTo(o.first);
if (i == 0 ){ int i1 = o.second.compareTo(this.second); return i1; } return i;}// 序列化的方法@Overridepublic void write(DataOutput out) throws IOException { out.writeUTF(first); out.writeInt(second);}// 反序列化的方法@Overridepublic void readFields(DataInput in) throws IOException { first = in.readUTF(); second = in.readInt();}
复制代码


}


第二步: 自定义 map 逻辑


import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MapSortTask extends Mapper<LongWritable,Text,SortBean,NullWritable> { @Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 获取一行数据
String line = value.toString();
//2. 执行切割处理 String[] split = line.split("\t");
//3. 封装sortBean对象 SortBean sortBean = new SortBean(); sortBean.setFirst(split[0]); sortBean.setSecond(Integer.parseInt(split[1]));
//4 写出去 context.write(sortBean,NullWritable.get());
}
复制代码


}


第三步: 自定义 reduce 逻辑


import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Reducer;


import java.io.IOException;


public class ReducerSortTask extends Reducer<SortBean,NullWritable,SortBean,NullWritable> {


@Overrideprotected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
for (NullWritable value : values) { context.write(key,NullWritable.get()); }}
复制代码


}


第四步: 构建 job 任务的主程序


    import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;
public class JobSortMain extends Configured implements Tool {@Overridepublic int run(String[] args) throws Exception {Job job = Job.getInstance(super.getConf(), "JobSortMain"); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("file:///input"));

job.setMapperClass(MapSortTask.class); job.setMapOutputKeyClass(SortBean.class); job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(ReducerSortTask.class); job.setOutputKeyClass(SortBean.class); job.setOutputValueClass(NullWritable.class);

job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("file:///output"));
boolean b = job.waitForCompletion(true);
return b?0:1;}
public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); JobSortMain jobSortMain = new JobSortMain();
int i = ToolRunner.run(configuration, jobSortMain, args);
System.exit(i);}}
复制代码


发布于: 44 分钟前阅读数: 7
用户头像

专注于大数据技术研究 2020.11.10 加入

运营公众号:五分钟学大数据。大数据领域原创技术号,深入大数据技术

评论

发布
暂无评论
MapReduce排序以及序列化实践