SeaTunnel 2.3.9 同步 Oracle 数据至 Doris 出现乱码?别慌,这么做

Oracle 字符集为 ASCII,如果想要同步数据到 Doris,就会出现中文乱码问题。那么出现这个问题该怎么办呢?实际上有办法解决。
解决思路
数据从数据库读取到 Source 的时候去处理,也就是从 ResultSet 里面获取数据的时候,去重新编码
先下载好 2.3.9 版本的 SeaTunnel 源码,进入到seatunnel-connectors-v2下的connector-jdbc模块
SeaTunnel 读取数据的流程大致如下:
JdbcSourceFactory 工作流程
JdbcSourceFactory会加载我们的 source 配置参数,构建JdbcSourceConfig,JdbcDialect,创建 JdbSource。
JdbSource 任务处理
JdbSource会创建SourceSplitEnumerator去拆分任务,然后创建JdbcSourceReader根据拆分去执行任务。
JdbcSourceReader 数据处理
JdbcSourceReader会构建JdbcInputFormat,走 pollNext方法去循环处理数据。
pollNext 方法执行步骤
JdbcSourceReader的pollNext方法会先调用 JdbcInputFormat的open方法获取 ParpareStatement和ResultSet等信息,然后调用JdbcInputFormat的nextRecord方法去处ResultSet,把数据格式转换成 SeaTunnel 需要 SeaTunnelRow。
nextRecord 方法处理
JdbcInputFormat的nextRecord会调用 JdbcRowConverter的toInternal方法处理 ResultSet,JdbcRowConverter的实现类AbstractJdbcRowConverter,是我们需要修改的(针对于 Oracle 数据库)。
问题及解决思路
进入到AbstractJdbcRowConverter的toInternal方法,发现在处理 String 类型字段时:
它默认是JdbcFieldTypeUtils.getString(rs, resultSetIndex),这一步拿到的数据就已经乱码了。
我们需要修改为JdbcFieldTypeUtils.getBytes(rs, resultSetIndex),然后根据字节数组去重新编码。
以上是 SeaTunnel 处理 ResultSet 的流程。
解决方案
下面给出我们的解决方案:
针对于 Oracle 字符集可以存储中文的情况,没有必要先得到字节数组,再重新编码,直接 getString 是可以的
为此我们需要一个开关,来判断是否需要转码
在JdbcInputFormat类里面,我们加上一个 params 的参数:
在JdbcInputFormat构造方法里,为这个参数附上值:
然后在JdbcInputFormat的nextRecord方法里的 jdbcRowConverter的toInternal方法,增加一个 params 参数:
接着到实现类AbstractJdbcRowConverter添加一个编码的方法:
在该实现类的 toInternal 方法,针对 String 类型的处理方式,修改如下:
修改完之后需要把connector-jdbc重新打包,替换掉 seatunnel 的 connectors 目录下的connector-jdbc-2.3.9.jar,然后重启集群即可。
我们的配置参数脚本:
如果 Oracle 不存在乱码的情况,这个 properties 就不需要传递
ps:如果您在connector-jdbc里面打印信息,请到 seatunnel 的 logs 目录下,去查看你的 worker 节点日志信息。
转载自花载酒 779
原文链接:https://blog.csdn.net/m0_66532138/article/details/146957750









评论