MapReduce 练习案例 1- 统计求和
发布于: 2021 年 01 月 26 日

MapReduce 案例
案例 1: 统计求和
1.1 需求
统计每个手机号的上行数据包总和,下行数据包总和,上行总流量之和,下行总流量之和分析:以手机号码作为 key 值,上行流量,下行流量,上行总流量,下行总流量四个字段作为 value 值,然后以这个 key,和 value 作为 map 阶段的输出,reduce 阶段的输入.
数据格式如下:
1.2 思路
1, map 输出:
key: 手机号码 msisdn
value: 原始 line
2, reduce 输出:
key: 手机号码 msisdn
value: 对四个字段 upPackNum, downPackNum, upPayLoad, downPayLoad 累计求和
1.3 代码
JavaBean 类
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
/** * 代表流量记录的JavaBean */public class Flow implements WritableComparable<Flow> { private String phoneNum; //手机号码 private Long upPackNum; //上行数据包数量 private Long downPackNum; //下行数据包数量 private Long upPayLoad; //上行总流量 private Long downPayLoad; //下行总流量 private Long totalUpPackNum; //上行数据包数量_总和 private Long totalDownPackNum; //下行数据包数量_总和 private Long totalUpPayLoad; //上行总流量_总和 private Long totalDownPayLoad; //下行总流量_总和
public Flow() { }
public Flow(Long totalUpPackNum, Long totalDownPackNum, Long totalUpPayLoad, Long totalDownPayLoad) { this.totalUpPackNum = totalUpPackNum; this.totalDownPackNum = totalDownPackNum; this.totalUpPayLoad = totalUpPayLoad; this.totalDownPayLoad = totalDownPayLoad; }
public String getPhoneNum() { return phoneNum; }// ... 省略getter与setter方法
@Override public String toString() { return totalUpPackNum + "\t" + totalDownPackNum + "\t" + totalUpPayLoad + "\t" + totalDownPayLoad; }
@Override public int compareTo(Flow o) { return 0; }
@Override public void write(DataOutput out) throws IOException {
}
@Override public void readFields(DataInput in) throws IOException {
}}复制代码
Mapper 类
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Example1Mapper extends Mapper<LongWritable, Text,Text,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); context.write(new Text(fields[1]),value); }}复制代码
Reducer 类
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Example1Reducer extends Reducer<Text,Text,Text,Flow> {
@Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { long totalUpPackNum = 0L; //上行数据包数量_总和 long totalDownPackNum = 0L; //下行数据包数量_总和 long totalUpPayLoad = 0L; //上行总流量_总和 long totalDownPayLoad = 0L; //下行总流量_总和 for (Text flow : values) { String[] fields = flow.toString().split("\t"); totalUpPackNum+= Long.valueOf(fields[6]); totalDownPackNum+= Long.valueOf(fields[7]); totalUpPayLoad+= Long.valueOf(fields[8]); totalDownPayLoad+= Long.valueOf(fields[9]); } Flow flowOut = new Flow(totalUpPackNum, totalDownPackNum, totalUpPayLoad, totalDownPayLoad); context.write(key,flowOut); }}复制代码
Job 启动类
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception {
//1,创建一个Job类 Job job = Job.getInstance(super.getConf(), "Example1_job");
//2, 设置输入类,输入路径 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1"));
//3, 设置Mapper类, map输出类型 job.setMapperClass(Example1Mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class);
//4, 设置Reducer类, reduce输出类型 job.setReducerClass(Example1Reducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Flow.class);
//5, 设置输出类, 输出路径 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1_result"));
//6, 启动Job, 等待Job执行 boolean completion = job.waitForCompletion(true); return completion?1:0; }
public static void main(String[] args) { int run = 0; try { run = ToolRunner.run(new Configuration(), new MainJob(), args); } catch (Exception e) { e.printStackTrace(); } System.exit(run); }}复制代码
输出结果
计数器显示
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30 File System Counters FILE: Number of bytes read=11298 FILE: Number of bytes written=984294 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 Map-Reduce Framework Map input records=23 <-- Mapper读入23条 Map output records=23 <-- Mapper输出23条 Map output bytes=2830 Map output materialized bytes=2882 Input split bytes=112 Combine input records=0 Combine output records=0 Reduce input groups=21 Reduce shuffle bytes=2882 Reduce input records=23 <-- 输入结果23条 Reduce output records=21 <-- 输出结果21条(验证结果正确) Spilled Records=46 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=0 Total committed heap usage (bytes)=382730240 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=2583 File Output Format Counters Bytes Written=572复制代码
文件输出显示
13480253104 3 3 180 18013502468823 57 102 7335 11034913560439658 33 24 2034 589213600217502 37 266 2257 203704 <-- 验证是正确的13602846565 15 12 1938 2910......复制代码
划线
评论
复制
发布于: 2021 年 01 月 26 日阅读数: 20
版权声明: 本文为 InfoQ 作者【小马哥】的原创文章。
原文链接:【http://xie.infoq.cn/article/da540eba35ae44551c1dc1f81】。文章转载请联系作者。
小马哥
关注
自强不息,厚德载物 2018.12.22 加入
像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!











评论