Apache NiFi + MatrixDB 20 行代码实现数据实时入库!
继上一篇“空间节省50%,时序性能提升5倍,三一重工从Hadoop+Spark到MatrixDB架构变迁实现One for ALL”(点击标题阅读原文)发布后,这次我们再从 Apache NiFi + MatrixDB 着手,用 20 行代码轻松实现数据实时入库!
作者信息:李净芝 - 工程车辆事业部研究院大数据工程师
01 前言
目前,三一重工泵诵云平台的数据接入采用 Nifi + MatrixGate 的方案已正常运行 4 个月,NiFi 集群由 3 台配置为内存 32g,硬盘 1T 的服务器组成,平均 5min 消费 Kafka 数据 6.4GB,每秒处理数据 14784 条。
NiFi 本身提供大量组件,用以应付各式各样 ETL 场景,实现了 hdfs、本地文件系统、主流数据库 (mysql/oracle/postgres) 之间数据的流转。
什么是 Apache NiFi ?
Apache NiFi,专门用于解决与数据流有关问题的工具,易于使用、功能强大、可靠的数据 ETL 系统。基于 WEB 图形页面,通过组件的拖拽、连接及配置,即可搭建完整的数据流,实时监控数据在各个处理组件之间流转的情况。
什么是 MatrixGate ?
MatrixGate 简称 mxgate,是 MatrixDB 自带的高性能流式数据加载服务器,使用 mxgate 进行数据加载性能要远远高于原生 INSERT 语句,MatrixGate 加载数据的逻辑如下图所示:
数据采集系统采集设备数据或者接收由设备发送来的数据
采集系统以并发微批的模式向 MatrixGate 的服务进程 mxgate 持续发送数据
mxgate 进程和 MatrixDB 的 master 进程高效通信,沟通事务和控制信息
数据直接发送到 segment 节点,并行高速写入,不存在 master 单点瓶颈。
三一重工泵诵云平台将二者相结合,实现数据实时入库,且解决标准化的问题。在分享案例之前,先了解一下 NiFi 中一个重要的概念:
FlowFile
· FlowFile 是 NiFi 的核心概念,是对原始数据记录的抽象,是面向 FBP (Flow-Based Programming) 设计的。
· FlowFile 是数据记录,由一个指针(指向内容)和属性组成。
· 属性是 Key / value 键值对,是 Flowfile 的元数据。
· 内容是原始数据。
在后续的案例中,数据的流转在 NiFi 中就是 Flowfile 的生成与转化。
02 案例
数据处理思路
根据上面的思路,使用 NiFi 搭建数据流的过程如下:
01.
首先,获取数据,可以用 NiFi 自带的 ConsumeKafka_2_0 组件,只需配置 broker、topic,即可消费数据输出到下一组件。
02.
上图中的 Sany/JsonTypeJudge 为自定义组件,功能为根据 kafka.key 分发车载泵和泵车的数据,也可以用 NiFi 自带的 RouteOnAttribute 组件。
官方组件实现的分发规则更加的灵活,但是效率要低许多。样例如下:
03.
分发的数据输出的合并组件,这里使用 NiFi 自带的 MergeContent 组件,合并策略采用桶策略。
桶策略的含义如下:
每个 FlowFile 都有属性,桶策略首先需要指定合并属性,在上图中,合并属性设置为 kafka.key,也就是设备号。
kafka.key 为 A 的 FlowFile 将进入 A 桶,kafka.key 为 B 的 FlowFile 将进入 B 桶,以此类推,每个 FlowFile 根据自身的属性进入对应的桶。
桶策略还有其他配置,比如桶中的最小/最大文件数、桶中文件的最小/最大 Size、桶的持续时间等等,一旦达到门限值,桶里面的 FlowFiles 会打包合并成一个 FlowFile 输出到下一组件。
04.
将合并后的 jsonArray 进行解析,在这里用到的组件为自定义组件,因为原始的 json,key 不固定,而 NiFi 自带的 jsonReader 组件只能用单一的 schema 去读。
05.
对分发的实时工况进行标准化。设备上传的数据是由控制协议定义的,随着控制协议以及设备的更新,新老设备对于同一个物理量会存在不同的字段映射,比如转速这个物理量,在 1 车型中是字段 A,在 2 车型中是字段 B,我们希望根据车型的不同,填充 A or B 到转速这个物理量。
以下是实现标准化涉及的组件:
NiFi 自带的 LookupAttribute 组件,根据前文提到的 kafka.key 这一属性,为每条记录添加 protocol 属性,为后续每条记录输出 A 还是输出 B 提供依据。
为每个 FlowFile 添加上 protocol 这个属性后,在使用 NiFi 自带的 ConvertRecord 组件根据 protocol 的值动态地输出 A or B,以此达到标准化的效果。
06.
数据入库 NiFi 自带的组件有 PutDatabaseRecord,这个组件能 put 数据到市面上主流的数据库,只需指定 jdbc 的 jar 包,以及配置 url,数据库名等,但是入库效率会表较低。
这里结合 mxgate 的 java api,自定义一个组件,用于加载数据到 MatrixDB。
PutDatabaseRecord 自定义组件的代码实现如下:
03 小结
在本案例中,需要注意的是,NiFi 为了防止数据丢失会将接入的数据内容作为内容声明保存在本地,可以通过更改 NiFi 配置再重启来改变内容声明留存时间。
前期由于使用默认的留存时间,再加上服务器本身磁盘有限。因此,在很短的时间内出现磁盘满了的情况,通过改变留存时间以及定时清理内容声明,可以解决磁盘爆满的问题,但是带来的问题是,一旦碰到某种原因导致 NiFi 未及时入库数据,而被定时清理了内容声明,那数据就会丢失,还是需要通过对磁盘扩容来解决。
总的来说,“NiFi+Mxgate” 二者的结合完美地解决了数据接入的问题。
本文为 YMatrix 原创内容,未经允许不得转载。
欲了解更多超融合时序数据库相关信息,请访问 “YMatrix 超融合数据库” 官方网站
版权声明: 本文为 InfoQ 作者【yMatrix】的原创文章。
原文链接:【http://xie.infoq.cn/article/8c7562315a545373edab8458b】。文章转载请联系作者。
评论