写点什么

大数据 -56 Kafka Spring Boot 使用 Kafka 开发分布式消息系统详解 附代码

作者:武子康
  • 2025-07-31
    山东
  • 本文字数:4114 字

    阅读完需:约 13 分钟

大数据-56 Kafka Spring Boot 使用 Kafka 开发分布式消息系统详解 附代码

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-30-新发布【1T 万亿】参数量大模型!Kimi‑K2 开源大模型解读与实践,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 07 月 28 日更新到:Java-83 深入浅出 MySQL 连接、线程、查询缓存与优化器详解 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了:


  • topics.sh、producer.sh、consumer.sh 脚本的基本使用

  • pom.xml 配置

  • JavaAPI 的使用:producer 和 consumer


简单介绍

在 Spring Boot 中使用 Kafka,是构建分布式消息驱动应用程序的一种常见方法。Kafka 的强大之处在于其高吞吐量、低延迟和良好的可扩展性,非常适合处理大量实时数据。

Kafka 的基本概念详解

Producer(生产者)

生产者是 Kafka 系统中的消息发布者,负责创建并向 Kafka 的主题(topic)发送消息。生产者可以:


  • 批量发送消息以提高吞吐量

  • 配置消息确认机制(acks=0/1/all)

  • 使用 key/value 格式发送消息

  • 自定义分区策略(默认使用轮询或基于 key 的哈希)


示例场景:电商网站的订单系统将新订单作为消息发送到"orders"主题。

Consumer(消费者)

消费者是从 Kafka 主题中读取并处理消息的客户端应用程序。消费者具有以下特点:


  • 可以独立消费或组成消费者组

  • 支持消息的偏移量(offset)管理

  • 提供多种消费模式(实时/批量)

  • 支持消息回溯和重新消费

Broker(代理)

Broker 是 Kafka 集群中的单个节点,负责:


  • 接收生产者的消息并存储在磁盘

  • 响应消费者的读取请求

  • 维护消息的副本以保证高可用性

  • 处理集群协调和分区领导选举


典型生产环境中,一个 Kafka 集群由多个 broker 组成(通常 3 个或以上)。

Topic(主题)

主题是 Kafka 中消息的分类和组织单元,特点包括:


  • 类似于数据库中的表或消息队列中的队列

  • 可以配置不同的保留策略(时间/大小)

  • 支持多分区(横向扩展的基础)

  • 消息按发布顺序持久化存储

Partition(分区)

分区是主题的物理子单元,具有以下特性:


  • 每个分区是一个有序的、不可变的记录序列

  • 分区可以分布在不同的 broker 上以实现负载均衡

  • 分区数量决定了主题的最大并行消费能力

  • 每个分区的消息都分配一个递增的偏移量(offset)


示例:若"orders"主题有 3 个分区,则最大可并行 3 个消费者同时消费。

Consumer Group(消费者组)

消费者组是 Kafka 实现并行消费的核心机制:


  • 组内消费者共同消费一个主题的所有分区

  • 每个分区在同一时间只能被组内一个消费者消费

  • 支持动态扩容缩容(rebalance 机制)

  • 消费进度(offset)由 Kafka 协调器统一管理


应用场景:订单处理服务部署多个实例组成消费者组,实现负载均衡和高可用。

spring-kafka

Spring-Kafka 是 Spring 框架对 Apache Kafka 的集成,使得在 Spring 应用中使用 Kafka 更加简便和直观。它提供了一系列功能和配置选项来帮助开发者快速构建基于消息驱动的微服务架构。

KafkaTemplate

KafkaTemplate 是 Spring-Kafka 提供的用于发送消息的核心类。它简化了生产者与 Kafka 交互的过程。你可以通过这个类轻松地将消息发送到 Kafka 的主题中。

KafkaListener

@KafkaListener 是用于消费 Kafka 消息的注解。通过这个注解,可以非常方便地定义消息消费者,处理从指定主题接收到的消息。

Spring-Kafka 的配置

Spring-Kafka 支持通过配置文件来配置 Kafka 客户端的属性。这些配置可以在 application.properties 或 application.yml 中指定。

架构图

上节已经出现过了,这里再放一次


POM

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId> <artifactId>springboot-kafka</artifactId> <version>1.0-SNAPSHOT</version>
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent>
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
</project>
复制代码

配置文件

我们常见的配置文件如下图:


spring:  kafka:    bootstrap-servers: localhost:9092    consumer:      group-id: my-group      auto-offset-reset: earliest      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer    producer:      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    template:      default-topic: my-topic
复制代码

Producer

编写代码

编写了一个 KafkaProducerController 里边写了两个方法,都是使用了 KafkaTemplate 的工具。


@RestControllerpublic class KafkaProducerController {
@Resource private KafkaTemplate<Integer, String> kafkaTemplate;
@RequestMapping("/sendSync/{message}") public String sendSync(@PathVariable String message) { ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 1, message); ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record); try { SendResult<Integer, String> result = future.get(); System.out.println(result.getProducerRecord().key() + "->" + result.getProducerRecord().partition() + "->" + result.getProducerRecord().timestamp()); } catch (Exception e) { e.printStackTrace(); } return "Success"; }
@RequestMapping("/sendAsync/{message}") public String sendAsync(@PathVariable String message) { ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 2, message); ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record); future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onFailure(Throwable ex) { System.out.println("发送失败!"); ex.printStackTrace(); }
@Override public void onSuccess(SendResult<Integer, String> result) { System.out.println("发送成功"); System.out.println(result.getProducerRecord().key() + "->" + result.getProducerRecord().partition() + "->" + result.getProducerRecord().timestamp()); } }); return "Success"; }
}
复制代码

测试结果

http://localhost:8085/sendSync/wzktest1http://localhost:8085/sendAsync/wzktest2http://localhost:8085/sendAsync/wzktest222222
复制代码


我们观察控制台的效果如下:


Consumer

编写代码

编一个类来实现 Consumer:


@Configurationpublic class KafkaConsumer {
@KafkaListener(topics = {"wzk_topic_test"}) public void consume(ConsumerRecord<Integer, String> consumerRecord) { System.out.println( consumerRecord.topic() + "\t" + consumerRecord.partition() + "\t" + consumerRecord.offset() + "\t" + consumerRecord.key() + "\t" + consumerRecord.value()); }
}
复制代码

测试运行

2024-07-12 13:48:46.831  INFO 15352 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=13, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=h121.wzk.icu:9092 (id: 0 rack: null), epoch=0}}2024-07-12 13:48:46.926  INFO 15352 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : wzk-test: partitions assigned: [wzk_topic_test-0]wzk_topic_test  0  13  1  wzktestwzk_topic_test  0  14  2  wzktest222wzk_topic_test  0  15  2  wzktest222222
复制代码


控制台的截图如下:



发布于: 刚刚阅读数: 2
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-56 Kafka Spring Boot 使用 Kafka 开发分布式消息系统详解 附代码_Java_武子康_InfoQ写作社区