Kafka 实时数据即席查询应用与实践
作者:vivo 互联网搜索团队- Deng Jie
Kafka 中的实时数据是以 Topic 的概念进行分类存储,而 Topic 的数据是有一定时效性的,比如保存 24 小时、36 小时、48 小时等。而在定位一些实时数据的 Case 时,如果没有对实时数据进行历史归档,在排查问题时,没有日志追述,会很难定位是哪个环节的问题。
一、背景
Kafka 中的实时数据是以 Topic 的概念进行分类存储,而 Topic 的数据是有一定时效性的,比如保存 24 小时、36 小时、48 小时等。而在定位一些实时数据的 Case 时,如果没有对实时数据进行历史归档,在排查问题时,没有日志追述,会很难定位是哪个环节的问题。因此,我们需要对处理的这些实时数据进行记录归档并存储。
二、内容
2.1 案例分析
这里以 i 视频和 vivo 短视频实时数据为例,之前存在这样的协作问题:
数据上游内容方提供实时 Topic(存放 i 视频和 vivo 短视频相关实时数据),数据侧对实时数据进行逻辑处理后,发送给下游工程去建库实时索引,当任务执行一段时间后,工程侧建索引偶尔会提出数据没有发送过去的 Case,前期由于没有对数据做存储,在定位问题的时候会比较麻烦,经常需求查看实时日志,需要花费很长的时间来分析这些 Case 是出现在哪个环节。
为了解决这个问题,我们可以将实时 Topic 中的数据,在发送给其他 Topic 的时候,添加跟踪机制,进行数据分流,Sink 到存储介质(比如 HDFS、Hive 等)。这里,我们选择使用 Hive 来进行存储,主要是查询方便,支持 SQL 来快速查询。如下图所示:
在实现优化后的方案时,有两种方式可以实现跟踪机制,它们分别是 Flink SQL 写 Hive、Flink DataStream 写 Hive。接下来,分别对这两种实现方案进行介绍和实践。
2.2 方案一:Flink SQL 写 Hive
这种方式比较直接,可以在 Flink 任务里面直接操作实时 Topic 数据后,将消费后的数据进行分流跟踪,作为日志记录写入到 Hive 表中,具体实现步骤如下:
构造 Hive Catalog;
创建 Hive 表;
写入实时数据到 Hive 表。
2.2.1 构造 Hive Catalog
在构造 Hive Catalog 时,需要初始化 Hive 的相关信息,部分代码片段如下所示:
在以上代码中,我们首先设置了 Flink 的执行环境和表环境,然后创建了一个 HiveCatalog,并将其注册到表环境中。
2.2.2 创建 Hive 表
如果 Hive 表不存在,可以通过在程序中执行建表语句,具体 SQL 见表语句代码如下所示:
在创建 Hive 表时我们使用了 IF NOT EXISTS 关键字,如果 Hive 中该表不存在会自动在 Hive 上创建,也可以提前在 Hive 中创建好该表,Flink SQL 中就无需再执行建表 SQL,因为用了 Hive 的 Catalog,Flink SQL 运行时会找到表。这里,我们设置了 auto-compaction 属性为 true,用来使小文件自动合并,1.12 版的新特性,解决了实时写 Hive 产生的小文件问题。同时,指定 metastore 值是专门用于写入 Hive 的,也需要指定 success-file 值,这样 CheckPoint 触发完数据写入磁盘后会创建_SUCCESS 文件以及 Hive metastore 上创建元数据,这样 Hive 才能够对这些写入的数据可查。
2.2.3 写入实时数据到 Hive 表
在准备完成 2.2.1 和 2.2.2 中的步骤后,接下来就可以在 Flink 任务中通过 SQL 来对实时数据进行操作了,具体实现代码片段如下所示:
将消费后的数据进行分类,编写业务 SQL 语句,将消费的数据作为日志记录,发送到 Hive 表进行存储,这样 Kafka 中的实时数据就存储到 Hive 了,方便使用 Hive 来对 Kafka 数据进行即席分析。
2.2.4 避坑技巧
使用这种方式在处理的过程中,如果配置使用的是 EventTime,在程序中配置'sink.partition-commit.trigger'='partition-time',最后会出现无法提交分区的情况。经过对源代码 PartitionTimeCommitTigger 的分析,找到了出现这种异常情况的原因。
我们可以通过看
org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitionsorg.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions
中的一个函数,来说明具体的问题,部分源代码片段如下:
通过分析上述代码片段,我们可以知道系统通过分区值来抽取相应的分区来创建时间,然后进行比对,比如我们设置的时间 pattern 是 '$dt $h:$m:00' , 某一时刻我们正在往 /2022-02-26/18/20/ 这个分区下写数据,那么程序根据分区值,得到的 pattern 将会是 2022-02-26 18:20:00,这个值在 SQL 中是根据 DATA_FORMAT 函数获取的。
而这个值是带有时区的,比如我们的时区设置为东八区,2022-02-26 18:20:00 这个时间是东八区的时间,换成标准 UTC 时间是减去 8 个小时,也就是 2022-02-26 10:20:00,而在源代码中的 toMills 函数在处理这个东八区的时间时,并没有对时区进行处理,把这个其实应该是东八区的时间当做了 UTC 时间来处理,这样计算出来的值就比实际值大 8 小时,导致一直没有触发分区的提交。
如果我们在数据源中构造的分区是 UTC 时间,也就是不带分区的时间,那么这个逻辑就是没有问题的,但是这样又不符合我们的实际情况,比如对于分区 2022-02-26 18:20:00,我希望我的分区肯定是东八区的时间,而不是比东八区小 8 个小时的 UTC 时间 2022-02-26 10:20:00。
在明白了原因之后,我们就可以针对上述异常情况进行优化我们的实现方案,比如自定义一个分区类、或者修改缺省的时间分区类。比如,我们使用 TimeZoneTableFunction 类来实现一个自定义时区,部分参考代码片段如下:
2.3 方案二:Flink DataStream 写 Hive
在一些特殊的场景下,Flink SQL 如果无法实现我们复杂的业务需求,那么我们可以考虑使用 Flink DataStream 写 Hive 这种实现方案。比如如下业务场景,现在需要实现这样一个业务需求,内容方将实时数据写入到 Kafka 消息队列中,然后由数据侧通过 Flink 任务消费内容方提供的数据源,接着对消费的数据进行分流处理(这里的步骤和 Flink SQL 写 Hive 的步骤类似),每分钟进行存储到 HDFS(MapReduce 任务需要计算和重跑 HDFS 数据),然后通过 MapReduce 任务将 HDFS 上的这些日志数据生成 Hive 所需要格式,最后将这些 Hive 格式数据文件加载到 Hive 表中。实现 Kafka 数据到 Hive 的即席分析功能,具体实现流程细节如下图所示:
具体核心实现步骤如下:
消费内容方 Topic 实时数据;
生成数据预处理策略;
加载数据;
使用 Hive SQL 对 Kafka 数据进行即席分析。
2.3.1 消费内容方 Topic 实时数据
编写消费 Topic 的 Flink 代码,这里不对 Topic 中的数据做逻辑处理,在后面统一交给 MapReduce 来做数据预处理,直接消费并存储到 HDFS 上。具体实现代码如下所示:
注意事项:
这里我们把时间窗口设置小一些,每 30s 做一次 Checkpoint,如果该批次的时间窗口没有数据过来,就生成一个文件落地到 HDFS 上;
另外,我们重写了 Bucketer 为 DateTimeBucketer,逻辑并不复杂,在原有的方法上加一个年-月-日/时-分的文件生成路径,例如在 HDFS 上的生成路径:xxxx/2022-02-26/00-00。
具体 DateTimeBucketer 实现代码如下所示:
2.3.2 生成数据预处理策略
这里,我们需要对落地到 HDFS 上的文件进行预处理,处理的逻辑是这样的。比如,现在是 2022-02-26 14:00,那么我们需要将当天的 13:55,13:56,13:57,13:58,13:59 这最近 5 分钟的数据处理到一起,并加载到 Hive 的最近 5 分钟的一个分区里面去。那么,我们需要生成这样一个逻辑策略集合,用 HH-mm 作为 key,与之最近的 5 个文件作为 value,进行数据预处理合并。具体实现代码步骤如下:
步骤一:获取小时循环策略;
步骤二:获取分钟循环策略;
步骤三:判断是否为 5 分钟的倍数;
步骤四:对分钟级别小于 10 的数字做 0 补齐(比如 9 补齐后变成 09);
步骤五:对小时级别小于 10 的数字做 0 补齐(比如 1 补齐后变成 01);
步骤六:生成时间范围;
步骤七:输出结果。
其中,主要的逻辑是在生成时间范围的过程中,根据小时和分钟数的不同情况,生成不同的时间范围,并输出结果。在生成时间范围时,需要注意前导 0 的处理,以及特殊情况(如小时为 0、分钟为 0 等)的处理。最后,将生成的时间范围输出即可。
根据上述步骤编写对应的实现代码,生成当天所有日期命名规则,预览部分结果如下:
需要注意的是,如果发生了第二天 00:00,那么我们需要用到前一天的 00-00=>23-59,23-58,23-57,23-56,23-55 这 5 个文件中的数据来做预处理。
2.3.3 加载数据
在完成 2.3.1 和 2.3.2 里面的内容后,接下来,我们可以使用 Hive 的 load 命令直接加载 HDFS 上预处理后的文件,把数据加载到对应的 Hive 表中,具体实现命令如下:
2.3.4 即席分析
之后,我们使用 Hive SQL 来对 Kafka 数据进行即席分析,示例 SQL 如下所示:
2.4 Flink SQL 与 Flink DataStream 如何选择
Flink SQL 和 Flink DataStream 都是 Flink 中用于处理数据的核心组件,我们可以根据自己实际的业务场景来选择使用哪一种组件。
Flink SQL 是一种基于 SQL 语言的数据处理引擎,它可以将 SQL 查询语句转换为 Flink 的数据流处理程序。相比于 Flink DataStream,Flink SQL 更加易于使用和维护,同时具有更快的开发速度和更高的代码复用性。Flink SQL 适用于需要快速开发和部署数据处理任务的场景,比如数据仓库、实时报表、数据清洗等。
Flink DataStream API 是 Flink 数据流处理标准 API,SQL 是 Flink 后期版本提供的新的数据处理操作接口。SQL 的引入为提高了 Flink 使用的灵活性。可以认为 Flink SQL 是一种通过字符串来定义数据流处理逻辑的描述语言。
因此,在选择 Flink SQL 和 Flink DataStream 时,需要根据具体的业务需求和数据处理任务的特点来进行选择。如果需要快速开发和部署任务,可以选择使用 Flink SQL;如果需要进行更为深入和定制化的数据处理操作,可以选择使用 Flink DataStream。同时,也可以根据实际情况,结合使用 Flink SQL 和 Flink DataStream 来完成复杂的数据处理任务。
三、 总结
在实际应用中,Kafka 实时数据即席查询可以用于多种场景,如实时监控、实时报警、实时统计、实时分析等。具体应用和实践中,需要注意以下几点:
数据质量:Kafka 实时数据即席查询需要保证数据质量,避免数据重复、丢失或错误等问题,需要进行数据质量监控和调优。
系统复杂性:Kafka 实时数据即席查询需要涉及到多个系统和组件,包括 Kafka、数据处理引擎(比如 Flink)、查询引擎(比如 Hive)等,需要对系统进行配置和管理,增加了系统的复杂性。
安全性:Kafka 实时数据即席查询需要加强数据安全性保障,避免数据泄露或数据篡改等安全问题,做好 Hive 的权限管控。
性能优化:Kafka 实时数据即席查询需要对系统进行性能优化,包括优化数据处理引擎、查询引擎等,提高系统的性能和效率。
参考:
版权声明: 本文为 InfoQ 作者【vivo互联网技术】的原创文章。
原文链接:【http://xie.infoq.cn/article/36910886fd40b9ee509b5259d】。文章转载请联系作者。
评论