写点什么

0711 作业:MapReduce 编程作业

作者:arctec
  • 2021 年 11 月 17 日
  • 本文字数:1773 字

    阅读完需:约 6 分钟

统计每一个手机号耗费的总上行流量、下行流量、总流量。

1.需求:统计每一个手机号耗费的总上行流量、下行流量、总流量

2.数据准备:(1)输入数据格式:时间戳、电话号码、基站的物理地址、访问网址的 ip、网站域名、数据包、接包数、上行/传流量、下行/载流量、响应码 2)最终输出的数据格式:手机号码 上行流量 下行流量 总流量

3.基本思路:

(1)Map 阶段:(a)读取一行数据,切分字段(b)抽取手机号、上行流量、下行流量(c)以手机号为 key,bean 对象为 value 输出,即 context.write(手机号,bean);

(2)Reduce 阶段:(a)累加上行流量和下行流量得到总流量。(b)实现自定义的 bean 来封装流量信息,并将 bean 作为 map 输出的 key 来传输(c)MR 程序在处理数据的过程中会对数据排序(map 输出的 kv 对传输到 reduce 之前,会排序),排序的依据是 map 输出的 key 所以,如果要实现自己需要的排序规则,考虑将排序因素放到 key 中,让 key 实现接口:WritableComparable。然后重写 key 的 compareTo 方法。


4.程序代码:

(1)编写流量统计的 bean 对象 FlowBean

package phoneData;


import lombok.Getter;

import lombok.Setter;

import org.apache.hadoop.io.Writable;


import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;


// 1 实现 writable 接口 @Setter@Getterpublic class FlowBean implements Writable {//上传流量 private long upFlow;//下载流量 private long downFlow;//流量总和 private long sumFlow;


//反序列化要调用空参构造器public FlowBean() {}
public FlowBean(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow;}
public void set(long upFlow, long downFlow){ this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow;}

/** * 序列化 * * @param out * @throws IOException */@Overridepublic void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow);}
/** * 反序列化 * 注:字段属性顺序必须一致 * * @param in * @throws IOException */@Overridepublic void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong();}@Overridepublic String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow;}
复制代码


}


(2)编写 MapperFlow

CountMapper.javapackage phoneData;


import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;


import java.io.IOException;


/**


  • LongWritable, Text ===> Map 输入 <偏移量,手机号>

  • Text, FlowBean ======> Map 的输出:<手机号、流量上传下载总和>*/public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {Text k = new Text();FlowBean v = new FlowBean();

  • @Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取每一行数据 String line = value.toString();

  • }}


(3)编写 ReducerFlowCountReducer.javapackage phoneData;


import java.io.IOException;


import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;


public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {


@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {    //上传和下载的总和初始化    long sum_upFlow = 0;    long sum_downFlow = 0;
// 1 遍历所用bean,将其中的上行流量,下行流量分别累加 for (FlowBean flowBean : values) { //所有上传的流量加在一起 sum_upFlow += flowBean.getUpFlow(); //所有下载流量加在一起 sum_downFlow += flowBean.getDownFlow(); } // 2 封装对象 FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow); // 3 写出 context.write(key, resultBean);}
复制代码


}

5.运行结果:


用户头像

arctec

关注

还未添加个人签名 2019.08.21 加入

还未添加个人简介

评论

发布
暂无评论
0711作业:MapReduce 编程作业