写点什么

结语:Apache Spark 3_0(十二)

发布于: 1 小时前
结语:Apache Spark 3_0(十二)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


在我们写这本书的时候,Apache Spark 3.0 还没有正式发布,它仍在开发中,我们开始使用 Spark 3.0.0- preview2。本书中的所有代码示例都在 Spark 3.0.0- preview2 上进行过测试,它们与 Spark 3.0 官方版本的工作原理没有什么不同。在相关的章节中,我们尽可能地提到了什么时候特性是新增加的,它在 Spark 3.0 中有什么特点。在本章中,我们概述了这些变化。

缺陷修复和功能增强有很多,因此,为了简洁起见,我们仅重点介绍与 Spark 组件有关的一些显著更改和功能。某些新功能实际上是高级功能,不在本书的讨论范围之内,但是我们在这里提到这些功能是为了让你在发行版可用时可以探索它们。

Spark Core 和 Spark SQL

首先,让我们考虑一下幕后的新功能。Spark Core 和 Spark SQL 引擎中引入了许多更改,以帮助加快查询速度。加快查询的一种方法是使用动态分区修剪来读取较少的数据。另一个是在执行过程中调整和优化查询计划。

动态分区修剪

动态分区修剪(DPP)背后的想法是跳过查询结果中不需要的数据。DPP 最佳的典型方案是连接两个表:事实表(分区在多列上)和维度表(未分区),如图 12-1 所示。通常,过滤器位于表的非分区侧(在本例中为 Date)。例如,考虑这个基于两个表 Sales 和 Date 的查询:

-- In SQL

SELECT * FROM Sales JOIN Date ON Sales.date = Date.date

DPP 中的关键优化技术是从维度表中获取过滤器的结果,并将其注入到事实表中,作为扫描操作的一部分,以限制读取的数据,如图 12-1 所示。

考虑维表小于事实表并执行连接的情况,如图 12-2 所示。在这种情况下,Spark 很可能会进行广播连接(在第 7 章中进行了讨论)。在此连接期间,Spark 将执行以下步骤以最大程度地减少从较大的事实表中扫描的数据量:

1. 在连接的维度方面,Spark 将根据该维度表构建一个哈希表(也称为构建关系),作为此过滤器查询的一部分。

2. Spark 会将查询结果插入哈希表,并将其分配给广播变量,该变量将分发给参与此连接操作的所有执行程序。

3. 在每个执行程序上,Spark 都会探测广播的哈希表,以确定要从事实表中读取哪些对应的行。

4. 最后,Spark 将把此过滤器动态注入事实表的文件扫描操作中,并重用广播变量的结果。这样,作为对事实表的文件扫描操作的一部分,仅扫描与过滤器匹配的分区,并且仅读取所需的数据。

默认情况下启用,因此你不必显式配置它,当在两个表之间执行连接时,所有这些都是动态发生的。通过 DPP 优化,Spark 3.0 可以更好地处理星型模式查询。

自适应查询执行

Spark 3.0 优化查询性能的另一种方法是在运行时调整其物理执行计划。自适应查询执行(AQE)根据查询执行过程中收集的运行时统计信息重新优化和调整查询计划。它尝试在运行时执行以下操作:

通过减少 shuffle 分区的数量来减少 shuffle 阶段中的 reducer 的数量。

优化查询的物理执行计划,例如通过将 SortMergeJoin 在合适的时间点转换为 BroadcastHashJoin。

处理连接期间的数据倾斜。

所有这些自适应措施都在运行时计划执行期间发生,如图 12-3 所示。要在 Spark 3.0 中使用 AQE,请将配置设置 spark.sql.adaptive.enabled 为 true。

AQE 框架

查询中的 Spark 操作被流水线化并在并行进程中执行,但是改组或广播交换中断了该流水线,因为需要将一个阶段的输出作为下一阶段的输入(请参阅本章中的“步骤 3:了解 Spark 应用程序概念”)。这些断点在查询阶段称为实现点,它们提供了重新优化和重新检查查询的机会,如图 12-4 所示。

如图所示,这是 AQE 框架要迭代的概念性步骤:

1. 执行每个阶段的所有叶节点,例如扫描操作。

2. 实现点完成执行后,将其标记为完成,并且在执行期间获得的所有相关统计信息都会在其逻辑计划中进行更新。

3. 基于这些统计信息,例如读取的分区数,读取的数据字节等,该框架再次运行 Catalyst 优化器以了解它是否可以:

a. 合并分区的数量以减少用于读取随机数据的缩减器的数量。

b. 根据读取的表的大小,用广播连接替换排序合并连接。

c. 尝试纠正数据倾斜。

d. 创建一个新的优化逻辑计划,然后创建一个新的优化物理计划。

 

重复此过程,直到执行了查询计划的所有阶段。

简而言之,这种重新优化是动态完成的,如图 12-3 所示,其目标是动态合并 shuffle 分区,减少读取 shuffle 输出数据所需的 reducer 数量,在适当的情况下切换连接策略,并补救任何倾斜连接。

两种 Spark SQL 配置指示 AQE 如何减少 reducer 的数量:

l spark.sql.adaptive.coalescePartitions.enabled(设置为 true)

l spark.sql.adaptive.skewJoin.enabled(设置为 true)

在撰写本文时,Spark 3.0 社区博客,文档和示例尚未公开发布,但在发布时它们应该已经公开。如果你希望了解这些功能的工作原理,这些资源将使你获得更多详细信息,包括有关如何注入 SQL 连接提示的信息,下面将进行讨论。

SQL 连接提示

添加到现有的 BROADCAST 提示进行连接,在 Spark3.0 中为所有的 Spark Join 连接策略增加了连接提示(见第七章的“Spark 的 Join 连接策略”)。此处提供了每种连接类型的示例。

随机排序合并连接(SMJ)

有了这些新的提示,你可以建议 Spark 在连接表 a 和 b 或 customers 和 orders 时执行 SortMergeJoin ,如以下示例所示。你可以在 SELECT /*+ ... */注释块内的语句中添加一个或多个提示:

SELECT /*+ MERGE(a, b) */ id FROM a JOIN b ON a.key = b.key

SELECT /*+ MERGE(customers, orders) */ * FROM customers, orders WHERE

orders.custId = customers.custId

广播哈希连接(BHJ)

同样,对于广播哈希连接,你可以向 Spark 提供提示,表明你更喜欢广播连接。例如,在这里,我们广播表 a 与表 b 连接以及表 customers 与表 orders 连接:

SELECT /*+ BROADCAST(a) */ id FROM a JOIN b ON a.key = b.key

SELECT /*+ BROADCAST(customers) */ * FROM customers, orders WHERE

orders.custId = customers.custId 

Shuffle 哈希连接(SHJ)

你可以通过类似的方式提供提示以执行随机哈希连接,尽管与前两种受支持的连接策略相比,这种情况很少见:

SELECT /*+ SHUFFLE_HASH(a, b) */ id FROM a JOIN b ON a.key = b.key

SELECT /*+ SHUFFLE_HASH(customers, orders) */ * FROM customers, orders WHERE  orders.custId = customers.custId

随机复制嵌套循环连接(SNLJ)

最后,shuffle 和复制嵌套循环连接遵循类似的形式和语法:

SELECT /*+ SHUFFLE_REPLICATE_NL(a, b) */ id FROM a JOIN b

目录插件 API 和 DataSourceV2

Spark 3.0 的实验性 DataSourceV2 API 不仅限于 Hive 元存储和目录,还扩展了 Spark 生态系统并为开发人员提供了三个核心功能。具体来说包括:

l 允许插入用于目录和表管理的外部数据源

l 通过 ORC,Parquet,Kafka,Cassandra,Delta Lake 和 Apache Iceberg 等受支持的文件格式,支持将谓词下推到其他数据源。

l 提供统一的 API,用于接收和来源的数据源的流式处理和批处理

针对希望扩展 Spark 使用外部源和接收器功能的开发人员,目录 API 提供了 SQL 和编程 API 来从指定的可插入目录中创建,更改,加载和删除表。该目录提供了在不同级别执行的功能和操作的分层抽象,如图 12-5 所示。

Spark 和特定连接器之间的初始交互是解析与其实际 Table 对象的关系。Catalog 定义如何在此连接器中查找表。此外,Catalog 可以定义如何修改自己的元数据,从而支持 CREATE TABLE,ALTER TABLE 等操作。

例如,在 SQL 中,你现在可以发出命令来为目录创建命名空间。要使用可插入目录,请在 spark-defaults.conf 文件中启用以下配置:

spark.sql.catalog.ndb_catalog com.ndb.ConnectorImpl # connector implementation

spark.sql.catalog.ndb_catalog.option1 value1

spark.sql.catalog.ndb_catalog.option2 value2 

在此,数据源目录的连接器有两个选项:option1->value1 和 option2->value2。定义它们之后,Spark 或 SQL 中的应用程序用户可以使用 DataFrameReader 和 DataFrameWriter API 方法或带有这些已定义选项的 Spark SQL 命令作为数据源操作的方法。例如:

-- In SQL

SHOW TABLES ndb_catalog;

CREATE TABLE ndb_catalog.table_1;

SELECT * from ndb_catalog.table_1;

ALTER TABLE ndb_catalog.table_1

 

// In Scala

df.writeTo("ndb_catalog.table_1")

val dfNBD = spark.read.table("ndb_catalog.table_1")

  .option("option1", "value1")

  .option("option2", "value2")

这些目录插件 API 扩展了 Spark 利用外部数据源作为接收器和源的能力,但它们仍处于试验阶段,不应在生产中使用。有关使用它们的详细指南超出了本书的范围,但是如果你想将自定义连接器写入外部数据源作为目录来管理外部表及其相关联的目录,我们建议你查看发行文档以获取更多信息。元数据。

 

前面的代码段是定义和实现目录连接器并用数据填充它们后代码的示例。

加速器感知调度器(Accelerator-Aware Scheduler)

Project Hydrogen 是一项将 AI 和大数据整合在一起的社区计划,其主要目标是三个:实施障碍执行模式,感知加速器的计划以及优化的数据交换。Apache Spark 2.4.0 中引入了屏障执行模式的基本实现。在 Spark 3.0 中,已实现了基本的调度程序,以利用硬件加速器(例如目标平台上的 GPU),在该平台上,Spark 以独立模式,YARN 或 Kubernetes 部署。

为了让 Spark 以有组织的方式利用这些 GPU 来处理使用它们的特殊工作负载,你必须指定可通过配置使用的硬件资源。然后,你的应用程序可以在探测脚本的帮助下发现它们。在你的 Spark 应用程序中,启用 GPU 使用是一个三步过程:

编写探测脚本,以发现每个 Spark 执行程序上可用的基础 GPU 的地址。该脚本在以下 Spark 配置中设置:

spark.worker.resource.gpu.discoveryScript =/path/to/script.sh

为你的 Spark 执行者设置配置以使用以下发现的 GPU:

spark.executor.resource.gpu.amount = 2

spark.task.resource.gpu.amount = 1

编写 RDD 代码以利用这些 GPU 来完成任务:

import org.apache.spark.BarrierTaskContext

val rdd = ...

rdd.barrier.mapPartitions { it =>

  val context = BarrierTaskContext.getcontext.barrier()

  val gpus = context.resources().get("gpu").get.addresses

  // launch external process that leverages GPU

  launchProcess(gpus)

}

 

这些步骤仍处于试验阶段,将来的 Spark 3.x 版本中将继续进行进一步开发,以支持在命令行(带有 spark-submit)和 Spark 任务级别上无缝发现 GPU 资源。

结构化流

为了检查你的结构化流作业在执行过程中的数据起伏与变化,Spark 3.0 UI 在第 7 章中探讨了其他选项卡的同时,还提供了一个新的结构化流选项卡。此选项卡提供两组统计信息:有关已完成的流查询作业的聚合信息(图 12-6)和有关流查询的详细统计信息,包括输入速率,处理速率,输入行数,批处理持续时间和操作持续时间(图 12-7)。

在图 12-7 的屏幕截图是采取 Spark3.0.0-preview2; 在最终版本中,你应该在用户界面页面的名称标识符中看到查询名称和 ID。

无需配置;所有配置均可直接在 Spark 3.0 安装中运行,并具有以下默认值:

spark.sql.streaming.ui.enabled=true

spark.sql.streaming.ui.retainedProgressUpdates=100

spark.sql.streaming.ui.retainedQueries=100

PySpark,Pandas UDF 和 Pandas Function API

Spark 3.0 要求 pandas 0.23.2 版本或更高版本才能使用任何与 pandas 相关的方法,例如 DataFrame.toPandas()或 SparkSession.createDataFrame(pandas.DataFrame)。

此外,它需要 PyArrow 0.12.1 版本或更高使用 PyArrow 功能,例如 pandas_udf(),DataFrame.toPandas()和 SparkSession.createDataFrame(pandas.DataFrame)(spark.sql.execution.arrow.enabled 配置集到 true)。下一节将介绍 Pandas UDF 中的新功能。

重新设计的带有 Python 类型提示的 Pandas UDF

通过利用 Python 类型提示重新设计了 Spark 3.0 中的 Pandas UDF 。这使你可以自然地表达 UDF,而无需赋值类型。pandas UDF 现在更具“ Python 风格”,它们本身可以定义 UDF 应该输入和输出的内容,而不必像在 Spark 2.4 中那样通过 UDF 进行指定,例如 @pandas_udf("long", PandasUDFType.SCALAR)。

这是一个例子:

Pandas UDFs in Spark 3.0

import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("long")

def pandas_plus_one(v: pd.Series) -> pd.Series:

 return v + 1

这种新格式具有许多优点,例如更容易进行静态分析。你可以采用与以前相同的方法来应用新的 UDF:

df = spark.range(3)

df.withColumn(“ plus_one”,pandas_plus_one(“ id”))。show()

 

+ --- + -------- +

| id | plus_one |

+ --- + -------- +

| 0 | 1 |

| 1 | 2 |

| 2 | 3 |

+ --- + -------- +

pandasUDF 中的迭代器支持

pandasUDF 非常常用于加载模型并为单节点机器学习和深度学习模型执行分布式推理。但是,如果模型很大,那么 Pandas UDF 要在同一 Python 工作进程中为每个批次重复加载相同的模型,会产生很高的开销。

在 Spark3.0,pandasUDF 可以接受 pandas.Series 的迭代器的或 pandas.DataFrame,如下所示:

from typing import Iterator 

@pandas_udf('long')

def pandas_plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:

 return map(lambda s: s + 1, iterator)

df.withColumn("plus_one", pandas_plus_one("id")).show()

+---+--------+

| id|plus_one|

+---+--------+

| 0| 1|

| 1| 2|

| 2| 3|

+---+--------+

有了此支持,你只能加载一次模型,而不是为迭代器中的每个系列加载模型。以下伪代码说明了如何执行此操作:

@pandas_udf(...)

def predict(iterator):

 model = ... # load model

 for features in iterator:

   yield model.predict(features)

新的 Pandas Function API

Spark 3.0 引入了一些新的 Pandas UDF 类型,当你想对整个 DataFrame 而不是按列应用函数时,它们很有用,如第 11 章中 mapInPandas()介绍的那样。这些将一个迭代器作为输入,并输出另一个迭代器:pandas.DataFramepandas.DataFrame。

def pandas_filter(

 iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:

 for pdf in iterator:

 yield pdf[pdf.id == 1]

df.mapInPandas(pandas_filter, schema=df.schema).show()

+---+

| id|

+---+

| 1|

+---+

你可以通过在配置中指定 spark.sql.execution.arrow.maxRecordsPerBatch 来控制 pandas.DataFrame 的大小。请注意,与大多数 Pandas UDF 不同,输入大小和输出大小不必匹配。

协同组的所有数据都将加载到内存中,这意味着如果存在数据倾斜或某些组太大而无法容纳在内存中,则可能会遇到 OOM 问题。

Spark 3.0 还引入了 cogrouped map Pandas UDF。该 applyInPandas()函数使用两个 pandas.DataFrame 共享一个公用键,并将一个函数应用于每个共同组(cogroup)。然后将返回的 pandas.DataFrame 组合为单个 DataFrame。与 mapInPandas()相同,pandas.DataFrame 对返回的长度没有限制。这是一个例子:

df1 = spark.createDataFrame(

 [(1201, 1, 1.0), (1201, 2, 2.0), (1202, 1, 3.0), (1202, 2, 4.0)],

 ("time", "id", "v1"))

df2 = spark.createDataFrame(

 [(1201, 1, "x"), (1201, 2, "y")], ("time", "id", "v2"))

def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:

 return pd.merge_asof(left, right, on="time", by="id")

df1.groupby("id").cogroup(

 df2.groupby("id")

).applyInPandas(asof_join, "time int, id int, v1 double, v2 string").show()

 

+----+---+---+---+

|time| id| v1| v2|

+----+---+---+---+

|1201| 1|1.0| x|

|1202| 1|3.0| x|

|1201| 2|2.0| y|

|1202| 2|4.0| y|

+----+---+---+---+

功能变更

列出 Spark 3.0 中的所有功能更改会把这本书转变成几英寸厚的砖头。因此,为了简洁起见,我们将在此处提及一些值得注意的问题,并让你查阅 Spark 3.0 的发行说明,以获取完整的详细信息和所有细微差别。

支持和弃用的语言

Spark 3.0 支持 Python 3 和 JDK 11,并且需要 Scala 2.12 版本。不推荐使用 Python 3.6 和 Java 8 之前的所有版本。如果使用这些不建议使用的版本,则会收到警告消息。

对 DataFrame 和 Dataset API 的更改

在早期版本的 Spark 中,Dataset 和 DataFrame AP 已弃用该 unionAll()方法。在 Spark 3.0 中,这已被逆转,并且 unionAll()现在是 union()方法的别名。

此外,Spark 的 Dataset. groupbykey()的早期版本会导致一个分组的数据集,当键是非结构类型(int、string、array 等)时,键被虚假地命名为值。因此,当显示时,查询中的 ds.groupByKey().count()的聚合结果与(value, count)相反。这已被纠正,以产生(key,count),这更直观。例如:

//  In Scala

val ds = spark.createDataset(Seq(20, 3, 3, 2, 4, 8, 1, 1, 3))

ds.show(5)

 

+-----+

|value|

+-----+

|   20|

|    3|

|    3|

|    2|

|    4|

+-----+

 

ds.groupByKey(k=> k).count.show(5)

 

+---+--------+

|key|count(1)|

+---+--------+

|  1|       2|

|  3|       3|

| 20|       1|

|  4|       1|

|  8|       1|

+---+--------+

only showing top 5 rows

但是,如果愿意,可以通过设置 spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue 为 true 保留旧格式。

DataFrame 和 SQL Explain 命令

为了获得更好的可读性和格式,Spark 3.0 引入了 DataFrame.explain(FORMAT_MODE)功能,用了显示 Catalyst 优化器生成的计划的不同视图。该选项包括 simple(默认),"extended", "cost", "codegen"和" formatting "。这是一个简单的例子:

// In Scala

val strings = spark

 .read.text("/databricks-datasets/learning-spark-v2/SPARK_README.md")

val filtered = strings.filter($"value".contains("Spark"))

filtered.count()

In Python

strings = spark

 .read.text("/databricks-datasets/learning-spark-v2/SPARK_README.md")

filtered = strings.filter(strings.value.contains("Spark"))

filtered.count()

// In Scala

filtered.explain("simple")


In Python

filtered.explain(mode="simple")


== Physical Plan ==

*(1) Project [value#72]

+- *(1) Filter (isnotnull(value#72) AND Contains(value#72, Spark))

   +- FileScan text [value#72] Batched: false, DataFilters: [isnotnull(value#72),

Contains(value#72, Spark)], Format: Text, Location:

InMemoryFileIndex[dbfs:/databricks-datasets/learning-spark-v2/SPARK_README.md],PartitionFilters: [], PushedFilters: [IsNotNull(value),StringContains(value,Spark)], ReadSchema: struct<value:string>

// In Scala

filtered.explain("formatted")

 In Python

filtered.explain(mode="formatted")

 

== Physical Plan ==

* Project (3)

+- * Filter (2)

   +- Scan text  (1)


(1) Scan text


Output [1]: [value#72]

Batched: false

Location: InMemoryFileIndex [dbfs:/databricks-datasets/learning-spark-v2/...

PushedFilters: [IsNotNull(value), StringContains(value,Spark)]

ReadSchema: struct<value:string>


(2) Filter [codegen id : 1]

Input [1]: [value#72]

Condition : (isnotnull(value#72) AND Contains(value#72, Spark))


(3) Project [codegen id : 1]

Output [1]: [value#72]

Input [1]: [value#72]

-- In SQL

EXPLAIN FORMATTED

SELECT *

FROM tmp_spark_readme

WHERE value like "%Spark%"


== Physical Plan ==

* Project (3)

+- * Filter (2)

   +- Scan text  (1)


(1) Scan text

Output [1]: [value#2016]

Batched: false

Location: InMemoryFileIndex [dbfs:/databricks-datasets/


learning-spark-v2/SPARK_README.md]

PushedFilters: [IsNotNull(value), StringContains(value,Spark)]

ReadSchema: struct<value:string>


(2) Filter [codegen id : 1]

Input [1]: [value#2016]

Condition : (isnotnull(value#2016) AND Contains(value#2016, Spark))


(3) Project [codegen id : 1]

Output [1]: [value#2016]

Input [1]: [value#2016]


要查看其余的格式化模式,请在本书的 GitHub repo 中笔记本进行尝试使用。也可以查看从 Spark 2.x 到 Spark 3.0 的迁移指南。


概括

本章粗略介绍了 Spark 3.0 中的新功能。我们冒昧地提到了一些值得注意的高级功能。它们在后台运行,而不是在 API 级别运行。特别是,我们研究了动态分区修剪(DPP)和自适应查询执行(AQE),这两种优化可以提高 Spark 在执行时的性能。我们还探索了实验性 Catalog API 如何将 Spark 生态系统扩展到用于批处理和流数据的源和接收器的自定义数据存储,并研究了 Spark 3.0 中新的调度程序,使其能够在执行程序中利用 GPU。

 

作为第 7 章中对 Spark UI 的讨论的补充,我们还向你展示了新的“结构化流”选项卡,它提供了有关流作业的累积统计信息,其他可视化效果以及每个查询的详细指标。

 

在 Spark 3.0 中不推荐使用低于 3.6 的 Python 版本,并且对 Pandas UDF 进行了重新设计,以支持 Python 类型的提示和迭代器作为参数。有 pandas UDF,它们可以转换整个 DataFrame,以及将两个共同分组的 DataFrame 组合成一个新的 DataFrame。

 

为了提高查询计划的可读性,通过 DataFrame.explain(FORMAT_MODE)和在 SQL 中执行 EXPLAIN FORMAT_MODESQL 显示逻辑和物理计划的不同级别和详细信息。此外,SQL 命令现在可以获取 Spark 整个受支持的连接策略的提示。

 

虽然我们无法在这一简短的章节中列举最新版本的 Spark 中的所有更改,但我们还是建议你在发布 Spark 3.0 时浏览发行说明以了解更多信息。另外,为了快速总结面向用户的更改以及有关如何迁移到 Spark 3.0 的详细信息,我们建议你查看迁移指南。

提醒一下,本书中的所有代码均已在 Spark 3.0.0-preview2 上进行了测试,并且在 Spark 3.0 正式发布时可以与 Spark 3.0 一起使用。我们希望你喜欢这本书,并从与我们的旅程中学到东西。感谢你的关注!

 

发布于: 1 小时前阅读数: 6
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
结语:Apache Spark 3_0(十二)