写点什么

Flink X Hologres 构建企业级 Streaming Warehouse

  • 2023-02-14
    浙江
  • 本文字数:8337 字

    阅读完需:约 27 分钟

Flink X Hologres构建企业级Streaming Warehouse

摘要:本文整理自阿里云资深技术专家,阿里云 Hologres 负责人姜伟华,在 FFA 实时湖仓专场的分享。点击查看>>视频回放

本篇内容主要分为四个部分:

一、实时数仓分层的技术需求

二、阿里云一站式实时数仓 Hologres 介绍

三、Flink x Hologres:天作之合

四、基于 Flink Catalog 的 Streaming Warehouse 实践



一、实时数仓分层的技术需求

首先,我们讲一讲数仓的分层技术以及分层技术的现状。

1、实时数仓分层技术现状

大数据现在越来越讲究实时化,在各种场景下都需要实时,例如春晚实时直播大屏,双 11 GMV 实时大屏、实时个性化推荐等场景,都对数据的实时性有着非常高的要求。为了满足业务的实时性需求,大数据技术也开始逐步发展出实时数仓。



但如何构建实时数仓呢?

相比离线数仓,实时数仓没有明确的方法论体系。因此在实践中,有各种各样的方法,但没有一个方法是万能。最近行业内提出了 Streaming Warehouse 的概念。Streaming Warehouse 的本质是分层之间能够做到实时数据的流动,从而解决实时数仓分层的问题。



下面,我们先来了解下实时数仓的主流分层方案。

2、实时数仓主流分层方案

实时数仓的主流分层方案主要有 4 个。

方案 1:流式 ETL

ETL(Extract- Transform-Load)是比较传统的数据仓库建设方法,而流式 ETL 就是指:实时数据经过 Flink 实时 ETL 处理之后,将结果写入到 KV 引擎中,供应用查询。而为了解决中间层不方便排查的问题,也需要将中间层数据同步到实时数仓中供分析之用。最常见的做法就是数据通过 Flink 清洗后,写到 Kafka 形成 ODS 层。再从 Kafka 消费,经过加工形成 DWD 层。然后 Flink 加工成 DWS 层,最后通过加工形成 ADS 层的数据写到 KV 引擎并对接上层应用。因为直接使用 Kafka 数据进行分析和探查很麻烦,所以也会同步一份 Kafka 数据到实时数仓,通过实时数仓进行分析和探查。

这个方案的优势是层次明确,分工明确。但劣势是需要有大量的同步任务、数据资源消耗很大、数据有很多冗余、处理链路较复杂需要很多的组件。除此之外,这个方案构建的实时数仓分层,尤其是 Kafka 分层,复用性非常差,也没办法响应 schema 的动态变化。



方案 2:流式 ELT

而流式 ELT 则是将计算后置,直接将明细数据写进实时数仓(EL),不需要严格的数仓分层,整个架构只需要一层,上层应用查询的时候进行数据的变换(T)或者分层。常见的做法就是把数据加工清洗后,写到实时数仓里,形成 DWD 层,所有的查询都基于 DWD 层的明细数据进行。

这个方案的好处在于,没有 ETL,只有一层;数据修订很方便。但它的弊端有两个方面:

  • 在查询性能方面,由于是明细数据查询,所以在某些场景下不能满足 QPS 或延迟的要求。

  • 因为没有严格的数仓分层,所以数据复用很困难,很难兼顾各方面的诉求。



方案 3:定时调度

既然实时流式无法完成数据的实时数仓分层,我们可以将数据实时写入实时数仓的 DWD 层。DWS 层、ADS 层用离线的高频调度方法,实现分钟级的调度,从而借用离线数仓,进行分层构造。这个也就是业界常用的方案 3。

这个方案的好处在于可以复用很多离线经验,方案成本低且成熟。但方案也存在如下缺点:

  • 延迟大:每一层的延迟都跟调度相关,随着层次越多,调度延迟越大,实时数仓也变成了准实时数仓。

  • 不能完全复用离线方案:离线调度一般是小时级或天级,我们可以使用全量计算。但在分钟级调度时,必须做增量计算,否则无法及时调度。



方案 4:实时物化视图

第 4 种方案就是通过实时数仓的物化视图能力实现数仓分层。常见的做法就是 Flink 实时加工后,将数据写到实时数仓形成 DWD 层,DWS 层或 ADS 层的构造依赖于实时数仓的实时物化视图能力。

现在主流实时数仓都开始提供物化视图的能力,但本质上都是提供了一些简单的聚合类物化视图。如果物化视图的需求比较简单,可以利用实时数仓里的实时物化视图能力,将 DWS 层到 ADS 层的构建自动化,从而让物化视图的查询保证较高的 QPS。但这个方案最大的缺点在于,现在的实时物化视图技术都还不成熟,能力有限,支持的场景也比较有限。



二、阿里云一站式实时数仓 Hologres 介绍

接下来,先介绍一下阿里云一站式实时数仓 Hologres 产品。Hologres 是阿里云自研的一站式实时数仓,它同时包含三种能力:

  • OLAP 能力:同传统的实时数仓一样,可以支持数据的实时写入、以及复杂 OLAP 实时多维分析快速响应,满足业务的极致数据探索能力。

  • 在线服务 Serving(KV):可以支持 KV 查询场景,提供非常高的 QPS 和毫秒级的低延迟。

  • 湖仓一体:能够直接查询数据湖的数据,以及能够加速阿里云离线数仓 MaxCompute,助力业务更低成本实现湖仓一体。



下面为具体介绍:

  • 首先,大家可以把 Hologres 当做一个常见的实时数仓。它的特点在于写入侧支持百万 RPS 的实时写入,写入即可查,没有延迟。同时也支持高性能的实时整行更新和局部更新。其中,整行更新是把整行替换掉,局部更新可以更新一行中的局部字段,二者都是实时更新。

  • 在查询侧,一方面支持复杂的 OLAP 多维分析,可以非常好的支持实时大屏、实时报表等场景。近期 Hologres 拿到了 TPC-H 30TB 的性能世界第一的 TPC 官方认证成绩,见>>阿里云 ODPS-Hologres刷新世界纪录,领先第二名23%。其次,Hologres 也支持在线服务查询,不仅支持百万 QPS KV 点查,而且也支持阿里云达摩院的 Proxima 向量检索引擎,可以支持非常高效的向量检索能力。同时这些能力在 Hologres 中是全用 SQL 表达,对用户使用非常友好。此外,为了兼顾数据服务和实时数仓的需求,Hologres 在行存、列存的数据格式基础上,也支持行列共存,即行列共存的表即一份行存,又有一份列存,并且系统保证这两份数据是强一致的,对于 OLAP 分析,优化器会自动选择列存,对于线上服务,会自动选择行存,通过行列共存可以非常友好的实现一份数据支撑多个应用场景。

  • 因为 Hologres 同时支持 OLAP 分析和线上服务,其中线上服务要求非常高的稳定性和 SLA。为了保证 OLAP 分析和线上服务时不会发生冲突,我们支持了读写分离,从而实现 OLAP 与数据服务的强隔离。

  • 最后,在湖仓数据交互式分析方面,Hologres 对阿里云 MaxCompute 离线数仓里的数据,数据湖中的数据都可以秒级交互式分析,且不需要做任何的数据搬迁。

  • 除此之外,Hologres 的定位是一站式的企业级实时数仓,所以除了上述能力,我们还有很多其他能力。包括数据的治理、成本治理、数据血缘、数据脱敏、数据加密、IP 白名单、数据的备份和恢复等等。



三、Flink x Hologres:天作之合

1、Hologres 与 Flink 深度集成

Flink 对于实时数仓能够提供非常丰富的数据处理、数据入湖仓的能力。Hologres 与 Flink 有些非常深度的整合能力,具体包括:

  • Hologres 可以作为 Flink 的维表:在实时计算的场景下,Flink 对维表的需求很强,Hologres 支持百万级至千万级 RPS 的 KV 点查能力,可以直接当做 Flink 维表使用,且可以做到实时更新,对于像实时特征存储等维表关联场景就也可以非常高效的支持。

  • Hologres 可以作为 Flink 的结果表:Hologres 支持高性能的实时写入和整行实时更新的能力,可以结合 Flink,输出需要强大的 Update 能力,满足数仓场景下的实时更新、覆盖等需求。与此同时,Hologres 还有很强的局部更新能力。局部更新能力在很多场景下,可以替代 Flink 的多流 Join,为客户节省成本。

  • Hologres 可以作为 Flink 的源表:Hologres 支持 Binlog 能力,一张表的任何变化,比如 insert、update、delete 等等,都会产生 Binlog 事件。Flink 可以订阅 Hologres Binlog,进行驱动计算。由于 Flink 支持 Hologres 的整表读取,二者结合构成了 Flink 全增量一体化的读取能力。并且,Hologres 也对了接 Flink CDC,它可以驱动 Flink CDC 的计算。

  • 支持 Hologres Catalog:通过 Hologres Catalog 的任何操作,都会直接实时反映到 Hologres 里,用户也不需要在 Flink 建 Hologres 表,这样就使得 Flink+Hologres 就具备了整库同步、Schema Evolution 的能力。



2、基于 Flink+Hologres 的 Streaming Warehouse 方案

那 Flink 和 Hologres 如何构建 Streaming Warehouse?

Streaming Warehouse:数据能在数仓之间实时的流动,本质上就是解决实时数仓分层的问题

最开始我们介绍了常见的数仓分层方案,Flink+Hologres 的 Streaming Warehouse 方案则是可以完全将 Flink+Kafka 替换。具体做法如下:

  1. 将 Flink 写到 Hologres 里,形成 ODS 层。Flink 订阅 ODS 层的 Hologres Binlog 进行加工,将 Flink 从 DWD 层再次写入 Hologres 里。

  2. Flink 再订阅 DWD 层的 Hologres Binlog,通过计算形成 DWS 层,将其再次写入 Hologres 里。

  3. 最后,由 Hologres 对外提供应用查询。

该方案相比 Kafka 有如下优点:

  • 解决了传统中间层 Kafka 数据不易查、不易更新、不易修正的问题。Hologres 的每一层都可查、可更新、可修正。

  • Hologres 的每一层都可以单独对外提供服务。因为每一层的数据都是可查的,所以数据的复用会更好,真正实现数仓分层复用的目标。

  • Hologres 支持数据复用,模型统一,架构简化。通过 Flink+Hologres,就能实现实时数仓分层,简化架构和降低成本。



3、Flink+Hologres 核心能力:Binlog、行列共存、资源隔离

上面讲的 Flink+Hologres 的 Streaming Warehouse 方案,其强依赖于以下三个 Hologres 核心能力:

  1. Binlog:因为实时数仓一般没有 Binlog,但 Hologres 提供了 Binlog 能力,用来驱动 Flink 做实时计算,正因为有了 Binlog,Hologres 才能作为流式计算的上游。

  2. 行列共存。一张表既有行存数据,又有列存数据。这两份数据是强一致的。行列共存的特性让中间层的每张表,不但能够给 Flink 使用,而且可以给其他应用(比如 OLAP、或者线上服务)使用。

  3. 资源强隔离。实时数仓一般是弱隔离或软隔离,通过资源组、资源队列的方法实现资源隔离。如果 Flink 的资源消耗很大,可能影响中间层的点查性能。但在 Hologres 强隔离的能力下,Flink 对 Hologres Binlog 的数据拉取,不会影响线上服务。

通过 Binlog、行列共存、资源强隔离的三个特点,不仅能让 Flink+Hologres 形成 Streaming Warehouse,并且能够使中间的每层数据复用,被其他应用或线上服务使用,助力企业构建最简单最完整的实时数仓。



4、基于 Flink+Hologres 的多流合并

接下来,讲一讲基于 Flink+Hologres 的多流合并。

因为 Hologres 有特别强大的局部更新能力,基于此我们可以简化 Flink 的多流 Join。比如在风控场景下,我们需要基于用户 ID 构建用户的多侧面画像,用户画像来自很多数据源,比如客户的浏览行为、成交行为、履约行为等等。把数据源的数据按照用户 ID,把每个用户放到一行里,形成不同的字段,形成用户的完整画像。

传统的方式需要用 Flink 多流 Join 实现,Flink 把上游的多个数据源关联到一起,Join 后写到 Kafka 里,然后驱动下游的 Flink,加工这行完整的数据。这就使得多流 Join 非常耗资源。

所以在 Flink+Hologres 的 Streaming Warehouse 方案中,可以利用 Hologres 的局部更新能力,把一张表定为定义成 Hologres 的行存表或行列共存表。此时,整个方案就简化成上游每个数据源,同步数据到 Hologres 表的若干个字段里,若干个任务同时写入这张表,然后利用 Hologres 的局部更新能力,把数据汇总在一起。

如果打开这张 Hologres 表的 Binlog,上游任何数据源的变化都会更新这张表,使这张表的 Binlog 中生成行数据的最新状态,然后驱动下游的 Flink 继续计算,从而完美匹配常见的风控场景。这种用法下,资源消耗、运维都得到了极大的简化。



四、基于 Flink Catalog 的 Streaming Warehouse 实践

Flink+Hologres 的 Streaming Warehouse 方案已经非常成熟,但唯一的缺点在于,用户需要在两个系统之间切换,过程比较繁琐。为了让用户操作更简单,我们基于 Flink Catalog 提供了更加简单的使用体验。

下面我们来看看怎么样基于 Flink Catalog 去构建基于 Flink+Hologres 的 Streaming Warehouse。我们会发现,有了 Flink Catalog 后,整个使用体验会很简单,并能充分发挥 Flink 和 Hologres 两个产品的强大能力。

下图是一个典型的 Flink+Hologres 实时 ETL 链路:

  1. ODS 层、DWD 层、ODS 层的数据都存在 Hologres 中。

  2. 链路中所有的数据加工都是通过 Flink SQL 完成。在整个 ETL 链路中,用户不需要任何 Hologres SQL,直接写 Flink SQL 即可。

  3. Flink 用户可以通过 Flink SQL 对每层中的 Hologres 数据进行数据探查(流模式和批模式都可以)。比如:当我们发现 DWS 层的数据结果出现问题,需要查看哪层的结果有问题或逻辑有错误。此时,我们可以复用原来的 Flink SQL 来进行探查、定位或者数据重新消费。

  4. Hologres 中的每层数据都可以对外提供查询和服务(通过 Hologres SQL)。



接下来,以某个电商场景为例,演示一下基于 Flink Catalog 的 Streaming Warehouse。如下图所示,有一个 MySQL 数据库作为订单库,里面有订单表 orders、订单支付表 orders_pay、以及产品品类表 product_catalog。



  1. 第一步,我们通过 Flink 的实时数仓,把数据实时同步到 Hologres 里,形成 ODS 层。

  2. 第二步,加工 DWD 层。将 DWD 层的数据写到 Hologres 里。在这个过程中,我们需要把订单表和订单支付表,合并成一张表,实现多路合并。与此同时,我们希望 orders 表关联商品品类表 product_catalog。

  3. 第三步,驱动下游计算,构建 DWS 层。以用户维度和商店维度,收集统计数据。比如用户每天的订单金额和商店每天的订单金额,从而形成一条完整的链路。

  4. 第四步,将 DWS 层的表推荐给系统使用。作为用户和商店的特征,用做推荐用途。

  5. 第五步,DWD 层的表能够直接用来做实时统计分析、统计产品、实时大屏、实时报表。

上图中的绿色链路,全部使用 Flink SQL 完成。橙色链路对外提供服务,由 Hologres SQL 完成。


接下来,讲一讲每个步骤是如何运行的。


第一步,在 Flink 实时数仓,形成 ODS 层。首先,创建一个 Hologres 的 Catalog。MySQL 中存储订单、支付以及商品信息 3 张表,通过 Flink Catalog 功能,将 MySQL 整库的数据实时同步至 Hologres,形成 ODS。相关代码如下所示。我们可以看到,MySQL 整库同步到 Hologres,通过 Flink SQL 来表达是非常简单的。

-- 创建Hologres CatalogCREATE CATALOG holo WITH ( ‘type’ = ‘hologres’ … );
-- MySQL整库同步到HologresCREATE DATABASE IF NOT EXISTS holo.order_dw AS DATABASE mysql.sw INCLUDING all tables;
复制代码

第二步,DWD 实时构建。数据实时写入 ODS 层后,Flink 读取 Hologres Binlog,并用多流合并、维表关联将订单、交易、商品 3 个表打成一个大宽表,实时写入至 Hologres 的订单汇总表中,形成 DWD 层。

如下 SQL 是 DWD 层表的建表语句。这张目标表包含了来自 orders、orders_pay、product_catalog 的字段,关联了相关的用户信息、商户信息、订单信息、商品品类信息等等,形成了一张宽表。

CREATE TABLE holo.order_dw.dwd_orders (  order_id bigint not null primary key,  --字段来自order 表  order_user_id bigint,  order_shop_id bigint,  order_product_id string,  order_fee numeric(20,2),  order_create_time timestamp_ltz,  order_update_time timestamp_ltz,  order_state int,  --字段来自product_catalog表  order_product_catalog_name string,  --字段来自orders_pay表  pay_id bigint,  pay_platfrom int,  pay_create_time timestamp_ltz) ;
复制代码

下面的 SQL 是真正的计算逻辑,这里包含两个 INSERT 语句:

  • 第一个 INSERT 语句是从 orders 表实时打宽后写入。这里用到了 Hologres 的维表关联能力。实时打宽后,写入目标表的部分字段。

  • 第二个 INSERT 语句是从 orders_pay 表实时同步到同一张目标表,更新另外一些字段。

这两个 INSERT 语句最大的关联在于,它们写的是同一张表,会自动利用目标表的主键 ID 进行关联。每个 INSERT 都是做了目标表的局部更新,两者的合力结果是实时更新的目标宽表。

BEGIN STATEMENT SET;-- 从orders表实时打宽后写入INSERT INTO holo.order_dw.dwd_orders (order_id,order_user_id,order_shop_id,order_product_id,order_fee,order_create_time,order_update_time,order_state,order_product_catalog_name)SELECT o.*,dim.catalog_nameFROMholo.order_dw.orders o LEFT JOIN holo.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime () AS dimONo.product_id = dim.product_id;
-- 从order_pays表实时写入INSERT INTO holo.order_dw.dwd_orders (pay_id,order_id,pay_platform,pay_create_time)SELECT *FROMholo.order_dw.orders_pay;END;
复制代码

第三步,DWS 层的实时聚合。在 DWD 的基础上,通过 Flink 读取 Hologres DWD 的 Binlog 数据,进行实时指标聚合计算,比如按照用户维度聚合,按照商户维度聚合等,然后实时写入 Hologres,形成 DWS 层。

  • 先是创建对应的聚合指标表,DDL 语句如下

-- 用户维度聚合指标表CREATE TABLE holo.order_dw.dws_users (  user_id bigint not null,  ds string not null,  -- 当日完成支付总金额    payed_buy_fee_sum numeric(20,2) not null,        primary key(user_id,ds) NOT ENFORCED);-- 商户维度聚合指标表CREATE TABLE holo.order_dw.dws_shops (  shop_id bigint not null,  ds string not null,  -- 当日完成支付总金额  payed_buy_fee_sum numeric(20,2) not null,       primary key(shop_id,ds) NOT ENFORCED);
复制代码
  • 然后将数据写入 Hologres 中,经过简单的三步后,Flink SQL 构建了完整的 Streaming Warehouse 分层体系。

--数据写入HologresBEGIN STATEMENT SET;INSERT INTO holo.order_dw.dws_usersSELECT   order_user_id,  DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,  SUM (order_fee)FROM holo.order_dw.dwd_orders cWHERE pay_id IS NOT NULL AND order_fee IS NOT NULLGROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');INSERT INTO holo.order_dw.dws_shopsSELECT   order_shop_id,  DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,  SUM (order_fee)FROM holo.order_dw.dwd_orders cWHERE pay_id IS NOT NULL AND order_fee IS NOT NULLGROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');END;

复制代码


第四步,构建应用,基于 DWS 层,对外提供服务。

数据的分层和加工完成后,业务就可以通过 Hologres 查询数据并应用。在这个例子里,推荐系统要求非常高的点查性能,所以要求百万级的 QPS 检查能力。Hologres 的行存表或者行列共存表完全可以满足。

这个方案和传统的实时数仓最大的差别是:传统的实时数仓只有最后一层的数据,可对外提供服务。而在 Hologres 里,DWD 等中间层数据也可以对外提供服务,进行实时报表统计。用户可以在中间层进行查询操作,对接各种实时应用、实时大屏。比如

  • 直接查 DWD 层的数据,典型的如根据用户 ID 返回推荐商品(KV 场景)

--场景4: 根据用户特征推荐商品SELECT * FROM dws_users WHERE user_id = ? AND ds = '2022-11-09’;
--场景4: 根据店铺特征推荐商品
SELECT * FROM dws_shops WHERE shop_id = ? AND ds = '2022-11-09’;
复制代码
  • 实时报表查看订单量和退单量(OLAP)。

--场景6:基于宽表数据展示实时报表-- 最近30天,每个品类的订单总量和退单总量SELECTTO_CHAR(order_create_time, 'YYYYMMDD'),order_product_catalog_name,COUNT(*),COUNT(CASE WHEN refund_id IS NOT NULL THEN 1 ELSE null END)FROMdwd_ordersWHEREorder_create_time > now() - '30 day' :: intevalGROUP BY1, 2ORDER BY1, 2;
复制代码

第五步,问题排查:Flink 数据探查。如果某个业务指标出现异常,Flink 可以直接探查每层表的数据来快速定位。比如用 Flink 探查 Hologres DWD 层的 orders 表。Hologres 支持 Flink 的流模式和批模式对数据的探查。

由于流模式是 Flink 的默认模式,因此我们不需要设置执行模式。它可以直接记录数据变化,从而非常方便的查看数据异常。流模式可以探查获取一段时间范围内的数据及其变化情况。

-- 流模式探查 SELECT   * FROM holo.order_dw.dwd_orders /*+ OPTIONS('cdcMode'='false', 'startTime'='2022-11-09 00:00:00') */ c WHERE   user_id = 0;
复制代码

与此同时,批模式探查是获取当前时刻的最新数据。Hologres 也支持 Flink 批模式的数据探查。批模式和流模式的区别在于,流模式关注的是变化,批模式关注的是表中的最新状态。

-- 批模式探查 set 'execution.runtime-mode'='batch’;
SELECT * FROM holo.order_dw.dwd_orders WHERE user_id = 0 AND order_create_time>'2022-11-09 00:00:00';
复制代码

五、总结

Hologres 跟 Flink 深度集成。实现完整的 Streaming Warehouse 方案,该方案有如下明显优势:

  1. 一站式:全链路都可以用 SQL 表示,并且只需要用到 Flink 和 Hologres 两个组件,操作非常方便。实时 ETL 链路、数据分层完全可以用 Flink SQL 实现,Hologres 提供对外提供在线服务和 OLAP 查询,每层数据可复用、可查,方便构建实时数仓的数据分层和复用体系。

  2. 高性能:这种方案可以使得使得 Hologres 发挥极致的实时写入、实时更新能力和多维 OLAP、高并发点查能力,Flink 发挥实时加工能力。

  3. 企业级:自带多种企业级能力,不仅运维更简单,可观测性更好,安全能力更强,也提供多种高可用能力,从而企业更加方便的构建企业级的 Streaming Warehouse。



发布于: 刚刚阅读数: 2
用户头像

还未添加个人签名 2020-10-15 加入

分享阿里云计算平台的大数据和AI方向的技术创新和趋势、实战案例、经验总结。

评论

发布
暂无评论
Flink X Hologres构建企业级Streaming Warehouse_大数据_阿里云大数据AI技术_InfoQ写作社区