终于到了最激动人心的章节了,系统实现,本节就要开始实现智能课堂教学评价系统了
废话少说,下面直接开始
系统实现主要分为五部分,每一部分都介绍了所实现的的功能或解决的问题,因篇幅有限,有些比较简单的地方没有详细的进行说明,对系统的使用到的核心代码进行了部分展示。
1. 开发环境
开发工具:IntelliJ IDEA ,MobaXterm
所需环境:因本系统为分布式系统,所以至少需要三台服务器,因实际情况,所以采用使用三台虚拟机来搭建,三台机器分别命名为 node01、node02、node03,
2 业务系统模块
业务系统基于 eduSohu 开源框架的二次开发,系统分为学生端和教师端,采用统一登录,根据角色权限进行分类。
教师端登录成功进入首页如图 4-1 所示:
图 4-1 教师登录首页
当教师点击教学页面之后界面如图 4-2 所示:
图 4-2 教学界面
教学部分包含课程管理、班级管理、分类标签、笔记管理、问答管理、话题管理、反馈管理、题库管理、试卷管理等几个模块。
题库管理界面如图 4-3 所示:
图 4-3 题库管理
选择管理题库界面如图 4-4 所示:
图 4-4 题目管理
点击创建试卷界面如图 4-5 所示:
图 4-5 试卷管理
学生点击学习之后界面如图 4-6 所示:
图 4-6 课程学习
学生对教师进行评价界面如图 4-7 所示:
图 4-7 学生评价
3 数据采集模块
数据采集分为两部分进行,每一部分所采集数据是完全不同的,处理的方式也是不同的,所以需要用不同的工具进行操作,采集非结构化的数据采用 Flume 进行,采集结构化的数据使用 Sqoop。
3.1 Flume 采集及数据预处理
Flume 采集系统的搭建相对简单:
1、在 node02 节点上部署 agent 节点,修改 Flume 的配置文件。
2、启动 node02 节点上的 agent,将采集到的日志数据汇聚到 Hadoop 的 HDFS 中。
对于业务系统中 nginx 日志生成及 js 埋点的日志数据,如果通过 flume1.6 采集,无论是 Spooling Directory Source 还是 Exec Source 都不能满足动态实时收集的需求,在 flume1.7 版本之后,提供了一个很强大可靠的 TaildirSource,使用这个 source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。
核心配置如下:
1 a1.sources = r1
2 a1.sources.r1.type = TAILDIR
3 a1.sources.r1.channels = c1
4 a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
5 a1.sources.r1.filegroups = f1 f2
6 a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
7 a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
复制代码
Filegroups:指定 filegroups,即文件的组可以有多个,这里指定两个 f1 和 f2。
positionFile:配置检查点文件的路径,检查点文件会以 json 格式保存已经 tail 文件的位置,解决了断点不能续传的缺陷。
filegroups.<filegroupName>:配置每个 filegroup 的文件绝对路径,文件名可以用正则表达式匹配。
通过以上配置,就可以监控文件内容的增加和文件的增加。产生和所配置的文件名正则表达式不匹配的文件,则不会被 tail。
采集数据之后就要进行数据预处理,主要就是过滤“不合规”数据,清洗无意义的数据,格式转换和规整。整个流程如图 4-8 所示:
3.2 采集日志模型 Pageviews
Pageviews 模型数据专注于用户每次会话(session)的识别,以及每次 session 内访问了几步和每一步的停留时间。
在系统分析中,把前后两条访问记录时间差在 30 分钟以内算成一次会话。如果超过 30 分钟,则把下次访问算成新的会话开始。
Pageviews 模型部分核心代码如下:
把清洗后的数据转为 pageview 模型,每条数据打上 session 标识,并且计算出停留的时间。
8 public class PageView {
9 static class PageViewMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {
10 final WebLogBean v = new WebLogBean();
11 Text k = new Text();
12 @Override
13 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
14 //读取清洗规整后的数据
15 final String[] fields = value.toString().split("\001");
16 //封装成weblogbean对象传入reduce,以ip地址为key
17 v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
18 //以ip地址作为key输出
19 if (v.isValid()) {
20 k.set(v.getRemote_addr());
21 context.write(k, v);
22 }
23 }
24 }
25 static class PageViewReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {
按照时间排序升序,计算出每个记录的session信息,计算每个请求的停留时间,并打上是这次会话中的哪一步。
26 final Text v = new Text();
27 @Override
28 protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
29 final ArrayList<WebLogBean> webLogBeans = new ArrayList<>();
30 //key:ip地址 values:相同ip的所有请求
31 for (WebLogBean webLogBean : values) {
32 final WebLogBean bean = new WebLogBean();
33 try {
34 BeanUtils.copyProperties(bean, webLogBean);
35 } catch (IllegalAccessException e) {
36 e.printStackTrace();
37 } catch (InvocationTargetException e) {
38 e.printStackTrace();
39 }
40 webLogBeans.add(bean);
41 }
42 //对获取到的记录按照时间字段进行排序
43 Collections.sort(webLogBeans, new Comparator<WebLogBean>() {
44 @Override
45 public int compare(WebLogBean o1, WebLogBean o2) {
46 return o1.getTime_local().compareTo(o2.getTime_local());
47 }
48 });
复制代码
排序完成后,计算 sessionid 以及停留时间还有当前所属步长,判断两次时长是否大于 30 分钟。
//先准备sessionid
50 String session = UUID.randomUUID().toString();
51 int step = 1;
52 for (int i = 0; i < webLogBeans.size(); i++) {
53 final WebLogBean bean = webLogBeans.get(i);
54 //第一次获取的这条数据的session无法计算时长,所以
55 //跳过这次循环进入下一次循环
56 if (webLogBeans.size() == 1) {
57 //输出数据然后跳出循环即可
58 v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" +
59 bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) +
60 "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" +
61 bean.getBody_bytes_sent() + "\001"
62 + bean.getStatus());
63 context.write(NullWritable.get(), v);
64 }
65 if (i == 0) {
66 continue;//如果只由一次记录也要输出
67 }
68 //进入非第一条的判断
69 //比较两条请求信息的时间是否大于30分钟
70 //首先获取到本次循环信息对应的时间
71 final String time_local = bean.getTime_local();
72 //获取上一次时间
73 final WebLogBean webLogBean = webLogBeans.get(i - 1);
74 final String time_local1 = webLogBean.getTime_local();
75 long diff = 0;
76 try {
77 diff = DateUtils.timeDiff(time_local1, time_local);
78 } catch (ParseException e) {
79 }
复制代码
判断时间差值:
if (diff > 30 * 60 * 1000) {
81 //大于30分钟则换新的session信息
82 //现在输出第一条数据
83 v.set(session + "\001" + key.toString() + "\001" + webLogBean.getRemote_user() + "\001" + webLogBean.getTime_local() + "\001"
84 + webLogBean.getRequest() + "\001" + step + "\001" + (60) + "\001" + webLogBean.getHttp_referer() + "\001" +
85 webLogBean.getHttp_user_agent() + "\001" + webLogBean.getBody_bytes_sent() + "\001"
86 + webLogBean.getStatus());
87 context.write(NullWritable.get(), v);
88 //session重新生成
89 session = UUID.randomUUID().toString();
90 //重置step
91 step = 1;
92 } else {
93 //说明和上一条是同一个session,也是输出上一条信息
94 v.set(session + "\001" + key.toString() + "\001" + webLogBean.getRemote_user() + "\001" + webLogBean.getTime_local() +
95 "\001" + webLogBean.getRequest() + "\001" + step + "\001" + (diff / 1000) + "\001" + webLogBean.getHttp_referer() + "\001" +
96 webLogBean.getHttp_user_agent() + "\001" + webLogBean.getBody_bytes_sent() + "\001"
97 + webLogBean.getStatus());
98 context.write(NullWritable.get(), v);
99 //步长进行累加
100 step++;
101 }
102 //如果此次是遍历的最后一条需要输出本条数据
103 if (i == webLogBeans.size() - 1) {
104 v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" +
105 bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) +
106 "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" +
107 bean.getBody_bytes_sent() + "\001"
108 + bean.getStatus());
109 //重新生成sessionid
110 context.write(NullWritable.get(), v);
111 }
112 }
113 }
复制代码
3.3 采集日志模型 Visit
Visit 模型专注于每次会话 session 内起始、结束的访问情况信息。比如教师或学生在某一个会话 session 内,进入会话的起始页面和起始时间,会话结束是从哪个页面离开的,离开时间,本次 session 总共访问了几个页面等信息。
Visit 模型的部分代码如下:
114 //根据step或者时间排序,找出本次session的起始,和终止时间和页面
115 static class ClickVisitsReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {
116 @Override
117 protected void reduce(Text key, Iterable<PageViewsBean> values, Context context) throws IOException, InterruptedException {
118 final ArrayList<PageViewsBean> pageViewsBeans = new ArrayList<>();
119 for (PageViewsBean bean : values) {
120 PageViewsBean pageViewsBean = new PageViewsBean();
121 try {
122 BeanUtils.copyProperties(pageViewsBean, bean);
123 } catch (IllegalAccessException e) {
124 e.printStackTrace();
125 } catch (InvocationTargetException e) {
126 e.printStackTrace();
127 }
128 pageViewsBeans.add(pageViewsBean);
129 }
130 //按照步长进行排序
131 Collections.sort(pageViewsBeans, new Comparator<PageViewsBean>() {
132 @Override
133 public int compare(PageViewsBean o1, PageViewsBean o2) {
134 return o1.getStep() > o2.getStep() ? 1 : -1;
135 }
136 });
137 }
复制代码
3.4 Sqoop 采集结构化数据
Sqoop 工作机制是将导入或导出命令翻译成 mapreduce 程序来实现。在翻译出的 mapreduce 中主要是对 inputformat 和 outputformat 进行定制。
Sqoop 的安装比价简单,安装完成之后出现如图 4-9 所示的目录结构即安装完成。
该系统中 sqoop 对接 mysql 中数据,因为目前业务系统中并无数据,所以直接采用 sqoop 的增量导入的方式,这里以 course 课程表为例进行导入,其他表的导入方式类似。
导入数据使用 Lastmodified 模式,此模式有两个参数,一个是 append(增量),一个是 merge-key(合并)。第一次导入时使用 append 模式,在 node01 执行以下命令:
138 bin/sqoop import \
139 --connect jdbc:mysql://192.168.109.1:3306/edusohu \
140 --username root --password 7DhYrfLtPSEJ8APP \
141 --table course --m 1 \
142 --target-dir /usr/hadoop/data \
143 --incremental append \
144 --check-column id \
145 --last-value 1205
复制代码
此后导入时需要看 mysql 中的表的变化,如果只是添加数据,则只需要使用 append 模式即可,当表中的数据有更新时,需要使用 merge-key 模式,在 node01 执行以下命令,把 id 字段作为 merge-key:
146 bin/sqoop import \
147 --connect jdbc:mysql://192.168.109.1:3306/edusohu \
148 --username root \
149 --password 7DhYrfLtPSEJ8APP \
150 --table course \
151 --target-dir /usr/hadoop/data \
152 --check-column last_mod \
153 --incremental lastmodified \
154 --last-value "2020-03-13 22:59:45" \
155 --m 1 \
156 --merge-key id
复制代码
至此,业务系统模块及数据采集模块已经开发完毕
评论