写点什么

最佳实践 | 在 EMR Serverless Spark 中实现 Doris 读写操作

  • 2025-03-25
    浙江
  • 本文字数:2864 字

    阅读完需:约 9 分钟

背景信息

EMR Serverless Spark 是一款面向 Data+AI 的高性能 Lakehouse 产品。它为企业提供了一站式的数据平台服务,包括任务开发、调试、调度和运维等,极大地简化了数据处理和模型训练的全流程。同时,它 100%兼容开源 Spark 生态,能够无缝集成到客户现有的数据平台。使用 EMR Serverless Spark,企业可以更专注于数据处理分析和模型训练调优,提高工作效率。


Apache Doris 是一个高性能、实时的分析型数据库,能够较好地满足报表分析、即席查询、数据湖联邦查询加速等使用场景。更多信息,请参见Apache Doris 简介


基于 Apache Doris 官方提供的 Spark Connector,EMR Serverless Spark 可以在开发时添加对应的配置来连接 Doris。通过结合 Apache Doris 与 EMR Serverless Spark,您可以高效地进行数据读取、写入和分析操作,从而实现端到端的数据处理流程。


EMR Serverless Spark 新用户可 免费领取 1000 CU*小时 资源包,欢迎体验。

前提条件

  • 已创建 Serverless Spark 工作空间,详情请参见创建工作空间

  • 已创建 Doris 集群。


如果是在 EMR on ECS 创建包含 Doris 服务的数据分析(OLAP)集群,详情请参见创建集群。本文以在 EMR on ECS 创建包含 Doris 服务的集群为例,后续简称 EMR Doris 集群。

使用限制

EMR Serverless Spark 引擎的版本要求为 esr-2.5.0、esr-3.1.0、esr-4.1.0 及以上版本。

操作流程

步骤一:获取 Doris Spark Connector JAR 并上传至 OSS

您需要查阅 Doris 的官方文档 Spark Doris Connector。该文档通常会列出不同版本的连接器与不同版本的 Spark 引擎的兼容情况。您需要确认您正在使用的 Spark 版本与 Doris Spark Connector 版本之间的兼容性。


  1. 访问 Doris Spark Connector 的GitHub仓库,选择合适的版本进行下载。


Doris Spark Connector JAR 包的命名格式为 spark-doris-connector-spark-${spark_version}-${connector_version}.jar。例如,您使用的引擎版本为 esr-3.1.0 (Spark 3.4.3, Scala 2.12),则可以下载 spark-doris-connector-spark-3.4-24.0.0.jar


  1. 将下载的 Spark Connector JAR 上传至阿里云 OSS 中,上传操作可以参见简单上传

步骤二:创建网络连接

EMR Serverless Spark 需要能够打通与 EMR Doris 集群之间的网络才可以正常访问 Doris 服务。更多网络连接信息,请参见EMR Serverless Spark与其他VPC间网络互通


重要:配置安全组规则时,端口范围请根据实际需求选择性开放必要的端口。端口范围的取值为 1~65535。本文示例需开启 HTTP 端口(8031)、RPC 端口(9061)以及 Webserver 端口(8041)。

步骤三:在 EMR Doris 集群中创建库表

  1. 使用 SSH 方式登录集群,详情请参见登录集群

  2. 执行以下命令,连接 EMR Doris 集群。


mysql -h127.0.0.1  -P 9031 -uroot
复制代码


  1. 创建数据库和表。


CREATE DATABASE IF NOT EXISTS testdb;USE testdb;CREATE TABLE test (    id INT,     name STRING) PROPERTIES("replication_num" = "1");
复制代码


4. 插入测试数据。


INSERT INTO test VALUES (1, 'a'), (2, 'b'), (3, 'c');
复制代码


5. 查询数据。


SELECT * FROM test;
复制代码


返回信息如下图所示。


步骤四:EMR Serverless Spark 读取 Doris 表

使用 SQL 会话读 Doris 表

  1. 创建 SQL 会话,详情请参见管理SQL会话


创建会话时,在引擎版本下拉列表中选择与 Doris Spark Connector 版本对应的引擎版本,在网络连接中选择步骤二中创建好的网络连接,并在 Spark 配置中添加以下参数来加载 Doris Spark Connector。


spark.user.defined.jars  oss://<bucketname>/path/connector.jar
复制代码


其中,oss://<bucketname>/path/connector.jar为您步骤一中上传至 OSS 的 Doris Spark Connector 的路径。例如,oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar


  1. 数据开发页面,选择创建一个 SQL > SparkSQL 类型的任务,然后在右上角选择创建好的 SQL 会话。


更多操作,请参见 SparkSQL 开发(链接:https://x.sm.cn/BJPDwLp)


  1. 拷贝如下代码到新增的 SparkSQL 页签中,并根据需要修改相应的参数信息,然后单击运行


CREATE TEMPORARY VIEW testUSING dorisOPTIONS(  "table.identifier" = "testdb.test",  "fenodes" = "<doris_address>:<http_port>",  "user" = "<user>",  "password" = "<password>");SELECT * FROM test;
复制代码


其中,涉及参数信息说明如下。



如果能够正常返回数据,则表明配置正确。


使用 Notebook 会话读 Doris 表

  1. 创建 Notebook 会话,详情请参见管理Notebook会话


创建会话时,在引擎版本下拉列表中选择与 Doris Spark Connector 版本对应的引擎版本,在网络连接中选择步骤二中创建好的网络连接,并在 Spark 配置中添加以下参数来加载 Doris Spark Connector。


spark.user.defined.jars  oss://<bucketname>/path/connector.jar
复制代码


其中,oss://<bucketname>/path/connector.jar 为您步骤一中上传至 OSS 的 Doris Spark Connector 的路径。例如,oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar


  1. 数据开发页面,选择创建一个 Python > Notebook 类型的任务,然后在右上角选择创建的 Notebook 会话。


更多操作,请参见管理 Notebook 会话。


  1. 拷贝如下代码到新增的 Notebook 页签中,并根据需要修改相应的参数信息,然后单击运行


dorisSparkDF = spark.read.format("doris") \  .option("doris.table.identifier", "testdb.test") \  .option("doris.fenodes", "<doris_address>:<http_port>") \  .option("user", "<user>") \  .option("password", "<password>") \  .load()  dorisSparkDF.show(3)
复制代码


其中,涉及参数信息说明如下。



如果能够正常返回数据,则表明配置正确。


步骤五:EMR Serverless Spark 写入 Doris 表

使用 SQL 会话写 Doris 表

拷贝如下代码到前一个步骤中新增的 SparkSQL 页签中,并根据需要修改相应的参数信息,然后单击运行


CREATE TEMPORARY VIEW test_writeUSING dorisOPTIONS(  "table.identifier" = "testdb.test",  "fenodes" = "<doris_address>:<http_port>",  "user" = "<user>",  "password" = "<password>");INSERT INTO test_write VALUES (4, 'd'), (5, 'e');SELECT * FROM test_write;
复制代码


如果能够返回以下数据,则表明数据写入成功。


使用 Notebook 会话写 Doris 表

拷贝如下代码到前一个步骤中新增的 Notebook 页签中,并根据需要修改相应的参数信息,然后单击运行


data = [(7, 'f'), (8, 'g')]mockDataDF = spark.createDataFrame(data, ["id", "name"])mockDataDF.write.mode("append").format("doris") \  .option("doris.table.identifier", "testdb.test") \  .option("doris.fenodes", "<doris_address>:<http_port>") \  .option("user", "<user>") \  .option("password", "<password>") \  .save()  dorisSparkDF = spark.read.format("doris") \  .option("doris.table.identifier", "testdb.test") \  .option("doris.fenodes", "<doris_address>:<http_port>") \  .option("user", "<user>") \  .option("password", "<password>") \  .load()  dorisSparkDF.show(10)
复制代码


如果能够返回以下数据,则表明数据写入成功。



如果您在使用 EMR Serverless Spark 的过程中遇到任何疑问,可加入钉钉群 58570004119 咨询。


用户头像

还未添加个人签名 2020-10-15 加入

分享阿里云计算平台的大数据和AI方向的技术创新和趋势、实战案例、经验总结。

评论

发布
暂无评论
最佳实践 | 在 EMR Serverless Spark 中实现 Doris 读写操作_大数据_阿里云大数据AI技术_InfoQ写作社区