写点什么

大数据训练营 - 作业 1

用户头像
talen
关注
发布于: 2021 年 07 月 14 日

大数据训练营 - 作业 1，执行后的截图：


程序代码块:

FlowCountMapper


import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将一行内容转换成string String line = value.toString(); // 切分字段 String[] fields = line.split("\t"); // 取出手机号 String phoneNumber = fields[1]; // 取出上行流量和下行流量 long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); context.write(new Text(phoneNumber), new FlowBean(upFlow, downFlow)); }}
复制代码

FlowCountReducer



import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> { @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sum_upFlow = 0; long sum_downFlow = 0; // 遍历所有的bean,将其中的上行流量,下行流量分别累加 for(FlowBean bean : values) { sum_upFlow += bean.getUpFlow(); sum_downFlow += bean.getDownFlow(); } FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow); context.write(key, resultBean); }}
复制代码

FlowCountTest

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the flow-count MapReduce job.
 *
 * <p>Wires {@code FlowCountMapper} and {@code FlowCountReducer} into a job named
 * {@code talen-flowCount}, reads the input path from {@code args[0]}, writes to the output
 * path in {@code args[1]}, and exits with 0 on success or 1 on failure.
 */
public class FlowCountTest {

    /**
     * Configures, submits and waits for the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws InterruptedException   if the wait for completion is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: FlowCountTest <input path> <output path>");
            System.exit(2);
            return;
        }

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://jikehadoop01:8020");

        Job job = Job.getInstance(conf);
        job.setJobName("talen-flowCount");
        job.setJarByClass(FlowCountTest.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Types emitted by the reducer (final job output).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
复制代码

FlowBean

import lombok.Data;import org.apache.hadoop.io.Writable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
/**
 * Value type carrying the flow figures of one record or one aggregated key.
 *
 * <p>Implements {@code Writable}: {@link #write} and {@link #readFields} serialize the three
 * long fields in the same order (upFlow, downFlow, sumFlow), which must stay in sync.
 *
 * <p>NOTE(review): accessors come from Lombok's {@code @Data}; the generated setters
 * presumably do not recompute {@code sumFlow} — confirm before mutating a bean in place.
 */
@Data
public class FlowBean implements Writable {

    // Upstream flow.
    private long upFlow;

    // Downstream flow.
    private long downFlow;

    // Total flow (upstream + downstream when built through the two-argument constructor).
    private long sumFlow;

    /** No-argument constructor; all fields start at 0 and are filled by {@link #readFields}. */
    public FlowBean() {}

    /**
     * Creates a bean and derives the total from the two parts.
     *
     * @param upFlow   upstream flow
     * @param downFlow downstream flow
     */
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /**
     * Serializes the three fields as longs in the order upFlow, downFlow, sumFlow.
     *
     * @param dataOutput sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * Restores the three fields, reading them in the same order {@link #write} emits them.
     *
     * @param dataInput source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    /**
     * Renders the bean as tab-separated {@code upFlow}, {@code downFlow}, {@code sumFlow}.
     *
     * @return the three values joined by tab characters
     */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
复制代码


发布于: 2021 年 07 月 14 日　阅读数: 21
用户头像

talen

关注

还未添加个人签名 2018.07.19 加入

还未添加个人简介

评论

发布
暂无评论
大数据训练营-作业1