写点什么

Kafka 测试初探【Go】

作者:FunTester
  • 2023-05-25
    北京
  • 本文字数:2890 字

    阅读完需:约 9 分钟

上周分享了Kafka性能测试初探的 Java 版本,有读者留言说太简单,内容比较水。这里澄清一下,是我学得比较水。文章定位就是一篇使用 Java 语言的 Kafka Client 客户端进行简单操作演示,然后模拟一下简单场景的性能测试。其中深入学习 Kafka 的可以随处搜到很权威实用的资料,有深入学习需求的可以自行寻找。


好久没有写 Go 了,这才突然觉察到,又重新复习了一波 Go 语言的基础语法。顺带着之前留下的好习惯,每个学习的框架和工具都用 Java 和 Go 写一遍。这次也分享一下 Go 语言的 Kafka 基础入门,以及生产者的简单测试场景。


我用的是 shopify 出的 sarama,依赖如下github.com/Shopify/sarama v1.38.1。在搜资料的过程中,还发现有使用其他客户端的,选择挺多。

Kafka 配置

Sarama 框架中的生产者和消费者的配置类是一个,不太清楚这么设计的意图,两个配置重合度并不高,在 Sarama 中也是分开配置,但使用了同一个配置类。


生产者配置:


  config := sarama.NewConfig()  config.Producer.Return.Successes = true  config.Producer.Return.Errors = true  config.Producer.RequiredAcks = sarama.NoResponse  config.Producer.Compression = sarama.CompressionLZ4  config.Producer.Timeout = time.Duration(50) * time.Millisecond  config.Producer.Retry.Max = 3
复制代码


消费者配置:


  config := sarama.NewConfig()  config.Consumer.Offsets.AutoCommit.Enable = true  config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second  config.Consumer.Offsets.Initial = sarama.OffsetOldest  config.Consumer.Offsets.Retry.Max = 3
复制代码


这里只选择部分参数,详细的配置项和注释都可以在源码中找到,Sarama 的一个好处就是注释非常全,甚至不用看官方 API 文档。

生产者

下面是生产者的代码,相比较 Java 来说,我这里增加了 header 的实践,其实 Java 也是支持的,只是当时学的时候漏掉了这个知识点。


producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)  if err != nil {    log.Fatal(err)    return  }  defer func() {    // 关闭生产者    if err = producer.Close(); err != nil {      log.Fatal(err)      return    }  }()  // 定义需要发送的消息  headers := []sarama.RecordHeader{sarama.RecordHeader{    Key:   []byte("funtest"),    Value: []byte("have fun ~"),  }}
msg := &sarama.ProducerMessage{ Topic: "topic_test", Key: sarama.StringEncoder("test"), Value: sarama.StringEncoder("ddddddddddddddddd"), Headers: headers, } // 发送消息,并获取该消息的分片、偏移量 for i := 0; i < 100; i++ { ftool.Sleep(1000) partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatal(err) } fmt.Printf("partition:%d offset:%d\n", partition, offset) }
复制代码


这里官方给的实践代码中感觉ProducerMessage类似于 Java 的org.apache.kafka.clients.producer.ProducerRecord#ProducerRecord,也是可以指定 partitionid 和时间戳,以及单独设置 retries 次数的。还有一个比较重要的类AsyncProducer,暂时不探索了。

消费者

消费者使用上 Go 和 Java 差异比较大,Sarama 用了 channel 的概念,可以一直不停止从服务端获取消息对象,不像 Java 可以指定一次接受的消息数量,单次最大等待时间等。盲猜这里 channel 的性能太好了吧,不需要复杂设计也能满足需求。


  consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)  if err != nil {    fmt.Printf("fail to start consumer, err:%v\n", err)    return  }  topic := "topic_test"  partitionList, err := consumer.Partitions(topic) // 根据topic取到所有的分区  if err != nil {    fmt.Printf("fail to get list of partition:err%v\n", err)    return  }  fmt.Println(partitionList)  defer consumer.Close()  for partition := range partitionList { // 遍历所有的分区    // 针对每个分区创建一个对应的分区消费者    log.Println(partition)    pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)    if err != nil {      fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)    }    for msg := range pc.Messages() {      log.Println(string(msg.Value))      //log.Println(string(msg.Headers[0].Value))    }    for {      msg := <-pc.Messages()      log.Println(string(msg.Value))    }  }
复制代码


后来我写了两种接受方式,其实都是阻塞的,如果是性能测试的时候可以使用 Go 中的go关键字起 routine 来执行。

性能测试

我这之展示性能测试简单的例子,即生产者不停地往 Kafka 发消息的 Demo,复用了上文中的生产者代码,下面只展示执行部分。


execute.ExecuteRoutineTimes(func() {  _, _, _ := producer.SendMessage(msg)}, 100, 10)
复制代码


执行方法 ExecuteRoutineTimes 是我写的一个基于线程模型的压测执行方法,内容如下:



// ExecuteRoutineTimes// @Description: FunTester性能测试执行框架// @param fun 待执行方法// @param times 次数// @param routine 线程数func ExecuteRoutineTimes(fun func(), times, routine int) { c := make(chan int) //确认所有线程都结束 key := false //用于控制所有线程一起结束 start := ftool.Milli() for i := 0; i < routine; i++ { go func() { sum := 0 for i := 0; i < times; i++ { if key { break } fun() sum++ } key = true c <- sum }() } total := 0 for i := 0; i < routine; i++ { num := <-c total += num } end := ftool.Milli() diff := end - start //total := thread * times log.Printf("总耗时: %f", float64(diff)/1000)
log.Printf("请求总数: %d", total) log.Printf("QPS: %f", float64(total)/float64(diff)*1000.0)}
复制代码


总结起来,相比 Java,Go 语言相对简单一些。如果习惯了 Go 语言的习惯,对于做测试来说上手要比 Java 快一些。再买个坑,改天测试一下两者之间的性能差异。理论上 Go 要比 Java 好一些。


Sarama 是一个用于 Apache Kafka 的 Go 语言库。Kafka 是一个分布式流处理平台,它可以处理大规模的数据流,并将其发布到主题中,供其他应用程序使用。Sarama 库允许 Go 应用程序与 Kafka 集群进行通信。它支持多个版本的 Kafka 协议,并提供了生产者和消费者 API,以便应用程序可以轻松地将消息发布到 Kafka 主题或从中读取消息。Sarama 还提供了一些有用的工具,如分区选择器和负载平衡器,以帮助开发人员更好地管理 Kafka 消费者。


发布于: 2023-05-25阅读数: 2
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
Kafka测试初探【Go】_FunTester_InfoQ写作社区