
The Difference Between Flink and Spark Streaming in Their Kafka Integration


The snippet below is from the run method of FlinkKafkaConsumerBase, where a dedicated partition-discovery thread is created:

```java
this.discoveryLoopThread = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            // --------------------- partition discovery loop ---------------------

            List<KafkaTopicPartition> discoveredPartitions;

            // throughout the loop, we always eagerly check if we are still running before
            // performing the next operation, so that we can escape the loop as soon as possible
            while (running) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Consumer subtask {} is trying to discover new partitions ...",
                            getRuntimeContext().getIndexOfThisSubtask());
                }

                try {
                    discoveredPartitions = partitionDiscoverer.discoverPartitions();
                } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
                    // the partition discoverer may have been closed or woken up before or during the discovery;
                    // this would only happen if the consumer was canceled; simply escape the loop
                    break;
                }

                // no need to add the discovered partitions if we were closed during the meantime
                if (running && !discoveredPartitions.isEmpty()) {
                    kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                }

                // do not waste any time sleeping if we're not running anymore
                if (running && discoveryIntervalMillis != 0) {
                    try {
                        Thread.sleep(discoveryIntervalMillis);
                    } catch (InterruptedException iex) {
                        // may be interrupted if the consumer was canceled midway; simply escape the loop
                        break;
                    }
                }
            }
        } catch (Exception e) {
            discoveryLoopErrorRef.set(e);
        } finally {
            // calling cancel will also let the fetcher loop escape
            // (if not running, cancel() was already called)
            if (running) {
                cancel();
            }
        }
    }
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
```


It creates a dedicated thread that dynamically discovers newly created Kafka topics (the topics to consume can be specified with a regular expression) as well as newly added partitions of existing topics.
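As a quick illustration of how a job opts into this mechanism, here is a minimal sketch. The topic pattern, broker address, group id, and interval value are made up for the example, but the Pattern-based constructor and the flink.partition-discovery.interval-millis property are part of the Kafka connector API:

```java
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PartitionDiscoveryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.setProperty("group.id", "discovery-demo");          // illustrative group id
        // a non-negative interval (in ms) enables the discovery loop shown above
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        // subscribing by regular expression: topics created later that match the
        // pattern are picked up at runtime by the discovery thread
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                Pattern.compile("events-.*"),
                new SimpleStringSchema(),
                props);

        env.addSource(consumer).print();
        env.execute("Kafka partition discovery demo");
    }
}
```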


The next step is naturally to start the discovery thread and to kick off the kafkaFetcher:


```java
discoveryLoopThread.start();
kafkaFetcher.runFetchLoop();

// --------------------------------------------------------------------
// make sure that the partition discoverer is properly closed
partitionDiscoverer.close();
discoveryLoopThread.join();
```


Stepping into kafkaFetcher's runFetchLoop method, the first thing we see is:


```java
// kick off the actual Kafka consumer
consumerThread.start();
```


This thread is created when the Kafka09Fetcher is constructed:


```java
this.consumerThread = new KafkaConsumerThread(
        LOG,
        handover,
        kafkaProperties,
        unassignedPartitionsQueue,
        createCallBridge(),
        getFetcherName() + " for " + taskNameWithSubtasks,
        pollTimeout,
        useMetrics,
        consumerMetricGroup,
        subtaskMetricGroup);
```


KafkaConsumerThread extends Thread. In its run method, the first thing we see is:


```java
// this is the means to talk to FlinkKafkaConsumer's main thread
final Handover handover = this.handover;
```


What this handover does is covered in its own section below; for now, let's continue through the rest of the run method.


1. Obtain the KafkaConsumer:


```java
try {
    this.consumer = getConsumer(kafkaProperties);
}
catch (Throwable t) {
    handover.reportError(t);
    return;
}
```


2. Check for partitions and reassign any newly discovered ones:


```java
try {
    if (hasAssignedPartitions) {
        newPartitions = unassignedPartitionsQueue.pollBatch();
    }
    else {
        // if no assigned partitions block until we get at least one
        // instead of hot spinning this loop. We rely on a fact that
        // unassignedPartitionsQueue will be closed on a shutdown, so
        // we don't block indefinitely
        newPartitions = unassignedPartitionsQueue.getBatchBlocking();
    }

    if (newPartitions != null) {
        reassignPartitions(newPartitions);
    }
} catch (AbortedReassignmentException e) {
    continue;
}
```


3. Poll records from Kafka:


```java
// get the next batch of records, unless we did not manage to hand the old batch over
if (records == null) {
    try {
        records = consumer.poll(pollTimeout);
    }
    catch (WakeupException we) {
        continue;
    }
}
```


4. Publish the records through the handover:


```java
try {
    handover.produce(records);
    records = null;
}
catch (Handover.WakeupException e) {
    // fall through the loop
}
```


With the detour into KafkaConsumerThread out of the way, let's get back to kafkaFetcher's runFetchLoop method.


1. Poll the records that the consumer thread published via handover.produce:


```java
while (running) {
    // this blocks until we get the next records
    // it automatically re-throws exceptions encountered in the consumer thread
    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
```


2. Regroup the batch by topic partition, deserialize each record, and emit the records one at a time. This is where the batch-oriented pull from Kafka is turned into per-record, event-driven processing:


```java
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
            records.records(partition.getKafkaPartitionHandle());

    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
        final T value = deserializer.deserialize(
                record.key(), record.value(),
                record.topic(), record.partition(), record.offset());

        if (deserializer.isEndOfStream(value)) {
            // end of stream signaled
            running = false;
            break;
        }

        // emit the actual record. this also updates offset state atomically
        // and deals with timestamps and watermark generation
        emitRecord(value, partition, record.offset(), record);
    }
}
```


You will certainly notice the line emitRecord(value, partition, record.offset(), record); from this point on, Flink operates as an event-driven streaming engine: each record is pushed downstream as soon as it is deserialized.
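The deserializer used above is, in the connector versions this walkthrough quotes, a KeyedDeserializationSchema (later superseded by KafkaDeserializationSchema). As a hedged illustration, a schema whose class name and end-of-stream marker are invented for this example might look like this; returning true from isEndOfStream is what makes the fetch loop above set running = false:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Hypothetical schema for illustration: deserializes values as UTF-8 strings
// and treats the literal message "END" as an end-of-stream marker.
public class StringWithEndMarkerSchema implements KeyedDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message,
                              String topic, int partition, long offset) throws IOException {
        return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return "END".equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```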


Handover: the hub


The handover is constructed when the kafkaFetcher is built.
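In essence it is a blocking hand-off point of capacity one between the consumer thread (which calls produce) and the fetcher's main thread (which calls pollNext), and it also forwards exceptions from the consumer thread to the fetcher. A minimal sketch of that idea, written for this article rather than copied from the actual Flink class (which additionally supports wakeup and close), could look like this:

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Simplified, illustration-only version of the hand-off idea behind Flink's Handover:
// a capacity-one exchange between the Kafka consumer thread and the fetcher thread.
public final class SimpleHandover {

    private final Object lock = new Object();
    private ConsumerRecords<byte[], byte[]> next;
    private Throwable error;

    // called by the consumer thread; blocks until the previous batch was taken
    public void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
        synchronized (lock) {
            while (next != null && error == null) {
                lock.wait();
            }
            next = records;
            lock.notifyAll();
        }
    }

    // called by the fetcher thread; blocks until a batch (or an error) is available,
    // re-throwing failures from the consumer thread on the fetcher side
    public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
        synchronized (lock) {
            while (next == null && error == null) {
                lock.wait();
            }
            if (error != null) {
                throw new Exception("error in consumer thread", error);
            }
            ConsumerRecords<byte[], byte[]> records = next;
            next = null;
            lock.notifyAll();
            return records;
        }
    }

    // called by the consumer thread to report its failure
    public void reportError(Throwable t) {
        synchronized (lock) {
            error = t;
            lock.notifyAll();
        }
    }
}
```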
