写点什么

使用 Spark Mllib 进行数据分析

发布于: 2021 年 03 月 16 日
使用Spark Mllib进行数据分析

本节内容主要是数据采集到大数据平台之后,然后通过算法模型对数据进行分析,得到分析结果。在教学分析时,采用了多种数据模型及算法。机器学习阶段主要采用监督式学习中的随机森林算法及非监督式学习中的 K-mean 算法。

 

1.1.1   S-T 教学分析法

S-T 法是应用于在教学过程中及教学活动中产生的数据进行分析,包括定量分析和定性评价[9]。因为 S-T 法只有两个维度,使得在分析时变得相对容易,减少模糊性,提高客观性以及可靠性。S 就是对学生的行为分析,T 就是对教师的行为进行分析。

在具体的行为中,T 行为包括听觉和视觉两方面,听觉主要是教师的讲话行为,视觉包括教师板书、演示等。在本系统中,这些行为的具体表现是:教师课程上传、提问、教学评价、课程反馈等。S 行为就是除 T 行为外的所有行为,在本系统的具体表现是:课堂笔记、课程考试、作业、话题讨论等。

S-T 法的数据收集

在系统中通过教师和学生的数据上传,互动等,以一定的时间间隔,采集相应的数据组成样本点,根据这些数据样本的行为类别,转化成对应的 S 或 T,构成 S-T 时序列数据,即 S-T 数据。

收集 S-T 时,需要定时采样,采样的间隔前期设置为 2 小时。对于采样的数据,必须包含时刻和行为的类别。

绘制 S-T 图

 横轴表示 T,纵轴表示 S,纵横轴的长度表示行为时间。原点处表示时间为零,即起始数据。每堂课设定 45 分钟,则学生与教师可绘制如图 4-10 所示的网格图:


图 4-10 S-T 图

1.1.2   教学分析模型 Rt 和 Ch

Rt 和 Ch 模型是根据 S-T 教学分析法得到[10]。Rt 表示 T 的行为占有率,在系统的表现为 T 所占的比例。

Rt=Nt/N

N:行为采样的总数。Nt:T 的行为数量。

Ch 表示行为转化率,即 T 行为与 S 行为之间的转化次数与总的行为采样数的比例。

   Ch=(g-1)/N

g:连数,相同的行为的一个连续称为一个连数。

设采样数据的总数 N 为 20,样本数据的顺序为:

S S T T T S S T S T S S T T T T S S T T

在总数 N 为 20 的情况下,样本序列如上所示时,Nt=11 ,则 Rt=Nt/N=55%

在计算 Ch 时,需要计算出 g,根据上述规则,g=10 , 则 Ch=(g-1)/N=45%

在一组 Rt 和 Ch 中,Rt 和 Ch 相加始终为 1,由此可得到它们的取值范围:

0≤Rt≤1,0≤Ch≤1

在这组模型中,两个模型的大小所表示的是:当 Rt 越大时,表示教师的活动越多,当 Ch 越大时,表示的是师生互动越多。

因此,可以将教学模式划分为 5 种:

讲授型:Rt 的数值较大,Ch 的数值较小(Rt≥0.6)

练习型:Rt 的数值较小,Ch 的数值较小(Rt≤0.2)

对话型:Rt 的数值居中,Ch 的数值较大(0.2≤Rt≤0.6,Ch≥0.6)

板书型:Rt 的数值居中,Ch 的数值较小(0.2≤Rt≤0.6,Ch≤0.3)

平衡型:Rt 的数值居中,Ch 的数值居中(0.3≤Rt/Ch≤0.6)

根据以上取值范围,可以得出 Rt-Ch 图 4-11:



图 4-11 Rt-Ch 图

依据图 4-11 可以做多方面的分析,对于不同的年级进行比较,对不同的教师进行比较,对某一位教师不同的时期进行比较。

1.1.3   TF-IDF

词频-逆向文件频率(TF-IDF)是一种在特征提取中非常重要的特征向量化的方法,他主要是用于文本挖掘中,可以体现出某个词语在一个文档或语料库中的重要程度[11]。在本系统中主要用于话题管理及笔记管理部分,生成知识点分布这一过程。

TF-IDF 用于过滤文档中出现次数多但是含义并不重要的词,比如说在一篇文档中“的”、“我”、“这”等词语,在文档中出现次数非常多,但是它们并没有实际的意义,所以我们要过滤掉这些无意义的词。

TF 表示的是一个词语在一篇文档中出现的频率,本系统中的话题管理部分,同学们会在此模块内进行课程讨论,系统首先会采集学生讨论的所有内容,然后发送到大数据平台,然后平台进行分词(使用 Tokenizer 类进行分词),而 TF 就是每个词语在整个文档中出现的频率。

TF=在某一类中词条 w 出现的次数/该类中所有词条出现的数目

IDF 表示的是语料库中文档总数与词条 w 在语料库中出现的数目之比,IDF 的目的就是过滤掉出现次数多但无意义的词语。

IDF=log(语料库中文档总数/(w 在语料库中出现的数目+1)),分母加 1 的目的防止分母为零的情况。

TF-IDF=TF*IDF,也就是与一个词在文档中的出现次数成正比,与该词在整个语料库中的出现次数成反比,即在该类中出现次数较多但是在语料库中出现的次数较少的词证明是该类中比较重要有意义的词语。

TF-IDF 在系统代码中的应用如下:

157 	object TfIdf_1 {158 	  def main(args: Array[String]): Unit = {159 	    val spark: SparkSession = SparkSession.builder()160 	      .appName("SparkMlilb")161 	      .master("local[2]")162 	      .getOrCreate()163 	    spark.sparkContext.setLogLevel("WARN")164 	val sentenceData = spark.textFile("/usr/data/topic.txt")165 	.toDF("label", "sentence")166 	    val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")167 	    val wordsData = tokenizer.transform(sentenceData)168 	    val hashingTF = new HashingTF()169 	      .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)170 	    val featurizedData = hashingTF.transform(wordsData)171 	    // alternatively, CountVectorizer can also be used to get term frequency vectors172 	173 	    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")174 	    val idfModel = idf.fit(featurizedData)175 	    val rescaledData = idfModel.transform(featurizedData)176 	    rescaledData.select("features", "label").take(3).foreach(println)177 	  }178 	}
复制代码

1.1.1   决策树算法构造教学模型

构建决策树包括三个步骤:

特征选择:选取的特征一定是能够有代表性的,根据前面分析的教学模型,选择用户的年龄、教授班级、Rt、Ch 作为特征。

决策树生成:目前比较常用的算法有 ID3 和 C4.5,因为特征个数不是太多,所以选择 ID3 作为决策树生成的算法。

决策树剪枝:主要作用防止模型的过拟合现象,在训练集中预测比较准确,但是在测试集中预测不准确,需要对决策树进行剪枝防止过拟合。

该教学模型因为特征较少,预测结果就五种,即上一小节提到的对话型、练习型、平衡型、板块型和讲授型。生成的决策树枝叶比较少,所以基本不会产生过拟合现象。当后期系统数据增多,可用的特征增多时,就需要考虑决策树生成算法是否需要使用 C4.5,以及对决策树进行剪枝操作。

决策树算法详构建教学模型过程如下:

选定训练集,训练集

是输入实例,n 为样本个数,X 为类标记,i=1,2.....N;N 为样本容量。该系统中构建决策树的目标是,根据给定的训练数据集(即业务系统中产生的数据)学习一个决策树算法的教学模型。

决策树是根据递归策略选择最优特征,将最优特征作为根节点,剩余所有的样本都位于根节点上。

决策树是监督式学习,样本集必须含有预测列的数据才能够使用决策树,此处数据使用 2014 石景山区教学设计大赛数据进行训练分析,如表 4-2 所示。


表 4-2 教学设计大赛部分数据

根据上述数据,我们需要计算出根节点,需要用到“信息熵”的概念。香农在他的《信息论》中借用德国物理学家发明的“熵”的概念提出了“信息熵”的概念[12]。在物理学中“熵”通常是指物体能量的分布更加均匀,而香农首先定义了“信息”的概念:信息就是对不确定性的消除。因此,“信息熵”就表示事物不确定性的度量标准,可以根据数学中的概率计算,出现的概率越大,出现的机会就多,不确定性就小,即信息熵小。

根据数学中对数函数,可以得到一个不确定性函数公式:


通过对单个取值的不确定性的期望 E,就是作信息熵,即:


所以通过以上数据,构建出 Rt 的信息熵:



即 Gain(Rt)=0.246。

类似的,Gain(age) =0.029, Gain(class) = 0.151, Gain(Ch)=0.148。

通过以上结论,Rt 的基尼系数最大,即信息熵最大,所以选择 Rt 作为根节点。

构建教学模型在代码中的具体实现如下:

179 	object TreeDemo {180 	def main(args: Array[String]) {181 	val conf = new SparkConf().setAppName("DecisionTree").setMaster("local")182 	val sc = new SparkContext(conf)183 	Logger.getRootLogger.setLevel(Level.WARN)184 	//训练数据185 	val data1 = sc.textFile("/usr/data/teacherMode1.txt")186 	//测试数据187 	val data2 = sc.textFile("/usr/data/teacherMode2.txt")188 	//转换成向量189 	val tree1 = data1.map { line =>190 	val parts = line.split(',')191 	LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))192 	}193 	val tree2 = data2.map { line =>194 	val parts = line.split(',')195 	LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))196 	}197 	//赋值198 	val (trainingData, testData) = (tree1, tree2)199 	//分类200 	val numClasses = 2201 	val categoricalFeaturesInfo = Map[Int, Int]()202 	val impurity = "gini"203 	//最大深度204 	val maxDepth = 5205 	//最大分支206 	val maxBins = 32207 	//模型训练208 	val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,209 	impurity, maxDepth, maxBins)210 	//模型预测211 	val labelAndPreds = testData.map { point =>212 	val prediction = model.predict(point.features)213 	(point.label, prediction)214 	}215 	//测试值与真实值对比216 	val print_predict = labelAndPreds.take(15)217 	println("label" + "\t" + "prediction")218 	for (i <- 0 to print_predict.length - 1) {219 	println(print_predict(i)._1 + "\t" + print_predict(i)._2)220 	}221 	//树的错误率222 	val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()223 	println("Test Error = " + testErr)224 	//打印树的判断值225 	println("Learned classification tree model:\n" + model.toDebugString)226 	}227 	}
复制代码

测试结果:

228 	label prediction229 	0.0 0.0230 	1.0 1.0231 	1.0 1.0232 	1.0 1.0233 	3.0 3.0234 	2.0 2.0235 	2.0 2.0236 	1.0 1.0237 	4.0 4.0238 	0.0 0.0239 	Test Error = 0.0240 	Learned classification tree model:
复制代码

通过结果可得真实值与预测值一致,Error 为 0

最终将结果保存到 Hbase 中,供可视化界面的展示使用。

1.1.1     K-Means 算法构造知识图谱模型

在 TF-IDF 小节中已经对文档内容进行了分词以及词语的过滤操作,对这些词语使用 K-Means 算法进行聚类,K-Means 算法属于非监督式学习,无标签类别类。K-几个聚类中心  Mean-均值,每次迭代的时候使用均值方式迭代。

详细构建过程如下:

(1)随机适当选择 c 个类的初始中心;

(2)在第 k 次迭代中,对任意一个样本,求其到 c 各中心的距离,将该样本归到距离最短的中心所在的类;


(3)利用均值等方法更新该类的中心值;


(4)对于所有的 c 个聚类中心,如果利用(2)(3)的迭代法更新后,聚类中心的值保持不变,则迭代结束,否则继续迭代。

终止条件:迭代次数/簇中心变化率/最小平方误差 MSE。

因为算法模型计算的数据是向量,所以需要先对词语进行向量化,即 Word2Vec,

word2vec 是 NLP 领域的重要算法,它的功能是将 word 用 K 维的 dense vector 来表达,训练集是语料库,不含标点,以空格断句。

具体代码如下:

val spark: SparkSession = SparkSession.builder()242 	          .appName("SparkMlilb")243 	          .master("local[2]")244 	          .getOrCreate()245 	        spark.sparkContext.setLogLevel("WARN")246 	        // 下面的数据为方便观察结果,使用的是测试数据,没有从文档中247 	        // 导入词语,直接使用createDataFrame方法添加的词语248 	        val documentDF = spark.createDataFrame(Seq(249 	          "Hi I heard about Spark".split(" "),250 	          "I wish Java could use case classes".split(" "),251 	          "Logistic regression models are neat".split(" ")252 	        ).map(Tuple1.apply)).toDF("text")253 	254 	        // 从单词到向量的映射255 	        val word2Vec = new Word2Vec()256 	          .setInputCol("text")257 	          .setOutputCol("result")258 	          .setVectorSize(3)259 	          .setMinCount(0)260 	        val model = word2Vec.fit(documentDF)261 	        val result = model.transform(documentDF)262 	        result.select("result").take(3).foreach(println)
复制代码

结果:

263 	[[0.03173386193811894,0.009443491697311401,0.024377789348363876]]264 	[[0.025682436302304268,0.0314303718706859,-0.01815584538105343]]265 	[[0.022586782276630402,-0.01601201295852661,0.05122732147574425]]
复制代码

向量化之后使用 K-Means 算法对数据进行聚类。

K-Means 算法在代码中的应用如下:

 Val spark: SparkSession = SparkSession.builder().appName("SparkMLLibCarKMeansAnalysis").master("local[*]").getOrCreate()267 	    spark.sparkContext.setLogLevel("WARN")268 	    //    * 2-准备数据269 	    val datapath = "/usr/data/topicnote.txt"270 	    val data1: DataFrame = spark.read.format("csv").option("header", "true").load(datapath)271 	    data1.show(false)272 	    data1.printSchema()273 	    val data = data1.select(data1("sepal_length").cast("Double"),274 	      data1("sepal_width").cast("Double"),275 	      data1("petal_length").cast("Double"),276 	      data1("petal_width").cast("Double"))277 	     data.printSchema()278 	    //sepal_length,sepal_width,petal_length,petal_width,class279 	    //    * 3-特征工程280 	    //1-将经度和纬度数据整合为一起281 	    val vectran: VectorAssembler = new VectorAssembler().setInputCols(Array("sepal_length", "sepal_width", "petal_length", "petal_width")).setOutputCol("features")282 	    val datatrans1: DataFrame = vectran.transform(data283 	    //   3-特征工程---最大值最小化的处理------[0,1[区间284 	    val sclaer: MinMaxScaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaledfeatures")285 	    val scalerModel: MinMaxScalerModel = sclaer.fit(datatrans1)286 	    val datatrans: DataFrame = scalerModel.transform(datatrans1)287 	    val Array(trainset, testset): Array[Dataset[Row]] = datatrans.randomSplit(Array(0.8, 0.2), 123L)288 	//    * K-Means建模289 	    //loss很高290 	    //    val kmeans: KMeans = new KMeans().setFeaturesCol("features").setPredictionCol("predictions").setK(3)291 	    //loss降低292 	    val kmeans: KMeans = new KMeans().setFeaturesCol("scaledfeatures").setPredictionCol("predictions").setK(3)293 	    val kmeanModel: KMeansModel = kmeans.fit(trainset)294 	    //    * 5-预测分析295 	    val testResult: DataFrame = kmeanModel.transform(testset)296 	    testResult.show()297 	    testResult.groupBy("petal_length", "predictions").298 	      agg(("predictions", "count")).show()299 	    //        * 6-模型校验分析-wssse打印结果    println("wssse", kmeanModel.computeCost(testResult)) //(wssse,2.5189564869286865)300 	    println(kmeanModel.clusterCenters.mkString(","))
复制代码

将得到的结果保存至 Hbase 中,供可视化界面查询使用。


发布于: 2021 年 03 月 16 日阅读数: 14
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
使用Spark Mllib进行数据分析