Pulsar 的 JDBC core sink connector 实现了将 Pulsar 中的数据转移到外部的支持 JDBC 的数据库的功能,这使得许多支持 JDBC 的数据库都能通过这个 core connector 实现自己的 connector 来完成数据的转移,如 mysql,sqlite,clickhouse 等等,本文将介绍 Pulsar JDBC core sink connector 的实现。
JDBC 的设置
所有集成并实现 JDBC core sink connector 的其他 JDBC sink connector,如 mariadb sink connector, clickhouse sink connector 等,都共有一套配置,配置的方法大体相同。
如,在创建 connector 的时候,可以提供如下的 JSON:
{
"userName": "clickhouse",
"password": "password",
"jdbcUrl": "jdbc:clickhouse://localhost:8123/pulsar_clickhouse_jdbc_sink",
"tableName": "pulsar_clickhouse_jdbc_sink"
}
复制代码
可设置相应的用户名、密码、JDBC url、表名等等。除此之外,用户还可以设置消息中的结构化字段映射到表中的列名,更详细的配置可见:https://pulsar.apache.org/docs/en/io-jdbc-sink/#configuration
JDBC sink connector 实现
当 Pulsar connector 所对应的 topic 有数据时,则会调用 connector 中的 write 方法,将数据 sink 到外部的数据库中,但是 JDBC sink connector 并不会马上将数据写出去,而是做了一层缓存,以防止对外部数据库造成过大的压力。
@Override
public void write(Record<T> record) throws Exception {
int number;
synchronized (this) {
incomingList.add(record);
number = incomingList.size();
}
if (number == batchSize) {
flushExecutor.schedule(this::flush, 0, TimeUnit.MILLISECONDS);
}
}
复制代码
首先将进来的 record 写入到 incomingList 中,当这个 incomingList 达到用户所设定的一定大小阈值,则会被 flush 发送出去,flushExecutor 也会定时进行 flush,原理类似 batch message。
private void flush() {
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
synchronized (this) {
List<Record<T>> tmpList;
swapList.clear();
tmpList = swapList;
swapList = incomingList;
incomingList = tmpList;
}
int count = 0;
try {
// bind each record value
for (Record<T> record : swapList) {
String action = record.getProperties().get(ACTION);
if (action == null) {
action = INSERT;
}
switch (action) {
case DELETE:
bindValue(deleteStatement, record, action);
count += 1;
deleteStatement.execute();
break;
case UPDATE:
bindValue(updateStatement, record, action);
count += 1;
updateStatement.execute();
break;
case INSERT:
bindValue(insertStatement, record, action);
count += 1;
insertStatement.execute();
break;
default:
String msg = String.format("Unsupported action %s, can be one of %s, or not set which indicate %s",
action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT);
throw new IllegalArgumentException(msg);
}
}
connection.commit();
swapList.forEach(Record::ack);
} catch (Exception e) {
log.error("Got exception ", e);
swapList.forEach(Record::fail);
}
}
}
复制代码
在 flush 的时候,会根据在 incomingList 中的每一个 record 的 action 执行相应的语句,最后统一 commit。
在这中间,会调用 bindValue 的方法,bindValue 有多种实现方式,如下是 GenericRecord 的实现:
@Override
public void bindValue(PreparedStatement statement,
Record<GenericRecord> message, String action) throws Exception {
GenericRecord record = message.getValue();
List<ColumnId> columns = Lists.newArrayList();
if (action == null || action.equals(INSERT)) {
columns = tableDefinition.getColumns();
} else if (action.equals(DELETE)){
columns.addAll(tableDefinition.getKeyColumns());
} else if (action.equals(UPDATE)){
columns.addAll(tableDefinition.getNonKeyColumns());
columns.addAll(tableDefinition.getKeyColumns());
}
int index = 1;
for (ColumnId columnId : columns) {
String colName = columnId.getName();
int colType = columnId.getType();
if (log.isDebugEnabled()) {
log.debug("colName: {} colType: {}", colName, colType);
}
try {
Object obj = record.getField(colName);
if (obj != null) {
setColumnValue(statement, index++, obj);
} else {
if (log.isDebugEnabled()) {
log.debug("Column {} is null", colName);
}
setColumnNull(statement, index++, colType);
}
} catch (NullPointerException e) {
// With JSON schema field is omitted, so get NPE
// In this case we want to set column to Null
if (log.isDebugEnabled()) {
log.debug("Column {} is null", colName);
}
setColumnNull(statement, index++, colType);
}
}
}
复制代码
GenericRecord 的 bindValue 中,会根据用户所提供的表结构的列进行遍历,然后获取 GenericRecord 相应的字段值设置到语句中,完成了 pulsar 中的消息到 sql 中的数据的转换。
JDBC core connector 实现了许多功能,用户能够通过这个 connector 很方便地进行拓展实现自己的 Pulsar JDBC connector。
评论