5 种 kafka 消费端性能优化方法
本文分享自华为云社区《FusionInsight HD&MRS:kafka消费端性能优化方法》,作者: 穿夹克的坏猴子 。
kafka 消费端性能优化主要从下面几个方面优化:
1. 接口使用方面优化:
旧版本 highlevel-consumer:偏移量信息存储在 zookeeper,最大消费线程数与分区数量相同,不推荐
旧版本 simpleconsumer:自行选择存储偏移量的方式,可以实现多线程消费单分区,若无特殊的性能要求,不推荐
新版本 highlevel-consumer:偏移量信息存储在 kafka 指定的 topic 中,默认情况下最大消费线程数与分区数量相同,可以实现多线程消费单分区,推荐
2. 参数调优(以下参数需根据现网环境评估调至合适的值):
2.1 旧版本消费者(kafka old API)参数调优
fetch.message.max.bytes:该参数为一次性从 kafka 集群中获取的数据块大小。在升级到 651 版本后这个参数需要调大,否则容易出现获取数据限制的报错。建议调整大小不小于 kafka 的服务端参数 message.max.bytes。
注意如何确认为旧版本:如果生产者的配置方式包含如下这些配置,则为旧版本:group.id/zookeeper.connect
2.2 新版本参数(kafka new API)参数调优
max.poll.records:意味消费者一次 poll()操作,能够获取的最大数据量,调整这个值能提升吞吐量,于此同时也需要同步提升 max.poll.interval.ms 的参数大小。
fetch.max.bytes:意味 server 端可返回给 consumer 的最大数据大小,增加可以提升吞吐量,但是在客户端和服务端网络延迟比较大的环境下,建议可以减小该值,防止业务处理数据超时。
heartbeat.interval.ms:消费超时时间,consumer 与 kafka 之间的超时时间,该参数不能超过 session.timeout.ms,通常设置为 session.timeout.ms 的三分之一,默认值:3000。
max.partition.fetch.bytes:限制每个 consumer 发起 fetch 请求时候,读到数据(record)的限制,设置过大,consumer 本地缓存的数据就会越多,可能影响内存的使用,默认值:1048576。
fetch.max.bytes:server 端可返回给 consumer 的最大数据大小,数值可大于 max.partition.fetch.bytes,一般设置为默认值即可,默认值:52428800
session.timeout.ms:使用 consumer 组管理 offset 时,consumer 与 broker 之间的心跳超时时间,如果 consumer 消费数据的频率非常低,建议增大这个参数值,默认值:10000。
auto.offset.reset:消费过程中无法找到数据消费到的 offset 位置,所选择的消费策略,earliest:从头开始消费,可能会消费到重复数据,latest:从数据末尾开始消费,可能会丢失数据。默认值:earlist。
max.poll.interval.ms:消费者在每一轮 poll() (拉取数据之间的最大时间延迟),如果此超时时间期满之前 poll()没有被再次调用,则消费者被视为失败,并且分组将触发 rebalance,以便将分区重新分配给别的成员。
如果,再两次 poll 之间需要添加过多复杂的,耗时的逻辑,需要延长这个时间,默认值:300s
max.poll.records:消费者一次 poll()操作,能够获取的最大数据量,增加这个参数值,会增加一次性拉取数据的数据量,确保拉取数据的时间,至少在 max.poll.interval.ms 规定的范围之内,默认值:500
2.3 Simpleconsumer 参数调优
simpleconsumer 在初始化阶段需要传一个 fetchsize 的参数,比如:consumer=new SimpleConsumer(leaderBroker,a_port,100000,64*1024,clientName)中 64*1024,该参数表示 simpleconsumer 一次性获取的数据大小,如果该值过大则可能会导致 request 时间过长,使用过程中应该降低这个值,保证消费频率。
使用 SimpleConsumer 的核心需求是:多线程消费单个分区,以达到提升性能的要求,如果没有这样需求,不建议使用这个这种消费方式
3. 消费端频繁 rebalance 导致性能下降调优:
3.1 因业务处理能力不足导致的:
session.timout.ms 控制心跳超时时间。
heartbeat.interval.ms 控制心跳发送频率,建议该值不超过 session.timout.ms 的三分之一。
max.poll.interval.ms 控制每次 poll 的间隔,时间=获取数据的时间+处理数据的时间,如果 max.poll.records 设定的值在 max.poll.interval.ms 指定的时间内没有处理完成会触发 rebalance,这里给出一个相对较为合理的配置,建议在预计的处理时间的基础上再加 1 分钟。
max.poll.records 每个批次处理的数据条数,默认为 500 条。如果处理能力较低,建议可以减小这个值。
3.2 非正常消费者频繁的访问 kafka 集群导致频繁 rebalance:
收集 kafka-request.log,查看异常的 topic 有哪些客户端节点在消费,cat kafka-request.* | grep “topic=topicName” | grep “apikey=FETCH” | awk –F’from connection’ ‘{print $2}’ | awk –F’;’ ‘{print $1}’ | awk –F’-’ ‘{print $2}’ | awk –F’:’ ‘{print $1}’ | sort | uniq –c | sort -nr ,找出不应该产生消费行为的节点,停止异常节点上消费者
4. 版本引发性能下降优化
FI 8.0.2 版本之前 kafka SimpleAclAuthorizer 鉴权异常导致性能下降,8.0.2 版本在使用非安全端口(21005 或者 9092 端口)时会出现集群性能下降的问题,表现:kafka-root.log 中出现大量 ExitcodeException:id:Default#Principal:no such user 报错。
解决办法:升级到 FI 8023 以上版本
临时规避办法:业务侧使用 21007 端口访问 kafka,去掉鉴权插件即 allow.everyone.if.no.acl.found=true,将以下 kafka 服务端配置置为空:authorizer.class.name=
5. FI 6513~6516 版本的内核问题引发的性能异常
6513 版本在 kafka 引入社区的的 lazy index 功能后,在新的 segment 创建的过程中可能会导致并发创建失败的问题,常见的报错(server.log 中)如以下两种类型:
java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code
java.lang.IllegalArgumentException: requirement failed: Attempt to append to a full index
当出现以上两种类型的报错的时候可以断定是版本问题导致,问题预警如:https://support.huawei.com/enterprise/zh/bulletins-product/ENEWS2000007844;
解决方案:升级到 6517 版本以上版本或者打入紧急补丁:https://support.huawei.com/enterprise/zh/cloud-computing/fusioninsight-hd-pid-21110924/software/251482609?idAbsPath=fixnode01%7C7919749%7C7941815%7C19942925%7C250430185%7C21110924;
临时规避方案:重启异常的 broker 实例
版权声明: 本文为 InfoQ 作者【华为云开发者联盟】的原创文章。
原文链接:【http://xie.infoq.cn/article/7fcb75e9d2474ec767c51a93d】。文章转载请联系作者。
评论