写点什么

HTTP 接口数据也能定时同步入湖?用 DolphinScheduler×SeaTunnel 快速搞定!

作者:白鲸开源
  • 2025-05-09
    天津
  • 本文字数:3639 字

    阅读完需:约 12 分钟

背景与目标

我们之前曾评估使用过 SeaTunnel 做 CDC 入湖验证:SeaTunnel-CDC 入湖实践,这些场景都是能直连数据库的场景,业务需求中经常会出现无法直连数据库做 CDC 进行数据同步的场景,而这些场景就需要使用 API 进行数据对接,用 Apache DolphinScheduler 定时同步数据。


举个实际中的例子:


  • ERP(SAP)的库存数据进行同步入湖仓做库存分析


同时,本次目标希望其他同事能依样画葫芦,在以后的对接 http 接口到湖仓的时候能够自行完成,而非每遇到一个对接需求,就需要通过代码方式进行对接。

准备工作

  • seatunnel 2.3.10


首先,您需要在${SEATUNNEL_HOME}/config/plugin_config文件中加入连接器名称,然后,执行命令来安装连接器,确认连接器在 ${SEATUNNEL_HOME}/connectors/目录下即可。


本例中我们会用到:connector-jdbcconnector-paimon


写入 StarRocks 也可以使用connector-starrocks,本例中的场景比较适合用connector-jdbc,所以使用connector-jdbc


# 配置连接器名称--connectors-v2--connector-jdbcconnector-starrocksconnector-paimon--end--
复制代码


# 安装连接器sh bin/install-plugin.sh 2.3.10
复制代码

SeaTunnel 任务

我们先至少保证能在本地完成 SeaTunnel 任务,再完成对 Apache DolphinScheduler 的对接。


  • http to starRocksexample/http2starrocks


env {  parallelism = 1  job.mode = "BATCH"}
source { Http { plugin_output = "stock" url = "https://ip/http/prd/query_sap_stock" method = "POST" headers { Authorization = "Basic XXX" Content-Type = "application/json" } body = """{"IT_WERKS": [{"VALUE": "1080"}]}""" format = "json" content_field = "$.ET_RETURN.*" schema { fields { MATNR = "string" MAKTX = "string" WERKS = "string" NAME1 = "string" LGORT = "string" LGOBE = "string" CHARG = "string" MEINS = "string" LABST = "double" UMLME = "double" INSME = "double" EINME = "double" SPEME = "double" RETME = "double" } } }}
# 此转换操作主要用于字段从命名等方便用途transform { Sql { plugin_input = "stock" plugin_output = "stock-tf-out" query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock" }}
# 连接starRocks 进行数据分区覆写,本例适用starRocks建表,按照分区insert overwrite 覆写sink { jdbc { plugin_input = "stock-tf-out" url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "lab" password = "XXX" compatible_mode="starrocks" query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)""" }}
# connector-starrocks进行对接 (未看到支持sql语句进行数据insert overwrite,本例子场景不适合),比较适合表数据全部删除重建场景// sink {// StarRocks {// plugin_input = "stock-tf-out"// nodeUrls = ["ip:8030"]// base-url = "jdbc:mysql://ip:9030/"// username = "lab"// password = "XXX"// database = "scm"// table = "ods_sap_stock"// batch_max_rows = 1000// data_save_mode="DROP_DATA"// starrocks.config = {// format = "JSON"// strip_outer_array = true// }// schema_save_mode = "RECREATE_SCHEMA"// save_mode_create_template="""// CREATE TABLE IF NOT EXISTS `scm`.`ods_sap_stock` (// MATNR STRING COMMENT '物料',// WERKS STRING COMMENT '工厂',// LGORT STRING COMMENT '库存地点',// MAKTX STRING COMMENT '物料描述',// NAME1 STRING COMMENT '工厂名称',// LGOBE STRING COMMENT '地点描述',// CHARG STRING COMMENT '批次编号',// MEINS STRING COMMENT '单位',// LABST DOUBLE COMMENT '非限制使用库存',// UMLME DOUBLE COMMENT '在途库存',// INSME DOUBLE COMMENT '质检库存',// EINME DOUBLE COMMENT '受限制使用的库存',// SPEME DOUBLE COMMENT '已冻结的库存',// RETME DOUBLE COMMENT '退货'// ) ENGINE=OLAP// PRIMARY KEY ( MATNR,WERKS,LGORT)// COMMENT 'sap库存'// DISTRIBUTED BY HASH (WERKS) PROPERTIES (// "replication_num" = "1"// )// """// }// }
复制代码


  • http to paimonexample/http2paimon


env {  parallelism = 1  job.mode = "BATCH"}
source { Http { plugin_output = "stock" url = "https://ip/http/prd/query_sap_stock" method = "POST" headers { Authorization = "Basic XXX" Content-Type = "application/json" } body = """{"IT_WERKS": [{"VALUE": "1080"}]}""" format = "json" content_field = "$.ET_RETURN.*" schema { fields { MATNR = "string" MAKTX = "string" WERKS = "string" NAME1 = "string" LGORT = "string" LGOBE = "string" CHARG = "string" MEINS = "string" LABST = "double" UMLME = "double" INSME = "double" EINME = "double" SPEME = "double" RETME = "double" } } }}# 此转换操作主要用于字段从命名等方便用途transform { Sql { plugin_input = "stock" plugin_output = "stock-tf-out" query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock" }}
# 连接paimon进行数据同步,paimon 暂时 未看到有支持 insert overwrite 分区覆写,此例仅作为参考,不适用本此例子需求sink { Paimon { warehouse = "s3a://test/" database = "sap" table = "ods_sap_stock" paimon.hadoop.conf = { fs.s3a.access-key=XXX fs.s3a.secret-key=XXX fs.s3a.endpoint="http://minio:9000" fs.s3a.path.style.access=true fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider } }}
复制代码

DolphinScheduler 集成 SeaTunnel

  • 制作 worker 镜像


FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:3.2.2RUN mkdir /opt/seatunnelRUN mkdir /opt/seatunnel/apache-seatunnel-2.3.10# 容器集成seatunnelCOPY apache-seatunnel-2.3.10/ /opt/seatunnel/apache-seatunnel-2.3.10/
复制代码


打包镜像,推送到镜像仓库


docker build --platform=linux/amd64 -t apache/dolphinscheduler-worker:3.2.2-seatunnel .
复制代码


  • 使用新镜像部署一个 worker,此处修改docker-compose.yaml,增加一个dolphinscheduler-worker-seatunnel节点。


...  dolphinscheduler-worker-seatunnel:    image: xxx/dolphinscheduler-worker:3.2.2-seatunnel    profiles: ["all"]    env_file: .env    healthcheck:      test: [ "CMD", "curl", "http://localhost:1235/actuator/health" ]      interval: 30s      timeout: 5s      retries: 3    depends_on:      dolphinscheduler-zookeeper:        condition: service_healthy    volumes:      - ./dolphinscheduler-worker-seatunnel-data:/tmp/dolphinscheduler      - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs      - ./dolphinscheduler-shared-local:/opt/soft      - ./dolphinscheduler-resource-local:/dolphinscheduler    networks:      dolphinscheduler:        ipv4_address: 172.15.0.18...
复制代码


  • DolphinScheduler 配置 SeaTunnel 分组及环境配置

  • 安全中心-Worker 分组管理,创建一个这个节点 ip 的分组,用于以后需要 seatunnel 的任务跑该分组


  • 环境管理-创建环境,增加一个用于执行 seatunnel 的环境,同时需要绑定 Worker 分组为上一步创建的 seatunnel 分组


  • 创建工作流定义,把上面的 seatunnel 任务配置填写上


  • 运行时候,选择 seatunnel 的 worker 分组和环境即可跑在这个集成了 seatunnel 的环境上



转载自俊瑶先森

原文链接:https://junyao.tech/posts/9c6867c5.html

用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
HTTP接口数据也能定时同步入湖?用DolphinScheduler×SeaTunnel快速搞定!_大数据_白鲸开源_InfoQ写作社区