什么是 kafka
Apache Kafka 是 Apache 软件基金会的开源的流处理平台,该平台提供了消息的订阅与发布的消息队列,一般用作系统间解耦、异步通信、削峰填谷等作用。同时 Kafka 又提供了 Kafka streaming 插件包实现了实时在线流处理。相比较一些专业的流处理框架不同,Kafka Streaming 计算是运行在应用端,具有简单、入门要求低、部署方便等优点。
kafka 的架构
Kafka 集群以 Topic 形式负责分类集群中的 Record,每一个 Record 属于一个 Topic。每个 Topic 底层都会对应一组分区的日志用于持久化 Topic 中的 Record。同时在 Kafka 集群中,Topic 的每一个日志的分区都一定会有 1 个 Borker 担当该分区的 Leader,其他的 Broker 担当该分区的 follower,Leader 负责分区数据的读写操作,follower 负责同步改分区的数据。这样如果分区的 Leader 宕机,该分区的其他 follower 会选取出新的 leader 继续负责该分区数据的读写。其中集群的中 Leader 的监控和 Topic 的部分元数据是存储在 Zookeeper 中。
kafka 的 API
topic 的创建
[root@node01 bin]# kafka-topics.sh
--zookeeper node2:2181,node3:2181/kafka
--create
--topic test
--partitions 3
--replication-factor 3
复制代码
查看 topic 的列表
[root@node01 bin]# kafka-topics.sh
--zookeeper node2:2181,node3:2181/kafka
--list
复制代码
查看一个 topic 的详细信息
[root@node01 bin]# ./bin/kafka-topics.sh
--zookeeper node2:2181,node3:2181/kafka
--describe
--topic test
复制代码
修改 topic
[root@node01 kafka_2.11-2.2.0]# ./bin/kafka-topics.sh
--zookeeper node2:2181,node3:2181/kafka
--alter
--topic test
--partitions 2
复制代码
删除 topic
[root@node01 bin]# kafka-topics.sh
--zookeeper node2:2181,node3:2181/kafka
--delete
--topic test
复制代码
producer 往一个 topic 中生产消息
[root@node01 bin]# kafka-console-producer.sh
--broker-list node01:9092,node01:9092,node01:9092
--topic test
复制代码
consumer 订阅一个 topic 消费消息
[root@node01 bin]# kafka-console-consumer.sh
--bootstrap-server node01:9092,node01:9092,node01:9092
--topic test
--group opentest
复制代码
查看消费组信息
[root@node01 bin]# kafka-console-consumer.sh
--bootstrap-server node01:9092,node01:9092,node01:9092
--list
复制代码
查看某一消费组的详细信息
[root@node01 bin]# kafka-console-consumer.sh
--bootstrap-server node01:9092,node01:9092,node01:9092
--describe
--group opentest
复制代码
kafka 在程序中的使用
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.1.0</version>
</dependency>
复制代码
生产者的代码
@Test
public void producer() throws ExecutionException, InterruptedException {
String topic = "items";
Properties p = new Properties();
p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node02:9092,node03:9092,node01:9092");
p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
while(true){
for (int i = 0; i < 3; i++) {
for (int j = 0; j <3; j++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item"+j,"val" + i);
Future<RecordMetadata> send = producer
.send(record);
RecordMetadata rm = send.get();
int partition = rm.partition();
long offset = rm.offset();
System.out.println("key: "+ record.key()+" val: "+record.value()+" partition: "+partition + " offset: "+offset);
}
}
}
}
复制代码
消费者代码
@Test
public void consumer(){
Properties p = new Properties();
p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node02:9092,node03:9092,node01:9092");
p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
p.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"opentest");
p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//
p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交
// p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");//默认5秒
// p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,""); //拉取数据的配置
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));//
if(!records.isEmpty()){
Iterator<ConsumerRecord<String, String>> iter = records.iterator();
while(iter.hasNext()){
ConsumerRecord<String, String> record = iter.next();
int partition = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();
System.out.println("key: "+ record.key()+" val: "+ record.value()+ " partition: "+partition + " offset: "+ offset);
}
}
}
}
复制代码
kafka 的原理深入
1. kafka 的 AKF
2. kafka 数据如何保证顺序消费
3. kafka 中 consumer 的分组
4.消息队列,常见保证消息顺序性消费的两种方案
①. 生产时保证消息的有序性,单线程消费
多线程生产消息,后面单线程消费数据,可以消费一条数据,就更新 kafka 偏移量 offset 的值,这种方式可以保证消息消费的进度,以及准确地更细 offset 的值,但是单线程的消费会对数据库以及 offset 进行频繁的更新,成本有点高,并且存在 cpu 以及网卡的资源浪费。
②. 多线程消费
5.kafka 的消息生产的确认机制-- ack 确认机制
Kafka 生产者在发送完一个的消息之后,要求 Broker 在规定的时间内 Ack 应答,如果没有在规定时间内应答,Kafka 生产者会尝试重新发送消息。默认 acks=1。
①. acks=1 - Leader 会将 Record 写到其本地日志中,但会在不等待所有 Follower 的完全确认的情况下做出响应。在这种情况下,如果 Leader 在确认记录后立即失败,但在 Follower 复制记录之前失败,则记录将丢失。
②. acks=0 - 生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区中并视为已发送。在这种情况下,不能保证服务器已收到记录。
③. acks=all /-1 - 这意味着 Leader 将等待全套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。这等效于 acks = -1 设置。
6.kafka 的 ISR、OSR 以及 AR
**ISR:**in-sync-replica set 同步副本设置。为了解决数据同步高延迟问题以及 leader 重新选举时不会影响数据同步。
7.kafka 中的索引
kafka 中索引文件有两个,分别是 offset 的索引文件以及 timeindex 的索引文件,文件初始化时,都是 10M 大小。offset 索引文件中会记录 offset 的值,以及文件中的 position。以 position 取读取 log 文件中的一批数据。timeindex 索引会记录一个时间戳,以及对应的 offset,所以需要重新去 offset 的索引文件中找到 offset 对应的 log 文件的 position,再去读取数据。
kafka 中常见的优化参数
1. broker 的配置
2. producer 的配置
3. consumer 的配置
总结
kafka 作为主流的消息中间件,主要原因是其有如下优势:
解耦
Kafka 具备消息系统的优点,只要生产者和消费者数据两端遵循接口约束,就可以自行扩展或修改数据处理的业务过程。
高吞吐量、低延迟
即使在非常廉价的机器上,Kafka 也能做到每秒处理几十万条消息,而它的延迟最低只有几毫秒。
持久性
Kafka 可以将消息直接持久化在普通磁盘上,且磁盘读写性能优异。
扩展性
Kafka 集群支持热扩展,Kaka 集群启动运行后,用户可以直接向集群添加。
容错性
Kafka 会将数据备份到多台服务器节点中,即使 Kafka 集群中的某一台 Kafka 服务节点宕机,也不会影响整个系统的功能。
支持多种客户端语言
Kafka 支持 Java、.NET、PHP、Python 等多种语言。
更多学习资料戳下方!!!
https://qrcode.ceba.ceshiren.com/link?name=article&project_id=qrcode&from=infoQ×tamp=1662366626&author=xueqi
评论