写点什么

【Kafka】消费者客户端小结(java)

用户头像
guoguo 👻
关注
发布于: 2020 年 08 月 20 日
【Kafka】消费者客户端小结(java)

消费者与消费者组

在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

如图所示,假设Topic中有4个分区(P0、P1、P2、P3),两个消费组,分别是消费组A(消费者C0、消费者C1、消费者C2、消费者C3、)和消费组B(消费者C4、消费者C5)。

按照Kafka默认的规则,最后的分配结果是消费组A中的每一个消费者分配到1个分区,消费组B中的每一个消费者分配到2个分区,两个消费组之间互不影响。每一个分区内的消息,只能被同消费组中的一个消费者消费。



再来看看消费者组内消费者个数的变化,所带来的分区消费分配的变化。

假设目前某消费组内只有一个消费者 C0,订阅了一个主题,这个主题包含 7 个分区:P0、P1、P2、P3、P4、P5、P6。



此时消费组内又加入了一个新的消费者C1,按照既定的逻辑,需要将原来消费者C0的部分分区分配给消费者C1消费



紧接着消费组内又加入了一个新的消费者C2

我们可以利用增加消费者的个数从而对消费者组进行可伸缩的扩展,从而提高消费能力。

但是,如果一味地增加消费者,当消费者个数大于了分区数,就会有消费者分配不到任何分区,也将无法消费任何消息

以上分配逻辑都是基于默认的分区分配策略(Range)进行分析的,可以通过消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略,有关分区分配的更多细节将另起文章总结。

消费者客户端

一个简单的示例

public class KafkaConsumerDemo {
private static final String BROKERLIST = "10.128.123.250:9092";
private static final String TOPIC = "topic-demo";
private static final String GROUP_ID = "group.demo";
private static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(TOPIC));
try {
while (IS_RUNNING.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());
System.out.println("key=" + record.key() + ", value=" + record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERLIST);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
return props;
}
}

基本参数

  • bootstrap.servers

和生产者客户端 KafkaProducer 中的相同,用来指定连接Kafka 集群所需的broker 地址清单,具体内容形式为host1:port1,host2:post,可以设置一个或多个地址,中间用逗号隔开。

  • group.id

消费者隶属的消费组的名称,如果为空会报InvalidGroupIdException异常。

  • key.deserializer 和 value.deserializer

与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。

以上示例还有一个参数client.id,如果不设置,kafka会默认为我们生成诸如“consumer-1”、“consumer-2”的默认值。

订阅主题

订阅主题的几个重载方法

// 订阅主题集合
public void subscribe(Collection<String> topics) {
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
// 通过正则表达式匹配主题
public void subscribe(Pattern pattern) {
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {

而对于带有ConsumerRebalanceListener参数的方法,另起文章单独总结,这里总结没有ConsumerRebalanceListener参数的情况。



而对于前后两次订阅不同的主题,那么后一次将覆盖前一次,例如

consumer.subscribe(Arrays.asList(topic1));
consumer.subscribe(Arrays.asList(topic2));

最终订阅的是topic2,而不是topic1,也不是topic1和topic2的并集。



如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。

例如

consumer.subscribe(Pattern.compile("topic-.*"));



除了订阅主题外,还可以订阅主题的特定分区

/**
* Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
* and will replace the previous assignment (if there is one).
* <p>
* If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
* <p>
* Manual topic assignment through this method does not use the consumer's group management
* functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
* metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
* and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
* <p>
* If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
* assignment replaces the old one.
*
* @param partitions The list of partitions to assign this consumer
* @throws IllegalArgumentException If partitions is null or contains null or empty topics
* @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
* (without a subsequent call to {@link #unsubscribe()})
*/
@Override
public void assign(Collection<TopicPartition> partitions)

使用

consumer.assign(Arrays.asList(new TopicPartition("topic-demo", 0)));

如果事先不知道有哪些分区,可以通过partitionsFor(String topic)方法进行查询。

返回PartitonInfo列表

public class PartitionInfo {
// 主题
private final String topic;
// 分区编号
private final int partition;
// 分区的leader副本
private final Node leader;
// 分区的AR集合
private final Node[] replicas;
// 分区的ISR集合
private final Node[] inSyncReplicas;
// 分区的OSR集合
private final Node[] offlineReplicas;
// ...
}



取消订阅可通过调用unsubscribe() 完成

/**
* Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}.
* This also clears any partitions directly assigned through {@link #assign(Collection)}.
*
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. rebalance callback errors)
*/
public void unsubscribe() {

通过注释可以发现,这个方法既可以取消通过 subscribe(Collection)方式实现的订阅,也可以取消通过subscribe(Pattern)方式实现的订阅,还可以取消通过 assign(Collection)方式实现的订阅。

如果将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合,那么作用等同于unsubscribe()方法,下面示例中的三行代码的效果相同:

consumer.unsubscribe();
consumer.subscribe(new ArrayList<>());
consumer.assign(new ArrayList<>());

如果没有订阅任何主题或分区,那么再继续执行消费程序的时候会报出IllegalStateException异常。



通过 subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。



assign()方法的注释有这样一句话

Manual topic assignment through this method does not use the consumer's group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)} and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.



而通过assign()方法订阅分区时,是不具备消费者自动均衡的功能的。



反序列化则与生产者的序列化对应,使用方式类似。

消息消费

Kafka中的消费是基于拉模式。

public ConsumerRecords<K, V> poll(final Duration timeout) {

如果将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。

如果与可用的消息,会立即返回,否则,将等待到传入的时间。如果时间到了,将返回空的数据。



消费者消费到的每条消息的类型为ConsumerRecord。

public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
/**
* 所属主题的名称
**/
private final String topic;
/**
* 所在分区的编号
**/
private final int partition;
/**
* 消息在所属分区的偏移量
**/
private final long offset;
/**
* 时间戳
**/
private final long timestamp;
/**
* 时间戳类型
* timestampType 有两种类型:CreateTime(消息创建的时间戳) 和LogAppendTime(消息追加到日志的时间戳)
**/
private final TimestampType timestampType;
/**
* 消息的键经过序列化之后的大小
* 如果key为空,则serializedKeySize值为-1
**/
private final int serializedKeySize;
/**
* 消息的值经过序列化之后的大小
* 如果value为空,则serializedKeySize值为-1
**/
private final int serializedValueSize;
/**
* 消息的头部内容
**/
private final Headers headers;
/**
* 消息的键
**/
private final K key;
/**
* 消息的值
**/
private final V value;
private final Optional<Integer> leaderEpoch;
/**
* CRC32的校验值
*/
private volatile Long checksum;
// ...
}



我们注意到poll方法返回的类型是ConsumerRecords,他实现了Iterable接口,可以通过迭代器进行消息记录的遍历。

除此之外,我们还可以通过分区或主题的维度来获取消息记录。

/**
* Get just the records for the given topic
*/
public Iterable<ConsumerRecord<K, V>> records(String topic) {
/**
* Get just the records for the given partition
*
* @param partition The partition to get records for
*/
public List<ConsumerRecord<K, V>> records(TopicPartition partition) {

通过分区的维度进行消费:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
for (TopicPartition tp : records.partitions()) {
for (ConsumerRecord<String, String> record : records.records(tp)) {
System.out.println(record.partition() + ": " + record.value());
}
}

位移提交

位移提交的值

对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。

当前消费者需要提交的消费位移并不是 x,而是 x+1,对应图中position的位置。

位移提交的时机

假设当前poll所拉到的信息为[X+2,X+7]。

拉取到消息就提交位移

如果拉取到消息就提交位移,即提交了X+8的位置。

那么假设正在处理X+5消息时发生了异常,在故障恢复之后,我们就从X+8开始消费了,而X+5到X+7的消息就没有被消费点,出现了丢消息的情况。

消费完后提交位移

如果消费完再提交位移,那么当消费到X+5时出现异常,当故障恢复后,就会重新拉取[X+2,X+7]的消息进行消费,也就是[X+2,X+4]的消息又会重新消费一遍,这就造成了重复消费。



实际情况,会比以上分析更为复杂。



在默认情况下,Kafka的消费位移的提交方式是自动提交,由消费者客户端参数enable.auto.commit配置,默认值为 true。这个默认提交的方式,并不是每消费一条消息就提交,而是定时提交。而定时提交的频率由参数auto.commit.interval.ms进行设置,默认5秒。



默认情况下,通过Kafka默认的定时的自动进行位移提交,会造成消息的重复消费和可能的消息丢失。

自动提交,是在poll()方法内完成的,即在拉数据前进行上一次的位移提交。



先说重复消费,在一次定时提交后,并且在下一次定时提交周期前,拉取了一批消息进行消费,这时候出现异常了,那么又得从上一次提交的位移位置开始消费,从而造成重复消费。可以通过设置auto.commit.interval.ms 为一个较小的值,但只是缩小了重复消费的时间窗,不能解决根本问题。



再看消息丢失,正常情况下是不会出现丢失的。但实际使用时,在一次消息拉取后,假设拉取到10条消息[X+0,X+9],我们如果是缓存到本地异步去消费消息,而当客户端在消费到X+3消息时,主线程进行下一次poll时达到了定时提交位移的时间,此时提交X+10。而同时X+3消费失败了,待恢复后就从X+10开始消费,就丢失了[X+3,X+9]这段消息。

手动提交位移

拉取到消息并不代表就已经消费完成,往往需要写入数据库、写入缓存或更加复杂的业务处理。这些场景下,只有消息被正确的处理完成后,才算消费完成。手动提交可以帮助开发者,在合适的地方进行位移提交。

// 修改参数 enable.auto.commit 为 false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交分为同步提交和异步提交。对应于 KafkaConsumer 中的 commitSync()commitAsync()两种类型的方法。



commitSync()方法会根据poll()方法拉取的最新位移来进行提交。如果需要手动指定提交的位移,还有另一个重载方法

/**
* @see KafkaConsumer#commitSync(Map)
*/
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);

方法本身是阻塞的,如果没消费一条进行一次位移提交,势必会拉低整体性能。所以还有种方法是按照分区粒度进行提交。

for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.partition() + ": " + record.value());
}
lastConsumeOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 按分区粒度提交位移
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastConsumeOffset + 1)));
}



另一种是异步提交。

/**
* @see KafkaConsumer#commitAsync()
*/
void commitAsync();
/**
* @see KafkaConsumer#commitAsync(OffsetCommitCallback)
*/
void commitAsync(OffsetCommitCallback callback);
/**
* @see KafkaConsumer#commitAsync(Map, OffsetCommitCallback)
*/
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

第一个无参的方法和第三个offsets的方法很好理解,一个是当前poll拉取消息的position,第三个是指定offsets提交位移。

第二个,位移提交的回调。



异步提交是不能重试的,因为重试的时候,待提交的位移很可能是一个过期的位移。



正常情况下,偶然的提交失败并不是什么大问题,因为后续的提交成功就可以了。但是在某些情况下(例如程序退出、重平衡),我们希望最后的提交成功,因此一种非常普遍的方式是混合异步提交和同步提交

try {
while(isRunning.get()) {
// poll records.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
// do some logical processing.
// ...
consumer.commitAsync();
}
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}

如果要结束消费,同步提交位移,并需要及时close,从而及时触发再平衡,而不是等待组协调者发现消费者停止发送心跳并可能已经死亡,以免长时间导致该分区消息不能消费。

停止或关闭消费

通过API暂停

KafkaConsumer中使用pause和resume分别实现拉取数据时暂停某个分区的数据返回和恢复某个分区的数据返回。

/**
* @see KafkaConsumer#pause(Collection)
*/
void pause(Collection<TopicPartition> partitions);
/**
* @see KafkaConsumer#resume(Collection)
*/
void resume(Collection<TopicPartition> partitions);

而通过paused()返回被暂停的分区集合

/**
* @see KafkaConsumer#paused()
*/
Set<TopicPartition> paused();

停止消费

之前“手动提交位移”部分的示例代码通过判断isRunning.get()是否为true,决定循环是否继续,我们可以通过isRunning.set(false)来退出循环。

还有一种方式是调用KafkaConsumer的wakeup()方法,该方法是线程安全的,可以被其他线程调用从而停止轮询。被调用时,会退出poll()逻辑,并抛出WakeupException异常(该异常不需要特别处理)。

try {
while (IS_RUNNING.get()) {
// consumer.poll(***)
// process the records.
// commit offset
consumer.commitAsync();
}
} catch (WakeupException e) {
// ignore ...
} catch (Exception e) {
// resolve the exception ...
} finally {
consumer.commitSync();
consumer.close();
}

指定位移消费

auto.offset.reset

以下两种情况下

  1. 主题分区中先有消息,然后建立一个新的消费组,去消费该主题的消息。

  2. 消费组中一个消费者订阅了新的主题。



这种情况下分区中找不到提交的消费位移。那么就会根据消费者客户端的参数auto.offset.reset来决定从什么位置开始消费。

  • latest(默认)

当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。

  • earliest

当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。

  • none

topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出org.apache.kafka.clients.consumer.NoOffsetForPartitionException异常

seek

/**
* @see KafkaConsumer#seek(TopicPartition, long)
*/
void seek(TopicPartition partition, long offset);

从指定分区的指定位置开始消费。在调用seek之前需要poll一次数据,否则会报没有分配分区位移的异常。

一个示例

Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(TOPIC));
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10);
}
while (IS_RUNNING.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
// consume the records...
}

从头消费和从尾部消费

/**
* @see KafkaConsumer#seekToBeginning(Collection)
*/
void seekToBeginning(Collection<TopicPartition> partitions);
/**
* @see KafkaConsumer#seekToEnd(Collection)
*/
void seekToEnd(Collection<TopicPartition> partitions);

消费指定时间后的消息

/**
* @see KafkaConsumer#offsetsForTimes(Map)
*/
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
/**
* @see KafkaConsumer#offsetsForTimes(Map, Duration)
*/
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout);

返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳。

再均衡

再均衡一般发生在有新的消费者加入消费组,消费者下线(真下线或没有发送心跳)、消费者主动退出消费组、消费组对应的 GroupCoordinator 节点发生了变更、主题或主题分区数量发生变化等。

在消费者subscribe的重载方法中,有两个方法提供了ConsumerRebalanceListener参数。

其包含两个方法

  • void onPartitionsRevoked(Collection<TopicPartition> partitions);

在再均衡开始之前和消费者停止读取消息之后被调用。可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。

  • void onPartitionsAssigned(Collection<TopicPartition> partitions);

在重新分配分区之后和消费者开始读取消费之前被调用。参数partitions表示再均衡后所分配到的分区。

使用示例:

Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
consumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffset);
currentOffset.clear();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// do noting.
}
});
try {
while (IS_RUNNING.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
for (ConsumerRecord<String, String> record : records) {
// process the record
currentOffset.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
}
consumer.commitAsync();
}
} finally {
consumer.close();
}

消费者拦截器

与生产者的拦截器相对应,消费者的拦截器可以在消费到消息或提交位移时,做一些定制化的处理。自定义拦截器需实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口。

public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();

onConsume将在KafkaConsumer调用poll()方法返回数据前被调用,比如按照某种规则过滤消息。如果 onConsume()方法中抛出异常,那么会被捕获并记录到日志中,但是异常不会再向上传递。

在提交消费位移后,会调用onCommit方法。可以使用这个方法来跟踪记录提交的位移,比如使用commitSync进行提交时,我们不知道提交位移的详细信息,但可通过onCommit来获取信息。

多线程消费

KafkaProducer是线程安全的,但KafkaConsumer是非线程安全的。

KafkaConsumer为解决多线程访问的问题,内部有一个方法acquire,当KafkaConsumer的每个公有方法被调用时,都会先调用acquire方法,结束时调用release()完成释放。此方法是私有方法,使用时无需调用。

可以把acquire()看做一个轻量级的锁,实现原理是通过线程的标记来判断是否有并发操作,当出现并发操作,会抛出ConcurrentModificationException异常。

线程封闭

即一个线程,一个独立的KafkaConsumer,从而避免并发问题。



通过一个Consumer一个线程进行消费,一个Consumer消费一个或多个分区的消息,但当消息的生产速度不断提高后,消费端为了增加消费能力,可增加消费者来提升消费能力。

但是,当消费者的数量大于了分区数量时,就有部分消费者处于没有消费分区的状态,从而无法消费消息,消费能力也达到上限。

手动指定

为了打破消费线程数量不能超过分区数量的限制,可通过手动指定消费的方式,从而达到多个消费者消费同一个分区。具体可通过assign()seek()方法来实现,但这种实现方式对于位移的提交和顺序控制来说非常的复杂,不推荐使用。

多线程处理

与多线程拉取消息并处理不同,多线程处理重点在一次拉取,多个线程处理消息,而不是多线程拉取并处理。

代码实现

public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, TOPIC,
Runtime.getRuntime().availableProcessors());
consumerThread.start();
}
public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERLIST);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
return props;
}
public static class KafkaConsumerThread extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;
private ExecutorService executorService;
public KafkaConsumerThread(Properties props, String topic, int threadNum) {
this.kafkaConsumer = new KafkaConsumer<>(props);
this.kafkaConsumer.subscribe(Collections.singletonList(topic));
this.executorService = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
executorService.submit(new RecordsHandler(records));
}
// 提交位移
synchronized (offsets) {
if (!offsets.isEmpty()) {
kafkaConsumer.commitSync(offsets);
offsets.clear();
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}
public static class RecordsHandler extends Thread {
public final ConsumerRecords<String, String> records;
public RecordsHandler(ConsumerRecords<String, String> records) {
this.records = records;
}
@Override
public void run() {
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
// 处理 tpRecords ...
// 保存位移变量
long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
synchronized (offsets) {
if (!offsets.containsKey(tp)) {
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
} else {
long position = offsets.get(tp).offset();
if (position < lastConsumedOffset + 1) {
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
}
}
}
}
}
}

以上代码有一个问题,因为往线程池中提交了线程后,我们就提交位移,由于是异步进行消费,就有可能出现在前一次消费的线程尚未执行完,而后一次同一个分区的消费任务执行完成,造成了后一次位移提交了,而后前一次消费出现异常,这样就丢了前一次拉取的消息。

如果要可靠的处理,这里可能要借助log或redis,将拉取的消息记录,处理完成在进行日志记录或从redis中清除,实现起来比较复杂。

重要参数

fetch.min.bytes

配置Consumer一次拉取请求最小的数据量,默认1(B)。

当拉取的消息小于这个值时,不会立即返回,会等待达到这个参数配置的大小才返回。可适当调大该值,以提高吞吐量,但也可能带来一定的延迟。

fetch.max.bytes

配置Consumer一次拉取请求最大的数据量,默认52428800(B),即 50MB。

消息是通过分批次返回给消费者。如果在分区中的第一个消息批次大于这个值,那么该消息批次依然会返回给消费者,保证流程运行。

fetch.max.wait.ms

与fetch.min.bytes 配置使用,当拉取的消息没有达到fetch.min.bytes的值时会等待,而此配置则配置等待的时间,默认值500(ms)。

max.partition.fetch.bytes

每个分区返回的最大数据量,默认1048576(B),即1MB。

与fetch.max.bytes类似,该参数是限制每个分区,而后者限制一次拉取整体的大小。

max.poll.records

一次拉取中最大消息数量,默认值为500(条)。

如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度

connections.max.idle.ms

指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟。

exclude.internal.topics

指定Kafka中的内部主题(__consumer_offsets和__transaction_state)是否可以向消费者公开,默认值为true。如果设置为true,那么只能使用subscribe(Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false则没有这个限制。

receive.buffer.bytes

设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B),即64KB。如果设置为-1,则使用操作系统的默认值。如果Consumer与Kafka处于不同的机房,则可以适当调大这个参数值。

send.buffer.bytes

设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。

request.timeout.ms

配置Consumer等待请求响应的最长时间,默认值为30000(ms)。

metadata.max.age.ms

配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker加入。

reconnect.backoff.ms

配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)。这种机制适用于消费者向broker发送的所有请求。

retry.backoff.ms

配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为100(ms)。

isolation.level

配置消费者的事务隔离级别。

有效值为“read_uncommitted”和“read_committed”。



用户头像

guoguo 👻

关注

还未添加个人签名 2017.11.30 加入

还未添加个人简介

评论 (1 条评论)

发布
用户头像
文章质量很高!
2020 年 08 月 27 日 16:39
回复
没有更多了
【Kafka】消费者客户端小结(java)