写点什么

Reactive Spring 实战 -- 响应式 Kafka 交互

用户头像
binecy
关注
发布于: 4 小时前

本文分享如何使用 KRaft 部署 Kafka 集群,以及 Spring 中如何实现 Kafka 响应式交互。

KRaft

我们知道,Kafka 使用 Zookeeper 负责为 kafka 存储 broker,Consumer Group 等元数据,并使用 Zookeeper 完成 broker 选主等操作。虽然使用 Zookeeper 简化了 Kafka 的工作,但这也使 Kafka 的部署和运维更复杂。


Kafka 2.8.0 开始移除了 Zookeeper,并使用 Kafka 內部的仲裁(Quorum)控制器來取代 ZooKeeper,官方称这个控制器为 "Kafka Raft metadata mode",即 KRaft mode。从此用户可以在不需要 Zookeeper 的情况下部署 Kafka 集群,这使 Fafka 更加简单,轻量级。使用 KRaft 模式后,用户只需要专注于维护 Kafka 集群即可。


注意:由于该功能改动较大,目前 Kafka2.8 版本提供的 KRaft 模式是一个测试版本,不推荐在生产环境使用。相信 Kafka 后续版本很快会提供生产可用的 kraft 版本。


下面介绍一下如果使用 Kafka 部署 kafka 集群。这里使用 3 台机器部署 3 个 Kafka 节点,使用的 Kafka 版本为 2.8.0。


  1. 生成 ClusterId 以及配置文件。(1)使用 kafka-storage.sh 生成 ClusterId。


$ ./bin/kafka-storage.sh random-uuiddPqzXBF9R62RFACGSg5c-Q
复制代码


(2)使用 ClusterId 生成配置文件


$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.propertiesFormatting /tmp/kraft-combined-logs
复制代码


注意:只需要在生成一个 ClusterId,并使用该 ClusterId 在所有机器上生成配置文件,即集群中所有节点使用的 ClusterId 需相同。


  1. 修改配置文件脚本生成的配置文件只能用于单个 Kafka 节点,如果在部署 Kafka 集群,需要对配置文件进行一下修改。


(1)修改 config/kraft/server.properties(稍后使用该配置启动 kafka)


process.roles=broker,controller node.id=1listeners=PLAINTEXT://172.17.0.2:9092,CONTROLLER://172.17.0.2:9093advertised.listeners=PLAINTEXT://172.17.0.2:9092controller.quorum.voters=1@172.17.0.2:9093,2@172.17.0.3:9093,3@172.17.0.4:9093
复制代码


process.roles 指定了该节点角色,有以下取值


  • broker: 这台机器将仅仅当作一个 broker

  • controller: 作为 Raft quorum 的控制器节点

  • broker,controller: 包含以上两者的功能


一个集群中不同节点的 node.id 需要不同。controller.quorum.voters 需要配置集群中所有的 controller 节点,配置格式为<nodeId>@<ip>:<port>。


(2)kafka-storage.sh 脚本生成的配置,默认将 kafka 数据存放在/tmp/kraft-combined-logs/,我们还需要/tmp/kraft-combined-logs/meta.properties 配置中的 node.id,使其与 server.properties 配置中保持一起。


node.id=1
复制代码


  1. 启动 kafka 使用 kafka-server-start.sh 脚本启动 Kafka 节点


$ ./bin/kafka-server-start.sh ./config/kraft/server.properties
复制代码


下面测试一下该 kafka 集群


  1. 创建主题


$ ./bin/kafka-topics.sh --create --partitions 3 --replication-factor 3 --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 
复制代码


  1. 生产消息


$ ./bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1
复制代码


  1. 消费消息


$ ./bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 --from-beginning
复制代码


这部分命令的使用与低版本的 Kafka 保持一致。


Kafka 的功能暂时还不完善,这是展示一个简单的部署示例。Kafka 文档:https://github.com/apache/kafka/blob/trunk/config/kraft/README.md


Spring 中可以使用 Spring-Kafka、Spring-Cloud-Stream 两个框架实现 kafka 响应式交互。下面分别看一下这两个框架的使用。

Spring-Kafka

  1. 添加引用添加 spring-kafka 引用


<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId>    <version>2.5.8.RELEASE</version></dependency>
复制代码


  1. 准备配置文件,内容如下


spring.kafka.producer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializerspring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializerspring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializerspring.kafka.consumer.group-id=warehouse-consumersspring.kafka.consumer.properties.spring.json.trusted.packages=*
复制代码


分别是生产者和消费者对应的配置,很简单。


  1. 发送消息 Spring-Kakfa 中可以使用 ReactiveKafkaProducerTemplate 发送消息。首先,我们需要创建一个 ReactiveKafkaProducerTemplate 实例。(目前 SpringBoot 会自动创建 KafkaTemplate 实例,但不会创建 ReactiveKafkaProducerTemplate 实例)。


@Configurationpublic class KafkaConfig {    @Autowired    private KafkaProperties properties;
@Bean public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() { SenderOptions options = SenderOptions.create(properties.getProducer().buildProperties()); ReactiveKafkaProducerTemplate template = new ReactiveKafkaProducerTemplate(options); return template; }}
复制代码


KafkaProperties 实例由 SpringBoot 自动创建,读取上面配置文件中对应的配置。


接下来,就可以使用 ReactiveKafkaProducerTemplate 发送消息了


    @Autowired    private ReactiveKafkaProducerTemplate template;
public static final String WAREHOUSE_TOPIC = "warehouse"; public Mono<Boolean> add(Warehouse warehouse) { Mono<SenderResult<Void>> resultMono = template.send(WAREHOUSE_TOPIC, warehouse.getId(), warehouse); return resultMono.flatMap(rs -> { if(rs.exception() != null) { logger.error("send kafka error", rs.exception()); return Mono.just(false); } return Mono.just(true); }); }
复制代码


ReactiveKafkaProducerTemplate#send 方法返回一个 Mono(这是 Spring Reactor 中的核心对象),Mono 中携带了 SenderResult,SenderResult 中的 RecordMetadata、exception 存储该记录的元数据(包括 offset、timestamp 等信息)以及发送操作的异常。


  1. 消费消息 Spring-Kafka 使用 ReactiveKafkaConsumerTemplate 消费消息。


@Servicepublic class WarehouseConsumer {    @Autowired    private KafkaProperties properties;
@PostConstruct public void consumer() { ReceiverOptions<Long, Warehouse> options = ReceiverOptions.create(properties.getConsumer().buildProperties()); options = options.subscription(Collections.singleton(WarehouseService.WAREHOUSE_TOPIC)); new ReactiveKafkaConsumerTemplate(options) .receiveAutoAck() .subscribe(record -> { logger.info("Warehouse Record:" + record); }); }}
复制代码


这里与之前使用 @KafkaListener 注解实现的消息监听者不同,不过也非常简单,分为两个步骤:(1)ReceiverOptions#subscription 方法将 ReceiverOptions 关联到 kafka 主题(2)创建 ReactiveKafkaConsumerTemplate,并注册 subscribe 的回调函数消费消息。提示:receiveAutoAck 方法会自动提交消费组 offset。

Spring-Cloud-Stream

Spring-Cloud-Stream 是 Spring 提供的用于构建消息驱动微服务的框架。它为不同的消息中间件产品提供一种灵活的,统一的编程模型,可以屏蔽底层不同消息组件的差异,目前支持 RabbitMQ、Kafka、RocketMQ 等消息组件。


这里简单展示 Spring-Cloud-Stream 中实现 Kafka 响应式交互的示例,不深入介绍 Spring-Cloud-Stream 的应用。


  1. 引入 spring-cloud-starter-stream-kafka 的引用


    <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-starter-stream-kafka</artifactId>    </dependency>
复制代码


  1. 添加配置


spring.cloud.stream.kafka.binder.brokers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092spring.cloud.stream.bindings.warehouse2-out-0.contentType=application/jsonspring.cloud.stream.bindings.warehouse2-out-0.destination=warehouse2# 消息格式spring.cloud.stream.bindings.warehouse3-in-0.contentType=application/json# 消息目的地,可以理解为Kafka主题spring.cloud.stream.bindings.warehouse3-in-0.destination=warehouse2# 定义消费者消费组,可以理解为Kafka消费组spring.cloud.stream.bindings.warehouse3-in-0.group=warehouse2-consumers# 映射方法名spring.cloud.function.definition=warehouse2;warehouse3
复制代码


Spring-Cloud-Stream 3.1 版本之后,@EnableBinding、@Output 等 StreamApi 注解都标记为废弃,并提供了一种更简洁的函数式编程模型。该版本后,用户不需要使用注解,只要在配置文件中指定需要绑定的方法,Spring-Cloud-Stream 会为用户将这些方法与底层消息组件绑定,用户可以直接调用这些方法发送消息,或者接收到消息时 Spring-Cloud-Stream 会调用这些方法消费消息。


通过以下格式定义输入、输出函数的相关属性:输出(发送消息):<functionName> + -out- + <index>输入(消费消息):<functionName> + -in- + <index>对于典型的单个输入/输出函数,index 始终为 0,因此它仅与具有多个输入和输出参数的函数相关。Spring-Cloud-Stream 支持具有多个输入(函数参数)/输出(函数返回值)的函数。


spring.cloud.function.definition 配置指定需要绑定的方法名,不添加该配置,Spring-Cloud-Stream 会自动尝试绑定返回类型为 Supplier/Function/Consumer 的方法,但是使用该配置可以避免 Spring-Cloud-Stream 绑定混淆。


  1. 发送消息用户可以编写一个返回类型为 Supplier 的方法,并定时发送消息


    @PollableBean    public Supplier<Flux<Warehouse>> warehouse2() {        Warehouse warehouse = new Warehouse();        warehouse.setId(333L);        warehouse.setName("天下第一仓");        warehouse.setLabel("一级仓");
logger.info("Supplier Add : {}", warehouse); return () -> Flux.just(warehouse); }
复制代码


定义该方法后,Spring-Cloud-Stream 每秒调用一次该方法,生成 Warehouse 实例,并发送到 Kafka。(这里方法名 warehouse3 已经配置在 spring.cloud.function.definition 中。)


通常场景下,应用并不需要定时发送消息,而是由业务场景触发发送消息操作, 如 Rest 接口,这时可以使用 StreamBridge 接口


    @Autowired    private StreamBridge streamBridge;
public boolean add2(Warehouse warehouse) { return streamBridge.send("warehouse2-out-0", warehouse); }
复制代码


暂时未发现 StreamBridge 如何实现响应式交互。


  1. 消费消息应用要消费消息,只需要定义一个返回类型为 Function/Consumer 的方法即可。如下


    @Bean    public Function<Flux<Warehouse>, Mono<Void>> warehouse3() {        Logger logger = LoggerFactory.getLogger("WarehouseFunction");        return flux -> flux.doOnNext(data -> {            logger.info("Warehouse Data: {}", data);        }).then();    }
复制代码


注意:方法名与<functionName> + -out- + <index>/<functionName> + -in- + <index>、spring.cloud.function.definition 中的配置需要保持一致,以免出错。


SpringCloudStream 文档:https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html


文章完整代码:https://gitee.com/binecy/bin-springreactive/tree/master/warehouse-service


如果您觉得本文不错,欢迎关注我的微信公众号,系列文章持续更新中。您的关注是我坚持的动力!



发布于: 4 小时前阅读数: 3
用户头像

binecy

关注

还未添加个人签名 2020.08.26 加入

还未添加个人简介

评论

发布
暂无评论
Reactive Spring实战 -- 响应式Kafka交互