写点什么

在字节跳动,一个更好的企业级 SparkSQL Server 这么做

  • 2022 年 5 月 11 日
  • 本文字数:6153 字

    阅读完需:约 20 分钟

在字节跳动,一个更好的企业级SparkSQL Server这么做

SparkSQL 是 Spark 生态系统中非常重要的组件。面向企业级服务时,SparkSQL 存在易用性较差的问题,导致难满足日常的业务开发需求。本文将详细解读,如何通过构建 SparkSQL 服务器实现使用效率提升和使用门槛降低。

前言

Spark 组件由于其较好的容错与故障恢复机制,在企业的长时作业中使用的非常广泛,而 SparkSQL 又是使用 Spark 组件中最为常用的一种方式。

相比直接使用编程式的方式操作 Spark 的 RDD 或者 DataFrame 的 API,SparkSQL 可直接输入 SQL 对数据进行 ETL 等工作的处理,极大提升了易用度。但是相比 Hive 等引擎来说,由于 SparkSQL 缺乏一个类似 Hive Server2 的 SQL 服务器,导致 SparkSQL 在易用性上比不上 Hive。

很多时候,SparkSQL 只能将自身 SQL 作业打包成一个 Jar,进行 spark-submit 命令提交,因而大大降低 Spark 的易用性。除此之外,还可使用周边工具,如 Livy,但 Livy 更像一个 Spark 服务器,而不是 SparkSQL 服务器,因此无法支持类似 BI 工具或者 JDBC 这样的标准接口进行访问。

虽然 Spark 提供 Spark Thrift Server,但是 Spark Thrift Server 的局限非常多,几乎很难满足日常的业务开发需求,具体的分析请查看:观点|SparkSQL在企业级数仓建设的优势

标准的 JDBC 接口

Java.sql 包下定义了使用 Java 访问存储介质的所有接口,但是并没有具体的实现,也就是说 JavaEE 里面仅仅定义了使用 Java 访问存储介质的标准流程,具体的实现需要依靠周边的第三方服务实现。

例如,访问 MySQL 的 mysql-connector-java 启动包,即基于 java.sql 包下定义的接口,实现了如何去连接 MySQL 的流程,在代码中只需要通过如下的代码方式:

Class.forName("com.mysql.cj.jdbc.Driver");Connection connection= DriverManager.getConnection(DB_URL,USER,PASS);//操作connection.close();
复制代码

第一,初始化驱动、创建连接,第二,基于连接进行对数据的操作,例如增删改查。可以看到在 Java 定义的标准接口访问中,先创建一个 connection 完成存储介质,然后完成 connection 后续操作。

性能问题导致单次请求实时创建 connection 的性能较差。因此我们往往通过维护一个存有多个 connection 的连接池,将 connection 的创建与使用分开以提升性能,因而也衍生出很多数据库连接池,例如 C3P0,DBCP 等。

Hive 的 JDBC 实现

构建 SparkSQL 服务器最好的方式是用如上 Java 接口,且大数据生态下行业已有标杆例子,即 Hive Server2。Hive Server2 在遵循 Java JDBC 接口规范上,通过对数据操作的方式,实现了访问 Hive 服务。除此之外,Hive Server2 在实现上,与 MySQL 等关系型数据稍有不同。

首先,Hive Server2 本身是提供了一系列 RPC 接口,具体的接口定义在 org.apache.hive.service.rpc.thrift 包下的 TCLIService.Iface 中,部分接口如下:

public TOpenSessionResp OpenSession(TOpenSessionReq req) throws org.apache.thrift.TException;public TCloseSessionResp CloseSession(TCloseSessionReq req) throws org.apache.thrift.TException;public TGetInfoResp GetInfo(TGetInfoReq req) throws org.apache.thrift.TException;public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws org.apache.thrift.TException;public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws org.apache.thrift.TException;public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws org.apache.thrift.TException;public TGetSchemasResp GetSchemas(TGetSchemasReq req) throws org.apache.thrift.TException;public TGetTablesResp GetTables(TGetTablesReq req) throws org.apache.thrift.TException;public TGetTableTypesResp GetTableTypes(TGetTableTypesReq req) throws org.apache.thrift.TException;public TGetColumnsResp GetColumns(TGetColumnsReq req) throws org.apache.thrift.TException;
复制代码

也就是说,Hive Server2 的每一个请求都是独立的,并且是通过参数的方式将操作和认证信息传递。Hive 提供了一个 JDBC 的驱动实现,通过如下的依赖便可引入:

<dependency>    <groupId>org.apache.hive</groupId>    <artifactId>hive-jdbc</artifactId>    <version>version/version></dependency>
复制代码

在 HiveConnection 类中实现了将 Java 中定义的 SQL 访问接口转化为调用 Hive Server2 的 RPC 接口的实现,并且扩充了一部分 Java 定义中缺乏的能力,例如实时的日志获取。但是使用该能力时,需要将对应的实现类转换为 Hive 的实现类,例如:

HiveStatement hiveStatement = (HiveStatement) connection.createStatement();List<String> logs = hiveStatement.getQueryLog();
复制代码

Log 获取也需调用 FetchResult 接口,通过不同的参数来区分获取 Log 信息还是获取内容信息,因此,Hive JDBC 封装的调用 Hive Server2 RPC 接口流程是:



如果该流程触发获取 MetaData、获取 Functions 等操作,则会调用其他接口,其中身份信息即 token,是用 THandleIdentifier 类进行封装。在 OpenSession 时,由 Hive Server2 生成并且返回,后续所有接口都会附带传递这个信息,此信息是一次 Connection 连接的唯一标志。

但是,Hive Server2 在 FetchResults 方法中存在 bug。由于 Hive Server2 没有很好处理 hasMoreRows 字段,导致 Hive JDBC 客户端并未通过 hasMoreRows 字段去判断是否还有下一页,而是通过返回的 List 是否为空来判断。因此,相比 Mysql Driver 等驱动,Hive 会多发起一次请求,直到返回 List 为空则停止获取下一页,对应的客户端的 JDBC 代码是:

ResultSet rs = hiveStatement.executeQuery(sql);while (rs.next()) {    // }
复制代码

即 Hive JDBC 实现 next 方法是通过返回的 List 是否为空来退出 while 循环。

构建 SparkSQL 服务器

介绍了 JDBC 接口知识与 Hive 的 JDBC 知识后,如果要构建一个 SparkSQL 服务器,那么这个服务器需要有以下几个特点:

  • 支持 JDBC 接口,即通过 Java 的 JDBC 标准进行访问,可以较好与周边生态进行集成且降低使用门槛。

  • 兼容 Hive 协议,如果要支持 JDBC 接口,那么需要提供 SparkSQL 的 JDBC Driver。目前,大数据领域 Hive Server2 提供的 Hive-JDBC-Driver 已经被广泛使用,从迁移成本来说最好的方式就是保持 Hive 的使用方式不变,只需要换个端口就行,也就是可以通过 Hive 的 JDBC Driver 直接访问 SparkSQL 服务器。

  • 支持多租户,以及类似用户名+密码和 Kerberos 等常见的用户认证能力。

  • 支持跨队列提交,同时支持在 JDBC 的参数里面配置 Spark 的相关作业参数,例如 Driver Memory,Execute Number 等。

这里还有一个问题需要考虑,即用户通过 SparkSQL 服务器提交的是一段 SQL 代码,而 SparkSQL 在执行时需要向 Yarn 提交 Jar。那么,如何实现 SQL 到 Jar 提交转换?

一个最简单的方式是,用户每提交一个 SQL 就执行一次 spark-submit 命令,将结果保存再缓存,提供给客户端。还有更好方式,即提交一个常驻的 Spark 作业,这个作业是一个常驻任务,作业会开启一个端口,用来接收用户的 SQL 进行执行,并且保存。

但是为了解决类似 Spark Thrift Server 的问题,作业需要和用户进行绑定,而不是随着 Spark 的组件启动进行绑定,即作业的提交以及接收哪个用户的请求,均来自于用户的行为触发。


有了这样几个大的方向后,便可以开始开发 SparkSQL 服务器。首先需要实现 TCLIService.Iface 下的所有接口,下面用代码+注释的方式来讲述这些 Thrift 接口的含义,以及如果实现一个 SparkSQL 服务器,需要在这些接口做什么内容:

public class SparkSQLThriftServer implements TCLIService.Iface {    @Override    public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {        //Hive JDBC Driver在执行创建Connection的时候会调用此接口,在这里维护一个用户与Spark 作业的对应关系。        //来判断是需要复用一个已经存在的Spark作业,还是全新执行一次spark-submt。        //用户与是否需要spark-submit的关联关系均在这里实现。        //同时需要生成THandleIdentifier对象,并且和用户身份进行关联,后续其他方法调用均需要使用这个对象关联出用户的信息。        return null;    }
@Override public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { //客户端调用connection.close()方法后会进入到这里,在这里进行用户状态的清除,同时需要基于用户的情况判断是否需要停止用来执行该用户SQL的Spark 作业引擎。 return null; }
@Override public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { //获取服务器的元数据信息,例如使用BI工具,在命令会列出所连接的服务的版本号等信息,均由此方法提供。 return null; }
@Override public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException { //执行SQL任务,这里传递过来的是用户在客户端提交的SQL作业,接收到用户SQL后,将该SQL发送给常驻的Spark作业,这个常驻的作业在OpenSession的时候已经确定。 return null; }
@Override public TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req) throws TException { //获取数据库支持的类型信息,使用BI工具,例如beeline的时候会调用到这里。 return null; }
@Override public TGetCatalogsResp GetCatalogs(TGetCatalogsReq req) throws TException { //获取Catalog,使用BI工具,例如beeline的时候会调用到这里。 return null; }

@Override public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { //返回查询结果,基于THandleIdentifier对象查询到用户的SQL执行的情况,将请求转发至常驻的Spark 实例,获取结果。 //参数中通过TFetchResultsReq的getFetchType来区分是获取日志数据还是查询结果数据,getFetchType == 1为获取Log,为0是查询数据查询结果。 return null; }

}
复制代码


我们采用复用当前生态的方式,来实现兼容 Hive JDBC Driver 的服务器。有了上面的 Thrift 接口实现后,则需要启动一个 Thrift 服务,例如:

TThreadPoolServer.Args thriftArgs = new TThreadPoolServer.Args(serverTransport)        .processorFactory(new TProcessorFactory(this))        .transportFactory(new TSaslServerTransport.Factory())        .protocolFactory(new TBinaryProtocol.Factory())        .inputProtocolFactory(                new TBinaryProtocol.Factory(                        true,                        true,                        10000,                        10000
) ) .requestTimeout(1000L) .requestTimeoutUnit(TimeUnit.MILLISECONDS) .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS) .executorService(executorService);thriftArgs .executorService( new ThreadPoolExecutor( config.getMinWorkerThreads(), config.getMaxWorkerThreads(), config.getKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<>()));
TThreadPoolServer server = new TThreadPoolServer(thriftArgs);server.serve();
复制代码


至此便开发了一个支持 Hive JDBC Driver 访问的服务器,并且在这个服务器的方法中,实现了对 Spark 作业的管理。后续,还需要开发预设 Spark Jar,Jar 同样实现了如上接口,只是该作业的实现是实际执行用户的 SQL。

经过前面的流程,已经完成一个可以工作 SparkSQL 服务器开发,拥有接收用户请求,执行 SQL,并且返回结果的能力。但如何做的更加细致?例如,如何实现跨队列的提交、如何实现用户细粒度的资源管理、如何维护多个 Spark 作业的连接池,我们接下来会讲到。



由于对于 Spark 作业在 Yarn 上的提交,运行,停止均由 SparkSQL 服务器管理,对用户是不可见的,用户只需要编写标准的 JDBC 代码即可,因此可以基于用户的参数信息来匹配合适的引擎去执行,同时还可以限制一个 Spark 常驻作业的任务个数,实现更加灵活的 SparkSQL 作业的管理,同时也可以实现类似 C3P0 连接池的思想,维护一个用户信息到 Spark 常驻作业的关联池。

SparkSQL 服务器的 HA

Hive Server2 在启动的时候会将自己的服务器信息写入 Zookeeper 中,结构体如下所示:

[zk: localhost:2181(CONNECTED) 1] ls /hiveserver2\[serverUri=127.0.01:10000;version=3.1.2;sequence=0000000000]
复制代码

当连接 HA 模式下的服务器的时候,Hive JDBC Driver 的 URL 需要切换成 zookeeper 的地址,Hive JDBC Driver 会从多个地址中随机选择一个,作为该 Connection 的地址,在整个 Connection 中均会使用该地址。

因此对于我们实现的 SparkSQL 服务器,只需要在服务器启动的时候,保持与 Hive 一致的数据格式,将自己的服务器的地址信息写入到 Zookeeper 中即可,便可通过标准的 zk 地址进行访问,例如:

./bin/beeline -u  "jdbc:hive2://127.0.01/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=自定义的节点;auth=LDAP"  -n 用户名 -p密码
复制代码

由于服务器的选择基于 Connection 级别的,也就是在 Connection 被生成新的之前,整个服务器的地址是不会发生变化的,在发生错误的时候服务端可以进行重试,进行地址的切换,因此 HA 的力度是在 Connection 级别而非请求级别。

对接生态工具

完成以上开发之后,即可实现在大部分的场景下,使用标准的 Hive 驱动只需要切换一个端口号。特别提到 Hue 工具,由于和 Hive 的集成并未使用标准的 JDBC 接口,而是直接分开调用的 Hive Server2 的 Thrift 接口,也就是 Hue 自行维护来如何访问 Thrift 的接口的顺序问题。

可以发现在这样的情况会有一个问题就是对于 Hue 来说,并没有 Connection 的概念,正常的 SparkSQL 在 JDBC 的交互方式下处理流程是:



由于 Hue 没有 Connection 概念,因此 Hue 的请求并不会先到 OpenSession,而是直接发起 ExecuteStatement。由于没有上下文信息,正常流程下 ExecuteStatement 处接收到 Hue 的请求会发现该请求为非法,所以 OpenSession 不能作为连接的起点,而是需要在每一个 Thrift 接口处实现 OpenSession 的能力,以此作为上下文初始化。

尾声

SparkSQL 在企业中的使用比重越来越大,而有一个更好用的 SQL 服务器,则会大大提升使用效率和降低使用门槛。目前,SparkSQL 在服务器这方面的能力显然不如 Hive Server2 提供的更加标准,所以各个企业均可基于自身情况,选择是否需要开发一个合适于自身的 SparkSQL 服务器。

本文所提到的相关能力已通过火山引擎 EMR 产品向外部企业开放。结合字节跳动内部以及外部客户的需求情况,火山引擎 EMR 产品的 Ksana for SparkSQL 提供一个生产可用的 SparkSQL 服务器,并且在 Spark 性能方面也做了较大的优化,本文主要围绕技术实现的角度来阐述如何实现一个 SparkSQL 服务,后续会有更多文章讲述其他相关的优化。

产品介绍

火山引擎 E-MapReduce

支持构建开源 Hadoop 生态的企业级大数据分析系统,完全兼容开源,提供 Hadoop、Spark、Hive、Flink 集成和管理,帮助用户轻松完成企业大数据平台的构建,降低运维门槛,快速形成大数据分析能力。


更多技术交流、求职机会、试用福利,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群

用户头像

公众号byte-dataplatform 2021.12.29 加入

字节跳动数据平台团队,赋能字节跳动各业务线,对内支持字节绝大多数业务线,对外发布了火山引擎品牌下的数据智能产品,服务行业企业客户。同名公众号欢迎了解。

评论

发布
暂无评论
在字节跳动,一个更好的企业级SparkSQL Server这么做_字节跳动数据平台_InfoQ写作社区