Big Data Training Camp Homework
Author: Pyel
- March 13, 2022
Homework Approach
Map phase
Read one line of input and split it into fields.
Extract the phone number, the upstream traffic, and the downstream traffic.
Emit the phone number as the key and a bean object as the value, i.e. context.write(phoneNumber, bean). A standalone parsing sketch follows this list.
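The mapper below reads the phone number from field 1 and takes the traffic columns counted from the end of the record, which tolerates records whose middle columns vary in number. As a minimal standalone sketch of that parsing (the sample record is a made-up illustration in a typical flow-log layout, not data from the assignment):

// ParseSketch (illustration only, not part of the homework)
public class ParseSketch {
    public static void main(String[] args) {
        // Hypothetical tab-separated record; only the marked columns matter
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8\t120.196.100.82\texample.com\t2481\t24681\t200";
        String[] fields = line.split("\t");
        String phoneNumber = fields[1];                              // phone number
        long upFlow = Long.parseLong(fields[fields.length - 3]);     // upstream traffic
        long downFlow = Long.parseLong(fields[fields.length - 2]);   // downstream traffic
        System.out.println(phoneNumber + "\t" + upFlow + "\t" + downFlow);
    }
}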
Reduce phase
Accumulate the upstream and downstream traffic to obtain the total traffic.
Implement a custom bean to encapsulate the traffic information; where a custom sort is required, the bean itself is transmitted as the map output key.
While processing data, an MR program sorts the records (the map output key-value pairs are sorted before they reach reduce), and the sort is based on the map output key.
So, to impose our own sort order, we can move the sort criteria into the key: have the key class implement the WritableComparable interface and override its compareTo method, as sketched below.
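The homework code in the next section keeps the phone number as the key, so it does not need a custom comparator; purely as a sketch of the technique just described (the class name and the descending-by-total-flow rule are assumptions of mine, not part of the assignment):

// SortableFlowBean (sketch only, not part of the homework code)
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SortableFlowBean implements WritableComparable<SortableFlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // No-arg constructor required by the Writable framework
    public SortableFlowBean() {}

    public SortableFlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // MapReduce sorts map output by key, so compareTo defines the order:
    // here, descending by total flow (a hypothetical sort rule)
    @Override
    public int compareTo(SortableFlowBean other) {
        return Long.compare(other.sumFlow, this.sumFlow);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }
}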
Homework Implementation
// FlowCountMapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Convert the line to a String
        String line = value.toString();
        // Split into fields
        String[] fields = line.split("\t");
        // Extract the phone number
        String phoneNumber = fields[1];
        // Extract the upstream and downstream traffic, counted from the end of the record
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        context.write(new Text(phoneNumber), new FlowBean(upFlow, downFlow));
    }
}
// FlowCountReducer
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        // Iterate over all beans, accumulating upstream and downstream traffic separately
        for (FlowBean bean : values) {
            sumUpFlow += bean.getUpFlow();
            sumDownFlow += bean.getDownFlow();
        }
        FlowBean resultBean = new FlowBean(sumUpFlow, sumDownFlow);
        context.write(key, resultBean);
    }
}
// FlowCountTest
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountTest {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:8020");

        Job job = Job.getInstance(conf);
        job.setJobName("talen-flowCount");
        job.setJarByClass(FlowCountTest.class);

        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // args[0]: input path, args[1]: output path (must not already exist)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
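For reference, once the classes are packaged into a jar, a driver like this is submitted with the standard hadoop jar command, along the lines of hadoop jar flowcount.jar FlowCountTest /flow/input /flow/output, where the jar name and HDFS paths are placeholders of mine, not from the assignment; the two path arguments become args[0] and args[1] above, and the output directory must not exist before the run.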
// FlowBean
import lombok.Data;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
public class FlowBean implements Writable {
    // Upstream traffic
    private long upFlow;
    // Downstream traffic
    private long downFlow;
    // Total traffic
    private long sumFlow;

    // No-arg constructor required for deserialization by the Writable framework
    public FlowBean() {}

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Serialization: the field order here must match readFields exactly
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    // Deserialization: read fields in the same order they were written
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    // Tab-separated output: upFlow \t downFlow \t sumFlow
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
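Since the driver uses the default TextOutputFormat, each line of the job's output is the key followed by a tab and the value's toString(), i.e. phone number, upstream traffic, downstream traffic, and total traffic, separated by tabs.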