写点什么

MapReduce 练习案例 1- 统计求和

用户头像
小马哥
关注
发布于: 2021 年 01 月 26 日
MapReduce练习案例1-统计求和

MapReduce 案例

案例 1: 统计求和

1.1 需求

统计每个手机号的上行数据包总和,下行数据包总和,上行总流量之和,下行总流量之和分析:以手机号码作为 key 值,上行流量,下行流量,上行总流量,下行总流量四个字段作为 value 值,然后以这个 key,和 value 作为 map 阶段的输出,reduce 阶段的输入.


数据格式如下:


1.2 思路

​ 1, map 输出:

​ key: 手机号码 msisdn

​ value: 原始 line

​ 2, reduce 输出:

​ key: 手机号码 msisdn

​ value: 对四个字段 upPackNum, downPackNum, upPayLoad, downPayLoad 累计求和


1.3 代码

JavaBean 类

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
/** * 代表流量记录的JavaBean */public class Flow implements WritableComparable<Flow> { private String phoneNum; //手机号码 private Long upPackNum; //上行数据包数量 private Long downPackNum; //下行数据包数量 private Long upPayLoad; //上行总流量 private Long downPayLoad; //下行总流量 private Long totalUpPackNum; //上行数据包数量_总和 private Long totalDownPackNum; //下行数据包数量_总和 private Long totalUpPayLoad; //上行总流量_总和 private Long totalDownPayLoad; //下行总流量_总和
public Flow() { }
public Flow(Long totalUpPackNum, Long totalDownPackNum, Long totalUpPayLoad, Long totalDownPayLoad) { this.totalUpPackNum = totalUpPackNum; this.totalDownPackNum = totalDownPackNum; this.totalUpPayLoad = totalUpPayLoad; this.totalDownPayLoad = totalDownPayLoad; }
public String getPhoneNum() { return phoneNum; }// ... 省略getter与setter方法
@Override public String toString() { return totalUpPackNum + "\t" + totalDownPackNum + "\t" + totalUpPayLoad + "\t" + totalDownPayLoad; }
@Override public int compareTo(Flow o) { return 0; }
@Override public void write(DataOutput out) throws IOException {
}
@Override public void readFields(DataInput in) throws IOException {
}}
复制代码

Mapper 类

import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Example1Mapper extends Mapper<LongWritable, Text,Text,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); context.write(new Text(fields[1]),value); }}
复制代码

Reducer 类

import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Example1Reducer extends Reducer<Text,Text,Text,Flow> {
@Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { long totalUpPackNum = 0L; //上行数据包数量_总和 long totalDownPackNum = 0L; //下行数据包数量_总和 long totalUpPayLoad = 0L; //上行总流量_总和 long totalDownPayLoad = 0L; //下行总流量_总和 for (Text flow : values) { String[] fields = flow.toString().split("\t"); totalUpPackNum+= Long.valueOf(fields[6]); totalDownPackNum+= Long.valueOf(fields[7]); totalUpPayLoad+= Long.valueOf(fields[8]); totalDownPayLoad+= Long.valueOf(fields[9]); } Flow flowOut = new Flow(totalUpPackNum, totalDownPackNum, totalUpPayLoad, totalDownPayLoad); context.write(key,flowOut); }}
复制代码

Job 启动类


import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception {
//1,创建一个Job类 Job job = Job.getInstance(super.getConf(), "Example1_job");
//2, 设置输入类,输入路径 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1"));
//3, 设置Mapper类, map输出类型 job.setMapperClass(Example1Mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class);
//4, 设置Reducer类, reduce输出类型 job.setReducerClass(Example1Reducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Flow.class);
//5, 设置输出类, 输出路径 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1_result"));
//6, 启动Job, 等待Job执行 boolean completion = job.waitForCompletion(true); return completion?1:0; }
public static void main(String[] args) { int run = 0; try { run = ToolRunner.run(new Configuration(), new MainJob(), args); } catch (Exception e) { e.printStackTrace(); } System.exit(run); }}
复制代码

输出结果

计数器显示

[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30	File System Counters		FILE: Number of bytes read=11298		FILE: Number of bytes written=984294		FILE: Number of read operations=0		FILE: Number of large read operations=0		FILE: Number of write operations=0	Map-Reduce Framework		Map input records=23		<-- Mapper读入23条		Map output records=23		<-- Mapper输出23条		Map output bytes=2830		Map output materialized bytes=2882		Input split bytes=112		Combine input records=0		Combine output records=0		Reduce input groups=21		Reduce shuffle bytes=2882		Reduce input records=23		<-- 输入结果23条		Reduce output records=21	<-- 输出结果21条(验证结果正确)		Spilled Records=46		Shuffled Maps =1		Failed Shuffles=0		Merged Map outputs=1		GC time elapsed (ms)=0		Total committed heap usage (bytes)=382730240	Shuffle Errors		BAD_ID=0		CONNECTION=0		IO_ERROR=0		WRONG_LENGTH=0		WRONG_MAP=0		WRONG_REDUCE=0	File Input Format Counters 		Bytes Read=2583	File Output Format Counters 		Bytes Written=572
复制代码

文件输出显示

13480253104	3	3	180	18013502468823	57	102	7335	11034913560439658	33	24	2034	589213600217502	37	266	2257	203704	<-- 验证是正确的13602846565	15	12	1938	2910......
复制代码


发布于: 2021 年 01 月 26 日阅读数: 20
用户头像

小马哥

关注

自强不息,厚德载物 2018.12.22 加入

像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!

评论

发布
暂无评论
MapReduce练习案例1-统计求和