2021 年了,还听到有些兄弟在问 Kafka 香不香?
1、写在开头
项目开发中,很多应用场合都会出现对 kafka 的应用,比如,不同的微服务 DB 数据同步、服务间的异步通知等等。笔者在此做个系统的总结,方便日后复习,希望对大家的面试笔试都有帮助。
2、kafka 的安装部署
2.1、windows 环境
1)下载安装包,解压安装包,eg:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz
2)检查修改配置文件,安装包的解压路径,eg:D:\install\kafka\kafka_2.12-2.6.0\config,两个文件 server.properties & zookeeper.properties
分别修改配置文件的日志存放目录参数:(server.properties)
# A comma separated list of directories under which to store log files
log.dirs=D:\install\kafka\log\kafka
修改配置文件的日志存放目录参数::(zookeeper.properties)
dataDir=D:\install\kafka\log\zk
3)启动 kafka 内置的 zk,因为 kafka 内部集成了 zookeeper 进行集群管理,需要优先启动 zk
在 bin 目录的 window 文件夹下有 win 版本的.bat 批处理脚本,打开终端,选中 zookeeper-server-start.bat。
执行命令:zookeeper-server-start.bat ..\..\config\zookeeper.properties
4)启动 kafka,打开终端,选中 kafka-server-start.bat,
执行命令:kafka-server-start.bat ..\..\config\server.properties
5)测试:topic 创建 &日志生产 &日志消费
选中 kafka-topics.bat,执行命令(创建 topic):
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-test-topic
选中 kafka-topics.bat,执行命令(查看 topic):
kafka-topics.bat --list --zookeeper localhost:2181
选中 kafka-console-producer.bat,执行命令(生产消息):
kafka-console-producer.bat --broker-list localhost:9092 --topic kafka-test-topic
选中 kafka-console-consumer.bat,执行命令(消费消息):
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic kafka-test-topic --from-beginning
2.2、linux 环境
步骤和 windows 几乎无异,差异只是配置文件的日志目录参数要修改为虚拟机的路径(分隔符要注意,windows 系统和 linux 系统不一样),执行对应的.bat 文件要修改为.sh 文件
3、Kafka 是什么?
Apache Kafka is an open-source distributed event streaming platform
Apache Kafka 是分布式发布-订阅消息系统,在 kafka 官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统。
它最初由 LinkedIn 公司开发,Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目。Kafka 是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
目前开源消息中间件不止 kafka 一种,如下图:
4、Kafka 的用途?
解耦
快递小哥手上有很多快递需要送,他每次都需要先电话一一确认收货人是否有空、哪个时间段有空,然后再确定好送货的方案。这样完全依赖收货人了!如果快递一多,快递小哥估计得忙疯了……
如果有了便利店,快递小哥只需要将同一个小区的快递放在同一个便利店,然后通知收货人来取货就可以了,这时候快递小哥和收货人就实现了解耦!
异步
快递小哥打电话给我后需要一直在你楼下等着,直到我拿走你的快递他才能去送其他人的。
快递小哥将快递放在小芳便利店后,又可以干其他的活儿去了,不需要等待你到来而一直处于等待状态,提高了工作的效率。
削峰
假设双十一我买了不同店里的各种商品,而恰巧这些店发货的快递都不一样,有中通、圆通、申通、各种通等……更巧的是他们都同时到货了!
中通的小哥打来电话叫我去北门取快递、圆通小哥叫我去南门、申通小哥叫我去东门。我一时手忙脚乱……
我们能看到在系统需要交互的场景中,使用消息队列中间件真的是好处多多,基于这种思路,就有了丰巢、菜鸟驿站等比小芳便利店更专业的“中间件”了。
5、Kafka 的优势?
Kafka 的独特优势在于:
高吞吐量:计算机集群真实消息传递场景,网络延迟低至 2ms。
高扩展性:将生产集群扩展到 1000 个代理、每天数万亿条消息、数 PB 的数据和数十万个分区。弹性扩展和收缩存储和处理。
永久储存:Kafka 将数据流安全地存储在分布式、持久、容错的集群中(硬盘)。
高可用性:在可用性区域上高效地扩展集群,或者跨地理区域连接单独的集群。
6、Kafka 核心概念
上图描述了:producer*4,broker*2,topic*2,consumer*4,consumer-group*2,zk*1。
下面是相关概念详细描述:
Producer:Producer 即生产者,消息的产生者,是消息的入口。
Kafka Cluster:
Broker:Broker 是 Kafka 实例,每个服务器上有一个或多个 Kafka 的实例,我们姑且认为每个 Broker 对应一台服务器。每个 Kafka 集群内的 Broker 都有一个不重复的编号,如图中的 Broker-0、Broker-1 等……
Topic:消息的主题,可以理解为消息的分类,Kafka 的数据就保存在 Topic。在每个 Broker 上都可以创建多个 Topic。(若没有手动创建 topic)kafka 的 topic 创建时机,consumer/prodocer 发生时,通过 topic-list 会找到已创建的记录,都会创建这个主题。
Partition:Topic 的分区,每个 Topic 可以有多个分区,分区的作用是做负载,提高 Kafka 的吞吐量。同一个 Topic 在不同的分区的数据是不重复的,Partition 的表现形式就是一个一个的文件夹!
Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。在 Kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,Follower 和 Leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message:每一条发送的消息主体。
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费者组成一个消费者组。
在 Kafka 的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 Topic 的不同分区的数据,这也是为了提高 Kafka 的吞吐量!
(eg:多机部署下,多台服务器跑同一份代码,对于同一个 topic 的数据,它们分别消费了不同 partition 的消息,从而提高了系统整体消息消费的吞吐量)
Zookeeper:Kafka 集群依赖 Zookeeper 来保存集群的的元信息,来保证系统的可用性。
7、kafka 生产消息
我们看上面的架构图中,Producer 就是生产者,是数据的入口。注意看图中的红色箭头,Producer 在写入数据的时候永远在找 Leader,不会直接将数据写入 Follower!那 Leader 怎么找呢?写入的流程又是什么样的呢?我们看下图:
发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入 Leader 后,Follower 是主动的去 Leader 进行同步的!Producer 采用 Push 模式将数据发布到 Broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入消息到分区的示意图如下:
上面说到数据会写入到不同的分区,那 Kafka 为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
方便扩展。因为一个 Topic 可以有多个 Partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
提高并发。以 Partition 为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。
7.1 kafka 分区策略
熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器。那在 Kafka 中,如果某个 Topic 有多个 Partition,Producer 又怎么知道该将数据发往哪个 Partition 呢?
Kafka 中的三个原则:
Partition 在写入的时候可以指定需要写入的 Partition,如果有指定,则写入对应的 Partition。
如果没有指定 Partition,但是设置了数据的 Key,则会根据 Key 的值 Hash 出一个 Partition。
如果既没指定 Partition,又没有设置 Key,则会轮询选出一个 Partition。
我们可以看下官方 API 文档,找到 org.apache.kafka.clients.producer.KafkaProducer<K,V>
public Future<RecordMetadata> send(ProducerRecord<K,V> record)
Asynchronously send a record to a topic. Equivalent to
send(record, null)
.
ProducerRecord 是底层封装的发送消息对象,该类有多个重载的构造器方法,可见完全符合以上的三个原则:
Constructors
Constructor and Description
ProducerRecord(String topic, Integer partition, K key, V value)
Creates a record to be sent to a specified topic and partition
ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
Creates a record to be sent to a specified topic and partition
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
Creates a record with a specified timestamp to be sent to a specified topic and partition
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
Creates a record with a specified timestamp to be sent to a specified topic and partition
ProducerRecord(String topic, K key, V value)
Create a record to be sent to Kafka
ProducerRecord(String topic, V value)
Create a record with no key
7.1.1 分区数据结构(Segment)
Producer 将数据写入 Kafka 后,集群就需要对数据进行保存了!Kafka 将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。
Kafka 初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
前面说过了每个 Topic 都可以分为一个或多个 Partition,如果你觉得 Topic 比较抽象,那 Partition 就是比较具体的东西了:
Partition 在服务器上的表现形式就是一个一个的文件夹,每个 Partition 的文件夹下面会有多组 Segment 文件。
每组 Segment 文件又包含 .index 文件、.log 文件、.timeindex 文件(早期版本中没有)三个文件。
Log 文件就是实际存储 Message 的地方,而 Index 和 Timeindex 文件为索引文件,用于检索消息。
如上图,这个 Partition 有三组 Segment 文件,每个 Log 文件的大小是一样的,但是存储的 Message 数量是不一定相等的(每条的 Message 大小不一致)。
文件的命名是以该 Segment 最小 Offset 来命名的,如 000.index 存储 Offset 为 0~368795 的消息,Kafka 就是利用分段+索引的方式来解决查找效率的问题。
举个例子:我们一开头进行的测试,topic=first-topic-0,此时 kafka 会默认分配一个 partition,该分区的最小单位 Segment 以一个文件夹的形式保存了相关文件( .index 文件、.log 文件、.timeindex 文件)
7.2 消息确保不丢包(ACK 应答机制)
其实上面的写入流程图中有描述出来,那就是通过 ACK 应答机制!
在生产者向队列写入数据的时候可以设置参数来确定是否确认 Kafka 接收到数据,这个参数可设置的值为 0、1、all:
0 代表 Producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
1 代表 Producer 往集群发送数据只要 Leader 应答就可以发送下一条,只确保 Leader 发送成功。
all 代表 Producer 往集群发送数据需要所有的 Follower 都完成从 Leader 的同步才会发送下一条,确保 Leader 发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
最后要注意的是,如果往不存在的 Topic 写数据,能不能写入成功呢?答案是:Kafka 会自动创建 Topic,分区和副本的数量根据默认配置都是 1。
7.2.1 消息数据结构(message data structure)
上面说到 Log 文件就实际是存储 Message 的地方,我们在 Producer 往 Kafka 写入的也是一条一条的 Message。
那存储在 Log 中的 Message 是什么样子的呢?消息主要包含消息体、消息大小、Offset、压缩类型……等等!我们重点需要知道的是下面三个:
Offset:Offset 是一个占 8byte 的有序 id 号,它可以唯一确定每条消息在 Parition 内的位置!
消息大小:消息大小占用 4byte,用于描述消息的大小。
消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
7.2.2 消息存储策略
无论消息是否被消费,Kafka 都会保存所有的消息。那对于旧数据有什么删除策略呢?
基于时间, 默认配置是 168 小时(7 天)。
基于大小, 默认配置是 1073741824。
需要注意的是,Kafka 读取特定消息的时间复杂度是 O(1),所以这里删除过期的文件并不会提高 Kafka 的性能!
7.2.3 消费消息寻址
在保存数据的小节里面,我们聊到了 Partition 划分为多组 Segment,每个 Segment 又包含 .log、.index、.timeindex 文件,存放的每条 Message 包含 Offset、消息大小、消息体…… 同时,我们多次提到 Segment 和 Offset,查找消息的时候是怎么利用 Segment+Offset 配合查找的呢?
7.2.3.1 一条消息被消费掉的过程
假如现在需要查找一个 Offset 为 368801 的 Message 是什么样的过程呢?我们先看看下面的图:
先找到 Offset 的 368801message 所在的 Segment 文件(利用二分法查找),这里找到的就是在第二个 Segment 文件。【找到 Segment 文件夹】
打开找到的 Segment 中的 .index 文件(也就是 368796.index 文件,该文件起始偏移量为 368796+1。我们要查找的 Offset 为 368801 的 Message 在该 Index 内的偏移量为 368796+5=368801,所以这里要查找的相对 Offset 为 5)。【找到 Segment 文件夹下的 index 文件,确定 offset】
由于该文件采用的是稀疏索引的方式存储着相对 Offset 及对应 Message 物理偏移量的关系,所以直接找相对 Offset 为 5 的索引找不到。这里同样利用二分法查找相对 Offset 小于或者等于指定的相对 Offset 的索引条目中最大的那个相对 Offset,所以找到的是相对 Offset 为 4 的这个索引。【根据稀疏索引,寻址到数据文件】
带稀疏索引的文件。这类文件是将所有数据记录关键字值分成许多组,每组一个索引项,这种索引称为稀疏索引。这类文件的数据记录要求按关键字顺序排列。因此,其特点是索引项少,管理方便,但插入、删除记录代价较大。
根据找到的相对 Offset 为 4 的索引确定 Message 存储的物理偏移位置为 256。打开数据文件,从位置为 256 的那个地方开始顺序扫描直到找到 Offset 为 368801 的那条 Message。【从数据文件中找到指定的 Message】
因此,这套机制是建立在 Offset 为有序的基础上,利用 Segment+有序 Offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。
7.2.3.2 消费者记录消费位置
在早期的版本中,消费者将消费到的 Offset 维护在 Zookeeper 中,Consumer 每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的 Offset 已经直接维护在 Kafka 集群的 __consumer_offsets 这个 Topic 中!(见下图所示的 Segments)
7.2.3.3 Kafka 消费 Offset 原理
在通过 Client 端消费 Kafka 中的消息时,消费的消息会同时在 Zookeeper 和 Kafka Log 中保存,如上图红线所示。当手动删除 Kafka 某一分片上的消息日志时,如上图蓝线所示,此是只是将 Kafka Log 中的信息清 0 了,但是 Zookeeper 中的 Partition 和 Offset 数据依然会记录。
当重新启动 Kafka 后,我们会发现如下二种情况:
客户端无法正常用消费;
在使用 Kafka Consumer Offset Monitor 工具进行 Kafka 监控时会发现 Lag(还有多少消息数未读取(Lag=logSize-Offset))为负数;其中此种情况的删除操作需要我们重点关注,后面我们也会详细介绍其对应的操作步骤。
一般正常情况,如果想让 Kafka 客户端正常消费,那么需要 Zookeeper 和 Kafka Log 中的记录保持如上图黄色所示。
8、kafka 消费消息
消息存储在 Log 文件后,消费者就可以进行消费了。消费模式比如:点对点模式和发布订阅模式。
8.1 消息消费模式
8.2 消费原理
而 Kafka 采用的是点对点的模式,消费者主动的去 Kafka 集群拉取消息,与 Producer 相同的是,消费者在拉取消息的时候也是找 Leader 去拉取。
kafka 的 consumer 消费原则有两个:
多个消费者可以组成一个消费者组(Consumer Group),每个消费者组都有一个组 id!
同一个消费组者的消费者可以消费同一 Topic 下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!
是不是有点绕?我们看下图:
图示是消费者组内的消费者小于 Partition 数量的情况,所以会出现某个消费者消费多个 Partition 数据的情况,消费的速度也就不及只处理一个 Partition 的消费者的处理速度!
如果是消费者组的消费者多于 Partition 的数量,那会不会出现多个消费者消费同一个 Partition 的数据呢?
答:上面已经提到过不会出现这种情况!
举例子,在上面的图:
ConsumerGroup2,多出来的消费者 Consumer-3 就是不消费任何 Partition 的数据。
所以在实际的应用中,建议消费者组的 Consumer 的数量与 Partition 的数量一致!
9、延伸阅读
《源码系列》
《经典书籍》
《Java并发编程实战:第2章 影响线程安全性的原子性和加锁机制》
《Java并发编程实战:第3章 助于线程安全的三剑客:final & volatile & 线程封闭》
《服务端技术栈》
《算法系列》
《设计模式》
版权声明: 本文为 InfoQ 作者【后台技术汇】的原创文章。
原文链接:【http://xie.infoq.cn/article/562bf0f6289296030755cece5】。文章转载请联系作者。
评论