写点什么

百度网盘基于 Flink 的实时计算实践

作者:百度Geek说
  • 2025-09-02
    北京
  • 本文字数:6931 字

    阅读完需:约 23 分钟

百度网盘基于Flink的实时计算实践

01 概览

随着数字化转型的来临,企业对于数据服务的实时化需求日益增长,在大规模数据和复杂场景的情况下,Flink 在实时计算数据链路中扮演着极为重要的角色,本文介绍了网盘如何通过 Flink 构建实时计算引擎,从而提供高性能、低延迟、稳定的实时计算能力。

02 百度网盘实时计算演进

2.1 百度网盘实时计算演进历程


△百度网盘实时计算演进


在 2020 年,网盘主要通过 Spark Streaming 和 Spark Structured Streaming 来用于特定场景的支持,主要是在数据同步场景、实时清洗方面的应用。


为了解决 Spark Streaming 存在的监控告警薄弱、接入成本高、时效性低等问题,网盘于 2023 年初首次引入 Flink 实时计算引擎,并基于百度内部 StreamCompute 平台快速建设集指标监控、告警、任务生命周期管理能力;经过调研测试我们发现 Flink 任务从 0 到 1 接入成本高、开发门槛高,因此,我们开始调研实时计算引擎的解决方案,目标是降低开发门槛、配置化任务接入,最终建设网盘内部的实时计算引擎 Tiangong 来为业务提供更好的支持。


截止至今,Tiangong 计算引擎目前已在数据团队、反作弊团队、用户增长等场景广泛应用,并支持数百万亿的大流量场景。未来我们也计划将基于 Tiangong 建设网盘一体化实时计算平台,从而赋能网盘内部各个业务线实时计算能力建设。

2.2 为什么选择 Flink

网盘实时计算引擎从 Spark Streaming 和 Spark Structured Streaming 演进而来,为什么放弃 Spark 体系选择 Flink 主要从以下几个方面出发:





百度内部实时计算 RoadMap 和状态管理、流批一体、监控告警、任务管理、生态体系等各方面我们选择基于 Flink 建设网盘内部的实时计算平台。

2.3 实时计算引擎

2.3.1 实时计算引擎接入现状

目前,百度网盘的 Tiangong 计算引擎已接入 17+应用场景,高峰时作业处理的吞吐量达到千万/s,而机器规模也已经达到了 1500 台,资源 5800CU,并且已经覆盖用商策略、反作弊、主端一刻用增实时投放等多个场景。

2.3.2 Flink Tiangong 引擎架构

如下图所示的是网盘 Tiangong 实时计算引擎的架构。


  • 最下层为 Runtime 层,负责 Tiangong 计算任务的部署方式,目前支持 StreamCompute、Kubernetes、Yarn、Local 等方式;

  • 核心能力包括 Source 组件Sink 组件以及数据转换引擎

  • Source 组件:支持 Db、Message Queue、BigData 组件、自定义 Source 等多个异构数据源;

  • Sink 组件:支持 Db、Message Queue、BigData 组件、自定义 Sink 等多个异构数据目的地;

  • 数据转换引擎:支持流批一体、自定义配置化数据清洗、精准一次数据处理、失败容错、IOC 容器化管理、自定义 SQL 拓扑、灵活监控告警等能力;



△ Tiangong 计算引擎


功能层面来看,Tiangong 实时计算引擎主要包括作业管理和资源管理。其中,作业部分包括作业配置、作业上线以及作业生命周期管理三个方面的功能。


  • 作业配置方面,则包括运行环境配置、source 配置、sink 配置、清洗逻辑配置以及作业拓扑结构设置;


{    "jobName": "作业名",    "env": {运行环境配置},    "sources": [source端配置],    "udfs": [用户定义函数],    "views": [清洗逻辑],    "coreSql": [核心写入逻辑],    "sinks": [sink端配置],    "customTopology": {自定义作业运行拓扑}}
复制代码


  • 作业发布方面,则包括作业启动、取消以及删除等;



  • 作业状态则包括自定义规则告警、监控大盘等;



△ 自定义规则告警



△ 监控大盘


  • 资源管理方面,利用 StreamCompute 平台能力支持 Flink 集群动态扩缩容能力与灰度发布能力;

2.4 业务场景实践

前面提到实时计算引擎演进过程和实时计算引擎对比,可以看出网盘实时计算引擎更多地会关注在易用性、稳定性和监控告警体系等方面,具体体现的应用场景主要涉及服务端日志、埋点日志、DB Binlog 等场景的实时清洗计算。

2.4.1 网盘实时商业 BI 中心

网盘现阶段缺乏商业收入数据实时分析与商业策略实验实时评估的能力,导致商业策略 AB 实验推全链路往往需要经过周粒度才能完成,建设一套适用于网盘的实时商业 BI 中心有益于加快策略实验迭代与实时商业流水波动分析,助力网盘整体收入增长;



如上图,通过将收银台行为、商业订单、策略实验埋点数据秒粒度接入至实时数仓 Palo 中后,配合数据可视化平台 Sugar 建设商业实时 BI 中心,以此来助力商业策略、商业 PM 等各个角色快速完成 AB 实验快速推全,将天粒度实验收益评估机制优化至分钟粒度,整体实验推全链路由周粒度优化至天粒度;


2.4.1.1 Tiangong 配置化接入

下述案例为 Tiangong 引擎配置化接入商业订单实时流:


  • 实时流数据源配置


{  "sourceType": "bp_source",  "deserializerType": "STRING",  "sourceConfig": {    "parallelism": 20,    "operatorName": "xietong_strategy_businessorder_fr_bp_source",    "metaHost": "host:ip",    "cluster": "demo-cluster",    "username": "username",    "password": "password",    "pipeletName": "demo-pipelet-name",    "pipeletNum": "20",    "startingOffset": {},    "startPoint": "LATEST",    "endOffset": {},    "bpWebServiceAddress": "service_address"  }
复制代码


  • 核心处理逻辑配置


{  "jobName": "netdisk_membership_order_deatils_bp2doris",  "env": {    "streamConfigName": "20p_ck_3s_10fail_env",  ## 环境配置,主要配置Checkpoint间隔和并行度,根据数据量定义,一般为上游消息队列分区倍数    "tableConfig": {}  },  "sources": [    {      "configType": "CONFIG",      "sourceTableName":"membership_order_binlog", ## 数据源配置,bigpipei订单实时流      "sourceConfig": "prod/netdisk_membership_order_bp_source"    },   {         "configType": "SQL",         "sourceConfig": "CREATE TABLE ods_order_info_rt  ## 写入目的地配置,palo写入表                          (                              id               bigint,                              order_no         string,                              user_id          bigint,                              dev_uid          bigint,                              app_id           bigint,                              client_channel   tinyint,                              pay_channel      tinyint,                              product_id       string,                              ....                           ) WITH (               'connector' = 'doris',               'fenodes' = 'host:ip',               'table.identifier' = 'dbName:tableName',               'username' = 'username',               'password' = 'password',               'sink.properties.format' = 'json',               'sink.properties.read_json_by_line' = 'true',               'sink.label-prefix' = 'label-prefix',               'sink.enable-2pc'='true',               'sink.parallelism' = '1'       )"     }  ],  "views": [    {      "name": "binlog_filter_view",  ## 核心数据处理逻辑,纯SQL接入      "sql": "select CAST(JSON_VALUE(new_values, '$.id') as bigint)                 as id,                     JSON_VALUE(new_values, '$.business_no')        as business_no,                     JSON_VALUE(new_values, '$.order_no')           as order_no,                     UNIX_TIMESTAMP()          as write_timestamp,                     .....              FROM membership_order_binlog,                   LATERAL TABLE(BINLOG_NEWVALUES_FILTER(f0))" ## 系统内置Binlog清洗TableFunction    }  ],  "coreSql": "insert into ods_order_info_rt select id, ## 写入下游palo表,写入间隔为Checkpoint间隔,上述配置为3秒,每3秒写一批                                              business_no,                                              order_no,                                              user_id,                                              write_timestamp from binlog_filter_view"}
复制代码

2.4.1.2 可视化监控体系

Flink 作业 UI 监控



Grafana 监控大盘



实时任务监控配置



2.4.2 用户商业策略实时特征

基于商业策略实时核心行为相关特征依赖场景,结合核心行为以及用户付费埋点行为数据建设从 0 点实时累计特征与基于滚动窗口的近 X 分钟实时特征有助力策略侧对用户刚需需求的感知,并结合用户刚需行为个性化出价以此促进整体商业收入。

2.4.2.1 核心方案


  • 如上图,方案二主要将数据流拆为三块,如流数据拼接、热点文件计算、消费行为统计;

  • 流数据拼接:利用 Tiangong 计算引擎,通过 Flink SQL+行为清洗 UDF 函数,将各类行为数据打平为统一格式,并通过 union all 进行聚合,过滤异常数据后行为行为视图,数据流式产出。

  • 热点文件计算:实时将各个 file_md5 的消费次数存储 Flink Map 状态中,并根据离线分析得到的热点文件消费阈值判断热点文件,将热点文件流式写入 Bigpipe 与 Palo 中,数据流式产出,最优可做到毫秒级;

  • 消费行为次数计算:根据热点文件数据流关联用户消费行为,实时对用户消费的文件进行热点/普通归一化处理,后续将每个用户消费不同行为类型的热点/普调次数写入 Flink Map 状态中,累加计算从 0 点至今的文件消费次数,实时写入 Doris 和 Palo 中,最优可做到秒级;

2.4.2.2 技术难点

(1)大状态问题

问题引入


  • 热点文件和用户消费文件次数的计算,都涉及到数据累计的问题,如果将数据存储在共享存储(例如 Redis/Table)这类 kv 存储中,每条数据或每个窗口的数据都需要先查一下上次的计算结果,累加后再写入共享存储中,这从而导致每次计算多一次网络读 IO 操作,故利用 Flink 状态机制,将热点文件和用户消费次数存储在 Flink 状态中,每次判断都在 TaskManger 本地或者内存中,不涉及到网络 IO 操作,故性能更好。

  • 数据都存入 Flink 状态中也导致 Flink 存在大状态问题,从而导致 Checkpoint 耗时过大从而引起任务背压,最终导致数据处理延迟等问题。


解决方案


状态后端优化


  • 选择 Rocksdb 作为状态后端,开启增量 Checkpoint

  • 配置 changelog 状态机制,防止 Rocksdb 定期 Compaction 导致的 Checkpoint 耗时久问题

  • 调整 rocksdb manged 内存大小、rocksdb write buffer 大小


快照存储优化


  • 开启快照压缩配置


状态 TTL 机制


  • 长期为更新的状态做小时粒度更新,防止状态持续增大。

(2)TableStroage 写入性能差

问题引入


  • 因厂内 Table API 创建 Table Client 过程中需要根据特定表对应的机器数创建对应个数的 brpc-client-work-thread、brpc-client-io-thread、fairStrategy-timer-thread 等线程,共计 3*机器数个,网盘特征 Table 存储底层表占用 200 台机器,故创建一个 Table Client 需要创建 600+线程,从而导致 Flink 计算节点的底层 martix 容器线程超限,经过和 StreamCompute 同学沟通需限制 Table Client 的 Rpc 线程数为 1,并对应 Flink 集群的计算节点容器最大线程数由 1000->1500,从而解决线程超限问题。但因限制 Table Client Rpc 线程为 1 导致 Table 整体写入性能偏差。


解决方案


  • 细粒度拆分任务,首先对用户各类行为以及消费的热点/普调资源进行实时计算,后续根据 user_id+行为类型 keyby,并开 3s 窗口,取最新的数据落入 Table,将 3s 一个窗口的数据进行压缩。


优化效果


  • 原本天粒度写入 48 亿+次行为特征优化为 2 亿+次,具体效果如下图:



业务场景大致可以分为实时数仓、实时数据复杂聚合计算、DB 业务数据 CDC 等场景,在这几个场景 Flink 本身就提供高性能、高稳定性的能力,再配合网盘 Tiangong 实时计算引擎不熟悉 Flink 的业务方也可以配置化、低代码的方式快速建设起实时应用。

03 Flink 技术挑战和解决方案

3.1 Flink 底座建设


△ Flink 基建建设


基于 StreamCompute 平台提供的动态扩缩容、任务生命周期管理、Flink 多版本管理、云原生监控告警体系等能力,来快速构建网盘 Flink 实时计算能力。

3.2 实时计算平台建设


△ Tiangong 计算引擎



以上为 Tiangong 计算引擎能力支持,其作为网盘实时计算平台支持目前厂内大部分异构数据源,使用方可以通过简单的配置快速建设实时计算能力,拿上述业务场景实践中的用户商业策略实时特征项目接入 Tiangong 来看,只需下述配置和少量窗口数据聚合逻辑开发即可:


{    "jobName": "business_feature_compute_bp2table", // 作业名    "env": { // 作业运行环境配置        "streamConfigName": "300p_ck_30s_5fail_env",        "tableConfig": {            "stateTtlMs": 600000        }    },    "sources": [  // source配置,download日志        {            "configType": "CONFIG",            "sourceTableName": "idc_log_source",            "sourceConfig": "prod/business_strategy_idc_bp_source"        }    ],    "udfs":[  // 数据清洗转换逻辑,SQL无法完成时通过UDF               {                   "name": "idc_log_filter_func",                   "className": "com.baidu.xxx.IdcLogFilterFunction"               },               {                   "name": "idc_feature_transform_func",                   "className": "com.baidu.xxx.IdcFeatureTransformFunction"               }           ],    "views": [        {            "name": "idc_log_feature_view",            "sql": "select feature_data.event_time as event_time,                           .....                    from (select idc_feature_transform_func(f0) as feature_data                          from idc_log_source                          where idc_log_filter_func(f0) = true) as tmp                    where feature_data.log_time <> '0' and ....        }    ],    "sinks": [ // 双写TableStorage、Doris        {          "sinkConfigNames": ["prod/netdisk_strategy_idc_feature_mi_table_sink","prod/netdisk_strategy_feature_doris_sink"],          "transformSQL": "select event_time,                                   .....                           from idc_log_feature_view",           "watermarkConfig":{  // 涉及开窗逻辑所涉及的watermark配置                "maxOutOfOrdernessMs": 5000,                "idlenessMs": 10000,                "timeAssignerFunctionName": "row_event_time_assigner"           },           // 开窗计算逻辑函数          "rowTransformFunc": "strategyFeatureTransformFunction"        }      ]    }}
复制代码

3.3 自定义作业执行计划

3.3.1 细粒度算子并行度优化


△ 细粒度算子并行度优化


Tiangong 计算引擎本质基于 Flink SQL+Table API+DataStream API 做的混合计算引擎,其本质相当于 Flink SQL,因此一旦定义好 Source 和 Sink 并行度后,其任务所涉及的计算、清洗、聚合等算子都与 Source 端并行度一致,从而导致如果想要增加清洗等算子的并行度需要把 Source 的并行度也增加,从而造成资源浪费、性能降低等问题。

3.3.2 分区关系优化


△ 分区关系优化


作业内上下游算子连接数过多,会占用较大的 Network buffer 内存,从而影响作业的正常启停,基于自定义 SQL 执行计划能力,我们可以手动将 Rebalance 边修改为 Rescale。


比如上图的示例,左边上游算子有 500 个并发,而下游的 Sink 算子只有 200 个并发。在这种场景下,Flink SQL 会默认生成 Rebalance 的连接方式,共需 500*200,共 10 万个逻辑连接。


通过自定义 SQL 执行计划能力,我们手动将 Rebalance 设置为 Rescale 后,它只需要 500 个连接,大大降低了 Network buffer 的内存需求。

3.3.3 资源共享策略优化

3.3.3.1 资源共享

  • 默认情况下,flink 允许 subtask 共享 slot,即使是不同 task 的 subtask,这样的结果是一个 slot 可以保存作业的整个管道。

  • 如果是同一步操作的并行 subtask 需要放到不同的 slot,如果是先后发生的不同的 subtask 可以放在同一个 slot 中,实现 slot 的共享。



△ Slot 与 Task 的关系

3.3.3.2 自定义共享策略


△ 资源共享策略优化


支持按照算子类型将算子划分到一个 slot group 中,从而来减少数据的跨网络传输、提升资源利用率以及提升计算性能等。

3.3.4 算子名称优化

Flink SQL 不支持为每个算子自定义名称,从而导致算子名是根据系统规则来生成的,从而导致算子名称不能够通俗的表示其具体含义。为了便于作业维护和管理,自定义作业执行计划支持算子名称优化。

04 未来展望


△ 未来展望

4.1 实时计算平台化

目前 Tiangong 计算引擎的使用方式主要在公共代码库提交任务配置和 UDF 代码的方式接入,使用方需要拥有 Tiangong 计算引擎的代码库权限,存在代码安全和任务隔离性差等问题,后续我们计划基于 Tiangong 计算引擎搭建网盘自己的实时化计算平台,实现页面低代码方式快速接入实时任务。

4.2 实时 DTS 平台

目前网盘主要使用厂内 DTS 平台,通过增量 binlog 和全量 select 快照方式采集数据至下游 AFS,整体链路为 DTS->AFS->UDW,一旦上游表格式变化下游的采集任务就会失败,因此整体稳定性、维护成本和性能都过差。因此我们计划基于 Tiangong 计算引擎构建实时 DTS 平台,具体架构如下:



△ RealTime-DTS 架构

用户头像

百度Geek说

关注

百度官方技术账号 2021-01-22 加入

关注我们,带你了解更多百度技术干货。

评论

发布
暂无评论
百度网盘基于Flink的实时计算实践_百度Geek说_InfoQ写作社区