
True Multi-threaded Consumption with Kafka

Author: dinstone
  • February 13, 2022

Preface

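Unlike other MQs, Kafka relies on partitioning to raise system throughput. To raise the produce rate you simply add producers. Can the consume rate be raised just as simply, by adding consumers? This article shares the multi-threaded consumption scheme we run in production.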

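First, a quick review of Kafka's consumption model. Kafka implements both of its messaging models, point-to-point (queue) and broadcast, through the Consumer Group concept: each Partition of a Topic that a Consumer Group subscribes to is assigned to exactly one Consumer instance in that group, although the same Partition can also be consumed by Consumer instances in other Consumer Groups. In other words, within a single Consumer Group a Partition is consumed by only one Consumer instance.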

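It follows that the consumption capacity of a Topic depends on the parallelism of each Consumer Group, which depends on the number of consumer instances, which in turn is bounded by the number of Partitions configured for the subscribed Topic. The number of useful consumer instances cannot exceed the number of Partitions. With 4 Partitions, for example, at most 4 consumer instances can consume in parallel; any beyond that only waste resources, because the extra instances are not assigned any Partition.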
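The limit described above can be illustrated with a small, self-contained sketch. It is not Kafka's actual assignor; `PartitionAssignmentSketch` and `assign` are hypothetical names, and the round-robin spread is only an assumption made for illustration. The point is that each partition gets exactly one owner in the group, so consumers beyond the partition count end up with nothing.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    /**
     * Spreads partition ids 0..partitionCount-1 over consumerCount consumers
     * in round-robin order. Every partition gets exactly one owner.
     * (Illustrative only; Kafka ships its own assignment strategies.)
     */
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            owned.get(p % consumerCount).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 4 partitions, 6 consumers: consumer-4 and consumer-5 stay idle
        List<List<Integer>> owned = assign(4, 6);
        for (int c = 0; c < owned.size(); c++) {
            System.out.println("consumer-" + c + " -> " + owned.get(c));
        }
    }
}
```

With 4 partitions and 6 consumers, the last two consumers receive an empty list, which is the wasted capacity the paragraph refers to.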

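So how do we raise the consume rate? The obvious answer is to add Partitions to the Topic so that more consumer instances can run. That has a cost, though: adding Partitions changes the key-to-partition mapping for keyed messages and can force data to be migrated. If we want to keep the current number of Partitions and still raise Kafka's consumption parallelism, what then? Multi-threaded consumption is a good option.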

Design


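We encapsulate the parallel-consumption logic in two classes: TopicConsumer and PartitionConsumer. TopicConsumer subscribes to the given Topic, polls its messages and dispatches them to the matching PartitionConsumer, while also controlling the consumption rate and keeping consumption reliable. PartitionConsumer runs the tasks in parallel and tracks their state.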

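Multi-threaded parallel consumption with Kafka poses two main challenges:

  • Rate control: the poll rate must match the task execution rate. KafkaConsumer's seek, pause and resume methods make this possible.

  • Reliable consumption: messages must not be lost, and duplicate consumption should be kept to a minimum, including in abnormal situations such as a Consumer Group Rebalance or a service restart. This is done with manual ACKs and a batch offset-commit algorithm.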
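The batch-commit idea behind the second challenge can be sketched without any Kafka dependency. The following is a minimal illustration under stated assumptions, not the author's code: `CommitOffsetSketch`, `Task`, `tasks` and `nextCommitOffset` are hypothetical names. In-flight records are kept in offset order, and only the contiguous run of completed records at the head of the queue may be committed, so a record that finishes early never lets the committed offset jump past a record that is still running.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class CommitOffsetSketch {

    /** Hypothetical stand-in for one in-flight record. */
    static final class Task {
        final long offset;
        final boolean done;

        Task(long offset, boolean done) {
            this.offset = offset;
            this.done = done;
        }
    }

    /** Builds a queue of consecutive offsets starting at firstOffset. */
    static Deque<Task> tasks(long firstOffset, boolean... done) {
        Deque<Task> queue = new ArrayDeque<>();
        for (int i = 0; i < done.length; i++) {
            queue.add(new Task(firstOffset + i, done[i]));
        }
        return queue;
    }

    /**
     * Removes the completed tasks at the head of the queue and returns the
     * offset to commit (last completed offset + 1), or -1 if the head task
     * has not completed yet and nothing can be committed.
     */
    static long nextCommitOffset(Deque<Task> inFlight) {
        long last = -1;
        while (!inFlight.isEmpty() && inFlight.peek().done) {
            last = inFlight.poll().offset;
        }
        return last < 0 ? -1 : last + 1;
    }

    public static void main(String[] args) {
        // offsets 10 and 11 are done, 12 is still running, 13 is done
        Deque<Task> inFlight = tasks(10, true, true, false, true);
        System.out.println("commit offset = " + nextCommitOffset(inFlight));
        System.out.println("still in flight = " + inFlight.size());
    }
}
```

In the example, offset 13 has finished but offset 12 has not, so the committable offset is 12. If the process restarted at that moment, offset 13 would be consumed again, which is the "few duplicates, no loss" trade-off described above.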

The implementation below shows the details.

Implementation

The TopicConsumer class. Its constructor initializes the kafkaConsumer configuration and creates the event-loop object, TopicConsumeRunner.

public TopicConsumer(ConsumerConfig consumerConfig, MessageHandler<K, V> messageHandler,
        Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    String topic = consumerConfig.getTopic();
    if (topic == null || topic.length() == 0) {
        throw new IllegalArgumentException("kafka.topic is empty");
    }

    // set auto commit = false
    consumerConfig.setAutoCommit(false);

    // create topic consumer runner
    this.topicConsumeRunner = new TopicConsumeRunner(consumerConfig);
}

public void start() {
    LOG.info("start topic[{}] consume process", consumerConfig.getTopic());
    topicConsumeRunner.start();
}

public void stop() {
    LOG.info("stop topic[{}] consume process", consumerConfig.getTopic());
    topicConsumeRunner.close();
}

TopicConsumeRunner is an inner class of TopicConsumer; its run method is the core.

private class TopicConsumeRunner extends Thread {

    private final Map<TopicPartition, PartitionConsumer<K, V>> partitionConsumers = new HashMap<>();

    private final AtomicBoolean closed = new AtomicBoolean(false);

    private ConsumerRebalanceListener rebalanceListener;

    private Consumer<K, V> kafkaConsumer;

    private long pollTimeOut;

    public TopicConsumeRunner(ConsumerConfig consumerConfig) {
        this.pollTimeOut = consumerConfig.getPollTimeOut();
        // Thread name
        setName("Topic[" + consumerConfig.getTopic() + "]-Poller");
        // manage the lifecycle of the PartitionConsumers through the rebalance listener
        this.rebalanceListener = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                LOG.info("Revoke: {}", partitions);
                try {
                    commitFinishOffset();
                } catch (Exception e) {
                    LOG.warn("Revoke commit offset error {}", e.getMessage());

                    // kafka consumer is invalid, so must close partition consumer.
                    closePartitionConsumers();
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                LOG.info("Assign: {}", partitions);

                // resume consuming
                kafkaConsumer.resume(partitions);

                // create new partition consumers, reusing those that are still assigned
                Map<TopicPartition, PartitionConsumer<K, V>> newConsumers = new HashMap<>();
                for (TopicPartition partition : partitions) {
                    PartitionConsumer<K, V> consumer = partitionConsumers.get(partition);
                    if (consumer == null) {
                        newConsumers.put(partition,
                                new PartitionConsumer<K, V>(partition, consumerConfig, messageHandler));
                    } else {
                        newConsumers.put(partition, consumer);
                    }
                }
                // clear revoked partition consumers
                partitionConsumers.forEach((partition, consumer) -> {
                    if (!newConsumers.containsKey(partition)) {
                        // need to shutdown
                        consumer.shutdown();
                    }
                });
                partitionConsumers.clear();
                // add new partition consumers
                partitionConsumers.putAll(newConsumers);

                // show partition state info
                partitionConsumers.forEach((partition, consumer) -> {
                    OffsetAndMetadata osm = kafkaConsumer.committed(partition);
                    long committed = osm == null ? -1 : osm.offset();
                    long position = kafkaConsumer.position(partition);
                    long submit = consumer.submitOffset();
                    long finish = consumer.finishOffset();
                    LOG.info("{} committed={}, position={} ; submit={}, finish={}", partition, committed,
                            position, submit, finish);
                });
            }
        };
    }

    @Override
    public void run() {
        // init kafka consumer
        createKafkaConsumer();

        Duration ptoMillis = Duration.ofMillis(pollTimeOut);
        while (!closed.get()) {
            long stime = System.currentTimeMillis();
            try {
                // poll messages of the subscribed Topic
                ConsumerRecords<K, V> records = kafkaConsumer.poll(ptoMillis);
                LOG.debug("{} poll records size {}", topicName, records.count());

                // compute and commit the offsets of finished messages
                commitFinishOffset();

                // hand the records of each Partition to its PartitionConsumer
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<K, V>> recordList = records.records(partition);
                    LOG.debug("{} poll records size {}", partition, recordList.size());
                    PartitionConsumer<K, V> pc = partitionConsumers.get(partition);
                    // submit records and control rate
                    long count = pc.submit(recordList);
                    if (count > 0) {
                        // move the fetch position of this partition to just after the last submitted record
                        kafkaConsumer.seek(partition, pc.submitOffset() + 1);
                    }
                    // full check and pause consuming
                    if (pc.isFull()) {
                        // pause fetching from this partition for now
                        kafkaConsumer.pause(Collections.singleton(partition));
                    }
                }
            } catch (Exception e) {
                // Ignore exception if closing
                if (closed.get()) {
                    continue;
                }

                LOG.warn("create a new KafkaConsumer for topic {} by error : {}", topicName, e.getMessage());
                closeKafkaConsumer();
                createKafkaConsumer();
            }

            // if this iteration finished faster than the poll timeout, wait before polling again
            long etime = System.currentTimeMillis();
            if (etime - stime < pollTimeOut) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }

        closeKafkaConsumer();
    }

    private void commitFinishOffset() {
        partitionConsumers.forEach((partition, consumer) -> {
            // process finish offset
            long count = consumer.finish();
            if (count > 0) {
                // sync ack commit offset
                long offset = consumer.finishOffset() + 1;
                OffsetAndMetadata cos = new OffsetAndMetadata(offset);
                kafkaConsumer.commitSync(Collections.singletonMap(partition, cos));
            }
            // full check and resume consuming
            if (!consumer.isFull()) {
                // resume fetching from this partition
                kafkaConsumer.resume(Collections.singleton(partition));
            }
        });
    }

    private void createKafkaConsumer() {
        kafkaConsumer = consumerFactory.createConsumer();
        kafkaConsumer.subscribe(Arrays.asList(topicName), rebalanceListener);
    }

    private void closeKafkaConsumer() {
        if (kafkaConsumer != null) {
            kafkaConsumer.unsubscribe();
            kafkaConsumer.close();
            kafkaConsumer = null;
        }

        // shutdown and clear partition consumer
        closePartitionConsumers();
    }

    private void closePartitionConsumers() {
        partitionConsumers.forEach((tp, pc) -> {
            pc.shutdown();
        });
        partitionConsumers.clear();
    }

    // Shutdown hook which can be called from a separate thread
    private void close() {
        closed.set(true);
        if (kafkaConsumer != null) {
            kafkaConsumer.wakeup();
        }
    }
}
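The interplay of submit, seek, pause and resume in the run loop can be hard to follow inside the full listing, so here is a stripped-down simulation of the rate control for one partition. It uses no Kafka classes; `BackpressureSketch` and its methods are hypothetical names, and a poll is assumed to return consecutive offsets. The sketch accepts records only while the bounded queue has room, reports the offset the next fetch should start from (the role `seek` plays in the listing), and flips a `paused` flag where the listing calls `pause` and `resume`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BackpressureSketch {

    private final Queue<Long> inFlight = new ArrayDeque<>();
    private final int capacity;
    private boolean paused;

    BackpressureSketch(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Simulates one poll that returned `count` consecutive records starting
     * at `firstOffset`. Accepts records until the queue is full and returns
     * the offset the next fetch should start from.
     */
    long submit(long firstOffset, int count) {
        int accepted = 0;
        while (accepted < count && inFlight.size() < capacity) {
            inFlight.add(firstOffset + accepted);
            accepted++;
        }
        paused = inFlight.size() >= capacity; // stop fetching while the queue is full
        return firstOffset + accepted;
    }

    /** Simulates `n` records finishing; fetching resumes once there is room. */
    void finish(int n) {
        for (int i = 0; i < n && !inFlight.isEmpty(); i++) {
            inFlight.poll();
        }
        paused = inFlight.size() >= capacity;
    }

    boolean isPaused() {
        return paused;
    }

    public static void main(String[] args) {
        BackpressureSketch partition = new BackpressureSketch(4);
        long next = partition.submit(100, 10);
        System.out.println("next fetch offset = " + next + ", paused = " + partition.isPaused());
        partition.finish(2);
        next = partition.submit(next, 10);
        System.out.println("next fetch offset = " + next + ", paused = " + partition.isPaused());
    }
}
```

With a capacity of 4, the first poll of offsets 100 to 109 accepts only 100 to 103, so the next fetch starts at 104 and the partition is paused. After two records finish, two more are accepted and the fetch position moves to 106. Records that were polled but not accepted are fetched again later instead of being dropped.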


The PartitionConsumer class:

public PartitionConsumer(TopicPartition partition, ConsumerConfig consumerConfig,
        MessageHandler<K, V> messageHandler) {
    this.partition = partition;
    this.messageHandler = messageHandler;

    this.messageQueueSize = consumerConfig.getMessageQueueSize();

    // start a fixed number of worker threads for this partition
    int parallelSize = consumerConfig.getParallelConsumerSize();
    this.executor = Executors.newCachedThreadPool();
    for (int i = 0; i < parallelSize; i++) {
        this.executor.execute(new RecordConsumeRunner(i));
    }
}

public long submitOffset() {
    return submitOffset;
}

public long finishOffset() {
    return finishOffset;
}

public boolean isFull() {
    return futureQueue.size() >= messageQueueSize;
}

/**
 * Submit records for consumption until the queue is full.
 *
 * @param recordList records polled from this partition, in offset order
 * @return the number of records accepted
 */
public long submit(List<ConsumerRecord<K, V>> recordList) {
    int count = 0;
    ConsumerTask<K, V> last = null;
    for (ConsumerRecord<K, V> record : recordList) {
        if (isFull()) {
            // the remaining records are fetched again after the caller seeks back
            break;
        }
        last = new ConsumerTask<K, V>(record);
        futureQueue.add(last);
        submitQueue.add(last);
        count++;
    }
    if (last != null) {
        submitOffset = last.record().offset();
    }
    LOG.debug("{} submit count {}, last offset {}", partition, count, submitOffset);
    return count;
}

/**
 * Remove the completed tasks at the head of the queue and record the offset
 * of the last one.
 *
 * @return the number of tasks removed
 */
public long finish() {
    int count = 0;
    ConsumerTask<K, V> last = null;
    for (;;) {
        ConsumerTask<K, V> check = futureQueue.peek();
        if (check == null || !check.isComplete()) {
            break;
        }

        last = futureQueue.poll();
        count++;
    }
    if (last != null) {
        finishOffset = last.record().offset();
    }

    LOG.debug("{} finish count {}, last offset {}", partition, count, finishOffset);
    return count;
}

public void shutdown() {
    shutdown.set(true);
    executor.shutdownNow();
    LOG.info("{} consumer shutdown, submit/future: {}/{} tasks untreated, submit/finish: {}/{} offset",
            partition, submitQueue.size(), futureQueue.size(), submitOffset, finishOffset);
}

The RecordConsumeRunner class:

private class RecordConsumeRunner implements Runnable {

    private String tname;

    public RecordConsumeRunner(int index) {
        this.tname = "Partition[" + partition + "]-Work-" + index;
    }

    @Override
    public void run() {
        Thread.currentThread().setName(tname);

        while (!shutdown.get() && !Thread.interrupted()) {
            // declared per iteration so an interrupted take() never touches the previous task
            ConsumerTask<K, V> task = null;
            try {
                task = submitQueue.take();

                messageHandler.handle(task.schedule());

                task.complete();
            } catch (Throwable e) {
                if (task != null) {
                    task.complete(e);
                }

                // InterruptedException break, other ignore
                if (e instanceof InterruptedException) {
                    break;
                }
            }
        }
    }
}
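The listings above call `record()`, `schedule()`, `complete()`, `complete(Throwable)` and `isComplete()` on ConsumerTask, but the class itself is not shown. The following is only a guess at its shape, inferred from those calls: `ConsumerTaskSketch`, the `State` enum and `error()` are assumptions, and the record type is a plain type parameter `R` so the sketch runs without Kafka on the classpath. A failed task also counts as complete here, which matches how the worker calls `task.complete(e)` so that one bad record does not block offset commits.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ConsumerTaskSketch<R> {

    /** Hypothetical lifecycle states; the original class may track state differently. */
    enum State { SUBMITTED, RUNNING, SUCCEEDED, FAILED }

    private final R record;
    private final AtomicReference<State> state = new AtomicReference<>(State.SUBMITTED);
    private volatile Throwable error;

    ConsumerTaskSketch(R record) {
        this.record = record;
    }

    R record() {
        return record;
    }

    /** Marks the task as running and hands the record to the worker. */
    R schedule() {
        state.set(State.RUNNING);
        return record;
    }

    /** Marks the task as finished successfully. */
    void complete() {
        state.set(State.SUCCEEDED);
    }

    /** Marks the task as finished with an error; it still counts as complete. */
    void complete(Throwable cause) {
        error = cause;
        state.set(State.FAILED);
    }

    /** True once the task has finished, whether it succeeded or failed. */
    boolean isComplete() {
        State s = state.get();
        return s == State.SUCCEEDED || s == State.FAILED;
    }

    Throwable error() {
        return error;
    }

    public static void main(String[] args) {
        ConsumerTaskSketch<String> task = new ConsumerTaskSketch<>("payload");
        System.out.println("before: complete = " + task.isComplete());
        task.schedule();
        task.complete(new IllegalStateException("handler failed"));
        System.out.println("after: complete = " + task.isComplete() + ", error = " + task.error());
    }
}
```

The state is written by a worker thread and read by the poller thread, which is why the sketch keeps it in an `AtomicReference` and marks `error` as `volatile`.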

