写点什么

EMR-StarRocks 与 Flink 在汇量实时写入场景的最佳实践

  • 2022-11-25
    浙江
  • 本文字数:4736 字

    阅读完需:约 16 分钟

EMR-StarRocks 与 Flink 在汇量实时写入场景的最佳实践

作者:


刘腾飞 汇量后端开发工程师

阿里云开源 OLAP 研发团队


EMR-StarRocks 介绍

阿里云 EMR 在年初推出了 StarRocks 服务,StarRocks 是新一代极速全场景 MPP(Massively Parallel Processing)数据仓库,致力于构建极速和统一分析体验。EMR StarRocks 具备如下特点:

  • 兼容 MySQL 协议,可使用 MySQL 客户端和常用 BI 工具对接 StarRocks 来分析数据

  • 采用分布式架构:

  • 对数据表进行水平划分并以多副本存储

  • 集群规模可以灵活伸缩,支持 10 PB 级别的数据分析

  • 支持 MPP 框架,并行加速计算

  • 支持多副本,具有弹性容错能力

  • 支持向量化引擎和 CBO

  • 支持弹性扩缩容

  • 支持明细模型、聚合模型、主键模型和更新模型


更多详细信息可以参考https://help.aliyun.com/document_detail/405463.html


Flink-CDC 概念介绍



CDC 的全称是 Change Data Capture,面向的场景包括数据同步、数据分发、数据采集,Flink CDC 主要面向数据库的变更,可以将上游数据和 Schema 的变更同步到下游数据湖和数据仓库中。2020 年 7 月,Flink CDC 项目提交了第一个 Commit,去年 8 月,Flink 社区发布了 CDC2.0,经过两年时间的打磨,在商业化使用上已经非常成熟。本文主要以 Mysql CDC 为例,介绍 StarRocks+Flink CDC 实时入仓中用户遇到的痛点,以及在 Flink 和 StarRocks 层面进行的对应优化和解决方案。


使用 CDC 将一张 Mysql 表中的数据导入到 StarRocks 的表中,首先需要在 StarRocks 上建立用来承接 Mysql 数据的目标表,然后在 Flink 上分别创建 Mysql 表和 StarRocks 表在 Flink 中 Sink 和 Source 表的映射,然后执行一条 insert into sink_table from source_table 语句。执行完 Insert into 之后,会生成一个 CDC 任务,CDC 任务首先向目标表同步源表的全量数据,完成后继续基于 Binlog 进行增量数据的同步。通过一个任务,完成数据的全量+增量同步,对于用户来讲是非常友好的。但是在使用的过程中,依然发现了一些痛点。


实时写入场景的用户痛点

SQL 开发工作量大

对于一些还没有完成数仓建设的新业务,或是刚刚开始依托 StarRocks 进行 OLAP 平台建设的用户而言,在 StarRocks 中建表以承载 Mysql 同步过来的数据是第一步。在一些复杂的业务中,Mysql 中的表往往有几十上百张,每张表又有数十个字段,要把它们对应的 StarRocks 表的建表语句全部编写出来是一个很大的工作量。第一个痛点 StarRocks 建表的工作量大。


Flink 字段的数据类型映射关系复杂易错

在 StarRocks 中建表是第一步,建表完成之后,为了启动 CDC 任务,还需要在 Flink 中建立 Mysql 对应的 Source 表,以及 StarRocks 对应的 Sink 表,其中 Flink 建表时,每个字段的字段类型与 Mysql、与 StarRocks 的映射关系需要严格注意,对于动辄几十上百个需要字段的表,每个字段都需要查找对应在 Flink 的类型映射关系,尤其令开发人员痛苦。因此,第二个痛点是上下游表与 Flink 字段的数据类型映射关系复杂,容易出错。


Schema 变更操作繁琐

第三个痛点来自于业务数据 Schema 的变化,据 Fivetran 公司调查,约有 60%的公司数据 Schema 每个月都会发生变化,30%的公司数据 Schema 每周都会发生变化。对于 Mysql 表中字段的增删改,用户希望在不影响 CDC 任务的情况下,将 Schema 变化同步到下游的 StarRocks。目前常用的方案,是在手动停止任务后,更改 StarRocks 和 Mysql 的 Schema,更改 Flink 侧的 Sink 和 Source 表结构,通过指定 savepoints 的方式再次启动任务。Schema 变更的操作繁琐,无法自动化是第三个痛点。


数据同步任务占用资源多

第四个痛点,是在表的数量多、实时增量数据量大的场景下,CDC 任务占用的内存和 cpu 资源较高,出于节省成本的考虑,用户希望尽可能的在资源利用方面进行优化。

接下来,我们来看针对这些痛点,EMR-StarRocks 在与 Flink 深度结合方面做了哪些优化,提供了什么样的解决方案。


CTAS&CDAS

EMR-StarRocks 与 Flink 团队推出的 CTAS&CDAS 功能主要是针对前三个痛点研发的一个解决方案。通过 CTAS&CDAS,可以使用一条 SQL 语句,完成 StarRocks 建表、Flink-CDC 任务创建、实时同步 Schema 变更等原本需要多项繁杂操作的任务,令开发和运维的工作量大大降低。


CTAS 介绍

CTAS 的全称是 create table as,语法结构如下:

CREATE TABLE IF NOT EXISTS runoob_tbl1 with ('starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8','database-name'='test_cdc','jdbc-url'='jdbc:mysql://172.16.**.**:9030','load-url'='172.16.**.**:8030','table-name'='runoob_tbl_sr','username'='test','password' = '123456','sink.buffer-flush.interval-ms' = '5000') as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',  'port' = '3306',  'username' = 'test',  'password' = '123456',  'database-name' = 'test_cdc',  'table-name' = 'runoob_tbl'  )*/;
复制代码


通过 CTAS 的语法结构可以看到,除了集群信息和 DataBase 信息外,还有一个特殊配置“starrocks.create.table.properties”,这是由于 Mysql 与 StarRocks 的表结构有一些不同,如 Key Type、分区、Bucket Number 等特殊配置,因此用它来承接 StarRocks 建表语句中字段定义后面的内容。


为了方便用户更快的建表,还设置了一个 Simple Mode,配置方式如下:

CREATE TABLE IF NOT EXISTS runoob_tbl1 with ('starrocks.create.table.properties'=' buckets 8','starrocks.create.table.mode'='simple','database-name'='test_cdc','jdbc-url'='jdbc:mysql://172.16.**.**:9030','load-url'='172.16.**.**:8030','table-name'='runoob_tbl_sr','username'='test','password' = '123456','sink.buffer-flush.interval-ms' = '5000') as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',  'port' = '3306',  'username' = 'test',  'password' = '123456',  'database-name' = 'test_cdc',  'table-name' = 'runoob_tbl'  )*/;
复制代码

开启 Simple Mode 之后,将默认使用 Primary Key 模型,默认使用 Mysql 中的主键作为 Primary Key,默认使用哈希(主键)进行分桶,这样,用户在启动 Simple Mode 对表使用 CTAS 语句时,就完全不需要关心 Mysql 中原表有哪些字段,字段名称是什么,主键是什么,只需要知道表名,就可以高效的完成 SQL 编写。


CTAS 的原理



如图所示,在执行了 CTAS 语句后,首先 Flink 会自动在 StarRocks 中创建一个与 Mysql 源表的 Schema 相同的目标表,然后建立 Mysql 与 StarRocks 表在 Flink 中的 Sink 和 Source 映射,接下来启动一个 CDC 任务,该任务将同步源表数据到目标表,并在运行时监测 Mysql 源表发送过来的数据发生的 Schema 变更,自动将 Schema 变更同步到 StarRocks 目标表中。CTAS 功能实际上是用一个 SQL,完成了原本需要手动编写 SQL 和执行的多项操作。

接下来介绍 CTAS 的实现原理。CTAS 的实现主要依赖了 Flink CDC、Flink Catalog 和 Schema Evolution。Flink 的 CDC 功能前面已经介绍过了。其中的 Catalog 功能,使 Flink 可以感知到 StarRocks 中所有的 DataBase 和所有 table 的 Schema,并对它们进行 DDL 操作。而 Schema Evolution 功能,是通过对数据的 Schema 变化进行检测和记录实现的,例如,当 Mysql 发生增列操作时,CTAS 任务并不会根据 Mysql 的 DDL 变化,立刻对下游 StarRocks 进行添加列的操作,而是当第一条使用了新 Schema 的数据被处理时,才会通过对比新旧数据 Schema 的区别,生成对应的 Alter Table Add Column 语句,对 StarRocks 进行增列操作,在等待 StarRocks 的 Schema 变更完成之后,新的数据才会被推送到下游。


CDAS 介绍

CDAS 是 CTAS 的一个语法糖。通过 CDAS 语句,可以实现 Mysql 中的整库同步,即生成一个 Flink Job,Source 是 Mysql 中的 database,目标表是 StarRocks 中对应的多张表。

CREATE DATABASE IF NOT EXISTS sr_db with ('starrocks.create.table.properties'=' buckets 8','starrocks.create.table.mode'='simple','database-name'='test_cdc','jdbc-url'='jdbc:mysql://172.16.**.**:9030','load-url'='172.16.**.**:8030','username'='test','password' = '123456','sink.buffer-flush.interval-ms' = '5000') as table mysql.test_cdc.runoob_tbl including table 'tabl1','tbl2','tbl3'   /*+ OPTIONS (   'connector' = 'mysql-cdc',  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',  'port' = '3306',  'username' = 'test',  'password' = '123456',  'database-name' = 'test_cdc' )*/;
复制代码

由于我们期望使用一条 SQL 生成多张表的 Schema 和 CDC 任务,因此需要统一使用 Simple 模式。在实际使用过程中,一个 DataBase 中可能有些表不需要同步、有些表需要自定义配置,因此我们可以使用 Including Table 语法,只选择一个 DataBase 中的部分表进行 CDAS 操作,对于需要自定义属性配置的表,则使用 CTAS 语句进行操作。


重要特性

CTAS&CDAS 的几个重要特性包括:

  • 支持将多个 CDC 任务使用同一个 Job 执行,节省了大量的内存和 CPU 资源。

  • 支持 Source 合并,在使用 CDAS 进行数据同步时,会使用一个 Job 管理所有表的同步任务,并自动将所有表的 Source 合并为一个,减少 Mysql 侧并发读取的压力。

  • 支持的 Schema Change 类型包括增加列、删除列和修改列名。这里需要注意的是,当前所支持的删除列操作,是通过将对应字段的值置空来实现的,例如上游 Mysql 表删除了一个字段,在 Flink 检测到数据 Schema 变更后,并不会将 StarRocks 中对应的列删除,而是在将数据写入到 StarRocks 时,把对应的字段的值填为空值。而修改列名的操作,也是通过增加一个新列,并把新数据中原来的列的值置空来实现的。


Connector-V2 介绍

Connector-V2 是为了解决第四个痛点而研发的,可以帮助用户降低通过 Flink 导入 StarRocks 时的内存消耗,提升任务的稳定性。



如图所示,在 V1 版本中,为了保证 Exactly-Once,我们需要将一次 Checkpoint 期间的所有数据都憋在 Flink 的 Sink 算子的内存中,由于 Checkpoint 时间不能设置的太短,且无法预测单位时间内数据的流量,因此不仅造成了内存资源的严重消耗,还经常因 OOM 带来稳定性问题。


V2 版本通过两阶段提交的特性解决了这个问题,两阶段提交指的是,数据的提交分为两个阶段,第一阶段提交数据写入任务,在数据写入阶段数据都是不可见的,并且可以分批多次写入,第二阶段是提交阶段,通过 Commit 请求将之前多批次写入的数据同时置为可见。StarRocks 侧提供了 Begin、Prepare、Commit 等接口,支持将多次数据写入请求作为同一个事务提交,保证了同一事务内数据的一致性。


通过显示的调用 Transaction 接口的方式,可以由原来在 Flink 侧积攒大批数据、一次性发送数据的方式,改进为连续小批量提交数据,在保证 Exactly-Once 的同时,大大降低了 Flink 侧用于存储数据 Buffer 的内存消耗问题,也提高了 Flink 任务的稳定性。


StarRocks + Flink 在汇量的实践

在汇量的广告投放分析业务中,使用了 CDAS 特性来完成 Mysql 到 Flink 数据的实时变更。



此前,该业务主要依托某闭源数据仓库进行 OLAP 分析,随着数据量的增长,在单表查询和多表 Join 场景都出现了较大的瓶颈,查询耗时达到无法容忍的分钟级,因此重新选型采用了 StarRocks 进行数据分析,在对应场景下表现十分优异。


在汇量的业务场景下,StarRocks 中有几十张涉及操作元数据的小表是使用 CDAS 进行实时同步的,另外几张数据量较大的明细表是以离线导入的形式按天更新的。使用 CDAS 的主要是数据更新和 Schema 变化较为频繁的小表和维度表,进行业务查询时,将这些实时更新的表与离线的数据表进行 Join,通过小表实时更新、大表离线更新、大小表联合查询的方式,实现了实时性、成本以及导入与查询性能的取舍均衡。由于业务对数据的准确性要求较高,因此使用了 Exactly-once 语义,通过 Flink 的 Checkpoint 机制来保证数据的不丢不重。

用户头像

还未添加个人签名 2020-10-15 加入

分享阿里云计算平台的大数据和AI方向的技术创新和趋势、实战案例、经验总结。

评论

发布
暂无评论
EMR-StarRocks 与 Flink 在汇量实时写入场景的最佳实践_数据库_阿里云大数据AI技术_InfoQ写作社区