写点什么

使用 MLlib 进行机器学习(十 - 上)

发布于: 2021 年 07 月 25 日
使用MLlib进行机器学习(十-上)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


到目前为止,我们一直专注于 Apache Spark 的数据工程工作负载。数据工程通常是为机器学习(ML)任务准备数据的前期步骤,而机器学习将是本章的重点。我们生活在一个机器学习和人工智能应用普及的时代。不管我们是否意识到这一点,每天我们都有可能会出于各种目的(例如在线购物推荐和广告,欺诈检测,分类,图像识别,模式匹配等)接触 ML 模型。这些 ML 模型为许多公司制定了重要的业务决策。根据麦肯锡的这项研究,其中 35%的消费者在 Amazon 购买的商品和 75%的 Netflix 购买的商品受到基于机器学习的产品推荐的推动。建立一个表现良好的模型可以决定公司的成败。

在本章中,我们将帮助你开始使用 MLlib(Apache Spark 中的核心组件中的机器学习库)来构建 ML 模型。我们将从机器学习的简要介绍开始,然后涵盖大规模 ML 和功能设计的最佳实践(如果你已经熟悉机器学习的基础知识,则可以直接跳至“设计机器学习管道”)。通过此处提供的简短代码段以及该书的 GitHub 仓库中提供的笔记(notebook),你将学习如何构建基本的 ML 模型和使用 MLlib。

本章介绍了 Scala 和 Python API。如果你有兴趣在 Spark 中使用 R 语言(sparklyr)进行机器学习,我们建议你查看 Javier Luraschi,Kevin Kuo 和 Edgar Ruiz(O'Reilly)的著作《Mastering Spark with R》。

什么是机器学习?

如今,机器学习正在大肆宣传,但是它到底是什么呢?广义上讲,机器学习是一个使用统计,线性代数和数值优化从数据中抽取模式的过程。机器学习可以应用于诸如预测功耗,确定视频中是否有猫,或将具有类似特征的项目聚类的问题。

机器学习有几种类型,包括监督、半监督、无监督和强化学习。本章将主要关注有监督的机器学习,而仅涉及无监督的学习。在深入探讨之前,让我们简要讨论有监督和无监督机器学习之间的区别。

监督学习

在有监督的机器学习中,你的数据由一组输入记录组成,每个输入记录都具有关联的标签,并且目标是在给定新的无标签输入的情况下预测输出标签。这些输出标签可以是不连续的连续的,这给我们带来了两种监督的机器学习:分类回归

在分类问题中,目标是将输入分为一组离散的类或标签。对于二进制分类,你要预测两个离散的标签,例如“ dog”或“ not dog”,如图 10-1 所示。


使用 multiclass(也称为多项式)分类,可以有三个或更多离散的标签,例如预测狗的品种(例如,澳大利亚牧羊犬,金毛猎犬或贵宾犬,如图 10-2 所示)。

在回归问题中,要预测的值是连续数字,而不是标签。这意味着你可能会预测模型在训练期间未看到的值,如图 10-3 所示。例如,你可以构建一个模型来预测在给定温度下的每日冰淇淋销量。你的模型可能得到会预测值 $ 77.67,即使训练数据中没有包含该值的输入/输出对。

下面的表 10-1 列出了 Spark MLlib 中可用的一些常用的监督 ML 算法,并注明了它们是否可用于回归,分类或同时用于回归和分类。



无监督学习

获得监督式机器学习所需的标记数据可能需要付出昂贵的代价甚至有时候是不可行的。这就是无监督机器学习发挥作用的地方。无需预测标签,无监督的机器学习可以帮助你更好地理解数据的结构。

例如,请观察图 10-4 左侧的原始非聚类数据。对于每个数据点(1,2),都没有已知的真实标签,但是通过对我们的数据应用无监督机器学习,我们可以找到自然形成的聚类,如右图所示。

无监督机器学习可用于异常值检测或用作监督机器学习的预处理步骤,例如,减少数据集的维数(即每个样本点的维数),这对于减少存储需求或简化下游操作很有用。MLlib 中的一些无监督机器学习算法包括 k-均值、LDA 和高斯混合模型。

为什么使用 Spark 进行机器学习?

Spark 是一个统一的分析引擎,为数据摄取,工程设计,模型训练和部署提供了一个生态系统。如果没有 Spark,开发人员将需要许多不同的工具来完成这组任务,并且可能仍难以应对可伸缩性的问题。

Spark 有两个机器学习包:spark.mllib 和 spark.ml。spark.mllib 是基于 RDD API(从 Spark 2.0 开始处于维护模式)的原始机器学习 API,而 spark.ml 是基于 DataFrames 的较新 API。本章的其余部分将重点介绍 spark.ml 如何使用该软件包以及如何在 Spark 中设计机器学习管道。但是,我们使用“ MLlib”作为总称来指代 Apache Spark 中的两个机器学习库包。

使用 spark.ml,数据科学家可以在同一个生态系统中进行数据准备和模型构建,而无需对数据进行下采样以使其适合一台计算机。spark.ml 着重于 O(n)向外扩展,其中模型随你拥有的数据点数线性扩展,因此可以扩展至大量的数据。在下一章中,我们将讨论在诸如的分布式框架 spark.ml 和 scikit-learn(sklearn)的单节点框架之间进行选择时需要进行的一些权衡。如果你以前使用过 scikit-learn,很多 spark.ml API 都会感觉很熟悉,但是也会存在一些细微的差异,下面我们将进行讨论。

 

设计机器学习管道

在本节中,我们将介绍如何创建和调整机器学习管道。管道的概念在许多机器学习框架中很常见,是一种组织一系列操作以应用于你的数据的方式。在 MLlib 中,管道 API 提供了一个基于 DataFrames 的高级 API,用来组织你的机器学习工作流程。Pipeline API 由一系列转换器和预估组成,我们将在后面详细讨论。

在本章中,我们将使用 Inside Airbnb 提供的旧金山住房数据集。它包含有关 Airbnb 在旧金山的租金的信息,例如卧室数量,位置,评论评分等,我们的目标是建立一个模型来预测该城市房源的每晚租金价格。这是一个回归问题,因为价格是一个连续变量。我们将指导你完成数据科学家用来解决此问题的工作流程,包括特征工程,构建模型,超参数调整和评估模型质量。该数据集非常混乱,并且可能很难建模(就像大多数现实世界中的数据集一样!),因此,如果你自己进行实验,则如果早期的模型不好,也不需要感到焦虑。

本章的目的不是向你展示 MLlib 中的每个 API,而是让你掌握使用 MLlib 来构建端到端管道的技能和知识。在详细介绍之前,让我们定义一些 MLlib 术语:

转换器(Transformer

接受一个 DataFrame 作为输入,并返回一个新的 DataFrame 并追加一个或多个列。转换器无法从你的数据中学习任何参数,而只是应用基于规则的转换来准备用于模型训练的数据,或使用经过训练的 MLlib 模型生成预测。他们有一个 .transform() 方法。

预估器(Estimator)

通过.fit() 方法从你的 DataFrame 中读取(或“拟合”)参数,并返回一个模型 ,这个模型是一个转换器。

管道(pipeline)

将一系列转换器和预估器组织到一个模型中。管道本身是预估器,而 pipeline.fit()方法返回的输出是一个 PipelineModel,是一个转换器。

尽管这些概念现在看起来似乎还很抽象,但是本章中的代码段和示例将帮助你理解它们是如何组合在一起的。但是,在构建机器学习模型并使用转换器,预估器和管道之前,我们需要加载数据并执行一些数据准备。

数据提取与研究

我们对示例数据集中的数据进行了稍微的预处理,以删除异常值(例如,发布价为 $ 0 / night 的 Airbnb),将所有整数都转换为双精度,并选择了一百多个字段中的信息量很大的子集。此外,对于数据列中所有缺失的数值,我们估算了中位数并添加了一个指标列(列名后跟_na,例如 bedrooms_na)。这样,ML 模型或人工分析人员就可以将该列中的任何值解释为推定值,而不是真实值。你可以在本书的 GitHub repo 中看到数据准备笔记。请注意,还有许多其他方法可以处理缺失值,对于那些方法本书不做介绍。

让我们快速浏览一下数据集和相应的数据结构(输出仅显示列的子集):

 In Python

filePath = """/databricks-datasets/learning-spark-v2/sf-airbnb/

sf-airbnb-clean.parquet/"""

airbnbDF = spark.read.parquet(filePath)

airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",

                "number_of_reviews", "price").show(5)

                

 

// In Scala

val filePath =

  "/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet/"

val airbnbDF = spark.read.parquet(filePath)

airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",

                "number_of_reviews", "price").show(5)

 

+----------------------+---------------+--------+---------+--------

|neighbourhood_cleansed|room_type|bedrooms|bathrooms|number_...|price|

+----------------------+---------------+--------+---------+----------

| Western Addition|Entire home/apt| 1.0| 1.0| 180.0|170.0|

| Bernal Heights  |Entire home/apt| 2.0| 1.0| 111.0|235.0|

| Haight Ashbury  | Private room  | 1.0| 4.0| 17.0 | 65.0|

| Haight Ashbury  | Private room  | 1.0| 4.0| 8.0  | 65.0|

| Western Addition|Entire home/apt| 2.0| 1.5| 27.0 |785.0|

+----------------------+---------------+--------+---------+--------

鉴于我们的功能,我们的目标是预测租赁物业每晚的价格。


在数据科学家可以进行模型构建之前,他们需要探索和理解他们的数据。他们通常会使用 Spark 对数据进行分组,然后使用数据可视化库(例如 matplotlib)来可视化数据。我们将把数据探索作为练习留给读者。


创建训练和测试数据集

在开始进行特征工程和建模之前,我们将数据集分为两组:训练集测试集。根据数据集的大小,你的训练/测试比率可能会有所不同,但是许多数据科学家使用 80/20 作为标准的训练/测试划分。你可能会想,“为什么不使用整个数据集来训练模型?” 问题在于,如果我们在整个数据集上构建模型,则该模型可能会记住或“过度拟合”我们提供的训练数据,而我们将没有更多的数据来评估它对以前看不见的数据的概括程度。看不见的数据。假设数据遵循相似的分布,则模型在测试集上的性能是其对看不见的数据(即,在野外还是在生产中)的性能表现的代理。图 10-5 中显示了训练数据集和测试数据集的拆分。

我们的训练集由一组特征 X 和一个标签 y 组成。在这里,我们用大写字母 X 表示尺寸为 n x d 的矩阵,其中 n 是数据点(或示例)的数量,d 是特征的数量(这就是我们在 DataFrame 中称为字段或列的数量)。我们使用小写字母 y 表示向量,尺寸为 n x 1;对于每个示例,都有一个标签。

使用不同的度量标准来衡量模型的效果。对于分类问题,标准度量是正确预测的准确性或百分比。一旦该模型在使用该指标的训练集上具有令人满意的性能,我们将该模型应用于我们的测试集。如果它根据我们的评估指标在我们的测试集上表现良好,那么我们可以确信我们已经建立了一个模型,该模型可以推广到未出现的数据。

对于我们的 Airbnb 数据集,我们将保留 80%的数据作为训练集,并保留 20%的数据用于测试集。此外,我们将为数据可重复性设置一个随机种子,这样,如果我们重新运行此代码,我们可能分别在训练数据集和测试数据集中生成重复的数据。种子本身的价值并不重要,但数据科学家通常喜欢将其设置为 42,因为这是 Ultimate Question of Life

的答案:

 In Python

trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)

print(f"""There are {trainDF.count()} rows in the training set,

and {testDF.count()} in the test set""")

 

// In Scala

val Array(trainDF, testDF) = airbnbDF.randomSplit(Array(.8, .2), seed=42)

println(f"""There are ${trainDF.count} rows in the training set, and

${testDF.count} in the test set""")

这将产生以下输出:

训练集中有 5780 行,测试集中有 1366 行

但是,如果我们更改 Spark 集群中执行程序(Executor)的数量会怎样?Catalyst 优化器根据群集资源和数据集的大小确定最佳的数据分区方法。假设 Spark DataFrame 中的数据是按行分区的,并且每个工作节点都独立于其他工作节点执行拆分,如果分区中的数据发生更改,则拆分结果(by random Split())将不相同。

虽然你可以修复集群配置和随机种子以确保获得一致的结果,但是我们建议你一次性拆分数据,然后将其写到其自己的训练/ 测试文件夹中,这样就不会出现这些可重复性问题。

在探索性分析期间,你应该缓存训练数据集,因为你将在整个机器学习过程中多次访问它。请参考上一节“缓存和数据的持久性”的第七章。

使用转换器准备特征

现在,我们已将数据分为训练集和测试集,让我们准备数据以建立一个线性回归模型,该模型可以在给定卧室数量的情况下预测价格。在后面的示例中,我们将包括所有相关特征,但是现在让我们确保已具备相应的机制。线性回归(与 Spark 中的许多其他算法一样)要求所有输入特征都包含在 DataFrame 中的单个向量内。因此,我们需要转换数据。

Spark 中的转换器接受一个 DataFrame 作为输入,并返回一个新 DataFrame 并追加一个或多个列。他们不会从你的数据中学习,而是使用该 transform()方法应用基于规则的转换。

为了将我们所有的特征放到一个向量中,我们将使用 VectorAssembler Transformer。VectorAssembler 接受一个输入列的列表,并创建一个带有追加列的新 DataFrame,我们将其称为特征(features)。它将这些输入列的值组合到一个向量中:

In Python

from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")

vecTrainDF = vecAssembler.transform(trainDF)

vecTrainDF.select("bedrooms", "features", "price").show(10)

 

// In Scala

import org.apache.spark.ml.feature.VectorAssembler

val vecAssembler = new VectorAssembler()

  .setInputCols(Array("bedrooms"))

  .setOutputCol("features")

val vecTrainDF = vecAssembler.transform(trainDF)

vecTrainDF.select("bedrooms", "features", "price").show(10)

 

+--------+--------+-----+

|bedrooms|features|price|

+--------+--------+-----+

|     1.0|   [1.0]|200.0|

|     1.0|   [1.0]|130.0|

|     1.0|   [1.0]| 95.0|

|     1.0|   [1.0]|250.0|

|     3.0|   [3.0]|250.0|

|     1.0|   [1.0]|115.0|

|     1.0|   [1.0]|105.0|

|     1.0|   [1.0]| 86.0|

|     1.0|   [1.0]|100.0|

|     2.0|   [2.0]|220.0|

+--------+--------+-----+

你会注意到,在 Scala 代码中,我们必须实例化新 VectorAssembler 对象以及使用 setter 方法更改输入和输出列。在 Python 中,你可以选择将参数直接传递给的构造函数 VectorAssembler 或使用 setter 方法,但是在 Scala 中,你只能使用 setter 方法。

接下来我们将介绍线性回归的基础知识,但是如果你已经熟悉算法,请跳至“使用预估器来构建模型”。

了解线性回归

线性回归建模因变量(或标签)与一个或多个自变量(或特征)之间的线性关系。在我们的案例中,我们希望拟合线性回归模型来预测在给定卧室数量的情况下 Airbnb 租金的价格。

在图 10-6 中,我们有一个特征 x 和一个输出 y(这是我们的因变量)。线性回归试图将方程式拟合 x y 之间的线性关系,对于标量变量,可以将其表示为 y = mx + b,其中 m 是斜率,b 是偏移量或截距。

这些点表示来自我们的数据集中真实的(xy)对,实线表示最适合该数据集的线。数据点未完全对齐,因此我们通常认为线性回归是将模型拟合为 y≈mx + b +ε,其中 ε 是抽取的服从同一分布的误差,不同样本 x 产生的误差独立。这些是我们的模型预测与真实值之间的误差。通常我们将ε视为高斯或正态分布。回归线上方的垂直线表示正ε(或残差),其中真实值高于预测值,回归线下方的垂直线表示负残差。线性回归的目标是找到一条使这些残差的平方最小的线。你会注意到,该线可以推断未见数据点的预测值。

线性回归还可以扩展为处理多个自变量。如果我们有三个特征作为输入,x = [x1 , x2 , x3 ],那么我们就可以建模 y 作为 y ≈ w0 + w1x1 + w2x2 + w3x3 + ε.。在这种情况下,每个特征都有一个单独的系数(或权重)和一个截距(这里是 w0 而不是 b)。估计模型的系数和截距的过程称为学习(或拟合)模型的参数。现在,我们将重点关注在给定卧室数量的情况下预测价格的单变量回归示例,稍后将回到多元线性回归。

使用预估器建立模型

设置完 vectorAssembler 之后,我们准备好数据并将其转换为线性回归模型期望的格式。在 Spark 中,LinearRegression 是一种预估器——接受 DataFrame 并返回模型。预估器从你的数据中学习参数,有一个 estimator_name.fit()方法,并进行急切的评估计算(即,启动 Spark 作业),而对转换器的评估则比较滞后。其他一些估计器的例子包括输入器、决策树分类器和随机森林回归器。

你会注意到,线性回归(特征)的输入列是我们 vectorAssembler 的输出:

In Python

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="price")

lrModel = lr.fit(vecTrainDF)

 

// In Scala

import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()

  .setFeaturesCol("features")

  .setLabelCol("price")

 

val lrModel = lr.fit(vecTrainDF)

lr.fit()返回 LinearRegressionModel(lrModel),它是一个转换器。换句话说,预估器 fit()方法的输出是一个转换器。一旦预估器了解了参数,转换器就可以将这些参数应用于新的数据点以生成预测。让我们检查一下它学到的参数:

In Python

m = round(lrModel.coefficients[0], 2)

b = round(lrModel.intercept, 2)

print(f"""The formula for the linear regression line is

price = {m}*bedrooms + {b}""")

 

// In Scala

val m = lrModel.coefficients(0)

val b = lrModel.intercept

println(f"""The formula for the linear regression line is

price = $m%1.2f*bedrooms + $b%1.2f""")

打印:

线性回归线的公式为:价格= 123.68 *卧室数量+ 47.51

创建管道

如果我们想将模型应用于测试集,则需要以与训练集相同的方式来准备数据(即,将其通过向量装配器传递)。通常,数据准备管道会包含多个步骤,并且不仅要记住要应用哪些步骤,而且要记住这些步骤的顺序也变得很麻烦。这是 Pipeline API 的动机:你只需按顺序指定希望数据通过的阶段,Spark 会为你处理。它们为用户提供了更好的代码可重用性和组织性。在 Spark 中,Pipelines 是预估器,而 PipelineModels(拟合的 Pipelines)是转换器。

让我们现在构建管道:

 In Python

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[vecAssembler, lr])

pipelineModel = pipeline.fit(trainDF)

 

// In Scala

import org.apache.spark.ml.Pipeline

val pipeline = new Pipeline().setStages(Array(vecAssembler, lr))

val pipelineModel = pipeline.fit(trainDF)

使用 Pipeline API 的另一个好处是,它可以确定哪些阶段是你的预估器/转换器,因此你不必担心为每个阶段指定 name.fit()与相对 name.transform()。

由于 pipelineModel 是转换器,因此也很容易将其应用于我们的测试数据集:

 In Python

predDF = pipelineModel.transform(testDF)

predDF.select("bedrooms", "features", "price", "prediction").show(10)

 

// In Scala

val predDF = pipelineModel.transform(testDF)

predDF.select("bedrooms", "features", "price", "prediction").show(10)

 

+--------+--------+------+------------------+

|bedrooms|features| price|        prediction|

+--------+--------+------+------------------+

|     1.0|   [1.0]|  85.0|171.18598011578285|

|     1.0|   [1.0]|  45.0|171.18598011578285|

|     1.0|   [1.0]|  70.0|171.18598011578285|

|     1.0|   [1.0]| 128.0|171.18598011578285|

|     1.0|   [1.0]| 159.0|171.18598011578285|

|     2.0|   [2.0]| 250.0|294.86172649777757|

|     1.0|   [1.0]|  99.0|171.18598011578285|

|     1.0|   [1.0]|  95.0|171.18598011578285|

|     1.0|   [1.0]| 100.0|171.18598011578285|

|     1.0|   [1.0]|2010.0|171.18598011578285|

+--------+--------+------+------------------+

在这段代码中,我们仅使用一个功能就构建了一个 bedrooms 模型(你可以在本书的 GitHub repo 中找到本章的笔记本)。但是,你可能希望使用所有特征来构建模型,其中某些特征可能是类别特征,例如 host_is_superhost。类别特征采用离散值,没有内在顺序——例如,职业或国家/地区名称。在下一节中,我们将考虑一种解决方案,该方法用于处理这类变量,称为“独热编码

独热编码

在我们刚刚创建的管道中,我们只有两个阶段,而线性回归模型仅使用一个功能。让我们看一下如何构建一个稍微更复杂的管道,其中包含我们所有的数字和分类功能。

MLlib 中的大多数机器学习模型都希望数值作为输入,以向量表示。要将分类值转换为数值,我们可以使用一种称为独热编码(简称:OHE)的技术。假设我们有一个名为列 Animal,我们有三种类型的动物:Dog,Cat 和 Fish。我们不能将字符串类型直接传递到 ML 模型中,因此我们需要分配一个数字映射,例如:

Animal = {"Dog", "Cat", "Fish"}

"Dog" = 1, "Cat" = 2, "Fish" = 3 

但是,使用这种方法,我们在数据集中引入了一些以前没有的虚假关系。例如,为什么我们分配 Cat 两倍的值 Dog?我们使用的数值不应在我们的数据集中引入任何关系。相反,我们想为列中的每个不同值创建一个单独的 Animal 列:

"Dog" = [ 1, 0, 0]

"Cat" = [ 0, 1, 0]

"Fish" = [0, 0, 1]

如果动物是狗,则在第一列中记录为 1,在其他列记录为 0。如果是猫,则在第二列中记录为 1,在其他列记录为 0。列的顺序无关紧要。如果你以前使用过 pandas,你会注意到它的作用与 pandas.get_dummies()是相同的。

如果我们有一个拥有 300 只动物的动物园,那么 OHE 是否会大量增加内存/计算资源的消耗?使用 Spark 不是问题!当大多数条目 0 为时,Spark 在内部使用 SparseVector ,这在 OHE 很常见,因此它不会浪费存储 0 值的空间。让我们看一个例子,以更好地了解如何 SparseVector 是如何工作的:

DenseVector(0,0,0,7,0,2,0,0,0,0)

SparseVector(10,[3,5],[7,2])

在本实施例中 DenseVector 包含的 10 个值,除 2 个非 0 值之外,其他值都为 0。要创建一个 SparseVector,我们需要跟踪向量的大小,非零元素的索引以及这些索引处的对应值。在此示例中,向量的大小为 10,在索引 3 和 5 处有两个非零值,在这些索引处的对应值是 7 和 2。

有几种方法可以使用 Spark 对数据进行独热编码。常用的方法是使用 StringIndexer 和 OneHotEncoder。使用这种方法,第一步是应用 StringIndexer 预估器将类别值转换为类别索引。这些类别索引按标签频率排序,因此最频繁使用的标签的索引为 0,这为我们在相同数据的各种运行中提供了可重复的结果。


创建类别索引后,你可以将其作为输入传递给 OneHotEncoder(如果使用 Spark 2.3 / 2.4 对对应 OneHotEncoderEstimator)。该 OneHotEncoder 映射将一列类别索引映射到一列二进制向量。查看表 10-2 了解 Spark 2.3 / 2.4 与 3.0 版本在 StringIndexer 和 OneHotEncoder API 上的区别。

以下代码演示了如何对我们的分类功能进行独热编码。在我们的数据集中,任何 string 类型的列都被视为分类特征,但有时你可能希望将数字特征视为分类特征,反之亦然。你需要仔细确定哪些列是数字列,哪些是类别列:

In Python

from pyspark.ml.feature import OneHotEncoder, StringIndexer

 

categoricalCols = [field for (field, dataType) in trainDF.dtypes

                   if dataType == "string"]

indexOutputCols = [x + "Index" for x in categoricalCols]

oheOutputCols = [x + "OHE" for x in categoricalCols]

 

stringIndexer = StringIndexer(inputCols=categoricalCols,

                              outputCols=indexOutputCols,

                              handleInvalid="skip")

oheEncoder = OneHotEncoder(inputCols=indexOutputCols,

                           outputCols=oheOutputCols)

 

numericCols = [field for (field, dataType) in trainDF.dtypes

               if ((dataType == "double") & (field != "price"))]

assemblerInputs = oheOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols=assemblerInputs,

                               outputCol="features")

                               

 

// In Scala

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

 

val categoricalCols = trainDF.dtypes.filter(_._2 == "StringType").map(_._1)

val indexOutputCols = categoricalCols.map(_ + "Index")

val oheOutputCols = categoricalCols.map(_ + "OHE")

 

val stringIndexer = new StringIndexer()

  .setInputCols(categoricalCols)

  .setOutputCols(indexOutputCols)

  .setHandleInvalid("skip")

 

val oheEncoder = new OneHotEncoder()

  .setInputCols(indexOutputCols)

  .setOutputCols(oheOutputCols)

 

val numericCols = trainDF.dtypes.filter{ case (field, dataType) =>

  dataType == "DoubleType" && field != "price"}.map(_._1)

val assemblerInputs = oheOutputCols ++ numericCols

val vecAssembler = new VectorAssembler()

  .setInputCols(assemblerInputs)

  .setOutputCol("features")

现在你可能想知道,“StringIndexer 如何处理出现在测试数据集中而不是训练数据集中的新类别?” 有一个 handleInvalid 参数指定你要如何处理它们。选项包括 skip(过滤掉无效数据的行),error(引发错误)或 keep(将无效数据放入 numLabels 索引处的特殊追加桶中)。对于此示例,我们只是跳过了无效的记录。

这种方法的一个难题是你需要明确告诉 StringIndexer 指出哪些特征应被视为类别特征。你可以使用 VectorIndexer 来自动检测所有类别变量,但是由于它必须遍历每一列并检测其值是否少于 maxCategories 唯一值,因此在计算成本是非常高的。maxCategories 是用户指定的参数,确定此值也可能很困难。

另一种方法是使用 RFormula。其语法受 R 编程语言的启发。使用 RFormula,你可以提供标签以及要包括的功能。它支持一个有限的 R 运算符的子集,包括~,.,:,+和-。例如,你可能指定 formula = "y ~ bedrooms + bathrooms",这表示给定 bedrooms 和 bathrooms 预测 y 值,或者 formula = "y ~ .",表示使用所有可用特征(并自动从特征中排除 y)。RFormula 将自动 StringIndex 和 OHE 所有字符串列,将数字列转换为 double 类型,并将所有这些组合成一个 VectorAssembler 的向量。因此,我们可以用一行替换所有前面的代码,并且我们将得到相同的结果:

 In Python

from pyspark.ml.feature import RFormula

 

rFormula = RFormula(formula="price ~ .",

                    featuresCol="features",

                    labelCol="price",

                    handleInvalid="skip")         

 

// In Scala

import org.apache.spark.ml.feature.RFormula

 

val rFormula = new RFormula()

  .setFormula("price ~ .")

  .setFeaturesCol("features")

  .setLabelCol("price")

  .setHandleInvalid("skip")

RFormula 自动组合 StringIndexer 和 OneHotEncoder,OneHotEncoder 的缺点是,并非所有算法都要求或不建议使用独热编码。例如,如果仅将 StringIndexer 用作分类功能,则基于树的算法可以直接处理类别变量。你无需对基于树的方法独热编码类别特征,这通常会使基于树的模型变得更糟糕。不幸的是,没有一种适合所有人的解决方案,而理想的方法与你计划应用于数据集的下游算法紧密相关。

如果其他人为你执行特征工程,请确保他们记录了他们是如何生成这些特征的。

一旦编写了用于转换数据集的代码,就可以使用所有特征作为输入来添加到线性回归模型。

在这里,我们将所有特征准备和模型构建放入管道中,并将其应用于我们的数据集:

 In Python

lr = LinearRegression(labelCol="price", featuresCol="features")

pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])

Or use RFormula

pipeline = Pipeline(stages = [rFormula, lr])

pipelineModel = pipeline.fit(trainDF)

predDF = pipelineModel.transform(testDF)

predDF.select("features", "price", "prediction").show(5)

 

// In Scala

val lr = new LinearRegression()

  .setLabelCol("price")

  .setFeaturesCol("features")

val pipeline = new Pipeline()

  .setStages(Array(stringIndexer, oheEncoder, vecAssembler, lr))

// Or use RFormula

// val pipeline = new Pipeline().setStages(Array(rFormula, lr))

 

val pipelineModel = pipeline.fit(trainDF)

val predDF = pipelineModel.transform(testDF)

predDF.select("features", "price", "prediction").show(5)

 

+--------------------+-----+------------------+

|            features|price|        prediction|

+--------------------+-----+------------------+

|(98,[0,3,6,7,23,4...| 85.0| 55.80250714362137|

|(98,[0,3,6,7,23,4...| 45.0| 22.74720286761658|

|(98,[0,3,6,7,23,4...| 70.0|27.115811183814913|

|(98,[0,3,6,7,13,4...|128.0|-91.60763412465076|

|(98,[0,3,6,7,13,4...|159.0| 94.70374072351933|

+--------------------+-----+------------------+

如你所见,features 列表示为 SparseVector。独热编码后有 98 个特征,然后是非零索引,然后是值本身。如果你将 truncate=False 参数传入 show()方法中,你可以看到所有的输出。

我们的模型表现如何?你可以看到,尽管有些预测可能被认为是“接近”,但其他的预测却相距遥远(存在租金为负数!!)。接下来,我们将评估数值模型在整个测试集中的效果。


评估模型

现在我们已经建立了一个模型,我们需要评估它的表现。在 spark.ml 有分类,回归,聚类和排序预估(在 Spark 3.0 引入)。鉴于上面的案例是一个回归问题,我们将使用均方根误差(RMSE)和 R²( R 平方)来评估模型的性能。

RMSE

RMSE 是从零到无穷大的度量。距离零越近越好。

让我们逐步介绍一下数学公式:

1.计算真值 yi 和预测值 yi 之间的差值(或误差)(发音为 y-hat,其中 hat 表示它是 hat 下变量的预测值):



2.ÿÿ之间差在平方,这样一来我们的正残差和负残差就不会被抵消。这被称为平方差:

3.然后,我们对所有 n 个数据点的平方差求和,称为平方差和(SSE)或残差平方和:

4.但是,SSE 会随着数据集中的记录 n 的数量的增加而增长,所以我们希望根据记录的数量来对其进行规范化。它给了我们均方误差(MSE),一个非常常用的回归指标:

5.如果我们停在 MSE,那么我们的误差项将处在预测变量单位的平方的规模。我们通常会采用 MSE 的平方根来使误差恢复到原始单位的比例,从而得出均方根误差(RMSE):

让我们使用 RMSE 评估我们的模型:

In Python

from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(

  predictionCol="prediction",

  labelCol="price",

  metricName="rmse")

rmse = regressionEvaluator.evaluate(predDF)

print(f"RMSE is {rmse:.1f}")

 

// In Scala

import org.apache.spark.ml.evaluation.RegressionEvaluator

val regressionEvaluator = new RegressionEvaluator()

  .setPredictionCol("prediction")

  .setLabelCol("price")

  .setMetricName("rmse")

val rmse = regressionEvaluator.evaluate(predDF)

println(f"RMSE is $rmse%.1f")

这将产生以下输出:

RMSE 是 220.6

解释 RMSE 的价值

那么,我们如何知道 220.6 是否对 RMSE 来说是一个比较好的值呢?有多种方法可以解释此值,其中一种方法是建立简单的基准模型并计算其 RMSE 进行比较。回归任务的常见基准模型是计算训练集上标签的平均值 ȳ(发音 y -bar),然后用该平均值来预测数据集中的每条记录,并计算结果 RMSE(示例代码在这本书的 GitHub repo 上)。如果你尝试此操作,你将看到我们的基准模型的 RMSE 为 240.7,因此我们的预测好过了基准。如果你没有好过基准,那么在模型构建过程中可能出了点问题。

如果这是分类问题,则你可能希望将预测最流行的类别作为基线模型。

请注意,标签的单位会直接影响你的 RMSE。例如,如果你的标签是高度,那么如果使用厘米而不是米作为度量单位,则 RMSE 会更高。你可以通过使用其他单位来任意降低 RMSE,这就是将 RMSE 与基准进行比较的原因。

当然,还有一些指标可以使你直观地了解自己在基准方面的表现,例如 ,我们将在下面进行讨论。

尽管名称 包含“平方”,但 值的范围从负无穷大到 1。让我们看一下此度量标准背后的数学公式。的计算如下:

如果总是预测ȳ,则 SS tot 是平方的总和:

并且 SSres 是你的模型预测(也称为误差平方总和,这是我们计算出的 RMSE)残差平方的总和:

如果你的模型完美地预测了每个数据点,那么你的 SS res = 0,则使  =1。如果你的 SS res = SS tot,则分数为 1/1,因此你的 为 0。如果你的模型执行与始终预测平均值相同的操作,则会发生

但是,如果你的模型的性能比总是预测ȳ还糟糕,并且 SStot 确实很大,那会出现什么情况呢?那么你的 实际上可以是负数!如果 为负,则应重新评估建模过程。使用 的好处在于,你不必定义要进行比较的基准模型。

如果要更改回归评估器以使用 ,而不必重新定义回归评估器,则可以使用 setter 属性设置度量标准名称:

In Python

r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)

print(f"R2 is {r2}")

 

// In Scala

val r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)

println(s"R2 is $r2")

输出为:

R2 为 0.159854

我们的 为正,但非常接近 0。我们的模型表现不佳的原因之一是因为我们的标签 price 似乎是对数正态分布的。如果分布是对数正态的,则意味着如果我们对取值求对数,则结果看起来像是正态分布。价格通常是对数正态分布的。如果考虑一下旧金山的租金价格,大多数租金约为每晚 200 美元,但有些租金每晚可能高达数千美元!你可以在图 10-7 中看到我们的训练数据集的 Airbnb 价格分布。

如果我们查看价格的对数,请看一下结果分布(图 10-8)。

你可以在此处看到我们的对数价格分布看起来更像是正态分布。作为练习,尝试构建模型以预测对数刻度上的价格,然后对预测取幂并评估模型。该代码也可以在该书的 GitHub repo 库中的本章笔记本中找到。你应该看到此数据集的 RMSE 降低而 升高。

保存和加载模型

现在,我们已经建立并评估了一个模型,让我们将其保存到持久性存储中以备后用(或者,如果我们的集群出现故障,我们就不必重新计算模型)。保存模型与编写 DataFrames 非常相似——也就是 API 中的 model.write().save(path)。你可以选择提供 overwrite()命令来覆盖该路径中包含的任何数据:

 In Python

pipelinePath = "/tmp/lr-pipeline-model"

pipelineModel.write().overwrite().save(pipelinePath)

 

// In Scala

val pipelinePath = "/tmp/lr-pipeline-model"

pipelineModel.write.overwrite().save(pipelinePath)

加载保存的模型时,需要指定要重新加载的模型的类型(例如,是 LinearRegressionModel 还是 LogisticRegressionModel)。因此,我们建议你始终将转换器/预估器放在 Pipeline 中,这样对于所有模型,你都可以加载 PipelineModel,而只需更改模型的文件路径即可:

 In Python

from pyspark.ml import PipelineModel

savedPipelineModel = PipelineModel.load(pipelinePath)

 

// In Scala

import org.apache.spark.ml.PipelineModel

val savedPipelineModel = PipelineModel.load(pipelinePath)

加载后,可以将其应用于新的数据点。但是,你不能使用该模型中的权重作为训练新模型的初始化参数(与从随机权重开始相反),因为 Spark 没有“热启动”的概念。如果数据集稍有变化,则必须从头开始重新训练整个线性回归模型。

通过构建和评估线性回归模型,让我们探究其他一些模型如何在我们的数据集上执行。在下一节中,我们将探索基于树的模型,并查看一些常见的超参数以进行调整以提高模型效果。

发布于: 2021 年 07 月 25 日阅读数: 18
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
使用MLlib进行机器学习(十-上)