Flink 源码分析之 Flink startupMode 是如何起作用的
发布于: 2020 年 06 月 07 日
之前一直有个疑问:如果 consumer.setStartFromLatest() 与 kafkaProperties.put("auto.offset.reset", "earliest") 同时存在,究竟哪一个会起作用?答案是 consumer.setStartFromLatest()。为什么呢?我们一起来看一下源码。
@Override public void open(Configuration configuration) throws Exception { // determine the offset commit mode,区分ON_CHECKPOINTS、DISABLED or KAFKA_PERIODIC,本文主要针对ON_CHECKPOINTS this.offsetCommitMode = OffsetCommitModes.fromConfiguration( getIsAutoCommitEnabled(), enableCommitOnCheckpoints, ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()); // create the kafka partition discoverer this.partitionDiscoverer = createPartitionDiscoverer( topicsDescriptor, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); this.partitionDiscoverer.open(); subscribedPartitionsToStartOffsets = new HashMap<>(); //获取fixed topic's or topic pattern 's partitions of this subtask final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions(); //从checkpoint中恢复 if (restoredState != null) { for (KafkaTopicPartition partition : allPartitions) { //新的分区(未曾在checkpoint中的分区将从earliest offset 开始消费),old partition已经从checkpoint中恢复了,并且已经保存在subscribedPartitionsToStartOffsets if (!restoredState.containsKey(partition)) { restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); } } for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) { if (!restoredFromOldState) { // seed the partition discoverer with the union state while filtering out // restored partitions that should not be subscribed by this subtask if (KafkaTopicPartitionAssigner.assign( restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks()) == getRuntimeContext().getIndexOfThisSubtask()){ subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue()); } } else { // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state; // in this case, just use the restored state as the subscribed partitions subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue()); } } if 
(filterRestoredPartitionsWithCurrentTopicsDescriptor) { subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> { if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) { LOG.warn( "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.", entry.getKey()); return true; } return false; }); } LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}", getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets); } else { // use the partition discoverer to fetch the initial seed partitions, // and set their initial offsets depending on the startup mode. // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now; // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined // when the partition is actually read. switch (startupMode) { case SPECIFIC_OFFSETS: if (specificStartupOffsets == null) { throw new IllegalStateException( "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS + ", but no specific offsets were specified."); } for (KafkaTopicPartition seedPartition : allPartitions) { //指定partition的offset,从指定的offset卡开始,未指定的从group_offset开始 Long specificOffset = specificStartupOffsets.get(seedPartition); if (specificOffset != null) { // since the specified offsets represent the next record to read, we subtract // it by one so that the initial state of the consumer will be correct subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1); } else { // default to group offset behaviour if the user-provided specific offsets // do not contain a value for this partition //对应的startupMode也存储到 subscribedPartitionsToStartOffsets中subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); } } break; case TIMESTAMP: if (startupOffsetsTimestamp == null) { throw new 
IllegalStateException( "Startup mode for the consumer set to " + StartupMode.TIMESTAMP + ", but no startup timestamp was specified."); } for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) { subscribedPartitionsToStartOffsets.put( partitionToOffset.getKey(), (partitionToOffset.getValue() == null) // if an offset cannot be retrieved for a partition with the given timestamp, // we default to using the latest offset for the partition ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET // since the specified offsets represent the next record to read, we subtract // it by one so that the initial state of the consumer will be correct : partitionToOffset.getValue() - 1); } break; default: //默认GROUP_OFFSETS for (KafkaTopicPartition seedPartition : allPartitions) { subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel()); } } if (!subscribedPartitionsToStartOffsets.isEmpty()) { switch (startupMode) { case EARLIEST: LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}", getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet()); break; case LATEST: LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}", getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet()); break; case TIMESTAMP: LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}", getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), startupOffsetsTimestamp, subscribedPartitionsToStartOffsets.keySet()); break; case SPECIFIC_OFFSETS: LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}", 
getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), specificStartupOffsets, subscribedPartitionsToStartOffsets.keySet()); List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size()); for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) { if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) { partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey()); } } if (partitionsDefaultedToGroupOffsets.size() > 0) { LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" + "; their startup offsets will be defaulted to their committed group offsets in Kafka.", getRuntimeContext().getIndexOfThisSubtask(), partitionsDefaultedToGroupOffsets.size(), partitionsDefaultedToGroupOffsets); } break; case GROUP_OFFSETS: LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}", getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet()); } } else { LOG.info("Consumer subtask {} initially has no partitions to read from.", getRuntimeContext().getIndexOfThisSubtask()); } }
open 方法主要是将用户指定的 topic 及其对应的 partition、起始 offset 存储到 Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets 中。接下来看 Flink 消费 Kafka 的入口方法 run:
@Override //入口方法 start a source public void run(SourceContext<T> sourceContext) throws Exception { if (subscribedPartitionsToStartOffsets == null) { throw new Exception("The partitions were not set for the consumer"); } // initialize commit metrics and default offset callback method this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER); this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER); this.offsetCommitCallback = new KafkaCommitCallback() { @Override public void onSuccess() { successfulCommits.inc(); } @Override public void onException(Throwable cause) { LOG.warn("Async Kafka commit failed.", cause); failedCommits.inc(); } }; // mark the subtask as temporarily idle if there are no initial seed partitions; // once this subtask discovers some partitions and starts collecting records, the subtask's // status will automatically be triggered back to be active. if (subscribedPartitionsToStartOffsets.isEmpty()) { sourceContext.markAsTemporarilyIdle(); } // from this point forward: // - 'snapshotState' will draw offsets from the fetcher, // instead of being built from `subscribedPartitionsToStartOffsets` // - 'notifyCheckpointComplete' will start to do work (i.e. 
commit offsets to // Kafka through the fetcher, if configured to do so) this.kafkaFetcher = createFetcher( sourceContext, subscribedPartitionsToStartOffsets, periodicWatermarkAssigner, punctuatedWatermarkAssigner, (StreamingRuntimeContext) getRuntimeContext(), offsetCommitMode, getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP), useMetrics); if (!running) { return; } // depending on whether we were restored with the current state version (1.3), // remaining logic branches off into 2 paths: // 1) New state - partition discovery loop executed as separate thread, with this // thread running the main fetcher loop // 2) Old state - partition discovery is disabled and only the main fetcher loop is executed if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) { kafkaFetcher.runFetchLoop(); } else { runWithPartitionDiscovery(); } }
protected AbstractFetcher( SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception { this.sourceContext = checkNotNull(sourceContext); this.checkpointLock = sourceContext.getCheckpointLock(); this.userCodeClassLoader = checkNotNull(userCodeClassLoader); this.useMetrics = useMetrics; this.consumerMetricGroup = checkNotNull(consumerMetricGroup); this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_CURRENT_OFFSETS_METRICS_GROUP); this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup(LEGACY_COMMITTED_OFFSETS_METRICS_GROUP); // figure out what we watermark mode we will be using this.watermarksPeriodic = watermarksPeriodic; this.watermarksPunctuated = watermarksPunctuated; if (watermarksPeriodic == null) { if (watermarksPunctuated == null) { // simple case, no watermarks involved timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS; } else { timestampWatermarkMode = PUNCTUATED_WATERMARKS; } } else { if (watermarksPunctuated == null) { timestampWatermarkMode = PERIODIC_WATERMARKS; } else { throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks"); } } this.unassignedPartitionsQueue = new ClosableBlockingQueue<>(); // initialize subscribed partition states with seed partitions,根据有无timestamp / watermark //subscribedPartitionStates 持有了List<KafkaTopicPartitionState<KPH>>,KafkaTopicPartitionState包括kafkaTopicPartition offset等信息 this.subscribedPartitionStates = createPartitionStateHolders( seedPartitionsWithInitialOffsets, timestampWatermarkMode, watermarksPeriodic, watermarksPunctuated, userCodeClassLoader); // check that all seed partition states have a 
defined offset //无论是从checkpoint中恢复也好,还是从kafkaConsumer.set...设置也好都需要有initial offset for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) { if (!partitionState.isOffsetDefined()) { throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets."); } } // all seed partitions are not assigned yet, so should be added to the unassigned partitions queue //到目前为止consumer并未指定partition for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) { unassignedPartitionsQueue.add(partition); } // register metrics for the initial seed partitions if (useMetrics) { registerOffsetMetrics(consumerMetricGroup, subscribedPartitionStates); } // if we have periodic watermarks, kick off the interval scheduler if (timestampWatermarkMode == PERIODIC_WATERMARKS) { @SuppressWarnings("unchecked") PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter( subscribedPartitionStates, sourceContext, processingTimeProvider, autoWatermarkInterval); periodicEmitter.start(); } }
/**
 * Creates the Kafka fetcher: delegates the common setup to the superclass constructor,
 * then creates the {@code Handover} and the {@code KafkaConsumerThread} that is given the
 * Kafka properties and the queue of unassigned partitions.
 *
 * @param sourceContext forwarded to the superclass
 * @param assignedPartitionsWithInitialOffsets partitions mapped to their initial offsets;
 *        forwarded to the superclass as the seed partitions
 * @param watermarksPeriodic forwarded to the superclass
 * @param watermarksPunctuated forwarded to the superclass
 * @param processingTimeProvider forwarded to the superclass
 * @param autoWatermarkInterval forwarded to the superclass
 * @param userCodeClassLoader forwarded to the superclass
 * @param taskNameWithSubtasks used as part of the consumer thread's name
 * @param deserializer stored in this fetcher
 * @param kafkaProperties passed to the consumer thread
 * @param pollTimeout passed to the consumer thread
 * @param subtaskMetricGroup passed to the consumer thread
 * @param consumerMetricGroup forwarded to the superclass and to the consumer thread
 * @param useMetrics forwarded to the superclass and to the consumer thread
 * @throws Exception declared by the signature; propagated from the superclass constructor
 */
public KafkaFetcher(
        SourceFunction.SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        String taskNameWithSubtasks,
        KafkaDeserializationSchema<T> deserializer,
        Properties kafkaProperties,
        long pollTimeout,
        MetricGroup subtaskMetricGroup,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
    super(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            processingTimeProvider,
            autoWatermarkInterval,
            userCodeClassLoader,
            consumerMetricGroup,
            useMetrics);

    this.deserializer = deserializer;
    this.handover = new Handover();

    // the consumer thread receives the same unassignedPartitionsQueue that the superclass
    // constructor was given the seed partitions for
    this.consumerThread = new KafkaConsumerThread(
            LOG,
            handover,
            kafkaProperties,
            unassignedPartitionsQueue,
            getFetcherName() + " for " + taskNameWithSubtasks,
            pollTimeout,
            useMetrics,
            consumerMetricGroup,
            subtaskMetricGroup);
}
当 KafkaConsumerThread 启动(start)时,执行的就是它的 run 方法,其中与分区分配相关的部分如下:
// ...... (preceding code omitted)
try {
    // hasAssignedPartitions is false by default.
    // NOTE(review): the article states that newly discovered partitions are added to
    // unassignedPartitionsQueue and subscribedPartitionsToStartOffsets; that code is not
    // part of this excerpt - verify against the fetcher.
    if (hasAssignedPartitions) {
        newPartitions = unassignedPartitionsQueue.pollBatch();
    } else {
        // if no assigned partitions block until we get at least one
        // instead of hot spinning this loop. We rely on a fact that
        // unassignedPartitionsQueue will be closed on a shutdown, so
        // we don't block indefinitely
        newPartitions = unassignedPartitionsQueue.getBatchBlocking();
    }
    // when unassignedPartitionsQueue holds entries, newPartitions is non-null
    // and reassignPartitions is executed
    if (newPartitions != null) {
        reassignPartitions(newPartitions);
    }
} catch (AbortedReassignmentException e) {
    continue;
}
// ...... (following code omitted)
void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception { if (newPartitions.size() == 0) { return; } hasAssignedPartitions = true; boolean reassignmentStarted = false; // since the reassignment may introduce several Kafka blocking calls that cannot be interrupted, // the consumer needs to be isolated from external wakeup calls in setOffsetsToCommit() and shutdown() // until the reassignment is complete. final KafkaConsumer<byte[], byte[]> consumerTmp; synchronized (consumerReassignmentLock) {//将consumer的引用赋值给consumerTmp consumerTmp = this.consumer; this.consumer = null; } final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>(); try {/* 之所有会有newPartition和oldPartition是因为当我们配置了KEYPARTITIONDISCOVERYINTERVALMILLIS,每个固定时间会判断是否新加了partition,如果新加了,会将新加的partition添加到unassignedPartitionsQueue中*/ for (TopicPartition oldPartition : consumerTmp.assignment()) { oldPartitionAssignmentsToPosition.put(oldPartition, consumerTmp.position(oldPartition)); } final List<TopicPartition> newPartitionAssignments = new ArrayList<>(newPartitions.size() + oldPartitionAssignmentsToPosition.size()); newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet()); newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions)); // reassign with the new partitions consumerTmp.assign(newPartitionAssignments); reassignmentStarted = true; // old partitions should be seeked to their previous position for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) { consumerTmp.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue()); } // offsets in the state of new partitions may still be placeholder sentinel values if we are: // (1) starting fresh, // (2) checkpoint / savepoint state we were restored with had not completely // been replaced with actual offset values yet, or // (3) the partition was newly discovered after startup; // replace those with 
actual offsets, according to what the sentinel value represent. //kafka中配置关于offset的参数是不起作用的,还是依赖于startupMode//根据getOffset的类型,consumer指定开始消费的offset,而offset的类型呢,我们知道来源于startupMode for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) { if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) { consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle())); newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1); } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) { consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle())); newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1); } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) { // the KafkaConsumer by default will automatically seek the consumer position // to the committed group offset, so we do not need to do it. newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1); } else { consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1); } } } catch (WakeupException e) { // a WakeupException may be thrown if the consumer was invoked wakeup() // before it was isolated for the reassignment. In this case, we abort the // reassignment and just re-expose the original consumer. 
synchronized (consumerReassignmentLock) { this.consumer = consumerTmp; // if reassignment had already started and affected the consumer, // we do a full roll back so that it is as if it was left untouched if (reassignmentStarted) { this.consumer.assign(new ArrayList<>(oldPartitionAssignmentsToPosition.keySet())); for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) { this.consumer.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue()); } } // no need to restore the wakeup state in this case, // since only the last wakeup call is effective anyways hasBufferedWakeup = false; // re-add all new partitions back to the unassigned partitions queue to be picked up again for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) { unassignedPartitionsQueue.add(newPartition); } // this signals the main fetch loop to continue through the loop throw new AbortedReassignmentException(); } } // reassignment complete; expose the reassigned consumer synchronized (consumerReassignmentLock) { this.consumer = consumerTmp; // restore wakeup state for the consumer if necessary if (hasBufferedWakeup) { this.consumer.wakeup(); hasBufferedWakeup = false; } } }
