写点什么

当 Spring 邂逅 Kafka,有趣的知识增加了

作者:翊君
  • 2022 年 6 月 07 日
  • 本文字数:7396 字

    阅读完需:约 24 分钟

当Spring邂逅Kafka,有趣的知识增加了

0.阅读完本文你将会学到

  • 一些 linux 的常用命令

  • 如何在 linux 上安装 JDK、ZooKeeper、Kafka

  • 轻量级的 Spring 与 Kafka 的整合


Kafka 起初是由 LinkedIn 公司采用 Scala 语言开发的一个多分区、多副本且基于 ZooKeeper 协调的分布式消息系统,现已被捐献给 Apache 基金会。


目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持续化、可水平扩展、支持流数据处理等多种特性而被广泛使用。


关于 Kafka 名字的由来,另有一段佳话。如果你的记忆力还不错的话,应该会记得高中有一篇课文叫做《变形记》,它的作者正是奥地利小说家Franz Kafka。笔者本人也非常喜欢他的《城堡》一作。而 Apache Kafka 的作者大学的时候也非常喜欢 Franz Kafka,所以就将这个系统命名为 Kafka。


现在让我们打开电脑,一起实践吧!


如果你的电脑上已经安装了 Kafka,可以跳过第一部分,直接进入第二部分哦。

1. Kafka 的安装与设置

安装 Kafka 之前,我们需要安装 Java 以及 ZooKeeper。

1.1 安装 JDK

1. 确认系统是否已安装过 Java


安装 JDK 之前我们先确认下系统是否已安装过 JDK,如下操作:


rem -qa | grep javarem -qa | grep jdkrem -qa | grep gcj
复制代码


如果没有任何信息,则表示系统没有安装过 Java。


如果想要卸载已经安装过的 JDK,则可以执行下方的命令。


rpm -qa | grep java | xargs rpm -e --nodeps
复制代码


2. 安装 Java


下面开始安装 Java,这里以 1.8 为例。


yum list java-1.8*
复制代码


通过这个命令我们可以看见 Java 1.8 版本的所有文件。


java-1.8.0-openjdk.x86_64                                                                                                                                  java-1.8.0-openjdk-accessibility.x86_64                                                                                                                    java-1.8.0-openjdk-demo.x86_64                                                                                                                             java-1.8.0-openjdk-devel.x86_64                                                                                                                            java-1.8.0-openjdk-headless.x86_64                                                                                                                         java-1.8.0-openjdk-headless-slowdebug.x86_64                                                                                                               java-1.8.0-openjdk-javadoc.noarch                                                                                                                          java-1.8.0-openjdk-javadoc-zip.noarch                                                                                                                      java-1.8.0-openjdk-slowdebug.x86_64                                                                                                                        java-1.8.0-openjdk-src.x86_64        
复制代码


然后我们可以通过这个命令安装 Java 1.8 版本的所有文件。


yum install java-1.8.0-openjdk* -y
复制代码


当控制台返回 Complete 之后,显示 Java 已经安装成功。


3. 确认 Java 安装成功


使用下面这个命令进行确认


java -version
复制代码


结果显示如下,表示已安装成功。


使用 yum 安装的时候,环境变量就自动配好了。


openjdk version "1.8.0_312"OpenJDK Runtime Environment (build 1.8.0_312-b07)OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)
复制代码

1.2 安装 ZooKeeper

1. 创建目录 data 并且下载 3.7.0 版本的 ZooKeeper


mkdir /datacd /datawget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
复制代码


2. 解压


tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
复制代码


3. 修改配置文件


// 进入配置文件目录cd apache-zookeeper-3.7.0/conf
// 将zoo_sample.cfg这个文件复制为zoo.cfg cp zoo_sample.cfg zoo.cfg
// 修改配置文件vi zoo.cfg
复制代码


输入 vi zoo.cfg 之后,需要按 i 进入 insert 模式才能做修改。修改完毕请先按 ESC 退出 insert 模式,进入命令行模式,再按连续两个大写 ZZ 进行保存并退出。


dataDir=/tmp/zookeeper修改成dataDir=/data/apache-zookeeper-3.7.0-bin/data


3. 创建对应的 data 目录


mkdir /data/apache-zookeeper-3.7.0-bin/data
复制代码


4. 启动 ZooKeeper


进入 ZooKeeper 的 bin 目录并且启动服务


cd /data/apache-zookeeper-3.7.0-bin/bin./zkServer.sh start
复制代码


Zookeeper 成功后将会出现下面信息:


/usr/bin/javaZooKeeper JMX enabled by defaultUsing config: /data/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfgStarting zookeeper ... STARTED
复制代码


下面是其他几个常用命令


// 停止./zkServer.sh stop
// 重启./zkServer.sh restart
// 查看状态./zkServer.sh status
复制代码

1.3 安装 kafka

1. 下载版本为 3.0.0 的 kakfa


cd /datawget https://mirrors.bfsu.edu.cn/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz
复制代码


2. 解压


tar -zxvf kafka_2.13-3.0.0.tgz kafka_2.13-3.0.0
复制代码


3. 启动


config/server.properties中的zookeeper.connect的默认地址是localhost:2181,如果你的 Zookeeper 安装在本机,保持默认即可。


cd kafka_2.13-3.0.0.tgz kafka_2.13-3.0.0
// 前台启动:bin/kafka-server-start.sh config/server.properties// 下面的命令行是后台启动,不会像前台启动一直打印日记。bin/kafka-server-start.sh -daemon config/server.properties
复制代码


现在你已经成功启动了 Kafka,恭喜你终于迈出了第一步!

2. Spring 与 Kafka 的整合

2.1 配置 pom

我们需要在 pom.xml 里面添加 Kafka 的依赖:


<dependency>     <groupId>org.springframework.kafka</groupId>     <artifactId>spring-kafka</artifactId>     <version>2.7.2</version> </dependency>
复制代码


文中的 demo 应用将是一个 Spring Boot 的应用,你可以在这里方便快捷地创建一个 Spring Boot 的应用。

2.2 配置 Topic

我们先来回顾下什么是 topic:


在 Kafka 中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为 topic 。如果把 Kafka 看做为一个数据库, topic 可以理解为数据库中的一张表, topic 的名字即为表名。


之前我们可以通过命令行创建 Topic


bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic
复制代码


现在由于有了 Kafka 中 AdminClient 的引入,我们可以在程序中创建 topic。我们需要添加 KafkaAdmin 这个 bean,它可以自动地带入 NewTopic 的所有 bean 的 topic。


@Configurationpublic class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress;
@Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); }
@Bean public NewTopic topic1() { return new NewTopic("jayxu", 1, (short) 1); }}
复制代码

2.3 生产消息

为了创建消息,我们首先需要配置一个 ProducerFactory。ProducerFactory 设置了创建 Kafka Producer 实例的策略。


然后我们需要一个 KafkaTemplate,它包装了一个 Producer 实例,并提供了向 Kafka Topic 发送消息的方法。


Producer 实例是线程安全的。在整个应用环境中使用单例会有更高的性能。KakfaTemplate 实例也是线程安全的,建议使用一个实例。

2.3.1 Producer 配置

@Configurationpublic class KafkaProducerConfig {
@Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); }
@Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }}
复制代码

2.3.2 发布消息

我们可以使用 KafkaTemplate 来发送消息。


@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) { kafkaTemplate.send(topicName, msg);}
复制代码


sendAPI 返回一个 ListenableFuture 对象。如果我们想阻止发送线程,并获得关于已发送消息的结果,我们可以调用 ListenableFuture 对象的 get API。该线程将等待结果,但它会减慢 producer 的速度。


Kafka 是一个快速的流处理平台。因此,最好是异步处理结果,这样后续的消息就不会等待前一个消息的结果了。


我们可以通过回调来做到这一点:


public void sendMessage(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override public void onSuccess(SendResult<String, String> result) { System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } });}
复制代码

2.4 消费消息

2.4.1 Consumer 配置

为了消费消息,我们需要配置一个 ConsumerFactory 和一个 KafkaListenerContainerFactory。一旦这些 bean 在 Spring bean 工厂中可用,就可以使用 @KafkaListener 注解来配置基于 POJO 的 consumer。


配置类中需要有 @EnableKafka 注解,以便在 Spring 管理的 bean 上检测 @KafkaListener 注解。


@EnableKafka@Configurationpublic class KafkaConsumerConfig {
@Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put( ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); }
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; }}
复制代码

2.4.2 消费消息

@KafkaListener(topics = "topicName", groupId = "foo")public void listenGroupFoo(String message) {    System.out.println("Received Message in group foo: " + message);}
复制代码


我们可以为一个 topic 实现多个 listener,每个都有不同的 group ID。此外,一个 consumer 可以监听来自不同 topic 的消息。


@KafkaListener(topics = "topic1, topic2", groupId = "foo")
复制代码


Spring 还支持使用监听器中的 @Header 注解来检索一个或多个消息头。


@KafkaListener(topics = "topicName")public void listenWithHeaders(        @Payload String message,        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {    System.out.println(            "Received Message: " + message"                    + "from partition: " + partition);}
复制代码

2.4.3 消费特定分区的信息

注意,我们创建的话题jayxu只有一个分区。


然而,对于一个有多个分区的 topic,@KafkaListener 可以明确地订阅一个有initial offset的 topic 的特定分区。


@KafkaListener(        topicPartitions = @TopicPartition(topic = "topicName",                partitionOffsets = {                        @PartitionOffset(partition = "0", initialOffset = "0"),                        @PartitionOffset(partition = "3", initialOffset = "0")}),        containerFactory = "partitionsKafkaListenerContainerFactory")public void listenToPartition(        @Payload String message,        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {    System.out.println(            "Received Message: " + message"                    + "from partition: " + partition);}
复制代码


由于在这个监听器中,initialOffset被设置为 0,所以每次初始化这个监听器时,所有之前消耗的 0 和 3 分区的消息都会被重新消费。


如果我们不需要设置 offset,我们可以使用 @TopicPartition 注解的 partitions 属性,只设置没有 offset 的分区。


@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
复制代码

2.4.4 为监听器添加消息过滤器

我们可以通过添加一个自定义的过滤器来配置监听器来消费特定类型的消息。这可以通过给 KafkaListenerContainerFactory 设置一个 RecordFilterStrategy 来完成。


@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String>filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy( record -> record.value().contains("World")); return factory;}
复制代码


然后我们可以配置一个监听器来使用这个容器工厂。


@KafkaListener(        topics = "topicName",        containerFactory = "filterKafkaListenerContainerFactory")public void listenWithFilter(String message) {    System.out.println("Received Message in filtered listener: " + message);}
复制代码


在这个监听器中,所有符合过滤器的信息都将被丢弃。

2.5 自定义消息转换器

到目前为止,我们只涵盖了发送和接收字符串的消息。然而,我们也可以发送和接收自定义的 Java 对象。这需要在 ProducerFactory 中配置适当的序列化器,在 ConsumerFactory 中配置解序列化器。


让我们看看一个简单的 bean 类,我们将把它作为消息发送。


public class Greeting {
private String msg; private String name;
// standard getters, setters and constructor}
复制代码

2.5.1 生产自定义消息

在这个例子中,我们将使用 JsonSerializer。


让我们看看 ProducerFactory 和 KafkaTemplate 的代码。


@Beanpublic ProducerFactory<String, Greeting> greetingProducerFactory() {    // ...    configProps.put(            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,            JsonSerializer.class);    return new DefaultKafkaProducerFactory<>(configProps);}
@Beanpublic KafkaTemplate<String, Greeting> greetingKafkaTemplate() { return new KafkaTemplate<>(greetingProducerFactory());}
复制代码


我们可以使用这个新的 KafkaTemplate 来发送 Greeting 信息。


kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
复制代码

2.5.2 消费自定义消息

同样地,让我们修改 ConsumerFactory 和 KafkaListenerContainerFactory,以正确地反序列化 Greeting 消息。


@Beanpublic ConsumerFactory<String, Greeting> greetingConsumerFactory() {    // ...    return new DefaultKafkaConsumerFactory<>(            props,            new StringDeserializer(),            new JsonDeserializer<>(Greeting.class));}
@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Greeting>greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(greetingConsumerFactory()); return factory;}
复制代码


spring-kafka 的 JSON 序列化器和反序列化器使用 Jackson 库,这也是 spring-kafka 项目的可选 Maven 依赖。


所以,让我们把它添加到我们的 pom.xml 中。


<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.7</version> </dependency>
复制代码


建议不要使用 Jackson 的最新版本,而是使用 spring-kafka 的 pom.xml 中加入的版本。


最后,我们需要写一个监听器来消费 Greeting 消息。


@KafkaListener(        topics = "topicName",        containerFactory = "greetingKafkaListenerContainerFactory")public void greetingListener(Greeting greeting) {    // process greeting message}
复制代码

3. 总结

在这篇文章中,我们介绍了如何安装 Kafka 以及 Spring 支持 Apache Kafka 的基本情况。我们简单学习了一下用于发送和接收消息的类。


在运行代码之前,请确保 Kafka 服务器正在运行,并且 topic 是手动创建的。


感谢您的观看,如果可以的话请三连支持!您也可以查看我的主页进行关注我的后续文章,非常感谢!

发布于: 2022 年 06 月 07 日阅读数: 18
用户头像

翊君

关注

周而不比 和而不同 2019.02.11 加入

喜欢阅读、摄影、写文的一枚小码农

评论

发布
暂无评论
当Spring邂逅Kafka,有趣的知识增加了_kafka_翊君_InfoQ写作社区