
Flink Source Code Analysis: How startupMode Takes Effect

shengjk1
Published: June 7, 2020

I had long wondered: if consumer.setStartFromLatest() and kafkaProperties.put("auto.offset.reset", "earliest") are both present, which one actually takes effect? The answer is consumer.setStartFromLatest(). Why?
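
To make the question concrete, here is a minimal, hypothetical job setup in which the two settings conflict. The topic name, bootstrap servers and group id are placeholders, and env is assumed to be a StreamExecutionEnvironment created elsewhere:

Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", "localhost:9092"); // placeholder
kafkaProperties.put("group.id", "demo-group");              // placeholder
// on its own, this property would make a fresh consumer group start from the earliest offsets ...
kafkaProperties.put("auto.offset.reset", "earliest");

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), kafkaProperties);
// ... but this call wins: the source starts reading from the latest offsets
consumer.setStartFromLatest();

env.addSource(consumer);

Let's walk through the source code to see why, starting with the open method of FlinkKafkaConsumerBase.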

@Override
public void open(Configuration configuration) throws Exception {
// determine the offset commit mode: ON_CHECKPOINTS, DISABLED or KAFKA_PERIODIC; this article focuses on ON_CHECKPOINTS
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
// create the kafka partition discoverer
this.partitionDiscoverer = createPartitionDiscoverer(
topicsDescriptor,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks());
this.partitionDiscoverer.open();
subscribedPartitionsToStartOffsets = new HashMap<>();
// get the partitions of the fixed topics or topic pattern assigned to this subtask
final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
// restore from checkpoint
if (restoredState != null) {
for (KafkaTopicPartition partition : allPartitions) {
// partitions not present in the checkpoint are new and will start from the earliest offset; restored partitions keep their checkpointed offsets and end up in subscribedPartitionsToStartOffsets below
if (!restoredState.containsKey(partition)) {
restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
}
}
for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
if (!restoredFromOldState) {
// seed the partition discoverer with the union state while filtering out
// restored partitions that should not be subscribed by this subtask
if (KafkaTopicPartitionAssigner.assign(
restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
== getRuntimeContext().getIndexOfThisSubtask()){
subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
}
} else {
// when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
// in this case, just use the restored state as the subscribed partitions
subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
}
}
if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
LOG.warn(
"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
entry.getKey());
return true;
}
return false;
});
}
LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
} else {
// use the partition discoverer to fetch the initial seed partitions,
// and set their initial offsets depending on the startup mode.
// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
// when the partition is actually read.
switch (startupMode) {
case SPECIFIC_OFFSETS:
if (specificStartupOffsets == null) {
throw new IllegalStateException(
"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
", but no specific offsets were specified.");
}
for (KafkaTopicPartition seedPartition : allPartitions) {
// partitions with an explicitly specified offset start from that offset; unspecified partitions fall back to the group offset
Long specificOffset = specificStartupOffsets.get(seedPartition);
if (specificOffset != null) {
// since the specified offsets represent the next record to read, we subtract
// it by one so that the initial state of the consumer will be correct
subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
} else {
// default to group offset behaviour if the user-provided specific offsets
// do not contain a value for this partition
// the sentinel for the corresponding startup mode is also stored in subscribedPartitionsToStartOffsets
subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
}
}
break;
case TIMESTAMP:
if (startupOffsetsTimestamp == null) {
throw new IllegalStateException(
"Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
", but no startup timestamp was specified.");
}
for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
: fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
subscribedPartitionsToStartOffsets.put(
partitionToOffset.getKey(),
(partitionToOffset.getValue() == null)
// if an offset cannot be retrieved for a partition with the given timestamp,
// we default to using the latest offset for the partition
? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
// since the specified offsets represent the next record to read, we subtract
// it by one so that the initial state of the consumer will be correct
: partitionToOffset.getValue() - 1);
}
break;
default:
// default case (EARLIEST, LATEST, GROUP_OFFSETS): store the startup mode's sentinel value
for (KafkaTopicPartition seedPartition : allPartitions) {
subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
}
}
if (!subscribedPartitionsToStartOffsets.isEmpty()) {
switch (startupMode) {
case EARLIEST:
LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
break;
case LATEST:
LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
break;
case TIMESTAMP:
LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
startupOffsetsTimestamp,
subscribedPartitionsToStartOffsets.keySet());
break;
case SPECIFIC_OFFSETS:
LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
specificStartupOffsets,
subscribedPartitionsToStartOffsets.keySet());
List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
}
}
if (partitionsDefaultedToGroupOffsets.size() > 0) {
LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
getRuntimeContext().getIndexOfThisSubtask(),
partitionsDefaultedToGroupOffsets.size(),
partitionsDefaultedToGroupOffsets);
}
break;
case GROUP_OFFSETS:
LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
}
} else {
LOG.info("Consumer subtask {} initially has no partitions to read from.",
getRuntimeContext().getIndexOfThisSubtask());
}
}

The open method's main job is to store the user-specified topics together with their partitions and start offsets into Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets.
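
Note that for the EARLIEST, LATEST and GROUP_OFFSETS modes this map does not yet contain real Kafka offsets; it holds sentinel values from KafkaTopicPartitionStateSentinel that are only resolved to actual positions later, in the consumer thread. As a purely hypothetical illustration (topic name and partition count made up), after open() with setStartFromLatest() the map would look roughly like this:

Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
// every seed partition maps to a sentinel, not to a concrete offset
subscribedPartitionsToStartOffsets.put(new KafkaTopicPartition("demo-topic", 0), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
subscribedPartitionsToStartOffsets.put(new KafkaTopicPartition("demo-topic", 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
subscribedPartitionsToStartOffsets.put(new KafkaTopicPartition("demo-topic", 2), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);

With that map in hand, let's look at the entry method through which Flink starts consuming Kafka.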



@Override
// entry method that starts the source
public void run(SourceContext<T> sourceContext) throws Exception {
if (subscribedPartitionsToStartOffsets == null) {
throw new Exception("The partitions were not set for the consumer");
}
// initialize commit metrics and default offset callback method
this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
this.offsetCommitCallback = new KafkaCommitCallback() {
@Override
public void onSuccess() {
successfulCommits.inc();
}
@Override
public void onException(Throwable cause) {
LOG.warn("Async Kafka commit failed.", cause);
failedCommits.inc();
}
};
// mark the subtask as temporarily idle if there are no initial seed partitions;
// once this subtask discovers some partitions and starts collecting records, the subtask's
// status will automatically be triggered back to be active.
if (subscribedPartitionsToStartOffsets.isEmpty()) {
sourceContext.markAsTemporarilyIdle();
}
// from this point forward:
// - 'snapshotState' will draw offsets from the fetcher,
// instead of being built from `subscribedPartitionsToStartOffsets`
// - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
// Kafka through the fetcher, if configured to do so)
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
if (!running) {
return;
}
// depending on whether we were restored with the current state version (1.3),
// remaining logic branches off into 2 paths:
// 1) New state - partition discovery loop executed as separate thread, with this
// thread running the main fetcher loop
// 2) Old state - partition discovery is disabled and only the main fetcher loop is executed
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
kafkaFetcher.runFetchLoop();
} else {
runWithPartitionDiscovery();
}
}

createFetcher is passed the subscribedPartitionsToStartOffsets map we just built. Following the call chain, the map is handed to the KafkaFetcher constructor as an argument and finally reaches the AbstractFetcher constructor.

protected AbstractFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
this.sourceContext = checkNotNull(sourceContext);
this.checkpointLock = sourceContext.getCheckpointLock();
this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
this.useMetrics = useMetrics;
this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP);
this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP);
// figure out what watermark mode we will be using
this.watermarksPeriodic = watermarksPeriodic;
this.watermarksPunctuated = watermarksPunctuated;
if (watermarksPeriodic == null) {
if (watermarksPunctuated == null) {
// simple case, no watermarks involved
timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
} else {
timestampWatermarkMode = PUNCTUATED_WATERMARKS;
}
} else {
if (watermarksPunctuated == null) {
timestampWatermarkMode = PERIODIC_WATERMARKS;
} else {
throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
}
}
this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
// initialize subscribed partition states with the seed partitions, depending on whether timestamp / watermark assigners are configured
// subscribedPartitionStates holds a List<KafkaTopicPartitionState<KPH>>; each KafkaTopicPartitionState contains the KafkaTopicPartition, its offset, and related information
this.subscribedPartitionStates = createPartitionStateHolders(
seedPartitionsWithInitialOffsets,
timestampWatermarkMode,
watermarksPeriodic,
watermarksPunctuated,
userCodeClassLoader);
// check that all seed partition states have a defined offset
// whether restored from a checkpoint or set via consumer.setStartFrom..., every seed partition must have an initial offset
for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
if (!partitionState.isOffsetDefined()) {
throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
}
}
// all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
// at this point the Kafka consumer has not been assigned any partitions yet
for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
unassignedPartitionsQueue.add(partition);
}
// register metrics for the initial seed partitions
if (useMetrics) {
registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates);
}
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
@SuppressWarnings("unchecked")
PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
subscribedPartitionStates,
sourceContext,
processingTimeProvider,
autoWatermarkInterval);
periodicEmitter.start();
}
}

From the constructor of KafkaFetcher, a subclass of AbstractFetcher, we can see that unassignedPartitionsQueue is in turn passed on to the KafkaConsumerThread.



public KafkaFetcher(
SourceFunction.SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
String taskNameWithSubtasks,
KafkaDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
MetricGroup subtaskMetricGroup,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
super(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
consumerMetricGroup,
useMetrics);
this.deserializer = deserializer;
this.handover = new Handover();
this.consumerThread = new KafkaConsumerThread(
LOG,
handover,
kafkaProperties,
unassignedPartitionsQueue,
getFetcherName() + " for " + taskNameWithSubtasks,
pollTimeout,
useMetrics,
consumerMetricGroup,
subtaskMetricGroup);
}

When the KafkaConsumerThread is started, its run method executes the following (excerpt):

......
try {
// hasAssignedPartitions defaults to false
// when new partitions are discovered, they are added to unassignedPartitionsQueue and subscribedPartitionsToStartOffsets
if (hasAssignedPartitions) {
newPartitions = unassignedPartitionsQueue.pollBatch();
}
else {
// if no assigned partitions block until we get at least one
// instead of hot spinning this loop. We rely on a fact that
// unassignedPartitionsQueue will be closed on a shutdown, so
// we don't block indefinitely
newPartitions = unassignedPartitionsQueue.getBatchBlocking();
}
// since unassignedPartitionsQueue already contains the seed partitions, newPartitions != null is true and reassignPartitions is executed
if (newPartitions != null) {
reassignPartitions(newPartitions);
}
} catch (AbortedReassignmentException e) {
continue;
}
......



void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
if (newPartitions.size() == 0) {
return;
}
hasAssignedPartitions = true;
boolean reassignmentStarted = false;
// since the reassignment may introduce several Kafka blocking calls that cannot be interrupted,
// the consumer needs to be isolated from external wakeup calls in setOffsetsToCommit() and shutdown()
// until the reassignment is complete.
final KafkaConsumer<byte[], byte[]> consumerTmp;
synchronized (consumerReassignmentLock) {
// hand the consumer reference over to consumerTmp
consumerTmp = this.consumer;
this.consumer = null;
}
final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>();
try {
/* there are both new and old partitions because, when KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS is configured,
Flink periodically checks for newly added partitions and puts any it finds into unassignedPartitionsQueue */
for (TopicPartition oldPartition : consumerTmp.assignment()) {
oldPartitionAssignmentsToPosition.put(oldPartition, consumerTmp.position(oldPartition));
}
final List<TopicPartition> newPartitionAssignments =
new ArrayList<>(newPartitions.size() + oldPartitionAssignmentsToPosition.size());
newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));
// reassign with the new partitions
consumerTmp.assign(newPartitionAssignments);
reassignmentStarted = true;
// old partitions should be seeked to their previous position
for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
consumerTmp.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
}
// offsets in the state of new partitions may still be placeholder sentinel values if we are:
// (1) starting fresh,
// (2) checkpoint / savepoint state we were restored with had not completely
// been replaced with actual offset values yet, or
// (3) the partition was newly discovered after startup;
// replace those with actual offsets, according to what the sentinel value represent.
// the offset-related properties configured on the Kafka consumer do not take effect here; the start position still depends on startupMode
// based on the sentinel returned by getOffset(), the consumer is seeked to the corresponding start position, and that sentinel, as we saw, originates from startupMode
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
// the KafkaConsumer by default will automatically seek the consumer position
// to the committed group offset, so we do not need to do it.
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else {
consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
}
}
} catch (WakeupException e) {
// a WakeupException may be thrown if the consumer was invoked wakeup()
// before it was isolated for the reassignment. In this case, we abort the
// reassignment and just re-expose the original consumer.
synchronized (consumerReassignmentLock) {
this.consumer = consumerTmp;
// if reassignment had already started and affected the consumer,
// we do a full roll back so that it is as if it was left untouched
if (reassignmentStarted) {
this.consumer.assign(new ArrayList<>(oldPartitionAssignmentsToPosition.keySet()));
for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
this.consumer.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
}
}
// no need to restore the wakeup state in this case,
// since only the last wakeup call is effective anyways
hasBufferedWakeup = false;
// re-add all new partitions back to the unassigned partitions queue to be picked up again
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
unassignedPartitionsQueue.add(newPartition);
}
// this signals the main fetch loop to continue through the loop
throw new AbortedReassignmentException();
}
}
// reassignment complete; expose the reassigned consumer
synchronized (consumerReassignmentLock) {
this.consumer = consumerTmp;
// restore wakeup state for the consumer if necessary
if (hasBufferedWakeup) {
this.consumer.wakeup();
hasBufferedWakeup = false;
}
}
}
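
This closes the loop on the original question: setStartFromLatest() fixes startupMode to LATEST, open() records the LATEST_OFFSET sentinel for every seed partition, and reassignPartitions() turns that sentinel into an explicit consumerTmp.seekToEnd(...), so for any startup mode other than GROUP_OFFSETS the auto.offset.reset property in the Kafka properties never gets a chance to decide the start position. As a rough sketch (simplified, not the verbatim Flink source), the setter does little more than record the mode:

// simplified sketch of FlinkKafkaConsumerBase.setStartFromLatest(), for illustration only
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
    // record the startup mode; open() later turns this into the LATEST_OFFSET sentinel
    this.startupMode = StartupMode.LATEST;
    this.startupOffsetsTimestamp = null;
    this.specificStartupOffsets = null;
    return this;
}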


