写点什么

【技术干货】代码示例:使用 Apache Flink 连接 TDengine

作者:TDengine
  • 2022 年 5 月 27 日
  • 本文字数:4446 字

    阅读完需:约 15 分钟

小 T 导读:想用 Flink 对接 TDengine?保姆级教程来了。


前言

TDengine 是由涛思数据开发并开源的一款高性能、分布式、支持 SQL 的时序数据库(Time-Series Database)。

除了核心的时序数据库功能外,TDengine 还提供缓存数据订阅流式计算等大数据平台所需要的系列功能。但是很多小伙伴出于架构的考虑,还是需要将数据导出到 Apache Flink、Apache Spark 等平台进行计算分析。

为了帮助大家对接,我们特别推出了保姆级课程,包学包会。


技术实现

Apache Flink 提供了 SourceFunction 和 SinkFunction,用来提供 Flink 和外部数据源的连接,其中 SouceFunction 为从数据源读取数据,SinkFunction 为将数据写入数据源。 与此同时,Flink 提供了 RichSourceFunction 和 RichSinkFunction 这两个类(继承自AbstractRichFunction),提供了额外的初始化(open(Configuration))和销毁方法(close())。 通过重写这两个方法,可以避免每次读写数据时都重新建立连接。


代码实现

完整源码:https://github.com/liuyq-617/TD-Flink

代码逻辑:

1) 自定义类 SourceFromTDengine

用途:数据源连接,数据读取

package com.taosdata.flink;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import com.taosdata.model.Sensor;import java.sql.*;import java.util.Properties;public class SourceFromTDengine extends RichSourceFunction<Sensor> {    Statement statement;    private Connection connection;    private String property;    public SourceFromTDengine(){        super();    }        @Override    public void open(Configuration parameters) throws Exception {        super.open(parameters);        String driver = "com.taosdata.jdbc.rs.RestfulDriver";        String host = "u05";        String username = "root";        String password = "taosdata";        String prop = System.getProperty("java.library.path");        Logger LOG = LoggerFactory.getLogger(SourceFromTDengine.class);        LOG.info("java.library.path:{}", prop);        System.out.println(prop);        Class.forName( driver );        Properties properties = new Properties();        connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"                , properties);        statement = connection.createStatement();    }    @Override    public void close() throws Exception {        super.close();        if (connection != null) {            connection.close();        }        if (statement != null) {            statement.close();        }    }    @Override    public void run(SourceContext<Sensor> sourceContext) throws Exception {        try {            String sql = "select * from tt.meters";            ResultSet resultSet = statement.executeQuery(sql);            while (resultSet.next()) {                Sensor sensor = new Sensor( resultSet.getLong(1),                        resultSet.getInt( "vol" ),                        resultSet.getFloat( "current" ),                        resultSet.getString( "location" ).trim());                sourceContext.collect( sensor );            }        } catch (Exception e) {            e.printStackTrace();        }    }    @Override    public void cancel() {    }}   
复制代码


2) 自定义类 SinkToTDengine

用途:数据源连接,数据写入

SinkToTDengine

package com.taosdata.flink;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import com.taosdata.model.Sensor;import java.sql.*;import java.util.Properties;public class SinkToTDengine extends RichSinkFunction<Sensor> {    Statement statement;    private Connection connection;    @Override    public void open(Configuration parameters) throws Exception {        super.open(parameters);        String driver = "com.taosdata.jdbc.rs.RestfulDriver";        String host = "TAOS-FQDN";        String username = "root";        String password = "taosdata";        String prop = System.getProperty("java.library.path");        System.out.println(prop);        Class.forName( driver );        Properties properties = new Properties();        connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"                , properties);        statement = connection.createStatement();            }    @Override    public void close() throws Exception {        super.close();        if (connection != null) {            connection.close();        }        if (statement != null) {            statement.close();        }    }    @Override    public void invoke(Sensor sensor, Context context) throws Exception {        try {            String sql = String.format("insert into sinktest.%s using sinktest.meters tags('%s') values(%d,%d,%f)",                                 sensor.getLocation(),                                sensor.getLocation(),                                sensor.getTs(),                                sensor.getVal(),                                sensor.getCurrent()                                );            statement.executeUpdate(sql);        } catch (Exception e) {            e.printStackTrace();        }    }}
复制代码

3) 自定义类 Sensor

用途:定义数据结构,用来接受数据

package com.taosdata.model;public class Sensor {    public long ts;    public int val;    public float current;    public String location;    public Sensor() {    }    public Sensor(long ts, int val, float current, String location) {        this.ts = ts;        this.val = val;        this.current = current;        this.location = location;    }    public long getTs() {        return ts;    }    public void setTs(long ts) {        this.ts = ts;    }    public int getVal() {        return val;    }    public void setVal(int val) {        this.val = val;    }    public float getCurrent() {        return current;    }    public void setCurrent(float current) {        this.current = current;    }    public String getLocation() {        return location;    }    public void setLocation(String location) {        this.location = location;    }    @Override    public String toString() {        return "Sensor{" +                "ts=" + ts +                ", val=" + val +                ", current=" + current +                ", location='" + location + '\'' +                '}';    }}
复制代码

4) 主程序类 ReadFromTDengine

用途:调用 Flink 进行读取和写入数据

package com.taosdata;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import com.taosdata.model.Sensor;import org.slf4j.LoggerFactory;import org.slf4j.Logger;public class ReadFromTDengine {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStreamSource<Sensor> SensorList = env.addSource( new com.taosdata.flink.SourceFromTDengine() );        SensorList.print();        SensorList.addSink( new com.taosdata.flink.SinkToTDengine() );               env.execute();    }}
复制代码


简单测试 RESTful 接口

1) 环境准备:

a) Flink 安装 &启动:
b) TDengine Database 环境准备:
  • 创建原始数据: create database tt;create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));insert into beijing using meters tags(‘beijing’) values(now,220,30.2);

  • 创建目标数据库表: create database sinktest;create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));

2) 打包编译:

源码位置: https://github.com/liuyq-617/TD-Flink

mvn clean package

3) 程序启动:

flink run target/test-flink-1.0-SNAPSHOT-dist.jar

  • 读取数据 vi log/flink-root-taskexecutor-0-xxxxx.out 查看到数据打印:Sensor{ts=1645166073101, val=220, current=5.7, location=’beijing’}

  • 写入数据 show sinktest.tables; 已经创建了 beijing 子表 select * from sinktest.beijing; 可以查询到刚插入的数据


使用 JNI 方式

举一反三的小伙伴此时已经猜到,只要把 JDBC URL 修改一下就可以了。

但是 Flink 每次分派作业时都在使用一个新的 ClassLoader,而我们在计算节点上就会得到“Native library already loaded in another classloader”错误。

为了避免此问题,可以将 JDBC 的 jar 包放到 Flink 的 lib 目录下,不去调用 dist 包就可以了。

  • cp taos-jdbcdriver-2.0.37-dist.jar /usr/local/flink-1.14.3/lib

  • flink run target/test-flink-1.0-SNAPSHOT.jar


小结

通过在项目中引入 SourceFromTDengine 和 SinkToTDengine 两个类,即可完成在 Flink 中对 TDengine 的读写操作。后面我们会有文章介绍 Spark 和 TDengine 的对接。

注:文中使用的是 JDBC 的 RESTful 接口,这样就不用在 Flink 的节点安装 TDengine,JNI 方式需要在 Flink 节点安装 TDengine Database 的客户端。


想了解更多TDengine Database 的具体细节,欢迎大家在GitHub上查看相关源代码。


用户头像

TDengine

关注

高性能、分布式、支持SQL的时序数据库 2021.11.04 加入

官网:http://www.taosdata.com GitHub:https://github.com/taosdata/TDengine

评论

发布
暂无评论
【技术干货】代码示例:使用 Apache Flink 连接 TDengine_数据库_TDengine_InfoQ写作社区