写点什么

基于 DotNetty 实现一个接口自动发布工具 - 通信实现

作者:EquatorCoco
  • 2023-12-05
    福建
  • 本文字数:6336 字

    阅读完需:约 21 分钟

基于 DotNetty 实现通信


DotNetty : 是微软的 Azure 团队,使用 C#实现的 Netty 的版本发布。是.NET 平台的优秀网络库。


项目介绍


OpenDeploy.Communication 类库项目,是通信相关基础设施层



  • Codec 模块实现编码解码


  • Convention 模块定义约定,比如抽象的业务 Handler, 消息载体 NettyMessage, 消息上下文 'NettyContext' 等


自定义消息格式


消息类为 NettyMessage ,封装了消息头 NettyHeader 和消息体 Body



NettyMessage


封装了消息头 NettyHeader 和消息体 Body


NettyMessage 点击查看代码


/// <summary> Netty消息 </summary>public class NettyMessage{    /// <summary> 消息头 </summary>    public NettyHeader Header { get; init; } = default!;
/// <summary> 消息体(可空,可根据具体业务而定) </summary> public byte[]? Body { get; init; }
/// <summary> 消息头转为字节数组 </summary> public byte[] GetHeaderBytes() { var headerString = Header.ToString(); return Encoding.UTF8.GetBytes(headerString); }
/// <summary> 是否同步消息 </summary> public bool IsSync() => Header.Sync;
/// <summary> 创建Netty消息工厂方法 </summary> public static NettyMessage Create(string endpoint, bool sync = false, byte[]? body = null) { return new NettyMessage { Header = new NettyHeader { EndPoint = endpoint, Sync = sync }, Body = body }; }
/// <summary> 序列化为JSON字符串 </summary> public override string ToString() => Header.ToString();}
复制代码


NettyHeader


消息头,包含请求唯一标识,是否同步消息,终结点等, 在传输数据时会序列化为 JSON


NettyHeader 点击查看代码


/// <summary> Netty消息头 </summary>public class NettyHeader{    /// <summary> 请求消息唯一标识 </summary>    public Guid RequestId { get; init; } = Guid.NewGuid();
/// <summary> 是否同步消息, 默认false是异步消息 </summary> public bool Sync { get; init; }
/// <summary> 终结点 (借鉴MVC,约定为Control/Action模式) </summary> public string EndPoint { get; init; } = string.Empty;
/// <summary> 序列化为JSON字符串 </summary> public override string ToString() => this.ToJsonString();}
复制代码



  • 请求消息唯一标识 RequestId , 用来唯一标识消息, 主要用于 发送同步请求, 因为默认的消息是异步的,只管发出去,不需要等待响应


  • 是否同步消息 Sync , 可以不需要,主要为了可视化,便于调试


  • 终结点 EndPoint , (借鉴 MVC,约定为 Control/Action 模式), 服务端直接解析出对应的处理器


编码器


DefaultEncoder 点击查看代码


public class DefaultEncoder : MessageToByteEncoder<NettyMessage>{    protected override void Encode(IChannelHandlerContext context, NettyMessage message, IByteBuffer output)    {        //消息头转为字节数组        var headerBytes = message.GetHeaderBytes();
//写入消息头长度 output.WriteInt(headerBytes.Length);
//写入消息头字节数组 output.WriteBytes(headerBytes);
//写入消息体字节数组 if (message.Body != null && message.Body.Length > 0) { output.WriteBytes(message.Body); } }}
复制代码

解码器


DefaultDecoder 点击查看代码


public class DefaultDecoder : MessageToMessageDecoder<IByteBuffer>{    protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)    {        //消息总长度        var totalLength = input.ReadableBytes;
//消息头长度 var headerLength = input.GetInt(input.ReaderIndex);
//消息体长度 var bodyLength = totalLength - 4 - headerLength;
//读取消息头字节数组 var headerBytes = new byte[headerLength]; input.GetBytes(input.ReaderIndex + 4, headerBytes, 0, headerLength);
byte[]? bodyBytes = null; string? rawHeaderString = null; NettyHeader? header;
try { //把消息头字节数组,反序列化为JSON rawHeaderString = Encoding.UTF8.GetString(headerBytes); header = JsonSerializer.Deserialize<NettyHeader>(rawHeaderString); } catch (Exception ex) { Logger.Error($"解码失败: {rawHeaderString}, {ex}"); return; }
if (header is null) { Logger.Error($"解码失败: {rawHeaderString}"); return; }
//读取消息体字节数组 if (bodyLength > 0) { bodyBytes = new byte[bodyLength]; input.GetBytes(input.ReaderIndex + 4 + headerLength, bodyBytes, 0, bodyLength); }
//封装为NettyMessage对象 var message = new NettyMessage { Header = header, Body = bodyBytes, };
output.Add(message); }}
复制代码


NettyServer 实现


NettyServer 点击查看代码


public static class NettyServer{    /// <summary>    /// 开启Netty服务    /// </summary>    public static async Task RunAsync(int port = 20007)    {        var bossGroup = new MultithreadEventLoopGroup(1);        var workerGroup = new MultithreadEventLoopGroup();
try { var bootstrap = new ServerBootstrap().Group(bossGroup, workerGroup);
bootstrap .Channel<TcpServerSocketChannel>() .Option(ChannelOption.SoBacklog, 100) .Option(ChannelOption.SoReuseaddr, true) .Option(ChannelOption.SoReuseport, true) .ChildHandler(new ActionChannelInitializer<IChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; pipeline.AddLast("framing-enc", new LengthFieldPrepender(4)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast("decoder", new DefaultDecoder()); pipeline.AddLast("encoder", new DefaultEncoder()); pipeline.AddLast("handler", new ServerMessageEntry()); }));
var boundChannel = await bootstrap.BindAsync(port);
Logger.Info($"NettyServer启动成功...{boundChannel}");
Console.ReadLine();
await boundChannel.CloseAsync();
Logger.Info($"NettyServer关闭监听了...{boundChannel}"); } finally { await Task.WhenAll( bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)), workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)) );
Logger.Info($"NettyServer退出了..."); }
}}
复制代码


  • 服务端管道最后我们添加了 ServerMessageEntry ,作为消息处理的入口


ServerMessageEntry 点击查看代码


public class ServerMessageEntry : ChannelHandlerAdapter{    /// <summary> Netty处理器选择器 </summary>    private readonly DefaultNettyHandlerSelector handlerSelector = new();
public ServerMessageEntry() { //注册Netty处理器 handlerSelector.RegisterHandlerTypes(typeof(EchoHandler).Assembly.GetTypes()); }
/// <summary> 通道激活 </summary> public override void ChannelActive(IChannelHandlerContext context) { Logger.Warn($"ChannelActive: {context.Channel}"); }
/// <summary> 通道关闭 </summary> public override void ChannelInactive(IChannelHandlerContext context) { Logger.Warn($"ChannelInactive: {context.Channel}"); }
/// <summary> 收到客户端的消息 </summary> public override async void ChannelRead(IChannelHandlerContext context, object message) { if (message is not NettyMessage nettyMessage) { Logger.Error("从客户端接收消息为空"); return; }
try { Logger.Info($"收到客户端的消息: {nettyMessage}");
//封装请求 var nettyContext = new NettyContext(context.Channel, nettyMessage);
//选择处理器 AbstractNettyHandler handler = handlerSelector.SelectHandler(nettyContext);
//处理请求 await handler.ProcessAsync(); } catch(Exception ex) { Logger.Error($"ServerMessageEntry.ChannelRead: {ex}"); } }}
复制代码


  • 按照约定, 把继承 AbstractNettyHandler 的类视为业务处理器


  • ServerMessageEntry 拿到消息后,首先把消息封装为 NettyContext, 类似与 MVC 中的 HttpContext, 封装了请求和响应对象, 内部解析请求的 EndPoint, 拆分为 HandlerNameActionName


  • DefaultNettyHandlerSelector 提供注册处理器的方法 RegisterHandlerTypes, 和选择处理器的方法 SelectHandler


  • SelectHandler, 默认规则是查找已注册的处理器中以 HandlerName 开头的类型


  • AbstractNettyHandler 的 ProcessAsync 方法,通过 ActionName, 反射拿到 MethodInfo, 调用终结点


NettyClient 实现


NettyClient 点击查看代码


public sealed class NettyClient(string serverHost, int serverPort) : IDisposable{    public EndPoint ServerEndPoint { get; } = new IPEndPoint(IPAddress.Parse(serverHost), serverPort);
private static readonly Bootstrap bootstrap = new(); private static readonly IEventLoopGroup eventLoopGroup = new SingleThreadEventLoop();
private bool _disposed; private IChannel? _channel; public bool IsConnected => _channel != null && _channel.Open; public bool IsWritable => _channel != null && _channel.IsWritable;
static NettyClient() { bootstrap .Group(eventLoopGroup) .Channel<TcpSocketChannel>() .Option(ChannelOption.SoReuseaddr, true) .Option(ChannelOption.SoReuseport, true) .Handler(new ActionChannelInitializer<ISocketChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; //pipeline.AddLast("ping", new IdleStateHandler(0, 5, 0)); pipeline.AddLast("framing-enc", new LengthFieldPrepender(4)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); pipeline.AddLast("decoder", new DefaultDecoder()); pipeline.AddLast("encoder", new DefaultEncoder()); pipeline.AddLast("handler", new ClientMessageEntry()); })); }
/// <summary> 连接服务器 </summary> private async Task TryConnectAsync() { try { if (IsConnected) { return; } _channel = await bootstrap.ConnectAsync(ServerEndPoint); } catch (Exception ex) { throw new Exception($"连接服务器失败 : {ServerEndPoint} {ex.Message}"); } }
/// <summary> /// 发送消息 /// </summary> /// <param name="endpoint">终结点</param> /// <param name="sync">是否同步等待响应</param> /// <param name="body">正文</param> public async Task SendAsync(string endpoint, bool sync = false, byte[]? body = null) { var message = NettyMessage.Create(endpoint, sync, body); if (sync) { var task = ClientMessageSynchronizer.TryAdd(message); try { await SendAsync(message); await task; } catch { ClientMessageSynchronizer.TryRemove(message); throw; } } else { await SendAsync(message); } }
/// <summary> /// 发送消息 /// </summary> private async Task SendAsync(NettyMessage message) { await TryConnectAsync(); await _channel!.WriteAndFlushAsync(message); }
/// <summary> 释放连接(程序员手动释放, 一般在代码使用using语句,或在finally里面Dispose) </summary> public void Dispose() { Dispose(true); GC.SuppressFinalize(this); }
/// <summary> 释放连接 </summary> private void Dispose(bool disposing) { if (_disposed) { return; }
//释放托管资源,比如嵌套的对象 if (disposing) {
}
//释放非托管资源 if (_channel != null) { _channel.CloseAsync(); _channel = null; }
_disposed = true; }
~NettyClient() { Dispose(true); }}
复制代码


  • NettyClient 封装了 Netty 客户端逻辑,提供发送异步请求(默认)和发布同步请求方法


  • DotNetty 默认不提供同步请求,但是有些情况我们需要同步等待服务器的响应,所有需要自行实现,实现也很简单,把消息 ID 缓存起来,收到服务器响应后激活就行了,具体实现在消息同步器 ClientMessageSynchronizer, 就不贴了


总结


至此,我们实现了基于 DotNetty 搭建通信模块, 实现了客户端和服务器的编解码,处理器选择,客户端实现了同步消息等,大家可以在 ConsoleHost 结尾的控制台项目中,测试下同步和异步的消息,实现的简单的 Echo 模式


代码仓库


项目暂且就叫 OpenDeploy 吧



欢迎大家拍砖,Star


下一步


计划下一步,基于WPF的客户端, 实现接口项目的配置与发现


文章转载自:Broadm

原文链接:https://www.cnblogs.com/broadm/p/17875559.html

用户头像

EquatorCoco

关注

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
基于DotNetty实现一个接口自动发布工具 - 通信实现_git_EquatorCoco_InfoQ写作社区