写点什么

流数据操作

作者:Xiao8
  • 2022 年 6 月 19 日
  • 本文字数:2166 字

    阅读完需:约 7 分钟

什么是流数据操作

流数据操作应该说是流计算系统与生俱来的能力,它是针对数据流的“转化”或“转移”处理。流数据操作的内容主要包括四类。

  • 一是流数据的清洗、规整和结构化。比如提取感兴趣字段、统一数据格式、过滤不合条件事件。

  • 二是流数据的关联及合并。比如在广告转化率分析中,将“点击”事件流和“安装”事件流关联起来。

  • 三是流数据的分发和并行处理。比如将一个包含了来自不同设备事件的数据流,按照设备 id 分发到不同的流中进行处理。

  • 四是流数据的转移和存储。比如将数据从 Kafka 转移到数据库里。

虽然不同系统实现以上四类流数据操作的具体方法不尽相同,但经过多年的实践和经验积累,业界针对流数据操作的目标和手段都有了一定的共识,并已逐步形成一套通用的 API 集合,几乎所有的流计算平台都会提供这些 API 的实现。比如:

  • 针对流数据的清洗、规整和结构化,抽象出 filter、map、flatMap、reduce 等方法;

  • 针对流数据的关联及合并,抽象出 join、union 等方法;

  • 针对流数据的分发和并行处理,抽象出 keyBy 或 groupBy 等方法;

  • 针对流数据的转移和存储,则抽象出 foreach 等方法。

这些 API 的功能各不相同,但它们在一起共同构成了一个灵活操作流数据的方法集合。

所以接下来,我们就选出几个最重要,且能够覆盖日常大多数使用场景的 API ,来对流数据操作这类算法问题,进行详细讲解。

过滤 filter

首先是过滤 filter 。顾名思义,“过滤”就是在数据流上筛选出符合条件的数据。这个方法通常用于剔除流数据中你不想要的数据,比如不合预期的事件类型、不完整的数据记录等。或者,你也可以用这个方法来对流数据进行采样,比如只保留 1/10 的流数据,从而减少需要处理的数据量。

下面举一个具体的例子来讲下如何使用 filter 方法。比如,我们现在需要监控仓库的环境温度,在火灾发生前提前预警以避免火灾,那么我们就可以采用过滤功能,从来自于传感器的环境温度事件流中,过滤出温度高于 100 摄氏度的事件。

这里我们使用 Flink 来实现。如果你暂时还不熟悉 Flink 的话也没有关系,这里的代码很简单,只需要先了解下这些 API 的使用形式即可。另外,本课程后面还有专门的课时讲解 Flink 。

DataStream<JSONObject> highTemperatureStream = temperatureStream.filter(x -> x.getDouble("temperature") > 100);
复制代码

在上面代码中,lambda 表达式“x->x.getDouble("temperature")>100”即过滤火灾高温事件的条件。

映射 map

“映射”用于将数据流中的每条数据转化为新的数据。它最大的价值在于对流数据进行信息增强,也就是将额外的信息附加到数据流中的数据上。比如,你只对哪些字段感兴趣、需要将数据转化为哪种格式、给数据添加一个新的字段等,这些“信息”在原来的流数据里是没有的,你可以通过 map 方法将这些信息附加到流数据上。

下面同样以仓库环境温度监控为例,来讲解 map 的使用方法。不过,这次我们不是将高温事件过滤出来,而是采用数据工程师在做特征工程时常用的一种操作,也就是“二值化”。

我们在原始环境温度事件中,添加一个新的布尔(boolean)类型字段,用于表示该事件是否是高温事件。同样,使用 Flink 实现如下:

DataStream<JSONObject> enhancedTemperatureStream = temperatureStream.map(x -> {
    x.put("isHighTemperature", x.getDouble("temperature") > 100);
    return x;
});
复制代码

上面示意代码的 lambda 表达式中,通过原始事件的 temperature 字段判断是否为高温事件,然后将结果附加到事件上,最后返回附加了高温信息的事件。

展开映射 flatMap

“展开映射”用于将数据流中的每条数据转化为 N 条新数据。相比 map 而言, flatMap 是个更加灵活的方法,因为 map 只能 1 对 1 地对数据流元素进行转化,而 flatMap 能 1 对 N 地对数据流元素进行转化。

flatMap 最大的作用体现在“flat”上,也就是“展开摊平”。它最典型的使用场景就是,比如原本数据流中的数据有一个字段是数组,现在你需要将这个数组里的每个元素拆解开,然后分成一条条单独的数据,并形成一个新的数据流。

下面举一个 flatMap 在社交活动分析中使用的例子。现在有一组代表用户信息的数据流,其中每条数据记录了用户(用 user 字段表示)及其好友列表(用 friends 数组字段表示)的信息。现在我们要分析每个用户与他的每一个好友之间的亲密程度,以判断他们之间是否是“塑料兄弟”或者“塑料姐妹”。

所以我们先要将用户和它的好友列表一一展开,展开后的每条数据代表了用户和他的其中一个好友之间的关系。下面是采用 Flink 实现的例子。

DataStream<String> relationStream = socialWebStream.flatMap(new FlatMapFunction<JSONObject, String>() {
    @Override
    public void flatMap(JSONObject value, Collector<String> out) throws Exception {
        List<String> collect = value.getJSONArray("friends").stream()
                .map(y -> String.format("%s->%s", value.getString("user"), y))
                .collect(Collectors.toList());
        collect.forEach(out::collect);
    }
});
复制代码

上面代码的 flatMap 方法中,我们使用 Java 8 的流式 API,将用户的好友列表 friends 展开,与用户形成一对对的好友关系记录(用"%s->%s"格式表示),最终由 out::collect 收集起来,写入输出数据流中。

发布于: 刚刚阅读数: 5
用户头像

Xiao8

关注

God bless the fighters. 2020.03.11 加入

欢迎关注公众号:程序猿Damon,长期从事Java开发,研究Springcloud的微服务架构设计。目前主要从事基于K8s云原生架构研发的工作,Golang开发,长期研究边缘计算框架KubeEdge、调度框架Volcano、容器云KubeSphere研究

评论

发布
暂无评论
流数据操作_6月月更_Xiao8_InfoQ写作社区