本文介绍了如何使用 SeaTunnel 将数据从 InfluxDB 同步到 Doris。通过 SeaTunnel 强大的数据集成功能,用户可以高效地将存储于 InfluxDB 中的时间序列数据传输至 Doris,便于数据的访问与分析。
版本信息:
SeaTunnel 2.3.3
InfluxDB 2.7.6
Doris 2.1.3 rc09
准备事项
SeaTunnel2.3.3 的安装过程这里就省略了,可以参考官网文档。
SeaTunnel2.3.3 安装好以后需要删掉两个连接用的 jar 包,不然后面同步数据库会报错:connector-hudi-2.3.3.jar
和connector-datahub-2.3.3.jar
.
需要增加的 jar 包:seatunnel-api-2.3.3.jar
,seatunnel-transforms-v2-2.3.3.jar
,mysql-connector-java-8.0.28.jar
,jersey-client-1.19.4.jar
,这四个 jar 包必须添加,不然无法同步数据运行同步脚本直接报错没有某个类。
InfluxDB 2.7.6 需要做的前提事项:下面这个步骤必须要做,不然查不到数据。
InfluxDB Studio-0.2.0(这个客户端工具有个好处,可以查看字段类型,方便同步文件中的字段类型的定义,其他的客户端好像没有,也有可能是我没发现),可下载这个客户端进行连接查询数据。
Linux 安装 influxDB 2.7.6 版本后,正常使用 ip:8086 可访问 influxdb UI,填写用户名、密码、org、buckets。
同步过程及踩坑点
SeaTunnel 2.3 中集成 InfluxDB 配置用户名、密码后,执行同步任务总是报获取字段异常信息。
于是乎跟踪 SeaTunnel 代码,发现内部一直 401 权限认证失败。于是使用 InfluxDB Studio 数据库管理工具连接,输入 ui 页面相同的用户名,密码后一直报 401 权限认证不通过。通过查资料发现 ui 页面的用户名密码仅供 ui 页面使用,不能作为数据库本身访问的用户名密码。
使用 iInfluxDB client 客户端,查询权限 influx v1 auth list 结果为空。
使用命令分配权限
influx v1 auth create -o orgName --read-bucket bucketId --username=username
,
或者:influx v1 auth create -o "组织名称" --write-bucket bucketId(桶id,不需要引号) --read-bucket bucketId(桶id,不需要引号) --username=账号 --password=密码
删除命令:influx v1 auth delete --id 'id编码'
删除命令中的 id 编码为influx v1 auth list
命令查出来的 ID,下图所示:
命令执行完成后需输入两次密码。InfluxDB Studio 数据库管理工具再次使用此用户名密码登录成功,SeaTunnel 同步成功。
同步数据配置文件:v1.batch.config_tmp.template:
env {
execution.parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 10000
}
source {
influxdb {
url = "http://X.X.X.X:8086"
token = "写自己的token" #可有可无
org = "自己的组织名称"
bucket = "自己的桶" #可有可无
database = "自己的桶"
username = "写在第四步自己新建的influxdb账号"
password = "写在第四步自己新建的influxdb密码"
epoch = "H" #这个有好几级,可以去官网查看
query_timeout_sec = 600
measurement = "prometheus_remote_write" #数据表
fields = ["node_cpu_seconds_total", "node_memory_MemTotal_bytes"] #可有可无,配置自己的字段
sql = """SELECT node_cpu_seconds_total as system_cpu_usage,cpu as process_occupy_physical_memory_size,job as create_dept,node_memory_MemTotal_bytes as process_read_written_file_system_total_bytes,node_memory_MemAvailable_bytes as process_open_file_describe_quantity,time as create_time FROM "prometheus_remote_write" where time > now() - 1h"""
where = " where time > now() - 1h"
#经过本人测试。上面的sql查询的字段必须经过重命名,或者doris建表的字段必须和influxdb2的字段完全一致,不然transform 中进行转换的时候就会成为空值,这个我还没研究明白为什么,研究明白了在补上说明,doris的表字段类型也必须和influxdb2中查询的字段类型一致,不然数据存不到doris中。schema 重定义的事influxdb2查到的字段和类型
schema {
fields {
#node_cpu_seconds_total = FLOAT
system_cpu_usage = FLOAT
process_occupy_physical_memory_size = INT
create_dept = STRING
process_read_written_file_system_total_bytes = FLOAT
process_open_file_describe_quantity = FLOAT
create_time = BIGINT
}
}
}
}
sink {
Doris {
fenodes = "X.X.X.X:8030"
username = "账号"
password = "密码"
table.identifier = "sbyw_data_acquisition.sbyw_application_process_type_tmp"
sink.label-prefix = "test-cdc"
sink.enable-2pc = "true"
sink.enable-delete = "true"
sink.max-retries = 3
batch_size = 10000
result_table_name = "sbyw_application_process_type_tmp"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}
transform {
FieldMapper {
source_table_name = "prometheus_remote_write"
result_table_name = "sbyw_application_process_type_tmp"
field_mapper = {
#node_cpu_seconds_total = system_cpu_usage
system_cpu_usage = system_cpu_usage
process_occupy_physical_memory_size = process_occupy_physical_memory_size
process_read_written_file_system_total_bytes = process_read_written_file_system_total_bytes
process_open_file_describe_quantity = process_open_file_describe_quantity
create_time = create_time
create_dept = create_dept
}
}
}
复制代码
写好同步数据脚本文件运行同步命令:./bin/seatunnel.sh -c ./config/v1.batch.config_tmp.template
下面是我 Doris 的测试表:
下面是 InfluxDB Studio-0.2.0 客户端查到 InfluxDB 2.7.6 的数据:
InfluxDB 2.7.6 有个坑点,它支持 sql 查询,但不完全支持,它只支持常规的简单查询,例如下图中的查询就可以查询,但是如下图所示,可能会有人说我后面没加 group by,经过测试是不行的,即使加上 group by 也是无法执行,那是因为官方压根不支持的这种查询。
但是下图这样是可以的,InfluxDB 2 官方就是这样设计的,聚合查询无法和单字段进行同步查询。
最后是运行结果:
同步到 Doris 的数据:
原文链接:https://blog.csdn.net/2401_84562349/article/details/140919192
评论