写点什么

http 流量镜像

作者:八苦-瞿昙
  • 2025-07-05
    中国台湾
  • 本文字数:4343 字

    阅读完需:约 14 分钟

http 流量镜像

“流量镜像”是指将网络中的数据流量复制一份,并将这份复制流量发送到另一个目的地(如监控、分析或安全检测系统)。这项技术常用于网络安全、故障排查、业务灰度发布等场景。


主要应用场景


  • 安全监控与威胁检测

  • 将生产环境的流量镜像到安全分析设备(如 IDS/IPS),用于实时监控和威胁检测。

  • 性能分析与故障排查

  • 将流量镜像到分析平台,对网络异常、延迟、丢包等问题进行实时排查和定位。

  • 灰度发布和 A/B 测试

  • 将真实用户流量镜像到新版本服务,进行灰度环境验证和兼容性测试,不影响真实用户体验。

  • 合规与审计

  • 对重要业务流量进行行为记录,以满足合规和审计要求。


VKProxy 目前只支持 http 流量镜像, 需注意由于会再一次发送 http 请求,请求 body 会临时暂存内存,所以无论内存还是请求延迟都会受到影响,特别 body 很大的请求

设置

大家可以在Metadata中设置缓存, 具体设置项如下


  • MirrorCluster

  • 镜像流量发送到的集群 id


配置示例:


{  "ReverseProxy": {    "Routes": {      "a": {        "Order": 0,        "Match": {            "Hosts": [ "api.com" ],            "Paths": [ "*" ]        },        "ClusterId": "apidemo",        "Metadata": {          "MirrorCluster": "apidemoMirror"        }      }    },    "Clusters": {      "apidemo": {        "LoadBalancingPolicy": "Hash",        "Metadata": {          "HashBy": "header",          "Key": "X-forwarded-For"        },        "Destinations": [          {            "Address": "https://xxx.lt"          }        ]      },      "apidemoMirror": {        "LoadBalancingPolicy": "Hash",        "Metadata": {          "HashBy": "header",          "Key": "X-forwarded-For"        },        "Destinations": [          {            "Address": "http://xxx.org/"          }        ]      }    }  }}
复制代码

具体实现

首先需要缓存 body 内容,这里实现一个简单的 ReadBufferingStream



public class ReadBufferingStream : Stream, IDisposable{ private readonly SparseBufferWriter<byte> bufferWriter; protected Stream innerStream;
public ReadBufferingStream(Stream innerStream) { this.innerStream = innerStream; bufferWriter = new SparseBufferWriter<byte>(); }
public override bool CanRead => innerStream.CanRead;
public override bool CanSeek => innerStream.CanSeek;
public override bool CanWrite => innerStream.CanWrite;
public override long Length => innerStream.Length;
public override long Position { get => innerStream.Position; set => innerStream.Position = value; }
public override int WriteTimeout { get => innerStream.WriteTimeout; set => innerStream.WriteTimeout = value; }
public Stream BufferingStream => bufferWriter.AsStream(true);
public override void Flush() { innerStream.Flush(); }
public override Task FlushAsync(CancellationToken cancellationToken) { return innerStream.FlushAsync(cancellationToken); }
public override int Read(byte[] buffer, int offset, int count) { var res = innerStream.Read(buffer, offset, count);
// Zero-byte reads (where the passed in buffer has 0 length) can occur when using PipeReader, we don't want to accidentally complete the RequestBody logging in this case if (count == 0) { return res; }
bufferWriter.Write(buffer.AsSpan(offset, res));
return res; }
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { var res = await innerStream.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
if (count == 0) { return res; }
bufferWriter.Write(buffer.AsSpan(offset, res));
return res; }
public override long Seek(long offset, SeekOrigin origin) { return innerStream.Seek(offset, origin); }
public override void SetLength(long value) { innerStream.SetLength(value); }
public override void Write(byte[] buffer, int offset, int count) { innerStream.Write(buffer, offset, count); }
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { return innerStream.WriteAsync(buffer, offset, count, cancellationToken); }
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) { return innerStream.WriteAsync(buffer, cancellationToken); }
public override void Write(ReadOnlySpan<byte> buffer) { innerStream.Write(buffer); }
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) { return innerStream.BeginRead(buffer, offset, count, callback, state); }
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) { return innerStream.BeginWrite(buffer, offset, count, callback, state); }
public override int EndRead(IAsyncResult asyncResult) { return innerStream.EndRead(asyncResult); }
public override void EndWrite(IAsyncResult asyncResult) { innerStream.EndWrite(asyncResult); }
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) { var res = await innerStream.ReadAsync(buffer, cancellationToken); if (buffer.IsEmpty) { return res; }
bufferWriter.Write(buffer.Slice(0, res).Span);
return res; }
public override ValueTask DisposeAsync() { return innerStream.DisposeAsync(); }
protected override void Dispose(bool disposing) { if (disposing) { bufferWriter.Dispose(); } }}
复制代码


然后利用中间件进行镜像处理


public class MirrorFunc : IHttpFunc{    private readonly IServiceProvider serviceProvider;    private readonly IHttpForwarder forwarder;    private readonly ILoadBalancingPolicyFactory loadBalancing;    private readonly IForwarderHttpClientFactory forwarderHttpClientFactory;    private readonly ProxyLogger logger;
public int Order => int.MinValue;
public MirrorFunc(IServiceProvider serviceProvider, IHttpForwarder forwarder, ILoadBalancingPolicyFactory loadBalancing, IForwarderHttpClientFactory forwarderHttpClientFactory, ProxyLogger logger) { this.serviceProvider = serviceProvider; this.forwarder = forwarder; this.loadBalancing = loadBalancing; this.forwarderHttpClientFactory = forwarderHttpClientFactory; this.logger = logger; }
public RequestDelegate Create(RouteConfig config, RequestDelegate next) { if (config.Metadata == null || !config.Metadata.TryGetValue("MirrorCluster", out var mirrorCluster) || string.IsNullOrWhiteSpace(mirrorCluster)) return next;
return c => Mirror(c, mirrorCluster, next); }
private async Task Mirror(HttpContext c, string mirrorCluster, RequestDelegate next) { var config = serviceProvider.GetRequiredService<IConfigSource<IProxyConfig>>(); if (config.CurrentSnapshot == null || config.CurrentSnapshot.Clusters == null || !config.CurrentSnapshot.Clusters.TryGetValue(mirrorCluster, out var cluster) || cluster == null) { await next(c); return; }
var originBody = c.Request.Body; using var buffer = new ReadBufferingStream(originBody); c.Request.Body = buffer;
try { await next(c); } finally { c.Request.Body = buffer.BufferingStream; try { var proxyFeature = c.Features.GetRequiredFeature<IReverseProxyFeature>(); var origin = proxyFeature.SelectedDestination; var selectedDestination = loadBalancing.PickDestination(proxyFeature, cluster); proxyFeature.SelectedDestination = origin; if (selectedDestination != null) { cluster.InitHttp(forwarderHttpClientFactory); await forwarder.SendAsync(c, proxyFeature, selectedDestination, cluster, new NonHttpTransformer(proxyFeature.Route.Transformer)); } } catch (Exception ex) { logger.LogWarning(ex, "Mirror failed"); } finally { c.Request.Body = originBody; } } }}
复制代码


所以说会再一次发送 http 请求,请求 body 会临时暂存内存,所以无论内存还是请求延迟都会受到影响,特别 body 很大的请求


VKProxy 是使用 c#开发的基于 Kestrel 实现 L4/L7 的代理(感兴趣的同学烦请点个github小赞赞呢)

发布于: 刚刚阅读数: 4
用户头像

八苦-瞿昙

关注

一个假和尚,不懂人情世故。 2018-11-23 加入

会点点技术,能写些代码,只爱静静。 g hub: https://github.com/fs7744 黑历史:https://www.cnblogs.com/fs7744

评论

发布
暂无评论
http流量镜像_八苦-瞿昙_InfoQ写作社区