写点什么

【用户投稿】手把手基于 Apache SeaTunnel 从 PostgreSQL 同步到 Doris

  • 2025-03-11
    广东
  • 本文字数:2940 字

    阅读完需:约 10 分钟

本文详细演示了如何通过 Apache SeaTunnel 2.3.9 实现 PostgreSQL 14.6 到 Apache Doris 3.0.3 的全量数据同步,涵盖从环境部署到生产验证的完整闭环,请各位小伙伴批评指正!

版本要求:

  • PostgreSQL --> Server 14.6

  • Apache SeaTunnel --> Apache-SeaTunnel-2.3.9

  • Apache Doris --> Apache-Doris-3.0.3

自行部署 Apache Doris

Apache Doris 1 台 Master 、2 台节点:

  • 配置好每台的时间同步

  • 配置好每台的文件句柄数

vi /etc/security/limits.conf * soft nofile 1000000* hard nofile 1000000
复制代码

JAVA 选择

  • 在 2.1(含)版本之前,请使用 Java 8,推荐版本:jdk-8u352 之后版本。

  • 从 3.0(含)版本之后,请使用 Java 17,推荐版本:jdk-17.0.10 之后版本。

关闭 swap 分区

swapoff -a #临时关闭注释修改/etc/fstab #永久关闭
复制代码
  • 安装包下载

  • 安装步骤

  • FE 集群部署

tar -xf apache-doris-3.0.3.bin.tar.gz
复制代码

EF 配置文件,每个节点都可以使用同一个配置如何 java 路径一致的话。

cat apache-doris-3.0.3/fe/conf/fe.conf...## modify case sensitivitylower_case_table_names = 1## modify Java HomeJAVA_HOME = <your-java-home-path>...#其他配置不动
复制代码

启动 Master 节点

bin/start_fe.sh --daemon
复制代码

MySQL 工具登录 Doris 添加FE Follower节点

mysql -uroot -P<fe_query_port> -h<fe_ip_address>ALTER SYSTEM ADD FOLLOWER "<fe_ip_address>:<fe_edit_log_port>"
复制代码

启动 FE Follower 节点填写master-ip和端口

bin/start_fe.sh --helper <helper_fe_ip>:<fe_edit_log_port> --daemon
复制代码

查看 FE 状态

show frontends\G;#IsMaster: true 代表是master节点
复制代码



部署 BE 集群,其他所有节点都可以使用这个配置

cat be/conf/be.conf...storage_root_path=/home/data1;/home/data2;/home/data3JAVA_HOME = <your-java-home-path>mem_limit = 4G...
#创建mkdir /home/data1mkdir /home/data2mkdir /home/data3
复制代码

在 Doris 中注册 BE 节点

## connect a alive FE nodemysql -uroot -P<fe_query_port> -h<fe_ip_address>
## registe BE nodeALTER SYSTEM ADD BACKEND "<be_ip_address>:<be_heartbeat_service_port>"
复制代码

启动 BE 进程

bin/start_be.sh --daemon
复制代码

查看 BE 启动状态

 ## connect a alive FE nodemysql -uroot -P<fe_query_port> -h<fe_ip_address>
## check BE node statusshow backends\G;
复制代码



验证集群正确性

## connect a alive fe nodemysql -uroot -P<fe_query_port> -h<fe_ip_address>
复制代码

修改 Doris 集群密码

-- check the current userselect user();  +------------------------+  | user()                 |  +------------------------+  | 'root'@'192.168.88.30' |  +------------------------+  
-- modify the password for current userSET PASSWORD = PASSWORD('doris_new_passwd');
复制代码

创建测试表并插入数据

-- create a test databasecreate database testdb;
-- create a test tableCREATE TABLE testdb.table_hash( k1 TINYINT, k2 DECIMAL(10, 2) DEFAULT "10.5", k3 VARCHAR(10) COMMENT "string column", k4 INT NOT NULL DEFAULT "1" COMMENT "int column")COMMENT "my first table"DISTRIBUTED BY HASH(k1) BUCKETS 32;
复制代码

Doris 兼容 MySQL 协议,可以使用 INSERT 语句插入数据。

-- insert dataINSERT INTO testdb.table_hash VALUES(1, 10.1, 'AAA', 10),(2, 10.2, 'BBB', 20),(3, 10.3, 'CCC', 30),(4, 10.4, 'DDD', 40),(5, 10.5, 'EEE', 50);
-- check the dataSELECT * from testdb.table_hash;+------+-------+------+------+| k1 | k2 | k3 | k4 |+------+-------+------+------+| 3 | 10.30 | CCC | 30 || 4 | 10.40 | DDD | 40 || 5 | 10.50 | EEE | 50 || 1 | 10.10 | AAA | 10 || 2 | 10.20 | BBB | 20 |+------+-------+------+------+
复制代码

PostgreSQL 部署

关于 PostgreSQL 作者不做详细赘述,请各位自行前往官网安装......

SeaTunnel 部署

社区官网:

https://seatunnel.apache.org/

安装包下载:

https://seatunnel.apache.org/download/

tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"
复制代码

手动下载连接器,然后将其移动至 connectors/目录下,如果是 2.3.5 之前则需要放入 connectors/seatunnel 目录下。

连接器下载地址:

https://repo.maven.apache.org/maven2/org/apache/seatunnel/



需要确保 JDBC 驱动 JAR 包 已放置在目录 ${SEATUNNEL_HOME}/lib/ 中,在社区之前很多文章都提到过部署教程,如果操作不当出现一些小问题,请进入社区交流(SeaTunnel 部署还是很简单的)

使用 SeaTunnel 引擎快速开始:

PG 创建表

CREATE TABLE alarm_receive_record (    id BIGINT PRIMARY KEY,    device_id VARCHAR(50),    alarm_time TIMESTAMP,    content TEXT);
复制代码

PG 追加数据

INSERT INTO alarm_receive_record (id, device_id, alarm_time, content)VALUES     (3, 'D003', '2025-02-26 16:15:00', '通信中断'),    (4, 'D004', '2025-02-26 17:20:00', '湿度超限'),    (5, 'D005', '2025-02-26 17:20:00', '湿度超限2'),(6, 'D006', '2025-02-26 17:20:00', '湿度超限6'),(7, 'D007', '2025-02-26 17:20:00', '湿度超限7'),(8, 'D008', '2025-02-26 17:20:00', '湿度超限8'),(9, 'D009', '2025-02-26 17:20:00', '湿度超限9'),(10, 'D0010', '2025-02-26 17:20:00', '湿度超限10')
复制代码

Doris 手动创建表结构

CREATE TABLE testdb.alarm_receive_record (    id BIGINT COMMENT "唯一ID",    device_id VARCHAR(50) COMMENT "设备ID",    alarm_time DATETIME COMMENT "告警时间",    content STRING COMMENT "告警内容") ENGINE=OLAPDUPLICATE KEY(id, device_id)  -- 选择 DUPLICATE 模型(全明细存储)DISTRIBUTED BY HASH(device_id) BUCKETS 10  -- 按设备ID分桶PROPERTIES (    "replication_num" = "3",  -- 副本数(与集群配置一致)    "storage_format" = "V2"    -- 存储格式);
复制代码

pg-doris.yaml

env {  parallelism = 4  job.mode = "BATCH"}source{    jdbc{        url = "jdbc:postgresql://192.168.10.17:5432/aaa"        driver = "org.postgresql.Driver"        user = "test"        password = "123123"        query = "select * from alarm_receive_record"    }}
sink { Doris { fenodes = "192.168.20.174:8030" username = "root" password = "123123" database = "testdb" table = "alarm_receive_record" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" sink.label-prefix = "pg_to_doris" sink.enable-2pc = "true" column = ["id", "device_id", "alarm_time", "content"] doris.config { format = "json" read_json_by_line = "true" } }}
复制代码

启动同步任务,多次执行,会重复数据。

cd "apache-seatunnel-${version}"./bin/seatunnel.sh --config ./config/pg-doris.yaml  -m local
复制代码

Doris 验证是否同步成功?



大数据时代,选择比努力更重要。让 SeaTunnel 成为您数据流动的超级高速公路,Doris 化作实时洞察的智慧之眼,共同构建面向未来的数据驱动体系!

用户头像

还未添加个人签名 2022-03-07 加入

Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台。

评论

发布
暂无评论
【用户投稿】手把手基于Apache SeaTunnel从PostgreSQL同步到Doris_Apache SeaTunnel_InfoQ写作社区