Big Data Training Camp - 0711 Homework
Assignment: MapReduce programming
Task: compute the total upstream traffic, downstream traffic, and combined total traffic consumed by each phone number.
I. Preparing the Data
The requirement to implement:
Compute the total upstream traffic, total downstream traffic, and combined total traffic for each phone number.
The raw data has ten columns, from left to right: timestamp, phone number, the base station's physical (MAC) address, the IP of the visited site, the website domain, packets sent, packets received, upstream (upload) traffic, downstream (download) traffic, and the response code.
That is, taking the first row (the most complete one) as the reference, we need to compute, with the phone number as the key: the sum of upstream traffic, the sum of downstream traffic, and their combined total (upstream + downstream).
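For reference, the abbreviated sample record quoted in the Mapper comment below looks like this (tab-separated; rows do not always carry every optional middle column, which is why the code indexes the traffic fields from the end of the line rather than from the front):
1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200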
II. Approach
Basic approach:
(1) Map phase:
(a) Read one line of input and split it into fields.
(b) Extract the phone number, upstream traffic, and downstream traffic.
(c) Emit the phone number as the key and a bean object as the value, i.e. context.write(phoneNumber, bean);
(2) Reduce phase:
(a) Accumulate the upstream and downstream traffic and derive the total traffic.
(b) A custom bean encapsulates the traffic fields; in this assignment it travels as the map output value.
(c) The MR framework sorts the data as it processes it (map output key-value pairs are sorted before they reach reduce), and the sort is driven by the map output key.
So, if a custom sort order were needed (say, ordering results by total traffic), the sort field would have to move into the key: make the bean the map output key, have it implement the WritableComparable interface, and override its compareTo method.
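A minimal sketch of that sorting variant, for illustration only (the class name SortableFlowBean is invented here; the summing job below does not need any of this):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type: implementing WritableComparable lets the shuffle
// order keys by total traffic instead of by the default key order.
public class SortableFlowBean implements WritableComparable<SortableFlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // No-arg constructor required for reflection-based deserialization.
    public SortableFlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public int compareTo(SortableFlowBean other) {
        // Sort descending by total traffic.
        return Long.compare(other.sumFlow, this.sumFlow);
    }
}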
III. Code
1. Write the traffic-statistics bean object, FlowBean
package com.gaoyong.bd;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * Output entity class: encapsulates the per-phone traffic fields.
 */
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // A public no-arg constructor is required so the framework can
    // instantiate the bean via reflection during deserialization.
    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Serialization: when writing out fields, reference-type fields must not be null.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: fields must be read in exactly the order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
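A quick local sanity check that write() and readFields() agree on field order (a throwaway sketch, not part of the submitted job; the class name is ad hoc):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTripCheck {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean();
        original.setUpFlow(2481);
        original.setDownFlow(24681);
        original.setSumFlow(2481 + 24681);

        // Serialize the bean into an in-memory byte buffer.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        // Deserialize from the same bytes; fields must come back in write order.
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy);   // expected: 2481	24681	27162
    }
}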
2. Define FlowBeanMapper: the phone number is the key; a FlowBean{upFlow (long), downFlow (long), sumFlow (long)} is the value
package com.gaoyong.bd;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * Tally each phone number's (String) upstream (long), downstream (long),
 * and total (long) traffic.
 *
 * The phone number is the output key; FlowBean{upFlow, downFlow, sumFlow}
 * is the output value.
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text out_key = new Text();
    private FlowBean out_value = new FlowBean();

    // Sample input pair: (0, "1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200")
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split("\t");
        // Extract the phone number (second column).
        out_key.set(words[1]);
        // Extract the upstream traffic (third column from the end).
        out_value.setUpFlow(Long.parseLong(words[words.length - 3]));
        // Extract the downstream traffic (second column from the end).
        out_value.setDownFlow(Long.parseLong(words[words.length - 2]));
        context.write(out_key, out_value);
    }
}
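For the sample pair shown in the comment above, the mapper emits the key 13736230513 with the value 2481	24681	0: sumFlow is never set in the map phase, so it stays at its default of 0, and the reducer recomputes it from the accumulated sums.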
3. Define FlowBeanReducer. Output: the phone number as the key and a FlowBean{sum of upstream traffic, sum of downstream traffic, total traffic} as the value
package com.gaoyong.bd;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowBeanReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean out_value = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        // Accumulate upstream and downstream traffic across all records for this phone number.
        for (FlowBean flowBean : values) {
            sumUpFlow += flowBean.getUpFlow();
            sumDownFlow += flowBean.getDownFlow();
        }
        out_value.setUpFlow(sumUpFlow);
        out_value.setDownFlow(sumDownFlow);
        out_value.setSumFlow(sumDownFlow + sumUpFlow);
        context.write(key, out_value);
    }
}
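For example, if the sample record from the Mapper section were the only line for that phone number, the reducer would write (through FlowBean.toString()):
13736230513	2481	24681	27162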
4. Define FlowBeanDriver to configure and run the job
package com.gaoyong.bd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * 1. Launching this program runs the Job.
 *
 * 2. Local mode is mainly for verifying that the program is correct.
 *
 * 3. Error encountered during development:
 *    ExitCodeException exitCode=1: /bin/bash: line 0: fg: no job control
 */
public class FlowBeanDriver {

    public static void main(String[] args) throws Exception {
        // Define the input and output directories.
        Path inputPath = new Path("/user/student/gaoyong/mrinput/flowbean");
        Path outputPath = new Path("/user/student/gaoyong/mroutput/flowbean");

        // Configuration for the whole Job.
        Configuration conf = new Configuration();

        // Make sure the output directory does not already exist.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        // ① Create the Job.
        Job job = Job.getInstance(conf);
        // Important: without this the job cannot locate the user-defined
        // classes when it runs on the Linux cluster.
        job.setJarByClass(FlowBeanDriver.class);

        // ② Configure the Job: set the Mapper and Reducer classes and their output key-value types.
        job.setMapperClass(FlowBeanMapper.class);
        job.setReducerClass(FlowBeanReducer.class);
        // The Job uses the declared key-value types to pick serializers for the
        // Mapper and Reducer output. Since the Mapper and Reducer emit the same
        // key-value types here, setting only the final output types is sufficient.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the input and output directories.
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // ③ Run the Job.
        job.waitForCompletion(true);
    }
}
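One optional tweak, offered as a suggestion rather than part of the original driver: because the traffic sums are associative and FlowBeanReducer's input and output key-value types are identical, the same class can also be registered as a combiner, pre-aggregating per-phone traffic on the map side and shrinking the shuffle:

// Hypothetical addition to the driver: reuse the reducer as a map-side combiner.
job.setCombinerClass(FlowBeanReducer.class);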
5. The project's pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.gaoyong.bd</groupId>
  <artifactId>flowcount</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>flowcount</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
IV. Packaging, Deployment, and Running
1. Package the project into a jar via maven install; for this project the artifact is flowcount-1.0-SNAPSHOT.jar.
2. Connect to the Hadoop server over ssh and upload flowcount-1.0-SNAPSHOT.jar to a directory on the machine.
3. Create the input directory in HDFS and upload the data with the following hadoop commands (note the -p flag, which also creates the intermediate mrinput directory; a plain -mkdir would fail on the nested path):
hadoop fs -mkdir -p /user/student/gaoyong/mrinput/flowbean
hadoop fs -put HTTP_20130313143750.dat /user/student/gaoyong/mrinput/flowbean
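Optionally, confirm the upload landed where the job expects it:
hadoop fs -ls /user/student/gaoyong/mrinput/flowbean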
4. Run the job with the following command:
hadoop jar /home/student/gaoyong/flowcount-1.0-SNAPSHOT.jar com.gaoyong.bd.FlowBeanDriver
The run log was captured as a screenshot in the original post and is not reproduced here.
V. Output Files
The files produced in the output directory were shown as a screenshot in the original post.
To view the result records:
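Assuming the default single reduce task, the results land in one file named part-r-00000, which can be printed with:
hadoop fs -cat /user/student/gaoyong/mroutput/flowbean/part-r-00000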