写点什么

Apache NiFi + MatrixDB 20 行代码实现数据实时入库!

作者:yMatrix
  • 2022-12-03
    北京
  • 本文字数:2062 字

    阅读完需:约 7 分钟

继上一篇“空间节省50%,时序性能提升5倍,三一重工从Hadoop+Spark到MatrixDB架构变迁实现One for ALL”(点击标题阅读原文)发布后,这次我们再从 Apache NiFi + MatrixDB 着手,用 20 行代码轻松实现数据实时入库!


作者信息:李净芝 - 工程车辆事业部研究院大数据工程师

01 前言


目前,三一重工泵诵云平台的数据接入采用 Nifi + MatrixGate 的方案已正常运行 4 个月,NiFi 集群由 3 台配置为内存 32g,硬盘 1T 的服务器组成,平均 5min 消费 Kafka 数据 6.4GB,每秒处理数据 14784 条。


NiFi 本身提供大量组件,用以应付各式各样 ETL 场景,实现了 hdfs、本地文件系统、主流数据库 (mysql/oracle/postgres) 之间数据的流转。



什么是 Apache NiFi ?

Apache NiFi,专门用于解决与数据流有关问题的工具,易于使用、功能强大、可靠的数据 ETL 系统。基于 WEB 图形页面,通过组件的拖拽、连接及配置,即可搭建完整的数据流,实时监控数据在各个处理组件之间流转的情况。


什么是 MatrixGate ?

MatrixGate 简称 mxgate,是 MatrixDB 自带的高性能流式数据加载服务器,使用 mxgate 进行数据加载性能要远远高于原生 INSERT 语句,MatrixGate 加载数据的逻辑如下图所示:



  1. 数据采集系统采集设备数据或者接收由设备发送来的数据

  2. 采集系统以并发微批的模式向 MatrixGate 的服务进程 mxgate 持续发送数据

  3. mxgate 进程和 MatrixDB 的 master 进程高效通信,沟通事务和控制信息

  4. 数据直接发送到 segment 节点,并行高速写入,不存在 master 单点瓶颈。


三一重工泵诵云平台将二者相结合,实现数据实时入库,且解决标准化的问题。在分享案例之前,先了解一下 NiFi 中一个重要的概念:


FlowFile

· FlowFile 是 NiFi 的核心概念,是对原始数据记录的抽象,是面向 FBP (Flow-Based Programming) 设计的。

· FlowFile 是数据记录,由一个指针(指向内容)和属性组成。

· 属性是 Key / value 键值对,是 Flowfile 的元数据。

· 内容是原始数据。

在后续的案例中,数据的流转在 NiFi 中就是 Flowfile 的生成与转化。

02 案例


数据处理思路



根据上面的思路,使用 NiFi 搭建数据流的过程如下:


01.

首先,获取数据,可以用 NiFi 自带的 ConsumeKafka_2_0 组件,只需配置 broker、topic,即可消费数据输出到下一组件。



02.

上图中的 Sany/JsonTypeJudge 为自定义组件,功能为根据 kafka.key 分发车载泵和泵车的数据,也可以用 NiFi 自带的 RouteOnAttribute 组件。


官方组件实现的分发规则更加的灵活,但是效率要低许多。样例如下:




03.

分发的数据输出的合并组件,这里使用 NiFi 自带的 MergeContent 组件,合并策略采用桶策略。




桶策略的含义如下:

  • 每个 FlowFile 都有属性,桶策略首先需要指定合并属性,在上图中,合并属性设置为 kafka.key,也就是设备号。

  • kafka.key 为 A 的 FlowFile 将进入 A 桶,kafka.key 为 B 的 FlowFile 将进入 B 桶,以此类推,每个 FlowFile 根据自身的属性进入对应的桶。

  • 桶策略还有其他配置,比如桶中的最小/最大文件数、桶中文件的最小/最大 Size、桶的持续时间等等,一旦达到门限值,桶里面的 FlowFiles 会打包合并成一个 FlowFile 输出到下一组件。



04.

将合并后的 jsonArray 进行解析,在这里用到的组件为自定义组件,因为原始的 json,key 不固定,而 NiFi 自带的 jsonReader 组件只能用单一的 schema 去读。



05.

对分发的实时工况进行标准化。设备上传的数据是由控制协议定义的,随着控制协议以及设备的更新,新老设备对于同一个物理量会存在不同的字段映射,比如转速这个物理量,在 1 车型中是字段 A,在 2 车型中是字段 B,我们希望根据车型的不同,填充 A or B 到转速这个物理量。


以下是实现标准化涉及的组件:

  • NiFi 自带的 LookupAttribute 组件,根据前文提到的 kafka.key 这一属性,为每条记录添加 protocol 属性,为后续每条记录输出 A 还是输出 B 提供依据。





  • 为每个 FlowFile 添加上 protocol 这个属性后,在使用 NiFi 自带的 ConvertRecord 组件根据 protocol 的值动态地输出 A or B,以此达到标准化的效果。





06.

数据入库 NiFi 自带的组件有 PutDatabaseRecord,这个组件能 put 数据到市面上主流的数据库,只需指定 jdbc 的 jar 包,以及配置 url,数据库名等,但是入库效率会表较低。

这里结合 mxgate 的 java api,自定义一个组件,用于加载数据到 MatrixDB。



PutDatabaseRecord 自定义组件的代码实现如下:


03 小结


在本案例中,需要注意的是,NiFi 为了防止数据丢失会将接入的数据内容作为内容声明保存在本地,可以通过更改 NiFi 配置再重启来改变内容声明留存时间。


前期由于使用默认的留存时间,再加上服务器本身磁盘有限。因此,在很短的时间内出现磁盘满了的情况,通过改变留存时间以及定时清理内容声明,可以解决磁盘爆满的问题,但是带来的问题是,一旦碰到某种原因导致 NiFi 未及时入库数据,而被定时清理了内容声明,那数据就会丢失,还是需要通过对磁盘扩容来解决。


总的来说,“NiFi+Mxgate” 二者的结合完美地解决了数据接入的问题。


原文链接


本文为 YMatrix 原创内容,未经允许不得转载。

欲了解更多超融合时序数据库相关信息,请访问 “YMatrix 超融合数据库” 官方网站

发布于: 刚刚阅读数: 3
用户头像

yMatrix

关注

MatrixDB 超融合时序数据库 2021-10-28 加入

全球超融合时序数据库开创者,专为物联网、车联网、工业互联网和智慧城市提供一站式数据平台。

评论

发布
暂无评论
Apache NiFi + MatrixDB 20行代码实现数据实时入库!_三一重工_yMatrix_InfoQ写作社区