业务背景 &痛点
亚马逊云科技开发者社区为开发者们提供全球的开发技术资源。这里有技术文档、开发案例、技术专栏、培训视频、活动与竞赛等。帮助中国开发者对接世界最前沿技术,观点,和项目,并将中国优秀开发者或技术推荐给全球云社区。如果你还没有关注/收藏,看到这里请一定不要匆匆划过,点这里让它成为你的技术宝库!
使用 Flink Sql 离线表 Join 流态表的常规 lookup join,是通过 Flink hive sql connector 或者 filesystem connector,对离线 hive 库表或者 S3 上离线数据建 Flink Table,然后对 kafka 消息流中的数据建流态表,然后直接做量表做 join 操作
该方式架构如下图所示:
该方式主要面临的问题是:
解决方案思路
基于以上业务难点,本文提出一种解决方案思路,即通过 Alluxio 缓存层,将 hive 维度表数据自动加载至 Alluxio UFS 缓存中,同时通过 Flink 时态表 join,把维度表数据做成持续变化表上某一时刻的视图
同时使用 Flink 的 Temporal table function 表函数,传递一个时间参数,返回 Temporal table 这一指定时刻的视图,这样实时动态表主表与这个 Temporal table 表关联的时候,可以关联到某一个版本(历史上某一个时刻)的维度数据
优化后的整体架构如下图所示:
方案实施落地 Detail
本文以 Kafka 中用户行为日志数据做为实时流态的事实表数据,hive 上用户信息数据做为离线维度表数据,采用 Alluxio+Flink temproal 的 demo,来验证其 flink join 优化的解决方案
实时事实表
本实例中我们使用 json-data-generator 开源组件模拟的用户行为 json 数据,实时写入 kafka 中,通过 Flink kafka connector 转换为持续查询的 Flink 流态表,从而做为实时 join 的时候的 Fact 事实表数据
用户行为 json 模拟数据如下格式:
[{ "timestamp": "nowTimestamp()",
"system": "BADGE",
"actor": "Agnew",
"action": "EXIT",
"objects": ["Building 1"],
"location": "45.5,44.3",
"message": "Exited Building 1"
}]
复制代码
包含用户行为的业务时间,登录系统,用户署名,行为 activity 动作,操作涉及对象,位置信息,及相关文本消息字段。我们在
Flink Sql 中建选择主要字段建事实表如下
CREATE TABLE logevent_source (`timestamp` string,
`system` string,
actor STRING,
action STRING
) WITH (
'connector' = 'kafka',
'topic' = 'logevent',
'properties.bootstrap.servers' = 'b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092 (http://b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092/)',
'properties.group.id' = 'testGroup6',
'scan.startup.mode'='latest-offset',
'format' = 'json'
);
复制代码
Alluxio 缓存维度表
Alluxio 是大数据技术堆栈的分布式缓存,它提供了一个统一的 UFS 文件系统可以对接底层 S3,hdfs 数据,在读写 Alluxio UFS 的时候,可以针对 S3,HDFS 分布式存储层实现 warm up,显著提升吞吐量和减少网络开销,且与上层计算引擎如 Hive,spark,Trino 都有深度的集成,很适合做为离线维度数据的缓存加速器
Amazon EMR 对 Alluxio 提供了良好的集成,可以通过 boostrap 启动脚本方式,在 EMR 创建时自动部署 Alluxio 组件并启动 Alluxio master、worker 进程,详细 EMR 安装和部署 Alluxio 步骤可以参考另一篇文章 Alluxio EMR 集成实践
在集成 Alluxio 的 Amazon EMR 集群中,使用 Alluxio 中创建 hive 离线维表数据的缓存表方法如下:
hive-env.sh中设置设置client jar包:
$ export HIVE_AUX_JARS_PATH=/<PATH_TO_ALLUXIO>/client/alluxio-2.2.0-client.jar:${HIVE_AU
确保安装部署alluxio的EMR集群上ufs已配置,并且表或者db路径已创建
alluxio fs mkdir alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer
alluxio fs chown hadoop:hadoop alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer
在AWS EMR集群上,创建hive表路径指向alluxio namespace uri:
!connect jdbc:hive2://xxx.xxx.xxx.xxx:10000/default;
hive> CREATE TABLE customer(
c_customer_sk bigint,
c_customer_id string,
c_current_cdemo_sk bigint,
c_current_hdemo_sk bigint,
c_current_addr_sk bigint,
c_first_shipto_date_sk bigint,
c_first_sales_date_sk bigint,
c_salutation string,
c_first_name string,
c_last_name string,
c_preferred_cust_flag string,
c_birth_day int,
c_birth_month int,
c_birth_year int,
c_birth_country string,
c_login string,
c_email_address string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer';
OK
Time taken: 3.485 seconds
复制代码
如上所示,该 Alluxio 表 location 指向的路径即为 hive 维度表所在 S3 路径,因此对 Customer 用户维度信息表的写入操作会自动同步到 alluxio 缓存中。
创建好 Alluxio hive 离线维度表后,在 flink sql 中,可以通过 hive 的 catalog,连接到 hive 元数据,即可以查看到 alluxio 缓存表的详细信息:
CREATE CATALOG hiveCatalog WITH ( 'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/etc/hive/conf/',
'hive-version' = '3.1.2',
'hadoop-conf-dir'='/etc/hadoop/conf/'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hiveCatalog;
show create table customer;
create external table customer(
c_customer_sk bigint,
c_customer_id string,
c_current_cdemo_sk bigint,
c_current_hdemo_sk bigint,
c_current_addr_sk bigint,
c_first_shipto_date_sk bigint,
c_first_sales_date_sk bigint,
c_salutation string,
c_first_name string,
c_last_name string,
c_preferred_cust_flag string,
c_birth_day int,
c_birth_month int,
c_birth_year int,
c_birth_country string,
c_login string,
c_email_address string
)
row format delimited fields terminated by '|'
location 'alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/30/customer'
TBLPROPERTIES (
'streaming-source.enable' = 'false',
'lookup.join.cache.ttl' = '12 h'
)
复制代码
如上图所示,可以看到该维度表 location 路径是 alluxio 缓存 ufs 路径的 uri,业务程序读写该维度表时,alluxio 会自动更新缓存中的 customer 维度表数据,并异步写入到 alluxio 的 backend storage 的 S3 表路径,实现数据湖的表数据同步更新。
Flink Temporal 时态表 join
Flink 时态表(Temporal table)也是动态表的一种,时态表的每条记录都会有一个或多个时间字段相关联,当我们事实表 join 维度表的时候,通常需要获取实时的维度表数据做 lookup,所以通常需要在事实表 create table 或者 join 时,通过 proctime()函数指定事实表的时间字段,同时在 join 时,通过 FOR SYSTEM_TIME AS OF 语法,指定维度表 lookup 时对应的事实表时间版本的数据
在本 Demo 示例中,客户信息在 hive 离线表作为一个变化的维度表的角色,客户行为在 kafka 中作为事实表的角色,因此在 flink kafka source table 中,通过 proctime()指定时间字段,然后在 flink hive table 做 join 时,使用 FOR SYSTEM_TIME AS OF 指定 lookup 的 kafka source table 的时间字段,从而实现 Flink temporal 时态表 join 业务处理
如下所示,Flink Sql 中通过 Kafka connector 创建用户行为的事实表,其中 ts 字段即为时态表 join 时的时间戳:
CREATE TABLE logevent_source (`timestamp` string,
`system` string,
actor STRING,
action STRING,
ts as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'logevent',
'properties.bootstrap.servers' = 'b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092 (http://b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092/)',
'properties.group.id' = 'testGroup-01',
'scan.startup.mode'='latest-offset',
'format' = 'json'
);
复制代码
Flink 离线维度表与流式实时表具体 join 方法如下:
select a.`timestamp`,a.`system`,a.actor,a.action,b.c_login from
(select *, proctime() as proctime from user_logevent_source) as a
left join customer FOR SYSTEM_TIME AS OF a.proctime as b on a.actor=b.c_last_name;
复制代码
如上代码示例,在事实表 logevent_source join lookup 维度表时,通过 proctime 函数获取到维度表的瞬时最新的版本数据,保障 join 时的一致性和实时性
同时,该维度表数据已经在 alluxio cache,因此读取时性能远高于离线读取 s3 上的表数据
通过 hive 切换 S3 和 alluxio 路径的 customer 信息 维度表,对比测试 flink join 可以看出 alluxio 缓存后性能明显优势
通过 alter table 方便切换本地和 cache 的 location 路径:
alter table customer set location "s3://xxxxxx/data/s3/30/customer";
alter table customer set location "alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/30/customer";
复制代码
选取某一 split 数据分片的 TaskManager 日志:
2022-06-29 02:54:34,791 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem [] - Opening 's3://salunchbucket/data/s3/30/customer/data-m-00029' for reading
2022-06-29 02:54:39,971 INFO org.apache.flink.table.filesystem.FileSystemLookupFunction [] - Loaded 433000 row(s) into lookup join cache
复制代码
2022-06-29 03:25:14,476 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem [] - Opening 's3://salunchbucket/data/s3/30/customer/data-m-00029' for reading
2022-06-29 03:25:16,397 INFO org.apache.flink.table.filesystem.FileSystemLookupFunction [] - Loaded 433000 row(s) into lookup join cache
复制代码
在 JobManager 上查看 Timeline,对比 alluxio 和 s3 路径下 job 的执行时间可以看到更加清楚
可以看到, 单个 task 查询提升 1 倍以上,整体 job 性能提升更加明显
其他需要考虑的问题
持续 Join 每次都需要拉取维度数据做 join,Flink 的 checkpoint state 是否一直膨胀导致 TM 的 RockDB 撑爆或者内存溢出?
state 自带有 ttl 机制,可以设置 ttl 过期策略,触发 Flink 清理过期 state 数据,Flink Sql 可以通过 Hint 方式设置
insert into logevent_sink
select a.`timestamp`,a.`system`,a.actor,a.action,b.c_login from
(select *, proctime() as proctime from logevent_source) as a
left join
customer/*+ OPTIONS('lookup.join.cache.ttl' = '5 min')*/ FOR SYSTEM_TIME AS OF a.proctime as b
on a.actor=b.c_last_name;
复制代码
Flink Table/Streaming API 类似:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter()
.build();
ValueStateDescriptor<Long> lastUserLogin =
new ValueStateDescriptor<>("lastUserLogin", Long.class);
lastUserLogin.enableTimeToLive(ttlConfig);
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);
复制代码
设置后重新启动 lookup join,从 Flink TM 日志中可以看到,ttl 到期后,会触发清理并重新拉取 hive 维表数据:
2022-06-29 04:17:09,161 INFO org.apache.flink.table.filesystem.FileSystemLookupFunction
[] - Lookup join cache has expired after 5 minute(s), reloading
复制代码
此外,可以通过配置 flink state retain,减少 checkpoint 时候快照数量,从而减少快照时候 state 的占用空间
Flink job中配置:
-D state.checkpoints.num-retained=5
复制代码
设置后,可以看到 s3 checkpoint 路径上,Flink Job 会自动清理历史快照,只保留最近的 5 次快照数据,从而确保 checkpoint 快照数据不会堆积
[hadoop@ip-172-31-41-131 ~]$ aws s3 ls s3://salunchbucket/data/checkpoints/7b9f2f9becbf3c879cd1e5f38c6239f8/
PRE chk-3/
PRE chk-4/
PRE chk-5/
PRE chk-6/
PRE chk-7/
复制代码
附录
Alluxio整体架构
Alluxio on EMR 快速部署
在 Amazon EMR 中利用 Alluxio 的分层存储架构
EMR Alluxio集成detail
Flink Temporal Join 详细
本篇作者
唐清原
Amazon 数据分析解决方案架构师,负责 Amazon Data Analytic 服务方案架构设计以及性能优化,迁移,治理等 Deep Dive 支持。10+数据领域研发及架构设计经验,历任 Oracle 高级咨询顾问,咪咕文化数据集市高级架构师,澳新银行数据分析领域架构师职务。在大数据,数据湖,智能湖仓,及相关推荐系统 /MLOps 平台等项目有丰富实战经验
陈昊
Amazon 合作伙伴解决方案架构师,有将近 20 年的 IT 从业经验,在企业应用开发、架构设计及建设方面具有丰富的实践经验。目前主要负责 Amazon (中国)合作伙伴的方案架构咨询和设计工作,致力于 Amazon 云服务在国内的应用推广以及帮助合作伙伴构建更高效的 Amazon 云服务解决方案。
文章来源:https://dev.amazoncloud.cn/column/article/6309af45d4155422a4610a40?sc_channel=InfoQ
评论