在网站流量日志分析这种场景中,对数据采集部分的可靠性、容错能力要求通常不会非常严苛,需要注意的是结合语境明白是何种含义的数据采集。
对于数据从无到有的过程,可以使用 web 服务器自带的日志记录功能和自定义埋点 JavaScript 采集相结合的做法收集用户访问网站的行为数据。
对于数据需要做搬运的操作,可以使用 flume 定制相关的采集方案满足数据采集传输。
Flume 采集数据
1. Taildir Source 组件
Flume 采集系统的搭建相对简单:
1、在服务器上部署 agent 节点,修改配置文件
2、启动 agent 节点,将采集到的数据汇聚到指定的 HDFS 目录中
针对 nginx 日志生成场景,如果通过 flume(1.6)收集,无论是 Spooling Directory Source 和 Exec Source 均不能满足动态实时收集的需求,在 flume1.7 版本之后,提供了一个非常好用的 TaildirSource,使用这个 source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。
核心配置如下:
a1.sources = r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
复制代码
filegroups:指定 filegroups,可以有多个,以空格分隔;(TailSource 可以同时监控 tail 多个目录中的文件)
positionFile:配置检查点文件的路径,检查点文件会以 json 格式保存已经 tail 文件的位置,解决了断点不能续传的缺陷。
filegroups.<filegroupName>:配置每个 filegroup 的文件绝对路径,文件名可以用正则表达式匹配
通过以上配置,就可以监控文件内容的增加和文件的增加。产生和所配置的文件名正则表达式不匹配的文件,则不会被 tail。
2. HDFS sink 文件滚动属性
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /weblog/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /weblog/test1/example.log
a1.sources.r1.filegroups.f2 = /weblog/test2/.*log.*
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /itheima/weblog/%y-%m-%d/%H-%M/
a1.sinks.k1.hdfs.filePrefix = itheima-
a1.sinks.k1.hdfs.fileSuffix = .dat
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
复制代码
配置项:hdfs.idleTimeout
默认值:0
说明:默认启动这个功能
这种策略很简单,如果文件在 hdfs.idleTimeout 秒的时间里都是闲置的,没有任何数据写入,那么当前文件关闭,滚动到下一个文件。
配置项:hdfs.minBlockReplicas
默认值:和 hdfs 的副本数一致
原理:
hdfs.minBlockReplicas 是为了让 flume 感知不到 hdfs 的块复制,这样滚动方式配置(比如时间间隔、文件大小、events 数量等)才不会受影响。
假如 hdfs 的副本为 3.那么配置的滚动时间为 10 秒,那么在第二秒的时候,flume 检测到 hdfs 在复制块,那么这时候 flume 就会滚动,这样导致 flume 的滚动方式受到影响。所以通常 hdfs.minBlockReplicas 配置为 1,就检测不到副本的复制了。但是 hdfs 的副本还是 3。
3. 数据内容样例
58.215.204.118 - - [18/Sep/2018:06:51:35 +0000] "GET /wp-includes/js/jquery/jquery.js?ver=1.10.2 HTTP/1.1" 304 0 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
复制代码
字段解析:
1、访客 ip 地址: 58.215.204.118
2、访客用户信息: - -
3、请求时间:[18/Sep/2018:06:51:35 +0000]
4、请求方式:GET
5、请求的 url:/wp-includes/js/jquery/jquery.js?ver=1.10.2
6、请求所用协议:HTTP/1.1
7、响应码:304
8、返回的数据流量:0
9、访客的来源 url:http://blog.fens.me/nodejs-socketio-chat/
10、访客所用浏览器:Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0
模块开发----数据预处理及点击流模型
1. 主要目的
过滤“不合规”数据,清洗无意义的数据
格式转换和规整
根据后续的统计需求,过滤分离出各种不同主题(不同栏目 path)的基础数据。
2. 实现方式
pom 文件
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-mr1-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
复制代码
使用 MapReduce 程序对数据进行预处理:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.util.HashSet;
/*
主要对原始数据进行预处理,过滤出真实的请求,转换日期格式,对当前数据标记是否有效invalid和valid
*/
public class WeblogParseProcess {
static class WeblogParseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
//静态资源
HashSet<String> pages = new HashSet<String>();
Text text = new Text();
//模仿加载外部文件,通过setup方法
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//初始化一些资源共map使用
pages.add("/about");
pages.add("/black-ip-list/");
pages.add("/cassandra-clustor/");
pages.add("/finance-rhive-repurchase/");
pages.add("/hadoop-family-roadmap/");
pages.add("/hadoop-hive-intro/");
pages.add("/hadoop-zookeeper-intro/");
pages.add("/hadoop-mahout-roadmap/");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//读取数据,切分出需要的字段,
final String line = value.toString();
final WebLogBean webLogBean = WebLogParser.parser(line);
if (null != webLogBean) {
//过滤掉静态资源的请求
WebLogParser.filtStaticResource(webLogBean, pages);
//输出即可
text.set(webLogBean.toString());
context.write(text, NullWritable.get());
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf);
//设置读取的jar包路径
job.setJarByClass(WeblogParseProcess.class);
//设置mapper属性
job.setMapperClass(WeblogParseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("D:\\access.log.2021031411.dat"));
//设置reduce属性
job.setNumReduceTasks(0);
TextOutputFormat.setOutputPath(job, new Path("D:\\out"));
final boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
复制代码
预处理过程中有些编程小技巧需要注意:
注意要实现 Hadoop 序列化机制---writable 接口。
3. 点击流模型数据
1. 点击流概念
点击流(Click Stream)是指用户在网站上持续访问的轨迹。注重用户浏览网站的整个流程。用户对网站的每次访问包含了一系列的点击动作行为,这些点击行为数据就构成了点击流数据(Click Stream Data),它代表了用户浏览网站的整个流程。
点击流和网站日志是两个不同的概念,点击流是从用户的角度出发,注重用户浏览网站的整个流程;而网站日志是面向整个站点,它包含了用户行为数据、服务器响应数据等众多日志信息,我们通过对网站日志的分析可以获得用户的点击流数据。
点击流模型完全是业务模型,相关概念由业务指定而来。由于大量的指标统计从点击流模型中更容易得出,所以在预处理阶段,可以使用 MapReduce 程序来生成点击流模型的数据。
在点击流模型中,存在着两种模型数据:PageViews、Visits。
2. 点击流模型 pageviews
Pageviews 模型数据专注于用户每次会话(session)的识别,以及每次 session 内访问了几步和每一步的停留时间。
在网站分析中,通常把前后两条访问记录时间差在 30 分钟以内算成一次会话。如果超过 30 分钟,则把下次访问算成新的会话开始。
大致步骤如下:
在所有访问日志中找出该用户的所有访问记录
把该用户所有访问记录按照时间正序排序
计算前后两条记录时间差是否为 30 分钟
如果小于 30 分钟,则是同一会话 session 的延续
如果大于 30 分钟,则是下一会话 session 的开始
用前后两条记录时间差算出上一步停留时间
最后一步和只有一步的 业务默认指定页面停留时间 60s
部分代码如下:
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.UUID;
/*
把清洗后的数据转为pageview模型,每条数据打上session标识,并且计算出停留的时间
*/
public class PageView {
static class PageViewMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {
final WebLogBean v = new WebLogBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//读取清洗规整后的数据
final String[] fields = value.toString().split("\001");
//封装成weblogbean对象传入reduce,以ip地址为key
v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
//以ip地址作为key输出
if (v.isValid()) {
k.set(v.getRemote_addr());
context.write(k, v);
}
}
}
static class PageViewReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {
//按照时间排序升序;计算出每个记录的session信息,计算每个请求的停留时间,并打上是这次会话中的哪一步
final Text v = new Text();
@Override
protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
final ArrayList<WebLogBean> webLogBeans = new ArrayList<>();
//key:ip地址 values:相同ip的所有请求
for (WebLogBean webLogBean : values) {
final WebLogBean bean = new WebLogBean();
try {
BeanUtils.copyProperties(bean, webLogBean);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
webLogBeans.add(bean);
}
//对获取到的记录按照时间字段进行排序
Collections.sort(webLogBeans, new Comparator<WebLogBean>() {
@Override
public int compare(WebLogBean o1, WebLogBean o2) {
return o1.getTime_local().compareTo(o2.getTime_local());
}
});
//排序完成后,计算sessionid以及停留时间还有当前所属步长
//先准备sessionid
String session = UUID.randomUUID().toString();
int step = 1;
for (int i = 0; i < webLogBeans.size(); i++) {
final WebLogBean bean = webLogBeans.get(i);
//第一次获取的这条数据的session应该是多少?步长是多少?停留时长是多少?第一次无法计算时长,所以
//跳过这次循环进入下一次循环
if (webLogBeans.size() == 1) {
//输出数据然后跳出循环即可
v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" +
bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) +
"\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" +
bean.getBody_bytes_sent() + "\001"
+ bean.getStatus());
context.write(NullWritable.get(), v);
}
if (i == 0) {
continue;//不妥,如果只由一次记录也要输出
}
//进入非第一条的判断
//比较两条请求信息的时间是否大于30分钟
//首先获取到本次循环信息对应的时间
final String time_local = bean.getTime_local();
//获取上一次时间
final WebLogBean webLogBean = webLogBeans.get(i - 1);
final String time_local1 = webLogBean.getTime_local();
long diff = 0;
try {
diff = DateUtils.timeDiff(time_local1, time_local);
} catch (ParseException e) {
}
//判断时间差值
if (diff > 30 * 60 * 1000) {
//大于30分钟则换新的session信息
//现在输出第一条数据
v.set(session + "\001" + key.toString() + "\001" + webLogBean.getRemote_user() + "\001" + webLogBean.getTime_local() + "\001"
+ webLogBean.getRequest() + "\001" + step + "\001" + (60) + "\001" + webLogBean.getHttp_referer() + "\001" +
webLogBean.getHttp_user_agent() + "\001" + webLogBean.getBody_bytes_sent() + "\001"
+ webLogBean.getStatus());
context.write(NullWritable.get(), v);
//session重新生成
session = UUID.randomUUID().toString();
//重置step
step = 1;
} else {
//说明和上一条是同一个session,也是输出上一条信息
v.set(session + "\001" + key.toString() + "\001" + webLogBean.getRemote_user() + "\001" + webLogBean.getTime_local() +
"\001" + webLogBean.getRequest() + "\001" + step + "\001" + (diff / 1000) + "\001" + webLogBean.getHttp_referer() + "\001" +
webLogBean.getHttp_user_agent() + "\001" + webLogBean.getBody_bytes_sent() + "\001"
+ webLogBean.getStatus());
context.write(NullWritable.get(), v);
//步长进行累加
step++;
}
//如果此次是遍历的最后一条需要输出本条数据
if (i == webLogBeans.size() - 1) {
v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" +
bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) +
"\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" +
bean.getBody_bytes_sent() + "\001"
+ bean.getStatus());
//重新生成sessionid,好像不需要重新生成session信息??
context.write(NullWritable.get(), v);
}
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf);
//设置读取的jar包路径
job.setJarByClass(PageView.class);
//设置mapper属性
job.setMapperClass(PageViewMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WebLogBean.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("D:\\out"));
//设置reduce属性
job.setReducerClass(PageViewReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(job, new Path("D:\\pageview_out"));
final boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
复制代码
3. 点击流模型 visit
Visit 模型专注于每次会话 session 内起始、结束的访问情况信息。比如用户在某一个会话 session 内,进入会话的起始页面和起始时间,会话结束是从哪个页面离开的,离开时间,本次 session 总共访问了几个页面等信息。
大致步骤如下:
在 pageviews 模型上进行梳理
在每一次回收 session 内所有访问记录按照时间正序排序
第一天的时间页面就是起始时间页面
业务指定最后一条记录的时间页面作为离开时间和离开页面
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
public class ClickVisits {
static class ClickVistsMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {
PageViewsBean pvBean = new PageViewsBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
final String[] fields = value.toString().split("\001");
final int step = Integer.parseInt(fields[5]);
pvBean.set(fields[0], fields[1], fields[2], fields[3], fields[4], step, fields[6], fields[7], fields[8], fields[9]);
k.set(pvBean.getSession());
context.write(k, pvBean);
}
}
//根据step或者时间排序,找出本次session的起始,和中支时间和页面
static class ClickVisitsReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {
@Override
protected void reduce(Text key, Iterable<PageViewsBean> values, Context context) throws IOException, InterruptedException {
final ArrayList<PageViewsBean> pageViewsBeans = new ArrayList<>();
for (PageViewsBean bean : values) {
PageViewsBean pageViewsBean = new PageViewsBean();
try {
BeanUtils.copyProperties(pageViewsBean, bean);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
pageViewsBeans.add(pageViewsBean);
}
//按照步长进行排序
Collections.sort(pageViewsBeans, new Comparator<PageViewsBean>() {
@Override
public int compare(PageViewsBean o1, PageViewsBean o2) {
return o1.getStep() > o2.getStep() ? 1 : -1;
}
});
//准备输出
final VisitBean vBean = new VisitBean();
vBean.setSession(pageViewsBeans.get(0).getSession());
vBean.setInPage(pageViewsBeans.get(0).getRequest());
vBean.setInTime(pageViewsBeans.get(0).getTimestr());
vBean.setOutPage(pageViewsBeans.get(pageViewsBeans.size() - 1).getRequest());
vBean.setOutTime(pageViewsBeans.get(pageViewsBeans.size() - 1).getTimestr());
vBean.setReferal(pageViewsBeans.get(0).getReferal());
vBean.setPageVisits(pageViewsBeans.size());
vBean.setRemote_addr(pageViewsBeans.get(0).getRemote_addr());
context.write(NullWritable.get(), vBean);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf);
//设置读取的jar包路径
job.setJarByClass(ClickVisits.class);
//设置mapper属性
job.setMapperClass(ClickVistsMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PageViewsBean.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
//设置reduce属性
job.setReducerClass(ClickVisitsReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(VisitBean.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
final boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
复制代码
评论