写点什么

AREX Agent 源码解读之全链路跟踪和 Mock 数据读写

  • 2023-05-16
    上海
  • 本文字数:9145 字

    阅读完需:约 30 分钟

AREX 是一款开源的自动化测试工具,通过 Java Agent 字节码注入技术,在生产环境录制和存储请求、应答数据,随后在测试环境回放请求和注入 Mock 数据,存储新的应答,以此来达到自动录制、自动回放、自动比对,为接口回归测试提供便利。在进行数据采集时,同一个请求,会采集下来多条数据(如 Request/Response、其它服务调用的请求响应等),AREX 通过链路跟踪将这些数据串联起来,并做为一个完整的测试用例。本文将深入解读 AREX Agent 中关于全链路跟踪和 Mock 数据读写的源码。


AREX 的链路跟踪类似于 OpenTelemetry,下面就先简单介绍一下 OpenTelemetry 中如何实现全链路跟踪。

OpenTelemetry 的全链路跟踪的实现

OpenTelemetry 是一种用于分布式系统的开源观测性工具,其全链路跟踪的实现依赖于上下文(Context)传播机制,


数据传播按照场景分为进程内传播和分布式传播两类。在进程内传播中,上下文对象在一个服务内部传递 Trace,相对比较简单。而在分布式传播中,Context propagation 在不同的服务之间传递上下文信息。

上下文(Context)

在 OpenTelemetry 中,context 是一个包含键值对的数据结构,例如线程或循环程序,用于在请求处理过程中传递数据。在每种编程语言中,OpenTelemetry 都提供了一个上下文对象,例如在 Java 中,OpenTelemetry 使用 ThreadLocal 来存储上下文;在 Go 中,OpenTelemetry 使用 context 包来存储上下文;在 Node.js 中,OpenTelemetry 使用 async_hooks 包来存储上下文;在 Python 中,OpenTelemetry 使用 threading.local 来存储上下文。在 C++ 中,OpenTelemetry 使用 Boost.Context 库来实现上下文的管理。


Context 可用于存储跟踪(Tracing)、日志(Logging)和指标(Metrics)数据等信号,并且可以通过 API 进行访问,通过调用这些 API 可以访问整个上下文对象,这意味着 Tracing、Logging 和 Metrics 信号是相互集成的,在整个上下文中共享数据。例如,如果同时启用了 Tracing 和 Metrics 信号,记录一个 Metrics 可以自动创建一个 Tracing 范例。Logging 也是如此:如果有的话,Logging 会自动绑定到当前的 Tracing。

Context propagation

进程内传播可以是隐式的或显式的,具体取决于所使用的编程语言。隐式传播是通过将活动上下文存储在线程局部变量(Java、Python、Ruby、NodeJS)中自动完成的。显式传播需要显式地将活动上下文作为参数从一个函数传递到另一个函数 (Go)。


对于分布式传播的跟踪,tracer 会为第一个请求生成一个唯一的 transaction id,并将其添加到请求的上下文中。在后续的请求中,可以通过这个上下文来获取 transaction id,并用于关联整个请求链路。对于数据库访问的关联,可以使用 OpenTelemetry 提供的数据库集成库,例如 OpenTelemetry Java Instrumentation 中的 JDBC 集成库。这个集成库会自动将 transaction id 添加到数据库请求中,并将数据库访问的信息添加到 span 中,以便更好地理解整个请求链路的性能和行为。

AREX 实现的链路跟踪

ArexThreadLocal

ArexThreadLocal 是 AREX 的存储 Context 的基础类,继承 InheritableThreadLocal 类,用于存储 Tracing、Logging 和 Metrics 信号等数据。


public class ArexThreadLocal<T> extends InheritableThreadLocal<T> {}
复制代码


InheritableThreadLocal 是 Java 中的一个线程本地存储类,它允许子线程继承父线程的线程本地变量。与普通的 ThreadLocal 不同,InheritableThreadLocal 可以在子线程中访问父线程中设置的线程本地变量。


InheritableThreadLocal 的特点包括:


  1. 可以在子线程中访问父线程中设置的线程本地变量,这对于需要在多个线程之间共享数据的场景非常有用。

  2. InheritableThreadLocal 是线程安全的,多个线程可以同时访问同一个 InheritableThreadLocal 实例中的线程本地变量,而不会出现线程安全问题。

  3. InheritableThreadLocal 可以被继承,子线程可以继承父线程中设置的 InheritableThreadLocal 实例中的线程本地变量。这个特性可以让子线程继承父线程中的上下文信息,从而更方便地进行任务处理。


需要注意的是,InheritableThreadLocal 的使用需要谨慎,因为它可能会导致内存泄漏问题。如果在 InheritableThreadLocal 中存储的对象没有及时清理,那么这些对象会一直存在于内存中,直到应用程序退出。


因此,在使用 InheritableThreadLocal 时,需要注意及时清理其中存储的对象,以避免内存泄漏问题。

TraceContextManager

TraceContextManager 是 AREX 跟踪上下文的管理对象,其中包含了一个静态变量 TRACE_CONTEXT 的对象 (ArexThreadLocal),用于存储和读取 TraceID。同时,TraceContextManager 还包含了 IDGenerator,用于生成以"AREX-"为前缀的 ID。


通过 TraceContextManager 对静态变量 TRACE_CONTEXT 进行设置操作,可以理解为 Tracing 的入口,这样可以设置 TransactionID,进行上下文的跟踪。

ContextManager

划重点,这里要特别注意两个函数,currentContext() 这个是内部依赖的调用,currentContext(boolean createIfAbsent, String caseId) 是回放的调用,入口录制的调用。这两个函数是理解代码的重点。


public class ContextManager {...    public static ArexContext currentContext() {        return currentContext(false, null);    }     /**     * agent will call this method     */    public static ArexContext currentContext(boolean createIfAbsent, String caseId) {        // replay scene        if (StringUtil.isNotEmpty(caseId)) {            TraceContextManager.set(caseId);            ArexContext context = ArexContext.of(caseId, TraceContextManager.generateId());            // Each replay init generates the latest context(maybe exist previous recorded context)            RECORD_MAP.put(caseId, context);            return context;        } ...
复制代码


ContextManager 调用 Set


当 ContextManager 中 currentContext() 函数被调用时,如果传入的 caseID 不为空,则表示当前是回放场景,并设置 set(caseID)。


当调用 currentContext() 函数时,如果传入的 caseID 为空,则会调用 TRACE_CONTEXT.get() ,如果获取不到,则调用如下代码,通过 IDGenerator 生成一个新的 transactionID,并存储到 ThreadLocal 中,表示当前场景为录制(record)场景。


messageId = idGenerator.next();TRACE_CONTEXT.set(messageId);
复制代码


最后,生成的 transactionID 和对应的 ArexContext 会被存储到 ConcurrentHashMap 中,以 transactionID 为 key,ArexContext 为 value,供后续调用时使用。ContextManager 管理所有的 ArexContext,将它们存储在 Map 中,Key 是 caseID。


public class ContextManager {    public static Map<String, ArexContext> RECORD_MAP = new ConcurrentHashMap<>();}...
复制代码


其中 ArexContext 存储 caseID,replayID 等信息


public class ArexContext {     private final String caseId;    private final String replayId;    private final long createTime;    private final SequenceProvider sequence;    private final List<Integer> methodSignatureHashList = new ArrayList<>();    private final Map<String, Object> cachedReplayResultMap = new ConcurrentHashMap<>();    private Map<String, Set<String>> excludeMockTemplate; ...}
复制代码


ContextManager.currentContext() 函数用于在 Agent 注入脚本中查询当前上下文。


当函数被调用时,ContextManager.currentContext(true, id):


  • 在 EventProcessor 中,initContext 函数会调用 ContextManager.currentContext(true, id),onCreate 会调用 initContext 函数。

  • 在 CaseEventDispatcher 中,onEvent(Create) 会调用上述的 initContext 函数。

  • 在 ServletAdviceHelper 中,会调用上述的 onEvent 函数。

  • 在 FilterInstrumentationV3 中,会调用上述的 onEvent 函数。这些类和函数的注入可以在代码中看到。


public class FilterInstrumentationV3 extends TypeInstrumentation {     @Override    public ElementMatcher<TypeDescription> typeMatcher() {        return not(isInterface()).and(hasSuperType(named("javax.servlet.Filter")));    }     @Override    public List<MethodInstrumentation> methodAdvices() {        ElementMatcher<MethodDescription> matcher = named("doFilter")                .and(takesArgument(0, named("javax.servlet.ServletRequest")))                .and(takesArgument(1, named("javax.servlet.ServletResponse")));         return Collections.singletonList(new MethodInstrumentation(matcher, FilterAdvice.class.getName()));    }...} 
复制代码

ServletAdviceHelper

在 ServletAdviceHelper 中,shouldSkip 方法会校验是否超过频率限制(RecordLimiter.acquire),如果超过限制则不再进行 Trace 处理,而是直接返回。如果请求没有超过限制,则会生成 TraceID,并调用 CaseEventDispatcher 中的 onEvent 函数,传入一个 CreateEvent 对象(CaseEventDispatcher.onEvent(CaseEvent.ofCreateEvent),表示创建了一个新的 Trace。


CaseEventDispatcher.onEvent(CaseEvent.ofEnterEvent());if (shouldSkip(adapter, httpServletRequest)) {    return null;}
复制代码

实际调用过程

在 AREX 的注入代码中,会调用 ContextManager.needReplay()ContextManager.needRecord()函数。在这两个函数中,会通过调用 currentContext()函数获取 AREX 上下文对象,并根据上下文对象中的数据,判断当前是回放还是录制模式,如果 context.isReplay(),就是回放, 否则就是录制。


入口请求的录制是在 javax.servlet.Filter doFilter(还有几个其他的类和函数等等)收到请求后,如果通过录制频率检测,就会开始录制请求。

AREX 实现的录制与回放

关于 ByteBuddy 注解

AREX 的注入用 ByteBuddy 实现,ByteBuddy 功能强大易用,它提供了许多注解,用于在生成或修改字节码时进行注释和配置,如下:


以 apache-httpclient-v4 为例

public class SyncClientModuleInstrumentation extends ModuleInstrumentation
复制代码


定义一个名为 SyncClientModuleInstrumentation 的类,继承自 ModuleInstrumentation 类。并添加了 @AutoService(ModuleInstrumentation.class) 注解, 实现 InternalHttpClientInstrumentation 类,该类继承了 TypeInstrumentation 类。


public class InternalHttpClientInstrumentation extends TypeInstrumentation
复制代码


InternalHttpClientInstrumentation 类实现 typeMatcher 函数,用于匹配待注入的类名,这里使用了 "org.apache.http.impl.client.InternalHttpClient" 作为待注入的类名。


如果需要注入多个类,可以使用一些辅助函数如 nameContains()、nameEndsWith() 和 nameStartsWith() 等进行类名的匹配:


  • nameContains() 类名中包含指定的字符串

  • nameEndsWith() 类名以指定的字符串结尾

  • nameStartsWith() 类名以指定的字符串开头


InternalHttpClientInstrumentation 实现函数 methodAdvice,用于获取要注入的函数。


return singletonList(new MethodInstrumentation(                isMethod().and(named("doExecute"))                        .and(takesArguments(3))                        .and(takesArgument(0, named("org.apache.http.HttpHost")))                        .and(takesArgument(1, named("org.apache.http.HttpRequest")))                        .and(takesArgument(2, named("org.apache.http.protocol.HttpContext"))),            this.getClass().getName() + "$ExecuteAdvice"));
复制代码


实现 ExecuteAdvice 类,此处关联之前的 $ExecuteAdvice。


public static class ExecuteAdvice
复制代码


注入代码,在被注入的函数进入时,从参数中获取 Request 请求,创建本地变量 extractor 和 mockResult。


@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class, suppress = Throwable.class)        public static boolean onEnter(                @Advice.Argument(1) HttpRequest request,                @Advice.Local("extractor") HttpClientExtractor<HttpRequest, HttpResponse> extractor,                @Advice.Local("mockResult") MockResult mockResult) { ...
复制代码


判断请求报文 Request,如果满足 ingore 条件, 则退出。


从上下文中判断, 当前是否处于录制或者回放状态(ContextManager.needRecordOrReplay())


如果是回放状态,则查询回放需要的 MOCK 数据 mockResult = extractor.replay()


  • 如果从数据库中获取到 MOCK 数据是 Throwable 类型,则返回成功及 Object。

  • 如果获取到的 MOCK 数据是 HttpResponseWrapper 类型,则返回成功及处理好的响应报文(即 MOCK 数据)。


注入代码,在被注入函数退出时,获取异常和返回值 Request。


        @Advice.OnMethodExit(onThrowable = Exception.class, suppress = Throwable.class)        public static void onExit(                @Advice.Thrown(readOnly = false) Exception throwable,                @Advice.Return(readOnly = false) CloseableHttpResponse response,                @Advice.Local("extractor") HttpClientExtractor<HttpRequest, HttpResponse> extractor,                @Advice.Local("mockResult") MockResult mockResult) {...
复制代码


如果函数进入上处理的 MockResult 结果判断,如果不为空,且是 Throwable, 则 throwable 变量赋值否则返回 response


如果 MockResult 为空,则检查是否是在录制状态,在录制状态下开始记录数据到数据库(throwable 或者 response)。

以 Jedis v4 为例

  • JedisModuleInstrumentation 类是继承自 ModuleInstrumentation 的类。

  • JedisFactoryInstrumentation 类是继承自 TypeInstrumentation 的类。


匹配的类和函数名方法与之前的文本描述类似,这里不再重复。


makeObject 是一个被注入的函数,在进入函数时,会创建一个名为 JedisWrapper 的类并将其返回给原有类的 jedisSocketFactory 字段。


clientConfig 与上述步骤类似,也是被注入的函数。


makeObject 被注入函数,离开函数时会调用 jedis,并返回 result。

AREX 实现的难点

多线程

AREX 在进行数据采集时,同一个请求,会采集下来多条数据(Request/Response、其它服务调用的请求响应等),我们需要把这些数据串联起来,这样才能完整的做为一个测试用例。而我们的应用往往采用了异步框架,也大量的用到了多线程等,这给数据的串联带来很大的困难。


  1. FutureTaskInstrumentation


FutureTaskInstrumentation 包含 RunnableAdvice 两个静态内部类。


     public static class CallableAdvice {        @Advice.OnMethodEnter(suppress = Throwable.class)        public static void methodEnter(                @Advice.Argument(value = 0, readOnly = false) Callable<?> callable) {            callable = CallableWrapper.get(callable);        } FutureTask      }     @SuppressWarnings("unused")    public static class RunnableAdvice {        @Advice.OnMethodEnter(suppress = Throwable.class)        public static void methodEnter(                @Advice.Argument(value = 0, readOnly = false) Runnable runnable) {            runnable = RunnableWrapper.get(runnable);        }    }...
复制代码


  1. ThreadPoolInstrumentation


包含 ExecutorCallableAdvice 两个类,实现同上。


  1. ThreadInstrumentation


包含 $StartAdvice 类。其中包含一个名为 methodEnter 的静态方法,该方法使用了 Java Agent 中的 Advice 注解。这个方法的作用是在 run 方法执行前拦截它,并进行一些操作。具体来说,这个方法会将 run 方法的参数 runnable 通过 FieldValue 注解获取到,然后检查是否存在 ArexContext,如果存在,就使用 RunnableWrapper.get() 方法包装这个 runnable,然后再将包装后的 runnable 赋值回去。


    public static class StartAdvice {        @Advice.OnMethodEnter(suppress = Throwable.class)        public static void methodEnter(                @Advice.FieldValue(value = "target", readOnly = false) Runnable runnable) {            ArexContext context = ContextManager.currentContext();            if (context != null) {                runnable = RunnableWrapper.get(runnable);            }        }    }...
复制代码

异步

Java 生态中存在许多异步框架和类库,例如 Reactor 和 RxJava 等,也有许多类库提供了异步实现,例如 lettuce 提供了同步/异步访问 Redis 的方式。由于不同的场景通常需要不同的解决方案,所以需要使用不同的方法来解决异步编程中的跨线程跟踪问题。


以 ApacheAsyncClient 为例,它是通过一个固定的线程来监听响应并发起回调的。因此,在整个调用、监听、回调过程中,需要确保多个跨线程的 Trace 传递。


为了解决这个问题,可以注入如下 org.apache.http.impl.nio.client.InternalHttpAsyncClient 类的 execute 函数,并使用 FutureCallbackWrapper 中的 TraceTransmitter 来传递 Trace。


public class InternalHttpAsyncClientInstrumentation extends TypeInstrumentation {     @Override    public ElementMatcher<TypeDescription> typeMatcher() {        return named("org.apache.http.impl.nio.client.InternalHttpAsyncClient");    }     @Override    public List<MethodInstrumentation> methodAdvices() {        return singletonList(new MethodInstrumentation(                isMethod().and(named("execute"))                        .and(takesArguments(4))                        .and(takesArgument(0, named("org.apache.http.nio.protocol.HttpAsyncRequestProducer")))                        .and(takesArgument(1, named("org.apache.http.nio.protocol.HttpAsyncResponseConsumer")))                        .and(takesArgument(2, named("org.apache.http.protocol.HttpContext")))                        .and(takesArgument(3, named("org.apache.http.concurrent.FutureCallback"))),                this.getClass().getName() + "$ExecuteAdvice"));    }...
复制代码

代码隔离、互通

为了系统的稳定性,AREX agent 的框架代码是在一个独立的 Class loader 中加载,和应用代码并不互通,为了保证注入的代码可以正确在运行时被访问,我们也对 ClassLoader 进行了简单的修饰,保证运行时的代码会被正确的 ClassLoader 加载。


类似于 SpringBoot 的 LaunchedURLClassLoader,它是一个类加载器,主要负责加载应用程序的类和资源,并在应用程序启动时根据应用程序的 classpath 和 JAR 文件创建一个 URL 数组,然后使用这个 URL 数组来初始化 ClassLoader。当应用程序需要加载类或资源时,LaunchedURLClassLoader 会首先在自己的缓存中查找,如果找不到,就会从 URL 数组中的 URL 中加载类或资源。


自定义的 URLClassLoader 和系统自带的 ClassLoader 可能会冲突,这是因为 Java 中的类加载器采用了双亲委派模型。在这个模型中,每个类加载器都有一个父类加载器,当一个类需要被加载时,它首先会委托它的父类加载器去加载,如果父类加载器无法加载,它才会尝试自己去加载。 但是这种情况下,如果自定义的 URLClassLoader 和系统自带的 ClassLoader 都能够加载同一个类,那么就会出现两个不同的类实例,这就会导致程序出现问题。


为了避免这种冲突,可以在自定义的 URLClassLoader 中重写 findClass() 方法,让它只加载自己的类,而不是委托给父类加载器。这样就可以保证自定义的 URLClassLoader 和系统自带的 ClassLoader 不会冲突。


public class AgentClassLoader extends URLClassLoader
复制代码


由 package io.arex.inst.runtime 开头的类需要使用自定义的 URLClassLoader 进行加载,而不是系统自带的 ClassLoader。这是因为这些类是作为注入的代码来运行的,而为了保证代码能够正确地被访问,必须使用自定义的 URLClassLoader 来加载它们。因此,这些类必须走用户态 ClassLoader 加载,而不能使用系统自带的 ClassLoader。


    @Override    protected Class<?> findClass(String name) throws ClassNotFoundException {         if (StringUtil.startWithFrom(name, "runtime", 13)) {            return null;        }         JarEntryInfo jarEntryInfo = findJarEntry(name.replace('.', '/') + ".class");        if (jarEntryInfo != null && jarEntryInfo.getJarEntry() != null) {            byte[] bytes;            try {                bytes = getJarEntryBytes(jarEntryInfo);            } catch (IOException exception) {                throw new ClassNotFoundException(name, exception);            }             definePackageIfNeeded(jarEntryInfo, name);            return defineClass(name, bytes);        }        return null;    }...
复制代码


AgentInitializer 类在 Premain 方法中调用 initialize(),并访问 AgentClassLoader 以加载自己的 jar 包。如下图:


public class ArexJavaAgent {    public static void premain(String agentArgs, Instrumentation inst) {        agentmain(agentArgs, inst);    }     public static void agentmain(String agentArgs, Instrumentation inst) {        init(inst, agentArgs);    }     private static void init(Instrumentation inst, String agentArgs) {        try {            installBootstrapJar(inst);             // those services must load by app class loader            //ServiceInitializer.start(agentArgs);            AgentInitializer.initialize(inst, getJarFile(ArexJavaAgent.class), agentArgs);            System.out.println("ArexJavaAgent installed.");        } catch (Exception ex) {            System.out.println("ArexJavaAgent start failed.");            ex.printStackTrace();        }    }...
复制代码




AREX 文档:http://arextest.com/zh-Hans/docs/intro/


AREX 官网:http://arextest.com/


AREX GitHub:https://github.com/arextest


AREX 官方 QQ 交流群:656108079

用户头像

https://github.com/arextest 2023-01-11 加入

AREX 是一个基于真实请求与数据的自动化回归测试平台。通过复制线上真实流量到测试环境进行自动化回归测试,解决回归测试的难题。

评论

发布
暂无评论
AREX Agent 源码解读之全链路跟踪和 Mock 数据读写_Java_AREX 中文社区_InfoQ写作社区