netty WebSocket 客户端实践
- 2023-08-28 北京
本文字数:4510 字
阅读完需:约 15 分钟
在之前的 Socket 学习中,主要都是基于两个 Socket 客户端:WebSocket
和Socket.IO
。在做测试的时候也是基于WebSocket
消息的发送和接收为主要测试对象。但是对于超多Socket
连接没有涉及。
在实践中会发现,这两个实现类都存在一个问题,为了维护 1 个Socket
连接及其功能,通常需要创建多个线程。在计算机硬件资源有限的情况下,线程是稀缺资源,不仅仅是内存占用,也会增加 CPU 的负担。
之前解决这个问题的方案直接换成 Go 语言版本的Socket
客户端。例如:/net/websocket
和gorilla/websocket
。
其实 Java 也有相对应的解决方案:netty。话不多说,上代码。
依赖
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.85.Final</version>
</dependency>
netty WebSocket 客户端
客户端主要的功能就是创建连接,然后使用一个事件处理线程池管理连接以及收发消息io.netty.channel.EventLoopGroup
,然后使用一个io.netty.bootstrap.Bootstrap
来作为引导程序。
package com.funtester.socket.netty
import com.funtester.frame.execute.ThreadPoolUtil
import groovy.util.logging.Log4j2
import io.netty.bootstrap.Bootstrap
import io.netty.channel.*
import io.netty.channel.group.ChannelGroup
import io.netty.channel.group.DefaultChannelGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.DefaultHttpHeaders
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory
import io.netty.handler.codec.http.websocketx.WebSocketVersion
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.GlobalEventExecutor
@Log4j2
class WebSocketConnector {
static Bootstrap bootstrap = new Bootstrap()
/**
* 处理事件的线程池
*/
static EventLoopGroup group = new NioEventLoopGroup(ThreadPoolUtil.getFactory("N"))
static {
bootstrap.group(group).channel(NioSocketChannel.class)
}
/**
* 用于记录和管理所有客户端的channel
*/
static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
WebSocketClientHandshaker handShaker
ChannelPromise handshakeFuture
String host
int port
/**
* 网络通道
*/
Channel channel
WebSocketIoHandler handler
/**
* WebSocket协议类型的模拟客户端连接器构造方法
*
* @param serverIp
* @param serverSocketPort
* @param group
*/ WebSocketConnector(String host, int port) {
this.host = host
this.port = port
String URL = this.host + ":" + this.port + "/test"
URI uri = new URI(URL)
handler = new WebSocketIoHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()))
bootstrap.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_TIMEOUT, true)
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline()
pipeline.addLast(new HttpClientCodec())
pipeline.addLast(new ChunkedWriteHandler())
pipeline.addLast(new HttpObjectAggregator(1024 * 1024))
pipeline.addLast(handler)
}
})
}
/**
* 连接
*/
void connect() {
try {
try {
ChannelFuture future = bootstrap.connect(this.host - "ws://" - "wss://", this.port).sync()
this.channel = future.channel()
clients.add(channel)
} catch (e) {
log.error("创建channel失败", e)
}
} catch (Exception e) {
log.error("连接服务失败", e)
} finally {
this.handshakeFuture = handler.handshakeFuture()
}
}
/**
* 关闭
*/
void close() {
this.channel.close()
}
}
这里用到了一个保存现在的所有的活跃channel
的类io.netty.channel.group.ChannelGroup
,有点就是可以自动管理所有的channel
,还能自动剔除已经关闭的channel
。
这里还有补充 2 个发送消息的方法:
/**
* 发送文本消息
*/
void sendText(String msg) {
channel.writeAndFlush(new TextWebSocketFrame(msg))
}
/**
* 发送ping消息
*/
void ping() {
channel.writeAndFlush(new PingWebSocketFrame())
}
消息处理器
这里需要处理的消息各种类型,继承io.netty.channel.SimpleChannelInboundHandler
实现不同的方法即可。
这里有个泛型设置可以直接设置成不同的消息类型,例如io.netty.handler.codec.http.websocketx.WebSocketFrame
及其子类,如果确定服务端发来消息的类型的话,可以更加省事儿。
package com.funtester.socket.netty
import groovy.util.logging.Log4j2
import io.netty.channel.*
import io.netty.channel.group.ChannelGroup
import io.netty.channel.group.DefaultChannelGroup
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.websocketx.*
import io.netty.handler.timeout.IdleState
import io.netty.handler.timeout.IdleStateEvent
import io.netty.util.concurrent.GlobalEventExecutor
/**
* WebSocket协议类型的模拟客户端IO处理器类
*/
@Log4j2
class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {
/**
* 用于记录和管理所有客户端的channel
*/ private ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
private final WebSocketClientHandshaker handShaker
private ChannelPromise handshakeFuture
WebSocketIoHandler(WebSocketClientHandshaker handShaker) {
this.handShaker = handShaker
}
ChannelFuture handshakeFuture() {
return handshakeFuture
}
@Override
void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise()
}
@Override
void channelActive(ChannelHandlerContext ctx) {
handShaker.handshake(ctx.channel());
}
@Override
void channelInactive(ChannelHandlerContext ctx) {
ctx.close()
try {
super.channelInactive(ctx)
} catch (Exception e) {
log.error("channelInactive 异常.", e)
}
log.warn("WebSocket链路与服务器连接已断开.")
}
@Override
void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel()
if (!handShaker.isHandshakeComplete()) {
try {
handShaker.finishHandshake(ch, (FullHttpResponse) msg)
handshakeFuture.setSuccess()
} catch (WebSocketHandshakeException e) {
log.warn("WebSocket Client failed to connect",e)
handshakeFuture.setFailure(e)
}
return
}
WebSocketFrame frame = (WebSocketFrame) msg
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame
String s = textFrame.text()
} else if (frame instanceof CloseWebSocketFrame) {
log.info("WebSocket Client closing")
ch.close()
}
}
@Override
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause)
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause)
}
ctx.close()
super.exceptionCaught(ctx, cause)
}
@Override
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt
// 如果写通道处于空闲状态,就发送心跳命令
if (IdleState.WRITER_IDLE == event.state() || IdleState.READER_IDLE == event.state()) {
// 发送心跳数据
def channel = ctx.channel()
channel.writeAndFlush(new TextWebSocketFrame("dsf"))
}
} else {
super.userEventTriggered(ctx, evt)
}
}
}
这里处理接收到消息的时候并没有选择保存消息的功能,因为 netty WebSocket 使用场景就是超大量(超过 1w)连接。保留返回消息,进行业务验证通常不是这类测试场景的首要目的。所以以后等用到了再说吧。
后面会对比这 3 种Socket
客户端包括Go
语言两种Socket
客户端在超大量连接方面的资源占用。
版权声明: 本文为 InfoQ 作者【FunTester】的原创文章。
原文链接:【http://xie.infoq.cn/article/efc8dc0cf49ec0a40c9bfc420】。文章转载请联系作者。
FunTester
公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入
Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester
评论