private class TopicConsumeRunner extends Thread {
private final Map<TopicPartition, PartitionConsumer<K, V>> partitionConsumers = new HashMap<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private ConsumerRebalanceListener rebalanceListener;
private Consumer<K, V> kafkaConsumer;
private long pollTimeOut;
public TopicConsumeRunner(ConsumerConfig consumerConfig) {
this.pollTimeOut = consumerConfig.getPollTimeOut();
// Thread name
setName("Topic[" + consumerConfig.getTopic() + "]-Poller");
// 通过RebalanceListener来管理PartitionConsumer的声明周期
this.rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
LOG.info("Revoke: {}}", partitions);
try {
commitFinishOffset();
} catch (Exception e) {
LOG.warn("Revoke commit offset error {}", e.getMessage());
// kafka comsumer is invalid, so must close partition consumer.
closePartitionConsumers();
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOG.info("Assign: {}}", partitions);
// resume consuming
kafkaConsumer.resume(partitions);
// create new partition consumers
Map<TopicPartition, PartitionConsumer<K, V>> newConsumers = new HashMap<>();
for (TopicPartition partition : partitions) {
PartitionConsumer<K, V> consumer = partitionConsumers.get(partition);
if (consumer == null) {
newConsumers.put(partition,
new PartitionConsumer<K, V>(partition, consumerConfig, messageHandler));
} else {
newConsumers.put(partition, consumer);
}
}
// clear revoked partition consumers
partitionConsumers.forEach((partition, consumer) -> {
if (!newConsumers.containsKey(partition)) {
// need to shutdown
consumer.shutdown();
}
});
partitionConsumers.clear();
// add new partition consumers
partitionConsumers.putAll(newConsumers);
// show partition state info
partitionConsumers.forEach((partition, consumer) -> {
OffsetAndMetadata osm = kafkaConsumer.committed(partition);
long committed = osm == null ? -1 : osm.offset();
long position = kafkaConsumer.position(partition);
long submit = consumer.submitOffset();
long finish = consumer.finishOffset();
LOG.info("{} commited={}, position={} ; submit={}, finish={}", partition, committed, position,
submit, finish);
});
}
};
}
public void run() {
// init kafka consumer
createKafkaConsumer();
Duration ptoMillis = Duration.ofMillis(pollTimeOut);
while (!closed.get()) {
long stime = System.currentTimeMillis();
try {
// 拉取已订阅Topic的消息
ConsumerRecords<K, V> records = kafkaConsumer.poll(ptoMillis);
LOG.debug("{} poll records size {}", topicName, records.count());
// 计算并提交已完成消息的offset
commitFinishOffset();
// 按照Partition提交到相应PartitionConsumer中
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<K, V>> recordList = records.records(partition);
LOG.debug("{} poll records size {}", partition, recordList.size());
PartitionConsumer<K, V> pc = partitionConsumers.get(partition);
// submit records and control rate
long count = pc.submit(recordList);
if (count > 0) {
// 将该partition的拉取offset重定向到提交位置
kafkaConsumer.seek(partition, pc.submitOffset() + 1);
}
// full check and pause consuming
if (pc.isFull()) {
// 先暂停该partition的消费
kafkaConsumer.pause(Collections.singleton(partition));
}
}
} catch (Exception e) {
// Ignore exception if closing
if (closed.get()) {
continue;
}
LOG.warn("create a new KafkaConsumer for topic {} by error : {}", topicName, e.getMessage());
closeKafkaConsumer();
createKafkaConsumer();
}
long etime = System.currentTimeMillis();
if (etime - stime < pollTimeOut) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
}
closeKafkaConsumer();
}
private void commitFinishOffset() {
partitionConsumers.forEach((partition, consumer) -> {
// process finish offset
long count = consumer.finish();
if (count > 0) {
// sync ack commit offset
long offset = consumer.finishOffset() + 1;
OffsetAndMetadata cos = new OffsetAndMetadata(offset);
kafkaConsumer.commitSync(Collections.singletonMap(partition, cos));
}
// full check and pause consuming
if (!consumer.isFull()) {
// 恢复该partition的消费
kafkaConsumer.resume(Collections.singleton(partition));
}
});
}
private void createKafkaConsumer() {
kafkaConsumer = consumerFactory.createConsumer();
kafkaConsumer.subscribe(Arrays.asList(topicName), rebalanceListener);
}
private void closeKafkaConsumer() {
if (kafkaConsumer != null) {
kafkaConsumer.unsubscribe();
kafkaConsumer.close();
kafkaConsumer = null;
}
// shutdown and clear partition consumer
closePartitionConsumers();
}
private void closePartitionConsumers() {
partitionConsumers.forEach((tp, pc) -> {
pc.shutdown();
});
partitionConsumers.clear();
}
// Shutdown hook which can be called from a separate thread
private void close() {
closed.set(true);
if (kafkaConsumer != null) {
kafkaConsumer.wakeup();
}
}
}
// 评论 ("comments") — stray non-code text, likely a copy/extraction artifact