写点什么

Flink 源码分析之 FlinkConsumer 是如何保证一个 partition 对应一个 thread 的

用户头像
shengjk1
关注
发布于: 2020 年 06 月 07 日

我们都知道flink 连接kafka时,默认是一个partition对应一个thread,它究竟是怎么实现的呢?以及到我们自己定义 RichParallelSourceFunction 的时候如何借鉴这部分代码呢?

我们一起来看一下(基于flink-1.8)

看过flink kafka连接器源码的同学对 FlinkKafkaConsumerBase 应该不陌生(没有看过的也无所谓,我们一起来看就好)

一起来看一下 FlinkKafkaConsumerBase 的 open 方法中关键的部分

//获取fixed topic's or topic pattern 's partitions of this subtask
final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

没错这就是查看Flink Consumer 保证 一个partition对应一个Thread的入口方法

public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
if (!closed && !wakeup) {
try {
...
// (2) eliminate partition that are old partitions or should not be subscribed by this subtask
if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
} else {
Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
KafkaTopicPartition nextPartition;
while (iter.hasNext()) {
nextPartition = iter.next();
//从之前已经发现的KafkaTopicPartition中移除,其二可以保证仅仅是这个subtask的partition
if (!setAndCheckDiscoveredPartition(nextPartition)) {
iter.remove();
}
}
}
return newDiscoveredPartitions;
...
}

关键性的部分 setAndCheckDiscoveredPartition 方法,点进去

public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
if (isUndiscoveredPartition(partition)) {
discoveredPartitions.add(partition);
//kafkaPartition与indexOfThisSubTask --对应
return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
}
return false;
}

indexOfThisSubtask 表示当前线程是那个subtask,numParallelSubtasks 表示总共并行的subtask 的个数, 当其返回true的时候,表示此partition 属于此indexOfThisSubtask。

下面来看一下具体是怎么划分的

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
// here, the assumption is that the id of Kafka partitions are always ascending
// starting from 0, and therefore can be used directly as the offset clockwise from the start index
return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

基于topic 和 partition,然后对numParallelSubtasks取余。



那么,当我们自己去定义RichParallelSourceFunction的时候如何去借鉴它呢,直接上代码:

public class WordSource extends RichParallelSourceFunction<Tuple2<Long, Long>> {
private Boolean isRun = true;
@Override
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
int start = 0;
int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
while (isRun) {
start += 1;
if (start % numberOfParallelSubtasks == getRuntimeContext().getIndexOfThisSubtask()) {
ctx.collect(new Tuple2<>(
Long.parseLong(start+""),
1L));
Thread.sleep(1000);
System.out.println("Thread.currentThread().getName()=========== " + Thread.currentThread().getName());
}
}
}
@Override
public void cancel() {
isRun = false;
}
}

当当当,自此,自己定义个RichParallelSourceFunction也可以并行发数据了,啦啦啦啦!



发布于: 2020 年 06 月 07 日阅读数: 58
用户头像

shengjk1

关注

还未添加个人签名 2018.04.26 加入

博客 https://blog.csdn.net/jsjsjs1789

评论

发布
暂无评论
Flink源码分析之FlinkConsumer是如何保证一个partition对应一个thread的