写点什么

netty 系列之: 在 netty 中使用 proxy protocol

作者:程序那些事
  • 2022-11-21
    广东
  • 本文字数:5856 字

    阅读完需:约 19 分钟

简介

我们知道 proxy protocol 是 haproxy 提出的一个代理协议,通过这个协议,所有实现这个协议的 proxy 或者 LBS,都可以附带真实客户端的 IP 地址和端口号,这使得 proxy protocol 在实际应用中非常有用。


这么优秀的协议,没有理由 netty 不支持。本文将会谈一下 netty 中对 proxy protoco 代理协议的支持。

netty 对 proxy protocol 协议的支持

proxy protocol 协议其实很简单,就是在请求前面带了 proxy header 信息。


在 netty 中这个 header 信息叫做 HAProxyMessage:


public final class HAProxyMessage extends AbstractReferenceCounted {
复制代码


HAProxyMessage 是一个 ReferenceCounted,这一点和 ByteBuf 很类似,说明 HAProxyMessage 保留着和 ByteBuf 很类似的特性。


根据 proxy protocol 协议,该协议可以分为两个版本,分别是 v1 和 v2,其中 v1 版本是文本协议,而 v2 版本支持二进制的格式。


显然从代码编写和调试的角度来看 v1 更加友好,但是从程序的角度来看,v2 可能性能更高。


HAProxyMessage 中有个专门的 HAProxyProtocolVersion 类,来表示 proxy protocol 的版本信息:


public enum HAProxyProtocolVersion {
V1(VERSION_ONE_BYTE),
V2(VERSION_TWO_BYTE);
复制代码


HAProxyProtocolVersion 是一个枚举类,在它里面定义了和 proxy 协议相对应的两个版本号。


在版本号之后是 command,在 netty 中用 HAProxyCommand 来表示:


public enum HAProxyCommand {
LOCAL(HAProxyConstants.COMMAND_LOCAL_BYTE),
PROXY(HAProxyConstants.COMMAND_PROXY_BYTE);
复制代码


HAProxyCommand 也是一个枚举类,里面定义了两个 command 的值,分别是 local 和 proxy。


其中 local 表示该请求是代理服务器主动发起的,而不是客户端发起的,比如监控检测等请求。


proxy 表示该请求是一个代理请求。


接下来是 AddressFamily 和 TransportProtocol,这两个字段用同一个 byte 来表示,所以这两个类都是 HAProxyProxiedProtocol 的内部类。


先看下 AddressFamily 的定义:


    public enum AddressFamily {
AF_UNSPEC(AF_UNSPEC_BYTE),
AF_IPv4(AF_IPV4_BYTE),
AF_IPv6(AF_IPV6_BYTE),
AF_UNIX(AF_UNIX_BYTE);
复制代码


AddressFamily 中定义了 4 个 address family 类型,分别是 unspec,ipv4,ipv6 和 unix。分别对应未知 family,ipv4,ipv6 和 unix domain socket。


再看下 TransportProtocol 的定义:


    public enum TransportProtocol {
UNSPEC(TRANSPORT_UNSPEC_BYTE),
STREAM(TRANSPORT_STREAM_BYTE),
DGRAM(TRANSPORT_DGRAM_BYTE);
复制代码


TransportProtocol 有 3 个值,分别是 unspec,stream 和 dgram。分别对应未知协议,http/https 协议,udp/tcp 协议。


因为 AddressFamily 和 TransportProtocol 实际上是同一个 byte,所以经过组合之后可以得到下面的几个枚举值:


    UNKNOWN(TPAF_UNKNOWN_BYTE, AddressFamily.AF_UNSPEC, TransportProtocol.UNSPEC),
TCP4(TPAF_TCP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.STREAM),
TCP6(TPAF_TCP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.STREAM),
UDP4(TPAF_UDP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.DGRAM),
UDP6(TPAF_UDP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.DGRAM),
UNIX_STREAM(TPAF_UNIX_STREAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.STREAM),
UNIX_DGRAM(TPAF_UNIX_DGRAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.DGRAM);
复制代码


以上的枚举值也是 HAProxyProxiedProtocol 中定义的值。


接下就是源 ip 地址,目标地 ip 地址,源端口和目标端口这几个值,定义为属性表示如下:


    private final String sourceAddress;    private final String destinationAddress;    private final int sourcePort;    private final int destinationPort;
复制代码


最后,proxy protocol 中还可以包含额外的字段 tlv,tlv 在 netty 中也是一种 byteBuf,使用 HAProxyTLV 表示:


public class HAProxyTLV extends DefaultByteBufHolder 
复制代码


因为 tlv 是 key value 结构,所以看下 HAProxyTLV 的构造函数:


    public HAProxyTLV(Type type, ByteBuf content) {        this(type, Type.byteValueForType(type), content);    }
复制代码


HAProxyTLV 接受一个 type 和 byteBuf 的 value。


Type 是一个枚举类,在 netty 中可以支持下面的值:


    public enum Type {        PP2_TYPE_ALPN,        PP2_TYPE_AUTHORITY,        PP2_TYPE_SSL,        PP2_TYPE_SSL_VERSION,        PP2_TYPE_SSL_CN,        PP2_TYPE_NETNS,        OTHER;
复制代码


在 HAProxyMessage 中,tlv 是一个 list 来保存的:


private final List<HAProxyTLV> tlvs;
复制代码


到此,所有 HAProxyMessage 所需要的参数都齐了,我们看下 HAProxyMessage 的构造函数:


    public HAProxyMessage(            HAProxyProtocolVersion protocolVersion, HAProxyCommand command, HAProxyProxiedProtocol proxiedProtocol,            String sourceAddress, String destinationAddress, int sourcePort, int destinationPort,            List<? extends HAProxyTLV> tlvs)
复制代码


HAProxyMessage 会将所有的参数都存储到本地的变量中,供后续使用。


因为 proxy protocol 有两个版本,v1 和 v2,所以 HAProxyMessage 中提供了两个将 header 编码为 AProxyMessage 对象的方法,分别是:


static HAProxyMessage decodeHeader(ByteBuf header) 
复制代码


和:


static HAProxyMessage decodeHeader(String header)
复制代码


有了 proxy protocol 的 java 表示之后,我们再来看一下 HAProxyMessage 的编码解码器。

HAProxyMessage 的编码解码器

netty 对 HAProxyMessage 对象的支持表现在两个地方,netty 提供了两个类分别对 HAProxyMessage 进行编码和解码,这两个类是 HAProxyMessageEncoder 和 HAProxyMessageDecoder。


先看一下 HAProxyMessageEncoder:


public final class HAProxyMessageEncoder extends MessageToByteEncoder<HAProxyMessage> 
复制代码


HAProxyMessageEncoder 继承自 MessageToByteEncoder,传入的泛型是 HAProxyMessage,表示是将 HAProxyMessage 编码成为 ByteBuf。


它的 encode 方法很简单,根据 HAProxyMessage 传入的 message 版本信息,分别进行编码:


    protected void encode(ChannelHandlerContext ctx, HAProxyMessage msg, ByteBuf out) throws Exception {        switch (msg.protocolVersion()) {            case V1:                encodeV1(msg, out);                break;            case V2:                encodeV2(msg, out);                break;            default:                throw new HAProxyProtocolException("Unsupported version: " + msg.protocolVersion());        }    }
复制代码


HAProxyMessageDecoder 是跟 HAProxyMessageEncoder 相反的动作,是将接收到的 ByteBuf 解析成为 HAProxyMessage:


public class HAProxyMessageDecoder extends ByteToMessageDecoder 
复制代码


因为 HAProxyMessage 有两个版本,那么怎么判断接收到的 ByeBuf 是哪个版本呢?


其实很简单,因为 v1 版本和 v2 版本的开始字符是不一样的,v1 版本的开头是一个 text:"PROXY", v2 版本的开头是一个固定的二进制串,如下所示:


    static final byte[] BINARY_PREFIX = {            (byte) 0x0D,            (byte) 0x0A,            (byte) 0x0D,            (byte) 0x0A,            (byte) 0x00,            (byte) 0x0D,            (byte) 0x0A,            (byte) 0x51,            (byte) 0x55,            (byte) 0x49,            (byte) 0x54,            (byte) 0x0A    };
static final byte[] TEXT_PREFIX = { (byte) 'P', (byte) 'R', (byte) 'O', (byte) 'X', (byte) 'Y', };
复制代码


看下它的 decode 方法实现:


    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {        if (version == -1) {            if ((version = findVersion(in)) == -1) {                return;            }        }
ByteBuf decoded;
if (version == 1) { decoded = decodeLine(ctx, in); } else { decoded = decodeStruct(ctx, in); }
if (decoded != null) { finished = true; try { if (version == 1) { out.add(HAProxyMessage.decodeHeader(decoded.toString(CharsetUtil.US_ASCII))); } else { out.add(HAProxyMessage.decodeHeader(decoded)); } } catch (HAProxyProtocolException e) { fail(ctx, null, e); } } }
复制代码


上面代码的逻辑是先从 ByteBuf 中根据版本号 decode 出 header 信息放到 ByteBuf 中。


然后再根据版本号的不同,分别调用 HAProxyMessage 的两个不同版本的 decodeHeader 方法进行解码。最终得到 HAProxyMessage。

netty 中 proxy protocol 的代码示例

有了 netty 对 proxy protocol 的支持,那么在 netty 中搭建支持 proxy protocol 的服务器和客户端就很容易了。


先看一下如何搭建支持 proxy protocol 的服务器:


    private static void startServer(int port) throws InterruptedException {        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .handler(new LoggingHandler(LogLevel.INFO))             .childHandler(new ServerInitializer());            b.bind(port).sync().channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }
复制代码


代码和常规的 netty server 一样,这里使用了 NioEventLoopGroup 和 NioServerSocketChannel,搭建了一个支持 TCP 协议的 netty 服务器。


ServerInitializer 中包含了 netty 自带的 HAProxy 编码器和自定义的消息处理器:


class ServerInitializer extends ChannelInitializer<SocketChannel> {    @Override    public void initChannel(SocketChannel ch) throws Exception {        ch.pipeline().addLast(                new LoggingHandler(LogLevel.DEBUG),                new HAProxyMessageDecoder(),                new SimpleChannelInboundHandler() {                    @Override                    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {                        if (msg instanceof HAProxyMessage) {                            log.info("proxy message is : {}", msg);                        } else if (msg instanceof ByteBuf) {                            log.info("bytebuf message is : {}", ByteBufUtil.prettyHexDump((ByteBuf) msg));                        }                    }                });    }}
复制代码


这里使用 netty 自带的 HAProxyMessageDecoder,用来将 ByteBuf 消息解码为 HAProxyMessage,然后在自定义的 SimpleChannelInboundHandler 中对 HAProxyMessage 进行处理。


这里的服务器可以处理两种消息,一种是 HAProxyMessage,一种是原始的 ByteBuf。处理的结果就是将消息打印出来。


然后看下客户端的定义:


EventLoopGroup group = new NioEventLoopGroup();            Bootstrap b = new Bootstrap();            b.group(group)                    .channel(NioSocketChannel.class)                    .handler(new ClientHander());            Channel ch = b.connect(host, port).sync().channel();
复制代码


客户端使用的是 EventLoopGroup 和 NioSocketChannel,是基于 TCP 协议的请求。


这里添加了自定义的 handler:ClientHander,ClientHander 继承自 ChannelOutboundHandlerAdapter 用来对 client 发出的消息进行处理。


这里看一下它的 handlerAdded 方法:


    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        ctx.pipeline().addBefore(ctx.name(), null, HAProxyMessageEncoder.INSTANCE);        super.handlerAdded(ctx);    }
复制代码


可以看到 handlerAdded 方法向 channelPipeline 中添加了 HAProxyMessageEncoder,用于编码 HAProxyMessage。


因为对于一个 connection 来说,HAProxyMessage 只需要用到一次,后续的正常消息就不需要这个编码器了,所以我们需要在 write 方法中监听 HAProxyMessage 的状态,如果写入成功之后,就从 pipeline 中移出 HAProxyMessageEncoder 和 ClientHander。


    public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        ChannelFuture future1 = ctx.write(msg, promise);        if (msg instanceof HAProxyMessage) {            future1.addListener((ChannelFutureListener) future2 -> {                if (future2.isSuccess()) {                    ctx.pipeline().remove(HAProxyMessageEncoder.INSTANCE);                    ctx.pipeline().remove(ClientHander.this);                } else {                    ctx.close();                }            });        }    }
复制代码


最后我们构建了一个虚拟的 HAProxyMessage,然后通过 netty 客户端进行发送:


HAProxyMessage message = new HAProxyMessage(                    HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, HAProxyProxiedProtocol.TCP4,                    "127.0.0.1", "127.0.0.2", 8000, 9000);            ch.writeAndFlush(message).sync();            ch.writeAndFlush(Unpooled.copiedBuffer("this is a proxy protocol message!", CharsetUtil.UTF_8)).sync();            ch.close().sync();
复制代码

总结

上面的代码只是一个简单的模拟 proxy protocol 在 netty 中的使用情况,并不代表上面的代码就可以在实际的项目中应用了。如果你想使用的话,可以在下面的代码上面继续丰富和完善。


本文的代码,大家可以参考:


learn-netty4

发布于: 刚刚阅读数: 5
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020-06-07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:在netty中使用proxy protocol_Java_程序那些事_InfoQ写作社区