写点什么

kafka 那些事儿

用户头像
郑印
关注
发布于: 刚刚

术语

Producer
  • 生成者,向 kakfa 写入数据

Topic
  • 一个 Topic 有 1 个或 N 个分区构成,代表一类消息的数据

Partition
  • 分区,一个有序不变的消息序列,一个 topic 可以有 N 个分区

Replica
  • 副本,一个分区可以有多个副本

Replication
  • 复制,分区数据进行冗余备份的过程

Broker
  • Kafka 由一组 Broker 的服务组成

Leader
  • 分区主节点,一个可以提供对分区读写服务的节点

Follower
  • 分区从节点,与 Leader 通信,备份分区数据

Consumer
  • 消费者,读取 kafka 的数据

Consumer Group
  • 消费者组,一个组可以有多个消费者,每个消费者只能消费一个分区的数据,并且一个分区也只能同时供消费者组的一个消费者消费。

Rebalance
  • 消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。

更多细节,参考文档 https://xiaomingtongxie.gitbooks.io/kafka-tutorial-cn/content/chapter1/ru-men.html

Kafka 分区与复制测试

注: 此测试代码是在 2017 年编写的,供参考

  1. Kafka 单 Broker ,单分区 测试 https://gitee.com/izhengyin/some-test/tree/master/kafka/src/main/java/izy/sometest/kafka/mq/item1

  2. Kafka 单 Broker , 多分区 测试 https://gitee.com/izhengyin/some-test/tree/master/kafka/src/main/java/izy/sometest/kafka/mq/item2

  3. Kafka 多个 Broker , 单复制测试 https://gitee.com/izhengyin/some-test/tree/master/kafka/src/main/java/izy/sometest/kafka/mq/item3

  4. Kafka 多个 Broker , 多复制测试,多分区测试 https://gitee.com/izhengyin/some-test/tree/master/kafka/src/main/java/izy/sometest/kafka/mq/item4

无消息丢失实践

  1. Producer 要接收 callback 的回调通知,确定消息已经 commit 了才算发送完成,如果超时或者失败都应该设计重试策略.

  2. Producer 设置 acks 参数,表明「多少」副本 Broker 都要接收到消息,该消息才算是“已提交”,acks=1 有很大丢失消息的风险,asks=all 如果副本数多,会有性能问题,兼顾设置。

  3. 副本数配置保障,replication.factor >=3 基本的 3 副本备份要求,同时 min.insync.replicas 可以配置为 >=2 做二次保证(防止 producer 设置错误)最小的 acks 数。

  4. Broker 配置,unclean.leader.election.enable = false,如果一个 Broker 落后原先的 Leader 太多就不允许竞争 leader。

  5. Consumer 确保消息消费完成再提交,通常情况下都不会有问题,但是假如在消费端使用多线程处理消息,使用不当可能就会出现错误 commit 的情况。

Kafka 精确一次(exactly once)保障,幂等与事务

消息交付可靠性保障
  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送

  • 至少一次(at least once):消息不会丢失,但有可能被重复发送

  • 精确一次(exactly once):消息不会丢失,也不会被重复发送

通过幂等性保障
  • Producer 添加配置,props.put(“enable.idempotence”, ture)

  • kafka 只能保证单分区的幂等

通过事务保障
  • Producer 添加配置, enable.idempotence = true

  • 设置 Producer 端参数 transactional. id

  • 增加事务的相应编码

  producer.initTransactions();  try {              producer.beginTransaction();              producer.send(record1);              producer.send(record2);              producer.commitTransaction();  } catch (KafkaException e) {              producer.abortTransaction();  }
复制代码

Rebalance 消费者组重平衡

Rebalance 的问题
  • 影响消费者的性能,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成,并且这个过程还非常慢。

听说,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!这完全是不能忍受的。

什么时候会 Rebalance
  • 组成员数发生变更

    Consumer 实例加入组或者离开组

    Consumer 被踢出

  • 订阅主题数发生变更

    消费组订阅了多个主题,当有订阅主题新增或移除时

  • 订阅主题的分区数发生变更

    运维修改 topic 的配置

预防 Consumer 被提出
  • 设置合理的 session.timeout.ms 与 heartbeat.interval.ms; 常规的长连接心跳检查机制,session.timeout.ms / heartbeat.interval.ms >= 3 ; 至少三次未收到心跳才判断为 consumer 下线

  • 注意消息处理的时间过长。 如果消息处理的时间长于 max.poll.interval.ms (默认值是 5 分钟) 就会被提出,同时超时后提交消息时还会触发 CommitFailedException 异常。

流处理平台的 Kafka

虽然目前大部分在使用 kafka 时还是将它当做消息中间件使用,但是在 kafka 官方的定义里,kafka 是一个流处理平台。

流处理平台具有三个关键能力
  • 发布和订阅消息(流),在这方面,它类似于一个消息队列或企业消息系统。

  • 以容错的方式存储消息(流)。

  • 在消息流发生时处理它们。

kafka Stream wordCount 实现
public class StreamConsumer {    public static void main(String[] args) throws Exception {        Properties props = new Properties();        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");        StreamsBuilder builder = new StreamsBuilder();        KStream<String, String> source = builder.stream(Topic.NAME);        //每分钟统计一次,按照value进行分组        KTable<Windowed<String>, Long> counts = source                .groupBy((key,value) -> value)                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))                .count();        // 将windowed转换为String        counts.toStream()                .map(((windowed, aLong) -> new KeyValue<>(windowed.toString(),aLong)))                .peek((k,v) -> System.out.println(k +" => "+v))                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));        final KafkaStreams streams = new KafkaStreams(builder.build(), props);        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));        streams.start();    }}
复制代码

示例代码: https://gitee.com/izhengyin/some-test/tree/master/kafka/src/main/java/izy/sometest/kafka/stream/wordcount

发布于: 刚刚阅读数: 2
用户头像

郑印

关注

还未添加个人签名 2017.10.17 加入

还未添加个人简介

评论

发布
暂无评论
kafka那些事儿