[BitSail] Connector Development in Detail, Part 3: SourceReader
- 2023-08-17, Zhejiang
Source Connector
This article focuses on SourceReader, the component responsible for reading data.
SourceReader
Each SourceReader runs in its own thread. As long as the splits that SourceSplitCoordinator assigns to different SourceReaders do not overlap, we can ignore concurrency entirely during a SourceReader's execution cycle.
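To make the "no overlap" condition concrete, here is a minimal sketch of one way a coordinator could hand out splits so that no two readers share one. The class and method names are hypothetical and are not part of BitSail; round-robin by split id is only one possible strategy.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of BitSail: assigns split ids to readers round-robin.
final class DisjointSplitAssignment {

  // Returns one list of split ids per reader; each split id lands in exactly one list.
  static List<List<Integer>> assign(int splitCount, int readerCount) {
    List<List<Integer>> perReader = new ArrayList<>();
    for (int r = 0; r < readerCount; r++) {
      perReader.add(new ArrayList<>());
    }
    for (int splitId = 0; splitId < splitCount; splitId++) {
      perReader.get(splitId % readerCount).add(splitId);
    }
    return perReader;
  }

  // True when no split id appears in more than one reader's list.
  static boolean isDisjoint(List<List<Integer>> perReader) {
    Set<Integer> seen = new HashSet<>();
    for (List<Integer> splits : perReader) {
      for (Integer id : splits) {
        if (!seen.add(id)) {
          return false;
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<List<Integer>> plan = assign(7, 3);
    // prints [[0, 3, 6], [1, 4], [2, 5]] disjoint=true
    System.out.println(plan + " disjoint=" + isDisjoint(plan));
  }
}
```

Because every split id is owned by exactly one reader, each reader thread can mutate its own splits without locks.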
SourceReader interface
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

  void start();

  void pollNext(SourcePipeline<T> pipeline) throws Exception;

  void addSplits(List<SplitT> splits);

  /**
   * Check source reader has more elements or not.
   */
  boolean hasMoreElements();

  /**
   * There will no more split will send to this source reader.
   * Source reader could be exited after process all assigned split.
   */
  default void notifyNoMoreSplits() {
  }

  /**
   * Process all events which from {@link SourceSplitCoordinator}.
   */
  default void handleSourceEvent(SourceEvent sourceEvent) {
  }

  /**
   * Store the split to the external system to recover when task failed.
   */
  List<SplitT> snapshotState(long checkpointId);

  /**
   * When all tasks finished snapshot, notify checkpoint complete will be invoked.
   */
  default void notifyCheckpointComplete(long checkpointId) throws Exception {
  }

  interface Context {

    TypeInfo<?>[] getTypeInfos();

    String[] getFieldNames();

    int getIndexOfSubtask();

    void sendSplitRequest();
  }
}
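Constructor
The constructor extracts the configuration needed to access the data source, such as database and table names, message-queue cluster and topic, and authentication settings.
Example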
public RocketMQSourceReader(BitSailConfiguration readerConfiguration,
                            Context context,
                            Boundedness boundedness) {
  this.readerConfiguration = readerConfiguration;
  this.boundedness = boundedness;
  this.context = context;
  this.assignedRocketMQSplits = Sets.newHashSet();
  this.finishedRocketMQSplits = Sets.newHashSet();
  this.deserializationSchema = new RocketMQDeserializationSchema(
      readerConfiguration,
      context.getTypeInfos(),
      context.getFieldNames());
  this.noMoreSplits = false;

  cluster = readerConfiguration.get(RocketMQSourceOptions.CLUSTER);
  topic = readerConfiguration.get(RocketMQSourceOptions.TOPIC);
  consumerGroup = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);
  consumerTag = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_TAG);
  pollBatchSize = readerConfiguration.get(RocketMQSourceOptions.POLL_BATCH_SIZE);
  pollTimeout = readerConfiguration.get(RocketMQSourceOptions.POLL_TIMEOUT);
  commitInCheckpoint = readerConfiguration.get(RocketMQSourceOptions.COMMIT_IN_CHECKPOINT);
  accessKey = readerConfiguration.get(RocketMQSourceOptions.ACCESS_KEY);
  secretKey = readerConfiguration.get(RocketMQSourceOptions.SECRET_KEY);
}
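start method
Initialize the objects used to access the data source, for example a database statement object, a message-queue consumer, or a file-system connection.
Example
Message queue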
public void start() {
  try {
    if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {
      AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(
          new SessionCredentials(accessKey, secretKey));
      consumer = new DefaultMQPullConsumer(aclClientRPCHook);
    } else {
      consumer = new DefaultMQPullConsumer();
    }

    consumer.setConsumerGroup(consumerGroup);
    consumer.setNamesrvAddr(cluster);
    consumer.setInstanceName(String.format(SOURCE_READER_INSTANCE_NAME_TEMPLATE,
        cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.setConsumerPullTimeoutMillis(pollTimeout);
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}
Database
public void start() {
  this.connection = connectionHolder.connect();

  // Construct statement.
  String baseSql = ClickhouseJdbcUtils.getQuerySql(dbName, tableName, columnInfos);
  String querySql = ClickhouseJdbcUtils.decorateSql(baseSql, splitField, filterSql, maxFetchCount, true);
  try {
    this.statement = connection.prepareStatement(querySql);
  } catch (SQLException e) {
    throw new RuntimeException("Failed to prepare statement.", e);
  }

  LOG.info("Task {} started.", subTaskId);
}
FTP
public void start() {
  this.ftpHandler.loginFtpServer();
  if (this.ftpHandler.getFtpConfig().getSkipFirstLine()) {
    this.skipFirstLine = true;
  }
}
addSplits method
Add the list of splits that SourceSplitCoordinator assigns to the current reader to the reader's own processing queue (Queue) or set (Set).
Example
public void addSplits(List<RocketMQSplit> splits) {
  LOG.info("Subtask {} received {}(s) new splits, splits = {}.",
      context.getIndexOfSubtask(),
      CollectionUtils.size(splits),
      splits);

  assignedRocketMQSplits.addAll(splits);
}
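hasMoreElements method
In unbounded streaming scenarios it always returns true, so the reader thread is never destroyed.
In batch scenarios it returns false once the splits assigned to the reader have been processed, which marks the end of the reader's lifecycle.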
public boolean hasMoreElements() {
  if (boundedness == Boundedness.UNBOUNDEDNESS) {
    return true;
  }
  if (noMoreSplits) {
    return CollectionUtils.size(assignedRocketMQSplits) != 0;
  }
  return true;
}
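pollNext method
This method is called once addSplits has filled the split processing queue and hasMoreElements returns true. It is where the developer implements the actual interaction with the data.
When implementing pollNext, developers need to pay attention to the following:
Reading split data
Read the data from the constructed splits.
Data type conversion
Convert the external data into BitSail's Row type.
Example
Taking RocketMQSourceReader as an example:
Pick splits from the split queue, read their data, convert what was read into BitSail's Row type, and send it downstream.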
public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
  for (RocketMQSplit rocketmqSplit : assignedRocketMQSplits) {
    MessageQueue messageQueue = rocketmqSplit.getMessageQueue();
    PullResult pullResult = consumer.pull(rocketmqSplit.getMessageQueue(),
        consumerTag,
        rocketmqSplit.getStartOffset(),
        pollBatchSize,
        pollTimeout);

    if (Objects.isNull(pullResult) || CollectionUtils.isEmpty(pullResult.getMsgFoundList())) {
      continue;
    }

    for (MessageExt message : pullResult.getMsgFoundList()) {
      Row deserialize = deserializationSchema.deserialize(message.getBody());
      pipeline.output(deserialize);
      if (rocketmqSplit.getStartOffset() >= rocketmqSplit.getEndOffset()) {
        LOG.info("Subtask {} rocketmq split {} in end of stream.",
            context.getIndexOfSubtask(),
            rocketmqSplit);
        finishedRocketMQSplits.add(rocketmqSplit);
        break;
      }
    }
    rocketmqSplit.setStartOffset(pullResult.getNextBeginOffset());
    if (!commitInCheckpoint) {
      consumer.updateConsumeOffset(messageQueue, pullResult.getMaxOffset());
    }
  }
  assignedRocketMQSplits.removeAll(finishedRocketMQSplits);
}
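The two concerns listed above, reading split data and converting types, can be seen in isolation in a small in-memory sketch. Everything here is a stand-in chosen for illustration: RangeSplit and InMemoryReader are hypothetical, a List<String> plays the external system, a java.util.function.Consumer plays SourcePipeline, and an Object[] plays Row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical reader; the real SourceReader receives a SourcePipeline<Row> instead of a Consumer.
final class InMemoryReader {
  private final List<String> log;   // stands in for the external system
  private final int batchSize;
  private final Set<RangeSplit> assigned = new HashSet<>();

  InMemoryReader(List<String> log, int batchSize) {
    this.log = log;
    this.batchSize = batchSize;
  }

  void addSplits(List<RangeSplit> splits) {
    assigned.addAll(splits);
  }

  boolean hasMoreElements() {
    return !assigned.isEmpty();
  }

  // One poll: read at most batchSize records from every assigned split.
  void pollNext(Consumer<Object[]> pipeline) {
    Set<RangeSplit> finished = new HashSet<>();
    for (RangeSplit split : assigned) {
      long stop = Math.min(split.startOffset + batchSize, split.endOffset);
      for (long offset = split.startOffset; offset < stop; offset++) {
        // Type conversion: raw "id,name" text becomes a two-field row.
        String[] parts = log.get((int) offset).split(",");
        pipeline.accept(new Object[] {Integer.valueOf(parts[0]), parts[1]});
      }
      split.startOffset = stop;     // remember progress for the next poll
      if (split.startOffset >= split.endOffset) {
        finished.add(split);
      }
    }
    assigned.removeAll(finished);
  }

  public static void main(String[] args) {
    InMemoryReader reader =
        new InMemoryReader(Arrays.asList("1,a", "2,b", "3,c", "4,d", "5,e"), 2);
    reader.addSplits(Arrays.asList(new RangeSplit(0, 2), new RangeSplit(2, 5)));
    List<Object[]> rows = new ArrayList<>();
    while (reader.hasMoreElements()) {
      reader.pollNext(rows::add);
    }
    System.out.println(rows.size() + " rows read"); // prints "5 rows read"
  }
}

// Stand-in for a split: a half-open offset range [startOffset, endOffset) over the log.
final class RangeSplit {
  long startOffset;
  final long endOffset;

  RangeSplit(long startOffset, long endOffset) {
    this.startOffset = startOffset;
    this.endOffset = endOffset;
  }
}
```

As in the RocketMQ example, each split carries its own start offset, so a poll can stop after one batch and the next poll resumes where it left off.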
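Common ways to convert to the BitSail Row type
Custom RowDeserializer class
Apply a different converter to each column according to its format, and set the result on the corresponding field of the Row.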
public class ClickhouseRowDeserializer {

  interface FiledConverter {
    Object apply(ResultSet resultSet) throws SQLException;
  }

  private final List<FiledConverter> converters;
  private final int fieldSize;

  public ClickhouseRowDeserializer(TypeInfo<?>[] typeInfos) {
    this.fieldSize = typeInfos.length;
    this.converters = new ArrayList<>();
    for (int i = 0; i < fieldSize; ++i) {
      converters.add(initFieldConverter(i + 1, typeInfos[i]));
    }
  }

  public Row convert(ResultSet resultSet) {
    Row row = new Row(fieldSize);
    try {
      for (int i = 0; i < fieldSize; ++i) {
        row.setField(i, converters.get(i).apply(resultSet));
      }
    } catch (SQLException e) {
      throw BitSailException.asBitSailException(ClickhouseErrorCode.CONVERT_ERROR, e.getCause());
    }
    return row;
  }

  private FiledConverter initFieldConverter(int index, TypeInfo<?> typeInfo) {
    if (!(typeInfo instanceof BasicTypeInfo)) {
      throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE,
          typeInfo.getTypeClass().getName() + " is not supported yet.");
    }

    Class<?> curClass = typeInfo.getTypeClass();
    if (TypeInfos.BYTE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getByte(index);
    }
    if (TypeInfos.SHORT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getShort(index);
    }
    if (TypeInfos.INT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getInt(index);
    }
    if (TypeInfos.LONG_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getLong(index);
    }
    if (TypeInfos.BIG_INTEGER_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> {
        BigDecimal dec = resultSet.getBigDecimal(index);
        return dec == null ? null : dec.toBigInteger();
      };
    }
    if (TypeInfos.FLOAT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getFloat(index);
    }
    if (TypeInfos.DOUBLE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDouble(index);
    }
    if (TypeInfos.BIG_DECIMAL_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBigDecimal(index);
    }
    if (TypeInfos.STRING_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getString(index);
    }
    if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDate(index);
    }
    if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTimestamp(index);
    }
    if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTime(index);
    }
    if (TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBoolean(index);
    }
    if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> null;
    }
    throw new UnsupportedOperationException("Unsupported data type: " + typeInfo);
  }
}
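The key idea in the class above is that the converter for each column is chosen once, at construction time, so the per-row path is a plain loop with no type checks. A stripped-down sketch of that pattern follows, assuming raw String columns instead of a JDBC ResultSet and an Object[] instead of Row; ColumnConverters is a hypothetical name.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of "one converter per column, built once".
final class ColumnConverters {
  private final List<Function<String, Object>> converters = new ArrayList<>();

  ColumnConverters(Class<?>[] columnTypes) {
    // Resolve the converter for every column up front.
    for (Class<?> type : columnTypes) {
      converters.add(converterFor(type));
    }
  }

  // Per-row path: apply the pre-built converter at each column position.
  Object[] convert(String[] rawColumns) {
    if (rawColumns.length != converters.size()) {
      throw new IllegalArgumentException("expected " + converters.size() + " columns");
    }
    Object[] row = new Object[rawColumns.length];
    for (int i = 0; i < row.length; i++) {
      String raw = rawColumns[i];
      row[i] = raw == null ? null : converters.get(i).apply(raw);
    }
    return row;
  }

  private static Function<String, Object> converterFor(Class<?> type) {
    if (type == Integer.class) {
      return s -> Integer.valueOf(s.trim());
    }
    if (type == Long.class) {
      return s -> Long.valueOf(s.trim());
    }
    if (type == Double.class) {
      return s -> Double.valueOf(s.trim());
    }
    if (type == BigDecimal.class) {
      return s -> new BigDecimal(s.trim());
    }
    if (type == Boolean.class) {
      return s -> Boolean.valueOf(s.trim());
    }
    if (type == String.class) {
      return s -> s;
    }
    throw new UnsupportedOperationException("Unsupported data type: " + type.getName());
  }

  public static void main(String[] args) {
    ColumnConverters deserializer =
        new ColumnConverters(new Class<?>[] {Integer.class, Double.class, Boolean.class, String.class});
    Object[] row = deserializer.convert(new String[] {"42", "3.5", "true", "abc"});
    System.out.println(Arrays.toString(row)); // prints [42, 3.5, true, abc]
  }
}
```

An unsupported column type fails when the deserializer is built rather than on the first row, which surfaces schema mistakes before any data is read.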
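Implementing the DeserializationSchema interface
Rather than writing a RowDeserializer, we would prefer that you implement the DeserializationSchema interface, converting data in a given format, such as JSON or CSV, into the BitSail Row type.
In a concrete application, we can use the unified interface to create the corresponding implementation class: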
public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private BitSailConfiguration deserializationConfiguration;

  private TypeInfo<?>[] typeInfos;

  private String[] fieldNames;

  private transient DeserializationSchema<byte[], Row> deserializationSchema;

  public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                              TypeInfo<?>[] typeInfos,
                                              String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    ContentType contentType = ContentType.valueOf(
        deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE,
            HadoopErrorCode.REQUIRED_VALUE).toUpperCase());
    switch (contentType) {
      case CSV:
        this.deserializationSchema =
            new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      case JSON:
        this.deserializationSchema =
            new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      default:
        throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING,
            "unsupported parser type: " + contentType);
    }
  }

  @Override
  public Row deserialize(Writable message) {
    return deserializationSchema.deserialize((message.toString()).getBytes());
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }
}
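The constructor above picks a delegate once from a configured content type and then forwards every message to it. The same dispatch can be sketched without any BitSail classes. In this sketch TextFormat, FormatDispatch and the CSV/TSV pair are hypothetical stand-ins (TSV replaces JSON only to avoid a parser dependency), and a parser is just a function from a line to its columns.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.function.Function;

// Hypothetical illustration of choosing a parser from a configured format name.
final class FormatDispatch {

  // Resolves the configured name case-insensitively; unknown names throw IllegalArgumentException.
  static Function<String, String[]> parserFor(String configuredFormat) {
    TextFormat format = TextFormat.valueOf(configuredFormat.toUpperCase(Locale.ROOT));
    switch (format) {
      case CSV:
        return line -> line.split(",", -1);   // limit -1 keeps trailing empty columns
      case TSV:
        return line -> line.split("\t", -1);
      default:
        throw new IllegalArgumentException("unsupported format: " + format);
    }
  }

  public static void main(String[] args) {
    Function<String, String[]> parser = parserFor("csv");
    System.out.println(Arrays.toString(parser.apply("1,alice,"))); // prints [1, alice, ]
  }
}

// Formats this sketch knows about; the article's ContentType has CSV and JSON.
enum TextFormat { CSV, TSV }
```

Resolving the delegate in the constructor means a misconfigured content type fails at job start instead of on the first record.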
You can also write a DeserializationSchema dedicated to the class that needs to be parsed:
public class MapredParquetInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private final BitSailConfiguration deserializationConfiguration;

  private final transient DateTimeFormatter localDateTimeFormatter;
  private final transient DateTimeFormatter localDateFormatter;
  private final transient DateTimeFormatter localTimeFormatter;
  private final int fieldSize;
  private final TypeInfo<?>[] typeInfos;
  private final String[] fieldNames;
  private final List<DeserializationConverter> converters;

  public MapredParquetInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                                       TypeInfo<?>[] typeInfos,
                                                       String[] fieldNames) {

    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    this.localDateTimeFormatter = DateTimeFormatter.ofPattern(
        deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_TIME_PATTERN));
    this.localDateFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_PATTERN));
    this.localTimeFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.TIME_PATTERN));
    this.fieldSize = typeInfos.length;
    this.converters = Arrays.stream(typeInfos).map(this::createTypeInfoConverter).collect(Collectors.toList());
  }

  @Override
  public Row deserialize(Writable message) {
    int arity = fieldNames.length;
    Row row = new Row(arity);
    Writable[] writables = ((ArrayWritable) message).get();
    for (int i = 0; i < fieldSize; ++i) {
      row.setField(i, converters.get(i).convert(writables[i].toString()));
    }
    return row;
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }

  private interface DeserializationConverter extends Serializable {
    Object convert(String input);
  }

  private DeserializationConverter createTypeInfoConverter(TypeInfo<?> typeInfo) {
    Class<?> typeClass = typeInfo.getTypeClass();

    if (typeClass == TypeInfos.VOID_TYPE_INFO.getTypeClass()) {
      return field -> null;
    }
    if (typeClass == TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass()) {
      return this::convertToBoolean;
    }
    if (typeClass == TypeInfos.INT_TYPE_INFO.getTypeClass()) {
      return this::convertToInt;
    }
    throw BitSailException.asBitSailException(CsvFormatErrorCode.CSV_FORMAT_COVERT_FAILED,
        String.format("Csv format converter not support type info: %s.", typeInfo));
  }

  private boolean convertToBoolean(String field) {
    return Boolean.parseBoolean(field.trim());
  }

  private int convertToInt(String field) {
    return Integer.parseInt(field.trim());
  }
}
snapshotState method
Generate and save a snapshot of the State, for use in checkpoints.
Example
public List<RocketMQSplit> snapshotState(long checkpointId) {
  LOG.info("Subtask {} start snapshotting for checkpoint id = {}.", context.getIndexOfSubtask(), checkpointId);
  if (commitInCheckpoint) {
    for (RocketMQSplit rocketMQSplit : assignedRocketMQSplits) {
      try {
        consumer.updateConsumeOffset(rocketMQSplit.getMessageQueue(), rocketMQSplit.getStartOffset());
        LOG.debug("Subtask {} committed message queue = {} in checkpoint id = {}.",
            context.getIndexOfSubtask(),
            rocketMQSplit.getMessageQueue(),
            checkpointId);
      } catch (MQClientException e) {
        throw new RuntimeException(e);
      }
    }
  }
  return Lists.newArrayList(assignedRocketMQSplits);
}
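Why returning the assigned splits is enough for recovery: each split carries its current offset, so handing the snapshot back to a fresh reader through addSplits resumes from the checkpointed position. The sketch below illustrates this under stated assumptions. CheckpointingReader and OffsetSplit are hypothetical, and the snapshot copies each split defensively because it is kept in memory here; how BitSail persists the returned list is not covered by this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reader that tracks one offset per queue.
final class CheckpointingReader {
  private final Map<String, OffsetSplit> assigned = new LinkedHashMap<>();

  void addSplits(List<OffsetSplit> splits) {
    for (OffsetSplit split : splits) {
      assigned.put(split.queue, split.copy());
    }
  }

  // Simulates reading `records` messages from the given queue.
  void consume(String queue, long records) {
    assigned.get(queue).offset += records;
  }

  long offsetOf(String queue) {
    return assigned.get(queue).offset;
  }

  // checkpointId is unused in this sketch; a real reader would typically log it.
  List<OffsetSplit> snapshotState(long checkpointId) {
    List<OffsetSplit> snapshot = new ArrayList<>();
    for (OffsetSplit split : assigned.values()) {
      snapshot.add(split.copy());   // freeze the offsets as of this checkpoint
    }
    return snapshot;
  }

  public static void main(String[] args) {
    CheckpointingReader reader = new CheckpointingReader();
    reader.addSplits(Arrays.asList(new OffsetSplit("q0", 0)));
    reader.consume("q0", 5);
    List<OffsetSplit> checkpoint = reader.snapshotState(1L);
    reader.consume("q0", 3);        // progress after the checkpoint is lost on failure

    CheckpointingReader recovered = new CheckpointingReader();
    recovered.addSplits(checkpoint);
    System.out.println(recovered.offsetOf("q0")); // prints 5
  }
}

// Stand-in for a split: a queue name plus the next offset to read.
final class OffsetSplit {
  final String queue;
  long offset;

  OffsetSplit(String queue, long offset) {
    this.queue = queue;
    this.offset = offset;
  }

  OffsetSplit copy() {
    return new OffsetSplit(queue, offset);
  }
}
```

The three records consumed after the checkpoint are read again after recovery, which is the usual at-least-once behavior of offset-based checkpoints.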
hasMoreElements method
sourceReader.hasMoreElements() is checked before every call to pollNext; pollNext is called if and only if the check passes.
Example
public boolean hasMoreElements() {
  if (noMoreSplits) {
    return CollectionUtils.size(assignedHadoopSplits) != 0;
  }
  return true;
}
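The relationship between hasMoreElements and pollNext is easiest to see from the caller's side. The following is a minimal sketch of such a driver loop for a bounded job. CountingReader and ReaderLoopDemo are hypothetical, each split is just an integer, and the exact order in which the real engine calls addSplits and notifyNoMoreSplits is an assumption made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical driver: shows the "check hasMoreElements, then pollNext" loop.
final class ReaderLoopDemo {

  static List<Integer> run(CountingReader reader, List<Integer> splits) {
    List<Integer> out = new ArrayList<>();
    reader.start();
    reader.addSplits(splits);
    reader.notifyNoMoreSplits();        // assumed: the coordinator has nothing left to assign
    while (reader.hasMoreElements()) {  // pollNext runs only when this check passes
      reader.pollNext(out);
    }
    reader.close();
    return out;
  }

  public static void main(String[] args) {
    CountingReader reader = new CountingReader();
    System.out.println(run(reader, Arrays.asList(10, 20, 30))); // prints [10, 20, 30]
    System.out.println(reader.calls);
  }
}

// Hypothetical bounded reader: every split is an integer that is emitted once.
final class CountingReader {
  private final Deque<Integer> splits = new ArrayDeque<>();
  private boolean noMoreSplits = false;
  final List<String> calls = new ArrayList<>();   // records the lifecycle for inspection

  void start() {
    calls.add("start");
  }

  void addSplits(List<Integer> newSplits) {
    splits.addAll(newSplits);
    calls.add("addSplits");
  }

  void notifyNoMoreSplits() {
    noMoreSplits = true;
    calls.add("notifyNoMoreSplits");
  }

  boolean hasMoreElements() {
    if (noMoreSplits) {
      return !splits.isEmpty();
    }
    return true;                        // more splits may still arrive
  }

  void pollNext(List<Integer> out) {
    Integer split = splits.poll();
    if (split != null) {
      out.add(split);
    }
    calls.add("pollNext");
  }

  void close() {
    calls.add("close");
  }
}
```

Until notifyNoMoreSplits has been received, hasMoreElements keeps returning true even with an empty queue, so a reader that is still waiting for splits is not shut down early.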
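notifyNoMoreSplits method
This method is called when no more splits will be sent to the reader. The reader can exit once it has processed all of the splits already assigned to it.
Example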
public void notifyNoMoreSplits() {
  LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask());
  noMoreSplits = true;
}
[About BitSail]:
⭐️ Star the project: https://github.com/bytedance/bitsail
Submit issues and suggestions: https://github.com/bytedance/bitsail/issues
Contribute code: https://github.com/bytedance/bitsail/pulls
BitSail website: https://bytedance.github.io/bitsail/zh/
Subscribe to the mailing list: bitsail+subscribe@googlegroups.com
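Copyright notice: this is an original article by the InfoQ author 字节跳动数据平台 (ByteDance Data Platform).
Original link: http://xie.infoq.cn/article/5a454455c5a0d940f406e3610. Please contact the author before republishing.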