private class TopicConsumeRunner extends Thread {
private final Map<TopicPartition, PartitionConsumer<K, V>> partitionConsumers = new HashMap<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private ConsumerRebalanceListener rebalanceListener;
private Consumer<K, V> kafkaConsumer;
private long pollTimeOut;
public TopicConsumeRunner(ConsumerConfig consumerConfig) { this.pollTimeOut = consumerConfig.getPollTimeOut(); // Thread name setName("Topic[" + consumerConfig.getTopic() + "]-Poller"); // 通过RebalanceListener来管理PartitionConsumer的声明周期 this.rebalanceListener = new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { LOG.info("Revoke: {}}", partitions); try { commitFinishOffset(); } catch (Exception e) { LOG.warn("Revoke commit offset error {}", e.getMessage());
// kafka comsumer is invalid, so must close partition consumer. closePartitionConsumers(); } }
@Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { LOG.info("Assign: {}}", partitions);
// resume consuming kafkaConsumer.resume(partitions);
// create new partition consumers Map<TopicPartition, PartitionConsumer<K, V>> newConsumers = new HashMap<>(); for (TopicPartition partition : partitions) { PartitionConsumer<K, V> consumer = partitionConsumers.get(partition); if (consumer == null) { newConsumers.put(partition, new PartitionConsumer<K, V>(partition, consumerConfig, messageHandler)); } else { newConsumers.put(partition, consumer); } } // clear revoked partition consumers partitionConsumers.forEach((partition, consumer) -> { if (!newConsumers.containsKey(partition)) { // need to shutdown consumer.shutdown(); } }); partitionConsumers.clear(); // add new partition consumers partitionConsumers.putAll(newConsumers);
// show partition state info partitionConsumers.forEach((partition, consumer) -> { OffsetAndMetadata osm = kafkaConsumer.committed(partition); long committed = osm == null ? -1 : osm.offset(); long position = kafkaConsumer.position(partition); long submit = consumer.submitOffset(); long finish = consumer.finishOffset(); LOG.info("{} commited={}, position={} ; submit={}, finish={}", partition, committed, position, submit, finish); }); } }; }
public void run() { // init kafka consumer createKafkaConsumer();
Duration ptoMillis = Duration.ofMillis(pollTimeOut); while (!closed.get()) { long stime = System.currentTimeMillis(); try { // 拉取已订阅Topic的消息 ConsumerRecords<K, V> records = kafkaConsumer.poll(ptoMillis); LOG.debug("{} poll records size {}", topicName, records.count());
// 计算并提交已完成消息的offset commitFinishOffset();
// 按照Partition提交到相应PartitionConsumer中 for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<K, V>> recordList = records.records(partition); LOG.debug("{} poll records size {}", partition, recordList.size()); PartitionConsumer<K, V> pc = partitionConsumers.get(partition); // submit records and control rate long count = pc.submit(recordList); if (count > 0) { // 将该partition的拉取offset重定向到提交位置 kafkaConsumer.seek(partition, pc.submitOffset() + 1); } // full check and pause consuming if (pc.isFull()) { // 先暂停该partition的消费 kafkaConsumer.pause(Collections.singleton(partition)); } } } catch (Exception e) { // Ignore exception if closing if (closed.get()) { continue; }
LOG.warn("create a new KafkaConsumer for topic {} by error : {}", topicName, e.getMessage()); closeKafkaConsumer(); createKafkaConsumer(); }
long etime = System.currentTimeMillis(); if (etime - stime < pollTimeOut) { try { Thread.sleep(1000); } catch (InterruptedException e) { break; } } }
closeKafkaConsumer(); }
private void commitFinishOffset() { partitionConsumers.forEach((partition, consumer) -> { // process finish offset long count = consumer.finish(); if (count > 0) { // sync ack commit offset long offset = consumer.finishOffset() + 1; OffsetAndMetadata cos = new OffsetAndMetadata(offset); kafkaConsumer.commitSync(Collections.singletonMap(partition, cos)); } // full check and pause consuming if (!consumer.isFull()) { // 恢复该partition的消费 kafkaConsumer.resume(Collections.singleton(partition)); } }); }
private void createKafkaConsumer() { kafkaConsumer = consumerFactory.createConsumer(); kafkaConsumer.subscribe(Arrays.asList(topicName), rebalanceListener); }
private void closeKafkaConsumer() { if (kafkaConsumer != null) { kafkaConsumer.unsubscribe(); kafkaConsumer.close(); kafkaConsumer = null; }
// shutdown and clear partition consumer closePartitionConsumers(); }
private void closePartitionConsumers() { partitionConsumers.forEach((tp, pc) -> { pc.shutdown(); }); partitionConsumers.clear(); }
// Shutdown hook which can be called from a separate thread private void close() { closed.set(true); if (kafkaConsumer != null) { kafkaConsumer.wakeup(); } } }
评论