
[BitSail] Connector Development Guide, Part 3: SourceReader

  • 2023-08-17
    Zhejiang


Source Connector


This article focuses on SourceReader, the component responsible for reading data.

SourceReader

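Each SourceReader runs in its own thread. As long as the splits that SourceSplitCoordinator assigns to different SourceReaders do not overlap, we can ignore concurrency entirely for the whole execution cycle of a SourceReader.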
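To make the "no overlap" condition concrete, here is a minimal sketch of one way to hand out splits so that each split belongs to exactly one reader. The class name `DisjointSplitAssignment`, the integer split ids, and the modulo rule are assumptions made for this illustration; the article does not describe how a real SourceSplitCoordinator assigns splits.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of BitSail: distributes split ids across readers
// so that no two readers ever receive the same split.
class DisjointSplitAssignment {

  // Split i goes to reader (i % readerCount), so each id lands in exactly one list.
  static List<List<Integer>> assign(int splitCount, int readerCount) {
    List<List<Integer>> perReader = new ArrayList<>();
    for (int r = 0; r < readerCount; r++) {
      perReader.add(new ArrayList<>());
    }
    for (int splitId = 0; splitId < splitCount; splitId++) {
      perReader.get(splitId % readerCount).add(splitId);
    }
    return perReader;
  }

  // Returns true when no split id appears in more than one reader's list.
  static boolean isDisjoint(List<List<Integer>> perReader) {
    Set<Integer> seen = new HashSet<>();
    for (List<Integer> splits : perReader) {
      for (Integer splitId : splits) {
        if (!seen.add(splitId)) {
          return false;
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<List<Integer>> assignment = assign(7, 3);
    System.out.println(assignment);
    System.out.println("disjoint = " + isDisjoint(assignment));
  }
}
```

Because every reader only touches its own list, the per-reader state needs no locks in this sketch.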


SourceReader interface

public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

  void start();

  void pollNext(SourcePipeline<T> pipeline) throws Exception;

  void addSplits(List<SplitT> splits);

  /**
   * Check source reader has more elements or not.
   */
  boolean hasMoreElements();

  /**
   * There will no more split will send to this source reader.
   * Source reader could be exited after process all assigned split.
   */
  default void notifyNoMoreSplits() {
  }

  /**
   * Process all events which from {@link SourceSplitCoordinator}.
   */
  default void handleSourceEvent(SourceEvent sourceEvent) {
  }

  /**
   * Store the split to the external system to recover when task failed.
   */
  List<SplitT> snapshotState(long checkpointId);

  /**
   * When all tasks finished snapshot, notify checkpoint complete will be invoked.
   */
  default void notifyCheckpointComplete(long checkpointId) throws Exception {
  }

  interface Context {

    TypeInfo<?>[] getTypeInfos();

    String[] getFieldNames();

    int getIndexOfSubtask();

    void sendSplitRequest();
  }
}

Constructor

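The constructor extracts the configuration needed to access the data source, such as the database and table names, the message queue cluster and topic, and the authentication settings.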

Example

public RocketMQSourceReader(BitSailConfiguration readerConfiguration,
                            Context context,
                            Boundedness boundedness) {
  this.readerConfiguration = readerConfiguration;
  this.boundedness = boundedness;
  this.context = context;
  this.assignedRocketMQSplits = Sets.newHashSet();
  this.finishedRocketMQSplits = Sets.newHashSet();
  this.deserializationSchema = new RocketMQDeserializationSchema(
      readerConfiguration,
      context.getTypeInfos(),
      context.getFieldNames());
  this.noMoreSplits = false;

  cluster = readerConfiguration.get(RocketMQSourceOptions.CLUSTER);
  topic = readerConfiguration.get(RocketMQSourceOptions.TOPIC);
  consumerGroup = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);
  consumerTag = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_TAG);
  pollBatchSize = readerConfiguration.get(RocketMQSourceOptions.POLL_BATCH_SIZE);
  pollTimeout = readerConfiguration.get(RocketMQSourceOptions.POLL_TIMEOUT);
  commitInCheckpoint = readerConfiguration.get(RocketMQSourceOptions.COMMIT_IN_CHECKPOINT);
  accessKey = readerConfiguration.get(RocketMQSourceOptions.ACCESS_KEY);
  secretKey = readerConfiguration.get(RocketMQSourceOptions.SECRET_KEY);
}
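The pattern in the constructor above, reading typed options from a configuration object, can be sketched in isolation. Everything in this block is hypothetical: `ReaderConfigSketch`, `Option`, `getNecessary`, and the option names and defaults are made up for illustration and are much simpler than BitSail's own configuration classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of "typed options with defaults"; it is not BitSailConfiguration.
class ReaderConfigSketch {

  // A typed key plus the value to fall back on when the job config omits it.
  static final class Option<T> {
    final String key;
    final T defaultValue;

    Option(String key, T defaultValue) {
      this.key = key;
      this.defaultValue = defaultValue;
    }
  }

  // Option names and defaults below are invented for the example.
  static final Option<String> TOPIC = new Option<>("topic", null);
  static final Option<Integer> POLL_BATCH_SIZE = new Option<>("poll_batch_size", 2048);

  private final Map<String, Object> values;

  ReaderConfigSketch(Map<String, Object> values) {
    this.values = new HashMap<>(values);
  }

  // Returns the configured value, or the option's default when the key is absent.
  @SuppressWarnings("unchecked")
  <T> T get(Option<T> option) {
    Object value = values.get(option.key);
    return value == null ? option.defaultValue : (T) value;
  }

  // Fails fast for options that have no sensible default.
  <T> T getNecessary(Option<T> option) {
    T value = get(option);
    if (value == null) {
      throw new IllegalArgumentException("Missing required option: " + option.key);
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, Object> job = new HashMap<>();
    job.put("topic", "orders");
    ReaderConfigSketch config = new ReaderConfigSketch(job);
    System.out.println(config.getNecessary(TOPIC));
    System.out.println(config.get(POLL_BATCH_SIZE));
  }
}
```

Resolving every option once in the constructor means a missing required setting surfaces before any connection is opened.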

start method

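Initialize the objects used to access the data source, for example a database statement object, a message queue consumer, or a file system connection.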

Example

Message queue


public void start() {
  try {
    if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {
      AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(
          new SessionCredentials(accessKey, secretKey));
      consumer = new DefaultMQPullConsumer(aclClientRPCHook);
    } else {
      consumer = new DefaultMQPullConsumer();
    }

    consumer.setConsumerGroup(consumerGroup);
    consumer.setNamesrvAddr(cluster);
    consumer.setInstanceName(String.format(SOURCE_READER_INSTANCE_NAME_TEMPLATE,
        cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.setConsumerPullTimeoutMillis(pollTimeout);
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}


Database


public void start() {
  this.connection = connectionHolder.connect();

  // Construct statement.
  String baseSql = ClickhouseJdbcUtils.getQuerySql(dbName, tableName, columnInfos);
  String querySql = ClickhouseJdbcUtils.decorateSql(baseSql, splitField, filterSql, maxFetchCount, true);
  try {
    this.statement = connection.prepareStatement(querySql);
  } catch (SQLException e) {
    throw new RuntimeException("Failed to prepare statement.", e);
  }

  LOG.info("Task {} started.", subTaskId);
}


FTP


public void start() {
  this.ftpHandler.loginFtpServer();
  if (this.ftpHandler.getFtpConfig().getSkipFirstLine()) {
    this.skipFirstLine = true;
  }
}

addSplits method

Add the list of splits that SourceSplitCoordinator assigned to the current Reader to the Reader's own processing queue (Queue) or set (Set).

Example

public void addSplits(List<RocketMQSplit> splits) {
  LOG.info("Subtask {} received {}(s) new splits, splits = {}.",
      context.getIndexOfSubtask(),
      CollectionUtils.size(splits),
      splits);

  assignedRocketMQSplits.addAll(splits);
}

hasMoreElements method

In unbounded streaming scenarios, this method always returns true so that the Reader thread is never destroyed.


In batch scenarios, it returns false once all splits assigned to the Reader have been processed, which marks the end of the Reader's lifecycle.


public boolean hasMoreElements() {
  if (boundedness == Boundedness.UNBOUNDEDNESS) {
    return true;
  }
  if (noMoreSplits) {
    return CollectionUtils.size(assignedRocketMQSplits) != 0;
  }
  return true;
}
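The decision order in this method can be checked in isolation. The following is a minimal sketch that takes the three inputs as plain parameters; `SketchBoundedness` and `HasMoreElementsSketch` are hypothetical names used only here, not BitSail types.

```java
// Stand-in for the reader's boundedness mode; names here are illustrative only.
enum SketchBoundedness { BOUNDED, UNBOUNDED }

class HasMoreElementsSketch {

  // Mirrors the decision order described above:
  // 1. streaming readers never finish;
  // 2. batch readers finish once no more splits will arrive and none remain;
  // 3. otherwise more splits may still be assigned, so keep running.
  static boolean hasMoreElements(SketchBoundedness mode, boolean noMoreSplits, int remainingSplits) {
    if (mode == SketchBoundedness.UNBOUNDED) {
      return true;
    }
    if (noMoreSplits) {
      return remainingSplits != 0;
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(hasMoreElements(SketchBoundedness.UNBOUNDED, true, 0));
    System.out.println(hasMoreElements(SketchBoundedness.BOUNDED, false, 0));
    System.out.println(hasMoreElements(SketchBoundedness.BOUNDED, true, 2));
    System.out.println(hasMoreElements(SketchBoundedness.BOUNDED, true, 0));
  }
}
```

Only the last combination (bounded, no more splits coming, nothing left to read) lets the reader stop.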

pollNext method

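This method is called once addSplits has added splits to the processing queue and hasMoreElements returns true. It is where the developer implements the actual interaction with the data.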


When implementing pollNext, developers need to pay attention to the following:


  • Reading split data: read the data from the splits that have been constructed.

  • Converting data types: convert the external data into the BitSail Row type.

Example

Taking RocketMQSourceReader as an example:


Pick a split from the split queue and read its messages, then convert what was read into the BitSail Row type and send it downstream.


public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
  for (RocketMQSplit rocketmqSplit : assignedRocketMQSplits) {
    MessageQueue messageQueue = rocketmqSplit.getMessageQueue();
    PullResult pullResult = consumer.pull(rocketmqSplit.getMessageQueue(),
        consumerTag,
        rocketmqSplit.getStartOffset(),
        pollBatchSize,
        pollTimeout);

    if (Objects.isNull(pullResult) || CollectionUtils.isEmpty(pullResult.getMsgFoundList())) {
      continue;
    }

    for (MessageExt message : pullResult.getMsgFoundList()) {
      Row deserialize = deserializationSchema.deserialize(message.getBody());
      pipeline.output(deserialize);
      if (rocketmqSplit.getStartOffset() >= rocketmqSplit.getEndOffset()) {
        LOG.info("Subtask {} rocketmq split {} in end of stream.",
            context.getIndexOfSubtask(),
            rocketmqSplit);
        finishedRocketMQSplits.add(rocketmqSplit);
        break;
      }
    }
    rocketmqSplit.setStartOffset(pullResult.getNextBeginOffset());
    if (!commitInCheckpoint) {
      consumer.updateConsumeOffset(messageQueue, pullResult.getMaxOffset());
    }
  }
  assignedRocketMQSplits.removeAll(finishedRocketMQSplits);
}
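The two concerns of pollNext (reading split data and converting it to rows) can also be seen in a self-contained toy reader. This is only a sketch under stated assumptions: the data source is an in-memory list of "id,name" lines, a split is an offset range, and `PollNextSketch`, `RangeSplit`, and the `Consumer<Object[]>` pipeline are hypothetical stand-ins for BitSail's reader, split, and SourcePipeline.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Simplified stand-ins for illustration; none of these types are BitSail classes.
class PollNextSketch {

  // A split covers the half-open range [startOffset, endOffset) of the in-memory log.
  static final class RangeSplit {
    long startOffset;
    final long endOffset;

    RangeSplit(long startOffset, long endOffset) {
      this.startOffset = startOffset;
      this.endOffset = endOffset;
    }
  }

  private final List<String> log; // the "external system": one "id,name" line per offset
  private final int batchSize;
  private final Set<RangeSplit> assigned = new HashSet<>();

  PollNextSketch(List<String> log, int batchSize) {
    this.log = log;
    this.batchSize = batchSize;
  }

  void addSplits(List<RangeSplit> splits) {
    assigned.addAll(splits);
  }

  boolean hasMoreElements() {
    return !assigned.isEmpty();
  }

  // One polling round: read at most batchSize records per split, convert, emit.
  void pollNext(Consumer<Object[]> pipeline) {
    Set<RangeSplit> finished = new HashSet<>();
    for (RangeSplit split : assigned) {
      long end = Math.min(split.startOffset + batchSize, split.endOffset);
      for (long offset = split.startOffset; offset < end; offset++) {
        // Step 1: read the raw record that belongs to this split.
        String raw = log.get((int) offset);
        // Step 2: convert it to a row, here "id,name" -> [Integer, String].
        String[] parts = raw.split(",", 2);
        pipeline.accept(new Object[] {Integer.parseInt(parts[0].trim()), parts[1].trim()});
      }
      // Remember progress so the next round continues where this one stopped.
      split.startOffset = end;
      if (split.startOffset >= split.endOffset) {
        finished.add(split);
      }
    }
    assigned.removeAll(finished);
  }

  public static void main(String[] args) {
    PollNextSketch reader = new PollNextSketch(Arrays.asList("1,a", "2,b", "3,c", "4,d", "5,e"), 2);
    reader.addSplits(Arrays.asList(new RangeSplit(0, 3), new RangeSplit(3, 5)));
    while (reader.hasMoreElements()) {
      reader.pollNext(row -> System.out.println(Arrays.toString(row)));
    }
  }
}
```

Finished splits are collected separately and removed after the loop, the same approach the RocketMQ example takes, so the set is never modified while it is being iterated.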

Common ways to convert to the BitSail Row type

Custom RowDeserializer class

Apply a different converter to each column according to its type, and set the result on the corresponding Field of the Row.


public class ClickhouseRowDeserializer {

  interface FiledConverter {
    Object apply(ResultSet resultSet) throws SQLException;
  }

  private final List<FiledConverter> converters;
  private final int fieldSize;

  public ClickhouseRowDeserializer(TypeInfo<?>[] typeInfos) {
    this.fieldSize = typeInfos.length;
    this.converters = new ArrayList<>();
    for (int i = 0; i < fieldSize; ++i) {
      converters.add(initFieldConverter(i + 1, typeInfos[i]));
    }
  }

  public Row convert(ResultSet resultSet) {
    Row row = new Row(fieldSize);
    try {
      for (int i = 0; i < fieldSize; ++i) {
        row.setField(i, converters.get(i).apply(resultSet));
      }
    } catch (SQLException e) {
      throw BitSailException.asBitSailException(ClickhouseErrorCode.CONVERT_ERROR, e.getCause());
    }
    return row;
  }

  private FiledConverter initFieldConverter(int index, TypeInfo<?> typeInfo) {
    if (!(typeInfo instanceof BasicTypeInfo)) {
      throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE,
          typeInfo.getTypeClass().getName() + " is not supported yet.");
    }

    Class<?> curClass = typeInfo.getTypeClass();
    if (TypeInfos.BYTE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getByte(index);
    }
    if (TypeInfos.SHORT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getShort(index);
    }
    if (TypeInfos.INT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getInt(index);
    }
    if (TypeInfos.LONG_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getLong(index);
    }
    if (TypeInfos.BIG_INTEGER_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> {
        BigDecimal dec = resultSet.getBigDecimal(index);
        return dec == null ? null : dec.toBigInteger();
      };
    }
    if (TypeInfos.FLOAT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getFloat(index);
    }
    if (TypeInfos.DOUBLE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDouble(index);
    }
    if (TypeInfos.BIG_DECIMAL_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBigDecimal(index);
    }
    if (TypeInfos.STRING_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getString(index);
    }
    if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDate(index);
    }
    if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTimestamp(index);
    }
    if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTime(index);
    }
    if (TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBoolean(index);
    }
    if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> null;
    }
    throw new UnsupportedOperationException("Unsupported data type: " + typeInfo);
  }
}
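The core idea of this class, choosing one converter per column up front and then applying them record by record, does not depend on JDBC. The sketch below shows the same idea on plain strings. `ColumnConverterSketch` is a hypothetical name, and `Class<?>` and `Object[]` stand in for BitSail's TypeInfo and Row purely for illustration.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustrative only: plain Java classes stand in for BitSail's TypeInfo and Row.
class ColumnConverterSketch {

  private final List<Function<String, Object>> converters = new ArrayList<>();

  // Pick one converter per column up front, so per-record work is only a list lookup.
  ColumnConverterSketch(Class<?>[] columnTypes) {
    for (Class<?> type : columnTypes) {
      converters.add(converterFor(type));
    }
  }

  private static Function<String, Object> converterFor(Class<?> type) {
    if (type == Integer.class) {
      return s -> Integer.valueOf(s.trim());
    }
    if (type == Long.class) {
      return s -> Long.valueOf(s.trim());
    }
    if (type == BigDecimal.class) {
      return s -> new BigDecimal(s.trim());
    }
    if (type == Boolean.class) {
      return s -> Boolean.valueOf(s.trim());
    }
    if (type == String.class) {
      return s -> s;
    }
    // Unknown types are rejected when the deserializer is built, not per record.
    throw new UnsupportedOperationException("Unsupported data type: " + type.getName());
  }

  // Applies converter i to raw field i and returns the converted row.
  Object[] convert(String[] rawFields) {
    Object[] row = new Object[converters.size()];
    for (int i = 0; i < row.length; i++) {
      row[i] = converters.get(i).apply(rawFields[i]);
    }
    return row;
  }

  public static void main(String[] args) {
    ColumnConverterSketch deserializer =
        new ColumnConverterSketch(new Class<?>[] {Integer.class, Boolean.class, String.class});
    System.out.println(Arrays.toString(deserializer.convert(new String[] {"42", " true ", "hello"})));
  }
}
```

Resolving the type dispatch once in the constructor keeps the per-record path free of type checks.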
Implementing the DeserializationSchema interface

Rather than writing a RowDeserializer, we recommend writing a class that implements the DeserializationSchema interface and converts data in a given format, such as JSON or CSV, into the BitSail Row type.



In practice, the appropriate implementation class can be created behind one unified interface:


public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private BitSailConfiguration deserializationConfiguration;

  private TypeInfo<?>[] typeInfos;

  private String[] fieldNames;

  private transient DeserializationSchema<byte[], Row> deserializationSchema;

  public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                              TypeInfo<?>[] typeInfos,
                                              String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    ContentType contentType = ContentType.valueOf(
        deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());
    switch (contentType) {
      case CSV:
        this.deserializationSchema =
            new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      case JSON:
        this.deserializationSchema =
            new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      default:
        throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING,
            "unsupported parser type: " + contentType);
    }
  }

  @Override
  public Row deserialize(Writable message) {
    return deserializationSchema.deserialize((message.toString()).getBytes());
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }
}


You can also write a DeserializationSchema dedicated to the format you need to parse:


public class MapredParquetInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private final BitSailConfiguration deserializationConfiguration;

  private final transient DateTimeFormatter localDateTimeFormatter;
  private final transient DateTimeFormatter localDateFormatter;
  private final transient DateTimeFormatter localTimeFormatter;
  private final int fieldSize;
  private final TypeInfo<?>[] typeInfos;
  private final String[] fieldNames;
  private final List<DeserializationConverter> converters;

  public MapredParquetInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                                       TypeInfo<?>[] typeInfos,
                                                       String[] fieldNames) {

    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    this.localDateTimeFormatter = DateTimeFormatter.ofPattern(
        deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_TIME_PATTERN));
    this.localDateFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_PATTERN));
    this.localTimeFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.TIME_PATTERN));
    this.fieldSize = typeInfos.length;
    this.converters = Arrays.stream(typeInfos).map(this::createTypeInfoConverter).collect(Collectors.toList());
  }

  @Override
  public Row deserialize(Writable message) {
    int arity = fieldNames.length;
    Row row = new Row(arity);
    Writable[] writables = ((ArrayWritable) message).get();
    for (int i = 0; i < fieldSize; ++i) {
      row.setField(i, converters.get(i).convert(writables[i].toString()));
    }
    return row;
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }

  private interface DeserializationConverter extends Serializable {
    Object convert(String input);
  }

  private DeserializationConverter createTypeInfoConverter(TypeInfo<?> typeInfo) {
    Class<?> typeClass = typeInfo.getTypeClass();

    if (typeClass == TypeInfos.VOID_TYPE_INFO.getTypeClass()) {
      return field -> null;
    }
    if (typeClass == TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass()) {
      return this::convertToBoolean;
    }
    if (typeClass == TypeInfos.INT_TYPE_INFO.getTypeClass()) {
      return this::convertToInt;
    }
    throw BitSailException.asBitSailException(CsvFormatErrorCode.CSV_FORMAT_COVERT_FAILED,
        String.format("Csv format converter not support type info: %s.", typeInfo));
  }

  private boolean convertToBoolean(String field) {
    return Boolean.parseBoolean(field.trim());
  }

  private int convertToInt(String field) {
    return Integer.parseInt(field.trim());
  }
}
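Both examples share the same shape: a `deserialize` method that maps an input message to a row, and an `isEndOfStream` check. The sketch below reproduces that shape with no external dependencies. The interface `SketchDeserializationSchema` is a hypothetical stand-in modeled on the two methods used in the examples, not BitSail's actual definition; the "id,name" CSV layout and the class names are likewise assumptions for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in with the two methods used in the article's examples;
// it is not BitSail's actual interface definition.
interface SketchDeserializationSchema<I, O> {
  O deserialize(I message);

  boolean isEndOfStream(O nextElement);
}

// Converts one UTF-8 encoded CSV line of the form "id,name" into a two-field row.
class SketchCsvSchema implements SketchDeserializationSchema<byte[], Object[]> {
  @Override
  public Object[] deserialize(byte[] message) {
    String line = new String(message, StandardCharsets.UTF_8);
    String[] parts = line.split(",", -1);
    if (parts.length != 2) {
      throw new IllegalArgumentException("Expected 2 fields but got " + parts.length);
    }
    return new Object[] {Long.parseLong(parts[0].trim()), parts[1]};
  }

  @Override
  public boolean isEndOfStream(Object[] nextElement) {
    return false;
  }
}

// Adapts text input to a byte-based schema chosen by content type, similar in
// spirit to the wrapper shown earlier in this section.
class SketchTextSchema implements SketchDeserializationSchema<String, Object[]> {
  private final SketchDeserializationSchema<byte[], Object[]> delegate;

  SketchTextSchema(String contentType) {
    this.delegate = select(contentType);
  }

  private static SketchDeserializationSchema<byte[], Object[]> select(String contentType) {
    if ("CSV".equalsIgnoreCase(contentType)) {
      return new SketchCsvSchema();
    }
    throw new IllegalArgumentException("unsupported parser type: " + contentType);
  }

  @Override
  public Object[] deserialize(String message) {
    // An explicit charset keeps the byte encoding independent of the JVM default.
    return delegate.deserialize(message.getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public boolean isEndOfStream(Object[] nextElement) {
    return false;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(new SketchTextSchema("csv").deserialize("7,alice")));
  }
}
```

Because the reader depends only on the interface, supporting a new format means adding one implementation and one branch in the selection method.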

snapshotState method

Generate and save a snapshot of the State, which is used for checkpointing.

Example

public List<RocketMQSplit> snapshotState(long checkpointId) {
  LOG.info("Subtask {} start snapshotting for checkpoint id = {}.", context.getIndexOfSubtask(), checkpointId);
  if (commitInCheckpoint) {
    for (RocketMQSplit rocketMQSplit : assignedRocketMQSplits) {
      try {
        consumer.updateConsumeOffset(rocketMQSplit.getMessageQueue(), rocketMQSplit.getStartOffset());
        LOG.debug("Subtask {} committed message queue = {} in checkpoint id = {}.", context.getIndexOfSubtask(),
            rocketMQSplit.getMessageQueue(),
            checkpointId);
      } catch (MQClientException e) {
        throw new RuntimeException(e);
      }
    }
  }
  return Lists.newArrayList(assignedRocketMQSplits);
}
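To show what the returned split list is good for, here is a minimal sketch of the snapshot-and-recover round trip: a reader makes progress, takes a snapshot, makes more progress, and then a fresh reader is restored from the snapshot. `SnapshotSketch`, `OffsetSplit`, and `consume` are hypothetical names, and the recovery step (feeding the snapshot back through addSplits) is an assumption for illustration rather than a description of how the BitSail engine restores state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative stand-ins only; these are not BitSail or RocketMQ classes.
class SnapshotSketch {

  // Immutable split: progress is recorded by replacing the split, not mutating it.
  static final class OffsetSplit {
    final String queue;
    final long nextOffset;

    OffsetSplit(String queue, long nextOffset) {
      this.queue = queue;
      this.nextOffset = nextOffset;
    }
  }

  private final List<OffsetSplit> assigned = new ArrayList<>();

  void addSplits(List<OffsetSplit> splits) {
    assigned.addAll(splits);
  }

  // Simulates consuming `count` records from the split at `index`.
  void consume(int index, long count) {
    OffsetSplit old = assigned.get(index);
    assigned.set(index, new OffsetSplit(old.queue, old.nextOffset + count));
  }

  long nextOffset(int index) {
    return assigned.get(index).nextOffset;
  }

  // The snapshot is a copy, so progress made after the checkpoint cannot leak into it.
  List<OffsetSplit> snapshotState(long checkpointId) {
    return new ArrayList<>(assigned);
  }

  public static void main(String[] args) {
    SnapshotSketch reader = new SnapshotSketch();
    reader.addSplits(Arrays.asList(new OffsetSplit("queue-0", 0L)));
    reader.consume(0, 10);
    List<OffsetSplit> checkpoint = reader.snapshotState(1L);
    reader.consume(0, 5); // progress that is lost if the task fails now

    // Recovery: a fresh reader receives the checkpointed splits again.
    SnapshotSketch restored = new SnapshotSketch();
    restored.addSplits(checkpoint);
    System.out.println("resume from offset " + restored.nextOffset(0));
  }
}
```

The restored reader resumes from the offset recorded at the checkpoint, so records consumed after the checkpoint are read again rather than skipped.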

hasMoreElements method

Before each call to pollNext, sourceReader.hasMoreElements() is checked; pollNext is called if and only if that check passes.

Example

public boolean hasMoreElements() {
  if (noMoreSplits) {
    return CollectionUtils.size(assignedHadoopSplits) != 0;
  }
  return true;
}

notifyNoMoreSplits method

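This method is called when no more splits will be sent to the Reader. After that, the Reader can exit once it has processed all the splits already assigned to it.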

Example

public void notifyNoMoreSplits() {
  LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask());
  noMoreSplits = true;
}
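Putting the last two methods together, the following sketch shows how the hasMoreElements check before each pollNext and the notifyNoMoreSplits signal let a bounded reader terminate. It is a toy, single-threaded driver written for this illustration: `LifecycleSketch`, `ToyReader`, and `run` are hypothetical names, and the article does not describe how the real engine schedules these calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Toy bounded reader plus a single-threaded driver; all names are illustrative.
class LifecycleSketch {

  static final class ToyReader {
    private final Deque<List<String>> splits = new ArrayDeque<>();
    private boolean noMoreSplits = false;

    void addSplits(List<List<String>> newSplits) {
      splits.addAll(newSplits);
    }

    void notifyNoMoreSplits() {
      noMoreSplits = true;
    }

    boolean hasMoreElements() {
      // Until the signal arrives, more splits may still come, so stay alive.
      return !noMoreSplits || !splits.isEmpty();
    }

    // Emits every record of the next pending split, if there is one.
    void pollNext(Consumer<String> pipeline) {
      List<String> split = splits.poll();
      if (split != null) {
        split.forEach(pipeline);
      }
    }
  }

  // Drives the reader: pollNext runs only while hasMoreElements() is true.
  // Without a prior notifyNoMoreSplits() call this loop would never end.
  static List<String> run(ToyReader reader) {
    List<String> out = new ArrayList<>();
    while (reader.hasMoreElements()) {
      reader.pollNext(out::add);
    }
    return out;
  }

  public static void main(String[] args) {
    ToyReader reader = new ToyReader();
    reader.addSplits(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
    reader.notifyNoMoreSplits();
    System.out.println(run(reader));
  }
}
```

The signal alone does not stop the reader; it only allows hasMoreElements to return false once the remaining splits are drained.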


About BitSail:


⭐️ Star the project: https://github.com/bytedance/bitsail


Submit issues and suggestions: https://github.com/bytedance/bitsail/issues


Contribute code: https://github.com/bytedance/bitsail/pulls


BitSail website: https://bytedance.github.io/bitsail/zh/


Subscribe to the mailing list: bitsail+subscribe@googlegroups.com






Author: the ByteDance Data Platform team, which supports most of ByteDance's internal business lines and also offers data intelligence products to enterprise customers under the Volcano Engine brand.
