kafka 核心技术与实战学习笔记(一)
核心概念
消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。 主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方 就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是 在分区层级下的,即每个分区可配置多个副本实现高可用。
生产者:Producer。向主题发布新消息的应用程序。
消费者:Consumer。从主题订阅新消息的应用程序。
消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者 位移。
消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区 以实现高吞吐。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配 订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
问题
为何不模仿 MySQL 主从?
如果允许 follower 副本对外提供读服务(主写从读),首先会存在数据一致性的问题,消 息从主节点同步到从节点需要时间,可能造成主从节点的数据不一致。主写从读无非就是 为了减轻 leader 节点的压力,将读请求的负载均衡到 follower 节点,如果 Kafka 的分区相对 均匀地分散到各个 broker 上,同样可以达到负载均衡的效果,没必要刻意实现主写从读增 加代码实现的复杂程度
因为 mysql 一般部署在不同的机器上一台机器读写会遇到瓶颈,Kafka 中的领导者副本 一般均匀分布在不同的 broker 中,已经起到了负载的作用。即:同一个 topic 的已经通过分 区的形式负载到不同的 broker 上了
版本
对于 kafka-2.11-2.1.1 的提法,真正的 Kafka 版本号实际上是 2.1.1。那么这个 2.1.1 又表示什么呢?前面的 2 表示大版本号,即 Major Version;中间的 1 表示小版本号或次版本号,即 Minor Version;最后的 1 表示修订版 本号,也就是 Patch 号。Kafka 社区在发布 1.0.0 版本后特意写过一篇文章,宣布 Kafka 版本命名规则正式从 4 位演进到 3 位,比如 0.11.0.0 版本就是 4 位版本号
Kafka 目前总共演进了 7 个大版本,分别是 0.7、0.8、0.9、0.10、0.11、1.0 和 2.0
0.8 版本:副本机制,老版本客户端 API 需要指定 Zookeeper 地址而非 Broker 地址。尽量升级到 0.8.2.2,但是不要使用新版本 Producer API。老版本 consumer API 比较稳定
0.9 版本:安全认证、权限,Java 重写了消费者 API。新版本 Producer API 比较稳定,但是不要使用新版本(0.9 版本)ConsumerAPI。
0.10 版本:引入 Kafka Stream ,10.2.2,新版 consumer API 稳定,修复了一个可能导致 Producer 性能降低的 Bug
0.11 版本 幂等性 Producer API 以及事务(Transaction) API;Kafka 消息格式做了重构
1.0 & 2.0 Kafka Streams 改进
基本使用 - 集群参数配置
broker
log.dirs & log.dir
与 Zookeeper 相关 zookeeper.connect
与 Broker 连接相关 listeners:advertised.listeners:*host.name/port 最好使用主机名而不是 IP
Topic 管理 auto.create.topics.enable:是否允许自动创建 Topic。(false)unclean.leader.election.enable:是否允许 Unclean Leader 选举。(false)auto.leader.rebalance.enable:是否允许定期进 行 Leader 选举。(false)
数据留存 log.retention.{hour|minutes|ms}:这是个“三 兄弟”,都是控制一条消息数据被保存多长时间。从优先 级上来说 ms 设置最高、minutes 次之、hour 最低。
log.retention.bytes:这是指定 Broker 为消息保存 的总磁盘容量大小。默认是 -1,多租户有意义 message.max.bytes:控制 Broker 能够接收的最大消 息大小。幕默认不到 1M,可以调大
Topic 级别
Topic 级别参数会覆盖全局 Broker 参数的值
retention.ms 规定了该 Topic 消息被保存的时长。 默认是 7 天 retention.bytes 规定了要为该 Topic 预留多大的磁 盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以 无限使用磁盘空间。max.message.bytes 决定了 Kafka Broker 能够正常 接收该 Topic 的最大消息大小
何时设置:创建 Topic 时进行设置;修改 Topic 时设置 ps: kafka相关业务必会操作命令整理
JVM 参数
Heap Size 默认 1G,强烈建议修改为 6Gjava8 建议使用 G1 垃圾收集器 KAFKA_HEAP_OPTS:指定堆大小。KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数(TODO 具体参数配置)
操作系统参数
文件描述符限制 ulimit -n 1000000 不设置经常会报“Too many open files”的错误文件系统类型 Swappiness 个人 反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值提交时间
ulimit -n 这个参数说的太好了!如果不设置,单机在 Centos7 上几百的并发就报“Too many open files”了。网上搜索后设置成 65535,用 JMater 压测单机 也只能支撑到 1000 左右的并发,原来这个值可以设置到 1000000!
具体设置方法可以监控堆上的 live data,然后大约乘以 1.5 或 2 即可。比如你可以手动触发 Full GC,然后查看一下 堆上存活的数据大小,比如说是 1500MB,那么你可以设置 heap size 为 2.25GB
客户端实践
分区机制
轮询
随机
Key-ordering(按键保序):相同的 key 进入相同的分区,可以实现有序消费
压缩消息集合 -日志项(record item) - 消息版本:V1,V2(0.11 版本后引入)何时压缩:生产端,broker 端重新压缩:Broker 端指定了和 Producer 端不同的压缩算法;Broker 端发生了消息格式转换(V1 V2)解压缩:一般发生在 Consumer 端。Producer 端压缩、Broker 端保持、Consumer 端解压缩。ps: 每个压缩过的消息集合在 Broker 端写入时都要发生解 压缩操作,目的就是为了对消息执行各种验证?-CRC 解压?压缩算法 :gzip,snappy,zstd,lz4 最佳实践:producer 端 CPU 资源充足,带宽资源有限
无消息丢失配置一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证生产者端丢失数据:不要使用 producer.send(msg),而要使用 producer.send(msg, callback)消费者端丢失数据:如果是多线程异步处理消费消息,Consumer 程序不要开 启自动提交位移,而是要应用程序手动提交位移最佳实践:
使用 producer.send(msg, callback)
设置 acks = all
设置 retries 为一个较大的值
设置 unclean.leader.election.enable = false
replication.factor >= 3 消息多保存几份
min.insync.replicas > 1 控制的是消息至少要被写入 到多少个副本才算是“已提交”
确保 replication.factor > min.insync.replicas
确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设 置成 false,并采用手动提交位移的方式
ps: Kafka 还有一种特别隐秘的消息丢失场景:增加主题分区,怎么解决呢?(consumer 改用"从最早位置"读解决新加分区造成的问题)
客户端少见功能-拦截器 org.apache.kafka.clients.producer.ProducerInterceptororg.apache.kafka.clients.consumer.ConsumerInterceptorKafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。例如得到端到端消息的平均处理延时
TCP 链接
KafkaProducer 实例创建时启动 Sender 线程,从而创 建与 bootstrap.servers 中所有 Broker 的 TCP 连接。
KafkaProducer 实例首次更新元数据信息之后,还会再 次创建与集群中所有 Broker 的 TCP 连接。
如果 Producer 端发送消息到某台 Broker 时发现没有与 该 Broker 的 TCP 连接,那么也会立即创建连接。
如果设置 Producer 端 connections.max.idle.ms 参数 大于 0,则步骤 1 中创建的 TCP 连接会被自动关闭;如果设置该参数 =-1,那么步骤 1 中创建的 TCP 连接将无 法被关闭,从而成为“僵尸”连接。
幂等生产者和事务生产者 幂等性(Idempotence)幂等性 Producer:
只能保证单分区上的幂等性;只能实现单会话上的幂等性事务型 Producer:开启 enable.idempotence = true;设置 Producer 端参数 transctional. id。最好为其设置一个有意义的名字。Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level 参数的值即可
ps: 实现上可以用 kafka 的幂等性来保证单分区单会话的精准一次语义,如果是一批消息,可以路由到同一个分区
_consummer_offsets 位移主题以前保存在 zookeeper 中,0.8.2.x 以后保存在 _consummer_offsets 中消息格式:
key <Group ID,主题名,分区号 > value 位移量
用于保存 Consumer Group 信息的消息。用于删除 Group 过期位移甚至是删除 Group 的消息。手动提交位移,即设置 enable.auto.commit = falseCompaction:定期删除策略
避免消费者组重平衡
Rebalance 发生的时机- 组成员数量发生变化- 订阅主题数量发生变化- 订阅主题的分区数发生变化
第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的参数推荐 设置 session.timeout.ms = 6s。 设置 heartbeat.interval.ms = 2s 第二类非必要 Rebalance 是 Consumer 消费时间过长导致的 max.poll.interval.ms + Full GC
位移提交
异步+同步提交的代码控制 poll 数量
CommitFailedException 异常处理 1. 缩短单条消息处理的时间。比如,之前下游系统消费一条消息的时间是 100 毫秒,优化 之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。2. 增加 Consumer 端允许下游系统消费一批消息的最大时长。这取决于 Consumer 端参 数 max.poll.interval.ms 的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果 你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。值得一提的是,Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API,那么你需要增加 session.timeout.ms 参数的值。不幸的是,session.timeout.ms 参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,这也是 社区在 0.10.1.0 版本引入 max.poll.interval.ms 参数,将这部分含义从 session.timeout.ms 中剥离出来的原因之一。3. 减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多 返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如 果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常 最简单的手段。4. 下游系统使用多线程来加速消费。这应该算是“最高级”同时也是最难实现的解决办法 了。
ps: 应用中同时出现了设置相同 group.id 值的消费者组程序和独立消 费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常
消费者如何管理 TCP 连接
消费者组消费进度监控
版权声明: 本文为 InfoQ 作者【追风少年】的原创文章。
原文链接:【http://xie.infoq.cn/article/9e1b6d9317b94dc0cb285176c】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论