0711 作业:MapReduce 编程作业
统计每一个手机号耗费的总上行流量、下行流量、总流量。
1.需求:统计每一个手机号耗费的总上行流量、下行流量、总流量
2.数据准备:(1)输入数据格式:时间戳、电话号码、基站的物理地址、访问网址的 ip、网站域名、数据包、接包数、上行/传流量、下行/载流量、响应码 2)最终输出的数据格式:手机号码 上行流量 下行流量 总流量
3.基本思路:
(1)Map 阶段:(a)读取一行数据,切分字段(b)抽取手机号、上行流量、下行流量(c)以手机号为 key,bean 对象为 value 输出,即 context.write(手机号,bean);
(2)Reduce 阶段:(a)累加上行流量和下行流量得到总流量。(b)实现自定义的 bean 来封装流量信息,并将 bean 作为 map 输出的 key 来传输(c)MR 程序在处理数据的过程中会对数据排序(map 输出的 kv 对传输到 reduce 之前,会排序),排序的依据是 map 输出的 key 所以,如果要实现自己需要的排序规则,考虑将排序因素放到 key 中,让 key 实现接口:WritableComparable。然后重写 key 的 compareTo 方法。
4.程序代码:
(1)编写流量统计的 bean 对象 FlowBean
package phoneData;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
// 1 实现 writable 接口 @Setter@Getterpublic class FlowBean implements Writable {//上传流量 private long upFlow;//下载流量 private long downFlow;//流量总和 private long sumFlow;
}
(2)编写 MapperFlow
CountMapper.javapackage phoneData;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
LongWritable, Text ===> Map 输入 <偏移量,手机号>
Text, FlowBean ======> Map 的输出:<手机号、流量上传下载总和>*/public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {Text k = new Text();FlowBean v = new FlowBean();
@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取每一行数据 String line = value.toString();
}}
(3)编写 ReducerFlowCountReducer.javapackage phoneData;
import java.io.IOException;
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
}
5.运行结果:
评论