
Pitfalls We Hit Deploying Apache SeaTunnel 2.1.0 (with Source Code Analysis)

  • June 1, 2022

01 Introduction

SeaTunnel was originally named Waterdrop; it was renamed SeaTunnel on October 12, 2021.


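SeaTunnel is an easy-to-use, high-performance distributed data integration platform that supports real-time synchronization of massive data volumes. It can reliably and efficiently synchronize tens of billions of records per day and is used in production at nearly 100 companies.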

02 Features

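  • Easy to use, with flexible configuration and low-code development

  • Real-time streaming

  • Offline multi-source data analysis

  • High performance, able to process massive data volumes

  • Modular, plugin-based architecture that is easy to extend

  • Supports data processing and aggregation through SQL

  • Supports Spark Structured Streaming

  • Supports Spark 2.x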


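This is where we hit our first pitfall: our test Spark environment had already been upgraded to 3.x, but SeaTunnel currently supports only Spark 2.x, so we had to deploy a separate Spark 2.x installation.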
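
A quick pre-flight check can catch this version mismatch before a job is submitted. The sketch below is a minimal example and is not part of SeaTunnel: the function names `extract_spark_version` and `is_spark_2x` are hypothetical, and the parsing assumes the banner that `spark-submit --version` prints, in which the Spark version follows the word "version" on the ASCII-art line.

```shell
#!/bin/sh
# Hypothetical helpers (not part of SeaTunnel) to verify the Spark major version.

# Read `spark-submit --version` output on stdin and print the Spark version.
# Assumption: the banner contains a line ending in "version X.Y.Z"; the
# "Using Scala version ..." line is dropped so it cannot be mistaken for it.
extract_spark_version() {
    sed -n -e '/Scala/d' -e 's/.*version \([0-9][0-9.]*\).*/\1/p' | head -n 1
}

# Succeed only when the given version string starts with "2.".
is_spark_2x() {
    case "$1" in
        2.*) return 0 ;;
        *)   return 1 ;;
    esac
}
```

One possible way to wire it up, assuming `SPARK_HOME` points at the installation SeaTunnel will use, is `version=$("$SPARK_HOME/bin/spark-submit" --version 2>&1 | extract_spark_version)` followed by `is_spark_2x "$version" || exit 1`. The `2>&1` is there because the banner is typically written to stderr.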


03 Workflow

04 Installation

Installation documentation

Link: https://seatunnel.incubator.apache.org/docs/2.1.0/spark/installation


Testing jdbc-to-jdbc

  • Create a new file, config/spark.batch.jdbc.to.jdbc.conf


`env {
  # seatunnel defined streaming batch duration in seconds
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

source {
  jdbc {
    driver = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://0.0.0.0:3306/database?useUnicode=true&characterEncoding=utf8&useSSL=false"
    table = "table_name"
    result_table_name = "result_table_name"
    user = "root"
    password = "password"
  }
}

transform {
  # split data by specific delimiter

  # you can also use other filter plugins, such as sql
  # sql {
  #   sql = "select * from accesslog where request_time > 1000"
  # }

  # If you would like to get more information about how to configure seatunnel and see full list of filter plugins,
  # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Sql
}

sink {
  # choose stdout output plugin to output data to console
  # Console {}

  jdbc {
    # The driver parameter must be set here; otherwise the data transfer fails
    driver = "com.mysql.jdbc.Driver",
    saveMode = "update",
    url = "jdbc:mysql://ip:3306/database?useUnicode=true&characterEncoding=utf8&useSSL=false",
    user = "userName",
    password = "***********",
    dbTable = "tableName",
    customUpdateStmt = "INSERT INTO table (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)"
  }
}`


  • Launch command


`./bin/start-seatunnel-spark.sh --master 'yarn' --deploy-mode client --config ./config/spark.batch.jdbc.to.jdbc.conf`


  • Pitfall: an earlier run failed with "please specify [driver] as non-empty". We traced it to the sink configuration, which must set the driver parameter.


`ERROR Seatunnel:121 - Plugin[org.apache.seatunnel.spark.sink.Jdbc] contains invalid config, error: please specify [driver] as non-empty`
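
A missing sink driver can also be spotted before the job is submitted. The following is a rough sketch rather than anything SeaTunnel ships: `sink_has_driver` is a hypothetical helper, and it assumes the config file is laid out with one setting per line and that the sink block is the last top-level block in the file. It uses plain text matching, not a real HOCON parser.

```shell
#!/bin/sh
# Hypothetical pre-flight check (not part of SeaTunnel).
# Assumption: the config is laid out with one setting per line and the sink
# block is the last top-level block in the file.

# Succeed when the sink section of config file $1 has an uncommented
# "driver = ..." line.
sink_has_driver() {
    sed -n '/^sink[[:space:]]*{/,$p' "$1" |
        grep -Eq '^[[:space:]]*driver[[:space:]]*='
}
```

For example, `sink_has_driver ./config/spark.batch.jdbc.to.jdbc.conf || echo "sink is missing driver" >&2` would print a warning for a config like the one that triggered the error above.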


Testing jdbc-to-hive

  • Create a new file, config/spark.batch.jdbc.to.hive.conf


`env {
  # seatunnel defined streaming batch duration in seconds
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  # The sink uses Hive, so the following setting is required
  spark.sql.catalogImplementation = "hive"
}

source {
  jdbc {
    driver = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://0.0.0.0:3306/database?useUnicode=true&characterEncoding=utf8&useSSL=false"
    table = "table_name"
    result_table_name = "result_table_name"
    user = "root"
    password = "password"
  }
}

transform {
  # split data by specific delimiter

  # you can also use other filter plugins, such as sql
  # sql {
  #   sql = "select * from accesslog where request_time > 1000"
  # }

  # If you would like to get more information about how to configure seatunnel and see full list of filter plugins,
  # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Sql
}

sink {
  Hive {
    sql = "insert overwrite table seatunnel.test1 partition(province) select name,age,province from result_table_name"
  }
}`


  • Run command


`./bin/start-seatunnel-spark.sh --master 'yarn' --deploy-mode client --config ./config/spark.batch.jdbc.to.hive.conf`

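Pitfall: the first run failed. We traced it to the conf file not setting spark.sql.catalogImplementation = "hive". Without that setting, Spark falls back to its in-memory catalog and cannot see databases defined in the Hive metastore.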

  • Error output:


    `...... ERROR Seatunnel:191 - Exception StackTrace:org.apache.spark.sql.AnalysisException: Table or view not found:  ...... Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database 'seatunnel' not found; ......`
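
A similar text-level check can flag a missing Hive catalog setting before a job with a Hive sink is launched. This is only a sketch under stated assumptions: `has_hive_catalog` is a hypothetical helper that is not part of SeaTunnel, and it assumes one setting per line with the value written as the quoted string "hive".

```shell
#!/bin/sh
# Hypothetical pre-flight check (not part of SeaTunnel).
# Assumption: one setting per line in the config file.

# Succeed when config file $1 sets spark.sql.catalogImplementation to "hive"
# on an uncommented line.
has_hive_catalog() {
    grep -Eq '^[[:space:]]*spark\.sql\.catalogImplementation[[:space:]]*=[[:space:]]*"hive"' "$1"
}
```

For example, `has_hive_catalog ./config/spark.batch.jdbc.to.hive.conf || echo "set spark.sql.catalogImplementation = \"hive\" in env" >&2` prints a reminder when the setting is absent or commented out.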


We will share more pitfalls in future updates.

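Apache SeaTunnel (Incubating) is a distributed, high-performance, easily extensible data integration platform for synchronizing and transforming massive volumes of data, both offline and in real time.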
