写点什么

基于 DataX 的数据同步(下)- 应用 DataX 进行数据同步

  • 2021 年 12 月 30 日
  • 本文字数:8039 字

    阅读完需:约 26 分钟

作者:烧鸡太子爷

来源:恒生LIGHT云社区


前面做了 DataX 介绍以及安装,下面我们用一个实例来说明下 DataX 的运行,也整理了一些实际运用的过程中的注意点

DataX 运行

运行命令

(1)查看模板:
python ${DATAX_HOME}\bin\datax.py -r 读插件类型-w 写插件类型
举例: mysql导入到orcale数据库 python ${DATAX_HOME}\bin\datax.py -r mysqlreader -w oraclewriter (2)执行任务( 通过读json配置文件配置运行): python ${DATAX_HOME}\bin\datax.py {json配置文件}
复制代码

举例(从 pgsql 数据传输到 mysql)

(1)查看模板


    python datax.py -r postgresqlreader -w mysqlwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba ! Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved. Please refer to the mysqlreader document: https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md Please refer to the postgresqlwriter document: https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md Please save the following configuration as a json file and use python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json to run the job. { "job": { "content": [ { "reader": { //读入库配置 "name": "postgresqlreader", //数据源名称 "parameter": { //数据库配置信息 "column": [], "connection": [ { "jdbcUrl": [], "table": [] } ], "password": "", "username": "", "where": "" //数据过滤条件 } }, "writer": { //写入库配置 "name": "mysqlwriter", //数据库名称 "parameter": { //数据库配置信息 "column": [], "connection": [ { "jdbcUrl": "", "table": [] } ], "password": "", "postSql": [], //任务执行后操作 "preSql": [], //任务执行前操作 "username": "" } } } ], "setting": { //基本配置 "speed": { //流量控制 "channel": "" //并发数,提供了通道(并发)、记录流、字节流三种流控模式 } } }// 还可以配置脏数据控制等配置}
复制代码


(2) 根据模板编辑配置


{"job": {    "setting": {        "speed": {            "channel": 3,            "byte": 1048576        },        "errorLimit": {            "record": 0,            "percentage": 0.02        }    },    "content": [{        "reader": {            "name": "postgresqlreader",            "parameter": {                "username": "authenticator",                "password": "financegtn104",                "column": [                    "\"id\"",                    "\"name\"",                    "\"content_type_id\"",                    "\"codename\""                ],                "splitPk": "",                "connection": [{                    "table": [                        "public.auth_permission"                    ],                    "jdbcUrl": [                        "jdbc:postgresql://10.20.64.241:5433/finance"                    ]                }]            }        },        "writer": {            "name": "mysqlwriter",            "parameter": {                "username": "root",                "password": "hundsun1234",                "column": [                    "`id`",                    "`name`",                    "`content_type_id`",                    "`codename`"                ],                "connection": [{                    "table": [                        "auth_permission"                    ],                    "jdbcUrl": "jdbc:mysql://10.20.64.151:3306/datax_test"                }]            }        }    }]}}
复制代码


(3)执行 Job


python ./bin/datax.py ./job/mysql2pgsql.jsonDataX (DATAX-OPENSOURCE-3.0), From Alibaba !Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2020-08-05 13:01:22.518 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl2020-08-05 13:01:22.524 [main] INFO Engine - the machine info => osInfo: Oracle Corporation 1.8 25.181-b13 jvmInfo: Linux amd64 3.10.0-862.11.6.el7.x86_64 cpu num: 8 totalPhysicalMemory: -0.00G freePhysicalMemory: -0.00G maxFileDescriptorCount: -1 currentOpenFileDescriptorCount: -1 GC Names [PS MarkSweep, PS Scavenge] MEMORY_NAME | allocation_size | init_size PS Eden Space | 256.00MB | 256.00MB Code Cache | 240.00MB | 2.44MB Compressed Class Space | 1,024.00MB | 0.00MB PS Survivor Space | 42.50MB | 42.50MB PS Old Gen | 683.00MB | 683.00MB Metaspace | -0.00MB | 0.00MB

2020-08-05 13:01:22.541 [main] INFO Engine -{ "content":[ { "reader":{ "name":"postgresqlreader", "parameter":{ "column":[ "\"id\"", "\"name\"", "\"content_type_id\"", "\"codename\"" ], "connection":[ { "jdbcUrl":[ "jdbc:postgresql://10.20.64.241:5433/finance" ], "table":[ "public.auth_permission" ] } ], "password":"*************", "splitPk":"", "username":"authenticator" } }, "writer":{ "name":"mysqlwriter", "parameter":{ "column":[ "`id`", "`name`", "`content_type_id`", "`codename`" ], "connection":[ { "jdbcUrl":"jdbc:mysql://10.20.64.151:3306/datax_test", "table":[ "auth_permission" ] } ], "password":"***********", "username":"root" } } } ], "setting":{ "errorLimit":{ "percentage":0.02, "record":0 }, "speed":{ "byte":1048576, "channel":3 } }}2020-08-05 13:01:22.557 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null2020-08-05 13:01:22.559 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=02020-08-05 13:01:22.559 [main] INFO JobContainer - DataX jobContainer starts job.2020-08-05 13:01:22.560 [main] INFO JobContainer - Set jobId = 02020-08-05 13:01:22.676 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:postgresql://10.20.64.241:5433/finance.2020-08-05 13:01:22.704 [job-0] INFO OriginalConfPretreatmentUtil - table:[public.auth_permission] has columns:[id,name,content_type_id,codename].2020-08-05 13:01:22.917 [job-0] INFO OriginalConfPretreatmentUtil - table:[auth_permission] all columns:[id,name,content_type_id,codename].2020-08-05 13:01:22.924 [job-0] INFO OriginalConfPretreatmentUtil - Write data [INSERT INTO %s (`id`,`name`,`content_type_id`,`codename`) VALUES(?,?,?,?)], which jdbcUrl like:[jdbc:mysql://10.20.64.151:3306/datax_test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true]2020-08-05 13:01:22.925 [job-0] INFO JobContainer - jobContainer starts to do prepare ...2020-08-05 13:01:22.925 [job-0] INFO JobContainer - DataX Reader.Job [postgresqlreader] do prepare work .2020-08-05 13:01:22.926 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do prepare work .2020-08-05 13:01:22.926 [job-0] INFO JobContainer - jobContainer starts to do split ...2020-08-05 13:01:22.927 [job-0] INFO JobContainer - Job set Max-Byte-Speed to 1048576 bytes.2020-08-05 13:01:22.929 [job-0] INFO JobContainer - DataX Reader.Job [postgresqlreader] splits to [1] tasks.2020-08-05 13:01:22.930 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] splits to [1] tasks.2020-08-05 13:01:22.944 [job-0] INFO JobContainer - jobContainer starts to do schedule ...2020-08-05 13:01:22.947 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.2020-08-05 13:01:22.948 [job-0] INFO JobContainer - Running by standalone Mode.2020-08-05 13:01:22.955 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.2020-08-05 13:01:22.958 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.2020-08-05 13:01:22.958 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.2020-08-05 13:01:22.964 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started2020-08-05 13:01:22.967 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select "id","name","content_type_id","codename" from public.auth_permission] jdbcUrl:[jdbc:postgresql://10.20.64.241:5433/finance].2020-08-05 13:01:23.063 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select "id","name","content_type_id","codename" from public.auth_permission] jdbcUrl:[jdbc:postgresql://10.20.64.241:5433/finance].2020-08-05 13:01:23.365 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[402]ms2020-08-05 13:01:23.366 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.2020-08-05 13:01:32.966 [job-0] INFO StandAloneJobContainerCommunicator - Total 24 records, 803 bytes | Speed 80B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%2020-08-05 13:01:32.966 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.2020-08-05 13:01:32.967 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do post work.2020-08-05 13:01:32.967 [job-0] INFO JobContainer - DataX Reader.Job [postgresqlreader] do post work.2020-08-05 13:01:32.967 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.2020-08-05 13:01:32.968 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /home/dataX/datax/hook2020-08-05 13:01:32.968 [job-0] INFO JobContainer - [total cpu info] => averageCpu | maxDeltaCpu | minDeltaCpu -1.00% | -1.00% | -1.00%
[total gc info] => NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s 2020-08-05 13:01:32.969 [job-0] INFO JobContainer - PerfTrace not enable!2020-08-05 13:01:32.969 [job-0] INFO StandAloneJobContainerCommunicator - Total 24 records, 803 bytes | Speed 80B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%2020-08-05 13:01:32.969 [job-0] INFO JobContainer -任务启动时刻 : 2020-08-05 13:01:22任务结束时刻 : 2020-08-05 13:01:32任务总计耗时 : 10s任务平均流量 : 80B/s记录写入速度 : 2rec/s读出记录总数 : 24读写失败总数 : 0
复制代码

DataX-Web 运行

编辑操作太麻烦?是否有如 ketter 一样提供图形界面生产配置文件,是否有任务统一管理?


DataX Web 是 DataX 的集成可视化页面系统,选择数据源即可一键生成数据同步任务,支持批量创 建 RDBMS 数据同步任务,集成开源调度系统,支持分布式、增量同步数据、实时查看运行日志、监控执行器资源、KILL 运行进程等 相较于 kettle,DataX-Web 直接通过页面一键生成任务,实时查看运行情况,同步效率更快等优势


(1)环境准备*     MySQL (5.5+) 必选,对应客户端可以选装, Linux服务上若安装mysql的客户端可以通过部署脚本快速初始化数据库*     JDK (1.8.0_xxx) 必选*     DataX 必选* Python (2.x) (支持Python3需要修改替换datax/bin下面的三个python文件,替换文件在doc/datax-web/datax-python3下) 必选,主要用于调度执行底层DataX的启动脚本,默认的方式是以Java子进程方式执行DataX,用户可以选择以Python方式来做自定义的改造


(2)安装 1、源码编译 2、官方编译好的包。 解压包后,进入解压后的目录,找到bin目录下面的install.sh文件,如果选择交互式的安装,则直接执行./bin/install.sh
复制代码


注意点:


官网的数据库脚本安装的job_info表缺少user_id字段,界面点击的时候会报错找不到user_id字段错误,手动增加user_id字段,类型为int,长度为11。Alert (ALTER TABLE job_info ADD  user_id int(11);)
复制代码


详细参考教程:https://github.com/WeiYe-Jing/datax-web/blob/master/doc/datax-web/datax-web-deploy.md


安装包说明


4329


最后在浏览器输入 IP:port 就可以看到 DataX-Web 的页面了

DataX 和 kettle 对比

1)Kettle 拥有自己的管理控制台,可以直接在客户端进行 etl 任务制定,不过是 CS 架构,而不支持 BS 浏览器模式。DataX 并没有界面(DataX-Web 已经扩展,支持 BS)


2)支持的数据库,都支持的比较齐全,kettle 支持的应该更多,DataX 是阿里开发,可以更好地支持阿里自身的数据库系列,如 ODPS、ADS 等


3)Kettle 已经加入 BI 组织 Pentaho,加入后 kettle 的开发粒度和被关注度更进一步提升


4)DataX 开源的支持粒度不高,关注度远没有 kettle 高,代码提交次数更是少的很。


5) 根据网上参考信息,网友测试 kettle 全量抽取较大数据量时,抽取时间长,对比测试 datax 比 kettle 快。

其他

  • DataX 插件开源还支撑其他扩展,可以下载源码自己编译:Datax 支持 ElasticSearch Reader (官方已经支撑 Writer,但不支持 Reader)Datax 支持增量 postgresql writeMode updateDatax 支持 Redis Writer 扩展源码下载地址:https://github.com/WeiYe-Jing/datax-web/issues/253

  • PostgreSql 没有支持增量配置,针对有修改的增量数据可以采用将增量数据放在临时表中(与目标表同库),通过表比对将修改的数据删除再插入,新增数据直接插入

  • 4328




想向技术大佬们多多取经?开发中遇到的问题何处探讨?如何获取金融科技海量资源?


恒生LIGHT云社区,由恒生电子搭建的金融科技专业社区平台,分享实用技术干货、资源数据、金融科技行业趋势,拥抱所有金融开发者。


扫描下方小程序二维码,加入我们!



发布于: 刚刚
用户头像

还未添加个人签名 2018.11.07 加入

还未添加个人简介

评论

发布
暂无评论
基于DataX的数据同步(下)-应用DataX进行数据同步