MapReduce 案例(一)-- 流量统计
发布于: 2 小时前
1.需求:
统计每一个手机号耗费的总上行流量、下行流量、总流量
2.数据准备:
(1)输入数据格式:
时间戳、 电话号码、 基站的物理地址、 访问网址的ip、 网站域名、 数据包、接包数、上行/传流量、下行/载流量、响应码
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
复制代码
(2)最终输出的数据格式:
手机号码 上行流量 下行流量 总流量
13480253104 180 180 360
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 2034 5892 7926
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13760778710 120 120 240
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548
复制代码
3.求解:
其实这个题目很经典,在网上一搜到处都是答案,这里就分析一下我自己的解法吧。
核心就在 map 阶段和 reduce 阶段
3.1 Map 阶段
(1)读取一行数据,切分字段
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
复制代码
(2)抽取手机号、上行流量、下行流量
13726230503 2481 24681
复制代码
(3)以手机号为 key,bean 对象为 value 输出,即 context.write(手机号,bean);
(4)bean 对象要想能够传输,必须实现序列化接口;
3.2、Reduce 阶段
(1)累加上行流量和下行流量得到总流量。
13560436666 1116 + 954 = 2070
手机号码 上行流量 下行流量 总流量
复制代码
4、具体编码
4.1 FlowBean 类
import org.apache.hadoop.io.Writable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
/**
 * Value object holding the traffic counters for one phone number.
 *
 * <p>Checklist followed by this class:
 * <ol>
 *   <li>implement the {@link Writable} interface;</li>
 *   <li>override the serialization and deserialization methods;</li>
 *   <li>provide a no-argument constructor;</li>
 *   <li>override {@code toString()}.</li>
 * </ol>
 */
public class FlowBean implements Writable {

    /** Upstream traffic. */
    private long upFlow;

    /** Downstream traffic. */
    private long downFlow;

    /** Total traffic (upstream plus downstream once {@link #setSumFlow()} has been called). */
    private long sumFlow;

    /**
     * No-argument constructor; all counters start at zero.
     * Kept explicit because Writable implementations are typically created
     * empty and then populated through {@link #readFields(DataInput)}.
     */
    public FlowBean() {
    }

    /** @return the upstream traffic */
    public long getUpFlow() {
        return upFlow;
    }

    /** @param upFlow the upstream traffic to store */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    /** @return the downstream traffic */
    public long getDownFlow() {
        return downFlow;
    }

    /** @param downFlow the downstream traffic to store */
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /** @return the total traffic */
    public long getSumFlow() {
        return sumFlow;
    }

    /** @param sumFlow the total traffic to store as-is */
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /** Recomputes the total as the current upstream value plus the current downstream value. */
    public void setSumFlow() {
        sumFlow = upFlow + downFlow;
    }

    /**
     * Serializes the three counters as longs, in the order up, down, sum.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Restores the three counters; the read order must mirror {@link #write(DataOutput)}.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** @return the counters as {@code up<TAB>down<TAB>sum} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(upFlow).append("\t");
        text.append(downFlow).append("\t");
        text.append(sumFlow);
        return text.toString();
    }
}
复制代码
4.2 FlowMapper 类
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private Text outK = new Text(); private FlowBean outV = new FlowBean();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行 // 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 String line = value.toString();
// 2 切割
String[] split = line.split("\t");
// 3 抓取想要的数据 // 手机号:13736230513 // 上行流量和下行流量:2481,24681 String phone = split[1]; String up = split[split.length - 3]; String down = split[split.length - 2];
// 4封装 outK.set(phone); outV.setUpFlow(Long.parseLong(up)); outV.setDownFlow(Long.parseLong(down)); outV.setSumFlow();
// 5 写出 context.write(outK, outV); }}
复制代码
4.3 FlowReducer 类
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> { private FlowBean outV = new FlowBean();
@Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
// 1 遍历集合累加值 long totalUp = 0; long totaldown = 0;
for (FlowBean value : values) { totalUp += value.getUpFlow(); totaldown += value.getDownFlow(); }
// 2 封装outk, outv outV.setUpFlow(totalUp); outV.setDownFlow(totaldown); outV.setSumFlow();
// 3 写出 context.write(key, outV); }}
复制代码
4.4 FlowDriver 类
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取job Configuration conf = new Configuration(); Job job = Job.getInstance(conf);
// 2 设置jar job.setJarByClass(FlowDriver.class);
// 3 关联mapper 和Reducer job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class);
// 4 设置mapper 输出的key和value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class);
// 5 设置最终数据输出的key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class);
// 6 设置数据的输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path("/Users/qianjiangbing/Desktop/data/mr/inputflow")); FileOutputFormat.setOutputPath(job, new Path("/Users/qianjiangbing/Desktop/data/mr/output"));
// 7 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
复制代码
执行结果
划线
评论
复制
发布于: 2 小时前阅读数: 4
钱江兵
关注
还未添加个人签名 2020.09.11 加入
还未添加个人简介











评论