写点什么

微服务中台技术解析之项目环境隔离

用户头像
小江
关注
发布于: 2021 年 04 月 13 日
微服务中台技术解析之项目环境隔离

背景

在项目迭代过程中,我们经常遇到需要定制调用链路的情况,比如,参与下单优化项目的应用中,下单请求希望调用到参与该项目的支付应用接口,而不是基线接口。在这种情况下,通常是用项目环境隔离的方式解决。


在实践中,我们采用了项目标签(或者环境标签)的概念来解决。参与同一个项目的各个应用都使用同一个项目标签,而调用链路会根据项目标签进行路由和转发,保证带有项目标签的请求都落在有标签的机器上。


下面我们会依次介绍几个项目环境隔离的典型案例和具体实现原理,包括 dubbo 项目环境隔离、网关项目环境隔离、kafka 项目环境隔离,希望对大家有所帮助。

dubbo 项目环境隔离

调用链路图

dubbo 调用的项目环境隔离是指在发起调用时,优先选择具有相同项目标签的服务提供方,如下图所示:

dubbo项目环境隔离示意图

在上图中,绿色线条链路表示 consumer 收到一个带有 tag1 的请求,在处理这个请求时,需要发起 rpc 调用,那么它会优先选择 tag1 provider,如果找不到,再选择基线提供者。


蓝色线条链路表示 consumer 收到一个不带项目标签的请求,但是 consumer 本身带有项目标签 tag2(附加在启动参数上),那么在处理该请求需要发起 rpc 调用时,它会优先选择 tag2 provider,如果找不到,再选择基线提供者。


而黑色线条链路表示,不带项目标签的 consumer 收到了不带项目标签的请求,那么它将选择基线提供者进行 rpc 调用。

实现原理

dubbo 项目环境隔离是基于 dubbo 提供的标签路由功能(TagRouter)实现的,dubbo 标签路由基本思路是从候选提供者列表中选择跟dubbo.tag (或者 dubbo.provider.tag) 相同 tag 的提供者,如果没有相同 tag 的提供者,再依据是否强制使用 tag 决定是否使用基线提供者作为兜底。代码逻辑如下:

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {        // filter        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
// Dynamic param String tag = RpcContext.getContext().getAttachment(Constants.TAG_KEY);
if (StringUtils.isEmpty(tag)){ String configTag = ConfigUtils.getProperties().getProperty("dubbo.provider.tag"); if (!StringUtils.isEmpty(configTag)){ RpcContext.getContext().setAttachment(Constants.TAG_KEY, configTag); tag = configTag; } }
// Tag request if (!StringUtils.isEmpty(tag)) { // Select tag invokers first for (Invoker<T> invoker : invokers) { // if clusterInvoker is used. we don't want to use tag to filter cluster. if (invoker.getUrl().getProtocol().equals("zookeeper") || tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) { result.add(invoker); } } } // If Constants.REQUEST_TAG_KEY unspecified or no invoker be selected, downgrade to normal invokers if (result.isEmpty()) { // Only forceTag = true force match, otherwise downgrade String forceTag = RpcContext.getContext().getAttachment(Constants.FORCE_USE_TAG); if (StringUtils.isEmpty(forceTag) || "false".equals(forceTag)) { for (Invoker<T> invoker : invokers) { // if clusterInvoker is used. we don't want to use tag to filter cluster. if (invoker.getUrl().getProtocol().equals("zookeeper") || StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) { result.add(invoker); } } } } return result; }
复制代码


在具体实现中,为了支持从上游请求中获取项目标签以及从 consumer 机器自身获取项目标签,我们扩展实现了 dubbo 的 RouterFactory SPI,并且优先级在 TagRouter 之前:

public class TagFixRouter extends AbstractRouter {
/** * make sure this router is ahead of default tag router, see com.alibaba.dubbo.rpc.cluster.router.tag.TagRouter */ private static final int DEFAULT_PRIORITY = 90;
/** * 是否强制dubbo tag过滤生效,若集群中不存在与请求tag对应的服务,默认将降级请求tag为空的provider. */ private static final String DUBBO_FORCE_TAG_ENABLE = "dubbo.force.tag.enable";
public TagFixRouter(URL url) { this.priority = url.getParameter(Constants.PRIORITY_KEY, DEFAULT_PRIORITY); // 每次请求的时候均需要调用。 this.url = url.addParameter(Constants.RUNTIME_KEY, "true"); }
@Override public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { // 这里会优先获取请求中携带的项目标签,其次才是机器自身配置的项目标签 String tag = ProjectEnvUtil.calcCurrentProjectEnv(); if (!StringUtils.isEmpty(tag)){ RpcContext.getContext().setAttachment(Constants.TAG_KEY, tag); String dubboForceTag = System.getProperty(DUBBO_FORCE_TAG_ENABLE,"false"); if(Boolean.parseBoolean(dubboForceTag)){ /*在没有匹配标签的provider情况下,默认的规则会选择标签为空的provider,在某些情况下(项目环境开发联调等)会导致bug无法及时发现,需要强制标签匹配生效*/ RpcContext.getContext().setAttachment(Constants.FORCE_USE_TAG, "true"); } } return invokers; }
}

public static String calcCurrentProjectEnv() { // requestTag 在Filter中设置 String requestTag = ProjectEnvUtil.getRequestProjectEnv(); // configTag 在应用启动时候设置 String configTag = ProjectEnvUtil.getProjectEnv();
String tag = ""; if (requestTag != null && !"".equals(requestTag)){ tag = requestTag; }else if (configTag != null && !"".equals(configTag)){ tag = configTag; } return tag; }
复制代码


到这里,我们似乎解决了 dubbo 项目环境隔离的问题,但还有一种情况需要考虑,那就是 dubbo provider 本身也作为 consumer 调用其他服务,那么在 provider 收到带项目标签的请求时,如何正确地路由到下游相同标签的 provider,让这个链路一次进行下去呢?

为了解决这个问题,我们扩展实现了 dubbo Filter SPI,在 provider 侧进行拦截,将上游的 tag 暂存下来,处理完成本次请求后再销毁。代码逻辑如下:

@Slf4j@Activate(group = {Constants.PROVIDER_SIDE})public class DubboTagContextFilter implements Filter {
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { try { // 获取上游的项目标签 String tagFromCaller = RpcContext.getContext().getAttachment(Constants.TAG_KEY); if(log.isDebugEnabled()){ log.debug("[club-boot dubbo tag filter] tagFromCaller = {}, invoker = {}, invocation = {}",tagFromCaller,invoker,invocation); } if (tagFromCaller != null && !"".equals(tagFromCaller)) { // 设置在ThreadLocal变量中 ProjectEnvUtil.setRequestProjectEnv(tagFromCaller); }
return invoker.invoke(invocation);
}finally { ProjectEnvUtil.clearRequestProjectEnv(); } }
}
复制代码


注意,这里我们使用了线程局部变量 ThreadLocal 来暂存本次请求相关的信息(项目标签是其中一项),所以不支持异步线程池的项目环境请求。

网关项目环境隔离

关于网关的实践可以参看此前的文章(微服务中台技术解析之网关实践),网关项目环境隔离是指网关可以根据请求所带项目标签自动路由到相应服务,如下图所示:

网关项目环境调用链路示意图


实现原理

有了上节 dubbo 项目环境隔离的支持,网关侧实现项目环境调用链路则简单很多。基本思路是,根据 http 请求中的 header 或者 cookie 携带的项目标签(统一约定,e.g. project-env),在发起 dubbo 泛化调用前设置到 attachment 中即可。

	public static final List<String> PROJECT_TAG_ROUTE = Lists.newArrayList("project-env", "project.env", "project_env");	
public void setDubboRouteTag(GWRequest request){
String env = getProjectEnv(request);
if (!"".equals(env)){ RpcContext.getContext().setAttachment(Constants.TAG_KEY, env); }
}
public String getProjectEnv(GWRequest request){ for (String tag: PROJECT_TAG_ROUTE){ Collection<String> tags = request.getHeaders().get(tag); if (tags != null && tags.size() > 0 ){ return String.join( ",", tags); } }
for (String tag: PROJECT_TAG_ROUTE) { Collection<String> tags = request.getCookies().get(tag); if (tags != null && tags.size() > 0) { return String.join(",", tags); } } return ""; }
复制代码

kafka 项目环境隔离

背景

公司内部我们使用 kafka 作为消息中间件,在进行项目开发测试时,常会遇到这样的诉求:项目环境的机器只接收带有项目标签的消息,基线机器可以消费不带项目标签的消息,在项目环境机器不存在时,为了保证测试链路正常,基线机器也可以消费带项目标签的消息。

总体架构图

kafka项目环境隔离示意图

上图展示了三种调用链路:

case 1 : 基础生产者发送消息,存在基础消费者进行消费

case 2 : 带 tag1 的生产者发送消息,存在 tag1 的消费者进行消费

case 3 : 带 tag2 的生产者发送消息,不存在对应 tag2 的消费者,故由基础消费者进行消费

实现原理

要实现 kafka 消息隔离,重点要解决几个问题:

  • 如何识别带项目标签的消费者

  • 如何保证消息投递到项目环境机器

  • 消息如何带上项目标签以及如何验证项目标签


先看识别项目标签消费者的问题。

看过前面两节可以知道,我们可以从上游请求中或者应用启动时候的配置参数中获取项目标签,那么在 kafka 消费者初始化成功后就可以上传 topic 与消费者 tag 的映射关系,如上面示意图中的绿色方框所示,代码实现如下:

public class ClubBootKafkaListener {
@EventListener public void registerKafkaConsumerTagListener(ConsumerStartedEvent event){ GenericMessageListenerContainer listenerContainer = event.getSource(GenericMessageListenerContainer.class); ContainerProperties containerProperties = listenerContainer.getContainerProperties(); String[] consumedTopics = containerProperties.getTopics(); if(consumedTopics == null){ return; } for(String topic : consumedTopics){ KafkaConsumerRegistry.getInstance().registerTagConsumer(topic, KafkaConfigUtil.getProjectEnv()); } }
}
复制代码


接着看如何保证消息投递到项目环境机器。

我们知道 kafka 以 group.id 作为消费组唯一标示,每个消息只会往 group.id 投递一次,所以项目环境的 group.id 要跟基线环境区分开。为此,我们将应用配置的 group.id 加上项目标签作为项目环境下的 group.id ( e.g. mykafkaGroup@tag1 ),保证消息始终投递一份到项目环境。在实践中这个 group.id 的修改是自动进行的(由统一接入框架中的 kafka 自动配置完成,在消费初始化前进行),不需要业务方修改配置。


最后看如何让消息带上标签并且验证标签。

我们知道 kafka 提供了拦截机制,在消息发送前(ProducerInterceptor)和消费前进行拦截(ConsumerInterceptor),那么我们只需要在发送前拦截附加项目标签,在消费前拦截验证项目标签即可。代码如下:

public class TagProducerInterceptor implements ProducerInterceptor<String,Object> {

/** * 项目标签不为空,则添加 header(tag=xxx) * * @param record * @return */ @Override public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) { String tag = ProjectEnvUtil.calcCurrentProjectEnv(); if(log.isDebugEnabled()){ log.debug("生产端,项目环境tag="+tag); } if (!StringUtils.isEmpty(tag)) { Headers headers = record.headers(); // 消息头附件项目标签 headers.add(KafkaEnvConstant.KAFKA_INTERCEPT_TAG_NAME, tag.getBytes(StandardCharsets.UTF_8)); } return record; }}

public class TagConsumerInterceptor implements ConsumerInterceptor<String, Object> {
/** * 是否需要过滤消息:线上环境不需要拦截 */ private boolean needIntercept(){ return KafkaConfigUtil.isTestEnv() && Boolean.parseBoolean(KafkaConfigUtil.getProperty(KafkaConfigUtil.PROJECT_CONSUME, "true")); }
@Override public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> consumerRecords) { if (!needIntercept()){ if(log.isDebugEnabled()){ log.debug("客户端[" + Thread.currentThread().getName() + "]消费拦截器静默"); } return consumerRecords; }
try { String envTag = ProjectEnvUtil.calcCurrentProjectEnv(); Map<TopicPartition, List<ConsumerRecord<String, Object>>> filterMapRecords = new HashMap<>(8); Set<TopicPartition> topicPartitions = consumerRecords.partitions(); for (TopicPartition topicPartition : topicPartitions) { List<ConsumerRecord<String, Object>> filterRecords = new ArrayList<>(); for (ConsumerRecord<String, Object> record : consumerRecords.records(topicPartition)) { Headers headers = record.headers(); Header header = headers.lastHeader(KafkaEnvConstant.KAFKA_INTERCEPT_TAG_NAME); String msgTag = null; if(header != null){ msgTag = new String(header.value(), StandardCharsets.UTF_8); } if(log.isDebugEnabled()){ log.debug("消费端,msg标签tag={}, 项目环境tag={}",msgTag,envTag); } //基础环境且消息头不带标 或者 项目环境标签和消息头标签相等 if(StringUtils.isEmpty(envTag) && header == null || envTag.equalsIgnoreCase(msgTag)){ filterRecords.add(record); } else if(StringUtils.isEmpty(envTag) && header != null){ //基础环境且该msgTag无注册消费者 if( !KafkaConsumerRegistry.getInstance().existConsumerByTag(record.topic(),msgTag)){ filterRecords.add(record); } } } if (filterRecords.size() > 0) { filterMapRecords.put(topicPartition, filterRecords); } } if(log.isDebugEnabled()){ log.debug("[kafka消费端拦截器]kafka consumer tag filter result, original = {}, filtered = {}", Lists.newArrayList(consumerRecords.iterator()),filterMapRecords); } return new ConsumerRecords<>(filterMapRecords); } catch (Exception e) { log.error("intercept kafka record encounter exception.",e); } return consumerRecords; }}
复制代码


上面源码中,略加复杂的是验证项目标签逻辑,我们具体解释下。首先获取项目环境标签(来自上游请求或者机器启动参数),其次对每个消息,抽取其消息头中的项目标签,然后进行判断:

  • 消息不带标签,且机器为基础环境,允许消费,对应上面的 case 1

  • 消息带标签,且机器为项目环境,二者相等,允许消费,对应上面的 case 2

  • 消息带标签,且机器且基础环境,但是此时没有注册的对应标签的消费者,则允许消费,对应上面的 case 3


至此,我们将实现 kafka 消息隔离所涉及的问题解决完毕。

但需要注意的一点是,在消息拦截后,被过滤掉的消息不会提交 offset,因此会使得项目标签消费组的 lag 不一定总是 0,e.g. 生产者向某个 topic 发送了 100 个消息,其中只有 30 个是带标签的,那么项目标签消费组就只会消费到 30 个消息,lag 为 70。


总结

项目环境隔离是在研发迭代中是一项非常重要的功能,如何将常用组件串联起完整的项目调用链路也是比较有挑战的工作。本文介绍了在项目开发中涉及环境隔离的几个常用点,包括 dubbo 项目环境隔离,网关项目环境隔离,kafka 项目环境隔离,并介绍了以项目标签为核心的的设计方案。

如有疑问,欢迎共同探讨。


用户头像

小江

关注

~做一个安静的码男子~ 2019.02.11 加入

软件工程师,目前在电商公司做研发效能平台,中间件维护开发相关工作

评论

发布
暂无评论
微服务中台技术解析之项目环境隔离