写点什么

从 Hive 到 TiDB|nifi 导入数据到 tidb 的实战过程

作者: du 拉松原文来源:https://tidb.net/blog/cfd538a9


本次测试用到的环境:


tidb: v8.5.2


nifi:1.28.1


mysql 驱动:8.0.33


Apache NiFi 是 Apache 软件基金会的开源数据集成与流处理平台,主要用于 自动化数据流转、数据转换、数据路由。它通过可视化界面配置和管理数据流,能够在不同系统之间安全、高效、可控地移动和处理数据。我们可以使用 nifi 进行数据库数据同步操作或者使用 nifi 做相关的数据治理操作。


本次总结主要是在从 hive 库迁移到 tidb 数据库过程中,对目前在使用的 nifi 方式验证过程中遇到的相关问题。


目前遇到了 nifi 批量 insert 和 upsert 速度慢问题:


  1. nifi 执行 10000 条 1 批次 insert 提交,5 分钟仅执行 18 次(单线程)

  2. nifi 执行 10000 条 1 批次的 upsert 提交,5 分钟仅 13 次(单线程)


但是使用 jmeter 压测结果大概 233w 每 5 分钟(单线程),使用的方式为 insert into table values (),()…. 1 万条数据。


针对上述明显差异做如下排查。

一、 insert 的优化方式

梳理问题

首先打开 tidb 的 general log,先设置 nifi 为 10 条一批次的提交方式,会看到类似一下的日志:


insert into table () values (); insert into table () values (); insert into table () values (); .... insert into table () values (); commit;
复制代码


从上面可以看到多次请求一个事务提交的方式。

优化方式一

然而请教了官方售后和售前小伙伴还有参考官方文档中的描述需要再 jdbcUrl 中增加如下配置:


?useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSqlLimit=10000&prepStmtCacheSize=1000&useConfigs=maxPerformance&rewriteBatchedStatements=true&rewriteBatchedStatements=true
复制代码


官网描述:https://docs.pingcap.com/zh/tidb/stable/java-app-best-practices/#useserverprepstmts


会看到如下的 log 信息:


insert into table (name) values (?),(?)(?)(?)(?)(?); commit
复制代码


这个时候测试的结果 nifi 大概每 5 分钟提交 100 次;但是这种方式在 tidb 的日志中会出现如下的错误提示:


[2025/08/18 15:42:04.486 +08:00] [INFO] [conn.go:1184] ["command dispatched failed"] [conn=2881490770] [session_alias=] [connInfo="id:2881490770, addr:100.86.12.242:54106 status:0, collation:utf8_general_ci, user:aa"] [command=Prep are] [status="inTxn:0, autocommit:0"] [sql="INSERT INTO table (aa, aa1, aa2, zy, aa3, aa4, aa4, aa5, aa6, aa7, aa8, aa9, bb1, bb2, bb3, bb4, bb5, bb6, bb7, bb8, bb9, cc1, cc2, cc3, cc4, cc5, cc6, cc7, cc8, cc9, dd1, dd2, dd3, dd4, dd5, dd6, dd7, dd8, dd9, ee1, ee2, ee3, ee4, ee5, ee6) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,? ,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,? ,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?), (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,? ,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,(len:920380)"] [txn_mode=PESSIMISTIC] [timestamp=0] [err="[executor:1390]Prepared statement contains too many placeholders"]
复制代码


这种的提示表示太多的占位符,默认最大是 65535 个。解决这个日志的方式可以减少 insert 的批次大小,比如减少到 1000 条每批次。

优化方式二

但是咱们针对客户的需求不好改变的前提,咱们还得找别的方案,所以在多次的调试过程中修改 jdbc 的参数如下:


?useServerPrepStmts=false&rewriteBatchedStatements=true&allowMultiQuries=true
复制代码


这种方式是把多条 commit 转换成 insert into table (name) values (‘aa’),(‘bb’)…. 的方式,而不是使用预编译声明的方式。而且这种方式还提高了单线程的 insert 的速度,这个方式可以达到 5 分钟 133 次,即 5 分钟 133 万的插入速度,比上面的 100 万,提高了 33% 的效率。

二、upsert 的优化方式

然而在复杂的业务中,数据的同步不仅有 insert,还有变相的治理过程,比如原来的数据保存在 hive 库中,一个表中的数据可能会出现重复,在同步数据时,使用 upsert 的方式把设定主键的数据保留唯一。

梳理问题

打开 general log,查看 nifi 生成的 upsert 语句如下:


insert ... ON DUPLICATE KEY UPDATE
复制代码


所以使用上述的 jdbcurl 配置并没有起到任何效果。并且 tidblog 中会出现 INFO 的日志:can not prepare multiple statements。

代码优化

那怎么才能使用批量的方式一块提交呢,就是 replace into 的方式。但是 nifi 不支持这个 replace into 的方式。


那就修改源码,上 github 上拉下来 1.28.1 的源码,找到 PutDatabaseRecord 组件的实现代码 PutDatabaseRecord.java,找到执行 sql 的相关代码:org.apache.nifi.processors.standard.PutDatabaseRecord#executeDML,最终定位到 mysql 的实现代码在 src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java 下


那就修改这个代码为 replace into 的方式:



重新打包:

使用 idea 找到 nifi-standard-bundle 模块,直接使用 install 即可:



执行完成后会在对应的模块下 nifi-standard-nar 下的 target 中看到对应的 nar 包。



然后打包放到 nifi 安装目录的 lib 下,替换原来的 nifi-standard-nar-1.28.1.nar。

测试结果

最终测试使用 replace 的方式,可以达到 5 分钟 96 万的速率。

总结

在 tidb 的使用过程中,可能会使用很多的第三工具用来同步数据,治理数据。但是在开始使用的过程中可能会遇到形形色色的问题。这时候需要我们排查出根本问题来给出相关的优化方案。上面是我在 nifi 使用过程中的一些优化思路,希望对大家有所帮助。同样感谢咱们 tidb 售前和售后小伙伴的支持。


发布于: 刚刚阅读数: 3
用户头像

TiDB 社区官网:https://tidb.net/ 2021-12-15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
从 Hive 到 TiDB|nifi导入数据到tidb的实战过程_TiDB第四届征文-业务场景实战_TiDB 社区干货传送门_InfoQ写作社区