MapReduce Exercise Case 1: Sum Statistics
Published: January 26, 2021
MapReduce Cases
Case 1: Sum Statistics
1.1 Requirements
For each phone number, sum the number of upstream data packets, the number of downstream data packets, the total upstream traffic, and the total downstream traffic. Analysis: use the phone number as the key and the four fields (upstream packets, downstream packets, upstream traffic, downstream traffic) as the value; this key/value pair is the output of the map phase and the input of the reduce phase.
Data format: each input line is tab-separated; field index 1 is the phone number (msisdn), and field indices 6–9 are upPackNum, downPackNum, upPayLoad, and downPayLoad. These are the positions the code below relies on.
1.2 Approach
1. Map output:
key: phone number (msisdn)
value: the original line
2. Reduce output:
key: phone number (msisdn)
value: the running sums of the four fields upPackNum, downPackNum, upPayLoad, downPayLoad (the assumed field positions are shown in the sketch right after this list)
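To make those field positions concrete before the full classes, here is a small stand-alone parsing sketch. The class and method names are my own and only illustrate the field layout assumed by the Mapper and Reducer below.
// Minimal parsing sketch (names are mine, not from the article); mirrors the splits
// performed in the Mapper and Reducer.
public class FlowLineParser {
    // Field 1 of the tab-separated line: the phone number (msisdn), used as the map output key.
    public static String phoneNum(String line) {
        return line.split("\t")[1];
    }
    // Fields 6-9: upPackNum, downPackNum, upPayLoad, downPayLoad, which the Reducer sums.
    public static long[] counters(String line) {
        String[] fields = line.split("\t");
        return new long[]{
                Long.parseLong(fields[6]),
                Long.parseLong(fields[7]),
                Long.parseLong(fields[8]),
                Long.parseLong(fields[9])
        };
    }
}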
1.3 Code
JavaBean class
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* JavaBean representing one flow (traffic) record
*/
public class Flow implements WritableComparable<Flow> {
private String phoneNum; // phone number
private Long upPackNum; // number of upstream packets
private Long downPackNum; // number of downstream packets
private Long upPayLoad; // upstream traffic
private Long downPayLoad; // downstream traffic
private Long totalUpPackNum; // sum of upstream packet counts
private Long totalDownPackNum; // sum of downstream packet counts
private Long totalUpPayLoad; // sum of upstream traffic
private Long totalDownPayLoad; // sum of downstream traffic
public Flow() {
}
public Flow(Long totalUpPackNum, Long totalDownPackNum, Long totalUpPayLoad, Long totalDownPayLoad) {
this.totalUpPackNum = totalUpPackNum;
this.totalDownPackNum = totalDownPackNum;
this.totalUpPayLoad = totalUpPayLoad;
this.totalDownPayLoad = totalDownPayLoad;
}
public String getPhoneNum() {
return phoneNum;
}
// ... getters and setters omitted
@Override
public String toString() {
return totalUpPackNum +
"\t" + totalDownPackNum +
"\t" + totalUpPayLoad +
"\t" + totalDownPayLoad;
}
@Override
public int compareTo(Flow o) {
// Flow is only used as an output value in this job and is never sorted, so no real ordering is needed.
return 0;
}
@Override
public void write(DataOutput out) throws IOException {
// Serialize the four totals; write() and readFields() must use the same field order.
out.writeLong(totalUpPackNum);
out.writeLong(totalDownPackNum);
out.writeLong(totalUpPayLoad);
out.writeLong(totalDownPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
totalUpPackNum = in.readLong();
totalDownPackNum = in.readLong();
totalUpPayLoad = in.readLong();
totalDownPayLoad = in.readLong();
}
}
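A quick way to confirm that write() and readFields() mirror each other is an in-memory round trip with Hadoop's DataOutputBuffer and DataInputBuffer. The small check class below is my own sketch, not part of the article's job.
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

// Throwaway sanity check (my sketch): serialize a Flow and read it back.
public class FlowSerializationCheck {
    public static void main(String[] args) throws Exception {
        Flow original = new Flow(3L, 3L, 180L, 180L);

        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);                      // serialize the four totals

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        Flow copy = new Flow();
        copy.readFields(in);                      // read them back in the same order

        // toString() prints only the four totals, so equal lines mean the round trip worked.
        System.out.println(original);
        System.out.println(copy);
    }
}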
Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Example1Mapper extends Mapper<LongWritable, Text,Text,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Split the tab-separated line; field 1 is the phone number (msisdn).
String[] fields = value.toString().split("\t");
// Emit (phone number, original line); the Reducer re-parses the line to get the four counters.
context.write(new Text(fields[1]),value);
}
}
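One small, optional tweak (my addition, not in the original article): map() runs once per record, so allocating a new Text key each time creates unnecessary garbage. Hadoop serializes the key/value when context.write() is called, so reusing a single Text instance is safe. A minimal sketch of that variant:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

// Variant of the Mapper (my sketch) that reuses one Text key instead of
// allocating a new Text for every input record.
public class Example1MapperReuse extends Mapper<LongWritable, Text, Text, Text> {
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        outKey.set(fields[1]);          // field 1: phone number (msisdn)
        context.write(outKey, value);   // value: the original line, unchanged
    }
}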
Reducer class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Example1Reducer extends Reducer<Text,Text,Text,Flow> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
long totalUpPackNum = 0L; // sum of upstream packet counts
long totalDownPackNum = 0L; // sum of downstream packet counts
long totalUpPayLoad = 0L; // sum of upstream traffic
long totalDownPayLoad = 0L; // sum of downstream traffic
for (Text flow : values) {
// Re-parse each original line; fields 6-9 hold upPackNum, downPackNum, upPayLoad, downPayLoad.
String[] fields = flow.toString().split("\t");
totalUpPackNum += Long.parseLong(fields[6]);
totalDownPackNum += Long.parseLong(fields[7]);
totalUpPayLoad += Long.parseLong(fields[8]);
totalDownPayLoad += Long.parseLong(fields[9]);
}
Flow flowOut = new Flow(totalUpPackNum, totalDownPackNum, totalUpPayLoad, totalDownPayLoad);
context.write(key,flowOut);
}
}
Job driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//1. Create the Job
Job job = Job.getInstance(super.getConf(), "Example1_job");
//2. Set the input format and input path
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1"));
//3. Set the Mapper class and the map output types
job.setMapperClass(Example1Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//4. Set the Reducer class and the reduce output types
job.setReducerClass(Example1Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Flow.class);
//5. Set the output format and output path
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1_result"));
//6. Launch the Job and wait for it to finish
boolean completion = job.waitForCompletion(true);
return completion ? 0 : 1; // 0 = success, non-zero = failure, as expected for a process exit code
}
public static void main(String[] args) {
int exitCode = 1; // default to a failure exit code in case the job throws
try {
exitCode = ToolRunner.run(new Configuration(), new MainJob(), args);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(exitCode);
}
}
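The driver above hard-codes Windows paths, which is convenient for a local run. When submitting to a cluster you would normally take the paths from the command line and call setJarByClass() so the cluster can locate the job jar. The variant below is my own sketch of that setup, not part of the original article.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Cluster-friendly variant of the driver (my sketch): paths come from the command line.
public class MainJobCli extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "Example1_job");
        job.setJarByClass(MainJobCli.class);                       // lets the cluster locate the job jar

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));      // input directory from the CLI

        job.setMapperClass(Example1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(Example1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));    // output directory; must not already exist

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MainJobCli(), args));
    }
}
It would then be submitted roughly as: hadoop jar example1.jar MainJobCli /input/example1 /output/example1_result (the jar name and paths here are placeholders).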
Output
Counter display
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
File System Counters
FILE: Number of bytes read=11298
FILE: Number of bytes written=984294
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=23 <-- the Mapper read 23 records
Map output records=23 <-- the Mapper emitted 23 records
Map output bytes=2830
Map output materialized bytes=2882
Input split bytes=112
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=2882
Reduce input records=23 <-- 23 records went into the Reducer
Reduce output records=21 <-- 21 records in the output, one per distinct phone number (verified correct)
Spilled Records=46
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
Total committed heap usage (bytes)=382730240
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2583
File Output Format Counters
Bytes Written=572
File output
13480253104 3 3 180 180
13502468823 57 102 7335 110349
13560439658 33 24 2034 5892
13600217502 37 266 2257 203704 <-- spot-checked, correct
13602846565 15 12 1938 2910
......
Copyright notice: this is an original article by InfoQ author 小马哥.
Original link: http://xie.infoq.cn/article/da540eba35ae44551c1dc1f81. Please contact the author before republishing.