写点什么

快速体验 Spark Connect

作者:CloudEon开源
  • 2023-12-31
    广东
  • 本文字数:8448 字

    阅读完需:约 28 分钟

介绍

在 Apache Spark 3.4 中,引入了一个解耦的客户端-服务器架构的新模块 Spark Connect,允许使用 DataFrame API 和未解析的逻辑计划作为协议远程连接到 Spark 集群。客户端和服务器之间的分离允许 Spark 及其开放生态系统从任何地方利用。它可以嵌入到现代数据应用程序,IDE,笔记本电脑和编程语言中。


工作原理

Spark Connect 轻量级客户端将 DataFrame 操作转换为使用协议缓冲区编码的未解析逻辑查询计划。这些计划通过 gRPC 框架发送到服务器。


服务器上的 Spark Connect 服务端接收未解析的逻辑计划并将其转换为 Spark 的逻辑计划操作符,类似于解析 SQL 查询,其中解析属性和关系并构建初始解析计划。然后,标准的 Spark 执行过程启动,Spark Connect 会利用 Spark 的所有优化和增强。最终的结果通过 gRPC 作为 Apache Arrow 编码的行批处理流回客户端。


场景用途

  1. 稳定性:使用过多内存的应用程序现在只会影响它们自己的环境,因为它们可以在自己的进程中运行。用户可以在客户端上定义自己的依赖项,而不需要担心与 Spark 驱动程序的潜在冲突。

  2. 可升级性:Spark 驱动程序现在可以独立于应用程序进行无缝升级,例如从性能改进和安全修复中受益。这意味着应用程序可以向前兼容,只要服务器端 RPC 定义被设计为向后兼容。

  3. 可调试性和可观察性:Spark Connect 支持在开发过程中直接从您喜爱的 IDE 进行交互式调试。类似地,可以使用应用程序的框架原生指标和日志库来监视应用程序。


简单总结一下,就是可以基于轻量级的依赖整合到 java 体系项目或者 python 体系项目,甚至是 go 体系的项目中,然后实现轻松调用 spark connect 服务端所在大数据集群的能力,进行数据分析或者数据处理。以后想在 web 项目中调用 spark 的能力不再需要引入过多的依赖导致依赖冲突等问题,也不会因为 driver 端和 web 服务整合在一起而导致不稳定。

快速上手

截至发稿时,Spark 已经更新到 3.5.0 版本。下面的案例都以 3.5.0 版本为主。这里主要实践中 Java 项目中引入 Spark Connect 客户端,连接远程的 Spark Connect 服务器。

搭建 connect server

启动包含 jdk1.8 的容器进行搭建


docker run -it -p 15002:15002 -p 4040:4040  --name spark --rm  registry.cn-hangzhou.aliyuncs.com/udh/jdk:1.8.141 bash
复制代码


容器内下载 spark3.5.0 的包


wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
复制代码


容器内启动 connect server


./sbin/start-connect-server.sh --jars /root/.ivy2/jars/org.apache.spark_spark-connect_2.12-3.5.0.jar,/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar
复制代码


需要在容器内提前下载好两个依赖的 jar。


https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar


https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar


启动参数:


Usage: ./sbin/start-connect-server.sh [options]
Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, k8s://https://host:port, or local (Default: local[*]). --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). --class CLASS_NAME Your application's main class (for Java / Scala apps). --name NAME A name of your application. --jars JARS Comma-separated list of jars to include on the driver and executor classpaths. --packages Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by --repositories. The format for the coordinates should be groupId:artifactId:version. --exclude-packages Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in --packages to avoid dependency conflicts. --repositories Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. File paths of these files in executors can be accessed via SparkFiles.get(fileName). --archives ARCHIVES Comma-separated list of archives to be extracted into the working directory of each executor.
--conf, -c PROP=VALUE Arbitrary Spark configuration property. --properties-file FILE Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf.
--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M). --driver-java-options Extra Java options to pass to the driver. --driver-library-path Extra library path entries to pass to the driver. --driver-class-path Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.
--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
--proxy-user NAME User to impersonate when submitting the application. This argument does not work with --principal / --keytab.
--help, -h Show this help message and exit. --verbose, -v Print additional debug output. --version, Print the version of current Spark.
Spark Connect only: --remote CONNECT_URL URL to connect to the server for Spark Connect, e.g., sc://host:port. --master and --deploy-mode cannot be set together with this option. This option is experimental, and might change between minor releases.
Cluster deploy mode only: --driver-cores NUM Number of cores used by the driver, only in cluster mode (Default: 1).
Spark standalone or Mesos with cluster deploy mode only: --supervise If given, restarts the driver on failure.
Spark standalone, Mesos or K8s with cluster deploy mode only: --kill SUBMISSION_ID If given, kills the driver specified. --status SUBMISSION_ID If given, requests the status of the driver specified.
Spark standalone, Mesos and Kubernetes only: --total-executor-cores NUM Total cores for all executors.
Spark standalone, YARN and Kubernetes only: --executor-cores NUM Number of cores used by each executor. (Default: 1 in YARN and K8S modes, or all available cores on the worker in standalone mode).
Spark on YARN and Kubernetes only: --num-executors NUM Number of executors to launch (Default: 2). If dynamic allocation is enabled, the initial number of executors will be at least NUM. --principal PRINCIPAL Principal to be used to login to KDC. --keytab KEYTAB The full path to the file that contains the keytab for the principal specified above.
Spark on YARN only: --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
复制代码

java 整合 connect client

参考官方文档,搭建客户端代码原本以为应该是比较简单的。。。



建立一个 maven 项目,添加 pom


<dependencies>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-api_2.12</artifactId> <version>3.5.0</version> </dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-connect-client-jvm --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-connect-client-jvm_2.12</artifactId> <version>3.5.0</version> </dependency>
</dependencies>
复制代码


java 代码


public class App {    public static void main( String[] args )    {        SparkSession spark = SparkSession.builder().remote("sc://localhost").build();
spark.stop(); }}
复制代码


结果,报错了



查看 spark 的官方社区,找到了相同的问题:


https://issues.apache.org/jira/browse/SPARK-45255?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22CacheLoader%22%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC


临时解决思路

shade 包依赖生成项目

建立两个 maven 项目,一个叫 shade-spark-connect。用于将缺失依赖 shade 倒入一个胖包。


定义 pom


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId> <artifactId>shade-spark-connect</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging>
<name>shade-spark-connect</name> <url>http://maven.apache.org</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <spark.shade.packageName>org.sparkproject.connect.client</spark.shade.packageName> <io.grpc.version>1.59.0</io.grpc.version> <netty.version>4.1.100.Final</netty.version>
</properties>
<dependencies> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>32.0.1-jre</version> </dependency>
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.25.1</version> </dependency> </dependencies>

<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.1</version> <configuration> <shadedArtifactAttached>false</shadedArtifactAttached> <artifactSet> <!-- 包含 guava jar,但排除 com.google.thirdparty目录 --> <includes> <include>com.google.guava:*</include> </includes> </artifactSet> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/> </transformers> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>com/google/thirdparty/**</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.1</version> <configuration> <shadedArtifactAttached>false</shadedArtifactAttached> <artifactSet>
<includes> <include>com.google.protobuf:**</include> <include>com.google.guava:**</include>

</includes> </artifactSet>
<!-- 依赖的 jar 包中的一些类文件打包到项目构建生成的 jar 包中,在打包的时候把类重命名--> <relocations> <relocation> <pattern>com.google.common</pattern> <shadedPattern>org.sparkproject.connect.client.com.google.common</shadedPattern> <includes> <include>com.google.common.*</include> </includes>
</relocation> <relocation> <pattern>com.google.protobuf</pattern> <shadedPattern>org.sparkproject.connect.client.com.google.protobuf</shadedPattern> <includes> <include>com.google.protobuf.**</include> </includes>
</relocation> </relocations> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/> </transformers> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin>
</plugins>
</build></project>
复制代码

测试 connect client 的项目

建另一个 maven 项目,用于测试 connect 代码。


pom 代码


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId> <artifactId>spark-connect-demo</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging>
<name>spark-connect-demo</name> <url>http://maven.apache.org</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <io.grpc.version>1.59.0</io.grpc.version> <netty.version>4.1.100.Final</netty.version>
</properties>
<dependencies> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> <version>${io.grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>${io.grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-services</artifactId> <version>${io.grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>${io.grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-inprocess</artifactId> <version>${io.grpc.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-codec-http2</artifactId> <version>${netty.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-handler-proxy</artifactId> <version>${netty.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-transport-native-unix-common</artifactId> <version>${netty.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-connect-client-jvm_2.12</artifactId> <version>3.5.0</version> </dependency>
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency>
<dependency> <groupId>org.example</groupId> <artifactId>demo</artifactId> <version>1.0</version> <scope>system</scope> <systemPath>E:\opensource\shade-spark-connect\target\shade-spark-connect-1.0-SNAPSHOT.jar</systemPath> </dependency> </dependencies>

<build> <plugins>

<plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.12.2</version> <!-- 适用于你的 Scala 版本 --> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins>

</build></project>
复制代码


其中 E:\opensource\shade-spark-connect\target\shade-spark-connect-1.0-SNAPSHOT.jar 指向上面 shade-spark-connect 使用 package 命令生成到包。


java 代码


import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.functions._import org.apache.spark.sql.types._import org.apache.spark.sql._import org.apache.spark.sql.expressions._object Myapp extends App {
println(spark.version) val spark: SparkSession = SparkSession.builder.remote("sc://localhost").getOrCreate()//using list to init df val df = spark.createDataFrame(List(("a a a", 1), ("b b b b", 2), ("c c d a", 3))).toDF("value", "col2") df.show()

// word count val words = df.select(explode(split(col("value"), " ")).as("word")) val wordCounts = words.groupBy("word").count() wordCounts.show()

spark.stop()
}
复制代码


可以看到 connect server 在计算完返回的结果了


总结

我也尝试过使用 PySpark 的 Connect 客户端连接 Connect 服务器,按照官方文档的步骤进行操作是完全没有问题的。**猜测可能是因为目前更多的用户结合 Python 使用这个模块,而使用 Scala 和 Java 的用户相对较少。或许随着时间的推移,官方会解决这个问题。如果你希望尽早体验这一功能,可以按照文章中的方法迅速进行尝试。


用户头像

还未添加个人签名 2019-06-23 加入

还未添加个人简介

评论

发布
暂无评论
快速体验Spark Connect_CloudEon开源_InfoQ写作社区