写点什么

kafka

作者:想要飞的猪
  • 2023-12-01
    广东
  • 本文字数:1729 字

    阅读完需:约 6 分钟

消息队列的场景与作用:


  1. 系统解耦

  2. 流量的削峰填谷

  3. 异步通信

kafka

  1. kafka 的核心组件 生产者(producer)、消费者(consumer)、代理(Broker)、主题(Topic)、分区(patition)、副本(Replica)、偏移量(Offset);

  2. 主要特点

  3. 数据磁盘持久化:Kafka 将消息直接写入磁盘(顺序写入),不依赖于内存缓存

  4. 零拷贝:Kafka 利用操作系统的零拷贝,减少系统的内核态与用户态的切换,降低了 CPU 和内存的开销。

  5. 数据批量发送:Kafka 是将消息进行批量拉去与 发送

  6. 数据压缩:数据进行压缩,减少了网络开销。

  7. 主题分为多分区:对主题进行分区,每个分区中的消息是有序的消息队列,分区之间可以并且消费,提高了并发度。

  8. 分区副本机制,Kafka 为每个分区设置多个副本,并且分布在不同的代理(broker)上,其中一个副本为 leader,负责分区的读写,其他副本为 follower,负责同步数据,并且在 leader 失效时能够故障转移,实现了高可用。

  9. 文件存储 - Kafka 消息的存储以 Partition 为单位,每个 Partition 包含一组消息文件(Segment File)和一组索引文件(Index),索引文件与消息文件一一对应,并且文件名相同(扩展名不同)、每个索引中保存索引序列号和消息所在的绝对外置(磁盘位置);Kafka 中的建立的是稀疏索引,并不是给每个消息建立索引,而是每隔几条建立一个索引 - 写入消息是直接在文件后面追加(顺序写入),查找时根据文件名,找到索引文件,然后再通过二分法查到离文件最近的索引,再去文件中查找对应的消息。

  10. 如何保证消息顺序消费

  11. Kafka 的每个 Partition 中消息一定是有序的,所以第一种方案是,只创建一个 Partition

  12. Kafka 在发送消息的时候可以指定四个参数 topic、 Partition、key、以及 data,指定了 Partition 时,可以直接把所有的消息都发送发指定的的 Partition 中。同一个 key 的消息也可以保证发送到相同的 Partition 中,要保证顺序消息可以指定需要顺序消费的消息相同的 key

  13. 如何保证消息不丢失 MQ 有三个地方可能导致消息丢失:生产者发送、broker 存储、消息者消费,所以要保证消息不丢失,需要保证这三个地方都不丢失。

  14. 生产者发送消息 1、producer 在调用 send 方法发送消息之后可能网络问题导致并没有发送过去。send()方法实际是异步发送的。我们可以通过调用 get()方法获取调用结果,不过这样就变成了同步发送。 2、采用回调的方式,如果消息发送失败,可以重新发送。

  15. broker 存储 Kafka 为分区引入了多副本的机制,分区(Partition)中的会有一个 leader 的副本,会跟客户端进行交互,发送消息与消息消息都是通过 leader 副本。其他的为 Follower,当 leader 挂了之后 Follower 中有一个会成为新的 leader,所以只要保证有多个分布,并且,发生的消息都已经同步到副本了,这样就能保证消息的安全性。 Kafka 有个参数 acks 我们可以设置为 all 表示所有的 ISR 副本全部同步完成了,才返回给生产者,可以设置 Partition 的副本,并且还可以设置写入多少个副本才算成功,可以根据业务来。

  16. 消费者消费 消息在被追加到 Partition 中时会分配一个偏移量 offset,偏移量表示 Consumer 当前消费到的 Partition 所在的位置,Kafka 通过偏移量可以保证消息在分区内的顺序性,当消费者拉取到某个消息时会自动提交 offset,但是当拉取到消息并提交了 offset 后,消费失败了,则会丢失消息。 **解决方法:关闭自动提交 offset,每次真正消费完之后再手动提交 offset

  17. 失败重试

  18. 重试多少次?

  19. 默认重试 10 次,间隔为 0,也就是立即重试。

  20. 如何修改重试机制?

  21. 重试机制在 DefaultErrorHandler 中初始化时给定,所以需要在定义时给定参数即可,重新实现一个 KafkaListenerContainerFactory 调用setCommonErrorHandler 重新自定义错误处理器就可以实现。

  22. 失败了如何提示警告?

  23. 重写 DefualtErrorHandler 的 handleRemaining 函数,加上自定义的额警告操作。


@Componentpublic class SimpleConsumer {    @RetryableTopic()    @KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")    public void onMessage(MessageWrapper message) {        log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);        throw new RuntimeException("test kafka exception");    } } 
复制代码


  1. 延迟消息 kafka 并不支持延迟与定时消息 Spring kafka


用户头像

还未添加个人签名 2020-06-05 加入

还未添加个人简介

评论

发布
暂无评论
kafka_想要飞的猪_InfoQ写作社区