写点什么

数据采集之 Flume 采集及点击流模型详解

发布于: 2021 年 03 月 14 日
数据采集之Flume采集及点击流模型详解

在网站流量日志分析这种场景中,对数据采集部分的可靠性、容错能力要求通常不会非常严苛,需要注意的是结合语境明白是何种含义的数据采集。

对于数据从无到有的过程,可以使用 web 服务器自带的日志记录功能和自定义埋点 JavaScript 采集相结合的做法收集用户访问网站的行为数据。

对于数据需要做搬运的操作,可以使用 flume 定制相关的采集方案满足数据采集传输。

 Flume 采集数据

1. Taildir Source 组件

Flume 采集系统的搭建相对简单:

1、在服务器上部署 agent 节点,修改配置文件

2、启动 agent 节点,将采集到的数据汇聚到指定的 HDFS 目录中

针对 nginx 日志生成场景,如果通过 flume(1.6)收集,无论是 Spooling Directory Source 和 Exec Source 均不能满足动态实时收集的需求,在 flume1.7 版本之后,提供了一个非常好用的 TaildirSource,使用这个 source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。

核心配置如下:

a1.sources = r1a1.sources.r1.type = TAILDIRa1.sources.r1.channels = c1a1.sources.r1.positionFile = /var/log/flume/taildir_position.jsona1.sources.r1.filegroups = f1 f2a1.sources.r1.filegroups.f1 = /var/log/test1/example.loga1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
复制代码

filegroups:指定 filegroups,可以有多个,以空格分隔;(TailSource 可以同时监控 tail 多个目录中的文件)

positionFile:配置检查点文件的路径,检查点文件会以 json 格式保存已经 tail 文件的位置,解决了断点不能续传的缺陷。

filegroups.<filegroupName>:配置每个 filegroup 的文件绝对路径,文件名可以用正则表达式匹配

通过以上配置,就可以监控文件内容的增加和文件的增加。产生和所配置的文件名正则表达式不匹配的文件,则不会被 tail。

2. HDFS sink 文件滚动属性

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1
# Describe/configure the sourcea1.sources.r1.type = TAILDIRa1.sources.r1.positionFile = /weblog/flume/taildir_position.jsona1.sources.r1.filegroups = f1 f2a1.sources.r1.filegroups.f1 = /weblog/test1/example.loga1.sources.r1.filegroups.f2 = /weblog/test2/.*log.*
# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.channel = c1a1.sinks.k1.hdfs.path = /itheima/weblog/%y-%m-%d/%H-%M/a1.sinks.k1.hdfs.filePrefix = itheima-a1.sinks.k1.hdfs.fileSuffix = .data1.sinks.k1.hdfs.round = truea1.sinks.k1.hdfs.roundValue = 10a1.sinks.k1.hdfs.roundUnit = minutea1.sinks.k1.hdfs.rollInterval =0a1.sinks.k1.hdfs.rollSize = 134217728a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.batchSize = 100a1.sinks.k1.hdfs.useLocalTimeStamp = true#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
复制代码


  • 基于文件闲置时间策略


配置项:hdfs.idleTimeout


默认值:0


说明:默认启动这个功能


这种策略很简单,如果文件在 hdfs.idleTimeout 秒的时间里都是闲置的,没有任何数据写入,那么当前文件关闭,滚动到下一个文件。


  • 基于 hdfs 文件副本数


配置项:hdfs.minBlockReplicas


默认值:和 hdfs 的副本数一致


原理:


hdfs.minBlockReplicas 是为了让 flume 感知不到 hdfs 的块复制,这样滚动方式配置(比如时间间隔、文件大小、events 数量等)才不会受影响。

假如 hdfs 的副本为 3.那么配置的滚动时间为 10 秒,那么在第二秒的时候,flume 检测到 hdfs 在复制块,那么这时候 flume 就会滚动,这样导致 flume 的滚动方式受到影响。所以通常 hdfs.minBlockReplicas 配置为 1,就检测不到副本的复制了。但是 hdfs 的副本还是 3。


3. 数据内容样例


58.215.204.118 - - [18/Sep/2018:06:51:35 +0000] "GET /wp-includes/js/jquery/jquery.js?ver=1.10.2 HTTP/1.1" 304 0 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
复制代码


字段解析:

1、访客 ip 地址:   58.215.204.118

2、访客用户信息:  - -

3、请求时间:[18/Sep/2018:06:51:35 +0000]

4、请求方式:GET

5、请求的 url:/wp-includes/js/jquery/jquery.js?ver=1.10.2

6、请求所用协议:HTTP/1.1

7、响应码:304

8、返回的数据流量:0

9、访客的来源 url:http://blog.fens.me/nodejs-socketio-chat/

10、访客所用浏览器:Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0

模块开发----数据预处理及点击流模型

1. 主要目的

过滤“不合规”数据,清洗无意义的数据

格式转换和规整

根据后续的统计需求,过滤分离出各种不同主题(不同栏目 path)的基础数据。



2. 实现方式


pom 文件

<repositories>    <repository>        <id>cloudera</id>        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>    </repository></repositories><dependencies>    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-mr1-cdh5.14.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh5.14.0</version> </dependency>
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0-cdh5.14.0</version> </dependency>
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.6.0-cdh5.14.0</version> </dependency>
</dependencies>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.4</version>
</plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin>
</plugins>

</build>
复制代码


使用 MapReduce 程序对数据进行预处理:

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;import java.util.HashSet;

/*主要对原始数据进行预处理,过滤出真实的请求,转换日期格式,对当前数据标记是否有效invalid和valid */public class WeblogParseProcess {
static class WeblogParseMapper extends Mapper<LongWritable, Text, Text, NullWritable> { //静态资源 HashSet<String> pages = new HashSet<String>(); Text text = new Text();
//模仿加载外部文件,通过setup方法 @Override protected void setup(Context context) throws IOException, InterruptedException { //初始化一些资源共map使用 pages.add("/about"); pages.add("/black-ip-list/"); pages.add("/cassandra-clustor/"); pages.add("/finance-rhive-repurchase/"); pages.add("/hadoop-family-roadmap/"); pages.add("/hadoop-hive-intro/"); pages.add("/hadoop-zookeeper-intro/"); pages.add("/hadoop-mahout-roadmap/"); }
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取数据,切分出需要的字段, final String line = value.toString(); final WebLogBean webLogBean = WebLogParser.parser(line); if (null != webLogBean) { //过滤掉静态资源的请求 WebLogParser.filtStaticResource(webLogBean, pages); //输出即可 text.set(webLogBean.toString()); context.write(text, NullWritable.get()); }
} }
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { final Configuration conf = new Configuration(); final Job job = Job.getInstance(conf); //设置读取的jar包路径 job.setJarByClass(WeblogParseProcess.class); //设置mapper属性 job.setMapperClass(WeblogParseMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("D:\\access.log.2021031411.dat")); //设置reduce属性 job.setNumReduceTasks(0); TextOutputFormat.setOutputPath(job, new Path("D:\\out")); final boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); }}
复制代码


预处理过程中有些编程小技巧需要注意:


  • 如果涉及多属性值数据传递通常可建立与之对应的 javabean 携带数据传递

注意要实现 Hadoop 序列化机制---writable 接口。

  • 有意识的把 javabean 中 toString 方法重写,以\001 进行分割,方便后续数据入 hive 映射方便。

  • 如涉及不符合本次分析的脏数据,往往采用逻辑删除,也就是自定义标记位,比如使用 1 或者 0 来表示数据是否有效,而不是直接物理删除。


3. 点击流模型数据

1. 点击流概念

点击流(Click Stream)是指用户在网站上持续访问的轨迹。注重用户浏览网站的整个流程。用户对网站的每次访问包含了一系列的点击动作行为,这些点击行为数据就构成了点击流数据(Click Stream Data),它代表了用户浏览网站的整个流程。

点击流和网站日志是两个不同的概念,点击流是从用户的角度出发,注重用户浏览网站的整个流程;而网站日志是面向整个站点,它包含了用户行为数据、服务器响应数据等众多日志信息,我们通过对网站日志的分析可以获得用户的点击流数据。


点击流模型完全是业务模型,相关概念由业务指定而来。由于大量的指标统计从点击流模型中更容易得出,所以在预处理阶段,可以使用 MapReduce 程序来生成点击流模型的数据。

在点击流模型中,存在着两种模型数据:PageViews、Visits

2. 点击流模型 pageviews

Pageviews 模型数据专注于用户每次会话(session)的识别,以及每次 session 内访问了几步和每一步的停留时间。

在网站分析中,通常把前后两条访问记录时间差在 30 分钟以内算成一次会话。如果超过 30 分钟,则把下次访问算成新的会话开始。

大致步骤如下:

  1. 在所有访问日志中找出该用户的所有访问记录

  2. 把该用户所有访问记录按照时间正序排序

  3. 计算前后两条记录时间差是否为 30 分钟

  4. 如果小于 30 分钟,则是同一会话 session 的延续

  5. 如果大于 30 分钟,则是下一会话 session 的开始

  6. 用前后两条记录时间差算出上一步停留时间

  7. 最后一步和只有一步的  业务默认指定页面停留时间 60s


部分代码如下:

import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;import java.lang.reflect.InvocationTargetException;import java.text.ParseException;import java.util.ArrayList;import java.util.Collections;import java.util.Comparator;import java.util.UUID;
/*把清洗后的数据转为pageview模型,每条数据打上session标识,并且计算出停留的时间 */public class PageView { static class PageViewMapper extends Mapper<LongWritable, Text, Text, WebLogBean> { final WebLogBean v = new WebLogBean(); Text k = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取清洗规整后的数据 final String[] fields = value.toString().split("\001"); //封装成weblogbean对象传入reduce,以ip地址为key
v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
//以ip地址作为key输出 if (v.isValid()) { k.set(v.getRemote_addr()); context.write(k, v); } }

}

static class PageViewReducer extends Reducer<Text, WebLogBean, NullWritable, Text> { //按照时间排序升序;计算出每个记录的session信息,计算每个请求的停留时间,并打上是这次会话中的哪一步 final Text v = new Text();
@Override protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
final ArrayList<WebLogBean> webLogBeans = new ArrayList<>(); //key:ip地址 values:相同ip的所有请求 for (WebLogBean webLogBean : values) { final WebLogBean bean = new WebLogBean(); try { BeanUtils.copyProperties(bean, webLogBean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } webLogBeans.add(bean); } //对获取到的记录按照时间字段进行排序 Collections.sort(webLogBeans, new Comparator<WebLogBean>() { @Override public int compare(WebLogBean o1, WebLogBean o2) { return o1.getTime_local().compareTo(o2.getTime_local()); } }); //排序完成后,计算sessionid以及停留时间还有当前所属步长 //先准备sessionid String session = UUID.randomUUID().toString(); int step = 1; for (int i = 0; i < webLogBeans.size(); i++) { final WebLogBean bean = webLogBeans.get(i); //第一次获取的这条数据的session应该是多少?步长是多少?停留时长是多少?第一次无法计算时长,所以 //跳过这次循环进入下一次循环 if (webLogBeans.size() == 1) { //输出数据然后跳出循环即可
v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus()); context.write(NullWritable.get(), v); } if (i == 0) { continue;//不妥,如果只由一次记录也要输出 } //进入非第一条的判断 //比较两条请求信息的时间是否大于30分钟 //首先获取到本次循环信息对应的时间 final String time_local = bean.getTime_local(); //获取上一次时间 final WebLogBean webLogBean = webLogBeans.get(i - 1); final String time_local1 = webLogBean.getTime_local(); long diff = 0; try { diff = DateUtils.timeDiff(time_local1, time_local); } catch (ParseException e) {
} //判断时间差值 if (diff > 30 * 60 * 1000) { //大于30分钟则换新的session信息 //现在输出第一条数据 v.set(session + "\001" + key.toString() + "\001" + webLogBean.getRemote_user() + "\001" + webLogBean.getTime_local() + "\001" + webLogBean.getRequest() + "\001" + step + "\001" + (60) + "\001" + webLogBean.getHttp_referer() + "\001" + webLogBean.getHttp_user_agent() + "\001" + webLogBean.getBody_bytes_sent() + "\001" + webLogBean.getStatus()); context.write(NullWritable.get(), v); //session重新生成 session = UUID.randomUUID().toString(); //重置step step = 1; } else { //说明和上一条是同一个session,也是输出上一条信息 v.set(session + "\001" + key.toString() + "\001" + webLogBean.getRemote_user() + "\001" + webLogBean.getTime_local() + "\001" + webLogBean.getRequest() + "\001" + step + "\001" + (diff / 1000) + "\001" + webLogBean.getHttp_referer() + "\001" + webLogBean.getHttp_user_agent() + "\001" + webLogBean.getBody_bytes_sent() + "\001" + webLogBean.getStatus()); context.write(NullWritable.get(), v); //步长进行累加 step++;
} //如果此次是遍历的最后一条需要输出本条数据 if (i == webLogBeans.size() - 1) { v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus()); //重新生成sessionid,好像不需要重新生成session信息?? context.write(NullWritable.get(), v); } } } }
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { final Configuration conf = new Configuration(); final Job job = Job.getInstance(conf); //设置读取的jar包路径 job.setJarByClass(PageView.class); //设置mapper属性 job.setMapperClass(PageViewMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(WebLogBean.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("D:\\out")); //设置reduce属性 job.setReducerClass(PageViewReducer.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); TextOutputFormat.setOutputPath(job, new Path("D:\\pageview_out")); final boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1); }}
复制代码


3. 点击流模型 visit

Visit 模型专注于每次会话 session 内起始、结束的访问情况信息。比如用户在某一个会话 session 内,进入会话的起始页面和起始时间,会话结束是从哪个页面离开的,离开时间,本次 session 总共访问了几个页面等信息。

大致步骤如下:

  1. 在 pageviews 模型上进行梳理

  2. 在每一次回收 session 内所有访问记录按照时间正序排序

  3. 第一天的时间页面就是起始时间页面

  4. 业务指定最后一条记录的时间页面作为离开时间和离开页面


import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;import java.lang.reflect.InvocationTargetException;import java.util.ArrayList;import java.util.Collections;import java.util.Comparator;
public class ClickVisits { static class ClickVistsMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> { PageViewsBean pvBean = new PageViewsBean(); Text k = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { final String[] fields = value.toString().split("\001"); final int step = Integer.parseInt(fields[5]); pvBean.set(fields[0], fields[1], fields[2], fields[3], fields[4], step, fields[6], fields[7], fields[8], fields[9]); k.set(pvBean.getSession()); context.write(k, pvBean); } }
//根据step或者时间排序,找出本次session的起始,和中支时间和页面 static class ClickVisitsReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> { @Override protected void reduce(Text key, Iterable<PageViewsBean> values, Context context) throws IOException, InterruptedException { final ArrayList<PageViewsBean> pageViewsBeans = new ArrayList<>(); for (PageViewsBean bean : values) { PageViewsBean pageViewsBean = new PageViewsBean(); try { BeanUtils.copyProperties(pageViewsBean, bean); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } pageViewsBeans.add(pageViewsBean); } //按照步长进行排序 Collections.sort(pageViewsBeans, new Comparator<PageViewsBean>() { @Override public int compare(PageViewsBean o1, PageViewsBean o2) { return o1.getStep() > o2.getStep() ? 1 : -1; } }); //准备输出 final VisitBean vBean = new VisitBean(); vBean.setSession(pageViewsBeans.get(0).getSession()); vBean.setInPage(pageViewsBeans.get(0).getRequest()); vBean.setInTime(pageViewsBeans.get(0).getTimestr()); vBean.setOutPage(pageViewsBeans.get(pageViewsBeans.size() - 1).getRequest()); vBean.setOutTime(pageViewsBeans.get(pageViewsBeans.size() - 1).getTimestr()); vBean.setReferal(pageViewsBeans.get(0).getReferal()); vBean.setPageVisits(pageViewsBeans.size()); vBean.setRemote_addr(pageViewsBeans.get(0).getRemote_addr()); context.write(NullWritable.get(), vBean); } }
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { final Configuration conf = new Configuration(); final Job job = Job.getInstance(conf); //设置读取的jar包路径 job.setJarByClass(ClickVisits.class); //设置mapper属性 job.setMapperClass(ClickVistsMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(PageViewsBean.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path(args[0]));
//设置reduce属性 job.setReducerClass(ClickVisitsReducer.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(VisitBean.class); TextOutputFormat.setOutputPath(job, new Path(args[1]));
final boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1); }}
复制代码


发布于: 2021 年 03 月 14 日阅读数: 18
用户头像

还未添加个人签名 2020.11.10 加入

专注于大数据技术

评论

发布
暂无评论
数据采集之Flume采集及点击流模型详解