写点什么

大数据训练营 作业

作者:Pyel
  • 2022 年 3 月 13 日
  • 本文字数:2224 字

    阅读完需:约 7 分钟

作业思路


Map 阶段

  • 读取一行数据,切分字段。

    抽取手机号、上行流量、下行流量。

    以手机号为 key,bean 对象为 value 输出,即 context.write(手机号,bean)。

Reduce 阶段

  • 累加上行流量和下行流量得到总流量。

    实现自定义的 bean 来封装流量信息,并将 bean 作为 map 输出的 key 来传输。

    MR 程序在处理数据的过程中会对数据排序 (map 输出的 kv 对传输到 reduce 之前,会排序),排序的依据是 map 输出的 key。


所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到 key 中,让 key 实现接口:WritableComparable。然后重写 key 的 compareTo 方法。


作业实现



// FlowCountMapper
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将一行内容转换成string String line = value.toString(); // 切分字段 String[] fields = line.split("\t"); // 取出手机号 String phoneNumber = fields[1]; // 取出上行流量和下行流量 long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); context.write(new Text(phoneNumber), new FlowBean(upFlow, downFlow)); }}

// FlowCountReducer
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> { @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sum_upFlow = 0; long sum_downFlow = 0; // 遍历所有的bean,将其中的上行流量、下行流量分别累加 for(FlowBean bean : values) { sum_upFlow += bean.getUpFlow(); sum_downFlow += bean.getDownFlow(); } FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow); context.write(key, resultBean); }}
// FlowCountTest
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
public class FlowCountTest { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop01:8020");
FileSystem fs= FileSystem.get(conf); Job job = Job.getInstance(conf); job.setJobName("talen-flowCount"); job.setJarByClass(FlowCountTest.class); job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); }}


// FlowBeanimport lombok.Data;import org.apache.hadoop.io.Writable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
@Datapublic class FlowBean implements Writable{ // 上行流量 private long upFlow; // 下行流量 private long downFlow; // 总流量 private long sumFlow;
public FlowBean() {}
public FlowBean(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; }
@Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); }
@Override public void readFields(DataInput dataInput) throws IOException { upFlow = dataInput.readLong(); downFlow = dataInput.readLong(); sumFlow = dataInput.readLong(); }
@Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; }}
复制代码


用户头像

Pyel

关注

还未添加个人签名 2015.01.29 加入

还未添加个人简介

评论

发布
暂无评论
大数据训练营 作业_Pyel_InfoQ写作平台