http 流量镜像
- 2025-07-05 中国台湾
本文字数:4343 字
阅读完需:约 14 分钟
http 流量镜像
“流量镜像”是指将网络中的数据流量复制一份,并将这份复制流量发送到另一个目的地(如监控、分析或安全检测系统)。这项技术常用于网络安全、故障排查、业务灰度发布等场景。
主要应用场景
安全监控与威胁检测
将生产环境的流量镜像到安全分析设备(如 IDS/IPS),用于实时监控和威胁检测。
性能分析与故障排查
将流量镜像到分析平台,对网络异常、延迟、丢包等问题进行实时排查和定位。
灰度发布和 A/B 测试
将真实用户流量镜像到新版本服务,进行灰度环境验证和兼容性测试,不影响真实用户体验。
合规与审计
对重要业务流量进行行为记录,以满足合规和审计要求。
VKProxy 目前只支持 http 流量镜像, 需注意由于会再一次发送 http 请求,请求 body 会临时暂存内存,所以无论内存还是请求延迟都会受到影响,特别 body 很大的请求
设置
大家可以在Metadata中设置缓存, 具体设置项如下
MirrorCluster镜像流量发送到的集群 id
配置示例:
{ "ReverseProxy": { "Routes": { "a": { "Order": 0, "Match": { "Hosts": [ "api.com" ], "Paths": [ "*" ] }, "ClusterId": "apidemo", "Metadata": { "MirrorCluster": "apidemoMirror" } } }, "Clusters": { "apidemo": { "LoadBalancingPolicy": "Hash", "Metadata": { "HashBy": "header", "Key": "X-forwarded-For" }, "Destinations": [ { "Address": "https://xxx.lt" } ] }, "apidemoMirror": { "LoadBalancingPolicy": "Hash", "Metadata": { "HashBy": "header", "Key": "X-forwarded-For" }, "Destinations": [ { "Address": "http://xxx.org/" } ] } } }}
具体实现
首先需要缓存 body 内容,这里实现一个简单的 ReadBufferingStream
public class ReadBufferingStream : Stream, IDisposable{ private readonly SparseBufferWriter<byte> bufferWriter; protected Stream innerStream;
public ReadBufferingStream(Stream innerStream) { this.innerStream = innerStream; bufferWriter = new SparseBufferWriter<byte>(); }
public override bool CanRead => innerStream.CanRead;
public override bool CanSeek => innerStream.CanSeek;
public override bool CanWrite => innerStream.CanWrite;
public override long Length => innerStream.Length;
public override long Position { get => innerStream.Position; set => innerStream.Position = value; }
public override int WriteTimeout { get => innerStream.WriteTimeout; set => innerStream.WriteTimeout = value; }
public Stream BufferingStream => bufferWriter.AsStream(true);
public override void Flush() { innerStream.Flush(); }
public override Task FlushAsync(CancellationToken cancellationToken) { return innerStream.FlushAsync(cancellationToken); }
public override int Read(byte[] buffer, int offset, int count) { var res = innerStream.Read(buffer, offset, count);
// Zero-byte reads (where the passed in buffer has 0 length) can occur when using PipeReader, we don't want to accidentally complete the RequestBody logging in this case if (count == 0) { return res; }
bufferWriter.Write(buffer.AsSpan(offset, res));
return res; }
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { var res = await innerStream.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
if (count == 0) { return res; }
bufferWriter.Write(buffer.AsSpan(offset, res));
return res; }
public override long Seek(long offset, SeekOrigin origin) { return innerStream.Seek(offset, origin); }
public override void SetLength(long value) { innerStream.SetLength(value); }
public override void Write(byte[] buffer, int offset, int count) { innerStream.Write(buffer, offset, count); }
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { return innerStream.WriteAsync(buffer, offset, count, cancellationToken); }
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) { return innerStream.WriteAsync(buffer, cancellationToken); }
public override void Write(ReadOnlySpan<byte> buffer) { innerStream.Write(buffer); }
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) { return innerStream.BeginRead(buffer, offset, count, callback, state); }
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) { return innerStream.BeginWrite(buffer, offset, count, callback, state); }
public override int EndRead(IAsyncResult asyncResult) { return innerStream.EndRead(asyncResult); }
public override void EndWrite(IAsyncResult asyncResult) { innerStream.EndWrite(asyncResult); }
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) { var res = await innerStream.ReadAsync(buffer, cancellationToken); if (buffer.IsEmpty) { return res; }
bufferWriter.Write(buffer.Slice(0, res).Span);
return res; }
public override ValueTask DisposeAsync() { return innerStream.DisposeAsync(); }
protected override void Dispose(bool disposing) { if (disposing) { bufferWriter.Dispose(); } }}
然后利用中间件进行镜像处理
public class MirrorFunc : IHttpFunc{ private readonly IServiceProvider serviceProvider; private readonly IHttpForwarder forwarder; private readonly ILoadBalancingPolicyFactory loadBalancing; private readonly IForwarderHttpClientFactory forwarderHttpClientFactory; private readonly ProxyLogger logger;
public int Order => int.MinValue;
public MirrorFunc(IServiceProvider serviceProvider, IHttpForwarder forwarder, ILoadBalancingPolicyFactory loadBalancing, IForwarderHttpClientFactory forwarderHttpClientFactory, ProxyLogger logger) { this.serviceProvider = serviceProvider; this.forwarder = forwarder; this.loadBalancing = loadBalancing; this.forwarderHttpClientFactory = forwarderHttpClientFactory; this.logger = logger; }
public RequestDelegate Create(RouteConfig config, RequestDelegate next) { if (config.Metadata == null || !config.Metadata.TryGetValue("MirrorCluster", out var mirrorCluster) || string.IsNullOrWhiteSpace(mirrorCluster)) return next;
return c => Mirror(c, mirrorCluster, next); }
private async Task Mirror(HttpContext c, string mirrorCluster, RequestDelegate next) { var config = serviceProvider.GetRequiredService<IConfigSource<IProxyConfig>>(); if (config.CurrentSnapshot == null || config.CurrentSnapshot.Clusters == null || !config.CurrentSnapshot.Clusters.TryGetValue(mirrorCluster, out var cluster) || cluster == null) { await next(c); return; }
var originBody = c.Request.Body; using var buffer = new ReadBufferingStream(originBody); c.Request.Body = buffer;
try { await next(c); } finally { c.Request.Body = buffer.BufferingStream; try { var proxyFeature = c.Features.GetRequiredFeature<IReverseProxyFeature>(); var origin = proxyFeature.SelectedDestination; var selectedDestination = loadBalancing.PickDestination(proxyFeature, cluster); proxyFeature.SelectedDestination = origin; if (selectedDestination != null) { cluster.InitHttp(forwarderHttpClientFactory); await forwarder.SendAsync(c, proxyFeature, selectedDestination, cluster, new NonHttpTransformer(proxyFeature.Route.Transformer)); } } catch (Exception ex) { logger.LogWarning(ex, "Mirror failed"); } finally { c.Request.Body = originBody; } } }}
所以说会再一次发送 http 请求,请求 body 会临时暂存内存,所以无论内存还是请求延迟都会受到影响,特别 body 很大的请求
VKProxy 是使用 c#开发的基于 Kestrel 实现 L4/L7 的代理(感兴趣的同学烦请点个github小赞赞呢)
版权声明: 本文为 InfoQ 作者【八苦-瞿昙】的原创文章。
原文链接:【http://xie.infoq.cn/article/dcc8cedf7ebdea56ca99b16ff】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
八苦-瞿昙
一个假和尚,不懂人情世故。 2018-11-23 加入
会点点技术,能写些代码,只爱静静。 g hub: https://github.com/fs7744 黑历史:https://www.cnblogs.com/fs7744









评论