终于到了最激动人心的章节了,系统实现,本节就要开始实现智能课堂教学评价系统了
废话少说,下面直接开始
系统实现主要分为五部分,每一部分都介绍了所实现的的功能或解决的问题,因篇幅有限,有些比较简单的地方没有详细的进行说明,对系统的使用到的核心代码进行了部分展示。
1. 开发环境
开发工具:IntelliJ IDEA ,MobaXterm
所需环境:因本系统为分布式系统,所以至少需要三台服务器,因实际情况,所以采用使用三台虚拟机来搭建,三台机器分别命名为 node01、node02、node03,
2 业务系统模块
业务系统基于 eduSohu 开源框架的二次开发,系统分为学生端和教师端,采用统一登录,根据角色权限进行分类。
教师端登录成功进入首页如图 4-1 所示:
图 4-1 教师登录首页
当教师点击教学页面之后界面如图 4-2 所示:
图 4-2 教学界面
教学部分包含课程管理、班级管理、分类标签、笔记管理、问答管理、话题管理、反馈管理、题库管理、试卷管理等几个模块。
题库管理界面如图 4-3 所示:
图 4-3 题库管理
选择管理题库界面如图 4-4 所示:
图 4-4 题目管理
点击创建试卷界面如图 4-5 所示:
图 4-5 试卷管理
学生点击学习之后界面如图 4-6 所示:
图 4-6 课程学习
学生对教师进行评价界面如图 4-7 所示:
图 4-7 学生评价
3 数据采集模块
数据采集分为两部分进行,每一部分所采集数据是完全不同的,处理的方式也是不同的,所以需要用不同的工具进行操作,采集非结构化的数据采用 Flume 进行,采集结构化的数据使用 Sqoop。
3.1 Flume 采集及数据预处理
Flume 采集系统的搭建相对简单:
1、在 node02 节点上部署 agent 节点,修改 Flume 的配置文件。
2、启动 node02 节点上的 agent,将采集到的日志数据汇聚到 Hadoop 的 HDFS 中。
对于业务系统中 nginx 日志生成及 js 埋点的日志数据,如果通过 flume1.6 采集,无论是 Spooling Directory Source 还是 Exec Source 都不能满足动态实时收集的需求,在 flume1.7 版本之后,提供了一个很强大可靠的 TaildirSource,使用这个 source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。
核心配置如下:
1 a1.sources = r12 a1.sources.r1.type = TAILDIR3 a1.sources.r1.channels = c14 a1.sources.r1.positionFile = /var/log/flume/taildir_position.json5 a1.sources.r1.filegroups = f1 f26 a1.sources.r1.filegroups.f1 = /var/log/test1/example.log7 a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
复制代码
Filegroups:指定 filegroups,即文件的组可以有多个,这里指定两个 f1 和 f2。
positionFile:配置检查点文件的路径,检查点文件会以 json 格式保存已经 tail 文件的位置,解决了断点不能续传的缺陷。
filegroups.<filegroupName>:配置每个 filegroup 的文件绝对路径,文件名可以用正则表达式匹配。
通过以上配置,就可以监控文件内容的增加和文件的增加。产生和所配置的文件名正则表达式不匹配的文件,则不会被 tail。
采集数据之后就要进行数据预处理,主要就是过滤“不合规”数据,清洗无意义的数据,格式转换和规整。整个流程如图 4-8 所示:
3.2 采集日志模型 Pageviews
Pageviews 模型数据专注于用户每次会话(session)的识别,以及每次 session 内访问了几步和每一步的停留时间。
在系统分析中,把前后两条访问记录时间差在 30 分钟以内算成一次会话。如果超过 30 分钟,则把下次访问算成新的会话开始。
Pageviews 模型部分核心代码如下:
把清洗后的数据转为 pageview 模型,每条数据打上 session 标识,并且计算出停留的时间。
8 public class PageView {9 static class PageViewMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {10 final WebLogBean v = new WebLogBean();11 Text k = new Text();12 @Override13 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {14 //读取清洗规整后的数据15 final String[] fields = value.toString().split("\001");16 //封装成weblogbean对象传入reduce,以ip地址为key17 v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);18 //以ip地址作为key输出19 if (v.isValid()) {20 k.set(v.getRemote_addr());21 context.write(k, v);22 }23 }24 }25 static class PageViewReducer extends Reducer<Text, WebLogBean, NullWritable, Text> { 按照时间排序升序,计算出每个记录的session信息,计算每个请求的停留时间,并打上是这次会话中的哪一步。26 final Text v = new Text();27 @Override28 protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {29 final ArrayList<WebLogBean> webLogBeans = new ArrayList<>();30 //key:ip地址 values:相同ip的所有请求31 for (WebLogBean webLogBean : values) {32 final WebLogBean bean = new WebLogBean();33 try {34 BeanUtils.copyProperties(bean, webLogBean);35 } catch (IllegalAccessException e) {36 e.printStackTrace();37 } catch (InvocationTargetException e) {38 e.printStackTrace();39 }40 webLogBeans.add(bean);41 }42 //对获取到的记录按照时间字段进行排序43 Collections.sort(webLogBeans, new Comparator<WebLogBean>() {44 @Override45 public int compare(WebLogBean o1, WebLogBean o2) {46 return o1.getTime_local().compareTo(o2.getTime_local());47 }48 });
复制代码
排序完成后,计算 sessionid 以及停留时间还有当前所属步长,判断两次时长是否大于 30 分钟。
//先准备sessionid50 String session = UUID.randomUUID().toString();51 int step = 1;52 for (int i = 0; i < webLogBeans.size(); i++) {53 final WebLogBean bean = webLogBeans.get(i);54 //第一次获取的这条数据的session无法计算时长,所以55 //跳过这次循环进入下一次循环56 if (webLogBeans.size() == 1) {57 //输出数据然后跳出循环即可58 v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" +59 bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) +60 "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" +61 bean.getBody_bytes_sent() + "\001"62 + bean.getStatus());63 context.write(NullWritable.get(), v);64 }65 if (i == 0) {66 continue;//如果只由一次记录也要输出67 }68 //进入非第一条的判断69 //比较两条请求信息的时间是否大于30分钟70 //首先获取到本次循环信息对应的时间71 final String time_local = bean.getTime_local();72 //获取上一次时间73 final WebLogBean webLogBean = webLogBeans.get(i - 1);74 final String time_local1 = webLogBean.getTime_local();75 long diff = 0;76 try {77 diff = DateUtils.timeDiff(time_local1, time_local);78 } catch (ParseException e) {79 }
复制代码
判断时间差值:
if (diff > 30 * 60 * 1000) {81 //大于30分钟则换新的session信息82 //现在输出第一条数据83 v.set(session + "\001" + key.toString() + "\001" + webLogBean.getRemote_user() + "\001" + webLogBean.getTime_local() + "\001"84 + webLogBean.getRequest() + "\001" + step + "\001" + (60) + "\001" + webLogBean.getHttp_referer() + "\001" +85 webLogBean.getHttp_user_agent() + "\001" + webLogBean.getBody_bytes_sent() + "\001"86 + webLogBean.getStatus());87 context.write(NullWritable.get(), v);88 //session重新生成89 session = UUID.randomUUID().toString();90 //重置step91 step = 1;92 } else {93 //说明和上一条是同一个session,也是输出上一条信息94 v.set(session + "\001" + key.toString() + "\001" + webLogBean.getRemote_user() + "\001" + webLogBean.getTime_local() +95 "\001" + webLogBean.getRequest() + "\001" + step + "\001" + (diff / 1000) + "\001" + webLogBean.getHttp_referer() + "\001" +96 webLogBean.getHttp_user_agent() + "\001" + webLogBean.getBody_bytes_sent() + "\001"97 + webLogBean.getStatus());98 context.write(NullWritable.get(), v);99 //步长进行累加100 step++;101 }102 //如果此次是遍历的最后一条需要输出本条数据103 if (i == webLogBeans.size() - 1) {104 v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" +105 bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) +106 "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" +107 bean.getBody_bytes_sent() + "\001"108 + bean.getStatus());109 //重新生成sessionid110 context.write(NullWritable.get(), v);111 }112 }113 }
复制代码
3.3 采集日志模型 Visit
Visit 模型专注于每次会话 session 内起始、结束的访问情况信息。比如教师或学生在某一个会话 session 内,进入会话的起始页面和起始时间,会话结束是从哪个页面离开的,离开时间,本次 session 总共访问了几个页面等信息。
Visit 模型的部分代码如下:
114 //根据step或者时间排序,找出本次session的起始,和终止时间和页面115 static class ClickVisitsReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {116 @Override117 protected void reduce(Text key, Iterable<PageViewsBean> values, Context context) throws IOException, InterruptedException {118 final ArrayList<PageViewsBean> pageViewsBeans = new ArrayList<>();119 for (PageViewsBean bean : values) {120 PageViewsBean pageViewsBean = new PageViewsBean();121 try {122 BeanUtils.copyProperties(pageViewsBean, bean);123 } catch (IllegalAccessException e) {124 e.printStackTrace();125 } catch (InvocationTargetException e) {126 e.printStackTrace();127 }128 pageViewsBeans.add(pageViewsBean);129 }130 //按照步长进行排序131 Collections.sort(pageViewsBeans, new Comparator<PageViewsBean>() {132 @Override133 public int compare(PageViewsBean o1, PageViewsBean o2) {134 return o1.getStep() > o2.getStep() ? 1 : -1;135 }136 });137 }
复制代码
3.4 Sqoop 采集结构化数据
Sqoop 工作机制是将导入或导出命令翻译成 mapreduce 程序来实现。在翻译出的 mapreduce 中主要是对 inputformat 和 outputformat 进行定制。
Sqoop 的安装比价简单,安装完成之后出现如图 4-9 所示的目录结构即安装完成。
该系统中 sqoop 对接 mysql 中数据,因为目前业务系统中并无数据,所以直接采用 sqoop 的增量导入的方式,这里以 course 课程表为例进行导入,其他表的导入方式类似。
导入数据使用 Lastmodified 模式,此模式有两个参数,一个是 append(增量),一个是 merge-key(合并)。第一次导入时使用 append 模式,在 node01 执行以下命令:
138 bin/sqoop import \139 --connect jdbc:mysql://192.168.109.1:3306/edusohu \140 --username root --password 7DhYrfLtPSEJ8APP \141 --table course --m 1 \142 --target-dir /usr/hadoop/data \143 --incremental append \144 --check-column id \145 --last-value 1205
复制代码
此后导入时需要看 mysql 中的表的变化,如果只是添加数据,则只需要使用 append 模式即可,当表中的数据有更新时,需要使用 merge-key 模式,在 node01 执行以下命令,把 id 字段作为 merge-key:
146 bin/sqoop import \147 --connect jdbc:mysql://192.168.109.1:3306/edusohu \148 --username root \149 --password 7DhYrfLtPSEJ8APP \150 --table course \151 --target-dir /usr/hadoop/data \152 --check-column last_mod \153 --incremental lastmodified \154 --last-value "2020-03-13 22:59:45" \155 --m 1 \156 --merge-key id
复制代码
至此,业务系统模块及数据采集模块已经开发完毕
评论