Source Connector
This article focuses on SplitCoordinator, the role that creates and manages splits.
SourceSplitCoordinator
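The core job of a big-data processing framework is to break large-scale data into multiple reasonably sized splits. The SplitCoordinator is the component that creates and manages these splits.
SourceSplitCoordinator interface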
```java
import java.io.Serializable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;

// SourceSplit, SourceEvent and @Nullable come from BitSail and its dependencies; their imports are omitted.
public interface SourceSplitCoordinator<SplitT extends SourceSplit, StateT> extends Serializable, AutoCloseable {

  // Extracts split metadata and prepares the splits (or the Split Assigner).
  void start();

  // Called when a source reader registers with the coordinator.
  void addReader(int subtaskId);

  // Called with splits that a reader did not finish, so they can be reassigned.
  void addSplitsBack(List<SplitT> splits, int subtaskId);

  void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);

  default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
  }

  // Returns the coordinator state to store in a checkpoint.
  StateT snapshotState() throws Exception;

  default void notifyCheckpointComplete(long checkpointId) throws Exception {
  }

  void close();

  interface Context<SplitT extends SourceSplit, StateT> {

    boolean isRestored();

    /**
     * Returns the restored state to the split coordinator; used for exactly-once semantics.
     */
    StateT getRestoreState();

    /**
     * Returns the total parallelism of the source reader.
     */
    int totalParallelism();

    /**
     * Returns the registered readers; each source reader registers itself with the coordinator when it starts.
     */
    Set<Integer> registeredReaders();

    /**
     * Assigns splits to a reader.
     */
    void assignSplit(int subtaskId, List<SplitT> splits);

    /**
     * Mainly used in bounded mode; signals that no more splits will be sent to the source reader.
     */
    void signalNoMoreSplits(int subtask);

    /**
     * Sends an event from the split coordinator to a source reader,
     * e.g. a Pause event to the source reader in CDC 2.0.
     */
    void sendEventToSourceReader(int subtaskId, SourceEvent event);

    /**
     * Schedules the callable and its handler to run periodically; often used in unbounded mode.
     */
    <T> void runAsync(Callable<T> callable,
                      BiConsumer<T, Throwable> handler,
                      int initialDelay,
                      long interval);

    /**
     * Runs the callable and its handler only once; often used in bounded mode.
     */
    <T> void runAsyncOnce(Callable<T> callable,
                          BiConsumer<T, Throwable> handler);
  }
}
```
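The comments on runAsync and runAsyncOnce describe a contract rather than an implementation. To make that contract concrete, here is a minimal sketch backed by a JDK ScheduledExecutorService. It is not BitSail's implementation. The callable runs on a background thread, and its result, or the exception it threw, is passed to the handler. The class name SchedulingContextSketch is hypothetical, and initialDelay and interval are assumed to be in milliseconds.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the scheduling part of SourceSplitCoordinator.Context.
class SchedulingContextSketch implements AutoCloseable {
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

  // Runs the callable, then hands (result, null) or (null, error) to the handler.
  private <T> Runnable wrap(Callable<T> callable, BiConsumer<T, Throwable> handler) {
    return () -> {
      T result;
      try {
        result = callable.call();
      } catch (Throwable t) {
        handler.accept(null, t);
        return;
      }
      handler.accept(result, null);
    };
  }

  // Periodic execution, e.g. partition discovery in unbounded mode (times assumed in ms).
  <T> void runAsync(Callable<T> callable, BiConsumer<T, Throwable> handler,
                    int initialDelay, long interval) {
    executor.scheduleAtFixedRate(wrap(callable, handler), initialDelay, interval, TimeUnit.MILLISECONDS);
  }

  // One-shot execution, e.g. computing all splits once in bounded mode.
  <T> void runAsyncOnce(Callable<T> callable, BiConsumer<T, Throwable> handler) {
    executor.execute(wrap(callable, handler));
  }

  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```

The handler is called outside the try block. If the handler itself throws, that exception is therefore not reported back to it as though the callable had failed. Note that with scheduleAtFixedRate, an uncaught exception from the handler suppresses all later runs, so a periodic handler should catch its own errors.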
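Constructor
In the constructor, developers usually set up configuration and create the containers that hold split information.
Take the constructor of ClickhouseSourceSplitCoordinator as an example: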
```java
public ClickhouseSourceSplitCoordinator(SourceSplitCoordinator.Context<ClickhouseSourceSplit, EmptyState> context,
                                        BitSailConfiguration jobConf) {
  this.context = context;
  this.jobConf = jobConf;
  // reader index -> splits planned for that reader
  this.splitAssignmentPlan = Maps.newConcurrentMap();
}
```
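When the connector defines its own State, the state stored in SourceSplitCoordinator.Context at checkpoint time must be saved and restored. It is restored in the constructor.
Take RocketMQSourceSplitCoordinator as an example: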
```java
public RocketMQSourceSplitCoordinator(
    SourceSplitCoordinator.Context<RocketMQSplit, RocketMQState> context,
    BitSailConfiguration jobConfiguration,
    Boundedness boundedness) {
  this.context = context;
  this.jobConfiguration = jobConfiguration;
  this.boundedness = boundedness;
  // Partition discovery interval; if it is not positive, discovery runs only once (see start()).
  this.discoveryInternal = jobConfiguration.get(RocketMQSourceOptions.DISCOVERY_INTERNAL);
  this.pendingRocketMQSplitAssignment = Maps.newConcurrentMap();
  this.discoveredPartitions = new HashSet<>();
  if (context.isRestored()) {
    // Recover the message queue -> split id mapping saved by snapshotState().
    RocketMQState restoreState = context.getRestoreState();
    assignedPartitions = restoreState.getAssignedWithSplits();
    discoveredPartitions.addAll(assignedPartitions.keySet());
  } else {
    assignedPartitions = Maps.newHashMap();
  }
  prepareConsumerProperties();
}
```
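start method
This method extracts the split metadata the data source needs. If the assignment logic is factored out into a Split Assigner class, that class is usually initialized here. If a wrapped split-assignment function is used instead, the splits waiting to be assigned are initialized here.
Unified stream/batch scenario
Take RocketMQSourceSplitCoordinator as an example: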
```java
private void prepareRocketMQConsumer() {
  try {
    consumer = RocketMQUtils.prepareRocketMQConsumer(jobConfiguration,
        String.format(COORDINATOR_INSTANCE_NAME_TEMPLATE,
            cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}

@Override
public void start() {
  // The coordinator's own consumer is used to fetch message-queue metadata.
  prepareRocketMQConsumer();
  splitAssigner = new FairRocketMQSplitAssigner(jobConfiguration, assignedPartitions);
  if (discoveryInternal > 0) {
    // Periodically re-discover message queues.
    context.runAsync(
        this::fetchMessageQueues,
        this::handleMessageQueueChanged,
        0,
        discoveryInternal
    );
  } else {
    // Discover message queues once.
    context.runAsyncOnce(
        this::fetchMessageQueues,
        this::handleMessageQueueChanged
    );
  }
}
```
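The article does not show fetchMessageQueues or handleMessageQueueChanged. One plausible shape, sketched here as an assumption, is a periodic diff. The fetch step lists the topic's current queues. The handler then keeps only the queues it has not seen before, adds them to the discovered set, and forwards them for assignment. Queues are modelled as plain strings, and the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of incremental partition discovery for a streaming source.
class PartitionDiscoverySketch {
  // Partitions already seen; seeded from restored checkpoint state.
  private final Set<String> discovered = new HashSet<>();

  PartitionDiscoverySketch(Set<String> restored) {
    discovered.addAll(restored);
  }

  // Handler for one fetch result: returns only the newly appeared partitions,
  // which a real coordinator would pass on to split assignment.
  synchronized Set<String> handleFetched(Set<String> fetched, Throwable error) {
    if (error != null) {
      // A failed fetch is skipped; the next scheduled run tries again.
      return Collections.emptySet();
    }
    Set<String> newPartitions = new LinkedHashSet<>(fetched);
    newPartitions.removeAll(discovered);
    discovered.addAll(newPartitions);
    return newPartitions;
  }
}
```

Seeding the discovered set from restored state mirrors what the constructor does with discoveredPartitions. Queues that were already assigned before a restart are therefore not treated as new.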
Batch scenario
Take ClickhouseSourceSplitCoordinator as an example:
```java
@Override
public void start() {
  List<ClickhouseSourceSplit> splitList;
  try {
    SimpleDivideSplitConstructor constructor = new SimpleDivideSplitConstructor(jobConf);
    splitList = constructor.construct();
  } catch (IOException e) {
    // Fall back to a single split that reads the whole table.
    ClickhouseSourceSplit split = new ClickhouseSourceSplit(0);
    split.setReadTable(true);
    splitList = Collections.singletonList(split);
    LOG.error("Failed to construct splits, will directly read the table.", e);
  }

  int readerNum = context.totalParallelism();
  LOG.info("Found {} readers and {} splits.", readerNum, splitList.size());
  if (readerNum > splitList.size()) {
    // Some readers will receive no split.
    LOG.error("Reader number {} is larger than split number {}.", readerNum, splitList.size());
  }

  // Build the reader index -> splits plan; splits are delivered once readers register.
  for (ClickhouseSourceSplit split : splitList) {
    int readerIndex = ReaderSelector.getReaderIndex(readerNum);
    splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
    LOG.info("Will assign split {} to the {}-th reader", split.uniqSplitId(), readerIndex);
  }
}
```
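ReaderSelector is not shown in the article. Assuming getReaderIndex cycles an internal counter modulo readerNum (round-robin), the plan that start() builds can be sketched as follows. This is an assumption, not BitSail's code, and the class name is hypothetical. The sketch also makes the logged warning concrete: when there are more readers than splits, some reader indexes never appear in the plan, and those readers receive no data.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical batch planner: distributes split ids over readers round-robin.
class RoundRobinPlanSketch {
  private int next = 0; // counter kept across calls, like a selector object

  int nextReaderIndex(int readerNum) {
    int index = next % readerNum;
    next = index + 1;
    return index;
  }

  // Builds a reader index -> split ids plan, like splitAssignmentPlan.
  Map<Integer, Set<String>> plan(List<String> splitIds, int readerNum) {
    Map<Integer, Set<String>> assignment = new HashMap<>();
    for (String splitId : splitIds) {
      assignment.computeIfAbsent(nextReaderIndex(readerNum), k -> new HashSet<>()).add(splitId);
    }
    return assignment;
  }
}
```

With five splits and two readers, reader 0 gets three splits and reader 1 gets two. With one split and three readers, only reader 0 appears in the plan.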
Assigner
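The Assigner hands the computed splits to readers. In practice, SourceSplitCoordinator usually concentrates on communicating with readers, and the actual split-distribution logic is encapsulated in an Assigner. The Assigner can be either a wrapped split-assignment function or a dedicated Split Assigner class.
Assign function example
Take ClickhouseSourceSplitCoordinator as an example:
The tryAssignSplitsToReader function assigns the splits stored in splitAssignmentPlan to the corresponding readers.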
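The body of tryAssignSplitsToReader is not reproduced here. The following is a minimal sketch. It assumes the function follows the same pattern as the RocketMQ coordinator's notifyReaderAssignmentResult, which appears in the addReader section. Each registered reader receives its planned splits, those splits are removed from the plan, and a bounded source signals that no more splits are coming. The Context is reduced to a hypothetical two-method interface (MiniContext) so that the example runs on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical minimal slice of SourceSplitCoordinator.Context.
interface MiniContext {
  void assignSplit(int subtaskId, List<String> splits);

  void signalNoMoreSplits(int subtaskId);
}

class PlanDeliverySketch {
  // Delivers planned splits to readers that have already registered.
  static void tryAssignSplitsToReader(Map<Integer, Set<String>> plan, Set<Integer> registeredReaders,
                                      MiniContext context, boolean bounded) {
    // Collect first, so the plan is not modified while it is being iterated.
    Map<Integer, List<String>> toAssign = new HashMap<>();
    for (Map.Entry<Integer, Set<String>> entry : plan.entrySet()) {
      if (!entry.getValue().isEmpty() && registeredReaders.contains(entry.getKey())) {
        toAssign.put(entry.getKey(), new ArrayList<>(entry.getValue()));
      }
    }
    for (Map.Entry<Integer, List<String>> entry : toAssign.entrySet()) {
      plan.remove(entry.getKey());
      context.assignSplit(entry.getKey(), entry.getValue());
      if (bounded) {
        context.signalNoMoreSplits(entry.getKey());
      }
    }
  }
}
```

Splits for readers that have not registered yet stay in the plan. They are delivered on a later call, typically from addReader when that reader shows up.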
Assigner class example
Take RocketMQSourceSplitCoordinator as an example:
```java
public class FairRocketMQSplitAssigner implements SplitAssigner<MessageQueue> {

  private BitSailConfiguration readerConfiguration;

  // Next split id to hand out; starts after the ids already present in the mapping.
  private AtomicInteger atomicInteger;

  // message queue -> split id
  public Map<MessageQueue, String> rocketMQSplitIncrementMapping;

  public FairRocketMQSplitAssigner(BitSailConfiguration readerConfiguration,
                                   Map<MessageQueue, String> rocketMQSplitIncrementMapping) {
    this.readerConfiguration = readerConfiguration;
    this.rocketMQSplitIncrementMapping = rocketMQSplitIncrementMapping;
    this.atomicInteger = new AtomicInteger(CollectionUtils
        .size(rocketMQSplitIncrementMapping.keySet()));
  }

  @Override
  public String assignSplitId(MessageQueue messageQueue) {
    // A queue gets an incrementing id the first time it is seen and keeps it afterwards.
    if (!rocketMQSplitIncrementMapping.containsKey(messageQueue)) {
      rocketMQSplitIncrementMapping.put(messageQueue, String.valueOf(atomicInteger.getAndIncrement()));
    }
    return rocketMQSplitIncrementMapping.get(messageQueue);
  }

  @Override
  public int assignToReader(String splitId, int totalParallelism) {
    // String.hashCode() can be negative; floorMod (unlike %) keeps the index in [0, totalParallelism).
    return Math.floorMod(splitId.hashCode(), totalParallelism);
  }
}
```
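The counter in FairRocketMQSplitAssigner starts at the size of the mapping it receives. In start(), that mapping is the restored assignedPartitions map. Below is a dependency-free sketch of that behaviour, in which queues are plain strings and the class name is hypothetical. Restored queues keep their IDs, and new queues continue the numbering. The sketch maps IDs to readers with Math.floorMod because String.hashCode() can be negative, and Java's % then yields a negative index.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, dependency-free version of the assigner, with queues as strings.
class StableIdAssignerSketch {
  private final Map<String, String> idByQueue;
  private final AtomicInteger nextId;

  StableIdAssignerSketch(Map<String, String> restored) {
    this.idByQueue = new HashMap<>(restored);
    // Continue numbering after the restored ids; assumes they are exactly 0..n-1.
    this.nextId = new AtomicInteger(restored.size());
  }

  // Returns the existing id for a known queue, or allocates the next one.
  String assignSplitId(String queue) {
    return idByQueue.computeIfAbsent(queue, q -> String.valueOf(nextId.getAndIncrement()));
  }

  // floorMod keeps the reader index non-negative even for negative hash codes.
  int assignToReader(String splitId, int totalParallelism) {
    return Math.floorMod(splitId.hashCode(), totalParallelism);
  }
}
```

Continuing from size() is only safe if the restored IDs form a contiguous range starting at 0. With a gap, for example restored IDs "0" and "2", the next new queue would reuse "2".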
addReader method
This method calls the Assigner to assign splits to the newly registered reader.
Batch scenario example
Take ClickhouseSourceSplitCoordinator as an example:
```java
@Override
public void addReader(int subtaskId) {
  LOG.info("Found reader {}", subtaskId);
  // Deliver any splits already planned for registered readers.
  tryAssignSplitsToReader();
}
```
Unified stream/batch scenario example
Take RocketMQSourceSplitCoordinator as an example:
```java
private void notifyReaderAssignmentResult() {
  Map<Integer, List<RocketMQSplit>> tmpRocketMQSplitAssignments = new HashMap<>();

  // Collect pending splits, but only for readers that have already registered.
  for (Integer pendingAssignmentReader : pendingRocketMQSplitAssignment.keySet()) {
    if (CollectionUtils.isNotEmpty(pendingRocketMQSplitAssignment.get(pendingAssignmentReader))
        && context.registeredReaders().contains(pendingAssignmentReader)) {
      tmpRocketMQSplitAssignments.put(pendingAssignmentReader,
          Lists.newArrayList(pendingRocketMQSplitAssignment.get(pendingAssignmentReader)));
    }
  }

  for (Integer pendingAssignmentReader : tmpRocketMQSplitAssignments.keySet()) {
    LOG.info("Assigning splits to reader {}, splits = {}.", pendingAssignmentReader,
        tmpRocketMQSplitAssignments.get(pendingAssignmentReader));
    context.assignSplit(pendingAssignmentReader,
        tmpRocketMQSplitAssignments.get(pendingAssignmentReader));

    // Move delivered splits from "pending" to "assigned" so the next snapshot includes them.
    Set<RocketMQSplit> removes = pendingRocketMQSplitAssignment.remove(pendingAssignmentReader);
    removes.forEach(removeSplit -> {
      assignedPartitions.put(removeSplit.getMessageQueue(), removeSplit.getSplitId());
    });

    LOG.info("Assigned splits to reader {}", pendingAssignmentReader);

    if (Boundedness.BOUNDEDNESS == boundedness) {
      LOG.info("Signal reader {} no more splits assigned in future.", pendingAssignmentReader);
      context.signalNoMoreSplits(pendingAssignmentReader);
    }
  }
}

@Override
public void addReader(int subtaskId) {
  LOG.info(
      "Adding reader {} to RocketMQ Split Coordinator for consumer group {}.",
      subtaskId,
      consumerGroup);
  notifyReaderAssignmentResult();
}
```
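addSplitsBack method
This method reassigns splits that a reader did not finish processing. The reassignment strategy is up to you. A common choice is hash modulo: every split in the returned list is mapped to a reader again and then assigned to that reader.
Batch scenario example
Take ClickhouseSourceSplitCoordinator as an example:
ReaderSelector redistributes the returned splits using a hash-modulo strategy.
The tryAssignSplitsToReader method, which serves as the Assigner here, then delivers the redistributed splits to the readers.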
```java
@Override
public void addSplitsBack(List<ClickhouseSourceSplit> splits, int subtaskId) {
  LOG.info("Source reader {} return splits {}.", subtaskId, splits);
  int readerNum = context.totalParallelism();
  // Put each returned split back into the plan under a newly selected reader.
  for (ClickhouseSourceSplit split : splits) {
    int readerIndex = ReaderSelector.getReaderIndex(readerNum);
    splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
    LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex);
  }
  tryAssignSplitsToReader();
}
```
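Unified stream/batch scenario example
Take RocketMQSourceSplitCoordinator as an example:
addSplitChangeToPendingAssignment redistributes the split list using a hash-modulo strategy (through splitAssigner.assignToReader). notifyReaderAssignmentResult then delivers the redistributed splits to the readers.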
```java
private synchronized void addSplitChangeToPendingAssignment(Set<RocketMQSplit> newRocketMQSplits) {
  int numReader = context.totalParallelism();
  for (RocketMQSplit split : newRocketMQSplits) {
    // The target reader is derived from the split id (hash modulo parallelism).
    int readerIndex = splitAssigner.assignToReader(split.getSplitId(), numReader);
    pendingRocketMQSplitAssignment.computeIfAbsent(readerIndex, r -> new HashSet<>())
        .add(split);
  }
  LOG.debug("RocketMQ splits {} finished assignment.", newRocketMQSplits);
}

@Override
public void addSplitsBack(List<RocketMQSplit> splits, int subtaskId) {
  LOG.info("Source reader {} return splits {}.", subtaskId, splits);
  addSplitChangeToPendingAssignment(new HashSet<>(splits));
  notifyReaderAssignmentResult();
}
```
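Because assignToReader is a pure function of the split ID and the parallelism, a returned split is queued again for the same subtask index as before, provided the parallelism has not changed. That suits the common case where a failed reader is restarted with the same index. The sketch below reproduces only the pending-assignment bookkeeping. It uses hypothetical names, reduces splits to their string IDs, and uses floorMod for a non-negative index.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical pending-assignment table keyed by reader index.
class PendingAssignmentSketch {
  private final Map<Integer, Set<String>> pending = new HashMap<>();
  private final int parallelism;

  PendingAssignmentSketch(int parallelism) {
    this.parallelism = parallelism;
  }

  // Deterministic target reader: same id and parallelism give the same index.
  static int readerFor(String splitId, int parallelism) {
    return Math.floorMod(splitId.hashCode(), parallelism);
  }

  // Same role as addSplitChangeToPendingAssignment: bucket splits by target reader.
  synchronized void addSplits(Collection<String> splitIds) {
    for (String splitId : splitIds) {
      pending.computeIfAbsent(readerFor(splitId, parallelism), r -> new HashSet<>()).add(splitId);
    }
  }

  synchronized Set<String> pendingFor(int reader) {
    return pending.getOrDefault(reader, Set.of());
  }
}
```

For single-digit IDs, the hash code is the character code ("0" is 48, "5" is 53). With a parallelism of 4, "1" and "5" therefore both land on reader 1, and "5" lands there again when it is returned.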
snapshotState method
This method stores a snapshot of the split-assignment state. During recovery, the constructor reads it back.
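```java
@Override
public RocketMQState snapshotState() throws Exception {
  // Persist the message queue -> split id mapping; the constructor restores it on recovery.
  return new RocketMQState(assignedPartitions);
}
```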
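snapshotState and the restore branch in the constructor form a round trip. The article does not show whether RocketMQState copies the map it receives. If it only keeps the reference, splits assigned after snapshotState returns could leak into a snapshot that has not been serialized yet. The sketch below takes a copy to rule that out. AssignedState and StatefulCoordinatorSketch are hypothetical names, and a null argument stands in for context.isRestored() returning false.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical checkpoint state: queue -> split id, mirroring assignedPartitions.
class AssignedState implements Serializable {
  private final Map<String, String> assigned;

  AssignedState(Map<String, String> assigned) {
    this.assigned = new HashMap<>(assigned); // defensive copy taken at snapshot time
  }

  Map<String, String> getAssigned() {
    return new HashMap<>(assigned);
  }
}

class StatefulCoordinatorSketch {
  private final Map<String, String> assignedPartitions;

  // restored == null plays the role of a fresh start (not restored from a checkpoint).
  StatefulCoordinatorSketch(AssignedState restored) {
    this.assignedPartitions = restored != null ? restored.getAssigned() : new HashMap<>();
  }

  void markAssigned(String queue, String splitId) {
    assignedPartitions.put(queue, splitId);
  }

  AssignedState snapshotState() {
    return new AssignedState(assignedPartitions);
  }

  int assignedCount() {
    return assignedPartitions.size();
  }
}
```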
close method
This method closes every connection that is still open and was used to read metadata from the data source during splitting.
```java
@Override
public void close() {
  // Shut down the consumer that start() created for fetching metadata.
  if (consumer != null) {
    consumer.shutdown();
  }
}
```
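When a coordinator holds more than one client, close() should attempt every one of them even if an earlier one fails. A minimal, JDK-only sketch follows. The resources are generic AutoCloseable instances, and the helper name closeAll is hypothetical.

```java
// Hypothetical helper: closes every non-null resource, then reports the first failure.
final class CloseAllSketch {
  private CloseAllSketch() {
  }

  static void closeAll(AutoCloseable... resources) throws Exception {
    Exception first = null;
    for (AutoCloseable resource : resources) {
      if (resource == null) {
        continue; // never created, e.g. start() failed early
      }
      try {
        resource.close();
      } catch (Exception e) {
        // Keep going; later failures are attached to the first one.
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

SourceSplitCoordinator.close() declares no checked exception. A coordinator using such a helper would therefore wrap the rethrown exception, for example in a RuntimeException.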
About BitSail:
⭐️ Star the project so you don't lose track of it: https://github.com/bytedance/bitsail
Submit issues and suggestions: https://github.com/bytedance/bitsail/issues
Contribute code: https://github.com/bytedance/bitsail/pulls
BitSail website: https://bytedance.github.io/bitsail/zh/
Subscribe to the mailing list: bitsail+subscribe@googlegroups.com