MapReduce Exercise Case 2 - Custom Sorting
Published: January 26, 2021
Case 2: Sort Upstream Traffic in Descending Order
2.1 Requirement
Sort the aggregated records by upstream traffic in descending (decreasing) order.
2.2 Approach
The previous case already computed, for each phone number, the sum of upstream packets, the sum of downstream packets, the total upstream traffic, and the total downstream traffic. In this case that output is used directly as the input source: after reading each line, wrap it into a Flow object that serves as the key and let the Map stage shuffle it out. During the shuffle, the framework automatically sorts the records by key.
1. Define the JavaBean Flow (already defined in the previous case).
2. Implement the WritableComparable interface, i.e. implement three methods:
compareTo(Flow o): the core sorting logic of this case (see the sketch right after this list)
write(DataOutput out)
readFields(DataInput in)
3. Map output:
key: the Flow object (its compareTo method carries the sort logic)
value: Text (the whole input line)
4. Reduce output:
key: Text
value: Text (the whole input line)
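To see what compareTo does in isolation, here is a minimal standalone sketch (plain Java, no Hadoop runtime required; the class name SortSketch is made up for illustration, and the sample values are borrowed from this case's output below). The shuffle orders map output keys exactly the way Collections.sort does: by compareTo.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SortSketch {
    public static void main(String[] args) {
        // Three records borrowed from this case's output file
        List<Flow> flows = new ArrayList<>();
        flows.add(new Flow("13560439658", 33L, 24L, 2034L, 5892L));
        flows.add(new Flow("13925057413", 69L, 63L, 11058L, 48243L));
        flows.add(new Flow("13502468823", 57L, 102L, 7335L, 110349L));
        // Collections.sort delegates to Flow.compareTo, so the list ends up
        // in descending order of totalUpPackNum: 69, 57, 33
        Collections.sort(flows);
        for (Flow flow : flows) {
            System.out.println(flow);
        }
    }
}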
2.3 代码
The JavaBean class
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* JavaBean representing a traffic record
*/
public class Flow implements WritableComparable<Flow> {
private String phoneNum; // phone number
private Long upPackNum; // number of upstream packets
private Long downPackNum; // number of downstream packets
private Long upPayLoad; // upstream traffic
private Long downPayLoad; // downstream traffic
private Long totalUpPackNum; // sum of upstream packets
private Long totalDownPackNum; // sum of downstream packets
private Long totalUpPayLoad; // sum of upstream traffic
private Long totalDownPayLoad; // sum of downstream traffic
public Flow() {
}
public Flow(Long totalUpPackNum, Long totalDownPackNum, Long totalUpPayLoad, Long totalDownPayLoad) {
this.totalUpPackNum = totalUpPackNum;
this.totalDownPackNum = totalDownPackNum;
this.totalUpPayLoad = totalUpPayLoad;
this.totalDownPayLoad = totalDownPayLoad;
}
public Flow(String phoneNum, Long totalUpPackNum, Long totalDownPackNum, Long totalUpPayLoad, Long totalDownPayLoad) {
this.phoneNum = phoneNum;
this.totalUpPackNum = totalUpPackNum;
this.totalDownPackNum = totalDownPackNum;
this.totalUpPayLoad = totalUpPayLoad;
this.totalDownPayLoad = totalDownPayLoad;
}
// All getter and setter methods omitted for brevity
// ......
@Override
public String toString() {
return totalUpPackNum +
"\t" + totalDownPackNum +
"\t" + totalUpPayLoad +
"\t" + totalDownPayLoad;
}
// The sorting logic
@Override
public int compareTo(Flow o) {
// this.totalUpPackNum.compareTo(o.totalUpPackNum) sorts ascending; multiplying by -1 reverses it to descending
return -1 * this.totalUpPackNum.compareTo(o.totalUpPackNum);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(phoneNum);
out.writeLong(totalUpPackNum);
out.writeLong(totalDownPackNum);
out.writeLong(totalUpPayLoad);
out.writeLong(totalDownPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
this.phoneNum = in.readUTF();
this.totalUpPackNum = in.readLong();
this.totalDownPackNum = in.readLong();
this.totalUpPayLoad = in.readLong();
this.totalDownPayLoad = in.readLong();
}
}
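A quick way to convince yourself that write and readFields are symmetric is a round trip through plain java.io streams (again standalone, no cluster needed; the class name RoundTripCheck is made up for illustration). The fields must be read back in exactly the order they were written, otherwise the stream becomes misaligned.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTripCheck {
    public static void main(String[] args) throws IOException {
        Flow original = new Flow("13480253104", 3L, 3L, 180L, 180L);
        // Serialize with write(), exactly as Hadoop does during the shuffle
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        // Deserialize into a fresh instance with readFields()
        Flow copy = new Flow();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // same totals as the original: 3, 3, 180, 180
    }
}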
The Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Example2Mapper extends Mapper<LongWritable, Text, Flow, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Read the output of case 1 and wrap it into key: a Flow object; value: the original line
/*
* Sample input line (tab-separated): 13480253104 3 3 180 180
*/
String[] fields = value.toString().split("\t");
//Wrap the fields into a Flow: Flow(String phoneNum, Long totalUpPackNum, Long totalDownPackNum, Long totalUpPayLoad, Long totalDownPayLoad)
Flow flow = new Flow(fields[0], Long.valueOf(fields[1]), Long.valueOf(fields[2]), Long.valueOf(fields[3]), Long.valueOf(fields[4]));
context.write(flow, value);
}
}
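Stripped of the Hadoop plumbing, the map body is just a split-and-parse step. Here it is as a standalone sketch on the sample line from the comment above (the class name MapParseSketch is made up for illustration):

public class MapParseSketch {
    public static void main(String[] args) {
        // The sample line from case 1's output, tab-separated
        String line = "13480253104\t3\t3\t180\t180";
        String[] fields = line.split("\t");
        Flow flow = new Flow(fields[0],
                Long.valueOf(fields[1]), Long.valueOf(fields[2]),
                Long.valueOf(fields[3]), Long.valueOf(fields[4]));
        // Prints: 13480253104 -> 3	3	180	180
        System.out.println(flow.getPhoneNum() + " -> " + flow);
    }
}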
The Reducer class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Example2Reducer extends Reducer<Flow, Text, Text, Text> {
@Override
protected void reduce(Flow key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
// Keys that compare as equal (same totalUpPackNum) arrive in one reduce call,
// so values may hold several records (the counters below show 21 records in 14 groups).
// Hadoop re-deserializes the key object as the iterator advances, so
// key.getPhoneNum() always matches the current value. Keeping a reduce phase
// after the map is what yields a single, globally sorted output file.
context.write(new Text(key.getPhoneNum()), value);
}
}
}
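The grouping behavior is easy to verify in isolation: two records with the same totalUpPackNum compare as equal, so the framework delivers them to the same reduce call. A minimal sketch (values borrowed from the output below; the class name GroupingSketch is made up for illustration):

public class GroupingSketch {
    public static void main(String[] args) {
        Flow a = new Flow("13726230503", 24L, 27L, 2481L, 24681L);
        Flow b = new Flow("13660577991", 24L, 9L, 6960L, 690L);
        // Prints 0: equal keys, hence one reduce group despite different phones
        System.out.println(a.compareTo(b));
    }
}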
The Job driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//1. Create the Job
Job job = Job.getInstance(super.getConf(), "Example2_job");
//2. Set the input format class and the input path
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1_result"));
//3. Set the Mapper class and the map output types
job.setMapperClass(Example2Mapper.class);
job.setMapOutputKeyClass(Flow.class);
job.setMapOutputValueClass(Text.class);
//4. Set the Reducer class and the reduce output types
job.setReducerClass(Example2Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//5. Set the output format class and the output path
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example2_result"));
//6. Submit the Job and wait for it to finish
boolean completion = job.waitForCompletion(true);
return completion ? 0 : 1; // exit code 0 signals success
}
public static void main(String[] args) {
int run = 1; // default to failure in case ToolRunner throws
try {
run = ToolRunner.run(new Configuration(), new MainJob(), args);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(run);
}
}
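One caveat worth making explicit: MapReduce guarantees sort order only within a single reducer. This job relies on the default of one reduce task; to pin that assumption down in code, a one-line addition in run() (for example right after step 4) would do it. This is a suggested sketch, not part of the original driver:

// Pin the job to a single reducer so the sorted output is global.
// With several reduce tasks, each part file would be sorted internally,
// but the files together would not form one global order.
job.setNumReduceTasks(1);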
Output
Counter results:
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
File System Counters
FILE: Number of bytes read=4632
FILE: Number of bytes written=980583
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=21
Map output records=21
Map output bytes=1498
Map output materialized bytes=1546
Input split bytes=122
Combine input records=0
Combine output records=0
Reduce input groups=14
Reduce shuffle bytes=1546
Reduce input records=21
Reduce output records=21
Spilled Records=42
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
Total committed heap usage (bytes)=382730240
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=576
File Output Format Counters
Bytes Written=821
Output file contents (each row is the reduce key, i.e. the phone number, followed by the original input line):
13925057413 13925057413 69 63 11058 48243
13502468823 13502468823 57 102 7335 110349
13600217502 13600217502 37 266 2257 203704
13560439658 13560439658 33 24 2034 5892
15013685858 15013685858 28 27 3659 3538
13726230503 13726230503 24 27 2481 24681
13660577991 13660577991 24 9 6960 690
18320173382 18320173382 21 18 9531 2412
15920133257 15920133257 20 20 3156 2936
84138413 84138413 20 16 4116 1432
13602846565 13602846565 15 12 1938 2910
18211575961 18211575961 15 12 1527 2106
Copyright notice: this is an original article by InfoQ author 小马哥. Original link: http://xie.infoq.cn/article/6221e6a23236cd1feb43ee603. Please contact the author before reposting.