http 流量镜像
- 2025-07-05  中国台湾
 本文字数:4343 字
阅读完需:约 14 分钟
http 流量镜像
“流量镜像”是指将网络中的数据流量复制一份,并将这份复制流量发送到另一个目的地(如监控、分析或安全检测系统)。这项技术常用于网络安全、故障排查、业务灰度发布等场景。
主要应用场景
安全监控与威胁检测
将生产环境的流量镜像到安全分析设备(如 IDS/IPS),用于实时监控和威胁检测。
性能分析与故障排查
将流量镜像到分析平台,对网络异常、延迟、丢包等问题进行实时排查和定位。
灰度发布和 A/B 测试
将真实用户流量镜像到新版本服务,进行灰度环境验证和兼容性测试,不影响真实用户体验。
合规与审计
对重要业务流量进行行为记录,以满足合规和审计要求。
VKProxy 目前只支持 http 流量镜像, 需注意由于会再一次发送 http 请求,请求 body 会临时暂存内存,所以无论内存还是请求延迟都会受到影响,特别 body 很大的请求
设置
大家可以在Metadata中设置缓存, 具体设置项如下
MirrorCluster镜像流量发送到的集群 id
配置示例:
{  "ReverseProxy": {    "Routes": {      "a": {        "Order": 0,        "Match": {            "Hosts": [ "api.com" ],            "Paths": [ "*" ]        },        "ClusterId": "apidemo",        "Metadata": {          "MirrorCluster": "apidemoMirror"        }      }    },    "Clusters": {      "apidemo": {        "LoadBalancingPolicy": "Hash",        "Metadata": {          "HashBy": "header",          "Key": "X-forwarded-For"        },        "Destinations": [          {            "Address": "https://xxx.lt"          }        ]      },      "apidemoMirror": {        "LoadBalancingPolicy": "Hash",        "Metadata": {          "HashBy": "header",          "Key": "X-forwarded-For"        },        "Destinations": [          {            "Address": "http://xxx.org/"          }        ]      }    }  }}
具体实现
首先需要缓存 body 内容,这里实现一个简单的 ReadBufferingStream
public class ReadBufferingStream : Stream, IDisposable{    private readonly SparseBufferWriter<byte> bufferWriter;    protected Stream innerStream;
    public ReadBufferingStream(Stream innerStream)    {        this.innerStream = innerStream;        bufferWriter = new SparseBufferWriter<byte>();    }
    public override bool CanRead => innerStream.CanRead;
    public override bool CanSeek => innerStream.CanSeek;
    public override bool CanWrite => innerStream.CanWrite;
    public override long Length => innerStream.Length;
    public override long Position    {        get => innerStream.Position;        set => innerStream.Position = value;    }
    public override int WriteTimeout    {        get => innerStream.WriteTimeout;        set => innerStream.WriteTimeout = value;    }
    public Stream BufferingStream => bufferWriter.AsStream(true);
    public override void Flush()    {        innerStream.Flush();    }
    public override Task FlushAsync(CancellationToken cancellationToken)    {        return innerStream.FlushAsync(cancellationToken);    }
    public override int Read(byte[] buffer, int offset, int count)    {        var res = innerStream.Read(buffer, offset, count);
        // Zero-byte reads (where the passed in buffer has 0 length) can occur when using PipeReader, we don't want to accidentally complete the RequestBody logging in this case        if (count == 0)        {            return res;        }
        bufferWriter.Write(buffer.AsSpan(offset, res));
        return res;    }
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)    {        var res = await innerStream.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
        if (count == 0)        {            return res;        }
        bufferWriter.Write(buffer.AsSpan(offset, res));
        return res;    }
    public override long Seek(long offset, SeekOrigin origin)    {        return innerStream.Seek(offset, origin);    }
    public override void SetLength(long value)    {        innerStream.SetLength(value);    }
    public override void Write(byte[] buffer, int offset, int count)    {        innerStream.Write(buffer, offset, count);    }
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)    {        return innerStream.WriteAsync(buffer, offset, count, cancellationToken);    }
    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)    {        return innerStream.WriteAsync(buffer, cancellationToken);    }
    public override void Write(ReadOnlySpan<byte> buffer)    {        innerStream.Write(buffer);    }
    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)    {        return innerStream.BeginRead(buffer, offset, count, callback, state);    }
    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)    {        return innerStream.BeginWrite(buffer, offset, count, callback, state);    }
    public override int EndRead(IAsyncResult asyncResult)    {        return innerStream.EndRead(asyncResult);    }
    public override void EndWrite(IAsyncResult asyncResult)    {        innerStream.EndWrite(asyncResult);    }
    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)    {        var res = await innerStream.ReadAsync(buffer, cancellationToken);        if (buffer.IsEmpty)        {            return res;        }
        bufferWriter.Write(buffer.Slice(0, res).Span);
        return res;    }
    public override ValueTask DisposeAsync()    {        return innerStream.DisposeAsync();    }
    protected override void Dispose(bool disposing)    {        if (disposing)        {            bufferWriter.Dispose();        }    }}
然后利用中间件进行镜像处理
public class MirrorFunc : IHttpFunc{    private readonly IServiceProvider serviceProvider;    private readonly IHttpForwarder forwarder;    private readonly ILoadBalancingPolicyFactory loadBalancing;    private readonly IForwarderHttpClientFactory forwarderHttpClientFactory;    private readonly ProxyLogger logger;
    public int Order => int.MinValue;
    public MirrorFunc(IServiceProvider serviceProvider, IHttpForwarder forwarder, ILoadBalancingPolicyFactory loadBalancing, IForwarderHttpClientFactory forwarderHttpClientFactory, ProxyLogger logger)    {        this.serviceProvider = serviceProvider;        this.forwarder = forwarder;        this.loadBalancing = loadBalancing;        this.forwarderHttpClientFactory = forwarderHttpClientFactory;        this.logger = logger;    }
    public RequestDelegate Create(RouteConfig config, RequestDelegate next)    {        if (config.Metadata == null || !config.Metadata.TryGetValue("MirrorCluster", out var mirrorCluster) || string.IsNullOrWhiteSpace(mirrorCluster)) return next;
        return c => Mirror(c, mirrorCluster, next);    }
    private async Task Mirror(HttpContext c, string mirrorCluster, RequestDelegate next)    {        var config = serviceProvider.GetRequiredService<IConfigSource<IProxyConfig>>();        if (config.CurrentSnapshot == null || config.CurrentSnapshot.Clusters == null || !config.CurrentSnapshot.Clusters.TryGetValue(mirrorCluster, out var cluster) || cluster == null)        {            await next(c);            return;        }
        var originBody = c.Request.Body;        using var buffer = new ReadBufferingStream(originBody);        c.Request.Body = buffer;
        try        {            await next(c);        }        finally        {            c.Request.Body = buffer.BufferingStream;            try            {                var proxyFeature = c.Features.GetRequiredFeature<IReverseProxyFeature>();                var origin = proxyFeature.SelectedDestination;                var selectedDestination = loadBalancing.PickDestination(proxyFeature, cluster);                proxyFeature.SelectedDestination = origin;                if (selectedDestination != null)                {                    cluster.InitHttp(forwarderHttpClientFactory);                    await forwarder.SendAsync(c, proxyFeature, selectedDestination, cluster, new NonHttpTransformer(proxyFeature.Route.Transformer));                }            }            catch (Exception ex)            {                logger.LogWarning(ex, "Mirror failed");            }            finally            {                c.Request.Body = originBody;            }        }    }}
所以说会再一次发送 http 请求,请求 body 会临时暂存内存,所以无论内存还是请求延迟都会受到影响,特别 body 很大的请求
VKProxy 是使用 c#开发的基于 Kestrel 实现 L4/L7 的代理(感兴趣的同学烦请点个github小赞赞呢)
版权声明: 本文为 InfoQ 作者【八苦-瞿昙】的原创文章。
原文链接:【http://xie.infoq.cn/article/dcc8cedf7ebdea56ca99b16ff】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
八苦-瞿昙
一个假和尚,不懂人情世故。 2018-11-23 加入
会点点技术,能写些代码,只爱静静。 g hub: https://github.com/fs7744 黑历史:https://www.cnblogs.com/fs7744







    


评论