写点什么

不用再写 FlinkSQL 了,使用 XL-LightHouse 轻松实现海量数据实时统计

作者:feng
  • 2023-08-01
    美国
  • 本文字数:8627 字

    阅读完需:约 28 分钟

概述

  • XL-LightHouse 是针对互联网领域繁杂的流式数据统计需求而开发的一套集成了数据写入、数据运算、数据存储和数据可视化等一系列功能,支持大数据量,支持高并发的【通用型流式大数据统计平台】;

  • XL-LightHouse 目前已涵盖了常见的流式数据统计场景,包括 count、sum、max、min、avg、distinct、topN/lastN 等多种运算;

  • XL-LightHouse 支持多维度计算,支持分钟级、小时级、天级多个时间粒度的统计,支持自定义统计周期的配置;

  • XL-LightHouse 内置丰富的转化类函数、支持表达式解析,可以满足各种复杂的条件筛选和逻辑判断;

  • XL-LightHouse 支持时序性数据的存储和查询;

  • XL-LightHouse 是一套功能完备的流式大数据统计领域的数据治理解决方案,它提供了比较友好和完善的可视化查询功能,并对外提供 API 查询接口,此外还包括数据指标管理、权限管理、统计限流等多种功能。

背景

以互联网行业来说,在移动互联网发展比较成熟的现在,流量见顶,红利消失,企业竞争日趋惨烈,获取新增用户的成本日益增高。很多企业开始意识到不能一味的通过补贴、价格战、广告投放这种简单粗暴的方式抢占市场,这样的运作模式很难长时间维系。而通过精细化和数据化运营来降低成本、提升效率、最大化单用户价值的理念逐渐被越来越多的企业所接受。精细化和数据化运营的前提是要建立起一套完善的数据指标体系,借助这个数据指标体系企业可以有多方面的用途:


  • 1、排查问题:数据化运营是让企业业务进入到一种"可控"的状态,帮助企业在业务运转不正常的时候,能够快速的判断出问题所在。

  • 2、业务洞察:数据化运营是让业务运转的各个环节更加透明,帮助企业更清晰的看到目前的"短板"是在什么地方,辅助产品的优化迭代。

  • 3、明确方向:数据化运营是培养敏锐的嗅觉,让企业可以更加准确的判断出市场的走势、捕捉到其中具有业务价值的信息。

  • 4、科学试错:在试错成本日益高企的今天,数据化运营是帮助企业改变以往靠"拍脑袋"来做决定的方式,打破过往的经验主义,辅助决策者思考,快速验证想法,让企业减少成本更加科学的"试错"。


随着企业对数据化运营重视程度的日益增加,必然会衍生出大量的数据统计需求。而 XL-LightHouse 是以流式大数据统计为切入点,推动流式统计在诸多行业内的快速普及和大规模应用,定位是以一套服务使用较少的服务器资源同时支撑数以万计、数十万计的流式数据统计需求的大数据平台,致力于应对这种呈现"井喷"态势的流式数据统计需求所带来的一系列问题,寄希望于通过更加贴合场景、更具有实用价值的技术方案帮助企业降低数据化运营方面的成本。

收益

XL-LightHouse 代表着一种以通用型流式大数据统计技术为切入点,低成本实现企业数据化运营的理念。它可以帮助企业更快速的搭建起一套较为完善的、稳定可靠的数据化运营体系,节省企业在数据化运营方面的投入,主要体现在以下几个方面:


  • 减少企业在流式大数据统计方面的研发成本和数据维护成本。

  • 帮助企业节省时间成本,辅助产品的快速迭代。

  • 为企业节省较为可观的服务器运算资源。

  • 便于数据在企业内部的共享和互通。

  • 此外,XL-LightHouse 对中小企业友好,它大大降低了中小企业使用流式大数据统计的技术门槛,通过简单的页面配置和数据接入即可应对繁杂的流式数据统计需求。

架构


XL-LightHouse 包括如下几个模块:


  • Client 模块,业务方接入 SDK,用于上报统计原始消息数据;

  • RPC 模块,功能包含接收客户端上报的统计消息数据,对外提供统计结果查询接口;

  • Tasks 运算模块,功能包含封装各种流式统计运算场景,执行限流规则判断,解析各统计项的配置信息,消费消息数据并按统计配置进行计算以及保存统计结果;

  • Web 模块,功能包含对统计组和统计项进行管理维护、查看统计结果、设置限流规则和管理统计指标访问权限。

系统设计

XL-LightHouse 是通用型流式大数据统计平台,它将流式数据统计需求抽象分类成多种运算场景,并对各种运算场景进行高性能的实现从而让每一种运算可以达到无限制复用的效果。XL-LightHouse 使用【统计工程-统计组-统计项】的三层结构来管理所有统计需求。每一个统计需求叫做一个统计项,每个统计项都是基于一种或多种运算场景。用户可根据需要创建若干个统计工程,每个统计工程可包含多个统计项,而基于同一份元数据的多个统计项叫做一个统计组。



Web 模块可管理统计项的运行状态,用户可在 Web 端页面启动、停止、删除指定的统计项,处于运行状态的统计项正常执行统计运算,非运行状态的统计项不执行统计运算。接入系统首先需要用户在 Web 端进行相应配置,然后通过 SDK 上报原始数据。系统将统计原始消息数据按照统计周期划分成若干个批次再依据统计配置进行相应计算。

1、自定义流式统计规范(XL-Formula)

SQL 规范在大数据查询和统计分析方面被广泛应用,SQL 在离线数据分析、OLAP、OLTP 等领域都具有不可撼动的地位。而且随着 FlinkSQL 和 SparkSQL 等组件功能的日趋完善,SQL 在流式统计领域也开始被越来越多的使用。但是由于 SQL 本身是基于数据表的概念进行数据处理,不可避免需要存储较多的原始数据和中间态数据在内存中,造成较高的内存浪费;分布式 SQL 在数据处理过程中会触发 Shuffle,造成大量的网络传输,影响执行效率;SQL 在一些分组聚合操作可能引起较为严重的数据倾斜,对程序的正常执行造成影响,很多 SQL 计算任务需要依据数据量和运算逻辑进行特定优化;针对特定的统计需求需要执行单独的计算任务,不同统计任务之间运算资源不能共用,从而造成较高的计算资源浪费;SQL 语法过于臃肿和复杂、不够清晰简洁、多过滤条件的组合逻辑需要依赖较长的 SQL 语句来实现,不便于理解,书写较长 SQL 语句容易出错;SQL 函数定制化功能扩展不够方便;SQL 开发相对较复杂,实现相同功能 SQL 可能会有多种写法,不同写法执行和解析效率也各有差异。这些问题使得相应功能的实现需要依赖专业的数据研发人员,导致流式统计任务研发成本高、周期长。当企业数据指标呈现指数级增长时,SQL 规范的瓶颈也将凸显出来,需要耗费大量的研发成本、数据维护成本和服务器运算成本。我认为 SQL 规范的这些问题限制了它在流式统计这个细分场景内的快速扩张,使得 SQL 在这个细分领域内的应用基本局限在定制化需求开发的范围之内。从一定程度上来说 SQL 规范已经阻碍了流式统计的发展,制约了流式统计在各行业内的快速普及和大规模应用。XL-LightHouse 作为一个通用型流式大数据统计平台,侧重于帮助企业解决繁杂的流式数据统计问题。XL-LightHouse 并没有拘泥于现行的大数据领域的业内标准,而是寄希望通过使用更为轻巧的技术方案解决目前企业所面对的问题。它定义了一套较为完善的用于描述形式各样的流式统计需求的配置规范,通过各个属性的组合可以实现非常强大的统计功能,从而帮助企业更快速的搭建起一套较为完善的、稳定可靠的数据化运营体系。

2、消息聚合处理

系统将整个数据消费链路分成以下基本环节:Client 模块上报消息数据环节、RPC 模块处理消息数据环节、运算模块执行展开和分组操作环节、统计结果存储环节。在每个环节系统使用异步处理、批量消费、对重复性计算进行聚合处理的方案。各环节接收到消息后放入消息缓冲池,系统依据各环节的预定义聚合逻辑将消息划分成不同的计算类型,对单节点单进程内相同类型的消息进行聚合处理。这种设计可以减少数据向下游传输、提升网络 IO 效率、又可以直接减少下游运算量以及 DB 的写入压力。从 Client 端发送消息到最终的统计结果入库中间的每个环节都对重复性消息进行聚合处理尽可能减少消息量,并且将与下游运算无关的参数都会尽早抛弃掉,XL-LightHouse 的数据消费链路是一个逐层递减的结构。各个环节的消息聚合逻辑略有不同,以 Client 模块为例消息聚合主要包括以下内容:(1)消息体参数裁剪为了提高消息的传输速度并提升后续步骤消息聚合效率,Client 模块需要对原始消息进行裁剪操作,其目的是去掉统计无关字段。统计无关字段是系统根据各统计组下所有有效统计项计算得来,对于与所有有效统计项均不相关的字段在 Client 模块上报数据之前将其过滤掉,避免非必要的数据传输。(2)篡改消息体时间戳 Client 模块上报消息环节在执行聚合操作前修改消息原始时间戳为最小批次时间,其目的是为了后续步骤中在保证数据准确性的前提下能够将尽可能多的消息聚合到一起,减少网络传输和下游运算量。Client 模块以当前统计组下所有有效统计项的统计周期的最大公约数为时间窗口,按照该时间窗口和消息原始时间戳计算得到消息所对应的最小批次时间。Client 模块将消息原来的时间戳修改为最小批次时间然后放入缓冲池。(3)聚合操作聚合操作即为将同类型消息按预定义聚合逻辑合并到一起。不同环节的聚合逻辑略有不同,Client 模块的聚合逻辑是指消息内容一致的消息,即为相同统计组、相同参数值的消息。原始消息发送到缓冲池后消费线程组定时从缓冲池中批量读取消息,并将其中符合聚合规则的消息聚合到一起。经过聚合操作后消息体的数据结构由单条消息体内容变更为消息体内容和消息体重复次数两个属性。

3、消息展开与分组

在 XL-LightHouse 中集群内的所有统计任务共用集群运算资源,运算模块接收到数据后对统计消息进行展开和分组操作。


  • 消息展开操作


在大多数业务场景中针对一份元数据往往有多个数据指标,统计组下的所有统计项共用一份原始数据消息。展开操作即为查询统计组下所有有效统计项,提取各统计项的关联字段,为各统计项复制一份单独的消息数据并只保留其运算相关字段的过程。展开操作的目的是为了避免各统计项的后续运算逻辑相互之间产生影响。


  • 消息分组操作


分组操作即为提取统计项的统计周期属性,依据统计周期划分时间窗口并按时间窗口对展开操作后的消息进行分组;然后判断统计项是否包含多个统计运算单元,如果包含多个统计运算单元则按统计运算单元进行再分组;判断统计项是否包含维度属性,如包含维度属性则提取维度信息并按维度进行再分组。分组操作的目的在于将各统计任务的运算过程进行分解,拆分成不同的计算类型,同类型消息聚合处理,不同类型的消息运算过程互不影响。

4、消息缓冲池

系统聚合处理所依赖的消息缓冲池实现方案基于有界优先阻塞队列。系统将消息缓冲池分成若干个 Slot,每个 Slot 的组成结构包括一个 BoundedPriorityBlockingQueue(有界优先阻塞队列)和 Slot 对应的最后访问时间戳。消息缓冲池的处理逻辑包括以下步骤:(1)Producer 按照不同环节的聚合逻辑生成消息事件的 Key,Key 用于区分是否为相同类型的消息;(2)消息缓冲池依据消息 Key 按照 Hash 取余分配对应的 Slot;(3)按照预定义时间窗口将消息划分到不同的处理周期;(4)Slot 对相同处理周期的消息按照 Key 进行优先排序,不同处理周期的消息按窗口时间排序;(5)消费线程组定时轮询各个 Slot;(6)判断 Slot 的使用容量是否超出阈值,阈值为 batchsize * backlog_factor,其中 batchsize 为指定的单次消费最大消息数量,backlog_factor 为指定的消息积压系数;(7)如果 Slot 使用容量没有超出阈值,则继续判断 Slot 的上次消费访问时间,如果超出时间阈值则读取消息批量消费,否则跳过本次任务。 消费 Slot 消息后同时更新 Slot 使用容量以及最后访问时间。该消息缓冲池实现可以将尽可能多的相同计算类型的消息聚合到一起处理,减少对下游运算量和 DB 的写入压力。

5、基数运算

bitcount 基数运算是指 distinct(非重复值数量统计),系统使用基数过滤装置过滤已存在的基数值,通过判定在过滤装置中不存在的基数数量然后更新 DB 中的统计结果从而实现基数统计。基数过滤装置包括内存基数过滤装置和分布式基数过滤装置两部分。内存基数过滤装置的作用在于初步判断基数值是否已存在,其作用在于内存判断效率更高,从而尽可能避免重复性的基数判断对整体性能的影响。内存基数过滤装置使用 RoaringBitMap 工具包实现。分布式基数过滤装置内含多个分片,每个分片对应一个 RoaringBitMap 数据存储结构,分片数可根据实际需要指定,通过提高分片数可以提高基数运算的准确度。分布式基数过滤装置的实现方案包括如下步骤:(1)将原始数值经过 MurmurHash-128Bit 生成原始数值对应的 Long 类型的 Hash 值。(2)设置统计任务所需的分片数,每个分片对应一个 RoaringBitMap 数据结构,本系统过滤装置采用 Redis 扩展 Redis-Roaring 插件的方式实现,原始数值对应的分片可通过 Hash 取余获得。(3)将 Long 类型的 Hash 值按高 32bit 和低 32bit 拆分成两个 Int 类型整数,如果为负数取其绝对值,两个 Int 值的组合对应原始值在 RoaringBitMap 数据结构中的 Index 值。(4)批量将多个基数值对应的 Int 值组合发送到 Redis,将基数判断的多个操作使用 Lua 脚本合并执行。判断 Int 值组合是否在过滤装置中存在,如果两个 Int 值都在过滤装置中存在,则表示原始值已存在,否则为原始值不存在,如果原始值在过滤装置中不存在系统在判定完成后更新相应 Index 值。(5)统计在过滤装置中不存在的原始值的数量并更新到 DB 中。该实现方案的好处在于基数运算不需要存储原始值可减少对内存的占用;使用 MurmurHash-128Bit 生成 Index 值从而不需要维护原始数值和 Index 的映射关系;RoaringBitMap 算法本身具有压缩位图功能可以减少基数稀疏情况下的内存浪费的问题;使用 Lua 脚本实现基数过滤功能可以减少对 Redis 的访问次数提升整体性能;使用内存基数过滤装置进行初筛可以避免不必要的重复判定;通过调整分片数可以很方便的提升基数统计的准确率。

6、避免 shuffle

在大数据任务的执行过程中 shuffle 是影响性能比较主要的一个因素,Shuffle 除了会带来大量的网络开销还可能引起数据倾斜甚至 OOM 等问题。系统采用避免 Shuffle 这种不可控的因素从而规避 Shuffle 可能带来的不可预料的问题。运算模块基于 Structured Streaming 开发,采用完全规避 Shuffle 的计算方式,通过设置运算节点数量调整任务执行并行度,系统将单运算节点内的统计消息依据统计项标识、维度标识、时间批次、统计运算单元拆分成不同的计算类型。统计结果数据和中间态数据基于外部存储实现。本系统中统计结果存储在 HBase 中,bitcount 基数运算的中间态数据存储在 Redis 中、limit 运算的排序数据存储在 Redis 中。每个运算节点在运算过程中只与外部存储通信,不同运算节点之间互不影响。

7、统计限流

为了避免因为某个大数据量的统计需求的突然接入或某个统计项的流量暴涨而导致系统的不稳定,系统针对统计组消息量、统计项结果量、统计项运算量等维度的熔断保护机制。该限流保护机制的作用在于可以更好的保障整体服务的稳定性,目前包含以下策略:(1)统计组消息量限流统计组消息量限流是针对单位时间内接收到的统计组消息数量的限流策略。系统内置统计组消息量计数装置用于计算单位时间内接收到的统计组消息数量。当单位时间内消息量超出阈值后触发限流,使当前统计组进入限流状态。Client 模块以及 Tasks 模块自动抛弃非正常状态下的统计组消息。由于一个统计组可对应一个或多个统计项,所以该限流策略会影响统计组下所有统计项的正常统计。统计组进入限流状态后在指定时间内(默认 20 分钟)自动抛弃相应消息,当限流时间达到时间阈值后统计组自动恢复到正常状态。(2)统计项结果量限流统计项结果量限流是针对单位时间内统计项生成的统计结果数量的限流策略。系统内置统计项结果量计数装置用于计算单位时间内生成统计结果的数量。当单位时间内结果量超出阈值后触发限流,使当前统计项进入限流状态。统计项结果量跟两个因素有关,一是统计周期的时间粒度,统计周期粒度越细的指标数据量越多,比如秒级和分钟级统计单位时间内生成的统计结果要多于小时级和天级的统计。第二个影响因素是维度,维度数量越多的统计项单位时间内生成的统计结果更多,比如以城市为维度的统计指标生成的统计结果量要高于以省份为维度的统计指标。统计项结果量限流是针对当前统计项的限流策略,所以只对当前统计项有影响,对统计组下其他统计项没有影响。当统计项进入限流状态后在指定时间内(默认 20 分钟)自动抛弃相应相应消息,当限流时间达到时间阈值后当前统计项自动恢复到正常状态。

8、时间戳压缩

系统针对流式统计场景对数据存储格式进一步优化,目的在于提高 DB 的数据吞吐量。系统统计结果数据存储采用时间戳压缩,根据统计周期划分成不同的时段,将每个统计项相同维度下的同一时段内的多个统计结果数值存储在不同的 column 内,列名采用 delta 压缩,同一时段内的数据使用相同的 Key,减少 Key 值的重复。

9、异常熔断

熔断机制是为了保障业务方自身服务的稳定性,避免因统计服务的不稳定而对业务方自身服务产生影响。异常熔断机制是指在调用 client 接口时,如果单位时间内的失败次数或超时次数超出阈值,则进入熔断状态,此时 client 模块自动跳过统计消息发送逻辑。进入熔断状态后,client 模块周期性检测统计服务状态是否恢复正常,如果统计服务恢复正常则自动重连。

系统功能边界

  • (1)、不支持原始数据明细查询;

  • (2)、暂时只涉及流式滚动窗口数据统计(滑动窗口统计将在后续版本中支持);

  • (3)、暂时不支持秒级粒度数据统计(将在后续版本支持);

  • (4)、不涉及原始数据采集细节,系统所有的计算都基于接入方上报的原始消息,原始数据消息需要接入方组装好通过 SDK 上报。目前只提供 Java 版本的 SDK,针对 JVM 语言的服务,接入方可以在服务中直接调用。由其他语言开发的服务,可以将数据采集后存入 kafka 等消息队列再通过消费数据的形式接入 XL-LightHouse。

Hello World 使用范例

完整版使用示例请查阅:HelloWorld

范例一:首页 ICON 区域用户行为数据统计


该区域包含 3 个 Tab,每个 Tab 有多个业务 ICON 图标,用户手动滑动可切换 Tab,假设针对该 ICON 区域我们有如下数据指标需求:


点击量:1、每5分钟_点击量2、每5分钟_各ICON_点击量3、每小时_点击量4、每小时_各ICON_点击量5、每天_总点击量6、每天_各Tab_总点击量7、每天_各ICON_总点击量
点击UV:1、每5分钟_点击UV2、每小时_点击UV3、每小时_各ICON_点击UV4、每天_总点击UV5、每天_各ICON_总点击UV
复制代码


  • 上报元数据时机


用户点击 ICON 图标时上报相应埋点数据


  • 配置统计项



  • 查看统计结果


范例二:移动支付订单数据统计
1、 支付成功订单数据统计
  • 统计需求梳理


订单量:1、每10分钟_订单量2、每10分钟_各商户_订单量3、每10分钟_各省份_订单量4、每10分钟_各城市_订单量5、每小时_订单量6、每天_订单量7、每天_各商户_订单量8、每天_各省份_订单量9、每天_各城市_订单量10、每天_各价格区间_订单量11、每天_各应用场景_订单量
交易金额:1、每10分钟_成交金额2、每10分钟_各商户_成交金额top1003、每10分钟_各省份_成交金额4、每10分钟_各城市_成交金额5、每小时_成交金额6、每小时_各商户_成交金额7、每天_成交金额8、每天_各商户_成交金额9、每天_各省份_成交金额10、每天_各城市_成交金额11、每天_各应用场景_成交金额
下单用户数:1、每10分钟_下单用户数2、每10分钟_各商户_下单用户数3、每10分钟_各省份_下单用户数4、每10分钟_各城市_下单用户数5、每小时_下单用户数6、每天_下单用户数7、每天_各商户_下单用户数8、每天_各省份_下单用户数9、每天_各城市_下单用户数10、每天_各价格区间_下单用户数11、每天_各应用场景_下单用户数
复制代码


  • 消息上报时机


用户支付成功后上报原始消息数据。


  • 配置统计消息


2、 订单支付状态数据监控

我这里假设订单有四种状态:支付成功、支付失败、超时未支付、订单取消。


订单量:1、每10分钟_各状态_订单量2、每10分钟_各商户_各状态_订单量1、每天_各状态_订单量2、每天_各商户_各状态_订单量
订单异常率:1、每10分钟_订单异常率2、每10分钟_各商户_订单异常率3、每小时_订单异常率4、每天_订单异常率5、每天_各商户_订单异常率
支付失败用户数统计:1、每5分钟_支付失败用户数
复制代码


  • 定义元数据



  • 配置统计项



  • 查看统计结果


更多适用场景举例


  • GitHub 搜索 XL-LightHouse

项目地址

  • GitHub 搜索 XL-LightHouse

一键部署

  • GitHub 搜索 XL-LightHouse

写在最后的一些话

XL-LightHouse 是一套通用型流式大数据统计平台,致力于推动流式统计技术的快速普及和大规模应用,定位是以一套服务使用较少的服务器资源同时支撑数以万计、数十万计流式数据统计需求的大数据平台。XL-LightHouse 面向企业自上而下所有职能人员共同使用,倡导以通用型流式数据统计为切入点,倾向于选择更为轻巧的技术方案帮助企业更快速的搭建起一套犹如我们人体神经系统一样遍布全身的、较为完善稳定可靠的数据化运营体系。


流式统计技术并不完美,确实有一些场景不适合使用流式统计实现,所以它也不可能完全取代了其他的技术方案。但是我依然认为在企业数据化运营领域在所有的技术方案中,能够发挥中流砥柱作用的只有可能是通用型流式数据统计。时效性是流式统计得以青睐的一个原因,但我认为最根本原因在于一项技术能够普及到什么程度,很多时候使用的成本决定了一切。


在软件研发领域,我认为通用型流式统计将会对现在的软件类产品研发产生较为巨大的影响,它会发展成为如同日志一样的重要角色,通用型流式统计或将成为独立于日志之外且重要程度不亚于日志的另一套辅助类工具体系,各种工种的程序员将会在任何有必要的地方加上流式统计的代码就像加日志一样司空见惯、习以为常。


在企业级服务市场,我相信通用型流式数据统计将凭借其庞大的应用场景和巨大的业务价值而成为企业最核心的基础服务之一,而以通用型流式数据统计为核心理念、以其他技术方案为辅助手段的数据化运营类产品将成为企业级 B 端市场不可或缺的中坚力量。此外,伴随着软硬件技术的协同发展以及物联网时代的即将到来,我认为通用型流式数据统计也将渗透于现实世界各个方面,成为社会的一种基础运算能力,在各类行业中得到较为普遍的应用。


用户头像

feng

关注

还未添加个人签名 2018-03-13 加入

还未添加个人简介

评论

发布
暂无评论
不用再写FlinkSQL了,使用XL-LightHouse轻松实现海量数据实时统计_大数据_feng_InfoQ写作社区