
MapReduce Case Study (1) -- Traffic Statistics

Author: 钱江兵 (published 2 hours ago)

1. Requirement:

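For each phone number, compute the total upstream traffic, the total downstream traffic, and the total traffic (upstream + downstream).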

2. Data preparation:

(1) Input data format:

Fields (tab-separated): timestamp, phone number, base-station physical (MAC) address, IP address accessed, website domain, site category, packet count, received-packet count, upstream traffic, downstream traffic, response code. The domain and site category may be empty.

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985066 	13726238888	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200


(2) Expected output format:

Phone number	Upstream	Downstream	Total
13480253104	180	180	360
13502468823	7335	110349	117684
13560436666	1116	954	2070
13560439658	2034	5892	7926
13602846565	1938	2910	4848
13660577991	6960	690	7650
13719199419	240	0	240
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	120	240
13826544101	264	0	264
13922314466	3008	3720	6728
13925057413	11058	48243	59301
13926251106	240	0	240
13926435656	132	1512	1644
15013685858	3659	3538	7197
15920133257	3156	2936	6092
15989002119	1938	180	2118
18211575961	1527	2106	3633
18320173382	9531	2412	11943
84138413	4116	1432	5548
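The rows above are ordered by phone number as a string, not as a number. MapReduce sorts `Text` keys by comparing their bytes, which for ASCII digits gives the same order as `String.compareTo`. That is why the 8-digit `84138413` comes after every 11-digit number starting with `1`. With the default single reducer, all rows land in one sorted file.

The plain-Java sketch below reproduces that ordering and the row layout: the key, a tab, then the value's tab-separated fields. `OutputOrderDemo` is a hypothetical name, and a `TreeMap` stands in for the framework's sort:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: orders and formats result rows like the job's output file. */
final class OutputOrderDemo {

    private OutputOrderDemo() { }

    /**
     * Sorts phone -> {up, down} by String key (for ASCII digits this matches
     * Text's byte-wise order) and renders each row as "phone\tup\tdown\ttotal".
     */
    static List<String> render(Map<String, long[]> flows) {
        TreeMap<String, long[]> sorted = new TreeMap<>(flows);
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, long[]> e : sorted.entrySet()) {
            long up = e.getValue()[0];
            long down = e.getValue()[1];
            lines.add(e.getKey() + "\t" + up + "\t" + down + "\t" + (up + down));
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, long[]> flows = Map.of(
                "84138413", new long[] {4116, 1432},
                "13480253104", new long[] {180, 180},
                "13502468823", new long[] {7335, 110349});
        // Prints 13480253104, 13502468823, then 84138413 (string order).
        render(flows).forEach(System.out::println);
    }
}
```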


3. Solution:

This is a classic exercise, and solutions are easy to find online, so here I'll just walk through my own approach.

The core of the solution lies in the map phase and the reduce phase.
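To make the division of labor concrete, here is a Hadoop-free sketch that runs the same three steps in memory:

- **Map:** each line becomes (phone, [up, down]), reading the traffic columns from the end of the record as the mapper in section 4 does.
- **Shuffle:** values are grouped by key, sorted by key.
- **Reduce:** each group is summed.

`LocalFlowJob` is a hypothetical name. The in-memory grouping only imitates what the framework does across tasks and machines:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory imitation of this job's map -> shuffle -> reduce flow. */
final class LocalFlowJob {

    private LocalFlowJob() { }

    /** Map: one line -> (phone, {up, down}); traffic columns are indexed from the end. */
    static Map.Entry<String, long[]> map(String line) {
        String[] f = line.split("\t");
        long up = Long.parseLong(f[f.length - 3]);
        long down = Long.parseLong(f[f.length - 2]);
        return Map.entry(f[1], new long[] {up, down});
    }

    /** Shuffle: group mapped values by key, keeping keys sorted. */
    static TreeMap<String, List<long[]>> shuffle(List<String> lines) {
        TreeMap<String, List<long[]>> groups = new TreeMap<>();
        for (String line : lines) {
            Map.Entry<String, long[]> kv = map(line);
            groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return groups;
    }

    /** Reduce: sum up and down traffic for one key; returns {up, down, total}. */
    static long[] reduce(List<long[]> values) {
        long up = 0;
        long down = 0;
        for (long[] v : values) {
            up += v[0];
            down += v[1];
        }
        return new long[] {up, down, up + down};
    }

    /** Runs all three steps and formats each result as "phone\tup\tdown\ttotal". */
    static List<String> run(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<long[]>> g : shuffle(lines).entrySet()) {
            long[] r = reduce(g.getValue());
            out.add(g.getKey() + "\t" + r[0] + "\t" + r[1] + "\t" + r[2]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "1363157993055 \t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99\t\t\t18\t15\t1116\t954\t200",
                "1363157992093 \t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99\t\t\t15\t9\t918\t4938\t200",
                "1363157986041 \t13480253104\t5C-0E-8B-C7-FC-80:CMCC-EASY\t120.197.40.4\t\t\t3\t3\t180\t180\t200");
        run(lines).forEach(System.out::println);
    }
}
```

The two rows this prints match the corresponding rows of the expected output.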

3.1 Map phase


(1) Read one line of data and split it into fields:

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200


(2) Extract the phone number, upstream traffic, and downstream traffic:

13726230503	2481	24681


(3) Emit the phone number as the key and the bean object as the value, i.e. context.write(phoneNumber, bean);


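(4) For the bean object to be transferred from the map phase to the reduce phase, it must implement Hadoop's serialization interface (Writable).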
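Hadoop's `Writable` contract comes down to two methods, `write(DataOutput)` and `readFields(DataInput)`, which must handle the fields in the same order.

A minimal sketch of that contract in plain JDK Java, using a hypothetical `FlowRecordCodec` class. `DataOutputStream`/`DataInputStream` stand in for the streams Hadoop supplies. The class deliberately does not implement Hadoop's real interface, so it runs without Hadoop on the classpath:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical stand-in for a Writable bean: same write/readFields shape, no Hadoop dependency. */
final class FlowRecordCodec {
    long upFlow;
    long downFlow;
    long sumFlow;

    /** Serialize in a fixed order: up, down, sum. */
    void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Deserialize in exactly the order used by write(). */
    void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Serializes to bytes and reads them back into a fresh object, mimicking a map-to-reduce transfer. */
    static FlowRecordCodec roundTrip(FlowRecordCodec src) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            src.write(out);
        }
        FlowRecordCodec copy = new FlowRecordCodec();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy.readFields(in);
        }
        return copy;
    }

    public static void main(String[] args) throws IOException {
        FlowRecordCodec r = new FlowRecordCodec();
        r.upFlow = 2481;
        r.downFlow = 24681;
        r.sumFlow = r.upFlow + r.downFlow;
        FlowRecordCodec copy = roundTrip(r);
        System.out.println(copy.upFlow + "\t" + copy.downFlow + "\t" + copy.sumFlow); // 2481	24681	27162
    }
}
```

Swapping two reads in `readFields` would still compile but would silently put upstream values into the downstream field, which is why the order matters.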


3.2 Reduce phase


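(1) For each phone number, add up the upstream and the downstream traffic across all of its records, then add the two sums to get the total traffic.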

13560436666	1116 + 954 = 2070
phone number	upstream + downstream = total
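Phone number 13560439658 is the only key in the sample that appears in two records (upstream/downstream 1116/954 and 918/4938), so it shows the reduce-side summation in full. The result matches its row in the expected output:

```latex
\begin{aligned}
\text{up}    &= 1116 + 918 = 2034 \\
\text{down}  &= 954 + 4938 = 5892 \\
\text{total} &= 2034 + 5892 = 7926
\end{aligned}
```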


4. Implementation

4.1 FlowBean class


import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 1. Implement the Writable interface.
 * 2. Override the serialization and deserialization methods.
 * 3. Provide a no-arg constructor.
 * 4. Override toString().
 */
public class FlowBean implements Writable {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // No-arg constructor
    public FlowBean() {
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }

    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }

    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    // Overload: derive the total from the upstream and downstream values already set
    public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; }

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: read the fields back in the same order
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // Controls how the value appears in the output file: up \t down \t total
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
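The no-arg constructor matters because the framework creates value objects reflectively before calling `readFields` on them. Hadoop's `ReflectionUtils.newInstance` looks up the declared no-arg constructor.

Java only generates an implicit default constructor when a class declares no constructors at all. Adding a convenience constructor therefore silently removes it.

The JDK-only sketch below shows what reflection sees in each case. `WithDefaultCtor`, `WithoutDefaultCtor` and `CtorCheck` are hypothetical classes:

```java
import java.lang.reflect.InvocationTargetException;

/** Hypothetical bean that keeps an explicit no-arg constructor next to a convenience one. */
class WithDefaultCtor {
    long upFlow;
    long downFlow;

    WithDefaultCtor() { }

    WithDefaultCtor(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }
}

/** Hypothetical bean with only a parameterized constructor: no implicit default is generated. */
class WithoutDefaultCtor {
    long upFlow;

    WithoutDefaultCtor(long upFlow) {
        this.upFlow = upFlow;
    }
}

final class CtorCheck {

    private CtorCheck() { }

    /** Returns true if the class can be instantiated reflectively with no arguments. */
    static boolean canInstantiate(Class<?> type) {
        try {
            type.getDeclaredConstructor().newInstance();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(WithDefaultCtor.class));    // true
        System.out.println(canInstantiate(WithoutDefaultCtor.class)); // false
    }
}
```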


4.2 FlowMapper class


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Reused across map() calls; context.write serializes them immediately
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Get one line, e.g.
        // 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
        String line = value.toString();

        // 2 Split on tabs
        String[] split = line.split("\t");

        // 3 Extract the fields we need
        // phone number: 13726230503
        // upstream and downstream traffic: 2481, 24681 (counted from the end of the record)
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];

        // 4 Populate the output key and value
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();

        // 5 Emit
        context.write(outK, outV);
    }
}
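The mapper reads the traffic columns with `split.length - 3` and `split.length - 2` rather than fixed indices. Java's `String.split(regex)` keeps empty fields *between* tabs as empty strings. It drops only *trailing* empty strings.

So on this sample, fixed indices and indices counted from the end both work. Counting from the end additionally tolerates extra or missing columns in the middle, provided the last three columns are always upstream, downstream and response code. A record with an empty trailing column would shift these indices, though.

A JDK-only check of that behavior; `SplitCheck` is a hypothetical name:

```java
import java.util.Arrays;

/** Hypothetical check of how String.split treats empty tab-separated fields. */
final class SplitCheck {

    private SplitCheck() { }

    /** limit 0 (the mapper's form) drops trailing empty strings; limit -1 keeps them. */
    static String[] split(String line, boolean keepTrailingEmpty) {
        return keepTrailingEmpty ? line.split("\t", -1) : line.split("\t");
    }

    public static void main(String[] args) {
        // Empty domain and category columns in the middle are kept as "".
        String[] f = split("1363157995052 \t13826544101\t5C-0E-8B-C7-F1-E0:CMCC\t120.197.40.4\t\t\t4\t0\t264\t0\t200", false);
        System.out.println(f.length + " fields, up=" + f[f.length - 3] + ", down=" + f[f.length - 2]); // 11 fields, up=264, down=0

        // Trailing empty columns disappear with limit 0 ...
        System.out.println(Arrays.toString(split("x\t1\t2\t\t", false))); // [x, 1, 2]
        // ... and survive with a negative limit.
        System.out.println(Arrays.toString(split("x\t1\t2\t\t", true)));  // [x, 1, 2, , ]
    }
}
```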


4.3 FlowReducer class



import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // 1 Iterate over the values and accumulate the totals
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }

        // 2 Populate the output value (the output key is the phone number itself)
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();

        // 3 Emit
        context.write(key, outV);
    }
}
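The reducer avoids a common pitfall by accumulating plain `long` totals. Hadoop reuses a single value instance while iterating over `values`, deserializing each record into it. Storing the bean references themselves (for example in a list) would leave every element pointing at the last record.

The JDK-only sketch below imitates that reuse with a hypothetical `ReuseDemo` class. It is a simulation of the behavior, not Hadoop's implementation:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical simulation of a reduce-side value iterator that reuses one object. */
final class ReuseDemo {

    /** Mutable holder standing in for a reused FlowBean. */
    static final class Flow {
        long up;
        long down;
    }

    private ReuseDemo() { }

    /** Every next() returns the SAME Flow instance, overwritten with the next record. */
    static Iterable<Flow> reusing(long[][] records) {
        Flow shared = new Flow();
        return () -> new Iterator<Flow>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < records.length;
            }

            @Override
            public Flow next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                shared.up = records[i][0];
                shared.down = records[i][1];
                i++;
                return shared;
            }
        };
    }

    /** Safe: copies primitive values out of each element, as FlowReducer does. */
    static long sumUp(Iterable<Flow> values) {
        long total = 0;
        for (Flow f : values) {
            total += f.up;
        }
        return total;
    }

    /** Unsafe: keeps references, so every element ends up holding the last record. */
    static List<Flow> collectReferences(Iterable<Flow> values) {
        List<Flow> kept = new ArrayList<>();
        for (Flow f : values) {
            kept.add(f);
        }
        return kept;
    }

    public static void main(String[] args) {
        long[][] records = {{1116, 954}, {918, 4938}};
        System.out.println(sumUp(reusing(records)));                // 2034
        List<Flow> kept = collectReferences(reusing(records));
        System.out.println(kept.get(0).up + " " + kept.get(1).up);  // 918 918
    }
}
```

If the values really must be kept, copy each one into a new object first.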


4.4 FlowDriver class


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 Create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar
        job.setJarByClass(FlowDriver.class);

        // 3 Wire up the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4 Key and value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 Key and value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 Input and output paths
        FileInputFormat.setInputPaths(job, new Path("/Users/qianjiangbing/Desktop/data/mr/inputflow"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/qianjiangbing/Desktop/data/mr/output"));

        // 7 Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
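When running locally with these hard-coded paths, note that `FileOutputFormat` refuses to start if the output directory already exists: job submission fails with a `FileAlreadyExistsException`. A second run therefore needs `/Users/qianjiangbing/Desktop/data/mr/output` removed first.

A JDK-only helper for local runs, sketched under the assumption that the output lives on the local file system. `OutputDirCleaner` is a hypothetical name. It deletes recursively, so point it only at a throwaway output directory:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper for local runs: removes a previous job's output directory. */
final class OutputDirCleaner {

    private OutputDirCleaner() { }

    /** Deletes dir and everything under it; returns false if dir did not exist. */
    static boolean deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children ("dir/a/b") before their parents ("dir/a", "dir").
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: OutputDirCleaner <output-dir>");
            return;
        }
        Path output = Paths.get(args[0]);
        System.out.println(deleteRecursively(output) ? "deleted " + output : "nothing at " + output);
    }
}
```

Inside the driver itself, Hadoop's `FileSystem#delete(Path, boolean recursive)` can do the same job. That variant also works when the output directory is on HDFS.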


[Figure: execution result]

