写点什么

Flink 的 sink 实战之四:自定义,Java 开发笔试题目

作者:Java高工P7
  • 2021 年 11 月 09 日
  • 本文字数:1886 字

    阅读完需:约 6 分钟

<dependency>


<groupId>mysql</groupId>


<artifactId>mysql-connector-java</artifactId>


<version>8.0.11</version>


</dependency>


  1. 创建和数据库的 student 表对应的实体类 Student.java:


package com.bolingcavalry.customize;


public class Student {


private int id;


private String name;


private int age;


public int getId() {


return id;


}


public void setId(int id) {


this.id = id;


}


public String getName() {


return name;


}


public void setName(String name) {


this.name = name;


}


public int getAge() {


return age;


}


public void setAge(int age) {


this.age = age;


}


public Student(String name, int age) {


this.name = name;


this.age = age;


}


}


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


  1. 创建自定义 sink 类 MySQLSinkFunction.java,这是本文的核心,有关数据库的连接、断开、写入数据都集中在此:


package com.bolingcavalry.customize;


import org.apache.flink.configuration.Configuration;


import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;


import java.sql.Connection;


import java.sql.DriverManager;


import java.sql.PreparedStatement;


import java.util.concurrent.TimeUnit;


import java.util.concurrent.locks.ReentrantLock;


public class MySQLSinkFunction extends RichSinkFunction<Student> {


PreparedStatement preparedStatement;


private Connection connection;


private ReentrantLock reentrantLock = new ReentrantLock();


@Override


public void open(Configuration parameters) throws Exception {


super.open(parameters);


//准备数据库相关实例


buildPreparedStatement();


}


@Override


public void close() throws Exception {


super.close();


try{


if(null!=preparedStatement) {


preparedStatement.close();


preparedStatement = null;


}


} catch(Exception e) {


e.printStackTrace();


}


try{


if(null!=connection) {


connection.close();


connection = null;


}


} catch(Exception e) {


e.printStackTrace();


}


}


@Override


public void invoke(Student value, Context context) throws Exception {


preparedStatement.setString(1, value.getName());


preparedStatement.setInt(2, value.getAge());


preparedStatement.executeUpdate();


}


/**


  • 准备好 connection 和 preparedStatement

  • 获取 mysql 连接实例,考虑多线程同步,

  • 不用 synchronize 是因为获取数据库连接是远程操作,耗时不确定

  • @return


*/


private void buildPreparedStatement() {


if(null==connection) {


boolean hasLock = false;


try {


hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS);


if(hasLock) {


Class.forName("com.mysql.cj.jdbc.Driver");


connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456");


}


if(null!=connection) {


preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)");


}


} catch (Exception e) {


//生产环境慎用


e.printStackTrace();


} finally {


if(hasLock) {


reentrantLock.unlock();


}


}


}


}


}


  1. 上述代码很简单,只需要注意在创建连接的时候用到了锁来控制多线程同步,以及高版本 mysql 驱动对应的 driver 和 uri 的写法与以前 5.x 版本的区别;

  2. 创建任务类 StudentSink.java,用来创建一个 flink 任务,里面通过 ArrayList 创建了一个数据集,然后直接 addSink,为了看清 DAG,调用 disableChaining 方法取消了 operator chain:


package com.bolingcavalry.customize;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


import java.util.ArrayList;


import java.util.List;


public class StudentSink {


public static void main(String[] args) throws Exception {


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


//并行度为 1


env.setParallelism(1);


List<Student> list = new ArrayList<>();


list.add(new Student("aaa", 11));


list.add(new Student("bbb", 12));


list.add(new Student("ccc", 13));


list.add(new Student("ddd", 14));


list.add(new Student("eee", 15));


list.add(new Student("fff", 16));


env.fromCollection(list)


.addSink(new MySQLSinkFunction())


.disableChaining();


env.execute("sink demo : customize mysql obj");


}


}


  1. 在 flink web 页面提交任务,并设置任务类:

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
Flink的sink实战之四:自定义,Java开发笔试题目