写点什么

构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合

作者:CnosDB
  • 2023-09-03
    内蒙古
  • 本文字数:3716 字

    阅读完需:约 12 分钟

构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合


当今的数据技术生态系统中,实时数据处理已经成为许多企业不可或缺的一部分。为了满足这种需求,Apache Flink、Apache Kafka 和 CnosDB 等开源工具的结合应运而生,使得实时数据流的收集、处理和存储变得更加高效和可靠。本篇文章将介绍如何使用 Flink、Kafka 和 CnosDB 来构建一个强大的实时数据处理流水线。


构建高效实时数据流水线:Flink、Kafka

和 CnosDB 的完美组合

什么是 Flink、Kafka、CnosDB

  • Flink:是一个强大的流式处理引擎,它支持事件驱动、分布式、并且容错。Flink 能够处理高吞吐量和低延迟的实时数据流,适用于多种应用场景,如数据分析、实时报表和推荐系统等。


  • Kafka:是一个高吞吐量的分布式流数据平台,用于收集、存储和传输实时数据流。Kafka 具有良好的持久性、可扩展性和容错性,适用于构建实时数据流的可靠管道。


  • CnosDB:是一个专为时序数据设计的开源时序数据库。它具有高性能、高可用性和易用性的特性,非常适合存储实时生成的时间序列数据,如传感器数据、日志和监控数据等。


场景描述

用例中假设有一个物联网设备网络,每个设备都定期生成传感器数据,包括温度、湿度和压力等。我们希望能够实时地收集、处理和存储这些数据,以便进行实时监控和分析。

数据流向架构图如下:


1.首先,我们需要设置一个数据收集器来获取传感器数据,并将数据发送到 Kafka 主题。这可以通过编写一个生产者应用程序来实现,该应用程序将生成的传感器数据发送到 Kafka。


2.使用 Flink 来实时处理传感器数据。首先,需要编写一个 Flink 应用程序,该应用程序订阅 Kafka 主题中的数据流,并对数据进行实时处理和转换。例如,您可以计算温度的平均值、湿度的最大值等。


3.将处理后的数据存储到 CnosDB 中以供后续查询。为了实现这一步,需要配置一个 CnosDB Sink,使得 Flink 应用程序可以将处理后的数据写入 CnosDB 中。


构建流水线

1.数据采集与传输

编写一个生产者应用程序,读取传感器数据并将其发送到 Kafka 主题。

public class SensorDataProducer {    public static void main(String[] args) {        Properties props = new Properties();        props.put("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092");        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

while (true) { SensorData data = generateSensorData(); // 生成传感器数据 producer.send(new ProducerRecord<>("sensor-data-topic", data)); Thread.sleep(1000); // 每秒发送一次数据 } }}
复制代码

2.实时处理与转换

编写一个 Flink 应用程序,订阅 Kafka 主题中的数据流,实时处理并转换数据。

// Flink 应用程序示例public class SensorDataProcessingJob {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties(); props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092"); props.setProperty("group.id", "sensor-data-consumer-group");

DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));

DataStream<ProcessedData> processedData = sensorData .map(json -> parseJson(json)) // 解析JSON数据 .keyBy(ProcessedData::getDeviceId) .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒滚动窗口 .apply(new SensorDataProcessor()); // 自定义处理逻辑

processedData.print(); // 打印处理后的数据,可以替换为写入 CnosDB 操作

env.execute("SensorDataProcessingJob"); }}
复制代码

3.数据写入与存储

配置 CnosDB Sink,将 processedData.print() 替换为写入 CnosDB 的程序在 CnosDB 创建一个存储数据时长为 30 天的数据库:


| CnosDB 建库语法说明请查看:创建数据库[https://docs.cnosdb.com/zh/latest/reference/sql.html#创建数据库]

CREATE DATABASE IF NOT EXISTS "db_flink_test" WITH TTL '30d' SHARD 2 VNODE_DURATION '1d' REP
复制代码

在 Maven [https://maven.apache.org/]中引入 CnosBD Sink [https://docs.cnosdb.com/zh/latest/reference/connector/flink-connector-cnosdb.html]包:

<dependency>    <groupId>com.cnosdb</groupId>    <artifactId>flink-connector-cnosdb</artifactId>    <version>1.0</version></dependency>
复制代码

编写程序:

public class WriteToCnosDBJob {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties(); props.setProperty("bootstrap.servers", "kafka-broker1:9092,kafka-broker2:9092"); props.setProperty("group.id", "sensor-data-consumer-group");

DataStream<String> sensorData = env.addSource(new FlinkKafkaConsumer<>("sensor-data-topic", new SimpleStringSchema(), props));

DataStream<ProcessedData> processedData = sensorData .map((MapFunction<String, ProcessedData>) json -> parseJson(json)) // 解析JSON数据 .keyBy(ProcessedData::getDeviceId) .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 10秒滚动窗口 .apply(new SensorDataProcessor()); // 自定义处理逻辑

DataStream<CnosDBPoint> cnosDBDataStream = processedData.map( new RichMapFunction<ProcessedData, CnosDBPoint>() { @Override public CnosDBPoint map(String s) throws Exception { return new CnosDBPoint("sensor_metric") .time(value.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS) .tag("device_id", value.getDeviceId()) .field("average_temperature", value.getAverageTemperature()) .field("max_humidity", value.getMaxHumidity()); } } );

CnosDBConfig cnosDBConfig = CnosDBConfig.builder() .url("http://localhost:8902") .database("db_flink_test") .username("root") .password("") .build();

cnosDBDataStream.addSink(new CnosDBSink(cnosDBConfig)); env.execute("WriteToCnosDBJob"); }}
复制代码

运行后查看结果:

db_flink_test ❯ select * from sensor_metric limit 10;+---------------------+---------------+---------------------+--------------+| time                | device_id     | average_temperature | max_humidity |+---------------------+---------------+---------------------+--------------+| 2023-01-14T17:00:00 | OceanSensor1  | 23.5                | 79.0         || 2023-01-14T17:05:00 | OceanSensor2  | 21.8                | 68.0         || 2023-01-14T17:10:00 | OceanSensor1  | 25.2                | 75.0         || 2023-01-14T17:15:00 | OceanSensor3  | 24.1                | 82.0         || 2023-01-14T17:20:00 | OceanSensor2  | 22.7                | 71.0         || 2023-01-14T17:25:00 | OceanSensor1  | 24.8                | 78.0         || 2023-01-14T17:30:00 | OceanSensor3  | 23.6                | 80.0         || 2023-01-14T17:35:00 | OceanSensor4  | 22.3                | 67.0         || 2023-01-14T17:40:00 | OceanSensor2  | 25.9                | 76.0         || 2023-01-14T17:45:00 | OceanSensor4  | 23.4                | 70.0         |+---------------------+---------------+---------------------+--------------+
复制代码

总结

通过结合 Flink、Kafka 和 CnosDB,您可以构建一个强大的实时数据处理流水线,从数据采集到实时处理再到数据存储和可视化。每个步骤都涉及具体的配置和代码实现,确保您熟悉每个工具的特性和操作。这种架构适用于各种实时数据应用,如物联网监控、实时报表和仪表板等。根据您的需求和情境,调整配置和代码,以构建适合您业务的实时数据处理解决方案。


发布于: 2023-09-03阅读数: 4
用户头像

CnosDB

关注

还未添加个人签名 2022-04-18 加入

打造高性能、高压缩比、高可用的分布式云原生时间序列数据库,引领世界迈向万物智联 欢迎关注 https://www.cnosdb.com

评论

发布
暂无评论
构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合_flink_CnosDB_InfoQ写作社区