Flink CDC 在新能源制造业的实践
摘要:本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。主要有以下几个内容:
CDC 方案选型
方案落地实施
平台的优越性
后续规划
我们是一家专注于新能源动力电池制造的企业,致力于推动能源技术的发展与应用。作为一家具有多年行业经验的企业,我们在新能源领域积累了深厚的技术实力和市场认知,业务涵盖了新能源产业链的关键环节。从上游的装备制造到下游的应用解决方案,为客户提供了全方位的服务。
随着企业业务的不断发展,对数据实时性的要求日益提高。如何管理复杂的 Flink 任务,提高任务的开发效率,降低任务的运维难度,成为我们亟待解决的问题。接下来,我将介绍在我们团队在新能源制造业中大数据平台 CDC 技术架构的选型和 Flink CDC 的最佳实践。
一、CDC 方案选型
目前,我们团队规划的数据平台架构如上图所示,数据源这一层主要以 Oracle 为主,且跨网络的分散在各个基地,考虑到数据源支持和时效性,我们需要支持 Oracle CDC 数据采集,结合我们的业务场景,我们调研了实时采集的如下方案:
如上图所示,从 CDC 机制、增量同步、断点续传、全量同步、全量+增量、架构、数据计算、生态这八个方面做了对比,可以看出其中的佼佼者主要是 Flink CDC 和 Oracle OGG 以及 Debezium。
首先,让我们了解这三种技术的基本概况。Flink CDC 是 Apache Flink 的一个官方子项目,它利用流处理能力来捕获和转换源数据库的变更事件。OGG 是 Oracle 提供的一种成熟的商业化 CDC 技术,广泛用于数据复制和实时数据分析。Debezium 则是一个开源的分布式 CDC 系统,支持多种数据库和消息中间件,易于扩展和集成。
从实现机制上来看,Flink CDC 依赖于数据库的日志或 redo log 来捕捉变更,并通过 Flink 的流处理引擎进行后续处理。它的优势在于与 Flink 生态的无缝整合,为批处理和流处理提供了统一的解决方案。此外,Flink CDC 支持丰富的转换和处理功能,如窗口操作、状态管理和时间处理,这对于复杂的实时分析场景非常有益。然而,它的局限性也较为明显,即对源数据库的依赖性较强,需要特定的日志格式或访问权限。
OGG 作为一种成熟的商业产品,提供了强大的数据捕获和传输能力。它通过解析源数据库的日志文件来捕获变更,并能够在不同的系统之间高效地传输数据。OGG 的优势在于其稳定性和广泛的兼容性,尤其是在处理大规模数据和高并发场景时表现出色。但是,OGG 的成本较高,且对于开源生态系统的支持不如其他方案那么丰富。
Debezium 则采用了不同的方法,它直接连接到数据库,通过订阅 binlog 或 redo log 来捕获变更事件。Debezium 的特点是其灵活性和易用性,支持多种数据库类型,并且可以方便地与 Kafka 等消息中间件集成。这使得 Debezium 非常适合于多源异构环境下的数据同步和实时分析。但是,Debezium 的性能可能不如 Flink CDC 和 OGG 那样出色,尤其是在处理大量数据时可能需要更多的优化工作。
在应用场景方面,Flink CDC 适合于需要复杂事件处理和流式分析的场景,尤其是那些已经采用 Flink 作为流处理平台的组织。OGG 则更适合于对稳定性和性能有极高要求的大型或关键任务应用。而 Debezium 由于其灵活性和易用性,适用于需要快速开发和部署 CDC 解决方案的场景,尤其是面对多样化的数据源和技术栈时。
此外,考虑到 OGG 成本高,链路长,且文档资源少,而 Debezium 配置复杂,而且在后续大数据流量或存在复杂的计算逻辑时难以为继,最终我们选择了 Flink CDC 作为 Oracle 的实时采集的工具方案。
二、方案落地实施:
我们团队在 Flink CDC 中的实施落地主要分为以下几步:
确保 Oracle 日志归档模式开启,开通 Oracle 整库补充日志,开通表级别的全列补充日志。
赋予操作用户各类权限,如表的操作权限,logminer 与 archive log 相关视图查询权限等。
开启跨基地网络权限,打通 Hadoop 集群对 Oracle 的访问权限。
通过平台编写 Flink SQL 任务,将 Oracle 的数据直接存入到 Doris 中,下图为线上作业效果图。
开发的过程中虽然整体顺利,但也遇到了几个小插曲,在此与各位读者一起分享
该配置开启了 Oracle 日志在线挖掘模式,如果没有这个设置,会导致生产环境默认策略读取 log 较慢,且默认策略会写入数据字典信息到 redo log 中导致日志量增加较多。但该配置也有缺点,不写入数据字典到 redo log 中,会导致无法处理 DDL 语句。
我们在同步 Oracle11g 的数据时,遇到了报错
该错误是因为对于 Oracle11 版本,Debezium 会默认把 tableIdCaseInsensitive 设置为 true, 导致表名被更新为小写,因此在 Oracle 中查询不到 这个表补全日志设置,导致误报这个 Supplemental logging not configured for table 错误”,我们只需要开启如下设置即可
我们的任务在运行一段时间后报错
这是因为 schema change event parsing 导致的问题,已经在 3.1 版本得到了解决,下载 3.1 版本的包即可,详见 Flink CDC 项目的 PR #2315。
Flink Oracle CDC connector 在采集分区表的时候存在问题,报错为
这是由于分区表未能被成功扫描到,我们进入到源码的类 OracleConnectionUtils 中可以看到
其中如果需要加入分区表的话,要在 where 条件中加入 PARTITIONED = 'YES',重新编译打包即可解决该问题。
当然写入 Doris 的速度也可能存在慢的现象,特别是 StreamLoad 中进程 stopped 与返回 loadResult 结果之间时间较长时的问题,得益于 Apache Doris 社区的帮助,我们总结以下几点优化方案:
看 BE 的资源是不是满了,检查 IO、CPU、内存
换成 2pc 导⼊,⾮ 2pc 会等待 publish,2pc 导⼊ publish 是异步的
mow 尝试换成 mor 表
show proc '/transactions' 看看事务的状态,如果都是 committed,说明 publish 卡住了,如果 io 不⾼,可以增加 publish 的线程和超时
增加 be.conf 的配置 ,适当提⾼下⾯的参数的配置
三、平台的优越性:
此外 Apache StreamPark 也给我们带来了很多惊喜,在开发 FlinkSQL 任务的过程中,借助 StreamPark,极大的提升了开发时的效率,我罗列了我们开发团队觉得非常优秀的点:
1. 多版本 Flink 管理
在开发过程中,可能会因为需求 Flink 新版本的某项能力,或是某些组件对 Flink 版本的强需求而引入多版本的 Flink,在 StreamPark 中,只需要配置一个 Flink 的路径并为其命名,即可在开发任务中轻松地选择我所需要的 Flink 版本。
2. 对于 Git 的集成
我们在引入 Apache Dolphinscheduler 时,曾多次接到数仓团队表明调度系统未能接入版本控制的遗憾,但在引入实时平台时,这个遗憾被弥补了。StreamPark 的 Git 式项目管理,极大的简化了我们在编写 JAR 后复杂的部署链路,一键编译、部署、配置,一气呵成。
3. 强大的作业运维能力及告警机制
StreamPark 有着完善的作业操作记录,可以清晰的记录所有任务 Release、Start、Cancel 的记录,对于 yarn application 形式的任务,记录了每次运行的 appId,便于后续的 yarn 日志翻阅、运行状态的监测等等。一旦报错,StreamPark 会迅速发送告警,如果配置了重启机制,可以实现分钟级的重启效率。
4. Flink SQL 在线开发
StreamPark 的类 IDE 样式的开发窗口相当优越,可以帮助我们检查低级的语法错误,简单的轻松地便可以配置各类想要配置的参数,如分配 JM、TM 的资源,配置预备提交的 yarn 队列,它的作业依赖配置更是相当简易,提供了填写 pom 和上传 jar 包两种形式,再也无需将包傻傻地通过 hdfs 上传,极大地提升了我们的开发效率。
四、后续规划:
感谢 Flink CDC,作为 Apache Flink 社区的重点项目,它提供给了我们开箱即用实时数据采集方案,极大的简化了 CDC 的实施过程,也规避了 OGG 昂贵的使用费用。
不过目前 Flink CDC Oracle Connector 也存在一些性能问题,众所周知,Oracle CDC 依赖 Debezium 组件解析 Redo Log 与 Archive Log,Debezium 通过 Oracle 的 Logminer 解析 Log。最大的问题就在于 LogMiner 的性能限制, 熟悉 Oracle 的开发者会知道,LogMiner 是运行在 Oracle 内部, 并且运行在日志落地之后, 不可避免地需要消耗数据库的算力去完成工作, 为了降低这个不在主流程的进程的资源消耗, Oracle 对 LogMiner 做了非常严格的资源限制, 每个 LogMiner 进程, 他的资源消耗都不能超过 1 个 cpu 核心, 在大多数场景下, 这个将 LogMiner 的日志解析速度限制在 1w 条每秒以下, 在一些制造业的场景下, 这个速度是远远不够的, Oracle 是一个事务数据库, 一个大的 Update, 可能会带来数十万上百万的更新, 在这种情况下, 每秒 1w 的解析速度会使得下游延迟增大到数分钟级别, 更糟糕的是, 在数据库本身负载较高的情况下, 由于 LogMiner 的解析与数据库共享负载, 会让解析速度进一步下降。
后续团队考虑通过 Oracle 的机制, 将 redo log 异步传输到另外一台没有业务压力的 Oracle 实例上, 然后在另外一台机器上开启并发切分 scn range 事件,开启 LogMiner 线程并发的解析对应 scn range 事件,顺序的处理获取到的事件。
欢迎大家多多关注 Flink CDC,从钉钉用户交流群[1]、微信公众号[2]、Slack 频道[3]、邮件列表[4]加入 CDC 用户社区,以及在 Flink CDC GitHub 仓库[5]上参与代码贡献!
[1] “ Flink CDC 社区 ② 群”群的钉钉群号:80655011780
[2] ” Flink CDC 公众号“的微信号:ApacheFlinkCDC
[3] https://flink.apache.org/what-is-flink/community/#slack
[4] https://flink.apache.org/what-is-flink/community/#mailing-lists
[5] https://github.com/apache/flink-cdc
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:新用户复制点击下方链接或者扫描二维码即可 0 元免费试用 Flink + Paimon实时计算 Flink 版(3000CU*小时,3 个月内)了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc
版权声明: 本文为 InfoQ 作者【Apache Flink】的原创文章。
原文链接:【http://xie.infoq.cn/article/9800791a2b793dd4d5139b1da】。文章转载请联系作者。
评论