How Flink differs from Spark Streaming in its integration with Kafka!
this.discoveryLoopThread = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            // --------------------- partition discovery loop ---------------------

            List<KafkaTopicPartition> discoveredPartitions;

            // throughout the loop, we always eagerly check if we are still running before
            // performing the next operation, so that we can escape the loop as soon as possible

            while (running) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
                }

                try {
                    discoveredPartitions = partitionDiscoverer.discoverPartitions();
                } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
                    // the partition discoverer may have been closed or woken up before or during the discovery;
                    // this would only happen if the consumer was canceled; simply escape the loop
                    break;
                }

                // no need to add the discovered partitions if we were closed during the meantime
                if (running && !discoveredPartitions.isEmpty()) {
                    kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                }

                // do not waste any time sleeping if we're not running anymore
                if (running && discoveryIntervalMillis != 0) {
                    try {
                        Thread.sleep(discoveryIntervalMillis);
                    } catch (InterruptedException iex) {
                        // may be interrupted if the consumer was canceled midway; simply escape the loop
                        break;
                    }
                }
            }
        } catch (Exception e) {
            discoveryLoopErrorRef.set(e);
        } finally {
            // calling cancel will also let the fetcher loop escape
            // (if not running, cancel() was already called)
            if (running) {
                cancel();
            }
        }
    }
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
This defines a dedicated thread that dynamically discovers newly created Kafka topics (the set of topics to consume can be specified as a regex) as well as newly added partitions of existing topics.
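For reference, enabling this discovery from user code looks roughly like the following. This is a minimal sketch, assuming a connector version that exposes the discovery-interval key on FlinkKafkaConsumerBase and a Pattern-accepting constructor; the topic regex and interval are arbitrary examples:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "demo-group");
// poll Kafka metadata every 10 seconds for new partitions / newly matching topics
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000");

// subscribing with a regex means topics created later that match it are consumed too
FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
        java.util.regex.Pattern.compile("events-.*"),
        new SimpleStringSchema(),
        props);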
The next step, naturally, is to start the partition/topic discovery thread and kick off the kafkaFetcher:
discoveryLoopThread.start();
kafkaFetcher.runFetchLoop();
// --------------------------------------------------------------------
// make sure that the partition discoverer is properly closed
partitionDiscoverer.close();
discoveryLoopThread.join();
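Note the discoveryLoopErrorRef.set(e) in the catch block earlier: the discovery thread cannot throw into the task thread directly, so it parks its exception in an AtomicReference, and the main thread surfaces it after join(). A stripped-down sketch of that pattern (runDiscoveryLoop and runFetchLoop are stand-ins, not Flink methods):

final AtomicReference<Exception> errorRef = new AtomicReference<>();

Thread discoveryThread = new Thread(() -> {
    try {
        runDiscoveryLoop();      // stand-in for the discovery loop shown above
    } catch (Exception e) {
        errorRef.set(e);         // record the failure instead of losing it
    }
});

discoveryThread.start();
runFetchLoop();                  // stand-in for kafkaFetcher.runFetchLoop()
discoveryThread.join();

// surface a background failure in the task thread, if there was one
Exception discoveryError = errorRef.get();
if (discoveryError != null) {
    throw new RuntimeException(discoveryError);
}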
Next, we step into kafkaFetcher's runFetchLoop method, where the first thing that catches the eye is:
// kick off the actual Kafka consumer
consumerThread.start();
This thread is created when the Kafka09Fetcher is constructed:
this.consumerThread = new KafkaConsumerThread(
        LOG,
        handover,
        kafkaProperties,
        unassignedPartitionsQueue,
        createCallBridge(),
        getFetcherName() + " for " + taskNameWithSubtasks,
        pollTimeout,
        useMetrics,
        consumerMetricGroup,
        subtaskMetricGroup);
KafkaConsumerThread extends Thread, and in its run method the first thing we see is:
// this is the means to talk to FlinkKafkaConsumer's main thread
final Handover handover = this.handover;
We will leave the role of this handover aside for now and continue through the rest of the run method.
1. Obtain the consumer:
try {
    this.consumer = getConsumer(kafkaProperties);
}
catch (Throwable t) {
    // report the failure to the fetcher through the handover and bail out
    handover.reportError(t);
    return;
}
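getConsumer is version-specific, but in the 0.9+ connectors it amounts to little more than instantiating the vanilla Kafka client with the user-supplied properties; roughly:

// roughly what getConsumer boils down to: construct the plain Kafka client
KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
    return new KafkaConsumer<>(kafkaProperties);
}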
2. Check for newly assigned partitions and reassign them:
try {
    if (hasAssignedPartitions) {
        newPartitions = unassignedPartitionsQueue.pollBatch();
    }
    else {
        // if no assigned partitions block until we get at least one
        // instead of hot spinning this loop. We rely on a fact that
        // unassignedPartitionsQueue will be closed on a shutdown, so
        // we don't block indefinitely
        newPartitions = unassignedPartitionsQueue.getBatchBlocking();
    }

    if (newPartitions != null) {
        reassignPartitions(newPartitions);
    }
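unassignedPartitionsQueue is Flink's internal ClosableBlockingQueue, but the poll-versus-block decision above behaves like this sketch built on a standard BlockingQueue (the method and names here are ours, for illustration only):

// illustrative equivalent using java.util.concurrent.BlockingQueue
List<TopicPartition> drainNewPartitions(
        BlockingQueue<TopicPartition> queue,
        boolean hasAssignedPartitions) throws InterruptedException {

    List<TopicPartition> batch = new ArrayList<>();
    if (hasAssignedPartitions) {
        queue.drainTo(batch);        // non-blocking: take whatever is queued
    } else {
        batch.add(queue.take());     // block until at least one partition arrives
        queue.drainTo(batch);        // then drain any others without blocking
    }
    return batch.isEmpty() ? null : batch;
}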
3. Poll records from Kafka:
// get the next batch of records, unless we did not manage to hand the old batch over
if (records == null) {
    try {
        records = consumer.poll(pollTimeout);
    }
    catch (WakeupException we) {
        continue;
    }
}
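The WakeupException here is standard Kafka client behavior, not something Flink adds: wakeup() is the one consumer method that may safely be called from another thread, and it makes a blocked (or the next) poll() throw, so the loop can re-check its exit conditions. In miniature (process is a hypothetical handler):

// another thread, e.g. during shutdown or partition reassignment, calls:
//     consumer.wakeup();

try {
    while (running) {
        ConsumerRecords<byte[], byte[]> batch = consumer.poll(100L);
        process(batch);              // hypothetical handler
    }
} catch (WakeupException we) {
    // expected when some other thread called consumer.wakeup()
} finally {
    consumer.close();
}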
4. Hand the records over to the fetcher via the handover:
try {
    handover.produce(records);
    records = null;
}
catch (Handover.WakeupException e) {
    // fall through the loop
}
Having been sidetracked into KafkaConsumerThread, we now pick up the analysis of kafkaFetcher's runFetchLoop method where we left off.
1. Pull the records that the consumer thread published via handover.produce:
while (running) {
    // this blocks until we get the next records
    // it automatically re-throws exceptions encountered in the consumer thread
    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
2. Regroup the polled batch by partition, then emit the records one at a time; this converts the proactive, batch-style polling of Kafka into event-triggered processing:
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
            records.records(partition.getKafkaPartitionHandle());

    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
        final T value = deserializer.deserialize(
                record.key(), record.value(),
                record.topic(), record.partition(), record.offset());

        if (deserializer.isEndOfStream(value)) {
            // end of stream signaled
            running = false;
            break;
        }

        // emit the actual record. this also updates offset state atomically
        // and deals with timestamps and watermark generation
        emitRecord(value, partition, record.offset(), record);
    }
}
You will surely notice the line emitRecord(value, partition, record.offset(), record); it is from this point onward that Flink operates as an event-triggered streaming engine.
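The deserializer used in this loop is the schema the user hands to the consumer; its isEndOfStream hook is what flips running to false above. A hypothetical schema that stops the stream on a sentinel record might look like:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// hypothetical example: the literal string "POISON" ends the stream
public class StringWithEndMarkerSchema implements KeyedDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message,
                              String topic, int partition, long offset) {
        return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // returning true makes the fetch loop above set running = false
        return "POISON".equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}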
handover: the hub
The handover is created when the kafkaFetcher is constructed.
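As the fetch loop above shows, it is conceptually a blocking exchange slot of size one between the consumer thread (produce) and the fetcher thread (pollNext): produce blocks until the previous batch has been taken, and pollNext blocks until a batch is available. Flink's real Handover additionally carries errors across threads (reportError is what makes pollNext re-throw consumer-thread exceptions) and supports wakeup and close; the following is only a sketch of the core idea, not Flink's actual class:

// sketch only: a one-element blocking exchange point between two threads
final class MiniHandover<T> {

    private final Object lock = new Object();
    private T element;

    // called by the producing (consumer) thread
    void produce(T next) throws InterruptedException {
        synchronized (lock) {
            while (element != null) {
                lock.wait();        // previous batch not yet taken
            }
            element = next;
            lock.notifyAll();
        }
    }

    // called by the consuming (fetcher) thread
    T pollNext() throws InterruptedException {
        synchronized (lock) {
            while (element == null) {
                lock.wait();        // nothing handed over yet
            }
            T result = element;
            element = null;
            lock.notifyAll();
            return result;
        }
    }
}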