写点什么

MySQL 数据同步至 S3file,并接入 Hive 访问:SeaTunnel 实践指南

作者:白鲸开源
  • 2025-07-15
    天津
  • 本文字数:2334 字

    阅读完需:约 8 分钟

MySQL 数据同步至 S3file,并接入 Hive 访问:SeaTunnel 实践指南

作者 | 番兄


如何借助 Apache SeaTunnel 将 MySQL 数据高效同步至 S3file?本文详述的步骤已全部通过测试验证,适用于构建基于对象存储的数据中台场景,具备部署灵活、扩展性强等优势,对有 MySQL 到 S3 数据集成需求的用户具有较高的参考价值,点赞、收藏学习吧!

第一步:创建 Hive 表

CREATE EXTERNAL TABLE ods_ekp.`ods_sys_notify_todo_bak` (  `fd_id` STRING,  `fd_app_name` STRING,  `fd_model_name` STRING,  `fd_model_id` STRING,  `fd_key` STRING,  `fd_parameter1` STRING,  `fd_parameter2` STRING,  `fd_create_time` TIMESTAMP,  `fd_subject` STRING,  `fd_type` INT,  `fd_link` STRING,  `fd_mobile_link` STRING,  `fd_pad_link` STRING,  `fd_bundle` STRING,  `fd_replace_text` STRING,  `fd_md5` STRING,  `fd_del_flag` STRING,  `fd_level` INT,  `doc_creator_id` STRING,  `fd_extend_content` STRING,  `fd_lang` STRING,  `fd_cate_name` STRING,  `fd_cate_id` STRING,  `fd_template_name` STRING,  `fd_template_id` STRING,  `fd_hierarchy_id` STRING)COMMENT 'sys_notify_todo_bak data'PARTITIONED BY (  `dctime` STRING COMMENT '分区年月日')ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'STORED AS PARQUETLOCATION 's3a://seatunnel/doris/warehouse/ods_ekp/ods_sys_notify_todo_bak'TBLPROPERTIES (  'parquet.compression'='ZSTD');
复制代码


注意:


  1. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' 这个分隔符设置需要在后面 SeaTunnel 里面配置一样的,不然格式错误;

  2. 'parquet.compression'='ZSTD' 压缩算法也是需要在后面 SeaTunnel 里面配置一样的;

  3. STORED AS PARQUET parquet 文件格式,也是需要在后面 SeaTunnel 里面配置一样的.


是用之前把注释删除


env {  job.mode = "BATCH"  parallelism = 2}
source { Jdbc { url = "jdbc:mysql://[服务器ip]:3306/[数据库]?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "[账户]" password = "[密码]", # dctime要转换成字符串,因为hive建表的时候,这个字段是字符串;分区字段也要加到查询里面,后面SeaTunnel sink的时候会自动处理 query = "select fd_id, fd_app_name, fd_model_name, fd_model_id, fd_key, fd_parameter1, fd_parameter2, fd_create_time, fd_subject, fd_type, fd_link, fd_mobile_link, fd_pad_link, fd_bundle, fd_replace_text, fd_md5, fd_del_flag, fd_level, doc_creator_id, fd_extend_content, fd_lang, fd_cate_name, fd_cate_id, fd_template_name, fd_template_id, fd_hierarchy_id, cast(date_format(fd_create_time, '%Y-%m-%d') as char) as dctime from sys_notify_todo_bak }}
transform {}
sink { S3File { bucket = "s3a://seatunnel" fs.s3a.endpoint = "[minio服务器域名/ip]:9000" access_key = "[账户]" secret_key = "[密码]" fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" # 目录地址 path = "/doris/warehouse/ods_ekp/ods_sys_notify_todo_bak" tmp_path = "/data/tmp/seatunnel" # 必须填写的不然会出现问题,因为本人的minio没有做ssl处理,所以要这么设置 hadoop_s3_properties { "fs.s3a.connection.ssl.enabled" = "false" "fs.s3a.path.style.access" = "true" } # parquet文件格式 file_format_type = "parquet" # 必须用\\代表\ field_delimiter = "\\001" # parquet格式必须要加,否则会出问题 parquet_avro_write_timestamp_as_int96 = true # 压缩算法 compress_codec = "zstd" have_partition = true partition_by = ["dctime"] partition_dir_expression = "${k0}=${v0}" is_partition_field_write_in_file = false schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode="APPEND_DATA" custom_filename = true file_name_expression = "${transactionId}_${now}" filename_time_format = "yyyy.MM.dd" }}
复制代码

第二步:执行 SeaTunnel 同步,并在 Hive 查询里执行下列操作

-- 刷新物理目录分析MSCK REPAIR TABLE ods_ekp.ods_sys_notify_todo_bak;
-- 查询hive表确认是否有数据SELECT * from ods_ekp.ods_sys_notify_todo_bak limit 100;
复制代码

第三步:创建 Doris Hive catalog 外部库

CREATE CATALOG hive PROPERTIES (    'type'='hms',    'hive.metastore.uris' = 'thrift://[hive metastore server的ip]:9083',    "s3.endpoint" = "http://[minio服务器域名/ip]:9000",    "s3.region" = "us-east-1",    "s3.access_key" = "[账户]",    "s3.secret_key" = "[密码]",    "s3.connection.ssl.enabled" = "false",    "use_path_style" = "true",    "hive.version" = '2.1.1');
REFRESH CATALOG hive;
show databases from hive;
SELECT * from hive.ods_ekp.ods_sys_notify_todo_bak limit 100
复制代码


说明:


  1. 因为本人用的 CDH6.3.2 版本,Hive 是 2.1.1 版本,所以建立 catalog 的时候,需要指定"hive.version" = '2.1.1'

  2. 因为本人设置的 minio 没有 ssl,所以配置的时候需要加上"s3.connection.ssl.enabled" = "false"

  3. Minio 用的是 path 风格,所以需要配置"use_path_style" = "true"

  4. SeaTunnel 版本: 2.3.11

  5. Doris 版本:2.0.15

发布于: 31 分钟前阅读数: 6
用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
MySQL 数据同步至 S3file,并接入 Hive 访问:SeaTunnel 实践指南_MySQL_白鲸开源_InfoQ写作社区