写点什么

MapReduce 练习案例 2 - 自定义排序

用户头像
小马哥
关注
发布于: 2021 年 01 月 26 日
MapReduce练习案例2 - 自定义排序

案例 2: 上行流量倒序排序(递减排序)

2.1 需求

​ 上行流量倒序排序(递减排序)

2.2 思路

​ 上一案例中, 已经计算出来了每个电话号码的上行数据包总和、下行数据包总和、上行总流量之和、下行总流量之和. 在该案例中, 直接把该数据当作输入源即可: 读取每行数据之后, 直接封装到 Flow 中作为 key, 经过 Map 直接 shuffle 出去, 这个过程中会根据 key 对数据进行自动排序.


​ 1, 定义 JavaBean: Flow(上一案例中已经定义)

​ 2, 实现 WritableComparable 接口: 实现三个方法

​ compareTo(Flow o): 本案例实现排序的核心逻辑

​ write(DataOutput out)

​ readFields(DataInput in)

​ 3, Map 输出:

​ key: Flow 对象(实现 compareTo 方法逻辑)

​ value: Text(整行数据)

​ 4, Reduce 输出:

​ key: Text

​ value:Text(整行数据)

2.3 代码

JavaBean 类

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
/**
 * JavaBean representing one aggregated flow record. Used as the map-output
 * key so that records are sorted during the shuffle phase; the sort order is
 * defined by {@link #compareTo(Flow)} (descending by total upstream packets).
 */
public class Flow implements WritableComparable<Flow> {
    private String phoneNum;        // phone number
    private Long upPackNum;         // upstream packet count
    private Long downPackNum;       // downstream packet count
    private Long upPayLoad;         // upstream traffic
    private Long downPayLoad;       // downstream traffic
    private Long totalUpPackNum;    // total upstream packet count
    private Long totalDownPackNum;  // total downstream packet count
    private Long totalUpPayLoad;    // total upstream traffic
    private Long totalDownPayLoad;  // total downstream traffic

    /** No-arg constructor required by Hadoop when deserializing the key. */
    public Flow() {
    }

    public Flow(Long totalUpPackNum, Long totalDownPackNum,
                Long totalUpPayLoad, Long totalDownPayLoad) {
        this.totalUpPackNum = totalUpPackNum;
        this.totalDownPackNum = totalDownPackNum;
        this.totalUpPayLoad = totalUpPayLoad;
        this.totalDownPayLoad = totalDownPayLoad;
    }

    public Flow(String phoneNum, Long totalUpPackNum, Long totalDownPackNum,
                Long totalUpPayLoad, Long totalDownPayLoad) {
        this.phoneNum = phoneNum;
        this.totalUpPackNum = totalUpPackNum;
        this.totalDownPackNum = totalDownPackNum;
        this.totalUpPayLoad = totalUpPayLoad;
        this.totalDownPayLoad = totalDownPayLoad;
    }

    // The original article elided all getters/setters; getPhoneNum() is
    // required by Example2Reducer, so the accessors actually used by this
    // example are restored here.
    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public Long getTotalUpPackNum() {
        return totalUpPackNum;
    }

    public Long getTotalDownPackNum() {
        return totalDownPackNum;
    }

    public Long getTotalUpPayLoad() {
        return totalUpPayLoad;
    }

    public Long getTotalDownPayLoad() {
        return totalDownPayLoad;
    }

    @Override
    public String toString() {
        return totalUpPackNum + "\t" + totalDownPackNum + "\t"
                + totalUpPayLoad + "\t" + totalDownPayLoad;
    }

    /**
     * Core sorting logic of this example: descending order by total
     * upstream packet count. {@code compareTo} on Long is ascending,
     * so the result is negated for a descending sort.
     */
    @Override
    public int compareTo(Flow o) {
        return -this.totalUpPackNum.compareTo(o.totalUpPackNum);
    }

    /**
     * Serializes the fields the job actually needs. Field order here must
     * match {@link #readFields(DataInput)} exactly.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(totalUpPackNum);
        out.writeLong(totalDownPackNum);
        out.writeLong(totalUpPayLoad);
        out.writeLong(totalDownPayLoad);
    }

    /** Deserializes in the same order as {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNum = in.readUTF();
        this.totalUpPackNum = in.readLong();
        this.totalDownPackNum = in.readLong();
        this.totalUpPayLoad = in.readLong();
        this.totalDownPayLoad = in.readLong();
    }
}
复制代码

Mapper 类

import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Example2Mapper extends Mapper<LongWritable, Text, Flow, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取案例1的输出结果, 封装到key: Flow中; value: value /* * 数据格式样例: 13480253104 3 3 180 180 */ String[] fields = value.toString().split("\t");
//封装数据到Flow中 Flow(String phoneNum, Long totalUpPackNum, Long totalDownPackNum, Long totalUpPayLoad, Long totalDownPayLoad) Flow flow = new Flow(fields[0], Long.valueOf(fields[1]), Long.valueOf(fields[2]), Long.valueOf(fields[3]), Long.valueOf(fields[4])); context.write(flow, value); }}
复制代码

Reducer 类

import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Example2Reducer extends Reducer<Flow, Text,Text,Text> { @Override protected void reduce(Flow key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { // 因为数据是已经统计去重过的, 所以values里面只有一个value, 在map之后还加一个reducer模型, 目的是实现全局排序 context.write(new Text(key.getPhoneNum()),value); } }}
复制代码

Job 启动类

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception {
//1,创建一个Job类 Job job = Job.getInstance(super.getConf(), "Example1_job");
//2, 设置输入类,输入路径 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1_result"));
//3, 设置Mapper类, map输出类型 job.setMapperClass(Example2Mapper.class); job.setMapOutputKeyClass(Flow.class); job.setMapOutputValueClass(Text.class);
//4, 设置Reducer类, reduce输出类型 job.setReducerClass(Example2Reducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
//5, 设置输出类, 输出路径 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example2_result"));
//6, 启动Job, 等待Job执行 boolean completion = job.waitForCompletion(true); return completion?1:0; }
public static void main(String[] args) { int run = 0; try { run = ToolRunner.run(new Configuration(), new MainJob(), args); } catch (Exception e) { e.printStackTrace(); } System.exit(run); }}
复制代码

输出结果

计数器结果:

[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30	File System Counters		FILE: Number of bytes read=4632		FILE: Number of bytes written=980583		FILE: Number of read operations=0		FILE: Number of large read operations=0		FILE: Number of write operations=0	Map-Reduce Framework		Map input records=21		Map output records=21		Map output bytes=1498		Map output materialized bytes=1546		Input split bytes=122		Combine input records=0		Combine output records=0		Reduce input groups=14		Reduce shuffle bytes=1546		Reduce input records=21		Reduce output records=21		Spilled Records=42		Shuffled Maps =1		Failed Shuffles=0		Merged Map outputs=1		GC time elapsed (ms)=0		Total committed heap usage (bytes)=382730240	Shuffle Errors		BAD_ID=0		CONNECTION=0		IO_ERROR=0		WRONG_LENGTH=0		WRONG_MAP=0		WRONG_REDUCE=0	File Input Format Counters 		Bytes Read=576	File Output Format Counters 		Bytes Written=821
Process finished with exit code 1
复制代码


输出文件结果:

13925057413	13925057413	69	63	11058	4824313502468823	13502468823	57	102	7335	11034913600217502	13600217502	37	266	2257	20370413560439658	13560439658	33	24	2034	589215013685858	15013685858	28	27	3659	353813726230503	13726230503	24	27	2481	2468113660577991	13660577991	24	9	6960	69018320173382	18320173382	21	18	9531	241215920133257	15920133257	20	20	3156	293684138413	84138413	20	16	4116	143213602846565	13602846565	15	12	1938	291018211575961	18211575961	15	12	1527	2106
复制代码


发布于: 2021 年 01 月 26 日阅读数: 18
用户头像

小马哥

关注

自强不息,厚德载物 2018.12.22 加入

像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!

评论

发布
暂无评论
MapReduce练习案例2 - 自定义排序