写点什么

八股 MQ006——Message 之旅

作者:Codyida
  • 2023-05-07
    广东
  • 本文字数:6017 字

    阅读完需:约 20 分钟

写在前面

大雨来的有征兆。


压抑的天空给所有颜色染上暗灰的底蕴。


热量无处可逃。


所有释放的瞬间,都安静了。


下落的雨滴是画面里唯一细碎的亮色。


雨伞上轻微的振动,成了城市与我维持的心跳。


不算经常,但也偶尔,我也会这么想。


你那边也下雨了么?

Producer 生产消息

Producer 在设定配置参数与初始化之后,可以调用send方法,将需要发送的消息包装成ProducerRecord对象。具体来说,会有以下步骤:


  1. Producer 创建一个ProducerRecord对象,设置需要发送的消息的 key 与 value,并将主题、分区、键、值、时间戳和头部等信息赋值给它。

  2. Producer 将ProducerRecord对象作为参数,调用 Producer 的send方法。

  3. Serializer:send方法中将 Key 与 Value 序列化(为了内存紧凑与传输方便)。

  4. Partitioner: a. 根据指定的 partition 字段;b. 根据消息的 Key 做 hash 计算;c. 消息 key 为空则随机轮询;完成消息的分区。

  5. 将消息放入 RecordAccumulator 中,等待批量消息积攒到一定量或者等待时间超限,由 sender 线程获取到消息之后再发送。


示例代码


// 创建一个 KafkaProducer 对象Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 创建一个 ProducerRecord 对象String topic = "test";Integer partition = 0;Long timestamp = System.currentTimeMillis();String key = "hello";String value = "world";List<Header> headers = new ArrayList<>();headers.add(new RecordHeader("source", "producer".getBytes()));ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
// 发送 ProducerRecord 对象producer.send(record);producer.close();
复制代码

ProducerRecord 是什么?

ProducerRecord是一个类,代表一组向 Kafka 发送到 KV 键值对。它由如下信息组成:


  1. 主题(topic):表示消息要发送到的主题名称。

  2. 分区(partition):表示消息要发送到的分区编号。如果没有指定,生产者会根据分区器的策略自动选择一个分区。

  3. 键(key):表示消息的键,可以用来决定消息的分区或者作为消息的标识。如果没有指定,生产者会使用空键(此时分区会使用默认的轮询或随机算法,用于负载均衡)。

  4. 值(value):表示消息的内容,可以是任意类型的对象。如果没有指定,生产者会使用空值。

  5. 时间戳(timestamp):表示消息的创建时间,以毫秒为单位。如果没有指定,生产者会使用当前时间作为时间戳。

  6. 头部(headers):表示消息的一些额外信息,可以是一个或多个键值对。如果没有指定,生产者会使用空头部。


public class ProducerRecord<K, V> {    private final String topic; //目标topic    private final Integer partition; //目标partition    private final Headers headers;//消息头信息    private final K key;   //消息key    private final V value; //消息体    private final Long timestamp; //消息时间戳}
复制代码

如何创建一个 ProducerRecord 实例?

其实就是使用ProducerRecord类的构造方法。需要注意的是:所有的构造方法,必带三个参数:


  1. Topic string

  2. K key

  3. V value

ProducerRecord 在 send 方法中的使用

从表面上看,ProducerRecord是作为send方法的一个参数的。实际来讲,会有如下流程:


  1. 检查ProducerRecord对象的有效性,比如主题名称是否为空,键和值是否符合序列化器的要求等。

  2. 根据分区器的策略,将ProducerRecord对象路由到对应的分区。分区器可以是默认的轮询或哈希算法,也可以是自定义的算法。

  3. 调用RecordAccumulatorappend 方法将消息放入 RecordAccumulator 中。至此,ProducerRecord 已经进入到消息累加器中。

RecordAccumulator 的故事

旅程又到了RecordAccumulator。之前有说过,RecordAccumulator 实现了一个小的生产者-消费者模型。Producer 作为生产者,将消息生产后发送到这里,RecordAccumulator累加了足够的消息(为了提升吞吐、提升压缩效率)便会通知消费者——Sender 线程来获取消息,并最终把消息发送到 Broker。


RecordAccumulator中与ProducerRecord有关的需要关注到batches字段。这个字段对应的数据结构是一个ConcurrentMap。它:


  1. Key:TopicPartition,Key 存储了 Topic 与 Partition 信息,它记录消息需要发送到哪个 Topic 与 Partition;

  2. Value:是一个存放ProducerBatch的双端队列Dqueue,通过append方法添加到 RecordAccumulator里的ProducerRecord就累加在ProducerBatch


下面了解一下 ProducerBatch


  1. ProducerBatch它包含了一个或多个ProducerRecord

  2. ProducerBatch通过 MemoryRecordsBuilder对象持有一个DataOutputStream对象的引用,这个对象内部封装了一个ByteBufferOutputStream,用来缓存消息的字节数据。

  3. Prod接收erBatch在发送时,会通过NetworkClient将自己封装成一个请求(Request)对象,并按照节点(Node)分组发送到对应的Broker上,并等待响应

  4. ProducerBatch可以通过batch.size来限制消息批次的大小。默认 16KB。

  5. ProducerBatch可以通过linger.ms来设置等待消息累加到上线的时间,默认 0ms,即不等待。

  6. ProducerBatch可以通过compression.type来设置消息压缩的类型。

  7. ProducerBatch在收到发送的响应之后,会调用Future对象的回调方法,通知 Producer 发送的结果,而后释放自己占用的内存


至此,Message 通过 Producer 包装ProducerRecord,被发送到RecordAccumulator中,包装成ProducerBatch做批次累加发送。而后消息就会到达Broker

Broker 接收与存储消息

Broker 接收消息

Broker 用于维持网络连接与处理发送请求的架构模型在这里先买个坑。这里还是保持以消息的视角来看,Broker 在收到消息之后,会有哪些处理流程,主要如下:


Broker 接收消息的处理流程如下:


  1. Broker 首先根据分区算法选择将消息存储到哪一个 partition。

  2. Broker 将消息写入本地 log 文件,并返回一个 offset 给生产者。

  3. Broker 的 leader 负责接收和复制消息,follower 从 leader 拉取消息,写入本地 log 后发送 ACK。

  4. 当 leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向生产者发送 ACK。

  5. Broker 根据一定的策略(基于时间或大小)删除过期的消息文件。


这里面 Broker 的分区算法策略在前面已有说过。后面的操作主要是 Broker 写入本地 log 文件,而后给生产者发送 ACK,与本地 log 文件的管理策略。所以后面的主要内容关注 Kafka 的 log 文件系统

Broker 消息存储架构

Kafka 的存储架构大致如下:


  1. Broker:Kafka 集群由多个 Broker 组成。每一个 Broker 实际就是处理请求、存储消息的服务器节点

  2. Topic 与 Partition:Kafka 数据通过 Topic 进行分类。Topic 只是数据组织逻辑概念上的分类。也用于 Producer 与 Consumer 交互的媒介。Topic 实际由多个 Partition 组成。Partition 是实际数据存储所在。每个 Partition 内部在逻辑上是一个有序队列。消息在 Partition 中存储即通过偏移量(Offset)来唯一确认。

  3. Partition 与 Segment:Partition 实际存储数据时,会被分成为多个 Segment。每个 Segment 由一个 log 数据文件与 index 索引文件组成。index 文件存储消息的偏移量和在数据文件中的位置,数据文件存储消息的内容。每个段都有一个起始偏移量来命名它,例如 00000000000000000000.index 和 00000000000000000000.log 就是偏移量为 0 的段的索引文件和数据文件。

  4. Partition 与 Replica:Partition 可以由多个 Replica 组成,以提高数据冗余与可用性。同一个 Partition 的多个 Replica 可以分配在多个不同的 Broker 上。Replica 中有 Leader 与 Follower。Leader 负责处理外部读写请求,Follower 负责异步同步 Leader 副本数据。

Segment

Partition 被拆分为多个大小相当,但包含消息数量可能不等的 Segment。一个 Segment 分为一个 index 索引文件与一个 log 数据文件。

Segment 文件命名规则

partion 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。

log 文件

  1. log 文件内以二进制形式存储消息内容。

  2. 消息在 log 文件中以 Offset 来排序的。

  3. 每条消息的内容由固定长度的头部与可变长度的主体组成。头部包含了消息的元数据,如长度、时间戳、压缩类型、校验和等;主体包含了消息的键(key)和值(value),它们可以是任意类型的数据,如字符串、字节数组、JSON 等。详细的消息格式在此不赘述。

  4. 消息在 log 中存储的形态可以是未压缩或者压缩过的。如果是压缩过,加压是在 Producer 侧完成,解压是在 Consumer 侧完成。Broker 只负责存储。

  5. 当 log 文件达到一定的大小(由 log.segment.bytes 或者 segment.bytes 参数控制)时,Broker 会关闭当前 log 文件,并创建一个新的 log 文件来继续写入消息。新的 log 文件的起始偏移量是等于上一个 log 文件的最后一条消息的偏移量加一。

index 文件

  1. index 文件用来存储消息的偏移量和在 log 文件中的位置的索引文件,它可以加快查找消息的速度,避免扫描整个 log 文件。

  2. index 文件与 log 文件一一对应。他们都用相同的起始偏移量来命名,只是 index 文件后缀为.index,log 文件后缀为.log

  3. index 文件中的每条记录都由两个字段组成,分别是相对偏移量(relative offset)和物理位置(physical position)。相对偏移量是指该记录对应的消息的偏移量减去起始偏移量的值;物理位置是指该记录在这个 Segment 中真实物理位置。

segment 的管理

  • 创建:segment 的创建逻辑可以由参数配置控制。具体:

  • log.segment.bytes: segment 的大小,当 segment 大小达到指定值时,就新创建一个 segment。

  • log.roll.hours:segment 的创建周期,单位小时,kafka 数据是以 segment 存储的,当周期时间到达时,就创建一个新的 segment 来存储数据。

  • log.roll.jitter.hours:segment 创建周期的随机时间,单位小时,用于避免多个 broker 同时创建 segment。

  • log.index.size.max.bytes:索引文件的最大大小,当索引文件达到指定值时,就新创建一个索引文件。

  • log.index.interval.bytes:索引文件中每条索引条目之间的字节数间隔,用于控制索引文件的稀疏程度。

  • 删除:只有被清理过的 segment 才可以被删除。segment 的删除逻辑也可以由参数配置控制。具体:

  • log.retention.bytes:分区的最大大小,当分区大小达到指定值时,就会删除最旧的 segment。

  • log.retention.hours:分区的最大存活时间,当分区中的消息超过指定时间时,就会删除最旧的 segment。

  • log.retention.check.interval.ms:检查分区是否需要删除的时间间隔,单位毫秒。

  • log.cleanup.policy:分区的清理策略,可以选择 delete 或 compact 两种,delete 策略是直接删除过期的消息,compact 策略是保留最新的消息并删除重复的键值对。

  • 清理:kafka 对 segment 中的消息进行去重或删除操作,减少磁盘空间的占用。

  • 清理是基于 segment 的,而不是基于消息的,也就是说压缩操作是在 segment 关闭后进行的,而不是在消息写入时进行的。

  • segment 清理是异步的,也就是说压缩操作不会阻塞消息的写入或读取,而是由后台线程定期执行的。

  • segment 清理有两种策略,分别是 delete 和 compact,可以通过log.cleanup.policy参数来配置。

  • delete 策略是直接删除过期或超过大小限制的消息

  • compact 策略,也可以说是 segment 的压缩,是保留最新的消息并删除重复的键值对

  • segment 清理可以提高磁盘利用率,减少网络传输量,提高消费者性能,但也会增加 CPU 和 I/O 开销,以及延长恢复时间。

segment 的写入流程

  1. 生产者客户端首先连接到 Zookeeper 集群,从 Zookeeper 中获取对应的 topic 的分区信息和分区的 leader 的相关信息。

  2. 生产者客户端根据分区策略选择一个分区,并连接到对应的 leader 所在的 broker。

  3. 生产者客户端将消息发送到 leader broker,leader broker 将消息追加到本地的 segment 文件中,并返回一个 ack。

  4. 其他 follower broker 从 leader broker 同步数据,将消息追加到本地的 segment 文件中,并返回一个 ack。

  5. leader broker 收到所有的 follower broker 的 ack 后,向生产者客户端返回一个最终的 ack,表示消息写入完成。

segment 的取数流程

  1. 消费者客户端首先连接到 Zookeeper 集群,从 Zookeeper 中获取对应的 topic 的分区信息和分区的 leader 的相关信息。

  2. 消费者客户端根据消费组和分区策略选择一个或多个分区,并连接到对应的 leader 所在的 broker。

  3. 消费者客户端向 leader broker 发送拉取请求,指定拉取的分区和偏移量,该请求是一个异步请求。

  4. leader broker 从本地的 segment 文件中读取消息,并返回给消费者客户端。

  5. 消费者客户端从接收缓存区中解析消息,并更新消费偏移量。

在 segment 中找到指定 offset 的消息

  1. 根据 offset 确定消息所在的 segment 文件,这可以通过二分查找的方式实现,因为 segment 文件的命名规则是以第一条消息的 offset 为准的。一个 segment 文件内存储的消息 offset 范围是:当前 segment 文件名称里 offset+1~下一个 segment 文件名称里 offset。

  2. 计算 offset 在 segment 文件中的相对偏移量,在对应的 index 文件里,找到对应相对偏移量对应的文件物理索引位置。

  3. 根据 index 文件中找到的物理位置,从 segment 文件中读取消息内容,这是一个顺序读取的过程,因为 segment 文件中存储了消息的完整数据。

segment 的内存映射

这里简单说一下 segment 的内存映射。它利用了 Java NIO 的 MappedBytedBuffer 类来实现文件和内存的映射。优势:


  1. 提高 IO 效率,减少用户空间与内核空间之间的数据拷贝,让数据操作直接在内核缓冲区完成。

  2. 利用操作系统的分页存储机制,实现数据的异步刷新与恢复。


具体来说,kafka 在两个场景中使用了 segment 内存映射机制:


  1. 当 producer 向 broker 发送数据时,broker 会将数据写入到一个 mmap 的内存空间中,这个内存空间和操作系统内核空间有映射关系,也就相当于写入到了内核缓冲区。然后 broker 会定期地将内核缓冲区的数据刷新到磁盘上,形成 segment 文件。

  2. 当 consumer 从 broker 拉取数据时,broker 会根据 consumer 指定的分区和偏移量,从 segment 文件中读取数据,并返回给 consumer。这个过程中,broker 会利用 mmap 的机制,将 segment 文件映射到内存中,然后直接从内存中读取数据,避免了磁盘 I/O 的开销。

写在后面

这篇文章从消息的视角,大体梳理了一下消息从生产者生产,发送到 Broker,Broker 接受消息然后存储,与后面 Broker 接受消费者请求,取数并推送。但这里还有很多细节地方可以继续展开。


  1. Broker 是如何处理 Producer 与 Consumer 的请求的。这里是有一个应对超高并发的网络架构的。

  2. 这篇文章只介绍了 Broker 组织数据的方式,但它是如何和操作系统交互,完成数据的落盘的。


这些问题会在后面逐一分析。

参考资料

  1. https://blog.csdn.net/liyiming2017/article/details/88235763

  2. 深度解析kafka broker从连接建立到接收请求发送响应_$码出未来的博客-CSDN博客

  3. 深度解析kafka broker处理发送消息请求并写入磁盘_kafka消息无法写到磁盘_$码出未来的博客-CSDN博客

  4. 深度剖析:Kafka 请求是如何处理? 看完这篇文章彻底懂了!

发布于: 刚刚阅读数: 3
用户头像

Codyida

关注

还未添加个人签名 2017-12-21 加入

还未添加个人简介

评论

发布
暂无评论
八股MQ006——Message之旅_后端、_Codyida_InfoQ写作社区