写点什么

Kafka 性能测试初探

作者:FunTester
  • 2023-05-05
    北京
  • 本文字数:3466 字

    阅读完需:约 11 分钟

相信大家对 Kafka 不会陌生,但首先还是要简单介绍一下。

Kafka 是一种高性能的分布式消息系统,由 LinkedIn 公司开发,用于处理海量的实时数据流。它采用了发布/订阅模式,可以将数据流分发到多个消费者端,同时提供了高可靠性、高吞吐量和低延迟的特性。

Kafka 的应用场景非常广泛,例如日志收集、事件流处理、实时监控等。在这些场景中,Kafka 可以提供高可靠性和低延迟的数据传输,确保数据的稳定性和实时性。与此同时,Kafka 还提供了丰富的 API 和管理工具,使得用户可以方便地配置和管理 Kafka 集群。

很多高性能方案都会用到 Kafka,今天我来分享如何使用 Kafka Client API 进行 Kafka 生产者和消费者压测。

依赖

我用了 Gradle 创建的项目,依赖配置如下:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '3.4.0'

kafka 服务端

我本地用了 Kafka 最新版本:kafka_2.12-3.4.0,这个版本可以不依赖zookeeper,非常方便,用来本地功能验证和测试我是十分推荐的。基本做到了开箱即用。

具体的流程可以自行搜索。

生产者压测 Demo

在创建生产者时,会有不少的参数需要配置,这里建议使用默认的。或者使用待测试参数组合。下面是我自己的配置,常用的参数我都列了出来。具体参数含义,可以自行搜索,这方面资料还是很多的,下面直接进入压测用例环节。

package com.funtest.kafka

import com.funtester.frame.SourceCodeimport com.funtester.frame.execute.FunQpsConcurrentimport com.funtester.utils.StringUtilimport groovy.util.logging.Log4j2import org.apache.kafka.clients.producer.KafkaProducerimport org.apache.kafka.clients.producer.ProducerConfigimport org.apache.kafka.clients.producer.ProducerRecordimport org.apache.kafka.common.record.CompressionTypeimport org.apache.kafka.common.serialization.ByteArraySerializerimport org.apache.kafka.common.serialization.StringSerializer
@Log4j2class Produce extends SourceCode {
    static void main(String[] args) {        Properties properties = new Properties();        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); //所有分区副本都收到确认信息,才能确认写入        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384")        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name);        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);        def topic = "testkafka"        def test = {            producer.send(new ProducerRecord<>(topic, StringUtil.getString(10)))        }        new FunQpsConcurrent(test,"Kafka测试").start()
        producer.close();    }
}

复制代码

这里用到了动态 QPS 模型,最后的close()也可以不使用,毕竟 main 方法的代码结束了就真的结束了。

消费者

呼应生产者,消费者也有一堆需要配置的参数。这里先按下不表,有兴趣的可以自行学习。

Kafka 消费者有两种订阅消息的方式,分别是订阅模式和分配模式。

订阅模式是指消费者订阅一个或多个主题,然后自动分配分区进行消费。这种模式下,Kafka 会自动管理消费者与分区之间的关系,当有新的消费者加入或者退出消费组时,Kafka 会自动重新分配分区,保证每个消费者都能够获取到消息。

而分配模式则是由消费者主动向 Kafka 请求分配指定的分区进行消费。这种模式下,消费者需要手动管理分区与消费者之间的关系,需要注意的是,当有新的消费者加入或者退出消费组时,需要手动重新分配分区。

订阅模式相对于分配模式来说更加简单易用,但是分配模式可以更加灵活地控制消费者与分区之间的关系。所以我选择了订阅模式。

package com.funtest.kafka
import com.funtester.frame.SourceCodeimport com.funtester.frame.execute.FunQpsConcurrentimport org.apache.kafka.clients.consumer.ConsumerConfigimport org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.clients.consumer.ConsumerRecordsimport org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
class Cunsumer extends SourceCode {
    static void main(String[] args) {        KafkaConsumer<String, String> consumer;        Properties properties = new Properties();        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FunTester32");        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");        properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"10000");        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG                , "earliest");        consumer = new KafkaConsumer<>(properties);
        String topic = "testkafka";//        TopicPartition topicPartition = new TopicPartition(topic, 0);//        List<TopicPartition> topics = Arrays.asList(topicPartition);//        consumer.assign(topics);//        consumer.seekToEnd(topics);//        long current = consumer.position(topicPartition);//        consumer.seek(topicPartition, current - 10);//手动设置偏移量        consumer.subscribe([topic])//订阅模式,不能与assign混用        while (true) {            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));            for (ConsumerRecord<String, String> record : records) {                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());            }            sleep(1.0)        }
        def test = {            consumer.poll(Duration.ofMillis(1000));        }        new FunQpsConcurrent(test,"Kafka消费").start()        consumer.close()
    }}

复制代码

由于本地机器原因,需要在服务器上启动一个 Kafka 服务,用来测试不同参数组合情况下 Kafka 的性能表现。后续有机会再来分享。

FunTester 原创专题推荐~

-- By FunTester

发布于: 刚刚阅读数: 4
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
Kafka性能测试初探_FunTester_InfoQ写作社区