Flink Source Code Analysis: How Flink Reads Data from Kafka
Published on: June 7, 2020
Let's first look at the FlinkKafkaConsumerBase.run method, which serves as the entry point for Flink pulling data from Kafka:
```java
// entry point: start a source
public void run(SourceContext<T> sourceContext) throws Exception {
    ......
    // from this point forward:
    //   - 'snapshotState' will draw offsets from the fetcher,
    //     instead of being built from `subscribedPartitionsToStartOffsets`
    //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
    //     Kafka through the fetcher, if configured to do so)

    // create the Fetcher that pulls data from Kafka
    this.kafkaFetcher = createFetcher(
            sourceContext,
            subscribedPartitionsToStartOffsets,
            periodicWatermarkAssigner,
            punctuatedWatermarkAssigner,
            (StreamingRuntimeContext) getRuntimeContext(),
            offsetCommitMode,
            getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
            useMetrics);

    if (!running) {
        return;
    }

    // depending on whether we were restored with the current state version (1.3),
    // remaining logic branches off into 2 paths:
    // 1) New state - partition discovery loop executed as separate thread, with this
    //    thread running the main fetcher loop
    // 2) Old state - partition discovery is disabled and only the main fetcher loop is executed
    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        // KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS was not configured
        kafkaFetcher.runFetchLoop();
    } else {
        // runWithPartitionDiscovery() still ends up calling kafkaFetcher.runFetchLoop()
        runWithPartitionDiscovery();
    }
}
```
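Whether run() takes the plain runFetchLoop() path or the runWithPartitionDiscovery() path depends on whether the flink.partition-discovery.interval-millis property was set on the consumer. A minimal job setup that enables the discovery branch might look like the sketch below (broker address, group id, and topic name are placeholders; the universal FlinkKafkaConsumer connector is assumed):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

public class PartitionDiscoveryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "demo-group");              // placeholder group
        // Setting this key gives discoveryIntervalMillis a value other than
        // PARTITION_DISCOVERY_DISABLED, so run() takes the runWithPartitionDiscovery() branch.
        props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

        env.addSource(consumer).print();
        env.execute("partition-discovery-demo");
    }
}
```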
The createFetcher method:
```java
@Override
protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
    ......
    return new KafkaFetcher<>(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics);
}
```
It returns a KafkaFetcher object; let's step into it.
The KafkaFetcher constructor creates a KafkaConsumerThread object:
```java
public KafkaFetcher(
        SourceFunction.SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        String taskNameWithSubtasks,
        KafkaDeserializationSchema<T> deserializer,
        Properties kafkaProperties,
        long pollTimeout,
        MetricGroup subtaskMetricGroup,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
    ......
    this.consumerThread = new KafkaConsumerThread(
            LOG,
            handover,                   // one of the KafkaConsumerThread constructor arguments
            kafkaProperties,
            // what exactly is unassignedPartitionsQueue? It is covered in detail in
            // "How Flink's startupMode takes effect"
            unassignedPartitionsQueue,
            getFetcherName() + " for " + taskNameWithSubtasks,
            pollTimeout,
            useMetrics,
            consumerMetricGroup,
            subtaskMetricGroup);
}
```
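The handover passed into KafkaConsumerThread above is the channel between the consumer thread and the fetcher's main loop. The real org.apache.flink.streaming.connectors.kafka.internal.Handover additionally handles wake-ups, error reporting, and closing; purely to illustrate the idea, a stripped-down, hypothetical single-slot exchange could look like this:

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Hypothetical, simplified hand-off: the consumer thread parks a batch with produce(),
// the fetcher (main task) thread takes it with pollNext(). Not the real Handover class.
public final class SimpleHandover {

    private final Object lock = new Object();
    private ConsumerRecords<byte[], byte[]> next; // single-element "queue"

    // called by the Kafka consumer thread; blocks while the previous batch is still unconsumed
    public void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = records;
            lock.notifyAll();
        }
    }

    // called by the fetcher thread; blocks until a batch is available
    public ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();
            }
            ConsumerRecords<byte[], byte[]> records = next;
            next = null;
            lock.notifyAll();
            return records;
        }
    }
}
```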
That wraps up createFetcher, which can be seen as the preparation for pulling data. Next, let's look at kafkaFetcher.runFetchLoop().
The runFetchLoop method in KafkaFetcher is where pulling messages from Kafka really starts:
```java
// fetch messages from Kafka
public void runFetchLoop() throws Exception {
    try {
        // one of the arguments passed to the KafkaConsumerThread constructor
        final Handover handover = this.handover;

        // kick off the actual Kafka consumer
        // this is where data is really pulled from Kafka
        consumerThread.start();

        while (running) {
            // this blocks until we get the next records
            // it automatically re-throws exceptions encountered in the consumer thread
            // take records from the handover, then process them
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            // get the records for each topic partition
            for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle());

                for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                    final T value = deserializer.deserialize(record);

                    if (deserializer.isEndOfStream(value)) {
                        // end of stream signaled
                        running = false;
                        break;
                    }

                    // emit the actual record. this also updates offset state atomically
                    // and deals with timestamps and watermark generation
                    // after emitting, timestamp and watermark handling takes over
                    emitRecord(value, partition, record.offset(), record);
                }
            }
        }
    } finally {
        // this signals the consumer thread that no more work is to be done
        consumerThread.shutdown();
    }

    // on a clean exit, wait for the runner thread
    try {
        consumerThread.join();
    } catch (InterruptedException e) {
        // may be the result of a wake-up interruption after an exception.
        // we ignore this here and only restore the interruption state
        Thread.currentThread().interrupt();
    }
}
```
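The per-record work inside this loop is delegated to the user-supplied KafkaDeserializationSchema: deserialize() turns the raw ConsumerRecord into a value of type T, and isEndOfStream() is what flips running to false. A minimal, hypothetical implementation (the class name and the "END" marker are invented for illustration) might look like this:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical schema: emits "key|value" strings and treats a record whose value is
// literally "END" as the end-of-stream signal that stops runFetchLoop().
public class KeyValueDeserializationSchema implements KafkaDeserializationSchema<String> {

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String key = record.key() == null ? "" : new String(record.key(), StandardCharsets.UTF_8);
        String value = record.value() == null ? "" : new String(record.value(), StandardCharsets.UTF_8);
        return key + "|" + value;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // returning true here is what makes the fetch loop set running = false
        return nextElement.endsWith("|END");
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```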
Since consumerThread.start() is what kicks off the actual Kafka consumer, let's take a look at the run method in KafkaConsumerThread:
```java
@Override
public void run() {
    // early exit check
    if (!running) {
        return;
    }

    // this is the means to talk to FlinkKafkaConsumer's main thread
    // a constructor argument, used by the FlinkKafkaConsumer main thread for consumption,
    // i.e. the handover.pollNext() mentioned above
    final Handover handover = this.handover;

    // This method initializes the KafkaConsumer and guarantees it is torn down properly.
    // This is important, because the consumer has multi-threading issues,
    // including concurrent 'close()' calls.
    try {
        // obtain the consumer
        this.consumer = getConsumer(kafkaProperties);
    } catch (Throwable t) {
        handover.reportError(t);
        return;
    }

    // from here on, the consumer is guaranteed to be closed properly
    ......

        // early exit check
        if (!running) {
            return;
        }

        // the latest bulk of records. May carry across the loop if the thread is woken up
        // from blocking on the handover
        ConsumerRecords<byte[], byte[]> records = null;

        // reused variable to hold found unassigned new partitions.
        // found partitions are not carried across loops using this variable;
        // they are carried across via re-adding them to the unassigned partitions queue
        List<KafkaTopicPartitionState<TopicPartition>> newPartitions;

        // main fetch loop
        while (running) {

            // check if there is something to commit (commitInProgress is false by default)
            if (!commitInProgress) {
                // get and reset the work-to-be committed, so we don't repeatedly commit the same
                // for details see "How Flink saves offsets" (https://www.jianshu.com/p/ee4fe63f0182)
                final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                        nextOffsetsToCommit.getAndSet(null);

                if (commitOffsetsAndCallback != null) {
                    log.debug("Sending async offset commit request to Kafka broker");

                    // also record that a commit is already in progress
                    // the order here matters! first set the flag, then send the commit command.
                    commitInProgress = true;
                    consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
                }
            }

            try {
                // hasAssignedPartitions is false by default
                // when new partitions are discovered, they are added to unassignedPartitionsQueue and subscribed;
                // see "How Flink's startupMode takes effect" for details
                if (hasAssignedPartitions) {
                    newPartitions = unassignedPartitionsQueue.pollBatch();
                } else {
                    // if no assigned partitions block until we get at least one
                    // instead of hot spinning this loop. We rely on a fact that
                    // unassignedPartitionsQueue will be closed on a shutdown, so
                    // we don't block indefinitely
                    newPartitions = unassignedPartitionsQueue.getBatchBlocking();
                }
                if (newPartitions != null) {
                    reassignPartitions(newPartitions);
                }
            } catch (AbortedReassignmentException e) {
                continue;
            }

            if (!hasAssignedPartitions) {
                // Without assigned partitions KafkaConsumer.poll will throw an exception
                continue;
            }

            // get the next batch of records, unless we did not manage to hand the old batch over
            if (records == null) {
                try {
                    // pull data through the Kafka API
                    records = consumer.poll(pollTimeout);
                } catch (WakeupException we) {
                    continue;
                }
            }

            try {
                // "wrap" the records via the handover so the FlinkKafkaConsumer main thread can consume them
                handover.produce(records);
                records = null;
            } catch (Handover.WakeupException e) {
                // fall through the loop
            }
        } // end main fetch loop
    } catch (Throwable t) {
        // let the main thread know and exit
        // it may be that this exception comes because the main thread closed the handover, in
        // which case the below reporting is irrelevant, but does not hurt either
        handover.reportError(t);
    } finally {
        // make sure the handover is closed if it is not already closed or has an error
        handover.close();

        // make sure the KafkaConsumer is closed
        try {
            consumer.close();
        } catch (Throwable t) {
            log.warn("Error while closing Kafka consumer", t);
        }
    }
}
```
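Stripped of Flink's partition reassignment and handover plumbing, the rhythm of this loop is the standard Kafka client pattern: commit pending offsets asynchronously if any, then poll, then hand the batch downstream. The sketch below replays that rhythm with a bare KafkaConsumer; the class name, topic, and callback body are made up, and unlike the real thread, which assigns partitions manually from unassignedPartitionsQueue, it simply uses subscribe():

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of the poll/commitAsync rhythm seen in KafkaConsumerThread.run().
public class PollAndCommitSketch {

    // stand-in for nextOffsetsToCommit: the checkpoint callback drops offsets in here,
    // the fetch loop picks them up with getAndSet(null) so nothing is committed twice
    private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit =
            new AtomicReference<>();

    private volatile boolean running = true;

    // kafkaProperties must contain bootstrap.servers, group.id and byte-array deserializers
    public void run(Properties kafkaProperties) {
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaProperties)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic

            while (running) {
                // 1) commit pending offsets asynchronously, the same commitAsync call the Flink thread uses
                Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
                if (toCommit != null) {
                    consumer.commitAsync(toCommit, (offsets, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                    });
                }

                // 2) poll the next batch of records
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

                // 3) in Flink this is where handover.produce(records) would hand the batch
                //    to the main thread; here we only report the batch size
                System.out.println("polled " + records.count() + " records");
            }
        }
    }
}
```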
That completes the walkthrough of how Flink pulls data from Kafka.
Copyright notice: this is an original article by InfoQ author shengjk1.
Original link: http://xie.infoq.cn/article/e3c9ae9f68627130b547132ed
This article is licensed under CC-BY 4.0. Please retain the original link and this copyright notice when reposting.