Netty 进阶 -- 非阻塞网络编程 实现群聊 + 私聊 + 心跳检测系统
- 2022 年 8 月 10 日 河北
本文字数:9742 字
阅读完需:约 32 分钟
📢📢📢📣📣📣<br>哈喽!大家好,我是【Bug 终结者】 ,【CSDN 新星创作者】🏆,阿里云技术博主🏆,51CTO 人气博主🏆,INfoQ 写作专家🏆 <br/>一位上进心十足,拥有极强学习力的【Java 领域博主】😜😜😜 <br/>🏅【Bug 终结者】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。 偶尔会分享些前端基础知识,会更新实战项目,面向企业级开发应用!🏅 如果有对【后端技术】、【前端领域】感兴趣的【小可爱】,欢迎关注【Bug 终结者】💞💞💞<br>❤️❤️❤️ 感谢各位大可爱小可爱! ❤️❤️❤️
一、需求说明
使用 Netty 实现群聊+私聊系统
编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞形式)
实现多人群聊
实现单人私聊
利用心跳检测机制监听客户端是否存在连接(是否存在读、写、读写操作)
服务器端:可以检测用户上线,离线,并且实现消息转发功能
客户端:通过 channel 可以无阻塞发送消息给其它所有在线用户,同时可以接受所有在线用户发送的消息(由服务器转发消息得到)
二、什么是心跳检测机制?
心跳检测机制就是在一定的时间范围内客户端与服务器之间没有发生读、写、读写操作,那么就认定客户端与服务器无连接,这样就节省了服务器的资源
❤️Netty 实现心跳检测机制
服务器启动前添加前置处理器
//添加心跳检测
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
//添加自定义心跳处理器
pipeline.addLast(new HeartbeatServerHandler());
IdleStateHandler 是 Netty 提供的处理空闲状态的处理器
参数说明
// long readerIdleTime: 表示多长时间没有读,就会发送一个心跳检测包,检测是否还处于连接状态
// long writerIdleTime: 表示多长时间没有写,就会发送一个心跳检测包,检测是否还处于连接状态
// long allIdleTime: 表示多长时间没有读写操作,就会发送一个心跳检测包,检测是否处于连接状态
// 最后一个参数是当前时间的单位,秒或分钟或小时。
源码表示当前处理器类是表示多长时间内没有读、没有写、或者没有读写操作,就会触发 IdleStateEvent 事件 Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
当 IdleStateEvent 事件 触发后, 就会传递给管道的下一个 handler 处理通过调用(触发)handler 的 userEventTiggered 在该方法中处理 当 IdleStateEvent 事件
HeartbeatServerHandler 自定义心跳处理器
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端在指定时间内未触发相应操作执行此方法,即认为与客户端断开连接
* @param ctx 全局上下文对象
* @param evt 事件
* @throws Exception 发生异常时抛出
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//判断当前事件是否为IdleStateEvent
if (evt instanceof IdleStateEvent) {
//将evt强转为IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
//判断到底发生的事件是什么
String eventType = null;
//由于IdleStateEvent底层判断事件是根据枚举类型来的,所以直接判断即可
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "发生超时事件,事件类型为:" + eventType);
System.out.println("服务器做相应处理");
}
}
心跳检测机制就是这样,简单来说,就是每隔一段时间去检测客户端是否与服务器连接,如果无连接,那么就断开,从而节省服务器的资源
三、需求分析
🚝多人群聊
利用 map 集合,Map<String, Channel> 里面存入当前在线的所有用户,继承 SimpleChannelInboundHandler<String> 处理器 并在对应的处理器进行添加通道到 map
然后实现处理器的 channelRead0 方法进行转发数据,这就简单的实现了多人群聊
🚝单人私聊
单人私聊与多人群聊类似,也是在 channelRead0 方法内进行判断是否为私聊用户,私聊用户输入 #端口号 #要发送的内容,即可简单检测到本次消息为私聊,并从 map 中取出对应的 key,拿出 key 对应的 channel,进行转发,即可完成私聊
接受消息,其它用户不会看到此私聊消息
🚝服务器检测用户上线、离线
服务器端检测用户当前的状态,实现对应的方法进行相应的提示即可
实现 handlerAdded 检测某个用户加入聊天,
实现 channelActive 表示 channel 处于活跃状态,即上线
实现 channelInactive 表示 channel 处于非活跃状态,即离线,
实现 handlerRemoved 表示离线
四、效果图
五、核心源码
GroupChatServer 服务器端代码
package com.wanshi.netty.groupchat;
import com.wanshi.netty.heartbeat.HeartbeatServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class GroupChatServer {
// 监听端口
private int port;
public GroupChatServer(int port) {
this.port = port;
}
//编写run方法,处理客户端的请求
public void run() {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//Nio核数 * 2
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//获取pipeline
ChannelPipeline pipeline = socketChannel.pipeline();
//向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的业务处理handler
pipeline.addLast(new GroupChatServerHandler());
//加入心跳检测机制
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatServerHandler());
}
});
System.out.println("netty 服务器启动");
ChannelFuture future = bootstrap.bind(port).sync();
//监听关闭事件
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
GroupChatServer groupChatServer = new GroupChatServer(7000);
groupChatServer.run();
}
}
GroupChatServerHandler 服务器自定义 handler
package com.wanshi.netty.groupchat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//所有的channel存入map集合中,目的是为了私聊好获取用户
private static Map<String,Channel> allChannels = new HashMap<String,Channel>();
//格式化所有日期时间
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//转化日期
private String currentDate = sdf.format(new Date());
/**
* handlerAdded 表示连接建立,一旦连接建立,第一个被执行
* 将当前channel加入到map集合
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//获取当前channel
Channel channel = ctx.channel();
//推送客户加入聊天的信息推送给其它在线的客户端
//该方法会将channelGroup中所有的channel遍历并发送消息
allChannels.forEach((k, ch) ->{
ch.writeAndFlush(currentDate+" \n [客户端]" + channel.remoteAddress() + "加入聊天\n");
});
//获取端口号
String key = channel.remoteAddress().toString().split(":")[1];
allChannels.put(key, channel);
}
/**
* 表示断开连接了,将xx客户离开信息推送给当前在线的客户
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//获取当前channel
Channel channel = ctx.channel();
//推送客户加入聊天的信息推送给其它在线的客户端
//该方法会将map中所有的channel遍历并发送消息
allChannels.forEach((k, ch) ->{
ch.writeAndFlush(currentDate+" \n [客户端]" + channel.remoteAddress() + "离线\n");
});
System.out.println("当前在线人数:" + allChannels.size());
}
/**
* 读取数据并将数据转发给在线的客户端
* @param channelHandlerContext
* @param s
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
//获取到当前channel
Channel channel = channelHandlerContext.channel();
//私聊用户发送消息
if(s.contains("#")){
String id = s.split("#")[1];
String body = s.split("#")[2];
Channel userChannel = allChannels.get(id);
String key = channel.remoteAddress().toString().split(":")[1];
userChannel.writeAndFlush(currentDate+"\n "+key+"【私聊】 [用户] "+id+" 说 : "+body);
return;
}
//循环遍历hashmap集合进行转发消息
allChannels.forEach((k, ch) -> {
if (channel != ch) {
ch.writeAndFlush(currentDate + " \n [客户端]" + channel.remoteAddress() + ":" + s + "\n");
} else { // 发送消息给自己,回显自己发送的消息
channel.writeAndFlush(currentDate + " \n [我]:" + s + "\n");
}
});
}
/**
* 表示channel处于活动状态
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(currentDate + " -- " + ctx.channel().remoteAddress() + "上线~");
}
/**
* 失去连接时会触发此方法
* @param ctx 全局上下文对象
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
String key = channel.remoteAddress().toString().split(":")[1];
allChannels.remove(key);
System.out.println(currentDate + " -- " + ctx.channel().remoteAddress() + "离线");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//关闭
ctx.close();
}
}
自定义心跳处理器 -- HeartbeatServerHandler
package com.wanshi.netty.heartbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端在指定时间内未触发相应操作执行此方法,即认为与客户端断开连接
* @param ctx 全局上下文对象
* @param evt 事件
* @throws Exception 发生异常时抛出
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//判断当前事件是否为IdleStateEvent
if (evt instanceof IdleStateEvent) {
//将evt强转为IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
//判断到底发生的事件是什么
String eventType = null;
//由于IdleStateEvent底层判断事件是根据枚举类型来的,所以直接判断即可
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "发生超时事件,事件类型为:" + eventType);
System.out.println("服务器做相应处理");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("发生异常!");
}
}
GroupChatClient 客户端
package com.wanshi.netty.groupchat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
//定义属性
private final String host;
public final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run () {
EventLoopGroup eventExecutors = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//得到pipeline
ChannelPipeline pipeline = socketChannel.pipeline();
//加入相关的handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//加入自定义handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//得到channel
Channel channel = channelFuture.channel();
System.out.println("-----" + channel.localAddress() + "----");
//客户端需要输入信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.nextLine();
//通过channel发送到服务器端
channel.writeAndFlush(msg+"\r\n");
}
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
eventExecutors.shutdownGracefully();
}
}
public static void main(String[] args) {
GroupChatClient groupChatClient = new GroupChatClient("127.0.0.1", 7000);
groupChatClient.run();
}
}
GroupChatClientHandler 客户端自定义处理器 Handler
package com.wanshi.netty.groupchat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//所有的channel存入map集合中,目的是为了私聊好获取用户
private static Map<String,Channel> allChannels = new HashMap<String,Channel>();
//格式化所有日期时间
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//转化日期
private String currentDate = sdf.format(new Date());
/**
* handlerAdded 表示连接建立,一旦连接建立,第一个被执行
* 将当前channel加入到map集合
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//获取当前channel
Channel channel = ctx.channel();
//推送客户加入聊天的信息推送给其它在线的客户端
//该方法会将channelGroup中所有的channel遍历并发送消息
allChannels.forEach((k, ch) ->{
ch.writeAndFlush(currentDate+" \n [客户端]" + channel.remoteAddress() + "加入聊天\n");
});
//获取端口号
String key = channel.remoteAddress().toString().split(":")[1];
allChannels.put(key, channel);
}
/**
* 表示断开连接了,将xx客户离开信息推送给当前在线的客户
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//获取当前channel
Channel channel = ctx.channel();
//推送客户加入聊天的信息推送给其它在线的客户端
//该方法会将map中所有的channel遍历并发送消息
allChannels.forEach((k, ch) ->{
ch.writeAndFlush(currentDate+" \n [客户端]" + channel.remoteAddress() + "离线\n");
});
System.out.println("当前在线人数:" + allChannels.size());
}
/**
* 读取数据并将数据转发给在线的客户端
* @param channelHandlerContext
* @param s
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
//获取到当前channel
Channel channel = channelHandlerContext.channel();
//私聊用户发送消息
if(s.contains("#")){
String id = s.split("#")[1];
String body = s.split("#")[2];
Channel userChannel = allChannels.get(id);
String key = channel.remoteAddress().toString().split(":")[1];
userChannel.writeAndFlush(currentDate+"\n "+key+"【私聊】 [用户] "+id+" 说 : "+body);
return;
}
//循环遍历hashmap集合进行转发消息
allChannels.forEach((k, ch) -> {
if (channel != ch) {
ch.writeAndFlush(currentDate + " \n [客户端]" + channel.remoteAddress() + ":" + s + "\n");
} else { // 发送消息给自己,回显自己发送的消息
channel.writeAndFlush(currentDate + " \n [我]:" + s + "\n");
}
});
}
/**
* 表示channel处于活动状态
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(currentDate + " -- " + ctx.channel().remoteAddress() + "上线~");
}
/**
* 失去连接时会触发此方法
* @param ctx 全局上下文对象
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
String key = channel.remoteAddress().toString().split(":")[1];
allChannels.remove(key);
System.out.println(currentDate + " -- " + ctx.channel().remoteAddress() + "离线");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//关闭
ctx.close();
}
}
⛲小结
以上就是【Bug 终结者】对 Netty 非阻塞网络编程简单的理解,小编认为唯有代码实践,才可提升自己的技术,手不要懒,多敲,本案例完美的体现了 Netty 非阻塞式网络编程的模式,方便,快捷,代码略微有点多,但滤清思路,一步步来,总会慢慢理解的,加油,希望本文对你有帮助~
如果这篇【文章】有帮助到你,希望可以给【Bug 终结者】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!
版权声明: 本文为 InfoQ 作者【Bug终结者】的原创文章。
原文链接:【http://xie.infoq.cn/article/9b6003da2ced129df090ecddd】。文章转载请联系作者。
Bug终结者
励志成为一个优秀的开发者~ 2021.12.09 加入
星星之火,可以燎原
评论