写点什么

netty WebSocket 客户端实践

作者:FunTester
  • 2023-08-28
    北京
  • 本文字数:4510 字

    阅读完需:约 15 分钟

在之前的 Socket 学习中,主要都是基于两个 Socket 客户端:WebSocketSocket.IO。在做测试的时候也是基于WebSocket消息的发送和接收为主要测试对象。但是对于超多Socket连接没有涉及。


在实践中会发现,这两个实现类都存在一个问题,为了维护 1 个Socket连接及其功能,通常需要创建多个线程。在计算机硬件资源有限的情况下,线程是稀缺资源,不仅仅是内存占用,也会增加 CPU 的负担。


之前解决这个问题的方案直接换成 Go 语言版本的Socket客户端。例如:/net/websocketgorilla/websocket


其实 Java 也有相对应的解决方案:netty。话不多说,上代码。

依赖

<!-- https://mvnrepository.com/artifact/io.netty/netty-all --><dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>    <version>4.1.85.Final</version></dependency>
复制代码

netty WebSocket 客户端

客户端主要的功能就是创建连接,然后使用一个事件处理线程池管理连接以及收发消息io.netty.channel.EventLoopGroup,然后使用一个io.netty.bootstrap.Bootstrap来作为引导程序。


package com.funtester.socket.netty    import com.funtester.frame.execute.ThreadPoolUtil  import groovy.util.logging.Log4j2  import io.netty.bootstrap.Bootstrap  import io.netty.channel.*  import io.netty.channel.group.ChannelGroup  import io.netty.channel.group.DefaultChannelGroup  import io.netty.channel.nio.NioEventLoopGroup  import io.netty.channel.socket.SocketChannel  import io.netty.channel.socket.nio.NioSocketChannel  import io.netty.handler.codec.http.DefaultHttpHeaders  import io.netty.handler.codec.http.HttpClientCodec  import io.netty.handler.codec.http.HttpObjectAggregator  import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker  import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory  import io.netty.handler.codec.http.websocketx.WebSocketVersion  import io.netty.handler.stream.ChunkedWriteHandler  import io.netty.util.concurrent.GlobalEventExecutor    @Log4j2  class WebSocketConnector {        static Bootstrap bootstrap = new Bootstrap()        /**       * 处理事件的线程池       */      static EventLoopGroup group = new NioEventLoopGroup(ThreadPoolUtil.getFactory("N"))        static {          bootstrap.group(group).channel(NioSocketChannel.class)      }        /**       * 用于记录和管理所有客户端的channel       */         static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)        WebSocketClientHandshaker handShaker        ChannelPromise handshakeFuture        String host        int port        /**       * 网络通道       */      Channel channel        WebSocketIoHandler handler        /**       * WebSocket协议类型的模拟客户端连接器构造方法       *       * @param serverIp       * @param serverSocketPort       * @param group       */    WebSocketConnector(String host, int port) {          this.host = host          this.port = port          String URL = this.host + ":" + this.port + "/test"          URI uri = new URI(URL)          handler = new WebSocketIoHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()))          bootstrap.option(ChannelOption.TCP_NODELAY, true)                  .option(ChannelOption.SO_TIMEOUT, true)                  .option(ChannelOption.SO_BROADCAST, true)                  .option(ChannelOption.SO_KEEPALIVE, true)                  .handler(new ChannelInitializer<SocketChannel>() {                        @Override                      protected void initChannel(SocketChannel ch) throws Exception {                          ChannelPipeline pipeline = ch.pipeline()                          pipeline.addLast(new HttpClientCodec())                          pipeline.addLast(new ChunkedWriteHandler())                          pipeline.addLast(new HttpObjectAggregator(1024 * 1024))                          pipeline.addLast(handler)                      }                  })      }          /**       * 连接       */      void connect() {          try {              try {                  ChannelFuture future = bootstrap.connect(this.host - "ws://" - "wss://", this.port).sync()                  this.channel = future.channel()                  clients.add(channel)              } catch (e) {                  log.error("创建channel失败", e)              }          } catch (Exception e) {              log.error("连接服务失败", e)          } finally {              this.handshakeFuture = handler.handshakeFuture()          }      }        /**       * 关闭       */      void close() {          this.channel.close()      }    }
复制代码


这里用到了一个保存现在的所有的活跃channel的类io.netty.channel.group.ChannelGroup,有点就是可以自动管理所有的channel,还能自动剔除已经关闭的channel


这里还有补充 2 个发送消息的方法:


/**   * 发送文本消息   */  void sendText(String msg) {      channel.writeAndFlush(new TextWebSocketFrame(msg))  }    /**   * 发送ping消息   */  void ping() {      channel.writeAndFlush(new PingWebSocketFrame())  }
复制代码

消息处理器

这里需要处理的消息各种类型,继承io.netty.channel.SimpleChannelInboundHandler实现不同的方法即可。


这里有个泛型设置可以直接设置成不同的消息类型,例如io.netty.handler.codec.http.websocketx.WebSocketFrame及其子类,如果确定服务端发来消息的类型的话,可以更加省事儿。


package com.funtester.socket.netty    import groovy.util.logging.Log4j2  import io.netty.channel.*  import io.netty.channel.group.ChannelGroup  import io.netty.channel.group.DefaultChannelGroup  import io.netty.handler.codec.http.FullHttpResponse  import io.netty.handler.codec.http.websocketx.*  import io.netty.handler.timeout.IdleState  import io.netty.handler.timeout.IdleStateEvent  import io.netty.util.concurrent.GlobalEventExecutor  /**   * WebSocket协议类型的模拟客户端IO处理器类   */  @Log4j2  class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {        /**       * 用于记录和管理所有客户端的channel       */    private ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)        private final WebSocketClientHandshaker handShaker        private ChannelPromise handshakeFuture        WebSocketIoHandler(WebSocketClientHandshaker handShaker) {          this.handShaker = handShaker      }        ChannelFuture handshakeFuture() {          return handshakeFuture      }        @Override      void handlerAdded(ChannelHandlerContext ctx) {          handshakeFuture = ctx.newPromise()      }        @Override      void channelActive(ChannelHandlerContext ctx) {          handShaker.handshake(ctx.channel());      }        @Override      void channelInactive(ChannelHandlerContext ctx) {          ctx.close()          try {              super.channelInactive(ctx)          } catch (Exception e) {              log.error("channelInactive 异常.", e)          }          log.warn("WebSocket链路与服务器连接已断开.")      }        @Override      void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {          Channel ch = ctx.channel()          if (!handShaker.isHandshakeComplete()) {              try {                  handShaker.finishHandshake(ch, (FullHttpResponse) msg)                  handshakeFuture.setSuccess()              } catch (WebSocketHandshakeException e) {                  log.warn("WebSocket Client failed to connect",e)                  handshakeFuture.setFailure(e)              }              return          }            WebSocketFrame frame = (WebSocketFrame) msg          if (frame instanceof TextWebSocketFrame) {              TextWebSocketFrame textFrame = (TextWebSocketFrame) frame              String s = textFrame.text()          } else if (frame instanceof CloseWebSocketFrame) {              log.info("WebSocket Client closing")              ch.close()          }      }        @Override      void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {          log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause)          if (!handshakeFuture.isDone()) {              handshakeFuture.setFailure(cause)          }          ctx.close()          super.exceptionCaught(ctx, cause)      }        @Override      void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {          if (evt instanceof IdleStateEvent) {              IdleStateEvent event = (IdleStateEvent) evt              // 如果写通道处于空闲状态,就发送心跳命令              if (IdleState.WRITER_IDLE == event.state() || IdleState.READER_IDLE == event.state()) {                  // 发送心跳数据                  def channel = ctx.channel()                  channel.writeAndFlush(new TextWebSocketFrame("dsf"))              }          } else {              super.userEventTriggered(ctx, evt)          }      }  }
复制代码


这里处理接收到消息的时候并没有选择保存消息的功能,因为 netty WebSocket 使用场景就是超大量(超过 1w)连接。保留返回消息,进行业务验证通常不是这类测试场景的首要目的。所以以后等用到了再说吧。


后面会对比这 3 种Socket客户端包括Go语言两种Socket客户端在超大量连接方面的资源占用。


发布于: 刚刚阅读数: 5
用户头像

FunTester

关注

公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入

Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester

评论

发布
暂无评论
netty WebSocket客户端实践_FunTester_InfoQ写作社区