写点什么

Golang 正确使用 kafka 的姿势 - 细节决定成败

用户头像
OpenIM
关注
发布于: 2 小时前
Golang正确使用kafka的姿势-细节决定成败
本文转自 跟我学 IM 后台开发作者 杰克.许 经 OpenIM 技术人员整理修订后发布。
写在前面

Open-IM 是由前微信技术专家打造的开源的即时通讯组件。Open-IM 包括 IM 服务端和客户端 SDK,实现了高性能、轻量级、易扩展等重要特性。开发者通过集成 Open-IM 组件,并私有化部署服务端,可以将即时通讯、实时网络能力快速集成到自身应用中,并确保业务数据的安全性和私密性。


Kafka 在 OpenIM 项目中承担重要的角色,感谢作者在使用 OpenIM 中发现的 bug(使用 Kafka 不当的 bug)

了解更多原创文章:

【OpenIM原创】开源OpenIM:轻量、高效、实时、可靠、低成本的消息模型


【OpenIM原创】C/C++调用golang函数,golang回调C/C++函数


【OpenIM原创】简单轻松入门 一文讲解WebRTC实现1对1音视频通信原理


【OpenIM扩展】OpenIM服务发现和负载均衡golang插件:gRPC接入etcdv3


【开源OpenIM】高性能、可伸缩、易扩展的即时通讯架构

如果您有兴趣可以在文章结尾了解到更多关于我们的信息,期待着与您的交流合作。

01 背景

在一些业务系统中,模块之间通过引入 Kafka 解耦,拿 IM 举例(图来源):



用户 A 给 B 发送消息,msg_gateway 收到消息后,投递消息到 Kafka 后就给 A 返回发送成功。这个时候,其实还没有持久化到 mysql 中,虽然最终会保持一致性。所以,试想如果 Kafka 丢消息了,是不是就出大问题了?A 认为给 B 发送消息成功了,但是在服务器内部消息丢失了 B 并没有收到。


所以,在使用 Kafka 的时候,有一些业务对消息丢失问题非常的关注。


同样,常见的问题还有:


  • 重复消费的问题。

  • 乱序的问题。


下面我们来一起看一下如何使用 sarama 包来解决这些问题。

02 Kafka 消息丢失问题描述

以下内容来源:


kafka 什么时候会丢消息:https://blog.csdn.net/qrne06/article/details/94225070


上面我们担心的点需要进一步明确一下丢消息的定义:kafka 集群中的部分或全部 broker 挂了,导致 consumer 没有及时收到消息,这不属于丢消息。broker 挂了,只要消息全部持久化到了硬盘上,重启 broker 集群之后,使消费者继续拉取消息,消息就没有丢失,仍然全量消费了。所以我的理解,所谓丢消息,意味着:开发人员未感知到哪些消息没有被消费。


作者把消息的丢失归纳了以下几种情况:


1) producer 把消息发送给 broker,因为网络抖动,消息没有到达 broker,且开发人员无感知。


解决方案:producer 设置 acks 参数,消息同步到 master 之后返回 ack 信号,否则抛异常使应用程序感知到并在业务中进行重试发送。这种方式一定程度保证了消息的可靠性,producer 等待 broker 确认信号的时延也不高。


2)producer 把消息发送给 broker-master,master 接收到消息,在未将消息同步给 follower 之前,挂掉了,且开发人员无感知。


解决方案:producer 设置 acks 参数,消息同步到 master 且同步到所有 follower 之后返回 ack 信号,否则抛异常使应用程序感知到并在业务中进行重试发送。这样设置,在更大程度上保证了消息的可靠性,缺点是 producer 等待 broker 确认信号的时延比较高。


3)producer 把消息发送给 broker-master,master 接收到消息,master 未成功将消息同步给每个 follower,有消息丢失风险。


解决方案:同上。


4)某个 broker 消息尚未从内存缓冲区持久化到磁盘,就挂掉了,这种情况无法通过 ack 机制感知。


解决方案:设置参数,加快消息持久化的频率,能在一定程度上减少这种情况发生的概率。但提高频率自然也会影响性能。


5)consumer 成功拉取到了消息,consumer 挂了。


解决方案:设置手动 sync,消费成功才提交


综上所述,集群/项目运转正常的情况下,kafka 不会丢消息。一旦集群出现问题,消息的可靠性无法完全保证。要想尽可能保证消息可靠,基本只能在发现消息有可能没有被消费时,重发消息来解决。所以在业务逻辑中,要考虑消息的重复消费问题,对于关键环节,要有幂等机制。


作者的几条建议:


1)如果一个业务很关键,使用 kafka 的时候要考虑丢消息的成本和解决方案。


2)producer 端确认消息是否到达集群,若有异常,进行重发。


3)consumer 端保障消费幂等性。


4)运维保障集群运转正常且高可用,保障网络状况良好。

03 生产端丢消息问题解决

上面说了,只需要把 producer 设置 acks 参数,等待 Kafka 所有 follower 都成功后再返回。我们只需要进行如下设置:


  • \1. config := sarama.NewConfig() 2. config.Producer.RequiredAcks = sarama.WaitForAll // -1


ack 参数有如下取值:


1. const (2. // NoResponse doesn't send any response, the TCP ACK is all you get. 3.   NoResponse RequiredAcks = 04. // WaitForLocal waits for only the local commit to succeed before         responding.    5. WaitForLocal RequiredAcks = 1   6. // WaitForAll waits for all in-sync replicas to commit before          responding.    7. // The minimum number of in-sync replicas is configured on the             broker    via   8. // the `min.insync.replicas` configuration key.    9. WaitForAll RequiredAcks = -110.  )
复制代码

04 消费端丢消息问题

通常消费端丢消息都是因为 Offset 自动提交了,但是数据并没有插入到 mysql(比如出现 BUG 或者进程 Crash),导致下一次消费者重启后,消息漏掉了,自然数据库中也查不到。这个时候,我们可以通过手动提交解决,甚至在一些复杂场景下,还要使用二阶段提交。


自动提交模式下的丢消息问题


默认情况下,sarama 是自动提交的方式,间隔为 1 秒钟


1.  // NewConfig returns a new configuration instance with sane                defaults.2. func NewConfig() *Config {  3. // …  4. c.Consumer.Offsets.AutoCommit.Enable = true. // 自动提交 5. c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 间隔 6. c.Consumer.Offsets.Initial = OffsetNewest 7. c.Consumer.Offsets.Retry.Max = 3 8.  // ...9.  }
复制代码


这里的自动提交,是基于被标记过的消息(sess.MarkMessage(msg, “"))


1. type exampleConsumerGroupHandler struct{}2. func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession)        error   { return nil }3. func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession)      error { return nil }4. func (h exampleConsumerGroupHandler) ConsumeClaim(sess                  ConsumerGroupSession, claim ConsumerGroupClaim) error {  5. for msg := range claim.Messages() {      6. fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic,      msg.Partition, msg.Offset)      7. // 标记消息已处理,sarama会自动提交     8. sess.MarkMessage(msg, "") 9. }   10. return nil11. }
复制代码


如果不调用 sess.MarkMessage(msg, “"),即使启用了自动提交也没有效果,下次启动消费者会从上一次的 Offset 重新消费,我们不妨注释掉 sess.MarkMessage(msg, “"),然后打开 Offset Explorer 查看:



那么这样,我们就大概理解了 sarama 自动提交的原理:先标记再提交。我们只需要保持标记逻辑在插入 mysql 代码之后即可确保不会出现丢消息的问题:


正确的调用顺序:


1. func (h msgConsumerGroup) ConsumeClaim(sesssarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {   2. for msg := range claim.Messages() {3. // 插入mysql4. insertToMysql(msg)      5. // 正确:插入mysql成功后程序崩溃,下一次顶多重复消费一次,而不是因为Offset超         前,导致应用层消息丢失了     6.  sess.MarkMessage(msg, “") 7.  }  8.  return nil9.  }
复制代码


错误的顺序:


1. func (h msgConsumerGroup) ConsumeClaim(sess                           sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { 2. for msg := range claim.Messages() {     3. // 错误1:不能先标记,再插入mysql,可能标记的时候刚好自动提交Offset,但mysql插入失败了,导致下一次这个消息不会被消费,造成丢失      4. // 错误2:干脆忘记调用sess.MarkMessage(msg, “"),导致重复消费   5. sess.MarkMessage(msg, “")      6. // 插入mysql      7. insertToMysql(msg)  8.  }  9.  return nil10. }
复制代码


sarama 手动提交模式


当然,另外也可以通过手动提交来处理丢消息的问题,但是个人不推荐,因为自动提交模式下已经能解决丢消息问题。


1. consumerConfig := sarama.NewConfig()2. consumerConfig.Version = sarama.V2_8_0_0consumerConfig.3. Consumer.Return.Errors = falseconsumerConfig.4. Consumer.Offsets.AutoCommit.Enable = false  // 禁用自动提交,改为手动5. consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest6. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {   7. for msg := range claim.Messages() {      8. fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))      9. // 插入mysql     10. insertToMysql(msg)      11. // 手动提交模式下,也需要先进行标记     12. sess.MarkMessage(msg, "")      13. consumerCount++      14. if consumerCount%3 == 0 {         15. // 手动提交,不能频繁调用,耗时9ms左右,macOS i7 16GB         16. t1 := time.Now().Nanosecond()         17. sess.Commit()         18. t2 := time.Now().Nanosecond()         19.fmt.Println("commit cost:", (t2-t1)/(1000*1000), "ms")      20. }   21. }   22. return nil23. }
复制代码


05 Kafka 消息顺序问题

投递 Kafka 之前,我们通过一次 gRPC 调用解决了消息序号的生成问题,但是这里其实还涉及一个消息顺序问题:订阅 Kafka 的消费者如何按照消息顺序写入 mysql,而不是随机写入呢?


我们知道,Kafka 的消息在一个 partition 中是有序的,所以只要确保发给某个人的消息都在同一个 partition 中即可。


1. 全局一个 partition


这个最简单,但是在 kafka 中一个 partition 对应一个线程,所以这种模型下 Kafka 的吞吐是个问题。


2. 多个 partition,手动指定


1. msg := &sarama.ProducerMessage{   2. Topic: “msgc2s",   3. Value: sarama.StringEncoder(“hello”),   4. Partition: toUserId % 10,5. }6. partition, offset, err := producer.SendMessage(msg)
复制代码


生产消息的时候,除了 Topic 和 Value,我们可以通过手动指定 partition,比如总共有 10 个分区,我们根据用户 ID 取余,这样发给同一个用户的消息,每次都到 1 个 partition 里面去了,消费者写入 mysql 中的时候,自然也是有序的。



但是,因为分区总数是写死的,万一 Kafka 的分区数要调整呢?那不得重新编译代码?所以这个方式不够优美。


3. 多个 partition,自动计算


kafka 客户端为我们提供了这种支持。首先,在初始化的时候,设置选择分区的策略为 Hash:


p.config.Producer.Partitioner = sarama.NewHashPartitioner
复制代码


然后,在生成消息之前,设置消息的 Key 值:


1. msg := &sarama.ProducerMessage{   2. Topic: "testAutoSyncOffset",   3. Value: sarama.StringEncoder("hello"),   4. Key: sarama.StringEncoder(strconv.Itoa(RecvID)),5. }
复制代码


Kafka 客户端会根据 Key 进行 Hash,我们通过把接收用户 ID 作为 Key,这样就能让所有发给某个人的消息落到同一个分区了,也就有序了。


4.扩展知识:多线程情况下一个 partition 的乱序处理


我们上面说了,Kafka 客户端针对一个 partition 开一个线程进行消费,如果处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟就只能处理几十条消息,这吞吐量太低了。这个时候,我们可能就把逻辑移动到其他线程里面去处理,这样的话,顺序就可能会乱。



我们可以通过写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。PS:就像 4 % 10 = 4,14 % 10 = 4,他们取余都是等于 4,所以落到了一个 partition,但是 key 值不一样啊,我们可以自己再取余,放到不同的 queue 里面。



06 重复消费和消息幂等


这篇文章中:


kafka 什么时候会丢消息:https://blog.csdn.net/qrne06/article/details/94225070


详细了描述了各种丢消息的情况,我们通过设置 RequiredAcks = sarama.WaitForAll(-1),可以解决生产端丢消息的问题。第六节中也对消费端丢消息进行了说明,只需要确保在插入数据库之后,调用 sess.MarkMessage(msg, "”) 即可。


如果出现了插入 Mysql 成功,但是因为自动提交有 1 秒的间隔,如果此时崩溃,下次启动消费者势必会对这 1 秒的数据进行重复消费,我们在应用层需要处理这个问题。


常见的有 2 种思路:


  1. 如果是存在 redis 中不需要持久化的数据,比如 string 类型,set 具有天然的幂等性,无需处理。

  2. 插入 mysql 之前,进行一次 query 操作,针对每个客户端发的消息,我们为它生成一个唯一的 ID(比如 GUID),或者直接把消息的 ID 设置为唯一索引。


第 2 个方案的难点在于,全局唯一 ID 的生成,理论上 GUID 也是存在重复的可能性的,如果是客户端生成,那么插入失败,怎么让客户端感知呢?


所以,这里我认为还是需要自定义 ID 生产,比如通过组合法:用户 ID + 当前时间 + 32 位 GUID,是不是几乎不会重复了呢(试想,1 个人发 1 亿条文本需要多少年。。。)?


07 完整代码实例


consumer.go


1. type msgConsumerGroup struct{}2. 3. func (msgConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error   { return nil }4. func (msgConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }5. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {   6. for msg := range claim.Messages() {      7. fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))8. 9. // 查mysql去重      10. if check(msg) {          11. // 插入mysql          12. insertToMysql()      13. }14.15. // 标记,sarama会自动进行提交,默认间隔1秒      16. sess.MarkMessage(msg, "")  17. }   18. return nil19. }20.21. func main(){    22. consumerConfig := sarama.NewConfig()    23. consumerConfig.Version = sarama.V2_8_0_0 // specify appropriate version    24. consumerConfig.Consumer.Return.Errors = false    25. //consumerConfig.Consumer.Offsets.AutoCommit.Enable = true      26. // 禁用自动提交,改为手动  //27. consumerConfig.Consumer.Offsets.AutoCommit.Interval = time.Second * 1 // 测试3秒自动提交    consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest28.29. cGroup, err := sarama.NewConsumerGroup([]string{"10.0.56.153:9092",    "10.0.56.153:9093", "10.0.56.153:9094"},"testgroup", consumerConfig)  30. if err != nil {       31. panic(err)   32. }33. 34. for {      35. err := cGroup.Consume(context.Background(), []string{"testAutoSyncOffset"}, consumerGroup)       36. if err != nil {         37. fmt.Println(err.Error())         38. break     39. }   40. }41. 42.  _ = cGroup.Close()43. }
复制代码


producer.go


1. func main(){    2. config := sarama.NewConfig()    3. config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有follower都回复ack,确保Kafka不会丢消息    4. config.Producer.Return.Successes = true    5. config.Producer.Partitioner = sarama.NewHashPartitioner6.7.  // 对Key进行Hash,同样的Key每次都落到一个分区,这样消息是有序的    // 使用同步producer,异步模式下有更高的性能,但是处理更复杂,这里建议先从简单的入手    8. producer, err := sarama.NewSyncProducer([]string{"10.0.56.153:9092"}, config)    9. defer func() {       10. _ = producer.Close()    11. }()    12. if err != nil {       13. panic(err.Error())   14. }15.16. msgCount := 4   17. // 模拟4个消息    18. for i := 0; i < msgCount; i++ {        19. rand.Seed(int64(time.Now().Nanosecond()))        20. msg := &sarama.ProducerMessage{          21. Topic: "testAutoSyncOffset",          22. Value: sarama.StringEncoder("hello+" + strconv.Itoa(rand.Int())),   23. Key:   sarama.StringEncoder("BBB”),        24. }25.26.  t1 := time.Now().Nanosecond()        27. partition, offset, err := producer.SendMessage(msg)        28. t2 := time.Now().Nanosecond()29.30. if err == nil {            31. fmt.Println("produce success, partition:", partition, ",offset:", offset, ",cost:", (t2-t1)/(1000*1000), " ms")        32. } else {           33. fmt.Println(err.Error())      34.      }   35.   }36.}
复制代码

结束

OpenIM github 开源地址:


https://github.com/OpenIMSDK/Open-IM-Server


OpenIM 官网 : https://www.rentsoft.cn


OpenIM 官方论坛: https://forum.rentsoft.cn/


我们致力于通过开源模式,为全球企业/开发者提供简单、易用、高效的 IM 服务和实时音视频通讯能力,帮助开发者降低项目的开发成本,并让开发者掌控业务的核心数据。


IM 作为核心业务数据,安全的重要性毋庸置疑,OpenIM 开源以及私有化部署让企业能更放心使用。


如今 IM 云服务商收费高企,如何让企业低成本、安全、可靠接入 IM 服务,是 OpenIM 的历史使命,也是我们前进的方向。


如您有技术上面的高见请到我们的论坛联系沟通,用户也可与我们的技术人员谈讨使用方面的难题以及见解

用户头像

OpenIM

关注

还未添加个人签名 2021.08.30 加入

还未添加个人简介

评论

发布
暂无评论
Golang正确使用kafka的姿势-细节决定成败