写点什么

带你认识三种 kafka 消息发送模式

  • 2021 年 12 月 30 日
  • 本文字数:4003 字

    阅读完需:约 13 分钟

摘要:在 kafka-0.8.2 之后,producer 不再区分同步(sync)和异步方式(async),所有的请求以异步方式发送,这样提升了客户端效率。

 

本文分享自华为云社区《kafka消息发送模式》,作者:dayu_dls。

 

在 kafka-0.8.2 之后,producer 不再区分同步(sync)和异步方式(async),所有的请求以异步方式发送,这样提升了客户端效率。producer 请求会返回一个应答对象,包括偏移量或者错误信。这种异步方地批量的发送消息到 kafka broker 节点,因而可以减少 server 端资源的开销。新的 producer 和所有的服务器网络通信都是异步地,在 ack=-1 模式下需要等待所有的 replica 副本完成复制时,可以大幅减少等待时间。

为生产者设置属性

  

1. bootstrap.servers: 该属性指定 broker 的地址清单,地址的格式为 host:po 忱。清单里不需要包含所有的 broker 地址,生产者会给定的 broker 里查找到其他 broker 的信息。不过建议至少要提供两个 broker 的信息, 一且其中一个若机,生产者仍然能够连接到集群上。

  

2. key.serializer: broker 希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把 Java 对象作为键和值发送给 broker 。这样的代码具有良好的可读性,不过生产者需要知道如何把这些 Java 对象转换成字节数组。key. serializer 必须被设置为一个实现了 org.apache.kafka.common.serialization.StringSerializer 接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka 客户端默认提供了 ByteArraySerializer(这个只做很少的事情)、StringSerializer 和 IntegeSerializer,因此,如果你只使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器。要注意,  key.serializer 是必须设置的,就算你打算只发送值内容。

  

3. value.serializer: 与 key.serializer 一样,value.serializer 指定的类会将值序列化。如果键和值都是字符串,可以使用与 key.serializer 一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。

kafka 发送端 3 种不同的发送模式


1、Fire-and-forget

只发送消息,不关心消息是否发送成功。本质上也是一种异步发送的方式,消息先存储在缓冲区中,达到设定条件后批量发送。当然这是 kafka 吞吐量最高的一种方式,并配合参数 acks=0,这样生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息。但是也是消息最不可靠的一种方式,因为对于发送失败的消息没有做任何处理。


ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);try {  producer.send(record);} catch (Exception e) {  e.printStackTrace();}
复制代码


​在发送消息之前有可能会发生异常,例如是序列化消息失败的 SerializationException、缓冲区满的 BufferExhaustedException、发送超时的 TimeoutException 或者发送的线程被中断的 InterruptException。发送消息之后并没有异常处理。 


2、Synchronous send

同步发送,send()方法会返回 Futrue 对象,通过调用 Futrue 对象的 get()方法,等待直到结果返回,根据返回的结果可以判断是否发送成功。如果业务要求消息必须是按顺序发送的,那么可以使用同步的方式,并且只能在一个 partation 上,结合参数设置 retries 的值让发送失败时重试,设置 max_in_flight_requests_per_connection=1,可以控制生产者在收到服务器晌应之前只能发送 1 个消息,在消息发送成功后立刻 flush,从而控制消息顺序发送。


ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);try {  RecordMetadata metadata = producer.send(record).get();} catch (Exception e) {  e.printStackTrace();}producer.flush();producer.close();
复制代码


​在调用 send()方法后再调用 get()方法等待结果返回。如果发送失败会抛出异常,如果发送成功会返回一个 RecordMetadata 对象,然后可以调用 offset()方法获取该消息在当前分区的偏移量。


KafkaProducer 有两种类型的异常,第一种是可以重试的 Retriable,该类异常可以通过重新发送消息解决。例如是连接异常后重新连接、“no leader”异常后重新选取新的 leader。KafkaProducer 可以配置为遇到该类异常后自动重新发送消息直到超过重试次数。第二类是不可重试的,例如是“message size too large”(消息太大),该类异常会马上返回错误。


 3、Asynchronous send

异步发送,在调用 send()方法的时候指定一个 callback 函数,当 broker 接收到返回的时候,该 callback 函数会被触发执行。如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息,配合参数 retries=0,并将发送失败的消息记录到日志文件中;要使用 callback 函数,先要实现 org.apache.kafka.clients.producer.Callback 接口,该接口只有一个 onCompletion 方法。如果发送异常,onCompletion 的参数 Exception e 会为非空。


ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);      producer.send(myRecord,                   new Callback() {                        public void onCompletion(RecordMetadata metadata, Exception e) {                            if(e != null) {                               e.printStackTrace();                            } else {                               System.out.println("The offset of the record we just sent is: " + metadata.offset());                            }                       }                   });  
复制代码


异步发送相关参数


异步发送时,kafka 会先把消息存储在缓冲池中,当到达设定条件触发缓冲池消息发送。

(1)消息缓存达到 batch.size;

(2)距离上一次消息发送时间间隔 linger.ms;

(3)调用 flush()方法,会立刻触发发送,并阻塞到当前缓冲区发送完毕;

(4)调用 close(),触发发送,完毕后关闭。


4.1 buffer.memory

此配置设置生产者可用于缓冲等待发送给 brokers 消息的总内存字节数,默认为 33554432=32MB。如果消息发送到缓存区的速度比发送到 broker 的速度快,那么生产者会被阻塞(根据 max.block.ms 配置的时间,默认为 60000ms=1 分钟,在 0.9.0.0 版本之前使用 block.on.buffer.full 配置),之后会抛出异常。


4.2 compression.type

生产者对生成的所有数据使用的压缩类型,默认值是 none(即不压缩),有效值为 none,gzip,snappy 或 lz4。Snappy 压缩技术是 Google 开发的,它可以在提供较好的压缩比的同时,减少对 CPU 的使用率并保证好的性能,所以建议在同时考虑性能和带宽的情况下使用。Gzip 压缩技术通常会使用更多的 CPU 和时间,但会产生更好的压缩比,所以建议在网络带宽更受限制的情况下使用。通过启用压缩功能,可以减少网络利用率和存储空间,这往往是向 Kafka 发送消息的瓶颈。


4.3 retries

默认值为 0,当设置为大于零的值,客户端会重新发送任何发送失败的消息。注意,此重试与客户端收到错误时重新发送消息是没有区别的。在配置 max.in.flight.requests.per.connection 不等于 1 的情况下,允许重试可能会改变消息的顺序,因为如果两个批次的消息被发送到同一个分区,第一批消息发送失败但第二批成功,而第一批消息会被重新发送,则第二批消息会先被写入。注意此参数可能会改变消息的顺序性。


4.4 batch.size

当多个消息被发送到同一个分区时,生产者会把它们一起处理。此配置设置用于每批处理使用的内存字节数,默认为 16384=16KB。当使用的内存满的时候,生产者会发送当前批次的所有消息。但是,这并不意味着生产者会一直等待使用的内存变满,根据下面 linger.ms 配置的时间也会触发消息发送。设置较小的值会增加发送的频率,从而可能会减少吞吐量;设置较大的值会使用较多的内存,设置为 0 会关闭批处理的功能。


4.5 linger.ms

此配置设置在发送当前批次消息之前等待新消息的时间量,默认值为 0。KafkaProducer 会在当前批次使用的内存已满或等待时间到达 linger.ms 配置时间的时候发送消息。当 linger.ms>0 时,延时性会增加,但会提高吞吐量,因为会减少消息发送频率。


4.6 client.id

用于标识发送消息的客户端,通常用于日志和性能指标以及配额。


4.7 max.in.flight.requests.per.connection

此配置设置客户端在单个连接上能够发送的未确认请求的最大数量,默认为 5,超过此数量会造成阻塞。设置大的值可以提高吞吐量但会增加内存使用,但是需要注意的是,当设置值大于 1 而且发送失败时,如果启用了重试配置,有可能会改变消息的顺序。设置为 1 时,即使重新发送消息,也可以保证发送的顺序和写入的顺序一致。


4.8 request.timeout.ms

此配置设置客户端等待请求响应的最长时间,默认为 30000ms=30 秒,如果在这个时间内没有收到响应,客户端将重发请求,如果超过重试次数将抛异常。此配置应该比 replica.lag.time.max.ms(broker 配置,默认 10 秒)大,以减少由于生产者不必要的重试造成消息重复的可能性。


4.9 max.block.ms

当发送缓冲区已满或者元数据不可用时,生产者调用 send()和 partitionsFor()方法会被阻塞,默认阻塞时间为 60000ms=1 分钟。由于使用用户自定义的序列化器和分区器造成的阻塞将不会计入此时间。


4.10 max.request.size

此配置设置生产者在单个请求中能够发送的最大字节数,默认为 1048576 字节=1MB。例如,你可以发送单个大小为 1MB 的消息或者 1000 个大小为 1KB 的消息。注意,broker 也有接收消息的大小限制,使用的配置是 message.max.bytes=1000012 字节(好奇怪的数字,约等于 1MB)。


4.11 receive.buffer.bytes 和 send.buffer.bytes

receive.buffer.bytes:读取数据时使用的 TCP 接收缓冲区(SO_RCVBUF)的大小,默认值为 32768 字节=32KB。如果设置为-1,则将使用操作系统的默认值。send.buffer.bytes:发送数据时使用的 TCP 发送缓冲区(SO_SNDBUF)的大小,默认值为 131072 字节=128KB。如果设置为-1,则将使用操作系统的默认值。


点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
带你认识三种kafka消息发送模式