写点什么

基于 SparkMLlib 智能课堂教学评价系统 - 系统实现(四)

发布于: 2021 年 03 月 15 日
基于 SparkMLlib 智能课堂教学评价系统 - 系统实现(四)

终于到了最激动人心的章节了,系统实现,本节就要开始实现智能课堂教学评价系统了

废话少说,下面直接开始


系统实现主要分为五部分,每一部分都介绍了所实现的的功能或解决的问题,因篇幅有限,有些比较简单的地方没有详细的进行说明,对系统的使用到的核心代码进行了部分展示。

1. 开发环境

开发工具:IntelliJ  IDEA ,MobaXterm

所需环境:因本系统为分布式系统,所以至少需要三台服务器,因实际情况,所以采用使用三台虚拟机来搭建,三台机器分别命名为 node01、node02、node03,


2 业务系统模块

业务系统基于 eduSohu 开源框架的二次开发,系统分为学生端和教师端,采用统一登录,根据角色权限进行分类。

教师端登录成功进入首页如图 4-1 所示:


图 4-1 教师登录首页

当教师点击教学页面之后界面如图 4-2 所示:


图 4-2 教学界面

教学部分包含课程管理、班级管理、分类标签、笔记管理、问答管理、话题管理、反馈管理、题库管理、试卷管理等几个模块。

题库管理界面如图 4-3 所示:


图 4-3 题库管理

 

选择管理题库界面如图 4-4 所示:


图 4-4 题目管理

 

点击创建试卷界面如图 4-5 所示:


图 4-5 试卷管理

 

学生点击学习之后界面如图 4-6 所示:



图 4-6 课程学习

 

学生对教师进行评价界面如图 4-7 所示:



图 4-7 学生评价

3 数据采集模块

数据采集分为两部分进行,每一部分所采集数据是完全不同的,处理的方式也是不同的,所以需要用不同的工具进行操作,采集非结构化的数据采用 Flume 进行,采集结构化的数据使用 Sqoop。

3.1   Flume 采集及数据预处理   

Flume 采集系统的搭建相对简单:

1、在 node02 节点上部署 agent 节点,修改 Flume 的配置文件。

2、启动 node02 节点上的 agent,将采集到的日志数据汇聚到 Hadoop 的 HDFS 中。

对于业务系统中 nginx 日志生成及 js 埋点的日志数据,如果通过 flume1.6 采集,无论是 Spooling Directory Source 还是 Exec Source 都不能满足动态实时收集的需求,在 flume1.7 版本之后,提供了一个很强大可靠的 TaildirSource,使用这个 source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。

核心配置如下:

1 	a1.sources = r12 	a1.sources.r1.type = TAILDIR3 	a1.sources.r1.channels = c14 	a1.sources.r1.positionFile = /var/log/flume/taildir_position.json5 	a1.sources.r1.filegroups = f1 f26 	a1.sources.r1.filegroups.f1 = /var/log/test1/example.log7 	a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
复制代码

Filegroups:指定 filegroups,即文件的组可以有多个,这里指定两个 f1 和 f2。

positionFile:配置检查点文件的路径,检查点文件会以 json 格式保存已经 tail 文件的位置,解决了断点不能续传的缺陷。

filegroups.<filegroupName>:配置每个 filegroup 的文件绝对路径,文件名可以用正则表达式匹配。

通过以上配置,就可以监控文件内容的增加和文件的增加。产生和所配置的文件名正则表达式不匹配的文件,则不会被 tail。

采集数据之后就要进行数据预处理,主要就是过滤“不合规”数据,清洗无意义的数据,格式转换和规整。整个流程如图 4-8 所示:

3.2   采集日志模型 Pageviews

Pageviews 模型数据专注于用户每次会话(session)的识别,以及每次 session 内访问了几步和每一步的停留时间。

在系统分析中,把前后两条访问记录时间差在 30 分钟以内算成一次会话。如果超过 30 分钟,则把下次访问算成新的会话开始。

Pageviews 模型部分核心代码如下:

把清洗后的数据转为 pageview 模型,每条数据打上 session 标识,并且计算出停留的时间。

8 	public class PageView {9 	    static class PageViewMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {10 	        final WebLogBean v = new WebLogBean();11 	        Text k = new Text();12 	        @Override13 	        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {14 	            //读取清洗规整后的数据15 	            final String[] fields = value.toString().split("\001");16 	            //封装成weblogbean对象传入reduce,以ip地址为key17 	            v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);18 	            //以ip地址作为key输出19 	            if (v.isValid()) {20 	                k.set(v.getRemote_addr());21 	                context.write(k, v);22 	            }23 	        }24 	    }25 	static class PageViewReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {   按照时间排序升序,计算出每个记录的session信息,计算每个请求的停留时间,并打上是这次会话中的哪一步。26 	     final Text v = new Text();27 	        @Override28 	        protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {29 	            final ArrayList<WebLogBean> webLogBeans = new ArrayList<>();30 	            //key:ip地址 values:相同ip的所有请求31 	            for (WebLogBean webLogBean : values) {32 	                final WebLogBean bean = new WebLogBean();33 	                try {34 	                    BeanUtils.copyProperties(bean, webLogBean);35 	                } catch (IllegalAccessException e) {36 	                    e.printStackTrace();37 	                } catch (InvocationTargetException e) {38 	                    e.printStackTrace();39 	                }40 	                webLogBeans.add(bean);41 	            }42 	            //对获取到的记录按照时间字段进行排序43 	            Collections.sort(webLogBeans, new Comparator<WebLogBean>() {44 	                @Override45 	                public int compare(WebLogBean o1, WebLogBean o2) {46 	                    return o1.getTime_local().compareTo(o2.getTime_local());47 	                }48 	            });
复制代码

排序完成后,计算 sessionid 以及停留时间还有当前所属步长,判断两次时长是否大于 30 分钟。

   //先准备sessionid50 	            String session = UUID.randomUUID().toString();51 	            int step = 1;52 	            for (int i = 0; i < webLogBeans.size(); i++) {53 	                final WebLogBean bean = webLogBeans.get(i);54 	                //第一次获取的这条数据的session无法计算时长,所以55 	                //跳过这次循环进入下一次循环56 	                if (webLogBeans.size() == 1) {57 	                    //输出数据然后跳出循环即可58 	                    v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" +59 	                            bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) +60 	                            "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" +61 	                            bean.getBody_bytes_sent() + "\001"62 	                            + bean.getStatus());63 	                                        context.write(NullWritable.get(), v);64 	                }65 	                if (i == 0) {66 	                    continue;//如果只由一次记录也要输出67 	                }68 	                //进入非第一条的判断69 	                //比较两条请求信息的时间是否大于30分钟70 	                //首先获取到本次循环信息对应的时间71 	                final String time_local = bean.getTime_local();72 	                //获取上一次时间73 	                final WebLogBean webLogBean = webLogBeans.get(i - 1);74 	                final String time_local1 = webLogBean.getTime_local();75 	                long diff = 0;76 	                try {77 	                    diff = DateUtils.timeDiff(time_local1, time_local);78 	                } catch (ParseException e) {79 	                }
复制代码

判断时间差值:

  if (diff > 30 * 60 * 1000) {81 	                    //大于30分钟则换新的session信息82 	                    //现在输出第一条数据83 	                    v.set(session + "\001" + key.toString() + "\001" + webLogBean.getRemote_user() + "\001" + webLogBean.getTime_local() + "\001"84 	                            + webLogBean.getRequest() + "\001" + step + "\001" + (60) + "\001" + webLogBean.getHttp_referer() + "\001" +85 	                            webLogBean.getHttp_user_agent() + "\001" + webLogBean.getBody_bytes_sent() + "\001"86 	                            + webLogBean.getStatus());87 	                    context.write(NullWritable.get(), v);88 	                    //session重新生成89 	                    session = UUID.randomUUID().toString();90 	                    //重置step91 	                    step = 1;92 	                } else {93 	                    //说明和上一条是同一个session,也是输出上一条信息94 	                    v.set(session + "\001" + key.toString() + "\001" + webLogBean.getRemote_user() + "\001" + webLogBean.getTime_local() +95 	                            "\001" + webLogBean.getRequest() + "\001" + step + "\001" + (diff / 1000) + "\001" + webLogBean.getHttp_referer() + "\001" +96 	                            webLogBean.getHttp_user_agent() + "\001" + webLogBean.getBody_bytes_sent() + "\001"97 	                            + webLogBean.getStatus());98 	                    context.write(NullWritable.get(), v);99 	                    //步长进行累加100 	                    step++;101 	                }102 	                //如果此次是遍历的最后一条需要输出本条数据103 	                if (i == webLogBeans.size() - 1) {104 	                    v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" +105 	                            bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) +106 	                            "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" +107 	                            bean.getBody_bytes_sent() + "\001"108 	                            + bean.getStatus());109 	                    //重新生成sessionid110 	                    context.write(NullWritable.get(), v);111 	                }112 	            }113 	        }
复制代码

3.3   采集日志模型 Visit

Visit 模型专注于每次会话 session 内起始、结束的访问情况信息。比如教师或学生在某一个会话 session 内,进入会话的起始页面和起始时间,会话结束是从哪个页面离开的,离开时间,本次 session 总共访问了几个页面等信息。

Visit 模型的部分代码如下:

114 	//根据step或者时间排序,找出本次session的起始,和终止时间和页面115 	    static class ClickVisitsReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {116 	        @Override117 	        protected void reduce(Text key, Iterable<PageViewsBean> values, Context context) throws IOException, InterruptedException {118 	            final ArrayList<PageViewsBean> pageViewsBeans = new ArrayList<>();119 	            for (PageViewsBean bean : values) {120 	                PageViewsBean pageViewsBean = new PageViewsBean();121 	                try {122 	                    BeanUtils.copyProperties(pageViewsBean, bean);123 	                } catch (IllegalAccessException e) {124 	                    e.printStackTrace();125 	                } catch (InvocationTargetException e) {126 	                    e.printStackTrace();127 	                }128 	                pageViewsBeans.add(pageViewsBean);129 	            }130 	            //按照步长进行排序131 	            Collections.sort(pageViewsBeans, new Comparator<PageViewsBean>() {132 	                @Override133 	                public int compare(PageViewsBean o1, PageViewsBean o2) {134 	                    return o1.getStep() > o2.getStep() ? 1 : -1;135 	                }136 	            });137 	}
复制代码

3.4   Sqoop 采集结构化数据

Sqoop 工作机制是将导入或导出命令翻译成 mapreduce 程序来实现。在翻译出的 mapreduce 中主要是对 inputformat 和 outputformat 进行定制。

Sqoop 的安装比价简单,安装完成之后出现如图 4-9 所示的目录结构即安装完成。

该系统中 sqoop 对接 mysql 中数据,因为目前业务系统中并无数据,所以直接采用 sqoop 的增量导入的方式,这里以 course 课程表为例进行导入,其他表的导入方式类似。

导入数据使用 Lastmodified 模式,此模式有两个参数,一个是 append(增量),一个是 merge-key(合并)。第一次导入时使用 append 模式,在 node01 执行以下命令:

138 	bin/sqoop import \139 	--connect jdbc:mysql://192.168.109.1:3306/edusohu \140 	--username root  --password 7DhYrfLtPSEJ8APP \141 	--table course --m 1 \142 	--target-dir /usr/hadoop/data \143 	--incremental append \144 	--check-column id \145 	--last-value  1205
复制代码

此后导入时需要看 mysql 中的表的变化,如果只是添加数据,则只需要使用 append 模式即可,当表中的数据有更新时,需要使用 merge-key 模式,在 node01 执行以下命令,把 id 字段作为 merge-key:

146 	bin/sqoop import \147 	--connect jdbc:mysql://192.168.109.1:3306/edusohu \148 	--username root \149 	--password 7DhYrfLtPSEJ8APP \150 	--table course \151 	--target-dir /usr/hadoop/data \152 	--check-column last_mod \153 	--incremental lastmodified \154 	--last-value "2020-03-13 22:59:45" \155 	--m 1 \156 	--merge-key id
复制代码

至此,业务系统模块及数据采集模块已经开发完毕

发布于: 2021 年 03 月 15 日阅读数: 14
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
基于 SparkMLlib 智能课堂教学评价系统 - 系统实现(四)