写点什么

一文详解 Kafka API

  • 2022 年 2 月 11 日
  • 本文字数:15737 字

    阅读完需:约 52 分钟

摘要:Kafka 的 API 有 Producer API,Consumer API 还有自定义 Interceptor (自定义拦截器),以及处理的流使用的 Streams API 和构建连接器的 Kafka Connect API。


本文分享自华为云社区《【Kafka笔记】Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)》,作者: Copy 工程师。

简介​


Kafka 的 API 有 Producer API,Consumer API 还有自定义 Interceptor (自定义拦截器),以及处理的流使用的 Streams API 和构建连接器的 Kafka Connect API。

Producer API​


Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送过程中,涉及两个线程:main 线程和 Sender 线程,以及一个线程共享变量 RecordAccumulator。main 线程将消息发送给 RecordAccmulator,Sender 线程不断地从 RecordAccumulator 中拉取消息发送给 Kafka broker。


这里的 ACk 机制,不是生产者得到 ACK 返回信息才开始发送,ACK 保证的是生产者不丢失数据。例如:


而是只要有消息数据,就向 broker 发送。


消息发送流程


生产者使用 send 方法,经过拦截器之后在经过序列化器,然后在走分区器。然后通过分批次把数据发送到 PecordAccumulator,main 线程到此过程就结束了,然后在回去执行 send。


Sender 线程不断的获取 RecordAccumulator 的数据发送到 topic。


消息发送流程是异步发送的,并且顺序是一定的拦截器-》序列化器-》分区器


异步发送 API


需要用到的类:


KafkaProducer: 需要创建一个生产者对象,用来发送数据ProducerConfig:获取所需要的一系列配置参数ProducerRecord:每条数据都要封装成一个ProducerRecord对象
复制代码


实例:


public class KafkaProducerDemo {
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXXXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建KafkaProducer客户端 KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10 ; i++) { producer.send(new ProducerRecord<>("my-topic","ImKey-"+i,"ImValue-"+i)); } // 关闭资源 producer.close();
}}
复制代码


配置参数说明:


send():方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。


ack:是判断请求是否完整的条件(就会判断是不是成功发送了,也就是上次说的 ACK 机制),指定 all 将会阻塞消息,性能低但是最可靠。


retries:如果请求失败,生产者会自动重试,我们指定是 1 次,但是启动重试就有可能出现重复数据。


batch.size:指定缓存的大小,生产者缓存每个分区未发送的消息。值越大的话将会产生更大的批量,并需要更大的内存(因为每个活跃的分区都有一个缓存区)。


linger.ms:指示生产者发送请求之前等待一段时间,设置等待时间是希望更多地消息填补到未满的批中。默认缓冲可以立即发送,即便缓冲空间还没有满,但是如果想减少请求的数量可以设置 linger.ms 大于 0。需要注意的是在高负载下,相近的时间一般也会组成批,即使等于 0。


buffer.memory:控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 max.block.ms 设定,之后将会抛出一个 TimeoutException


key.serializer 和 value.serializer 将用户提供的 key 和 value 对象 ProducerRecord 转换成字节,你可以使用附带的 ByteArraySerializaer 或 StringSerializer 处理简单的 string 或 byte 类型。


运行日志:


[Godway] INFO  2019-11-14 14:46 - org.apache.kafka.clients.producer.ProducerConfig[main] - ProducerConfig values: 	acks = all	batch.size = 16384	bootstrap.servers = [XXXXXX:9093]	buffer.memory = 33554432	client.id = 	compression.type = none	connections.max.idle.ms = 540000	enable.idempotence = false	interceptor.classes = null	key.serializer = class org.apache.kafka.common.serialization.StringSerializer	linger.ms = 1	max.block.ms = 60000	max.in.flight.requests.per.connection = 5	max.request.size = 1048576	metadata.max.age.ms = 300000	metric.reporters = []	metrics.num.samples = 2	metrics.recording.level = INFO	metrics.sample.window.ms = 30000	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner	receive.buffer.bytes = 32768	reconnect.backoff.max.ms = 1000	reconnect.backoff.ms = 50	request.timeout.ms = 30000	retries = 1	retry.backoff.ms = 100	sasl.jaas.config = null	sasl.kerberos.kinit.cmd = /usr/bin/kinit	sasl.kerberos.min.time.before.relogin = 60000	sasl.kerberos.service.name = null	sasl.kerberos.ticket.renew.jitter = 0.05	sasl.kerberos.ticket.renew.window.factor = 0.8	sasl.mechanism = GSSAPI	security.protocol = PLAINTEXT	send.buffer.bytes = 131072	ssl.cipher.suites = null	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]	ssl.endpoint.identification.algorithm = null	ssl.key.password = null	ssl.keymanager.algorithm = SunX509	ssl.keystore.location = null	ssl.keystore.password = null	ssl.keystore.type = JKS	ssl.protocol = TLS	ssl.provider = null	ssl.secure.random.implementation = null	ssl.trustmanager.algorithm = PKIX	ssl.truststore.location = null	ssl.truststore.password = null	ssl.truststore.type = JKS	transaction.timeout.ms = 60000	transactional.id = null	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[Godway] INFO 2019-11-14 14:46 - org.apache.kafka.common.utils.AppInfoParser[main] - Kafka version : 0.11.0.3[Godway] INFO 2019-11-14 14:46 - org.apache.kafka.common.utils.AppInfoParser[main] - Kafka commitId : 26ddb9e3197be39a[Godway] WARN 2019-11-14 14:46 - org.apache.kafka.clients.NetworkClient[kafka-producer-network-thread | producer-1] - Error while fetching metadata with correlation id 1 : {my-topic=LEADER_NOT_AVAILABLE}[Godway] INFO 2019-11-14 14:46 - org.apache.kafka.clients.producer.KafkaProducer[main] - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Process finished with exit code 0
复制代码


有一条警告{my-topic=LEADER_NOT_AVAILABLE} 提示该 topic 不存在,但是没有关系 kafka 会自动给你创建一个 topic,不过创建的 topic 是有一个分区和一个副本:



查看一下该 topic 的消息:



消息已经在 topic 里了


上面的实例是没有回调函数的,send 方法是有回调函数的:


public class KafkaProducerCallbackDemo {
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建KafkaProducer客户端 KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 10; i < 20 ; i++) { producer.send(new ProducerRecord<String, String>("my-topic", "ImKey-" + i, "ImValue-" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ System.out.println("消息发送成功!"+recordMetadata.offset()); }else { System.err.println("消息发送失败!"); } } }); } producer.close(); }}
复制代码


回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null 说明消息发送失败。


注意: 消息发送失败会自动重试,不需要我们在回调函数中手动重试,使用回调也是无阻塞的。而且 callback 一般在生产者的 IO 线程中执行,所以是非常快的,否则将延迟其他的线程消息发送。如果需要执行阻塞或者计算的回调(耗时比较长),建议在 callbanck 主体中使用自己的 Executor 来并行处理!



同步发送 API


同步发送的意思就是,一条消息发送之后,会阻塞当前的线程,直到返回 ack(此 ack 和异步的 ack 机制不是一个 ack)。


此 ack 是 Future 阻塞 main 线程,当发送完成就返回一个 ack 去通知 main 线程已经发送完毕,继续往下走了


public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)


send 是异步的,并且一旦消息被保存在等待发送的消息缓存中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。


发送的结果是一个 RecordMetadata,它指定了消息发送的分区,分配的 offset 和消息的时间戳。如果 topic 使用的是 CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)如果 topic 使用的是 LogAppendTime,则追加消息时,时间戳是 broker 的本地时间。


由于 send 调用是异步的,它将为分配消息的此消息的 RecordMetadata 返回一个 Future。如果 future 调用 get(),则将阻塞,直到相关请求完成并返回该消息的 metadata,或抛出发送异常。


Throws:


InterruptException - 如果线程在阻塞中断。

SerializationException - 如果 key 或 value 不是给定有效配置的 serializers。

TimeoutException - 如果获取元数据或消息分配内存话费的时间超过 max.block.ms。

KafkaException - Kafka 有关的错误(不属于公共 API 的异常)。


public class KafkaProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建KafkaProducer客户端 KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 20; i < 30 ; i++) { RecordMetadata metadata = producer.send(new ProducerRecord<>("my-topic", "ImKey-" + i, "ImValue-" + i)).get(); System.out.println(metadata.offset()); } producer.close();
}}
复制代码


API 生产者自定义分区策略


生产者在向 topic 发送消息的时候的分区规则:


public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)public ProducerRecord(String topic, Integer partition, K key, V value)public ProducerRecord(String topic, K key, V value)public ProducerRecord(String topic, V value)
复制代码


根据 send 方法的参数的构造方法就可以看出来,


  1. 指定分区就发送到指定分区

  2. 没有指定分区,有 key 值,就按照 key 值的 Hash 值分配分区

  3. 没有指定分区,也没有指定 key 值,轮询分区分配(只分配一次,以后都按照第一次的分区顺序)


自定义分区器


自定义分区器需要实现 org.apache.kafka.clients.producer.Partitioner 接口。并且实现三个方法


public class KafkaMyPartitions implements Partitioner {
@Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { return 0; } @Override public void close() {
} @Override public void configure(Map<String, ?> map) {
}}
复制代码


自定义分区实例:


KafkaMyPartitions:


public class KafkaMyPartitions implements Partitioner {
@Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { // 这里写自己的分区策略 // 我这里指定为1 return 1; } @Override public void close() { }
@Override public void configure(Map<String, ?> map) { }}
复制代码


KafkaProducerCallbackDemo:


public class KafkaProducerCallbackDemo {
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 指定自定义分区 props.put("partitioner.class","com.firehome.newkafka.KafkaMyPartitions");
// 创建KafkaProducer客户端 KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 20; i < 25 ; i++) { producer.send(new ProducerRecord<String, String>("th-topic", "ImKey-" + i, "ImValue-" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ System.out.printf("消息发送成功!topic=%s,partition=%s,offset=%d \n",recordMetadata.topic(),recordMetadata.partition(),recordMetadata.offset()); }else { System.err.println("消息发送失败!"); } } }); } producer.close(); }}
复制代码


返回日志:


消息发送成功!topic=th-topic,partition=1,offset=27 消息发送成功!topic=th-topic,partition=1,offset=28 消息发送成功!topic=th-topic,partition=1,offset=29 消息发送成功!topic=th-topic,partition=1,offset=30 消息发送成功!topic=th-topic,partition=1,offset=31
复制代码


可以看到直接发送到了分区 1 上了。


多线程发送消息


Producer API 是线程安全的,直接就可以使用多线程发送消息,实例:


public class KafkaProducerThread implements Runnable {
private KafkaProducer<String,String> kafkaProducer;
public KafkaProducerThread(){
} public KafkaProducerThread(KafkaProducer kafkaProducer){ this.kafkaProducer = kafkaProducer; } @Override public void run() { for (int i = 0; i < 20 ; i++) { String key = "ImKey-" + i+"-"+Thread.currentThread().getName(); String value = "ImValue-" + i+"-"+Thread.currentThread().getName(); kafkaProducer.send(new ProducerRecord<>("th-topic", key, value)); System.out.printf("Thread-name = %s, key = %s, value = %s",Thread.currentThread().getName(),key,value); } }
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 创建KafkaProducer客户端 KafkaProducer<String, String> producer = new KafkaProducer<>(props); KafkaProducerThread producerThread1 = new KafkaProducerThread(producer); //KafkaProducerThread producerThread2 = new KafkaProducerThread(producer); Thread one = new Thread(producerThread1, "one"); Thread two = new Thread(producerThread1, "two"); System.out.println("线程开始"); one.start(); two.start(); }}
复制代码


这里只是一个简单的实例。

Consumer API​


kafka 客户端通过 TCP 长连接从集群中消费消息,并透明地处理 kafka 集群中出现故障服务器,透明地调节适应集群中变化的数据分区。也和服务器交互,平衡均衡消费者。


偏移量和消费者的位置


kafka 为分区中的每条消息保存一个偏移量(offset),这个偏移量是该分区中一条消息的唯一标示符。也表示消费者在分区的位置。例如,一个位置是 5 的消费者(说明已经消费了 0 到 4 的消息),下一个接收消息的偏移量为 5 的消息。实际上有两个与消费者相关的“位置”概念:


消费者的位置给出了下一条记录的偏移量。它比消费者在该分区中看到的最大偏移量要大一个。 它在每次消费者在调用 poll(long)中接收消息时自动增长。


“已提交”的位置是已安全保存的最后偏移量,如果进程失败或重新启动时,消费者将恢复到这个偏移量。消费者可以选择定期自动提交偏移量,也可以选择通过调用 commit API 来手动的控制(如:commitSync 和 commitAsync)。


这个区别是消费者来控制一条消息什么时候才被认为是已被消费的,控制权在消费者。


消费者组和主题订阅


Kafka 的消费者组概念,通过进程池瓜分消息并处理消息。这些进程可以在同一台机器运行,也可分布到多台机器上,以增加可扩展性和容错性,相同 group.id 的消费者将视为同一个消费者组。


分组中的每个消费者都通过 subscribe API 动态的订阅一个 topic 列表。kafka 将已订阅 topic 的消息发送到每个消费者组中。并通过平衡分区在消费者分组中所有成员之间来达到平均。因此每个分区恰好地分配 1 个消费者(一个消费者组中)。所有如果一个 topic 有 4 个分区,并且一个消费者分组有只有 2 个消费者。那么每个消费者将消费 2 个分区。


消费者组的成员是动态维护的:如果一个消费者故障。分配给它的分区将重新分配给同一个分组中其他的消费者。同样的,如果一个新的消费者加入到分组,将从现有消费者中移一个给它。这被称为重新平衡分组。当新分区添加到订阅的 topic 时,或者当创建与订阅的正则表达式匹配的新 topic 时,也将重新平衡。将通过定时刷新自动发现新的分区,并将其分配给分组的成员。


从概念上讲,你可以将消费者分组看作是由多个进程组成的单一逻辑订阅者。作为一个多订阅系统,Kafka 支持对于给定 topic 任何数量的消费者组,而不重复。


这是在消息系统中常见的功能的略微概括。所有进程都将是单个消费者分组的一部分(类似传统消息传递系统中的队列的语义),因此消息传递就像队列一样,在组中平衡。与传统的消息系统不同的是,虽然,你可以有多个这样的组。但每个进程都有自己的消费者组(类似于传统消息系统中 pub-sub 的语义),因此每个进程都会订阅到该主题的所有消息。


此外,当分组重新分配自动发生时,可以通过 ConsumerRebalanceListener 通知消费者,这允许他们完成必要的应用程序级逻辑,例如状态清除,手动偏移提交等


它也允许消费者通过使用 assign(Collection)手动分配指定分区,如果使用手动指定分配分区,那么动态分区分配和协调消费者组将失效。


发现消费者故障


订阅一组 topic,当调用 poll(long)时,消费者将自动加入到消费者组中。只要持续调用 poll,消费者将一直保持可用,并继续从分配的分区中接收数据。此外,消费者向服务器定时发送心跳。如果消费者崩溃或无法再 session.timeout.ms 配置的时间内发送心跳,则消费者就被视为死亡,并且其分区将被重新分配。


还有一种可能,消费者可能遇到活锁的情况,它持续的发送心跳,但是没有处理。为了预防消费者在这总情况下一直拥有分区,我们使用 max.poll.interval.ms 活跃监测机制。在此基础上,如果你调用的 poll 的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。发生这种情况时,你会看到 offset 提交失败( 调用 commitSync()引发的 CommitFailedException )。这是一种安全机制,保障只有活动成员能够提交 offset。所以要留在组中,你必须持续调用 poll。


消费者提供两种配置设置来控制 poll 循环:


  1. max.poll.interval.ms: 增大 poll 的间隔,可以为消费者提供更多的时间去处理返回的消息(调用 poll(long)返回的消息,通常返回的消息都是一批),缺点是此值越大将会延迟组重新平衡。

  2. max.poll.records:此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。通过调整此值,可以减少 poll 间隔,减少重新平衡分组的


对于消息处理时间不可预测地情况,这些选项是不够的。 处理这种情况的推荐方法是将消息处理移到另一个线程中,让消费者继续调用 poll。 但是必须注意确保已提交的 offset 不超过实际位置。另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量。 还要注意, 你需要pause暂停分区,不会从 poll 接收到新消息,让线程处理完之前返回的消息(如果你的处理能力比拉取消息的慢,那创建新线程将导致机器内存溢出)。


实例:


自动提交偏移量


public static void main(String[] args) {
Properties props = new Properties(); props.put("bootstrap.servers","xxxxxxxxxx:9093"); props.put("group.id","test-6");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("enable.auto.commit","true");//自动提交offset props.put("auto.commit.interval.ms","1000"); // 自动提交时间间隔 props.put("max.poll.records","5"); // 拉取的数据条数 props.put("session.timeout.ms","10000"); // 维持session的时间。超过这个时间没有心跳 就会剔出消费者组 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 可以写多个topic consumer.subscribe(Arrays.asList("my-topic")); while (true){ ConsumerRecords<String, String> records = consumer.poll(5000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } System.out.println("处理了一批数据!"); } }
复制代码


配置说明:


bootstrap.servers: 集群是通过配置 bootstrap.servers 指定一个或多个 broker。不用指定全部的 broker,它将自动发现集群中的其余的 borker(最好指定多个,万一有服务器故障)


enable.auto.commit: 自动提交偏移量,如果设置了自动提交偏移量,下面这个设置就必须要用到了。


auto.commit.interval.ms:自动提交时间间隔,和自动提交偏移量配合使用


max.poll.records:控制从 broker 拉取的消息条数


poll(long time): 当消费者获取不到消息时,就会使用这个参数,为了减轻无效的循环请求消息,消费者会每隔 long time 的时间请求一次消息,单位是毫秒。


session.timeout.ms: broker 通过心跳机器自动检测消费者组中失败的进程,消费者会自动 ping 集群,告诉进群它还活着。只要消费者能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过 session.timeout.ms,那么就会认为是故障的,它的分区将被分配到别的进程。


auto.offset.reset:这个属性很重要,一会详细讲解


这里说明一下 auto.commit.interval.ms 以及何时提交消费者偏移量,经过测试:


  1. 设置props.put("auto.commit.interval.ms","60000");

自动提交时间为一分钟,也就是你在这一分钟内拉取任何数量的消息都不会被提交消费的当前偏移量,如果你此时关闭消费者(一分钟内),下次消费还是从和第一次的消费数据一样,即使你在一分钟内消费完所有的消息,只要你在一分钟内关闭程序,导致提交不了 offset,就可以一直重复消费数据。

  1. 设置props.put("auto.commit.interval.ms","3000");

但是在消费过程中设置 sleep。


public static void main(String[] args) {
Properties props = new Properties(); props.put("bootstrap.servers","xxxxxxxxxxxx:9093"); props.put("group.id","test-6");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("enable.auto.commit","true");//自动提交offset props.put("auto.commit.interval.ms","100000"); // 自动提交时间间隔 props.put("max.poll.records","5"); // 拉取的数据条数 props.put("session.timeout.ms","10000"); // 维持session的时间。超过这个时间没有心跳 就会剔出消费者组 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); // KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 可以写多个topic consumer.subscribe(Arrays.asList("my-topic"));
while (true){ ConsumerRecords<String, String> records = consumer.poll(5000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } try { Thread.sleep(5000L); System.out.println("等待了5秒了!!!!!!!!!!!!开始等待15秒了"); Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("处理了一批数据!"); } }
复制代码


这里如果你消费了第一批数据,在执行第二次 poll 的时候,关闭程序也不会提交偏移量,只有在执行第二次 poll 的时候才会把上一次的最后一个 offset 提交上去。


auto.offset.reset 讲解:


auto.offset.reset 的值有三种:earliest,latest,none,代表者不同的意思


earliest:当各分区下有已经提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费,最常用的值

latest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据

none:topic 各分区都存在已提交的 offset 时,从 offset 后开始消费,只要有一个分区不存在已提交的 offset,则抛出异常

!!注意:当使用了 latest,并且分区没有已提交的 offset 时,消费新产生的该分区下的数据,其实是把 offset 的值直接设置到最后一个消息的位置。例如,有个 30 条数据的 demo 的 topic,各分区无提交 offset,使用了 latest,再看 offset 就会发现已经在 30 的位置了,所以才只能消费新产生的数据!!!!


手动提交偏移量


不需要定时提交偏移量,可以自己控制 offset,当消息已经被我们消费过后,再去手动提交他们的偏移量。这个很适合我们的一些处理逻辑。


手动提交 offset 的方法有两种:分别是 commitSync(同步提交) 和 commitAsync(异步提交)。两者的相同点,都会将本次 poll 的一批数据最高的偏移量提交;不同点是 commitSync 会失败重试,一直到提交成功(如果有不可恢复的原因导致,也会提交失败),才去拉取新数据。而 commitAsync 则没有重试机制(提交了就去拉取新数据,不管这次的提交有没有成功),故有可能提交失败。


实例:


public static void main(String[] args) {        Properties props = new Properties();        props.put("bootstrap.servers","XXXXXC:9093");        props.put("group.id","test-11");//消费者组,只要group.id相同,就属于同一个消费者组        props.put("enable.auto.commit","false");//自动提交offset        props.put("auto.commit.interval.ms","1000"); // 自动提交时间间隔        props.put("max.poll.records","20"); // 拉取的数据条数        props.put("session.timeout.ms","10000"); // 维持session的时间。超过这个时间没有心跳 就会剔出消费者组        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");        props.put("auto.offset.reset", "earliest");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); int i= 0; while (true){ ConsumerRecords<String, String> records = consumer.poll(5000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); i++; } if (i == 20){ System.out.println("i_num:"+i); // 同步提交 consumer.commitSync(); // 异步提交 // consumer.commitAsync(); }else { System.out.println("不足二十个,不提交"+i); } i=0; } }
复制代码


这些都是全部提交偏移量,如果我们想更细致的控制偏移量提交,可以自定义提交偏移量:


public static void main(String[] args) throws InterruptedException {        Properties props = new Properties();        props.put("bootstrap.servers","XXXXXXXXXX:9093");        props.put("group.id","test-18");//消费者组,只要group.id相同,就属于同一个消费者组        props.put("enable.auto.commit","false");//自动提交offset        props.put("auto.commit.interval.ms","1000000"); // 自动提交时间间隔        props.put("max.poll.records","5"); // 拉取的数据条数        props.put("session.timeout.ms","10000"); // 维持session的时间。超过这个时间没有心跳 就会剔出消费者组        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");        props.put("auto.offset.reset", "earliest");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true){ ConsumerRecords<String, String> records = consumer.poll(5000); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)), new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { for (Map.Entry<TopicPartition,OffsetAndMetadata> entry : map.entrySet()){ System.out.println("提交的分区:"+entry.getKey().partition()+",提交的偏移量:"+entry.getValue().offset()); } } }); } } }
复制代码


订阅指定的分区


通过消费者 Kafka 会通过分区分配分给消费者一个分区,但是我们也可以指定分区消费消息,要使用指定分区,只需要调用 assign(Collection)消费指定的分区即可:


public static void main(String[] args) throws InterruptedException {        Properties props = new Properties();        props.put("bootstrap.servers","XXXXXXXXX:9093");        props.put("group.id","test-19");//消费者组,只要group.id相同,就属于同一个消费者组        props.put("enable.auto.commit","false");//自动提交offset        props.put("auto.commit.interval.ms","1000000"); // 自动提交时间间隔        props.put("max.poll.records","5"); // 拉取的数据条数        props.put("session.timeout.ms","10000"); // 维持session的时间。超过这个时间没有心跳 就会剔出消费者组        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");        props.put("auto.offset.reset", "earliest");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); // 你可以指定多个不同topic的分区或者相同topic的分区 我这里只指定一个分区 TopicPartition topicPartition = new TopicPartition("my-topic", 0); // 调用指定分区用assign,消费topic使用subscribe consumer.assign(Arrays.asList(topicPartition)); while (true){ ConsumerRecords<String, String> records = consumer.poll(5000); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)), new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { for (Map.Entry<TopicPartition,OffsetAndMetadata> entry : map.entrySet()){ System.out.println("提交的分区:"+entry.getKey().partition()+",提交的偏移量:"+entry.getValue().offset()); } } }); } } }
复制代码


一旦手动分配分区,你可以在循环中调用 poll。消费者分区任然需要提交 offset,只是现在分区的设置只能通过调用 assign 修改,因为手动分配不会进行分组协调,因此消费者故障或者消费者的数量变动都不会引起分区重新平衡。每一个消费者是独立工作的(即使和其他的消费者共享 GroupId)。为了避免 offset 提交冲突,通常你需要确认每一个 consumer 实例的 groupId 都是唯一的。


注意:

手动分配分区(assgin)和动态分区分配的订阅 topic 模式(subcribe)不能混合使用。


​​​​​点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚阅读数: 2
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
一文详解Kafka API