【kafka】kafka 的服务复用与隔离设计方案
作者:石臻臻,CSDN 博客之星 Top5、Kafka Contributor、nacos Contributor、华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家、 KnowStreaming。
KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源! 。
这篇文章主要讲一下 kafka 的服务复用与隔离;主要解决的问题是,在多个迭代环境下; 让消息的提供者和消费者都能正确的发出和消费;这个比 dubbo 的服务路由与隔离更复杂一点
11.问题描述
概念说明:稳定版本: ABC 属于全局共用的一套稳定服务;迭代版本: A1 C1 C2 属于他们对应系统的迭代版本, 比如针对 A 系统进行需求改动,部署一套新的迭代服务 A1;
要求: mq 提供者服务提供出去的消息尽量让 相同版本的消费者进行消费;
1.1. 入口是稳定服务
上图, 假设入口是 稳定服务 A ,发出消息; 那么消息链路中互相消费的就是 ABC ;跟迭代版本没啥事
1.2.入口是迭代服务
在这里插入图片描述
上图,假设入口是 迭代服务 A1 发出消息; 则整个链路中尽量让相同迭代版本的服务去消费;
A1 发消息 A1 发了消息; 找 B 系统发现只有稳定的 B,没有迭代版本,那么就让 B 消费;A1 发了消息;C 也是有订阅的,然后发现 C 系统有迭代 C1,跟 A1 版本相同,则让 C1 消费; C 和 C2 都不消费;
B 发消息 B 消费了 A1 过来的消息后也发出了消息; A 系统有消费,那么这个时候 B 发出的消息应该让 A1 消费而不是 A;同理, 也应该是 C1 消费而不是 C 或者 C2
C1 发消息 C1 发消息 让 A1 消费;C1 发消息 让 B 消费;
1.3.dubbo 服务传入迭代版本
上图 D1 调用了 B 的 dubbo 接口并且传递了版本号; B 此时发出消息也是属于迭代消息; 跟 2 一样;
22.解决方案
我们在之前的文章中有讲解如何 在 dubbo 中实现这样的功能; 通过 spi 给 dubbo 重新根据 version 来进行路由;
但是在 kafka 中,并没有这消费者路由这么一回事,那么也就无法控制哪个服务去消费这条消息;
那么下面,我给出自己的一些解决方案,如果觉得有问题,欢迎批评指正;
设计方案:
方案关键步骤:
消息发送的时候,在 Header 上加上 Version 信息
发送消息 将消息发 2 条出去,消息体相同,但是 Topic 不同; 迭代消息的 Topic 加上前缀 VERSION:对应的版本_
迭代服务启动的时候用 javaagent 修改所有监听的 Topic; 加上前缀 VERSION:对应的版本_
迭代服务消费对应的迭代消息
稳定服务 是否需要消费消息 需要判断当前消息 Header 不携带 Version 则直接消费当前消息 Header 携带 Version,再判断是否有对应的迭代服务存在;有则不消费,无则直接消费 6. 消费消息时,需要把 Version 保存到 ThreadLocal 中; 以便进行链路流转 7. 使用 ThreadLocal 的时候,在线程池的情况下,值传递会有问题. 解决方案 用 javaagent 方式使用 TransmittableThreadLocal8. 全程代码 0 侵入;kafka 的两个拦截器的和配置 都通过 Javaagent 来就行增强
如何判断迭代服务是否存在
上面的设计方案中,在kafka consumner 拦截器
判断是否需要消费的时候 写了两种方式
1. 方式一:获取当前消息的消费组currentGroupId = KafkaUtils.getConsumerGroupId()
获取所有消费组adminClient.listConsumerGroups()
然后再所有消费组中查找有没有 VERSION:1_currentGroupId 的消费组;如果有,则说明该消息会被迭代服务进行消费. 稳定环境就不用消费了;当前还有一部不可少,就是如何让迭代服务的 所有消费组名都加上前缀当然还是通过javaagent 去增强咯
, 找到合适修改点,修改掉消费组名
;合适的修改点自然是配置消费消费组名的地方; 有统一的消费组名; 每个 Listener 也可以配置单独的消费组名;找到Listener注解
就行增强;缺点: 这种方式有一个缺点就是 如果迭代服务刚好宕机了那么 消息就会问稳定服务消费了;
2.方式二(推荐)读取一个外部配置,这个配置维护了哪个服务是有迭代服务的;这样就很方便了;缺点: 就是需要维护这么一个配置优点: 规避了方式一的缺点; 也不需要用 javaagent 去修改消费组名称;
33.需要注意的问题
我们在传递 version 的时候,入口一般都是 http 接口;但是如果入口不是 http,是系统内部呢,那这样外面的版本信息就传不进来了;
说一个在出行行业 的情景 A: 是叫单服务 B: 是派单服务 C: 是订单/司机服务
在一个需求中, A C 都有改动; B 没有改动; 就有迭代服务 A1 C1;假设他们使用 MQ 交流的;我们期望的是下面流转 A1 ---->B----->C1
但是 A1 告诉了 B 有订单进来了, B 会把 A1 给的信息存到 redis 中; B 有一个线程一直在不停从 redis 中捞取数据进行和司机的匹配;匹配成功了之后 再发消息出去 匹配成功了;B 的这条链路就断了; B 存 redis 之后,就没有下一步操作了, ThreadLocal
中的 version 也就没有了; B 的匹配线程获取到的是 稳定版本;自然匹配成功发出去的消息就是 稳定消息;那么接收到的不是 C1 而是 C 了;
如何解决这类型的问题;
这种情况就应该将 B 也弄一个迭代版本 B1;那么流转路径就是 A1-B1-C1 ;这样就是正确的了;
还要注意: DB 隔离;
版权声明: 本文为 InfoQ 作者【石臻臻的杂货铺】的原创文章。
原文链接:【http://xie.infoq.cn/article/790483f0918f8a06bc892a2b6】。未经作者许可,禁止转载。
评论