Flink 的 sink 实战之四:自定义,Java 开发笔试题目
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>
创建和数据库的 student 表对应的实体类 Student.java:
package com.bolingcavalry.customize;
public class Student {
private int id;
private String name;
private int age;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public Student(String name, int age) {
this.name = name;
this.age = age;
}
}
创建自定义 sink 类 MySQLSinkFunction.java,这是本文的核心,有关数据库的连接、断开、写入数据都集中在此:
package com.bolingcavalry.customize;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class MySQLSinkFunction extends RichSinkFunction<Student> {
PreparedStatement preparedStatement;
private Connection connection;
private ReentrantLock reentrantLock = new ReentrantLock();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//准备数据库相关实例
buildPreparedStatement();
}
@Override
public void close() throws Exception {
super.close();
try{
if(null!=preparedStatement) {
preparedStatement.close();
preparedStatement = null;
}
} catch(Exception e) {
e.printStackTrace();
}
try{
if(null!=connection) {
connection.close();
connection = null;
}
} catch(Exception e) {
e.printStackTrace();
}
}
@Override
public void invoke(Student value, Context context) throws Exception {
preparedStatement.setString(1, value.getName());
preparedStatement.setInt(2, value.getAge());
preparedStatement.executeUpdate();
}
/**
准备好 connection 和 preparedStatement
获取 mysql 连接实例,考虑多线程同步,
不用 synchronize 是因为获取数据库连接是远程操作,耗时不确定
@return
*/
private void buildPreparedStatement() {
if(null==connection) {
boolean hasLock = false;
try {
hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS);
if(hasLock) {
Class.forName("com.mysql.cj.jdbc.Driver");
connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456");
}
if(null!=connection) {
preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)");
}
} catch (Exception e) {
//生产环境慎用
e.printStackTrace();
} finally {
if(hasLock) {
reentrantLock.unlock();
}
}
}
}
}
上述代码很简单,只需要注意在创建连接的时候用到了锁来控制多线程同步,以及高版本 mysql 驱动对应的 driver 和 uri 的写法与以前 5.x 版本的区别;
创建任务类 StudentSink.java,用来创建一个 flink 任务,里面通过 ArrayList 创建了一个数据集,然后直接 addSink,为了看清 DAG,调用 disableChaining 方法取消了 operator chain:
package com.bolingcavalry.customize;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
public class StudentSink {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为 1
env.setParallelism(1);
List<Student> list = new ArrayList<>();
list.add(new Student("aaa", 11));
list.add(new Student("bbb", 12));
list.add(new Student("ccc", 13));
list.add(new Student("ddd", 14));
list.add(new Student("eee", 15));
list.add(new Student("fff", 16));
env.fromCollection(list)
.addSink(new MySQLSinkFunction())
.disableChaining();
env.execute("sink demo : customize mysql obj");
}
}
在 flink web 页面提交任务,并设置任务类:
评论